Drift Detection & Sampling¶
Monitoring score changes over time and selecting representative eval samples. Drift detection catches performance regressions before they reach users. Stratified sampling gives you representative coverage while keeping evaluation costs manageable.
Drift Detection¶
Why Drift Detection?¶
LLM performance degrades over time -- model updates, data distribution shifts, prompt changes, upstream API changes. A system that scored 92% last month may silently drop to 84% this month. Drift detection catches these regressions automatically.
detect_drift¶
```python
from latent.stats import detect_drift

result = detect_drift(
    baseline_scores=baseline_scores,
    current_scores=current_scores,
    metric_name="accuracy",
    score_type="binary",
    confidence_level=0.95,
    seed=None,
)
```
Compares two eval runs to detect statistically significant performance changes. The test is chosen automatically based on `score_type`:

- Binary scores: bootstrap CI on the difference of means
- Ordinal scores: Mann-Whitney U test
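For intuition, the ordinal branch can be sketched with scipy's `mannwhitneyu` (a sketch of the underlying test, not `latent.stats` internals; the library layers severity classification on top):

```python
# Sketch of the ordinal-score comparison via the Mann-Whitney U test.
import numpy as np
from scipy.stats import mannwhitneyu

baseline = np.array([5, 4, 5, 4, 5, 3, 4, 5, 4, 5])  # 1-5 judge ratings
current = np.array([3, 3, 4, 2, 3, 3, 2, 4, 3, 3])

stat, p_value = mannwhitneyu(baseline, current, alternative="two-sided")
print(f"U = {stat}, p = {p_value:.4f}")  # small p => distributions differ
```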
Parameters

| Name | Type | Default | Description |
|---|---|---|---|
| `baseline_scores` | `np.ndarray` | required | Scores from the reference run |
| `current_scores` | `np.ndarray` | required | Scores from the current run |
| `metric_name` | `str` | `""` | Label for the metric (used in reporting) |
| `score_type` | `str` | `"binary"` | `"binary"` or `"ordinal"` |
| `confidence_level` | `float` | `0.95` | Confidence level for the CI |
| `seed` | `int \| None` | `None` | Random seed for reproducibility |
Returns: `DriftResult` with fields `severity`, `delta`, `ci_lower`, `ci_upper`, `p_value`, `effect_size`, `metric_name`, `baseline_value`, and `current_value`.
Example -- comparing weekly evaluation runs:
```python
import numpy as np
from latent.stats import detect_drift

last_week_scores = np.array([1, 1, 1, 0, 1, 1, 0, 1, 1, 1, 1, 0, 1, 1, 1])
this_week_scores = np.array([1, 0, 1, 0, 0, 1, 0, 1, 0, 1, 0, 0, 1, 0, 1])

result = detect_drift(
    baseline_scores=last_week_scores,
    current_scores=this_week_scores,
    metric_name="accuracy",
    score_type="binary",
)

print(f"Severity: {result.severity}")  # "no_drift", "warning", or "alert"
print(f"Delta: {result.delta:+.3f} (p={result.p_value:.4f})")
print(f"Effect size: {result.effect_size:.3f}")
```
Severity Levels¶
Severity is determined by combining statistical significance (p-value) with practical significance (effect size):
| Severity | Condition | Action |
|---|---|---|
| `no_drift` | p > 0.05 or small effect | No action needed |
| `warning` | p < 0.05 with small-medium effect | Investigate the cause |
| `alert` | p < 0.01 with large effect | Immediate attention required |
Effect size matters
A large dataset can produce a tiny p-value for a meaningless difference. Severity uses effect size (Cohen's d for binary, rank-biserial for ordinal) to avoid false alarms. A statistically significant but practically irrelevant change stays at no_drift.
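A quick simulation shows why (illustrative only, not `latent.stats` internals): with very large runs, a one-point accuracy drop produces an overwhelming z-score, yet its effect size stays well below Cohen's "small" threshold of 0.2:

```python
# With n = 100,000 per run, a ~1-point accuracy drop is statistically
# significant (large z) but practically negligible (tiny Cohen's d) --
# exactly the case the severity classification guards against.
import numpy as np

rng = np.random.default_rng(0)
n = 100_000
baseline = rng.binomial(1, 0.92, n)  # ~92% accuracy
current = rng.binomial(1, 0.91, n)   # ~91% accuracy

# Two-proportion z-test (normal approximation)
p1, p2 = baseline.mean(), current.mean()
p_pool = (baseline.sum() + current.sum()) / (2 * n)
se = np.sqrt(p_pool * (1 - p_pool) * (2 / n))
z = (p1 - p2) / se

# Cohen's d with a pooled standard deviation
pooled_sd = np.sqrt((baseline.var(ddof=1) + current.var(ddof=1)) / 2)
d = (p1 - p2) / pooled_sd

print(f"z = {z:.1f} (significant), Cohen's d = {d:.3f} (negligible)")
```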
drift_report¶
```python
from latent.stats import drift_report

results = drift_report(
    baseline_scores={"accuracy": baseline_acc, "f1": baseline_f1},
    current_scores={"accuracy": current_acc, "f1": current_f1},
    score_types={"accuracy": "binary", "f1": "binary"},
    confidence_level=0.95,
    seed=None,
)
```
Runs drift detection across multiple metrics at once. Returns results sorted by severity (most severe first), so the most urgent regressions surface at the top.
Parameters

| Name | Type | Default | Description |
|---|---|---|---|
| `baseline_scores` | `dict[str, np.ndarray]` | required | Metric name to baseline scores |
| `current_scores` | `dict[str, np.ndarray]` | required | Metric name to current scores |
| `score_types` | `dict[str, str] \| None` | `None` | Metric name to score type. Defaults to `"binary"` for all. |
| `confidence_level` | `float` | `0.95` | Confidence level for CIs |
| `seed` | `int \| None` | `None` | Random seed |
Returns: `list[DriftResult]` sorted by severity (`alert` > `warning` > `no_drift`).
Example -- multi-metric monitoring:
```python
from latent.stats import drift_report

results = drift_report(
    baseline_scores={"accuracy": baseline_acc, "faithfulness": baseline_faith},
    current_scores={"accuracy": current_acc, "faithfulness": current_faith},
    score_types={"accuracy": "binary", "faithfulness": "ordinal"},
)

for r in results:
    print(f"{r.metric_name}: {r.severity} (delta={r.delta:+.3f}, p={r.p_value:.4f})")
```
multi_run_trend¶
```python
from latent.stats import multi_run_trend

trend = multi_run_trend(
    runs=[week1_scores, week2_scores, week3_scores, week4_scores],
    metric_name="quality",
    confidence_level=0.95,
    seed=None,
    higher_is_better=True,
)
```
Tracks a metric across three or more consecutive runs to detect directional trends. While `detect_drift` compares two snapshots, `multi_run_trend` identifies sustained improvement or degradation over time.
Parameters

| Name | Type | Default | Description |
|---|---|---|---|
| `runs` | `list[np.ndarray]` | required | Scores from 3+ consecutive runs (chronological order) |
| `metric_name` | `str` | `""` | Label for the metric |
| `confidence_level` | `float` | `0.95` | Confidence level for per-run CIs |
| `seed` | `int \| None` | `None` | Random seed |
| `higher_is_better` | `bool` | `True` | If True, increasing values are "improving". Set to False for metrics where lower is better (e.g. latency, error rate). |
Returns: `dict` with keys:

| Key | Type | Description |
|---|---|---|
| `values` | `list[float]` | Point estimate per run |
| `trend` | `str` | `"improving"`, `"degrading"`, or `"stable"` |
| `is_monotonic` | `bool` | Whether the trend is strictly monotonic |
| `cis` | `list[tuple[float, float]]` | Per-run confidence intervals |
Example -- tracking quality over four weeks:
```python
import numpy as np
from latent.stats import multi_run_trend

week1 = np.array([1, 1, 1, 0, 1, 1, 1, 1, 0, 1])
week2 = np.array([1, 1, 0, 0, 1, 1, 1, 0, 0, 1])
week3 = np.array([1, 0, 0, 0, 1, 1, 0, 0, 0, 1])
week4 = np.array([0, 0, 0, 0, 1, 0, 0, 0, 0, 1])

trend = multi_run_trend(
    runs=[week1, week2, week3, week4],
    metric_name="quality",
)

print(f"Trend: {trend['trend']}")          # "degrading"
print(f"Monotonic: {trend['is_monotonic']}")  # True
print(f"Values: {trend['values']}")        # [0.8, 0.6, 0.4, 0.2]
```
Minimum three runs
`multi_run_trend` requires at least 3 runs. For comparing just two runs, use `detect_drift` instead.
Sampling¶
Why Stratified Sampling?¶
You cannot evaluate every item. Naive random sampling risks underrepresenting rare but important categories -- the 2% of "escalation" tickets that matter most may not appear at all in a 200-item sample. Stratified sampling guarantees coverage across categories while keeping costs manageable.
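The risk is easy to quantify. A quick simulation (plain numpy, independent of `latent.stats`) draws 100 items at random from a population where 2% are escalations:

```python
# How often does plain random sampling miss a 2% category entirely?
import numpy as np

rng = np.random.default_rng(42)
population = np.array(["common"] * 9800 + ["escalation"] * 200)  # 2% rare

trials = 1_000
misses = 0
for _ in range(trials):
    sample = rng.choice(population, size=100, replace=False)
    if "escalation" not in sample:
        misses += 1

# Analytically, about 13% of such draws contain zero escalation examples;
# stratified sampling guarantees the stratum's allocated count instead.
print(f"Missed 'escalation' entirely in {misses}/{trials} random samples")
```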
stratified_sample¶
```python
from latent.stats import stratified_sample

sample = stratified_sample(
    df=df,
    stratum_column="category",
    n_total=200,
    allocation="proportional",
    min_per_stratum=0,
    seed=None,
)
```
Selects a representative subset by stratifying on a column. Each stratum's share in the sample matches its share in the population (proportional allocation) or is equalized (equal allocation).
Parameters

| Name | Type | Default | Description |
|---|---|---|---|
| `df` | `pd.DataFrame` | required | Source data |
| `stratum_column` | `str` | required | Column to stratify on |
| `n_total` | `int` | required | Total sample size |
| `allocation` | `str` | `"proportional"` | `"proportional"` or `"equal"` |
| `min_per_stratum` | `int` | `0` | Minimum samples per stratum (overrides allocation if needed) |
| `seed` | `int \| None` | `None` | Random seed |
Returns: `pd.DataFrame` -- the sampled subset, preserving all original columns.
Example -- proportional sampling for an eval set:
```python
import pandas as pd
from latent.stats import stratified_sample

df = pd.DataFrame({
    "text": ["How do I get a refund?", "Track my order", ...],
    "category": ["billing", "shipping", "billing", "refund", ...],
})

sample = stratified_sample(df, stratum_column="category", n_total=200, seed=42)
print(sample["category"].value_counts())
# billing     82   (proportional to original)
# shipping    68
# refund      50
```
Equal allocation for rare-category analysis
Use allocation="equal" when you need enough examples per category to compute per-class metrics with meaningful confidence intervals, even if it oversamples rare categories.
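What `allocation="equal"` does, sketched with plain pandas (the real `stratified_sample` also handles remainders and `min_per_stratum`; this is a simplified illustration):

```python
# Equal allocation: every stratum contributes the same count, regardless
# of its population share -- here 10 rows each from a skewed distribution.
import pandas as pd

df = pd.DataFrame({
    "category": ["billing"] * 120 + ["shipping"] * 60 + ["refund"] * 20,
})

n_total = 30
per_stratum = n_total // df["category"].nunique()  # 10 per category

sample = df.groupby("category").sample(n=per_stratum, random_state=42)
print(sample["category"].value_counts())
# Every class now has enough rows for a per-class confidence interval.
```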
difficulty_based_sample¶
```python
from latent.stats import difficulty_based_sample

sample = difficulty_based_sample(
    df=df,
    stratum_column="category",
    n_total=200,
    oversample_factor=2.0,
    min_per_stratum=10,
    seed=None,
)
```
Over-samples rare or difficult categories for better coverage. Smaller strata receive a higher sampling rate (up to oversample_factor times their proportional share), ensuring they are well-represented in the eval set.
Parameters

| Name | Type | Default | Description |
|---|---|---|---|
| `df` | `pd.DataFrame` | required | Source data |
| `stratum_column` | `str` | required | Column to stratify on |
| `n_total` | `int` | required | Total sample size |
| `oversample_factor` | `float` | `2.0` | Maximum oversampling multiplier for small strata |
| `min_per_stratum` | `int` | `10` | Floor per stratum |
| `seed` | `int \| None` | `None` | Random seed |
Returns: `pd.DataFrame` -- the sampled subset.
Example -- ensuring rare categories are covered:
```python
from latent.stats import difficulty_based_sample

# "escalation" is only 2% of data but critical to evaluate
sample = difficulty_based_sample(
    df, stratum_column="category", n_total=200, oversample_factor=3.0, seed=42,
)

print(sample["category"].value_counts())
# billing       72
# shipping      58
# refund        40
# escalation    30   (3x oversampled from proportional share of ~10)
```
When to use difficulty-based sampling
Use this when some categories are both rare and high-stakes. If all categories are equally important regardless of frequency, use stratified_sample with allocation="equal" instead.
Building a Monitoring Pipeline¶
Combine sampling and drift detection into an end-to-end monitoring workflow:
```python
import numpy as np
import pandas as pd
from latent.stats import (
    stratified_sample,
    drift_report,
    multi_run_trend,
)

# 1. Select a representative eval set
eval_set = stratified_sample(
    production_data,
    stratum_column="category",
    n_total=500,
    min_per_stratum=20,
    seed=42,
)

# 2. Run your evaluation (your scoring logic here)
current_scores = run_evaluation(eval_set)

# 3. Compare against the baseline
results = drift_report(
    baseline_scores=last_week_scores,
    current_scores=current_scores,
    score_types={"accuracy": "binary", "faithfulness": "ordinal"},
)

alerts = [r for r in results if r.severity == "alert"]
if alerts:
    for a in alerts:
        print(f"ALERT: {a.metric_name} dropped by {a.delta:+.3f} (p={a.p_value:.4f})")
    send_alert(alerts)  # integrate with your alerting system

# 4. Track trends over time
history = load_score_history()  # dict: metric name -> score arrays from previous runs
for metric_name, runs in history.items():
    trend = multi_run_trend(runs, metric_name=metric_name)
    if trend["trend"] == "degrading":
        print(f"WARNING: {metric_name} has been degrading over {len(runs)} runs")

# 5. Log results for the next comparison
save_as_baseline(current_scores)
```
Automate with CI/CD
Run this pipeline on a schedule (daily or weekly) or as part of your deployment pipeline. Gate deployments on drift_report results -- block releases when any metric hits alert severity.
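A minimal gating sketch: in practice the results list comes from `drift_report`; here a hypothetical `Result` stand-in replaces `DriftResult` so the snippet is self-contained:

```python
# Gate a deployment on drift results: a non-zero exit code blocks the
# release whenever any metric reaches "alert" severity.
from dataclasses import dataclass

@dataclass
class Result:  # stand-in for DriftResult
    metric_name: str
    severity: str
    delta: float

# In practice: results = drift_report(...)
results = [
    Result("accuracy", "warning", -0.04),
    Result("faithfulness", "no_drift", +0.01),
]

alerts = [r for r in results if r.severity == "alert"]
for a in alerts:
    print(f"ALERT: {a.metric_name} changed by {a.delta:+.3f}")

exit_code = 1 if alerts else 0  # pass to sys.exit() in a real CI script
print(f"exit code: {exit_code}")
```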