Examples¶
Real-world examples of using Latent for evaluation workflows.
Basic Pipeline¶
A simple data processing pipeline:
from latent.prefect import flow, task, params, logger
from latent.mlflow import mlflow
import pandas as pd

@task("extract", input="raw_data", output="extracted_data")
def extract_task(raw_data: pd.DataFrame) -> pd.DataFrame:
    logger.info(f"Extracting {len(raw_data)} rows")
    extracted = raw_data[["id", "value", "timestamp"]]
    mlflow.log_metric("extracted_rows", len(extracted))
    return extracted

@task("transform", input="extracted_data", output="transformed_data")
def transform_task(data: pd.DataFrame) -> pd.DataFrame:
    logger.info("Transforming data")
    data["value_normalized"] = (data["value"] - data["value"].mean()) / data["value"].std()
    mlflow.log_metric("mean_value", data["value"].mean())
    return data

@task("load", input="transformed_data", output="final_results")
def load_task(data: pd.DataFrame) -> dict:
    results = {
        "total_rows": len(data),
        "mean_normalized": float(data["value_normalized"].mean())
    }
    logger.info(f"Pipeline complete: {results}")
    mlflow.log_metrics(results)
    return results

@flow("etl_pipeline")
def etl_pipeline():
    extracted = extract_task()
    transformed = transform_task(extracted)
    results = load_task(transformed)
    return results
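As in the multi-stage example at the end of this page, the flow can be run by calling it directly; the extract task's raw_data argument is expected to arrive through its input="raw_data" binding, so nothing is passed explicitly here (a minimal sketch, the exact entry point depends on how your project runs flows):

if __name__ == "__main__":
    etl_pipeline()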
LLM Evaluation Pipeline¶
Generate responses with an LLM agent and score them with a judge:
from latent.prefect import flow, task, params, logger
from latent.mlflow import mlflow
from agents import LLMAgent, Judge

@task("generate_responses", input="prompts", output="responses", cache=False)
def generate_responses_task(prompts: list) -> list:
    agent = LLMAgent(model=params.model_name)
    responses = []
    for i, prompt in enumerate(prompts):
        response = agent.complete(prompt)
        responses.append(response)
        logger.info(f"Generated response {i+1}/{len(prompts)}")
    mlflow.log_param("model", params.model_name)
    mlflow.log_metric("num_responses", len(responses))
    return responses

@task("evaluate_responses", input="responses", output="evaluations")
def evaluate_responses_task(responses: list) -> dict:
    judge = Judge()
    scores = []
    for response in responses:
        score = judge.evaluate(response)
        scores.append(score)
    results = {
        "mean_score": sum(scores) / len(scores),
        "min_score": min(scores),
        "max_score": max(scores)
    }
    mlflow.log_metrics(results)
    logger.info(f"Evaluation results: {results}")
    return results

@flow("llm_evaluation")
def llm_evaluation_flow():
    responses = generate_responses_task()
    evaluation = evaluate_responses_task(responses)
    return evaluation
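LLMAgent and Judge come from your own agents module; the tasks only assume that agent.complete(prompt) returns a response string and judge.evaluate(response) returns a numeric score. To dry-run the flow without a real model, one option is to substitute minimal stand-ins with the same interface (these stubs are illustrative only, not part of Latent):

class FakeLLMAgent:
    """Stand-in for LLMAgent: returns a canned completion."""
    def __init__(self, model: str):
        self.model = model

    def complete(self, prompt: str) -> str:
        return f"[{self.model}] response to: {prompt}"

class FakeJudge:
    """Stand-in for Judge: scores a response by its length, capped at 1.0."""
    def evaluate(self, response: str) -> float:
        return min(len(response) / 100.0, 1.0)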
Parallel Processing¶
Process items in parallel with concurrency control:
@task("process_item")
def process_item_task(item: dict) -> dict:
logger.info(f"Processing item {item['id']}")
result = expensive_computation(item)
mlflow.log_metric(f"item_{item['id']}_score", result["score"])
return result
@task("aggregate_results", output="results")
def aggregate_results_task(results: list) -> dict:
total_score = sum(r["score"] for r in results)
avg_score = total_score / len(results)
mlflow.log_metric("average_score", avg_score)
return {"average_score": avg_score, "results": results}
@flow("parallel_processing", input="input_items")
def parallel_processing_flow(input_items: list):
# Process all items in parallel with concurrency limit
futures = process_item_task.map(input_items, concurrency=5)
# Wait for completion
results = [f.result() for f in futures]
# Aggregate
final_results = aggregate_results_task(results)
return final_results
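Here expensive_computation is a placeholder for your own per-item work; the only shape the tasks rely on is a dict containing a "score". A minimal sketch for local experimentation, assuming the flow accepts its input list as a plain argument (how inputs are actually supplied depends on your setup):

def expensive_computation(item: dict) -> dict:
    # Stand-in for real work: derive a score from the item's value
    return {"id": item["id"], "score": item["value"] * 0.5}

items = [{"id": i, "value": float(i)} for i in range(10)]
parallel_processing_flow(items)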
With DataFrame input:
@task("process_row")
def process_row_task(row: dict) -> dict:
return {"id": row["id"], "processed": row["value"] * 2}
@flow("dataframe_processing", input="data")
def dataframe_flow(data: pd.DataFrame):
# DataFrame rows are automatically converted to dicts
futures = process_row_task.map(data, concurrency=3)
results = [f.result() for f in futures]
return results
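A small DataFrame is enough to exercise the row-wise mapping; each row reaches process_row_task as a dict with "id" and "value" keys. A minimal sketch, assuming the flow can be called directly with the frame:

import pandas as pd

df = pd.DataFrame({"id": [1, 2, 3], "value": [10, 20, 30]})
dataframe_flow(df)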
Multi-Stage Pipeline¶
Chain multiple flows:
# Flow 1: Data Collection
@flow("data_collection", output=["raw_conversations"])
def data_collection_flow():
    conversations = collect_conversations()
    return conversations

# Flow 2: Processing (depends on Flow 1)
@flow(
    "data_processing",
    input=["data_collection.raw_conversations"],
    output=["processed_conversations"]
)
def data_processing_flow(raw_conversations):
    processed = process(raw_conversations)
    return processed

# Flow 3: Evaluation (depends on Flow 2)
@flow(
    "evaluation",
    input=["data_processing.processed_conversations"],
    output=["evaluation_results"]
)
def evaluation_flow(processed_conversations):
    results = evaluate(processed_conversations)
    return results

# Run all flows in sequence
if __name__ == "__main__":
    data_collection_flow()
    data_processing_flow()
    evaluation_flow()