
Drift Detection & Sampling

Monitoring score changes over time and selecting representative eval samples. Drift detection catches performance regressions before they reach users. Stratified sampling gives you representative coverage while keeping evaluation costs manageable.


Drift Detection

Why Drift Detection?

LLM performance can degrade over time -- model updates, data distribution shifts, prompt changes, upstream API changes. A system that scored 92% last month may silently drop to 84% this month. Drift detection catches these regressions automatically.

detect_drift

from latent.stats import detect_drift

result = detect_drift(
    baseline_scores=baseline_scores,
    current_scores=current_scores,
    metric_name="accuracy",
    score_type="binary",
    confidence_level=0.95,
    seed=None,
)

Compares two eval runs to detect statistically significant performance changes. The method is chosen automatically based on score_type:

  • Binary scores: Bootstrap CI on the difference of means
  • Ordinal scores: Mann-Whitney U test
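For intuition, the binary-score path can be sketched as a percentile bootstrap on the difference of means. The function below is illustrative -- its name and resampling details are assumptions, not the library's internals:

```python
import numpy as np

def bootstrap_diff_ci(baseline, current, n_boot=10_000, confidence_level=0.95, seed=0):
    """Percentile bootstrap CI on the difference of means (current - baseline).

    Illustrative sketch of the binary-score approach; the library's actual
    resampling scheme may differ.
    """
    rng = np.random.default_rng(seed)
    baseline = np.asarray(baseline, dtype=float)
    current = np.asarray(current, dtype=float)
    # Resample each run independently; record the resampled difference of means.
    diffs = np.empty(n_boot)
    for i in range(n_boot):
        b = rng.choice(baseline, size=baseline.size, replace=True)
        c = rng.choice(current, size=current.size, replace=True)
        diffs[i] = c.mean() - b.mean()
    alpha = 1.0 - confidence_level
    ci_lower, ci_upper = np.quantile(diffs, [alpha / 2, 1 - alpha / 2])
    delta = current.mean() - baseline.mean()
    return delta, ci_lower, ci_upper
```

Drift is suggested when the interval excludes zero; detect_drift additionally layers the severity logic described below on top of the raw interval.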

Parameters

Name Type Default Description
baseline_scores np.ndarray required Scores from the reference run
current_scores np.ndarray required Scores from the current run
metric_name str "" Label for the metric (used in reporting)
score_type str "binary" "binary" or "ordinal"
confidence_level float 0.95 Confidence level for the CI
seed int | None None Random seed for reproducibility

Returns: DriftResult with fields severity, delta, ci_lower, ci_upper, p_value, effect_size, metric_name, baseline_value, and current_value.

Example -- comparing weekly evaluation runs:

import numpy as np
from latent.stats import detect_drift

last_week_scores = np.array([1, 1, 1, 0, 1, 1, 0, 1, 1, 1, 1, 0, 1, 1, 1])
this_week_scores = np.array([1, 0, 1, 0, 0, 1, 0, 1, 0, 1, 0, 0, 1, 0, 1])

result = detect_drift(
    baseline_scores=last_week_scores,
    current_scores=this_week_scores,
    metric_name="accuracy",
    score_type="binary",
)
print(f"Severity: {result.severity}")  # "no_drift", "warning", or "alert"
print(f"Delta: {result.delta:+.3f} (p={result.p_value:.4f})")
print(f"Effect size: {result.effect_size:.3f}")

Severity Levels

Severity is determined by combining statistical significance (p-value) with practical significance (effect size):

Severity Condition Action
no_drift p > 0.05 or small effect No action needed
warning p < 0.05 with small-medium effect Investigate the cause
alert p < 0.01 with large effect Immediate attention required

Effect size matters

A large dataset can produce a tiny p-value for a meaningless difference. Severity uses effect size (Cohen's d for binary, rank-biserial for ordinal) to avoid false alarms. A statistically significant but practically irrelevant change stays at no_drift.
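To make this combination concrete, here is an illustrative sketch of Cohen's d and a severity rule mirroring the table above. The |d| thresholds (0.2 small, 0.5 medium, 0.8 large) are the conventional ones and an assumption here -- the library's exact cutoffs may differ:

```python
import numpy as np

def cohens_d(baseline, current):
    """Cohen's d for two score samples: difference of means over the pooled std."""
    b = np.asarray(baseline, dtype=float)
    c = np.asarray(current, dtype=float)
    nb, nc = b.size, c.size
    pooled_var = ((nb - 1) * b.var(ddof=1) + (nc - 1) * c.var(ddof=1)) / (nb + nc - 2)
    return (c.mean() - b.mean()) / np.sqrt(pooled_var)

def classify_severity(p_value, effect_size):
    """Illustrative severity rule mirroring the table above.

    Uses the conventional |d| cutoffs; not the library's exact thresholds.
    """
    d = abs(effect_size)
    if p_value < 0.01 and d >= 0.8:
        return "alert"
    if p_value < 0.05 and d >= 0.2:
        return "warning"
    return "no_drift"
```

Note how a tiny p-value alone is not enough: without a large effect size, the result stays at warning or no_drift.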


drift_report

from latent.stats import drift_report

results = drift_report(
    baseline_scores={"accuracy": baseline_acc, "f1": baseline_f1},
    current_scores={"accuracy": current_acc, "f1": current_f1},
    score_types={"accuracy": "binary", "f1": "binary"},
    confidence_level=0.95,
    seed=None,
)

Runs drift detection across multiple metrics at once. Returns results sorted by severity (most severe first), so the most urgent regressions surface at the top.

Parameters

Name Type Default Description
baseline_scores dict[str, np.ndarray] required Metric name to baseline scores
current_scores dict[str, np.ndarray] required Metric name to current scores
score_types dict[str, str] | None None Metric name to score type. Defaults to "binary" for all.
confidence_level float 0.95 Confidence level for CIs
seed int | None None Random seed

Returns: list[DriftResult] sorted by severity (alert > warning > no_drift).

Example -- multi-metric monitoring:

from latent.stats import drift_report

results = drift_report(
    baseline_scores={"accuracy": baseline_acc, "faithfulness": baseline_faith},
    current_scores={"accuracy": current_acc, "faithfulness": current_faith},
    score_types={"accuracy": "binary", "faithfulness": "ordinal"},
)

for r in results:
    print(f"{r.metric_name}: {r.severity} (delta={r.delta:+.3f}, p={r.p_value:.4f})")
# faithfulness: alert (delta=-0.340, p=0.0012)
# accuracy: no_drift (delta=-0.020, p=0.4231)

multi_run_trend

from latent.stats import multi_run_trend

trend = multi_run_trend(
    runs=[week1_scores, week2_scores, week3_scores, week4_scores],
    metric_name="quality",
    confidence_level=0.95,
    seed=None,
    higher_is_better=True,
)

Tracks a metric across three or more consecutive runs to detect directional trends. While detect_drift compares two snapshots, multi_run_trend identifies sustained improvement or degradation over time.

Parameters

Name Type Default Description
runs list[np.ndarray] required Scores from 3+ consecutive runs (chronological order)
metric_name str "" Label for the metric
confidence_level float 0.95 Confidence level for per-run CIs
seed int | None None Random seed
higher_is_better bool True If True, increasing values are "improving". Set to False for metrics where lower is better (e.g. latency, error rate).

Returns: dict with keys:

Key Type Description
values list[float] Point estimate per run
trend str "improving", "degrading", or "stable"
is_monotonic bool Whether the trend is strictly monotonic
cis list[tuple[float, float]] Per-run confidence intervals
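The trend and is_monotonic fields can be pictured with a simplified classifier over per-run means. This sketch collapses any non-monotonic sequence to "stable" and ignores the per-run CIs that the library likely consults; it is illustrative only:

```python
import numpy as np

def classify_trend(runs, higher_is_better=True):
    """Illustrative trend classification from per-run point estimates.

    Simplification: any non-monotonic sequence is labeled "stable"; a real
    implementation would also consider CI overlap and effect magnitude.
    """
    values = [float(np.mean(r)) for r in runs]
    diffs = np.diff(values)
    increasing = bool(np.all(diffs > 0))
    decreasing = bool(np.all(diffs < 0))
    if increasing:
        trend = "improving" if higher_is_better else "degrading"
    elif decreasing:
        trend = "degrading" if higher_is_better else "improving"
    else:
        trend = "stable"
    return {"values": values, "trend": trend, "is_monotonic": increasing or decreasing}
```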

Example -- tracking quality over four weeks:

import numpy as np
from latent.stats import multi_run_trend

week1 = np.array([1, 1, 1, 0, 1, 1, 1, 1, 0, 1])
week2 = np.array([1, 1, 0, 0, 1, 1, 1, 0, 0, 1])
week3 = np.array([1, 0, 0, 0, 1, 1, 0, 0, 0, 1])
week4 = np.array([0, 0, 0, 0, 1, 0, 0, 0, 0, 1])

trend = multi_run_trend(
    runs=[week1, week2, week3, week4],
    metric_name="quality",
)
print(f"Trend: {trend['trend']}")        # "degrading"
print(f"Monotonic: {trend['is_monotonic']}")  # True
print(f"Values: {trend['values']}")       # [0.8, 0.6, 0.4, 0.2]

Minimum three runs

multi_run_trend requires at least 3 runs. For comparing just two runs, use detect_drift instead.


Sampling

Why Stratified Sampling?

You cannot evaluate every item. Naive random sampling risks underrepresenting rare but important categories -- the 2% of "escalation" tickets that matter most may not appear at all in a 200-item sample. Stratified sampling guarantees coverage across categories while keeping costs manageable.
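The risk is easy to quantify with a binomial tail. Assuming a hypothetical category with 2% prevalence and a 200-item simple random sample:

```python
from math import comb

def prob_at_most_k(n, p, k):
    """P(X <= k) for X ~ Binomial(n, p): the chance a random sample of n
    items contains at most k items from a category of prevalence p."""
    return sum(comb(n, i) * p**i * (1 - p) ** (n - i) for i in range(k + 1))

# A 2% category in a 200-item simple random sample:
p_zero = prob_at_most_k(200, 0.02, 0)  # chance the category is absent entirely
p_few = prob_at_most_k(200, 0.02, 2)   # chance of two or fewer examples
```

There is roughly a 2% chance the category is missing entirely, and roughly a one-in-four chance of getting two or fewer examples -- too few to say anything about that category.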

stratified_sample

from latent.stats import stratified_sample

sample = stratified_sample(
    df=df,
    stratum_column="category",
    n_total=200,
    allocation="proportional",
    min_per_stratum=0,
    seed=None,
)

Selects a representative subset by stratifying on a column. Each stratum's share in the sample matches its share in the population (proportional allocation) or is equalized (equal allocation).
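Proportional allocation itself is simple arithmetic. A sketch with largest-remainder rounding (illustrative; the library's exact rounding scheme is an assumption here):

```python
import pandas as pd

def proportional_counts(df, stratum_column, n_total):
    """Per-stratum sample counts proportional to population shares,
    using largest-remainder rounding so the counts sum to n_total."""
    shares = df[stratum_column].value_counts(normalize=True)
    raw = shares * n_total
    counts = raw.astype(int)
    # Hand leftover slots to the strata with the largest fractional remainders.
    leftover = n_total - int(counts.sum())
    for name in (raw - counts).sort_values(ascending=False).index[:leftover]:
        counts[name] += 1
    return counts.to_dict()
```

stratified_sample then draws that many rows at random within each stratum.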

Parameters

Name Type Default Description
df pd.DataFrame required Source data
stratum_column str required Column to stratify on
n_total int required Total sample size
allocation str "proportional" "proportional" or "equal"
min_per_stratum int 0 Minimum samples per stratum (overrides allocation if needed)
seed int | None None Random seed

Returns: pd.DataFrame -- the sampled subset, preserving all original columns.

Example -- proportional sampling for an eval set:

import pandas as pd
from latent.stats import stratified_sample

df = pd.DataFrame({
    "text": ["How do I get a refund?", "Track my order", ...],
    "category": ["billing", "shipping", "billing", "refund", ...],
})

sample = stratified_sample(df, stratum_column="category", n_total=200, seed=42)
print(sample["category"].value_counts())
# billing     82   (proportional to original)
# shipping    68
# refund      50

Equal allocation for rare-category analysis

Use allocation="equal" when you need enough examples per category to compute per-class metrics with meaningful confidence intervals, even if it oversamples rare categories.


difficulty_based_sample

from latent.stats import difficulty_based_sample

sample = difficulty_based_sample(
    df=df,
    stratum_column="category",
    n_total=200,
    oversample_factor=2.0,
    min_per_stratum=10,
    seed=None,
)

Over-samples rare or difficult categories for better coverage. Smaller strata receive a higher sampling rate (up to oversample_factor times their proportional share), ensuring they are well-represented in the eval set.
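The allocation idea can be sketched as follows. The boost rule (multiplying strata below the average share by the factor) and the final rescale are assumptions for illustration -- the library's exact algorithm may differ, and rounding can land slightly off n_total:

```python
def oversampled_counts(stratum_sizes, n_total, oversample_factor=2.0, min_per_stratum=10):
    """Illustrative oversampling allocation: boost small strata up to
    oversample_factor times their proportional share, apply the floor,
    then rescale the targets to sum to roughly n_total."""
    total = sum(stratum_sizes.values())
    mean_share = 1 / len(stratum_sizes)
    targets = {}
    for name, size in stratum_sizes.items():
        share = size / total
        # Strata below the average share get the oversampling boost.
        boost = oversample_factor if share < mean_share else 1.0
        targets[name] = max(min_per_stratum, share * boost * n_total)
    # Rescale so the boosted targets fit the total budget.
    scale = n_total / sum(targets.values())
    return {name: round(t * scale) for name, t in targets.items()}
```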

Parameters

Name Type Default Description
df pd.DataFrame required Source data
stratum_column str required Column to stratify on
n_total int required Total sample size
oversample_factor float 2.0 Maximum oversampling multiplier for small strata
min_per_stratum int 10 Floor per stratum
seed int | None None Random seed

Returns: pd.DataFrame -- the sampled subset.

Example -- ensuring rare categories are covered:

from latent.stats import difficulty_based_sample

# "escalation" is only 2% of data but critical to evaluate
sample = difficulty_based_sample(
    df, stratum_column="category", n_total=200, oversample_factor=3.0, seed=42,
)
print(sample["category"].value_counts())
# billing       72
# shipping      58
# refund        40
# escalation    30   (oversampled; its proportional share would be only ~4)

When to use difficulty-based sampling

Use this when some categories are both rare and high-stakes. If all categories are equally important regardless of frequency, use stratified_sample with allocation="equal" instead.


Building a Monitoring Pipeline

Combine sampling and drift detection into an end-to-end monitoring workflow:

import numpy as np
import pandas as pd
from latent.stats import (
    stratified_sample,
    drift_report,
    multi_run_trend,
)

# 1. Select a representative eval set
eval_set = stratified_sample(
    production_data,
    stratum_column="category",
    n_total=500,
    min_per_stratum=20,
    seed=42,
)

# 2. Run your evaluation (your scoring logic; returns {metric_name: scores})
current_scores = run_evaluation(eval_set)

# 3. Compare against the baseline
results = drift_report(
    baseline_scores=last_week_scores,
    current_scores=current_scores,
    score_types={"accuracy": "binary", "faithfulness": "ordinal"},
)

alerts = [r for r in results if r.severity == "alert"]
if alerts:
    for a in alerts:
        print(f"ALERT: {a.metric_name} dropped by {a.delta:+.3f} (p={a.p_value:.4f})")
    send_alert(alerts)  # integrate with your alerting system

# 4. Track trends over time
history = load_score_history()  # dict: metric name -> list of score arrays from previous runs
for metric_name, runs in history.items():
    trend = multi_run_trend(runs, metric_name=metric_name)
    if trend["trend"] == "degrading":
        print(f"WARNING: {metric_name} has been degrading over {len(runs)} runs")

# 5. Log results for the next comparison
save_as_baseline(current_scores)

Automate with CI/CD

Run this pipeline on a schedule (daily or weekly) or as part of your deployment pipeline. Gate deployments on drift_report results -- block releases when any metric hits alert severity.
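A minimal sketch of such a gate, assuming the DriftResult fields described above:

```python
def gate_on_drift(results):
    """Deployment gate sketch: return a nonzero exit code when any metric
    is at "alert" severity, so a CI job fails the build.

    Assumes objects with the .severity and .metric_name fields of DriftResult.
    """
    alerts = [r for r in results if r.severity == "alert"]
    for a in alerts:
        print(f"BLOCKING: {a.metric_name} drifted (severity=alert)")
    return 1 if alerts else 0

# In a CI job: sys.exit(gate_on_drift(drift_report(...)))
# A nonzero exit fails the pipeline and blocks the release.
```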