
Pre-built Evaluation Flows

Latent ships a library of composable evaluation flows that handle scoring, statistical analysis, quality gating, and MLflow logging out of the box. All flows live in latent.flows and require the [eval] extra:

pip install "latent[eval]"

Each flow returns a dict of structured results, typically including a StatisticalReport and rendered Markdown. All flows auto-log metrics to MLflow when an active run exists.


Choosing a Flow

| Task | Flow | Input |
| --- | --- | --- |
| Score free-text outputs with LLM judge | judge_flow | DataFrame |
| Binary/multi-class classification | classification_flow | DataFrame + ground_truth column |
| Compare model A vs. baseline B | comparison_flow | DataFrame + baseline score arrays |
| Multi-turn conversation quality | conversation_scoring_flow | list[Conversation] |
| SOP/checklist compliance | conversation_sop_flow | list[Conversation] + checklist |
| Turn-score trajectory analysis | conversation_trajectory_flow | list[Conversation] with scores |
| Simulate + score conversations | conversation_simulation_flow | Agent class + scenarios |
| End-to-end agent evaluation | agent_eval_flow | DataFrame + agent factory + judge |
| Agent inference only (no scoring) | agent_inference_flow | DataFrame + agent factory |
| Retrieval/RAG quality | retrieval_eval_flow | DataFrame + retriever |
| Score distribution drift | drift_flow | Two score dicts |

Scoring Flows

judge_flow

Score every row in a DataFrame with an LLM judge, run bootstrap statistical analysis, and check quality gates.

Signature:

def judge_flow(
    eval_data: pd.DataFrame,
    judge: Judge[T],
    *,
    gates: dict[str, float] | None = None,
    score_types: dict[str, str] | None = None,
    rubrics: dict | None = None,
    n_resamples: int = 10_000,
    confidence_level: float = 0.95,
    seed: int | None = None,
) -> dict[str, Any]

Returns: scored_data (DataFrame), report (StatisticalReport), markdown (str), all_passed (bool).

Score types and rubrics are auto-detected from the judge's output_type annotations. Pass score_types or rubrics explicitly to override.

from typing import Annotated
from latent.agents import Judge
from latent.agents.scores import ScoredModel, OrdinalScore
from latent.flows import judge_flow

class QAScores(ScoredModel):
    quality: Annotated[int, OrdinalScore(scale=(1,2,3,4,5), pass_threshold=3)]

judge = Judge("qa_judge", model="gpt-4o", output_type=QAScores)
result = judge_flow(eval_data=df, judge=judge, gates={"quality": 3.5})

print(result["markdown"])       # Statistical report with CIs
assert result["all_passed"]     # Gate check

Auto-detected configuration

ScoredModel fields annotated with OrdinalScore, BinaryScore, or ContinuousScore are automatically extracted as score types and rubrics. You rarely need to pass score_types manually.
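
The mechanism can be pictured with plain typing introspection. A minimal sketch, assuming a stand-in OrdinalScore marker (the real latent class carries the same scale/pass_threshold fields but is not reproduced here):

```python
from dataclasses import dataclass
from typing import Annotated, get_args, get_type_hints

@dataclass
class OrdinalScore:
    """Stand-in score marker (illustrative, not latent's class)."""
    scale: tuple
    pass_threshold: int

class QAScores:
    quality: Annotated[int, OrdinalScore(scale=(1, 2, 3, 4, 5), pass_threshold=3)]

def extract_score_config(model: type) -> dict:
    """Collect fields whose Annotated metadata carries a score marker."""
    found = {}
    for name, hint in get_type_hints(model, include_extras=True).items():
        for meta in get_args(hint)[1:]:  # metadata entries after the base type
            if isinstance(meta, OrdinalScore):
                found[name] = meta
    return found

config = extract_score_config(QAScores)
print(config["quality"].pass_threshold)  # 3
```

Default gates like {"quality": 3} could then be derived from each field's pass_threshold without passing score_types explicitly.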


comparison_flow

Score a dataset and compare against baseline scores. Wraps judge_flow with paired comparison statistics.

Signature:

def comparison_flow(
    eval_data: pd.DataFrame,
    judge: Judge[T],
    *,
    baseline_scores: dict[str, np.ndarray] | None = None,
    gates: dict[str, float] | None = None,
    score_types: dict[str, str] | None = None,
    rubrics: dict | None = None,
    n_resamples: int = 10_000,
    confidence_level: float = 0.95,
    seed: int | None = None,
) -> dict[str, Any]

Returns: scored_data, report, markdown, all_passed.

If baseline_scores is None, the flow runs in first-run mode (no comparison, just scoring).

from latent.flows import comparison_flow
import numpy as np

# First run: establish baseline
baseline_result = judge_flow(eval_data=df_v1, judge=judge)
baseline = {
    "quality": baseline_result["scored_data"]["quality"].values
}

# Second run: compare against baseline
result = comparison_flow(
    eval_data=df_v2,
    judge=judge,
    baseline_scores=baseline,
    gates={"quality": 3.5},
)
# result["report"] includes paired CIs, effect sizes, p-values

Classification Flows

classification_flow

Run a Classifier on a dataset and compute accuracy, precision, recall, and F1 with confidence intervals.

Signature:

def classification_flow(
    eval_data: pd.DataFrame,
    classifier: Classifier[T],
    *,
    labels: list[str] | None = None,
    gates: dict[str, float] | None = None,
    confidence_level: float = 0.95,
    n_resamples: int = 10_000,
    seed: int | None = None,
) -> dict[str, Any]

Returns: predictions_data (DataFrame), metrics (dict of MetricResult), all_passed (bool).

Required column

The input DataFrame must have a ground_truth column. The classifier's output model must have a prediction field.

from pydantic import BaseModel
from latent.agents import Classifier
from latent.flows import classification_flow

class Intent(BaseModel):
    prediction: str

classifier = Classifier(
    "intent",
    model="gpt-4o",
    output_type=Intent,
    prompt_template="Classify intent: {text}\nLabels: {labels}",
)

result = classification_flow(
    eval_data=df,  # must have "ground_truth" column
    classifier=classifier,
    labels=["billing", "support", "sales"],
    gates={"accuracy": 0.85, "f1": 0.80},
)
print(result["metrics"]["accuracy"].point_estimate)

Conversation Flows

conversation_scoring_flow

Score multi-turn conversations at the turn level, aggregate per conversation, and run statistical analysis.

Signature:

def conversation_scoring_flow(
    conversations: list[Conversation],
    judge: Judge[T],
    *,
    aggregation: str = "mean",  # "mean" | "min" | "max" | "last"
    gates: dict[str, float] | None = None,
    n_resamples: int = 10_000,
    confidence_level: float = 0.95,
    seed: int | None = None,
) -> dict[str, Any]

Returns: scored_conversations (list), aggregated_scores (dict), report, markdown.

Only assistant turns are scored. The aggregation parameter controls how per-turn scores collapse into a single per-conversation value.

from latent.stats.conversation import Conversation, Turn
from latent.flows import conversation_scoring_flow

convos = [
    Conversation(turns=[
        Turn(role="user", content="How do I reset my password?"),
        Turn(role="assistant", content="Go to Settings > Security > Reset Password."),
    ]),
]

result = conversation_scoring_flow(
    convos,
    judge=judge,
    aggregation="mean",
    gates={"quality": 3.0},
)

| Aggregation | Behavior |
| --- | --- |
| mean | Average of all turn scores |
| min | Worst turn determines conversation score |
| max | Best turn determines conversation score |
| last | Final assistant turn score only |
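
The four strategies are simple reductions over a conversation's assistant-turn scores. A sketch of the collapse step (illustrative, not latent's internal code):

```python
def aggregate_turn_scores(turn_scores: list[float], aggregation: str = "mean") -> float:
    """Collapse per-turn scores into one per-conversation value."""
    if not turn_scores:
        raise ValueError("conversation has no scored assistant turns")
    if aggregation == "mean":
        return sum(turn_scores) / len(turn_scores)
    if aggregation == "min":
        return min(turn_scores)   # worst turn dominates
    if aggregation == "max":
        return max(turn_scores)   # best turn dominates
    if aggregation == "last":
        return turn_scores[-1]    # final assistant turn only
    raise ValueError(f"unknown aggregation: {aggregation}")

scores = [4.0, 3.0, 5.0]
print(aggregate_turn_scores(scores, "mean"))  # 4.0
print(aggregate_turn_scores(scores, "min"))   # 3.0
```

Prefer "min" when a single bad turn should fail the whole conversation, and "last" when only the final resolution matters.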

conversation_sop_flow

Check conversations against an SOP (Standard Operating Procedure) compliance checklist.

Signature:

def conversation_sop_flow(
    conversations: list[Conversation],
    judge: Judge[T],
    *,
    sop_checklist: list[dict[str, Any]],
    confidence_level: float = 0.95,
    n_resamples: int = 10_000,
    seed: int | None = None,
) -> dict[str, Any]

Returns: compliance_results (list of per-conversation dicts), aggregate_rates (per-item stats with CIs), report, markdown.

Each checklist item is a dict with name, description, and required keys. The judge evaluates each item against the full conversation text.

from latent.flows import conversation_sop_flow

checklist = [
    {"name": "greeting", "description": "Agent greets the customer", "required": True},
    {"name": "verify_identity", "description": "Agent verifies customer identity", "required": True},
    {"name": "resolution", "description": "Agent provides a resolution or next steps", "required": True},
]

result = conversation_sop_flow(
    conversations=convos,
    judge=judge,
    sop_checklist=checklist,
)
# result["aggregate_rates"]["greeting"]["mean"] -> 0.92

conversation_trajectory_flow

Analyze how scores evolve across turns. Computes L2 distances between conversation trajectories (or against a reference pattern).

Signature:

def conversation_trajectory_flow(
    conversations: list[Conversation],
    *,
    score_key: str = "score",
    reference_pattern: list[float] | None = None,
    confidence_level: float = 0.95,
    n_resamples: int = 10_000,
    seed: int | None = None,
) -> dict[str, Any]

Returns: trajectories (list of score sequences), distances (ndarray), report, markdown.

Conversations must have turn scores in their metadata (typically from a prior conversation_scoring_flow run). If reference_pattern is provided, each trajectory is compared to it; otherwise pairwise distances are computed.
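
The reference-pattern comparison reduces to Euclidean distance between score sequences. A minimal sketch (latent's handling of unequal lengths or normalization may differ):

```python
import numpy as np

def trajectory_distance(trajectory: list[float], reference: list[float]) -> float:
    """L2 distance between a turn-score sequence and a reference pattern."""
    t = np.asarray(trajectory, dtype=float)
    r = np.asarray(reference, dtype=float)
    if t.shape != r.shape:
        raise ValueError("trajectory and reference must have equal length")
    return float(np.linalg.norm(t - r))

# A conversation whose quality sags mid-way, vs. a steady-high reference
print(trajectory_distance([4.0, 2.0, 4.0], [4.0, 4.0, 4.0]))  # 2.0
```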

from latent.flows import conversation_trajectory_flow

# Ideal pattern: quality should stay high across 5 turns
result = conversation_trajectory_flow(
    scored_conversations,
    score_key="quality",
    reference_pattern=[4.0, 4.0, 4.0, 4.0, 4.0],
)

conversation_simulation_flow

Simulate conversations between an agent under test and a synthetic human, then optionally score the results.

Signature:

def conversation_simulation_flow(
    agent_class: type,
    agent_kwargs: dict[str, Any] | None = None,
    *,
    scenarios: pd.DataFrame | None = None,
    human_class: type | None = None,
    human_kwargs: dict[str, Any] | None = None,
    judge_class: Callable[[], Judge[T]] | None = None,
    judge_scoring: JudgeScoringConfig | None = None,
    simulation_guardrails: SimulationGuardrails | None = None,
    context_column: str = "context",
) -> dict[str, Any]

Returns: simulation_results (DataFrame with turn data), report (scoring dict or None).

If scenarios is not provided, the flow loads them from the catalog. If judge_class is provided, conversations are scored automatically after simulation.

from latent.flows import conversation_simulation_flow

result = conversation_simulation_flow(
    agent_class=MyAgent,
    agent_kwargs={"model": "gpt-4o"},
    scenarios=scenarios_df,          # must have "context" column
    judge_class=lambda: judge,       # factory for scoring
    judge_scoring={"scope": "conversation", "gates": {"quality": 3.5}},
    simulation_guardrails={"max_turns_per_conversation": 20},
)

Scoring scope

Set judge_scoring["scope"] to "conversation" to score each conversation as a whole, or "turn" to score each assistant turn individually and aggregate. Quality gates are auto-derived from the judge's output_type pass_threshold annotations and can be overridden via judge_scoring["gates"].


Agent Evaluation Flows

agent_eval_flow

Full end-to-end agent evaluation pipeline: inference, LLM judging, optional deterministic scorers, category breakdowns, and failure mode classification.

Signature:

def agent_eval_flow(
    eval_data: pd.DataFrame,
    agent_factory: Callable[[], BaseAgent],
    judge: Judge[T],
    *,
    question_column: str = "question",
    expected_column: str | None = None,
    category_column: str | None = None,
    id_column: str | None = None,
    context_columns: list[str] | None = None,
    context_formatter: Callable[[dict], str] | None = None,
    gates: dict[str, float] | None = None,
    scorers: list[Callable] | None = None,
    failure_taxonomy: dict[str, str] | None = None,
    failure_filter_fn: Callable | None = None,
    concurrency: int = 1,
    n_resamples: int = 10_000,
    confidence_level: float = 0.95,
    seed: int | None = None,
) -> dict[str, Any]

Returns: inference_results (list), scored_data (DataFrame), report (StatisticalReport), markdown (str), all_passed (bool).

This flow orchestrates five stages:

  1. Inference -- run the agent on every row via agent_inference_flow
  2. Judging -- score outputs with an LLM judge via judge_flow
  3. Deterministic scoring -- run optional scorer functions
  4. Category breakdown -- slice metrics by category column
  5. Failure classification -- classify failure modes on low-scoring records

from latent.flows import agent_eval_flow

result = agent_eval_flow(
    eval_data=df,
    agent_factory=lambda: MyRAGAgent(model="gpt-4o"),
    judge=judge,
    question_column="question",
    expected_column="expected_answer",
    category_column="topic",
    gates={"quality": 3.5, "faithfulness": 4.0},
    failure_taxonomy={
        "hallucination": "Agent fabricated information not in sources",
        "refusal": "Agent refused to answer when it should have",
        "incomplete": "Agent gave a partial answer missing key details",
    },
    concurrency=5,
)

Deterministic scorers

The scorers parameter accepts callables with signature (scored_data: DataFrame, inference_results: list[dict]) -> dict[str, MetricResult]. Use these for exact-match, BLEU, ROUGE, or any non-LLM metric.


agent_inference_flow

Run an agent on every row of a DataFrame and collect outputs, tool calls, token counts, and latency. Uses Prefect task.map() for concurrent execution.

Signature:

def agent_inference_flow(
    eval_data: pd.DataFrame,
    agent_factory: Callable[[], BaseAgent],
    *,
    question_column: str = "question",
    context_columns: list[str] | None = None,
    context_formatter: Callable[[dict], str] | None = None,
    id_column: str | None = None,
    concurrency: int = 1,
    include_events: bool = False,
) -> dict[str, Any]

Returns: inference_results (list of dicts with output, thinking, tool_calls, tool_results, latency_ms, input_tokens, output_tokens, error), summary (aggregate counts and mean latency).

from latent.flows import agent_inference_flow

result = agent_inference_flow(
    eval_data=df,
    agent_factory=lambda: MyAgent(model="gpt-4o"),
    question_column="prompt",
    concurrency=10,
)
print(result["summary"])
# {"total_count": 200, "success_count": 198, "error_count": 2, ...}

Agent factory isolation

Each Prefect task creates its own agent via agent_factory(). Never pass a shared agent instance -- the factory pattern prevents mutable state leaking between concurrent tasks.
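
The hazard can be demonstrated with a toy agent that keeps mutable per-run state (purely illustrative; not the latent BaseAgent API):

```python
class ToyAgent:
    """Toy agent with mutable per-run state (a conversation history)."""
    def __init__(self):
        self.history = []

    def run(self, question: str) -> str:
        self.history.append(question)  # mutates instance state on every call
        return f"answer to {question!r} (history={len(self.history)})"

# Shared instance: the second call sees the first call's state
shared = ToyAgent()
leaked = [shared.run(q) for q in ["a", "b"]]
print(leaked[-1])    # ... (history=2)

# Factory: each task builds a fresh, isolated agent
agent_factory = lambda: ToyAgent()
isolated = [agent_factory().run(q) for q in ["a", "b"]]
print(isolated[-1])  # ... (history=1)
```

Under concurrent Prefect tasks the shared-instance version would interleave histories unpredictably, which is why the flows take a factory rather than an instance.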


RAG Evaluation Flows

retrieval_eval_flow

Evaluate retrieval quality independently of generation. For each question and expected answer, the flow retrieves chunks, then optionally classifies answerability, grades coverage, and labels chunk relevance.

Signature:

def retrieval_eval_flow(
    eval_data: pd.DataFrame,
    retriever: Any,
    *,
    question_column: str = "question",
    expected_column: str = "expected",
    id_column: str | None = None,
    k: int = 10,
    judge_model: str = "claude-sonnet-4-5-20250929",
    answerability_check: bool = True,
    chunk_relevance_labels: bool = True,
    coverage_grading: bool = True,
    n_resamples: int = 10_000,
    confidence_level: float = 0.95,
    seed: int | None = None,
) -> dict[str, Any]

Returns: results (list of per-row dicts), report (StatisticalReport with coverage metrics), markdown (str), relevant_chunk_ids (dict mapping row ID to relevant chunk IDs).

The retriever must implement a .search(query, k=k) method returning objects with content, source, and optionally id and score attributes.
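
A minimal retriever satisfying that duck-typed contract, here a toy keyword-overlap ranker (Chunk and KeywordRetriever are illustrative names, not latent classes):

```python
from dataclasses import dataclass

@dataclass
class Chunk:
    content: str
    source: str
    id: str = ""        # optional per the contract
    score: float = 0.0  # optional per the contract

class KeywordRetriever:
    """Toy retriever: ranks chunks by word overlap with the query."""
    def __init__(self, chunks: list[Chunk]):
        self.chunks = chunks

    def search(self, query: str, k: int = 10) -> list[Chunk]:
        q = set(query.lower().split())
        ranked = sorted(
            self.chunks,
            key=lambda c: len(q & set(c.content.lower().split())),
            reverse=True,
        )
        return ranked[:k]

retriever = KeywordRetriever([
    Chunk("Reset your password in Settings", source="kb/auth.md", id="c1"),
    Chunk("Billing happens monthly", source="kb/billing.md", id="c2"),
])
hits = retriever.search("how do I reset my password", k=1)
print(hits[0].id)  # c1
```

Any vector store wrapper with the same .search(query, k) shape, such as a Chroma or FAISS adapter, plugs in the same way.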

from latent.flows import retrieval_eval_flow

result = retrieval_eval_flow(
    eval_data=df,         # columns: question, expected
    retriever=my_chroma,  # .search(query, k) -> list[Chunk]
    k=10,
    judge_model="gpt-4o",
)

# Per-row coverage grades (1-5)
for r in result["results"]:
    print(f"{r['id']}: relevance={r.get('relevance_grade')}, "
          f"info_match={r.get('info_match_grade')}")

Evaluation dimensions:

| Dimension | Toggle | What it measures |
| --- | --- | --- |
| Answerability | answerability_check | Whether the question needs retrieval at all |
| Coverage grading | coverage_grading | Relevance (1-5) and info completeness (1-5) of retrieved chunks |
| Chunk relevance | chunk_relevance_labels | Per-chunk labels: relevant, contradicts, or not_relevant |

Drift Detection

drift_flow

Detect distribution drift between a reference (baseline) and current set of scores.

Signature:

def drift_flow(
    reference_scores: dict[str, np.ndarray],
    current_scores: dict[str, np.ndarray],
    *,
    score_types: dict[str, str] | None = None,
    threshold: float = 0.05,
    confidence_level: float = 0.95,
    seed: int | None = None,
) -> dict[str, Any]

Returns: drift_results (list of DriftResult), drift_detected (bool), report_text (str).

Drift is flagged when any metric's p-value falls below threshold (default 0.05). Each DriftResult includes delta, p_value, effect_size, and severity.

import numpy as np
from latent.flows import drift_flow

result = drift_flow(
    reference_scores={"quality": np.array([4, 5, 4, 3, 5, 4])},
    current_scores={"quality": np.array([3, 2, 3, 4, 2, 3])},
    threshold=0.05,
)

if result["drift_detected"]:
    print(result["report_text"])

Quality Gates

All scoring flows support quality gates via the gates parameter -- a dict mapping metric names to minimum thresholds. The flow checks each gate against the point estimate from statistical analysis and sets all_passed accordingly.

result = judge_flow(
    eval_data=df,
    judge=judge,
    gates={
        "quality": 3.5,        # mean quality must be >= 3.5
        "faithfulness": 4.0,   # mean faithfulness must be >= 4.0
    },
)

if not result["all_passed"]:
    for gate in result["report"].gates:
        if not gate.passed:
            print(f"FAILED: {gate.metric_name} = {gate.observed:.2f} < {gate.threshold}")

Gate strictness

Flow-level gates use the point estimate by default. For stricter gating based on the lower CI bound, use latent.stats.threshold_gate directly with strictness="lower_ci".
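
The difference between the two strictness levels can be sketched with a percentile bootstrap (illustrative; not the threshold_gate implementation):

```python
import numpy as np

def bootstrap_lower_ci(scores, confidence_level=0.95, n_resamples=10_000, seed=0):
    """Lower bound of a percentile-bootstrap CI on the mean."""
    rng = np.random.default_rng(seed)
    resamples = rng.choice(scores, size=(n_resamples, len(scores)), replace=True)
    means = resamples.mean(axis=1)
    alpha = 1 - confidence_level
    return float(np.quantile(means, alpha / 2))

scores = np.array([4.0, 3.0, 4.0, 5.0, 3.0, 4.0])
threshold = 3.5

point = float(scores.mean())        # ~3.83, above the threshold
lower = bootstrap_lower_ci(scores)  # lower CI bound, below the point estimate

print(point >= threshold)  # point-estimate gate passes
print(lower >= threshold)  # lower-CI gate is stricter and may fail
```

With small samples the lower CI bound sits well below the mean, so lower-CI gating guards against passing on noise alone.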


Statistical Analysis Integration

Every flow that produces scores calls latent.stats.analyze() internally, which provides:

  • Bootstrap confidence intervals for all metrics
  • Paired comparisons when baseline scores are available (via comparison_flow)
  • Effect sizes (Cohen's d, odds ratio)
  • Gate evaluation against thresholds
  • MLflow logging of all metrics, comparisons, and gate results

All flows accept these common statistical parameters:

| Parameter | Default | Purpose |
| --- | --- | --- |
| n_resamples | 10_000 | Bootstrap iterations for CI estimation |
| confidence_level | 0.95 | Confidence interval width |
| seed | None | Random seed for reproducibility |

Common Patterns

Chaining flows

Use retrieval_eval_flow to validate retrieval quality before running the full agent_eval_flow:

from latent.flows import retrieval_eval_flow, agent_eval_flow

# Step 1: Check retrieval quality
retrieval_result = retrieval_eval_flow(
    eval_data=df,
    retriever=my_retriever,
    k=10,
)

# Step 2: Only proceed if coverage is acceptable
coverage = retrieval_result["report"].metrics[0].point_estimate
if coverage >= 3.5:
    eval_result = agent_eval_flow(
        eval_data=df,
        agent_factory=lambda: MyRAGAgent(retriever=my_retriever),
        judge=judge,
        gates={"quality": 3.5},
    )

Simulation-to-scoring pipeline

Simulate conversations, then run detailed scoring and trajectory analysis:

from latent.flows import (
    conversation_simulation_flow,
    conversation_scoring_flow,
    conversation_trajectory_flow,
    results_to_conversations,
)

# Simulate
sim = conversation_simulation_flow(
    agent_class=MyAgent,
    scenarios=scenarios_df,
)

# Convert to Conversation objects
convos = results_to_conversations(sim["simulation_results"])

# Score
scored = conversation_scoring_flow(convos, judge=judge, aggregation="mean")

# Analyze trajectory patterns
traj = conversation_trajectory_flow(
    scored["scored_conversations"],
    score_key="quality",
    reference_pattern=[4.0, 4.0, 4.0],
)

Drift monitoring across runs

Store baseline scores and check for regression on each new evaluation:

import numpy as np
from latent.flows import judge_flow, drift_flow

# Current run
current = judge_flow(eval_data=df, judge=judge)

# Load baseline from previous run (e.g., from MLflow or disk)
baseline_scores = {"quality": np.load("baseline_quality.npy")}
current_scores = {"quality": current["scored_data"]["quality"].values}

drift = drift_flow(
    reference_scores=baseline_scores,
    current_scores=current_scores,
    threshold=0.05,
)

if drift["drift_detected"]:
    print("Quality regression detected")
    print(drift["report_text"])

Custom deterministic scorers with agent_eval_flow

from latent.stats.models import MetricResult

def exact_match_scorer(scored_data, inference_results):
    matches = sum(
        1 for r in inference_results
        # (x or "") guards against None outputs on error rows
        if (r.get("output") or "").strip() == (r.get("expected") or "").strip()
    )
    total = len(inference_results)
    return {
        "exact_match": MetricResult(
            name="exact_match",
            point_estimate=matches / total if total else 0.0,
            sample_size=total,
        )
    }

result = agent_eval_flow(
    eval_data=df,
    agent_factory=lambda: MyAgent(),
    judge=judge,
    scorers=[exact_match_scorer],
)