Pre-built Evaluation Flows¶
Latent ships a library of composable evaluation flows that handle scoring, statistical analysis, quality gating, and MLflow logging out of the box. All flows live in latent.flows and require the [eval] extra.
Every flow returns a dict with structured results, a StatisticalReport, and rendered Markdown. All flows auto-log metrics to MLflow when an active run exists.
Choosing a Flow¶
| Task | Flow | Input |
|---|---|---|
| Score free-text outputs with LLM judge | judge_flow | DataFrame |
| Binary/multi-class classification | classification_flow | DataFrame + ground_truth column |
| Compare model A vs. baseline B | comparison_flow | DataFrame + baseline score arrays |
| Multi-turn conversation quality | conversation_scoring_flow | list[Conversation] |
| SOP/checklist compliance | conversation_sop_flow | list[Conversation] + checklist |
| Turn-score trajectory analysis | conversation_trajectory_flow | list[Conversation] with scores |
| Simulate + score conversations | conversation_simulation_flow | Agent class + scenarios |
| End-to-end agent evaluation | agent_eval_flow | DataFrame + agent factory + judge |
| Agent inference only (no scoring) | agent_inference_flow | DataFrame + agent factory |
| Retrieval/RAG quality | retrieval_eval_flow | DataFrame + retriever |
| Score distribution drift | drift_flow | Two score dicts |
Scoring Flows¶
judge_flow¶
Score every row in a DataFrame with an LLM judge, run bootstrap statistical analysis, and check quality gates.
Signature:
def judge_flow(
eval_data: pd.DataFrame,
judge: Judge[T],
*,
gates: dict[str, float] | None = None,
score_types: dict[str, str] | None = None,
rubrics: dict | None = None,
n_resamples: int = 10_000,
confidence_level: float = 0.95,
seed: int | None = None,
) -> dict[str, Any]
Returns: scored_data (DataFrame), report (StatisticalReport), markdown (str), all_passed (bool).
Score types and rubrics are auto-detected from the judge's output_type annotations. Pass score_types or rubrics explicitly to override.
from typing import Annotated
from latent.agents import Judge
from latent.agents.scores import ScoredModel, OrdinalScore
from latent.flows import judge_flow
class QAScores(ScoredModel):
quality: Annotated[int, OrdinalScore(scale=(1,2,3,4,5), pass_threshold=3)]
judge = Judge("qa_judge", model="gpt-4o", output_type=QAScores)
result = judge_flow(eval_data=df, judge=judge, gates={"quality": 3.5})
print(result["markdown"]) # Statistical report with CIs
assert result["all_passed"] # Gate check
Auto-detected configuration
ScoredModel fields annotated with OrdinalScore, BinaryScore, or ContinuousScore are automatically extracted as score types and rubrics. You rarely need to pass score_types manually.
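The extraction mechanism can be sketched with plain typing tools. This is a simplified illustration of how Annotated metadata is readable at runtime, not latent's actual implementation; the OrdinalScore class here is a local stand-in for latent.agents.scores.OrdinalScore:

```python
from dataclasses import dataclass
from typing import Annotated, get_type_hints

# Local stand-in for latent's OrdinalScore annotation (illustrative only).
@dataclass
class OrdinalScore:
    scale: tuple
    pass_threshold: float

class QAScores:
    quality: Annotated[int, OrdinalScore(scale=(1, 2, 3, 4, 5), pass_threshold=3)]

# Walk the annotated fields and pull out score metadata from Annotated[...].
score_types = {}
gates = {}
for name, hint in get_type_hints(QAScores, include_extras=True).items():
    for meta in getattr(hint, "__metadata__", ()):
        if isinstance(meta, OrdinalScore):
            score_types[name] = "ordinal"
            gates[name] = meta.pass_threshold

print(score_types)  # {'quality': 'ordinal'}
print(gates)        # {'quality': 3}
```

The same mechanism explains why explicit score_types/rubrics arguments are rarely needed: the annotations already carry the configuration.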
comparison_flow¶
Score a dataset and compare against baseline scores. Wraps judge_flow with paired comparison statistics.
Signature:
def comparison_flow(
eval_data: pd.DataFrame,
judge: Judge[T],
*,
baseline_scores: dict[str, np.ndarray] | None = None,
gates: dict[str, float] | None = None,
score_types: dict[str, str] | None = None,
rubrics: dict | None = None,
n_resamples: int = 10_000,
confidence_level: float = 0.95,
seed: int | None = None,
) -> dict[str, Any]
Returns: scored_data, report, markdown, all_passed.
If baseline_scores is None, the flow runs in first-run mode (no comparison, just scoring).
from latent.flows import comparison_flow
import numpy as np
# First run: establish baseline
baseline_result = judge_flow(eval_data=df_v1, judge=judge)
baseline = {
"quality": baseline_result["scored_data"]["quality"].values
}
# Second run: compare against baseline
result = comparison_flow(
eval_data=df_v2,
judge=judge,
baseline_scores=baseline,
gates={"quality": 3.5},
)
# result["report"] includes paired CIs, effect sizes, p-values
Classification Flows¶
classification_flow¶
Run a Classifier on a dataset and compute accuracy, precision, recall, and F1 with confidence intervals.
Signature:
def classification_flow(
eval_data: pd.DataFrame,
classifier: Classifier[T],
*,
labels: list[str] | None = None,
gates: dict[str, float] | None = None,
confidence_level: float = 0.95,
n_resamples: int = 10_000,
seed: int | None = None,
) -> dict[str, Any]
Returns: predictions_data (DataFrame), metrics (dict of MetricResult), all_passed (bool).
Required column
The input DataFrame must have a ground_truth column. The classifier's output model must have a prediction field.
from pydantic import BaseModel
from latent.agents import Classifier
from latent.flows import classification_flow
class Intent(BaseModel):
prediction: str
classifier = Classifier(
"intent",
model="gpt-4o",
output_type=Intent,
prompt_template="Classify intent: {text}\nLabels: {labels}",
)
result = classification_flow(
eval_data=df, # must have "ground_truth" column
classifier=classifier,
labels=["billing", "support", "sales"],
gates={"accuracy": 0.85, "f1": 0.80},
)
print(result["metrics"]["accuracy"].point_estimate)
Conversation Flows¶
conversation_scoring_flow¶
Score multi-turn conversations at the turn level, aggregate per conversation, and run statistical analysis.
Signature:
def conversation_scoring_flow(
conversations: list[Conversation],
judge: Judge[T],
*,
aggregation: str = "mean", # "mean" | "min" | "max" | "last"
gates: dict[str, float] | None = None,
n_resamples: int = 10_000,
confidence_level: float = 0.95,
seed: int | None = None,
) -> dict[str, Any]
Returns: scored_conversations (list), aggregated_scores (dict), report, markdown.
Only assistant turns are scored. The aggregation parameter controls how per-turn scores collapse into a single per-conversation value.
from latent.stats.conversation import Conversation, Turn
from latent.flows import conversation_scoring_flow
convos = [
Conversation(turns=[
Turn(role="user", content="How do I reset my password?"),
Turn(role="assistant", content="Go to Settings > Security > Reset Password."),
]),
]
result = conversation_scoring_flow(
convos,
judge=judge,
aggregation="mean",
gates={"quality": 3.0},
)
| Aggregation | Behavior |
|---|---|
| mean | Average of all turn scores |
| min | Worst turn determines conversation score |
| max | Best turn determines conversation score |
| last | Final assistant turn score only |
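The collapse from per-turn scores to a single per-conversation value can be sketched in a few lines (an illustration of the semantics above, not latent's internal code):

```python
# Illustrative sketch of the aggregation step: collapse the per-turn
# scores of one conversation into a single value.
def aggregate(turn_scores: list[float], mode: str = "mean") -> float:
    if mode == "mean":
        return sum(turn_scores) / len(turn_scores)
    if mode == "min":
        return min(turn_scores)
    if mode == "max":
        return max(turn_scores)
    if mode == "last":
        return turn_scores[-1]
    raise ValueError(f"unknown aggregation: {mode}")

scores = [4.0, 3.0, 5.0]
print(aggregate(scores, "mean"))  # 4.0
print(aggregate(scores, "min"))   # 3.0
print(aggregate(scores, "last"))  # 5.0
```

Choose "min" when a single bad turn should sink the whole conversation (e.g. safety checks), "last" when only the final resolution matters.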
conversation_sop_flow¶
Check conversations against an SOP (Standard Operating Procedure) compliance checklist.
Signature:
def conversation_sop_flow(
conversations: list[Conversation],
judge: Judge[T],
*,
sop_checklist: list[dict[str, Any]],
confidence_level: float = 0.95,
n_resamples: int = 10_000,
seed: int | None = None,
) -> dict[str, Any]
Returns: compliance_results (list of per-conversation dicts), aggregate_rates (per-item stats with CIs), report, markdown.
Each checklist item is a dict with name, description, and required keys. The judge evaluates each item against the full conversation text.
from latent.flows import conversation_sop_flow
checklist = [
{"name": "greeting", "description": "Agent greets the customer", "required": True},
{"name": "verify_identity", "description": "Agent verifies customer identity", "required": True},
{"name": "resolution", "description": "Agent provides a resolution or next steps", "required": True},
]
result = conversation_sop_flow(
conversations=convos,
judge=judge,
sop_checklist=checklist,
)
# result["aggregate_rates"]["greeting"]["mean"] -> 0.92
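The aggregate rates are per-item pass fractions across conversations. A minimal sketch of that aggregation, using an illustrative result shape (the exact compliance_results schema may differ):

```python
# Hypothetical per-conversation checklist outcomes (True = item satisfied).
compliance_results = [
    {"greeting": True, "verify_identity": True},
    {"greeting": True, "verify_identity": False},
    {"greeting": False, "verify_identity": True},
    {"greeting": True, "verify_identity": True},
]

# Per-item compliance rate = fraction of conversations passing that item.
aggregate_rates = {
    item: sum(r[item] for r in compliance_results) / len(compliance_results)
    for item in compliance_results[0]
}
print(aggregate_rates)  # {'greeting': 0.75, 'verify_identity': 0.75}
```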
conversation_trajectory_flow¶
Analyze how scores evolve across turns. Computes L2 distances between conversation trajectories (or against a reference pattern).
Signature:
def conversation_trajectory_flow(
conversations: list[Conversation],
*,
score_key: str = "score",
reference_pattern: list[float] | None = None,
confidence_level: float = 0.95,
n_resamples: int = 10_000,
seed: int | None = None,
) -> dict[str, Any]
Returns: trajectories (list of score sequences), distances (ndarray), report, markdown.
Conversations must have turn scores in their metadata (typically from a prior conversation_scoring_flow run). If reference_pattern is provided, each trajectory is compared to it; otherwise pairwise distances are computed.
from latent.flows import conversation_trajectory_flow
# Ideal pattern: quality should stay high across 5 turns
result = conversation_trajectory_flow(
scored_conversations,
score_key="quality",
reference_pattern=[4.0, 4.0, 4.0, 4.0, 4.0],
)
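The distance computation described above amounts to a Euclidean (L2) norm between equal-length score sequences. A rough sketch of what gets compared:

```python
import numpy as np

# Sketch of the trajectory distance: L2 norm between a conversation's
# per-turn scores and a reference pattern of the same length.
def l2_distance(trajectory: list[float], reference: list[float]) -> float:
    return float(np.linalg.norm(np.asarray(trajectory) - np.asarray(reference)))

reference = [4.0, 4.0, 4.0, 4.0, 4.0]
steady = [4.0, 4.0, 4.0, 4.0, 4.0]    # matches the ideal pattern
decaying = [5.0, 4.0, 3.0, 2.0, 1.0]  # quality degrades over turns

print(l2_distance(steady, reference))    # 0.0
print(l2_distance(decaying, reference))  # ~3.87
```

A large distance against a flat reference pattern flags conversations whose quality drifts over the course of the dialogue, even if their mean score looks fine.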
conversation_simulation_flow¶
Simulate conversations between an agent under test and a synthetic human, then optionally score the results.
Signature:
def conversation_simulation_flow(
agent_class: type,
agent_kwargs: dict[str, Any] | None = None,
*,
scenarios: pd.DataFrame | None = None,
human_class: type | None = None,
human_kwargs: dict[str, Any] | None = None,
judge_class: Callable[[], Judge[T]] | None = None,
judge_scoring: JudgeScoringConfig | None = None,
simulation_guardrails: SimulationGuardrails | None = None,
context_column: str = "context",
) -> dict[str, Any]
Returns: simulation_results (DataFrame with turn data), report (scoring dict or None).
If scenarios is not provided, the flow loads them from the catalog. If judge_class is provided, conversations are scored automatically after simulation.
from latent.flows import conversation_simulation_flow
result = conversation_simulation_flow(
agent_class=MyAgent,
agent_kwargs={"model": "gpt-4o"},
scenarios=scenarios_df, # must have "context" column
judge_class=lambda: judge, # factory for scoring
judge_scoring={"scope": "conversation", "gates": {"quality": 3.5}},
simulation_guardrails={"max_turns_per_conversation": 20},
)
Scoring scope
Set judge_scoring["scope"] to "conversation" to score each conversation as a whole, or "turn" to score each assistant turn individually and aggregate. Quality gates are auto-derived from the judge's output_type pass_threshold annotations and can be overridden via judge_scoring["gates"].
Agent Evaluation Flows¶
agent_eval_flow¶
Full end-to-end agent evaluation pipeline: inference, LLM judging, optional deterministic scorers, category breakdowns, and failure mode classification.
Signature:
def agent_eval_flow(
eval_data: pd.DataFrame,
agent_factory: Callable[[], BaseAgent],
judge: Judge[T],
*,
question_column: str = "question",
expected_column: str | None = None,
category_column: str | None = None,
id_column: str | None = None,
context_columns: list[str] | None = None,
context_formatter: Callable[[dict], str] | None = None,
gates: dict[str, float] | None = None,
scorers: list[Callable] | None = None,
failure_taxonomy: dict[str, str] | None = None,
failure_filter_fn: Callable | None = None,
concurrency: int = 1,
n_resamples: int = 10_000,
confidence_level: float = 0.95,
seed: int | None = None,
) -> dict[str, Any]
Returns: inference_results (list), scored_data (DataFrame), report (StatisticalReport), markdown (str), all_passed (bool).
This flow orchestrates five stages:
- Inference -- run the agent on every row via agent_inference_flow
- Judging -- score outputs with an LLM judge via judge_flow
- Deterministic scoring -- run optional scorer functions
- Category breakdown -- slice metrics by category column
- Failure classification -- classify failure modes on low-scoring records
from latent.flows import agent_eval_flow
result = agent_eval_flow(
eval_data=df,
agent_factory=lambda: MyRAGAgent(model="gpt-4o"),
judge=judge,
question_column="question",
expected_column="expected_answer",
category_column="topic",
gates={"quality": 3.5, "faithfulness": 4.0},
failure_taxonomy={
"hallucination": "Agent fabricated information not in sources",
"refusal": "Agent refused to answer when it should have",
"incomplete": "Agent gave a partial answer missing key details",
},
concurrency=5,
)
Deterministic scorers
The scorers parameter accepts callables with signature (scored_data: DataFrame, inference_results: list[dict]) -> dict[str, MetricResult]. Use these for exact-match, BLEU, ROUGE, or any non-LLM metric.
agent_inference_flow¶
Run an agent on every row of a DataFrame and collect outputs, tool calls, token counts, and latency. Uses Prefect task.map() for concurrent execution.
Signature:
def agent_inference_flow(
eval_data: pd.DataFrame,
agent_factory: Callable[[], BaseAgent],
*,
question_column: str = "question",
context_columns: list[str] | None = None,
context_formatter: Callable[[dict], str] | None = None,
id_column: str | None = None,
concurrency: int = 1,
include_events: bool = False,
) -> dict[str, Any]
Returns: inference_results (list of dicts with output, thinking, tool_calls, tool_results, latency_ms, input_tokens, output_tokens, error), summary (aggregate counts and mean latency).
from latent.flows import agent_inference_flow
result = agent_inference_flow(
eval_data=df,
agent_factory=lambda: MyAgent(model="gpt-4o"),
question_column="prompt",
concurrency=10,
)
print(result["summary"])
# {"total_count": 200, "success_count": 198, "error_count": 2, ...}
Agent factory isolation
Each Prefect task creates its own agent via agent_factory(). Never pass a shared agent instance -- the factory pattern prevents mutable state leaking between concurrent tasks.
RAG Evaluation Flows¶
retrieval_eval_flow¶
Evaluate retrieval quality independently of generation. For each question/expected-answer pair, the flow retrieves chunks and optionally classifies answerability, grades coverage, and labels chunk relevance.
Signature:
def retrieval_eval_flow(
eval_data: pd.DataFrame,
retriever: Any,
*,
question_column: str = "question",
expected_column: str = "expected",
id_column: str | None = None,
k: int = 10,
judge_model: str = "claude-sonnet-4-5-20250929",
answerability_check: bool = True,
chunk_relevance_labels: bool = True,
coverage_grading: bool = True,
n_resamples: int = 10_000,
confidence_level: float = 0.95,
seed: int | None = None,
) -> dict[str, Any]
Returns: results (list of per-row dicts), report (StatisticalReport with coverage metrics), markdown (str), relevant_chunk_ids (dict mapping row ID to relevant chunk IDs).
The retriever must implement a .search(query, k=k) method returning objects with content, source, and optionally id and score attributes.
from latent.flows import retrieval_eval_flow
result = retrieval_eval_flow(
eval_data=df, # columns: question, expected
retriever=my_chroma, # .search(query, k) -> list[Chunk]
k=10,
judge_model="gpt-4o",
)
# Per-row coverage grades (1-5)
for r in result["results"]:
print(f"{r['id']}: relevance={r.get('relevance_grade')}, "
f"info_match={r.get('info_match_grade')}")
Evaluation dimensions:
| Dimension | Toggle | What it measures |
|---|---|---|
| Answerability | answerability_check | Whether the question needs retrieval at all |
| Coverage grading | coverage_grading | Relevance (1-5) and info completeness (1-5) of retrieved chunks |
| Chunk relevance | chunk_relevance_labels | Per-chunk labels: relevant, contradicts, or not_relevant |
Drift Detection¶
drift_flow¶
Detect distribution drift between a reference (baseline) and current set of scores.
Signature:
def drift_flow(
reference_scores: dict[str, np.ndarray],
current_scores: dict[str, np.ndarray],
*,
score_types: dict[str, str] | None = None,
threshold: float = 0.05,
confidence_level: float = 0.95,
seed: int | None = None,
) -> dict[str, Any]
Returns: drift_results (list of DriftResult), drift_detected (bool), report_text (str).
Drift is flagged when any metric's p-value falls below threshold (default 0.05). Each DriftResult includes delta, p_value, effect_size, and severity.
import numpy as np
from latent.flows import drift_flow
result = drift_flow(
reference_scores={"quality": np.array([4, 5, 4, 3, 5, 4])},
current_scores={"quality": np.array([3, 2, 3, 4, 2, 3])},
threshold=0.05,
)
if result["drift_detected"]:
print(result["report_text"])
Quality Gates¶
All scoring flows support quality gates via the gates parameter -- a dict mapping metric names to minimum thresholds. The flow checks each gate against the point estimate from statistical analysis and sets all_passed accordingly.
result = judge_flow(
eval_data=df,
judge=judge,
gates={
"quality": 3.5, # mean quality must be >= 3.5
"faithfulness": 4.0, # mean faithfulness must be >= 4.0
},
)
if not result["all_passed"]:
for gate in result["report"].gates:
if not gate.passed:
print(f"FAILED: {gate.metric_name} = {gate.observed:.2f} < {gate.threshold}")
Gate strictness
Flow-level gates use the point estimate by default. For stricter gating based on the lower CI bound, use latent.stats.threshold_gate directly with strictness="lower_ci".
Statistical Analysis Integration¶
Every flow that produces scores calls latent.stats.analyze() internally, which provides:
- Bootstrap confidence intervals for all metrics
- Paired comparisons when baseline scores are available (via comparison_flow)
- Effect sizes (Cohen's d, odds ratio)
- Gate evaluation against thresholds
- MLflow logging of all metrics, comparisons, and gate results
All flows accept these common statistical parameters:
| Parameter | Default | Purpose |
|---|---|---|
| n_resamples | 10_000 | Bootstrap iterations for CI estimation |
| confidence_level | 0.95 | Confidence interval width |
| seed | None | Random seed for reproducibility |
Common Patterns¶
Chaining flows¶
Use retrieval_eval_flow to validate retrieval quality before running the full agent_eval_flow:
from latent.flows import retrieval_eval_flow, agent_eval_flow
# Step 1: Check retrieval quality
retrieval_result = retrieval_eval_flow(
eval_data=df,
retriever=my_retriever,
k=10,
)
# Step 2: Only proceed if coverage is acceptable
coverage = retrieval_result["report"].metrics[0].point_estimate
if coverage >= 3.5:
eval_result = agent_eval_flow(
eval_data=df,
agent_factory=lambda: MyRAGAgent(retriever=my_retriever),
judge=judge,
gates={"quality": 3.5},
)
Simulation-to-scoring pipeline¶
Simulate conversations, then run detailed scoring and trajectory analysis:
from latent.flows import (
conversation_simulation_flow,
conversation_scoring_flow,
conversation_trajectory_flow,
results_to_conversations,
)
# Simulate
sim = conversation_simulation_flow(
agent_class=MyAgent,
scenarios=scenarios_df,
)
# Convert to Conversation objects
convos = results_to_conversations(sim["simulation_results"])
# Score
scored = conversation_scoring_flow(convos, judge=judge, aggregation="mean")
# Analyze trajectory patterns
traj = conversation_trajectory_flow(
scored["scored_conversations"],
score_key="quality",
reference_pattern=[4.0, 4.0, 4.0],
)
Drift monitoring across runs¶
Store baseline scores and check for regression on each new evaluation:
import numpy as np
from latent.flows import judge_flow, drift_flow
# Current run
current = judge_flow(eval_data=df, judge=judge)
# Load baseline from previous run (e.g., from MLflow or disk)
baseline_scores = {"quality": np.load("baseline_quality.npy")}
current_scores = {"quality": current["scored_data"]["quality"].values}
drift = drift_flow(
reference_scores=baseline_scores,
current_scores=current_scores,
threshold=0.05,
)
if drift["drift_detected"]:
print("Quality regression detected")
print(drift["report_text"])
Custom deterministic scorers with agent_eval_flow¶
from latent.stats.models import MetricResult
def exact_match_scorer(scored_data, inference_results):
matches = sum(
1 for r in inference_results
if r.get("output", "").strip() == r.get("expected", "").strip()
)
total = len(inference_results)
return {
"exact_match": MetricResult(
name="exact_match",
point_estimate=matches / total if total else 0.0,
sample_size=total,
)
}
result = agent_eval_flow(
eval_data=df,
agent_factory=lambda: MyAgent(),
judge=judge,
scorers=[exact_match_scorer],
)