MLflow Tracking¶
Latent automatically integrates MLflow for experiment tracking, metrics logging, and artifact management.
Automatic Tracking¶
Every flow automatically creates an MLflow experiment and run:
```python
from latent.prefect import flow, task
from latent.mlflow import mlflow


@task("train_model")
def train_model_task(data):
    # MLflow run is already active
    mlflow.log_param("model_type", "random_forest")
    mlflow.log_param("n_estimators", 100)

    model = train(data)

    mlflow.log_metric("accuracy", 0.95)
    mlflow.log_metric("f1_score", 0.93)

    # Save model artifact
    mlflow.sklearn.log_model(model, "model")
    return model


@flow("training_flow")
def training_flow():
    # Experiment "training_flow" is automatically created
    data = load_data()
    model = train_model_task(data)
    return model
```
Logging Metrics¶
```python
from latent.mlflow import mlflow

# Log a single metric
mlflow.log_metric("accuracy", 0.95)

# Log multiple metrics
mlflow.log_metrics({
    "precision": 0.94,
    "recall": 0.92,
    "f1": 0.93,
})

# Log a metric at specific steps
mlflow.log_metric("loss", 0.5, step=1)
mlflow.log_metric("loss", 0.3, step=2)
```
Logging Parameters¶
```python
# Log individual parameters
mlflow.log_param("learning_rate", 0.001)
mlflow.log_param("batch_size", 32)

# Log multiple parameters
mlflow.log_params({
    "optimizer": "adam",
    "epochs": 100,
    "model_name": "gpt-4",
})
```
Logging Artifacts¶
```python
# Log a file
mlflow.log_artifact("results/plot.png")

# Log an entire directory
mlflow.log_artifacts("outputs/")

# Log text content
mlflow.log_text("Model predictions...", "predictions.txt")

# Log a dictionary as JSON
mlflow.log_dict({"config": "value"}, "config.json")
```
Configuration¶
Configure MLflow in config/latent.toml:
```toml
[mlflow]
# Enable MLflow experiment tracking (default: true)
enabled = true

# MLflow tracking URI (default: SQLite in mlruns/)
# tracking_uri = "http://localhost:5000"

# Enable LiteLLM auto-tracing for LLM calls (default: true)
litellm_autolog = true

# Enable LangChain/LangGraph auto-tracing (default: true)
langchain_autolog = true
```
Or in parameters.yaml for flow-specific settings:
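For example, using the same keys as the `[mlflow]` section above:

```yaml
mlflow:
  enabled: true
  litellm_autolog: true
  langchain_autolog: true
```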
Artifact Storage¶
All flow outputs are automatically saved as MLflow artifacts, and a `latest` symlink is maintained for cross-flow references.
This enables downstream flows to always reference the latest outputs using dot notation:
```python
@task("analyze", input="my_flow.results")
def analyze_task(results):
    # results loaded from my_flow's latest MLflow artifacts
    pass
```
Viewing Results¶
Start the MLflow UI with `mlflow ui`, then open http://localhost:5000.
Advanced Usage¶
Manual Run Control¶
```python
from latent.mlflow import setup, end_mlflow_run


@flow("custom_flow")
def custom_flow():
    # Normally handled automatically
    run_id = setup("my_experiment", "run-id", parameters={})
    try:
        # Your logic
        pass
    finally:
        end_mlflow_run()
```
Nested Runs¶
```python
@task("subtask")
def subtask(data):
    # Create a nested run for this task
    with mlflow.start_run(nested=True):
        mlflow.log_metric("subtask_metric", 0.5)
        return process(data)
```
LLM Autologging¶
Latent automatically logs LLM API calls and agent interactions using MLflow's autologging capabilities. This works with both LiteLLM and LangChain/LangGraph.
Supported Frameworks¶
LiteLLM Autologging¶
Automatically logs all LiteLLM API calls (OpenAI, Anthropic, Cohere, etc.), including:

- Input prompts and output responses
- Token usage and costs
- Latency and performance metrics
- Model parameters (temperature, max_tokens, etc.)
Requires: MLflow >= 2.14.0
```python
from litellm import completion
from latent.prefect import flow, task
from latent.mlflow import mlflow


@task("generate_response")
def generate_response_task(question: str) -> str:
    # This call is automatically logged to MLflow
    response = completion(
        model="gpt-4",
        messages=[{"role": "user", "content": question}],
        temperature=0.7,
    )
    return response.choices[0].message.content


@flow("litellm_flow")
def litellm_flow():
    answer = generate_response_task("What is machine learning?")
    return answer
```
LangChain/LangGraph Autologging¶
Automatically logs all LangChain chains, agents, and tools, including:

- Chain inputs and outputs
- Agent reasoning steps
- Tool calls and results
- Intermediate steps and traces
Requires: MLflow >= 2.3.0
```python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from latent.prefect import flow, task


@task("run_chain")
def run_chain_task(question: str) -> str:
    # Create the LLM and prompt using modern LCEL syntax
    llm = ChatOpenAI(model="gpt-4", temperature=0.7)
    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are a helpful assistant."),
        ("user", "{question}"),
    ])

    # Build the chain with pipe syntax - automatically logged to MLflow
    chain = prompt | llm | StrOutputParser()
    return chain.invoke({"question": question})


@flow("langchain_flow")
def langchain_flow():
    answer = run_chain_task("What is deep learning?")
    return answer
```
LangGraph Agent Example:
```python
from typing import TypedDict

from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from latent.prefect import flow, task


@task("run_agent")
def run_agent_task(question: str) -> str:
    # LangGraph agents are automatically traced in MLflow
    class AgentState(TypedDict):
        question: str
        answer: str

    llm = ChatOpenAI(model="gpt-4")

    def think(state: AgentState) -> AgentState:
        response = llm.invoke(state["question"])
        return {"question": state["question"], "answer": response.content}

    # Build the graph - all steps are traced
    graph = StateGraph(AgentState)
    graph.add_node("think", think)
    graph.set_entry_point("think")
    graph.add_edge("think", END)
    agent = graph.compile()

    result = agent.invoke({"question": question, "answer": ""})
    return result["answer"]
```
Configuration¶
Configure autologging in your config/latent.toml or parameters.yaml:
In config/latent.toml:
```toml
[mlflow]
# Enable LiteLLM autologging (default: true)
litellm_autolog = true

# Enable LangChain/LangGraph autologging (default: true)
langchain_autolog = true
```
In parameters.yaml (flow-specific):
```yaml
mlflow:
  enabled: true
  litellm_autolog: true    # Enable for LiteLLM calls
  langchain_autolog: true  # Enable for LangChain calls
```
Disabling Autologging¶
To disable autologging for specific frameworks, set the config option to false:
Global configuration (config/latent.toml):
```toml
[mlflow]
litellm_autolog = false   # Disable LiteLLM logging
langchain_autolog = true  # Keep LangChain logging
```
Per-flow configuration (parameters.yaml):
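For example, to disable LiteLLM autologging for a single flow (mirroring the keys shown above):

```yaml
mlflow:
  litellm_autolog: false
  langchain_autolog: true
```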
Viewing Traces in MLflow¶
All autologged traces appear in the MLflow UI:

1. Start the MLflow UI with `mlflow ui`
2. Navigate to your experiment (named after your flow)
3. Click on a run to see:
    - Traces: Full trace tree of LLM calls
    - Metrics: Token usage, latency, costs
    - Parameters: Model settings, prompts
    - Artifacts: Request/response payloads
Best Practices¶
- Always Enable: Keep autologging enabled to track all LLM interactions
- Review Costs: Check token usage metrics to optimize spending
- Compare Models: Use traces to compare different model configurations
- Debug Failures: Inspect traces to understand agent reasoning
- Production Monitoring: Use autolog in production for observability
Advanced: Manual Control¶
If you need fine-grained control over autologging:
```python
from latent.mlflow import mlflow

# Temporarily disable autologging
mlflow.litellm.autolog(disable=True)

# Your code without logging
response = completion(model="gpt-4", messages=[...])

# Re-enable autologging
mlflow.litellm.autolog()
```
LLM Evaluation with MLflow¶
Latent provides wrappers around MLflow's evaluation features, optimized for LLM agent workflows.
Basic Evaluation¶
```python
from latent.prefect import flow, task
from latent.mlflow import evaluate
import pandas as pd


@task("evaluate_agent", input="eval_data", output="results")
def evaluate_task(eval_data: pd.DataFrame) -> pd.DataFrame:
    # Define your agent
    def my_agent(question):
        # Your LLM logic here
        return generate_response(question)

    # Evaluate with built-in metrics
    results = evaluate(
        data=eval_data,
        agent=my_agent,
        evaluators=["default"],  # Built-in evaluators
        model_type="text",
    )
    return results
```
Custom Evaluators¶
Create custom evaluation metrics with the @evaluator decorator:
```python
from latent.mlflow import evaluator


@evaluator(name="factual_accuracy", greater_is_better=True)
def factual_accuracy(eval_df, builtin_metrics):
    """Check factual accuracy using an LLM judge."""
    from litellm import completion

    scores = []
    for _, row in eval_df.iterrows():
        # Use an LLM as the judge
        judge_prompt = f"""
        Rate the factual accuracy of this response (1-5):
        Question: {row['inputs']}
        Response: {row['outputs']}
        Expected: {row['ground_truth']}
        """
        response = completion(
            model="gpt-4",
            messages=[{"role": "user", "content": judge_prompt}],
        )
        score = int(response.choices[0].message.content.strip())
        scores.append(score / 5.0)  # Normalize to 0-1
    return scores


# Use in evaluation
results = evaluate(
    data=eval_data,
    agent=my_agent,
    evaluators=["default", factual_accuracy],
)
```
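The `int(...)` call in the judge above assumes the model replies with a bare integer, which is not guaranteed. A small helper (hypothetical, not part of Latent) sketches one way to parse the rating more defensively:

```python
import re


def parse_judge_score(text: str, low: int = 1, high: int = 5) -> float:
    """Extract the first integer rating from an LLM judge reply,
    clamp it to [low, high], and normalize to the 0-1 range.
    Returns 0.0 if no rating is found."""
    match = re.search(r"\b([0-9]+)\b", text)
    if not match:
        return 0.0
    score = min(max(int(match.group(1)), low), high)
    return score / high
```

With this helper, `scores.append(score / 5.0)` becomes `scores.append(parse_judge_score(reply))`, and malformed judge replies score 0 instead of raising a `ValueError`.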
Using the targets Field¶
Specify which column contains expected outputs in your catalog:
```yaml
# catalog.yaml
eval_dataset:
  type: pandas.CSV
  path: eval_data.csv
  targets: expected_output  # Column with ground truth
```
The `targets` field is automatically used by MLflow for tracking and metrics calculation.
Built-in Evaluators¶
MLflow provides several built-in evaluators for LLM outputs:
- `"default"`: Basic text metrics (toxicity, flesch_kincaid, etc.)
- `"answer_relevance"`: How relevant the answer is to the question
- `"faithfulness"`: Whether the answer stays true to the context
- `"answer_similarity"`: Similarity to the ground truth
```python
results = evaluate(
    data=eval_data,
    agent=my_agent,
    evaluators=["default", "answer_relevance", "faithfulness"],
)
```
Multiple Custom Evaluators¶
Mix and match built-in and custom evaluators:
```python
@evaluator(name="coherence")
def coherence_metric(eval_df, builtin_metrics):
    # Calculate coherence scores
    return [calculate_coherence(text) for text in eval_df['outputs']]


@evaluator(name="completeness")
def completeness_metric(eval_df, builtin_metrics):
    # Calculate completeness scores
    return [check_completeness(text) for text in eval_df['outputs']]


results = evaluate(
    data=eval_data,
    agent=my_agent,
    evaluators=[
        "default",
        "answer_relevance",
        coherence_metric,
        completeness_metric,
    ],
)
```
Evaluation Results¶
The `evaluate()` function returns a DataFrame with all metrics:

```python
results = evaluate(data=eval_data, agent=my_agent, evaluators=["default"])

# The results DataFrame contains one row with all metrics
print(results.columns)
# ['accuracy', 'latency', 'toxicity/mean', 'flesch_kincaid_grade_level/mean', ...]

# Access specific metrics
accuracy = results['accuracy'].iloc[0]
latency = results['latency'].iloc[0]
```
All metrics are automatically logged to MLflow and visible in the MLflow UI.
Dataset Lineage¶
MLflow automatically tracks:

- Which dataset was used for evaluation
- The MD5 hash of the dataset
- Schema and statistics
- A link to the specific run
This creates a complete lineage from data → evaluation → results.
Best Practices¶
- Log Early and Often: Log parameters at the start, metrics throughout
- Meaningful Names: Use descriptive metric names
- Track Everything: Parameters, metrics, artifacts, and models
- Use Tags: Organize experiments with tags
- Specify Targets: Always set the `targets` field in catalog.yaml for eval datasets
- Custom Evaluators: Create task-specific evaluators for your use case
- LLM-as-Judge: Use GPT-4 or Claude for nuanced evaluation metrics