Skip to content

Examples

Real-world examples of using Latent for evaluation workflows.

Basic Pipeline

A simple data processing pipeline:

from latent.prefect import flow, task, params, logger
from latent.mlflow import mlflow
import pandas as pd

@task("extract", input="raw_data", output="extracted_data")
def extract_task(raw_data: pd.DataFrame) -> pd.DataFrame:
    """Select the id/value/timestamp columns needed by downstream stages."""
    logger.info(f"Extracting {len(raw_data)} rows")
    # Narrow the frame to just the columns the rest of the pipeline uses.
    selected_columns = ["id", "value", "timestamp"]
    subset = raw_data[selected_columns]
    mlflow.log_metric("extracted_rows", len(subset))
    return subset

@task("transform", input="extracted_data", output="transformed_data")
def transform_task(data: pd.DataFrame) -> pd.DataFrame:
    """Add a z-score normalized copy of the ``value`` column.

    Operates on a copy so the upstream (possibly cached/shared) DataFrame
    from the extract task is not mutated in place.

    Returns the copied DataFrame with a new ``value_normalized`` column.
    """
    logger.info("Transforming data")
    data = data.copy()
    # Compute the mean once: it is used both for normalization and logging.
    mean_value = data["value"].mean()
    data["value_normalized"] = (data["value"] - mean_value) / data["value"].std()
    mlflow.log_metric("mean_value", mean_value)
    return data

@task("load", input="transformed_data", output="final_results")
def load_task(data: pd.DataFrame) -> dict:
    """Summarize the transformed data into a small metrics dict."""
    results = dict(
        total_rows=len(data),
        mean_normalized=float(data["value_normalized"].mean()),
    )
    logger.info(f"Pipeline complete: {results}")
    mlflow.log_metrics(results)
    return results

@flow("etl_pipeline")
def etl_pipeline():
    """Run extract -> transform -> load and return the final summary."""
    return load_task(transform_task(extract_task()))

LLM Evaluation Pipeline

Evaluate LLM responses:

from latent.prefect import flow, task, params, logger
from latent.mlflow import mlflow
from agents import LLMAgent, Judge

@task("generate_responses", input="prompts", output="responses", cache=False)
def generate_responses_task(prompts: list) -> list:
    """Generate one completion per prompt with the configured model."""
    agent = LLMAgent(model=params.model_name)

    responses = []
    for i, prompt in enumerate(prompts):
        # Side-effecting loop (progress logging), so no comprehension here.
        responses.append(agent.complete(prompt))
        logger.info(f"Generated response {i+1}/{len(prompts)}")

    mlflow.log_param("model", params.model_name)
    mlflow.log_metric("num_responses", len(responses))
    return responses

@task("evaluate_responses", input="responses", output="evaluations")
def evaluate_responses_task(responses: list) -> dict:
    """Score each response with the judge and aggregate the scores.

    Returns a dict with ``mean_score``, ``min_score`` and ``max_score``.

    Raises:
        ValueError: if ``responses`` is empty — the original code would
            instead surface an opaque ZeroDivisionError from the mean.
    """
    if not responses:
        raise ValueError("evaluate_responses_task received no responses to score")

    judge = Judge()
    scores = [judge.evaluate(response) for response in responses]

    results = {
        "mean_score": sum(scores) / len(scores),
        "min_score": min(scores),
        "max_score": max(scores)
    }

    mlflow.log_metrics(results)
    logger.info(f"Evaluation results: {results}")
    return results

@flow("llm_evaluation")
def llm_evaluation_flow():
    """Generate responses, then return their judged evaluation summary."""
    return evaluate_responses_task(generate_responses_task())

Parallel Processing

Process items in parallel with concurrency control:

@task("process_item")
def process_item_task(item: dict) -> dict:
    """Run the expensive computation for one item and record its score."""
    logger.info(f"Processing item {item['id']}")
    outcome = expensive_computation(item)
    # Per-item metric so individual scores are visible in MLflow.
    mlflow.log_metric(f"item_{item['id']}_score", outcome["score"])
    return outcome

@task("aggregate_results", output="results")
def aggregate_results_task(results: list) -> dict:
    """Average the per-item scores and return them with the raw results.

    Raises:
        ValueError: if ``results`` is empty — the original code would
            instead crash with an opaque ZeroDivisionError.
    """
    if not results:
        raise ValueError("aggregate_results_task received no results to aggregate")

    avg_score = sum(r["score"] for r in results) / len(results)

    mlflow.log_metric("average_score", avg_score)
    return {"average_score": avg_score, "results": results}

@flow("parallel_processing", input="input_items")
def parallel_processing_flow(input_items: list):
    """Fan item processing out in parallel, then aggregate the scores."""
    # map() schedules every item; concurrency caps in-flight tasks at 5.
    futures = process_item_task.map(input_items, concurrency=5)

    # Block until each future resolves, preserving submission order.
    results = [future.result() for future in futures]

    return aggregate_results_task(results)

With DataFrame input:

@task("process_row")
def process_row_task(row: dict) -> dict:
    """Double the row's value, carrying its id through."""
    doubled = row["value"] * 2
    return {"id": row["id"], "processed": doubled}

@flow("dataframe_processing", input="data")
def dataframe_flow(data: pd.DataFrame):
    """Process each DataFrame row in parallel (rows arrive as dicts)."""
    # map() converts DataFrame rows to dicts automatically.
    pending = process_row_task.map(data, concurrency=3)
    return [future.result() for future in pending]

Multi-Stage Pipeline

Chain multiple flows:

# Flow 1: Data Collection
@flow("data_collection", output=["raw_conversations"])
def data_collection_flow():
    """Collect raw conversations for the downstream processing flow."""
    return collect_conversations()

# Flow 2: Processing (depends on Flow 1)
@flow(
    "data_processing",
    input=["data_collection.raw_conversations"],
    output=["processed_conversations"]
)
def data_processing_flow(raw_conversations):
    """Process the conversations produced by the data_collection flow."""
    return process(raw_conversations)

# Flow 3: Evaluation (depends on Flow 2)
@flow(
    "evaluation",
    input=["data_processing.processed_conversations"],
    output=["evaluation_results"]
)
def evaluation_flow(processed_conversations):
    """Evaluate the processed conversations from the data_processing flow."""
    return evaluate(processed_conversations)

# Run all flows in sequence
if __name__ == "__main__":
    # Each stage reads the outputs declared by the one before it.
    for stage in (data_collection_flow, data_processing_flow, evaluation_flow):
        stage()

See Also