LearningCore

Pleasant machine-learning library inspired by ``requests``.

Design goals:

* The pandas / sklearn ``X``/``y`` split is hidden. Users work with two opaque types: ``DataFrame`` (all columns intact) and ``Dataset`` (features + a designated target column).
* Classification and regression are distinguished at the type level (``SklearnClassifier`` vs ``SklearnRegressor``). Metrics are typed against the right kind of model: ``accuracy`` will not type-check against a regressor, ``mse`` will not type-check against a classifier.
* A few dataset-consuming operations are linear (``(1 ds: Dataset)``) so callers who bind their source with ``let 1 df = ...`` get a type error if they keep using the pre-transformed dataset.
* Refinements carry shape and class invariants. Common ML bugs are rejected at compile time.

Pipeline-defect detection (Silva 2024)

This library encodes the component contracts from Pedro Silva's MSc thesis *"Static Analysis for Detection of Defects in Machine Learning Pipelines"* (ULisboa 2024). The thesis specifies each component with first-order pre/post-conditions over per-line predicates (``lineInDs``, ``lineCausality``, ``synthesizedLine`` …). Aeon's liquid refinements are quantifier-free, so instead of quantifying over lines we lift those predicates to **dataset-level summary flags** whose values are *fully determined* by each component's post-condition (biconditionals, not implications). This keeps everything decidable and needs no global axioms: the propagation laws live entirely in the contracts below.

Summary flags (all uninterpreted ``Dataset -> Bool``):

* ``is_timeseries``: instances are chronologically ordered.
* ``has_missing``: some feature has missing values.
* ``coupled``: a whole-dataset transform (scaling, imputation, resampling, SMOTE) has made rows statistically depend on one another. Splitting a *coupled* dataset leaks information across the partition.
* ``clean_test``: this dataset is a held-out test set produced by a *safe* split (source not coupled, and not a shuffled time series). Only ``split`` ever sets it true, so evaluating on anything else is a type error.
* ``independent_target``: the target does not depend on any feature (no proxy / leaky feature).

Defects caught at compile time:

- Pre-processing (scale / impute / resample / SMOTE) **before** the train/test split → ``coupled`` source → ``clean_test`` is false.
- Temporal leakage: shuffling a time series before splitting → ``clean_test`` is false.
- Evaluating on the training data / on a non-split dataset → ``clean_test`` cannot be proven true.
- Feeding missing values to an estimator → ``has_missing`` precondition.
- Too few instances (``ds_rows <= ds_features * 10``) → ``split`` precondition (decidable for ``create_dataset``; assert it on the CSV path with ``require_enough_data`` before splitting).
- A balance-sensitive metric (``accuracy`` / ``precision`` / ``recall``) on an imbalanced test set → ``is_balanced`` precondition.

Runtime witnesses (``is_multiclass``, ``is_splittable``, ``is_nonempty``, ``balanced``) refine their Bool return type so they can discharge the corresponding precondition inside an ``if ... then ... else ...`` guard.
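The flag propagation described in this overview can be mimicked at runtime. What follows is a minimal Python model, not Aeon code and not part of LearningCore; the ``Flags`` class and both helper functions are hypothetical names introduced only for illustration. It shows how applying a whole-dataset transform before splitting leaves ``clean_test`` false on the test half.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Flags:
    """Hypothetical runtime mirror of the dataset-level summary flags."""
    is_timeseries: bool = False
    has_missing: bool = False
    coupled: bool = False
    clean_test: bool = False
    independent_target: bool = True


def whole_dataset_transform(f: Flags) -> Flags:
    # Scaling, imputation and resampling all set `coupled` to true.
    return replace(f, coupled=True)


def split_test_half(f: Flags, shuffle: bool) -> Flags:
    # clean_test is *equal to* the safety condition (a biconditional):
    # the source is not coupled and is not a shuffled time series.
    safe = (not f.coupled) and not (f.is_timeseries and shuffle)
    return replace(f, clean_test=safe)


raw = Flags()
split_first = split_test_half(raw, shuffle=True)
scaled_first = split_test_half(whole_dataset_transform(raw), shuffle=True)
print(split_first.clean_test, scaled_first.clean_test)
```

In the library itself these facts are checked by the type checker at compile time; the model above only makes the propagation rule concrete.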
Imports
open List; open Tuple; open Pair; import Tensor (Vector);
Types

DataFrame

(type)
type DataFrame

Dataset

(type)
type Dataset

SklearnClassifier

(type)
type SklearnClassifier

SklearnRegressor

(type)
type SklearnRegressor

Functions

read_csv

Read a CSV file into a ``DataFrame`` (all columns kept).
def read_csv (path: String) : {df : DataFrame | df_cols df ≥ 1 && df_rows df ≥ 0}

target

Designate ``column`` as the target. Consumes the source DataFrame. ``temporal`` declares whether the rows are chronologically ordered and ``missing`` whether any feature still has missing values — both are modelling facts the static analyser cannot infer from a CSV path.
def target (df: DataFrame) (column: String) (temporal: Bool) (missing: Bool) : {ds : Dataset | ds_rows ds = df_rows df && (ds_features ds = df_cols df - 1 && (ds_target ds column = True && (is_timeseries ds = temporal && (has_missing ds = missing && (coupled ds = False && independent_target ds = True)))))}

target_at

Designate the column at position ``index`` (0-based) as the target.
def target_at (df: DataFrame) (index: {n : Int | n ≥ 0 && n < df_cols df}) (temporal: Bool) (missing: Bool) : {ds : Dataset | ds_rows ds = df_rows df && (ds_features ds = df_cols df - 1 && (is_timeseries ds = temporal && (has_missing ds = missing && (coupled ds = False && independent_target ds = True))))}

load

def load (path: String) (temporal: Bool) (missing: Bool) : {ds : Dataset | ds_features ds ≥ 0 && (ds_rows ds ≥ 0 && (is_timeseries ds = temporal && (has_missing ds = missing && (coupled ds = False && independent_target ds = True))))}

create_dataset

Synthetic dataset (thesis-style static modelling). ``create_dataset`` declares concrete row/feature counts and balance, so shape-dependent checks (minimum data, metric suitability) become statically decidable, mirroring the thesis ``create_dataset`` component used throughout its evaluation pipelines.
def create_dataset (rows: {n : Int | n ≥ 0}) (features: {n : Int | n ≥ 0}) (bal: Bool) (temporal: Bool) (missing: Bool) : {ds : Dataset | ds_rows ds = rows && (ds_features ds = features && (is_balanced ds = bal && (is_timeseries ds = temporal && (has_missing ds = missing && (coupled ds = False && independent_target ds = True)))))}

require_enough_data

Trusted runtime assertion that a (statically unknown) dataset has more than ten times as many rows as features. Returns the dataset unchanged but introduces the ``ds_rows > ds_features * 10`` fact; raises at runtime if violated. Use it on the CSV path, where counts are unknown, to discharge a minimum-data precondition such as the one on ``split`` or on a trainer.
def require_enough_data (ds: Dataset) : {d : Dataset | d = ds && ds_rows d > ds_features d * 10}
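A minimal Python sketch of what such a runtime assertion might look like, assuming the dataset is represented by a hypothetical ``(rows, features)`` pair. The actual implementation and the exception type it raises are not shown in this document, so ``ValueError`` is an assumption.

```python
from typing import Tuple


def require_enough_data(rows: int, features: int) -> Tuple[int, int]:
    """Return the shape unchanged, or raise if rows <= features * 10."""
    if not rows > features * 10:
        raise ValueError(
            f"too few instances: {rows} rows for {features} features "
            f"(need more than {features * 10})"
        )
    # Past this point the caller may rely on rows > features * 10.
    return rows, features


print(require_enough_data(101, 10))
```

Note that the bound is strict: exactly ten rows per feature is still rejected.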

columns

def columns (df: DataFrame) : List

has_column

def has_column (df: DataFrame) (column: String) : Bool

n_columns

def n_columns (df: DataFrame) : {n : Int | n = df_cols df && n ≥ 0}

n_rows

def n_rows (ds: Dataset) : {n : Int | n = ds_rows ds && n ≥ 0}

n_features

def n_features (ds: Dataset) : {n : Int | n = ds_features ds && n ≥ 0}

balanced

Runtime witness for ``is_balanced``.
def balanced (ds: Dataset) : {b : Bool | b = is_balanced ds}

is_nonempty

Runtime witnesses that discharge precondition refinements inside an ``if ... then ... else ...`` branch.
def is_nonempty (ds: Dataset) : {b : Bool | b = ds_rows ds > 0}

is_splittable

def is_splittable (ds: Dataset) : {b : Bool | b = ds_rows ds ≥ 2}

is_multiclass

def is_multiclass (ds: Dataset) : {b : Bool | b = ds_classes ds ≥ 2}

class_counts

List of ``[label, count]`` pairs sorted by label.
def class_counts (ds: Dataset) : List

split

Linear split into ``(training, testing)``. Consumes the source so training on the un-split dataset becomes a type error. ``shuffle`` selects whether rows are randomised before partitioning. The two halves preserve the source's feature/class counts and the defect flags (``is_timeseries`` / ``has_missing`` / ``coupled`` / ``independent_target``). Balance is preserved *only when shuffling* (the thesis: an unshuffled split of class-ordered data is imbalanced). The testing half is stamped ``clean_test`` iff the split is sound: the source is not coupled and is not a shuffled time series. That single flag is what every metric below checks, so any pre-split contamination or temporal leakage surfaces at evaluation time.
def split (ds: {d : Dataset | ds_rows d > ds_features d * 10}) (fraction: {f : Float | 0.0 < f && f < 1.0}) (shuffle: Bool) : {p : Pair Dataset Dataset | ds_features (Pair_mk_fst p) = ds_features ds && (ds_features (Pair_mk_snd p) = ds_features ds && (ds_classes (Pair_mk_fst p) = ds_classes ds && (ds_classes (Pair_mk_snd p) = ds_classes ds && (is_timeseries (Pair_mk_fst p) = is_timeseries ds && (is_timeseries (Pair_mk_snd p) = is_timeseries ds && (has_missing (Pair_mk_fst p) = has_missing ds && (has_missing (Pair_mk_snd p) = has_missing ds && (coupled (Pair_mk_fst p) = coupled ds && (coupled (Pair_mk_snd p) = coupled ds && (independent_target (Pair_mk_fst p) = independent_target ds && (independent_target (Pair_mk_snd p) = independent_target ds && (--> shuffle (is_balanced (Pair_mk_fst p) = is_balanced ds) && (--> shuffle (is_balanced (Pair_mk_snd p) = is_balanced ds) && clean_test (Pair_mk_snd p) = (coupled ds = False && --> (is_timeseries ds) (shuffle = False)))))))))))))))}
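The ``clean_test`` clause at the end of the refinement above is an equality between booleans, so it can be tabulated. The Python sketch below is a model of that one clause only; ``implies`` and ``clean_test_after_split`` are hypothetical helper names, not library functions.

```python
from itertools import product


def implies(a: bool, b: bool) -> bool:
    # Mirrors the `-->` operator used in the refinement.
    return (not a) or b


def clean_test_after_split(coupled: bool, is_timeseries: bool, shuffle: bool) -> bool:
    # clean_test (snd p) = (coupled ds = False && (is_timeseries ds --> shuffle = False))
    return (not coupled) and implies(is_timeseries, not shuffle)


table = {
    (c, t, s): clean_test_after_split(c, t, s)
    for c, t, s in product([False, True], repeat=3)
}
for (c, t, s), ok in table.items():
    print(f"coupled={c!s:5} timeseries={t!s:5} shuffle={s!s:5} -> clean_test={ok}")
```

Of the eight combinations, only the three with an uncoupled source that is not a shuffled time series yield a clean test half.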

train_of

Runtime accessors for the training / testing halves, refined so the returned dataset is linked to the source ``Pair`` via the auto-generated projection. ``let train = train_of parts`` makes ``train == Pair_mk_fst parts`` visible to the SMT solver, so refinements about ``Pair_mk_fst parts`` (from the split's output type) carry to ``train`` directly.
def train_of (p: Pair Dataset Dataset) : {d : Dataset | d = Pair_mk_fst p}

test_of

def test_of (p: Pair Dataset Dataset) : {d : Dataset | d = Pair_mk_snd p}

smote

SMOTE oversampling; requires multi-class input (guard with ``is_multiclass``). Output is balanced and preserves the feature count; row count only ever grows.
def smote (ds: {d : Dataset | ds_classes d ≥ 2}) : {out : Dataset | is_balanced out = True && (ds_features out = ds_features ds && (ds_classes out = ds_classes ds && (ds_rows out ≥ ds_rows ds && (coupled out = True && (has_missing out = has_missing ds && (is_timeseries out = is_timeseries ds && independent_target out = independent_target ds))))))}

random_oversample

def random_oversample (ds: {d : Dataset | ds_classes d ≥ 2}) : {out : Dataset | is_balanced out = True && (ds_features out = ds_features ds && (ds_classes out = ds_classes ds && (ds_rows out ≥ ds_rows ds && (coupled out = True && (has_missing out = has_missing ds && (is_timeseries out = is_timeseries ds && independent_target out = independent_target ds))))))}

random_undersample

def random_undersample (ds: {d : Dataset | ds_classes d ≥ 2}) : {out : Dataset | is_balanced out = True && (ds_features out = ds_features ds && (ds_classes out = ds_classes ds && (ds_rows out ≤ ds_rows ds && (coupled out = True && (has_missing out = has_missing ds && (is_timeseries out = is_timeseries ds && independent_target out = independent_target ds))))))}
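The row-count clauses of the two random resamplers (``≥`` for oversampling, ``≤`` for undersampling) follow from how random resampling is usually done. The sketch below is a plain-Python illustration under that assumption, with rows modelled as hypothetical ``(feature, label)`` tuples; it is not the library's implementation, and the function bodies are not taken from this document.

```python
import random
from collections import Counter


def group_by_label(rows):
    groups = {}
    for row in rows:
        groups.setdefault(row[1], []).append(row)
    return groups


def random_oversample(rows, seed=0):
    """Duplicate random rows of smaller classes up to the largest class size."""
    rng = random.Random(seed)
    groups = group_by_label(rows)
    target = max(len(g) for g in groups.values())
    out = list(rows)
    for group in groups.values():
        out.extend(rng.choice(group) for _ in range(target - len(group)))
    return out


def random_undersample(rows, seed=0):
    """Keep a random subset of every class, sized to the smallest class."""
    rng = random.Random(seed)
    groups = group_by_label(rows)
    target = min(len(g) for g in groups.values())
    out = []
    for group in groups.values():
        out.extend(rng.sample(group, target))
    return out


rows = [(float(i), "a") for i in range(6)] + [(float(i), "b") for i in range(2)]
over = random_oversample(rows)
under = random_undersample(rows)
print(Counter(label for _, label in over), Counter(label for _, label in under))
```

Both outputs have equal class counts, and the duplicated or discarded rows are chosen by looking at the whole dataset, which is consistent with the contracts marking the result as ``coupled``.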

standard_scale

def standard_scale (ds: Dataset) : {out : Dataset | ds_rows out = ds_rows ds && (ds_features out = ds_features ds && (ds_classes out = ds_classes ds && (is_balanced out = is_balanced ds && (coupled out = True && (has_missing out = has_missing ds && (is_timeseries out = is_timeseries ds && (independent_target out = independent_target ds && clean_test out = clean_test ds)))))))}

minmax_scale

def minmax_scale (ds: Dataset) : {out : Dataset | ds_rows out = ds_rows ds && (ds_features out = ds_features ds && (ds_classes out = ds_classes ds && (is_balanced out = is_balanced ds && (coupled out = True && (has_missing out = has_missing ds && (is_timeseries out = is_timeseries ds && (independent_target out = independent_target ds && clean_test out = clean_test ds)))))))}

robust_scale

def robust_scale (ds: Dataset) : {out : Dataset | ds_rows out = ds_rows ds && (ds_features out = ds_features ds && (ds_classes out = ds_classes ds && (is_balanced out = is_balanced ds && (coupled out = True && (has_missing out = has_missing ds && (is_timeseries out = is_timeseries ds && (independent_target out = independent_target ds && clean_test out = clean_test ds)))))))}
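All three scalers set ``coupled out = True``. To illustrate why a scaler couples rows, here is a small Python sketch of z-score scaling on a single column, assuming the common definition with the population standard deviation; the library's actual scaler may differ in details. Adding one extra row changes the scaled value of every other row.

```python
from statistics import mean, pstdev


def standard_scale(column):
    """Z-score a column using its own mean and population std deviation."""
    mu, sigma = mean(column), pstdev(column)
    return [(x - mu) / sigma for x in column]


train_only = standard_scale([1.0, 2.0, 3.0])
with_extra_row = standard_scale([1.0, 2.0, 3.0, 10.0])
# The first row is scaled differently once the fourth row is present.
print(train_only[0], with_extra_row[0])
```

This is the dependency the ``coupled`` flag records: if the fourth row later lands in the test half, its value has already influenced the training rows.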

fill_mean

def fill_mean (ds: Dataset) : {out : Dataset | ds_rows out = ds_rows ds && (ds_features out = ds_features ds && (ds_classes out = ds_classes ds && (is_balanced out = is_balanced ds && (has_missing out = False && (coupled out = True && (is_timeseries out = is_timeseries ds && (independent_target out = independent_target ds && clean_test out = clean_test ds)))))))}

fill_median

def fill_median (ds: Dataset) : {out : Dataset | ds_rows out = ds_rows ds && (ds_features out = ds_features ds && (ds_classes out = ds_classes ds && (is_balanced out = is_balanced ds && (has_missing out = False && (coupled out = True && (is_timeseries out = is_timeseries ds && (independent_target out = independent_target ds && clean_test out = clean_test ds)))))))}

fill_most_frequent

def fill_most_frequent (ds: Dataset) : {out : Dataset | ds_rows out = ds_rows ds && (ds_features out = ds_features ds && (ds_classes out = ds_classes ds && (is_balanced out = is_balanced ds && (has_missing out = False && (coupled out = True && (is_timeseries out = is_timeseries ds && (independent_target out = independent_target ds && clean_test out = clean_test ds)))))))}

fill_constant

def fill_constant (value: Float) (ds: Dataset) : {out : Dataset | ds_rows out = ds_rows ds && (ds_features out = ds_features ds && (ds_classes out = ds_classes ds && (is_balanced out = is_balanced ds && (has_missing out = False && (coupled out = True && (is_timeseries out = is_timeseries ds && (independent_target out = independent_target ds && clean_test out = clean_test ds)))))))}
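The ``fill_*`` contracts share three post-conditions: the row count is unchanged, ``has_missing`` becomes false, and ``coupled`` becomes true. A minimal Python sketch of mean imputation on one column, with missing values modelled as ``None`` (an assumption made for illustration, not the library's representation), makes the first two visible.

```python
from statistics import mean


def fill_mean(column):
    """Replace None entries with the mean of the observed entries."""
    observed = [x for x in column if x is not None]
    fill = mean(observed)  # depends on every observed row in the column
    return [fill if x is None else x for x in column]


filled = fill_mean([1.0, None, 3.0, None])
print(filled)
```

The contract for ``fill_constant`` also sets ``coupled out = True``, even though a constant fill value does not depend on the other rows.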

predict

def predict (model: SklearnClassifier) (ds: {d : Dataset | clf_features model = ds_features d}) : List

predict_proba

def predict_proba (model: SklearnClassifier) (ds: {d : Dataset | clf_features model = ds_features d}) : List

predict_value

def predict_value (model: SklearnRegressor) (ds: {d : Dataset | reg_features model = ds_features d}) : List

accuracy

def accuracy (model: SklearnClassifier) (ds: {d : Dataset | clf_features model = ds_features d && (clean_test d = True && is_balanced d = True)}) : {f : Float | 0.0 ≤ f && f ≤ 1.0}

precision

def precision (model: SklearnClassifier) (ds: {d : Dataset | clf_features model = ds_features d && (clean_test d = True && is_balanced d = True)}) : {f : Float | 0.0 ≤ f && f ≤ 1.0}

recall

def recall (model: SklearnClassifier) (ds: {d : Dataset | clf_features model = ds_features d && (clean_test d = True && is_balanced d = True)}) : {f : Float | 0.0 ≤ f && f ≤ 1.0}
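``accuracy``, ``precision`` and ``recall`` additionally require ``is_balanced d = True``. As a worked example of how accuracy can mislead on an imbalanced test set, the Python sketch below scores a degenerate predictor that always answers with the majority class. The helper functions are local to the example and are not the library's metrics.

```python
def accuracy(y_true, y_pred):
    """Fraction of predictions that match the true label."""
    return sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)


def recall(y_true, y_pred, positive):
    """True positives over all actual positives."""
    preds_for_positives = [p for t, p in zip(y_true, y_pred) if t == positive]
    return sum(p == positive for p in preds_for_positives) / len(preds_for_positives)


y_true = ["neg"] * 95 + ["pos"] * 5
y_pred = ["neg"] * 100  # always predicts the majority class
print(accuracy(y_true, y_pred), recall(y_true, y_pred, "pos"))
```

The predictor reaches 95% accuracy while finding none of the positive instances, which is the kind of report the ``is_balanced`` precondition is meant to rule out.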

f1

def f1 (model: SklearnClassifier) (ds: {d : Dataset | clf_features model = ds_features d && clean_test d = True}) : {f : Float | 0.0 ≤ f && f ≤ 1.0}

roc_auc

def roc_auc (model: SklearnClassifier) (ds: {d : Dataset | clf_features model = ds_features d && clean_test d = True}) : {f : Float | 0.0 ≤ f && f ≤ 1.0}

confusion_matrix

def confusion_matrix (model: SklearnClassifier) (ds: {d : Dataset | clf_features model = ds_features d && clean_test d = True}) : List

classification_report

def classification_report (model: SklearnClassifier) (ds: {d : Dataset | clf_features model = ds_features d && clean_test d = True}) : String

mse

def mse (model: SklearnRegressor) (ds: {d : Dataset | reg_features model = ds_features d && clean_test d = True}) : {f : Float | f ≥ 0.0}

rmse

def rmse (model: SklearnRegressor) (ds: {d : Dataset | reg_features model = ds_features d && clean_test d = True}) : {f : Float | f ≥ 0.0}

mae

def mae (model: SklearnRegressor) (ds: {d : Dataset | reg_features model = ds_features d && clean_test d = True}) : {f : Float | f ≥ 0.0}

r2

def r2 (model: SklearnRegressor) (ds: {d : Dataset | reg_features model = ds_features d && clean_test d = True}) : Float

explained_variance

def explained_variance (model: SklearnRegressor) (ds: {d : Dataset | reg_features model = ds_features d && clean_test d = True}) : Float
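``mse``, ``rmse`` and ``mae`` are refined to be non-negative, whereas ``r2`` and ``explained_variance`` return an unrefined ``Float``. One plausible reason, assuming the usual definition of the coefficient of determination (1 - SS_res / SS_tot), is that it becomes negative whenever the model does worse than predicting the mean. The Python sketch below works through such a case; the helpers are local to the example.

```python
def mse(y_true, y_pred):
    """Mean squared error; a mean of squares, so never negative."""
    return sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true)


def r2(y_true, y_pred):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_y = sum(y_true) / len(y_true)
    ss_res = sum((t - p) ** 2 for t, p in zip(y_true, y_pred))
    ss_tot = sum((t - mean_y) ** 2 for t in y_true)
    return 1.0 - ss_res / ss_tot


y_true = [1.0, 2.0, 3.0]
y_pred = [3.0, 3.0, 3.0]  # a constant predictor that misses the mean
print(mse(y_true, y_pred), r2(y_true, y_pred))
```

Here SS_res is 5 and SS_tot is 2, so the score is 1 - 2.5 = -1.5, outside any ``[0, 1]`` bound.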

classify_one

Class-score vector for one feature vector. Uses ``predict_proba`` when the estimator provides it, otherwise a one-hot encoding of the hard prediction, so even non-probabilistic classifiers (LinearSVC, RidgeClassifier, Perceptron …) satisfy the ``Models.Classifier`` interface.
def classify_one (model: SklearnClassifier) (x: Vector) : Vector
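The fallback described above can be sketched in Python as follows. This is an illustration, not the library's source: ``ProbModel`` and ``HardModel`` are hypothetical stand-ins that follow the scikit-learn estimator conventions (``predict``, ``predict_proba``, ``classes_``), and the dispatch via ``hasattr`` is an assumption about how the check might be done.

```python
class ProbModel:
    """Stand-in estimator that exposes predict_proba (hypothetical)."""
    classes_ = [0, 1, 2]

    def predict_proba(self, rows):
        return [[0.2, 0.5, 0.3] for _ in rows]


class HardModel:
    """Stand-in estimator with hard predictions only (hypothetical)."""
    classes_ = [0, 1, 2]

    def predict(self, rows):
        return [2 for _ in rows]


def classify_one(model, x):
    """Return one score per class for a single feature vector."""
    if hasattr(model, "predict_proba"):
        return list(model.predict_proba([x])[0])
    label = model.predict([x])[0]
    # One-hot: 1.0 at the predicted class, 0.0 everywhere else.
    return [1.0 if c == label else 0.0 for c in model.classes_]


print(classify_one(ProbModel(), [0.0]), classify_one(HardModel(), [0.0]))
```

Either way the caller receives a vector with one entry per class, which is what lets both kinds of estimator sit behind the same interface.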

regress_one

Predicted value for one feature vector.
def regress_one (model: SklearnRegressor) (x: Vector) : Float

clf_n_features

Runtime model introspection, refined against the shape measures.
def clf_n_features (model: SklearnClassifier) : {n : Int | n = clf_features model && n ≥ 0}

clf_n_classes

def clf_n_classes (model: SklearnClassifier) : {n : Int | n = clf_classes model && n ≥ 0}

reg_n_features

def reg_n_features (model: SklearnRegressor) : {n : Int | n = reg_features model && n ≥ 0}

Uninterpreted

Functions declared as def f ... = uninterpreted: only their signature is known to the verifier; they have no body.

df_rows

uninterpreted
def df_rows : (df : DataFrame) → Int

df_cols

uninterpreted
def df_cols : (df : DataFrame) → Int

ds_rows

uninterpreted
def ds_rows : (ds : Dataset) → Int

ds_features

uninterpreted
def ds_features : (ds : Dataset) → Int

ds_classes

uninterpreted
def ds_classes : (ds : Dataset) → Int

is_balanced

uninterpreted
Every class label appears the same number of times.
def is_balanced : (ds : Dataset) → Bool

clf_features

uninterpreted
Shape of a fitted model.
def clf_features : (m : SklearnClassifier) → Int

clf_classes

uninterpreted
def clf_classes : (m : SklearnClassifier) → Int

reg_features

uninterpreted
def reg_features : (m : SklearnRegressor) → Int

is_timeseries

uninterpreted
Instances are indexed in chronological order (time-stamped data).
def is_timeseries : (ds : Dataset) → Bool

has_missing

uninterpreted
Some feature still has missing values.
def has_missing : (ds : Dataset) → Bool

coupled

uninterpreted
A whole-dataset transform has coupled the rows (so a later split leaks).
def coupled : (ds : Dataset) → Bool

clean_test

uninterpreted
Produced by a safe split — sound to use as a held-out test set.
def clean_test : (ds : Dataset) → Bool

independent_target

uninterpreted
The target does not depend on any feature (no proxy/leaky feature).
def independent_target : (ds : Dataset) → Bool

ds_target

uninterpreted
The dataset's target column has the given name.
def ds_target : (ds : Dataset) → (name : String) → Bool