Flow Building & Style Guide¶
This guide covers best practices for structuring, organizing, and building flows in Latent. Following these conventions ensures your evaluation pipelines remain maintainable, discoverable, and easy to reason about.
The Colocation Principle¶
Latent embraces colocation as a core design principle: everything related to a flow lives together in the same directory.
flows/
└── my_evaluation/
    ├── flow.py           # Flow and task definitions
    ├── parameters.yaml   # Configuration parameters
    ├── catalog.yaml      # Dataset definitions
    └── README.md         # Optional: flow documentation
Why Colocation?¶
| Benefit | Description |
|---|---|
| Discoverability | Find everything about a flow in one place |
| Self-contained | Each flow is independently understandable |
| Reduced coupling | Changes to one flow don't affect others |
| Easy onboarding | New team members can focus on one directory |
| Simple deployment | Copy a directory to deploy a flow |
Anti-patterns to Avoid¶
# ❌ Bad: Scattered configuration
project/
├── configs/
│   └── my_flow/
│       └── parameters.yaml
├── catalogs/
│   └── my_flow.yaml
├── flows/
│   └── my_flow.py
└── data/

# ✅ Good: Colocated structure
project/
├── flows/
│   └── my_flow/
│       ├── flow.py
│       ├── parameters.yaml
│       └── catalog.yaml
└── data/
Project Structure¶
Recommended Layout¶
my_project/
├── flows/                      # All flow definitions
│   ├── data_ingestion/         # Flow 1
│   │   ├── flow.py
│   │   ├── parameters.yaml
│   │   ├── catalog.yaml
│   │   └── utils.py            # Flow-specific utilities
│   │
│   ├── agent_evaluation/       # Flow 2
│   │   ├── flow.py
│   │   ├── parameters.yaml
│   │   ├── catalog.yaml
│   │   ├── evaluators.py       # Custom evaluators
│   │   └── agents.py           # Agent definitions
│   │
│   └── report_generation/      # Flow 3
│       ├── flow.py
│       ├── parameters.yaml
│       └── catalog.yaml
│
├── data/                       # Data storage (gitignored)
│   ├── data_ingestion/
│   │   ├── input/
│   │   └── output/
│   └── agent_evaluation/
│       ├── input/
│       └── output/
│
├── shared/                     # Shared utilities across flows
│   ├── __init__.py
│   ├── models.py
│   └── utils.py
│
├── global.yaml                 # Global configuration
├── pyproject.toml
└── README.md
Directory Naming¶
| Convention | Example | Purpose |
|---|---|---|
| Flow directory | `snake_case` | Matches Python module naming |
| Flow name in decorator | Must match directory | `@flow("data_ingestion")` in `flows/data_ingestion/` |
| Input data | `data/<flow>/input/` | Input datasets (filesystem) |
| Output data | `mlartifacts/<flow>/` | Output datasets (MLflow artifacts) |
| Latest symlink | `data/<flow>/output/latest` | Points to latest MLflow artifacts |
Flow Name = Directory Name
The flow name in @flow("name") must match the directory name. This is how Latent locates configuration files.
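The lookup itself amounts to a path join on the flow name; the following is an illustrative sketch of the mechanism, not Latent's actual resolver:

```python
from pathlib import Path

def resolve_config(flow_name: str, flows_root: str = "flows") -> Path:
    """Sketch: map a flow name to its colocated parameters.yaml."""
    config = Path(flows_root) / flow_name / "parameters.yaml"
    if not config.exists():
        raise FileNotFoundError(
            f"No parameters.yaml for flow {flow_name!r} - "
            f"does the @flow name match the directory name?"
        )
    return config
```

This is why a mismatched decorator name fails at startup: the framework looks for configuration under a directory that does not exist.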
Flow File Organization¶
Single-File Flows¶
For simple flows, keep everything in one file:
# flows/simple_eval/flow.py
"""Simple evaluation flow."""
from latent.prefect import flow, task
import pandas as pd

@task("load")
def load_task() -> pd.DataFrame:
    """Load the evaluation data."""
    return pd.read_csv("data/simple_eval/input/data.csv")

@task("process")
def process_task(data: pd.DataFrame) -> pd.DataFrame:
    """Process the data."""
    return data.dropna()

@task("evaluate")
def evaluate_task(data: pd.DataFrame) -> dict:
    """Evaluate results."""
    return {"accuracy": 0.95}

@flow("simple_eval")
def simple_eval_flow():
    """Main flow."""
    data = load_task()
    processed = process_task(data)
    results = evaluate_task(processed)
    return results

if __name__ == "__main__":
    simple_eval_flow()
Multi-File Flows¶
For complex flows, split into logical modules:
flows/complex_eval/
├── flow.py # Flow definition and orchestration
├── tasks.py # Task definitions
├── agents.py # Agent implementations
├── evaluators.py # Custom evaluators
├── utils.py # Helper functions
├── models.py # Pydantic models / data classes
├── parameters.yaml
└── catalog.yaml
# flows/complex_eval/flow.py
"""Complex evaluation flow - orchestration only."""
from latent.prefect import flow, params, logger
from .tasks import load_task, process_task, evaluate_task
from .agents import create_agent

@flow("complex_eval")
def complex_eval_flow():
    """Main orchestration flow."""
    logger.info(f"Starting evaluation with model={params.model}")

    data = load_task()
    agent = create_agent(params.model)
    processed = process_task(data, agent)
    results = evaluate_task(processed)
    return results
# flows/complex_eval/tasks.py
"""Task definitions for complex evaluation."""
from latent.prefect import task, logger
import pandas as pd

@task("load", output="raw_data")
def load_task() -> pd.DataFrame:
    """Load evaluation data."""
    logger.info("Loading data...")
    return pd.read_csv("data.csv")

@task("process", input="raw_data", output="processed_data")
def process_task(data: pd.DataFrame, agent) -> pd.DataFrame:
    """Process data through agent."""
    logger.info(f"Processing {len(data)} rows")
    # Processing logic...
    return data

@task("evaluate", input="processed_data", output="results")
def evaluate_task(data: pd.DataFrame) -> dict:
    """Run evaluation."""
    return {"accuracy": 0.95}
Naming Conventions¶
Flow Names¶
# ✅ Good: Descriptive, snake_case
@flow("agent_evaluation")
@flow("data_preprocessing")
@flow("model_training")
@flow("results_aggregation")
# ❌ Bad: Too generic or inconsistent
@flow("flow1")
@flow("myFlow")
@flow("do-stuff")
Task Names¶
# ✅ Good: Action-oriented, describes what it does
@task("load_conversations")
@task("generate_responses")
@task("calculate_metrics")
@task("export_results")
# ❌ Bad: Vague or too short
@task("process")
@task("do")
@task("step1")
Dataset Names¶
# ✅ Good: Descriptive, indicates content and format
raw_conversations:
  type: pandas.Parquet
  path: raw_conversations.parquet
  dataset_type: input

evaluation_results:
  type: json.JSON
  path: eval_results.json
  dataset_type: output

# ❌ Bad: Generic names
data:
  type: pandas.CSV
  path: data.csv

output:
  type: json.JSON
  path: out.json
Flow Design Patterns¶
Pattern 1: Linear Pipeline¶
For straightforward data transformations:
@flow("linear_pipeline")
def linear_pipeline():
    """A → B → C → D"""
    raw = load_task()
    cleaned = clean_task(raw)
    processed = process_task(cleaned)
    results = analyze_task(processed)
    return results

┌──────┐    ┌───────┐    ┌─────────┐    ┌─────────┐
│ Load │───▶│ Clean │───▶│ Process │───▶│ Analyze │
└──────┘    └───────┘    └─────────┘    └─────────┘
Pattern 2: Fan-Out / Fan-In¶
For parallel processing:
@flow("parallel_eval")
def parallel_eval():
    """Load → [Eval1, Eval2, Eval3] → Aggregate"""
    data = load_task()

    # Fan out - run evaluators in parallel
    results_1 = evaluate_accuracy.submit(data)
    results_2 = evaluate_fluency.submit(data)
    results_3 = evaluate_safety.submit(data)

    # Fan in - aggregate results
    combined = aggregate_task(
        results_1.result(),
        results_2.result(),
        results_3.result(),
    )
    return combined

                 ┌─────────────────┐
              ┌─▶│  Eval Accuracy  │──┐
              │  └─────────────────┘  │
┌──────┐      │  ┌─────────────────┐  │  ┌───────────┐
│ Load │──────┼─▶│  Eval Fluency   │──┼─▶│ Aggregate │
└──────┘      │  └─────────────────┘  │  └───────────┘
              │  ┌─────────────────┐  │
              └─▶│   Eval Safety   │──┘
                 └─────────────────┘
Pattern 3: Conditional Branching¶
For flows with optional steps:
@flow("conditional_flow")
def conditional_flow():
    """Different paths based on configuration."""
    data = load_task()

    if params.enable_preprocessing:
        data = preprocess_task(data)

    if params.evaluation_type == "full":
        results = full_evaluation_task(data)
    else:
        results = quick_evaluation_task(data)

    if params.generate_report:
        report_task(results)

    return results
Pattern 4: Cross-Flow Dependencies¶
For multi-stage pipelines:
# flows/stage_1_ingest/flow.py
@flow("stage_1_ingest", output=["raw_data"])
def ingest_flow():
    """Stage 1: Data ingestion."""
    data = fetch_data_task()
    return data

# flows/stage_2_process/flow.py
@flow("stage_2_process", input=["stage_1_ingest.raw_data"], output=["processed_data"])
def process_flow(raw_data):
    """Stage 2: Uses output from stage 1."""
    processed = transform_task(raw_data)
    return processed

# flows/stage_3_evaluate/flow.py
@flow("stage_3_evaluate", input=["stage_2_process.processed_data"])
def evaluate_flow(processed_data):
    """Stage 3: Uses output from stage 2."""
    results = evaluate_task(processed_data)
    return results
Stable Cross-Flow References with latest¶
When flows complete, Latent automatically maintains a latest symlink pointing to the most recent MLflow run's output artifacts:
data/
  flow_a/
    input/        # Input data (filesystem)
    output/
      latest -> ../../../mlartifacts/flow_a/<run_id>/artifacts/outputs/

mlartifacts/
  flow_a/
    <run_id_1>/
      artifacts/
        outputs/  # Previous run's outputs
    <run_id_2>/
      artifacts/
        outputs/  # Latest run's outputs
This enables stable cross-flow references that always point to the most recent data:
# flow_a/flow.py
@task("process", output="results")
def process():
    return df  # Saves to MLflow artifacts: outputs/results.csv

# flow_b/flow.py - references flow_a's latest output
@task("analyze", input="flow_a.results")
def analyze(results):
    # Automatically loads from flow_a's latest MLflow artifacts
    pass
Benefits:
- ✅ Zero configuration - Works automatically for all flows
- ✅ Full MLflow integration - Outputs tracked as artifacts with lineage
- ✅ Version history - All previous runs accessible via MLflow UI
- ✅ Development-friendly - Latest symlink always points to most recent run
- ✅ Cross-platform - Works on Windows, macOS, and Linux
Development Workflow
The latest link is especially useful when iterating on downstream flows:
- Run flow_a to generate data (saves to MLflow + updates latest link)
- Iterate on flow_b multiple times - always loads from flow_a's latest
- View all runs and artifacts in MLflow UI:
mlflow ui
Combined with environment = "dev" in config (which disables caching),
this provides a smooth development experience with no stale data issues.
Note: The latest symlink is updated every time a flow completes, ensuring downstream flows always reference the most recent data.
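The link maintenance itself is a plain symlink swap. The following is an illustrative sketch of the mechanism, not Latent's actual code (note that on Windows, creating symlinks may require developer mode or elevated privileges):

```python
import os
from pathlib import Path

def update_latest_link(output_dir: Path, run_artifacts: Path) -> None:
    """Sketch: point output/latest at the newest run's artifacts."""
    link = output_dir / "latest"
    output_dir.mkdir(parents=True, exist_ok=True)
    if link.is_symlink() or link.exists():
        link.unlink()  # drop the stale link before recreating it
    # store a relative target so the workspace can be moved as a whole
    link.symlink_to(os.path.relpath(run_artifacts, output_dir),
                    target_is_directory=True)
```

Because the link is replaced on every flow completion, downstream reads through `output/latest` never need to know run IDs.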
Configuration Best Practices¶
Parameters Organization¶
Group related parameters:
# parameters.yaml

# Model configuration
model:
  name: gpt-4
  temperature: 0.7
  max_tokens: 1000

# Evaluation settings
evaluation:
  batch_size: 32
  num_samples: 100
  metrics:
    - accuracy
    - fluency
    - coherence

# Processing options
processing:
  enable_cache: true
  retry_failed: true
  timeout_seconds: 300
Access nested parameters:
from latent.prefect import params
model_name = params.model.name
batch_size = params.evaluation.batch_size
metrics = params.evaluation.metrics
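If you are curious how attribute-style access over a nested YAML mapping can work, here is a minimal sketch; this is illustrative only, and Latent's actual params object may differ:

```python
class DotDict:
    """Sketch: read-only attribute access over nested dicts."""

    def __init__(self, data: dict):
        self._data = data

    def __getattr__(self, name):
        try:
            value = self._data[name]
        except KeyError:
            raise AttributeError(f"No parameter named {name!r}")
        # wrap nested mappings so access chains like a.b.c work
        return DotDict(value) if isinstance(value, dict) else value

params = DotDict({
    "model": {"name": "gpt-4", "temperature": 0.7},
    "evaluation": {"batch_size": 32, "metrics": ["accuracy", "fluency"]},
})

print(params.model.name)             # gpt-4
print(params.evaluation.batch_size)  # 32
```

Raising `AttributeError` for unknown keys keeps typos loud instead of silently returning `None`.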
Catalog Organization¶
Keep datasets organized by purpose:
# catalog.yaml

# Input datasets (loaded from data/<flow>/input/)
conversations:
  type: pandas.Parquet
  path: conversations.parquet
  dataset_type: input

evaluation_prompts:
  type: json.JSON
  path: prompts.json
  dataset_type: input

# Output datasets (saved to MLflow artifacts)
agent_responses:
  type: pandas.Parquet
  path: responses.parquet
  dataset_type: output

evaluation_results:
  type: json.JSON
  path: results.json
  dataset_type: output

# Text/Markdown outputs (for reports, summaries, etc.)
evaluation_report:
  type: text.Markdown
  path: report.md
  dataset_type: output

raw_output:
  type: text.Text
  path: output.txt
  dataset_type: output
Cross-Flow References
To reference outputs from another flow, use dot notation in the decorator:
@task("analyze", input="upstream_flow.results")
def analyze_task(results):
    # results loaded from upstream_flow's MLflow artifacts
    pass
The framework automatically loads from the source flow's output/latest symlink.
Supported Dataset Types¶
| Type | Extension | Description |
|---|---|---|
| `pandas.CSV` | `.csv` | CSV files loaded as DataFrames |
| `pandas.Parquet` | `.parquet` | Parquet files (recommended for large datasets) |
| `pandas.JSON` | `.jsonl` | JSON Lines format (one JSON object per line) |
| `json.JSON` | `.json` | Regular JSON files (dicts or lists) |
| `pickle.Pickle` | `.pkl` | Pickle files (for complex Python objects) |
| `text.Text` | `.txt` | Plain text files (strings) |
| `text.Markdown` | `.md` | Markdown files (strings) |
Text Outputs for Reports
Use text.Markdown for generating human-readable reports, summaries, or documentation as part of your flow outputs:
```python
@task("generate_report", output="evaluation_report")
def generate_report(results: dict) -> str:
    """Generate a markdown report from evaluation results."""
    return f"""# Evaluation Report

## Summary

- Total samples: {results['total']}
- Accuracy: {results['accuracy']:.2%}

## Details

{results['details']}
"""
```
Multiple File Outputs
Return a dict[str, str] to save multiple text/markdown files in one task:
```python
@task("generate_model_reports", output="model_reports")
def generate_model_reports(results: pd.DataFrame) -> dict[str, str]:
    """Generate a report for each model."""
    reports = {}
    for model_name, group in results.groupby('model'):
        report = f"""# {model_name}

## Performance

- Accuracy: {group['accuracy'].mean():.2%}
- Latency: {group['latency'].mean():.0f}ms

## Details

{group.describe().to_markdown()}
"""
        reports[f"{model_name.lower().replace(' ', '_')}.md"] = report
    return reports  # Saves: reports/gpt_4.md, reports/claude.md, etc.
```
**Loading Multiple Files:**
```python
@task("aggregate_reports", input="model_reports")
def aggregate_reports(reports: dict[str, str]):
    """Aggregate all model reports."""
    # reports = {"gpt_4.md": "# GPT-4\n...", "claude.md": "# Claude\n..."}
    combined = "\n\n---\n\n".join(reports.values())
    return combined
```
Stable Artifacts¶
The data/stable/ directory holds curated, version-controlled deployment artifacts (glossaries, config files, ChromaDB indexes, etc.) that are shared across flows. Instead of constructing paths manually, access them through catalog.stable:
from latent.prefect import flow, task
from latent.datasets import get_catalog_datasets

@flow("my_flow")
def my_flow():
    catalog = get_catalog_datasets()

    # Load a file by stem name - type is inferred from extension
    glossary = catalog.stable.glossary         # loads data/stable/glossary.json
    config = catalog.stable.app_config         # loads data/stable/app_config.toml
    prompts = catalog.stable.system_prompts    # loads data/stable/system_prompts.yaml

    # Get the Path instead (for directories or unsupported extensions)
    chroma_path = catalog.stable.chroma_db_path   # Path to data/stable/chroma_db/
    raw_path = catalog.stable.binary_data_path    # Path to data/stable/binary_data.bin
How It Works¶
The accessor scans data/stable/ and maps each file's stem (filename without extension) to its path. You access files in two ways:
| Access pattern | What it does | Example |
|---|---|---|
| `catalog.stable.<stem>` | Loads and returns parsed data | `catalog.stable.glossary` returns `{"term": "..."}` |
| `catalog.stable.<stem>_path` | Returns the `Path` object | `catalog.stable.glossary_path` returns `Path(".../glossary.json")` |
Loaded data is cached — repeated access to the same stem returns the cached result without re-reading the file.
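As a mental model, the accessor can be sketched in plain Python. This is a simplified illustration, not the actual StableAccessor; only JSON and text loading are shown:

```python
import json
from pathlib import Path

class StableAccessor:
    """Sketch: map file stems in a directory to loaded data or Paths."""

    def __init__(self, root: Path):
        self._root = Path(root)
        self._cache = {}

    def __getattr__(self, name):
        if name.endswith("_path"):       # <stem>_path returns the Path itself
            return self._find(name[:-5])
        if name not in self._cache:      # loaded data is cached per stem
            self._cache[name] = self._load(self._find(name))
        return self._cache[name]

    def _find(self, stem: str) -> Path:
        for entry in self._root.iterdir():
            if not entry.name.startswith(".") and entry.stem == stem:
                return entry
        raise AttributeError(f"No stable artifact named {stem!r}")

    def _load(self, path: Path):
        if path.suffix == ".json":
            return json.loads(path.read_text())
        if path.suffix in (".txt", ".md"):
            return path.read_text()
        raise AttributeError(f"Use {path.stem}_path for {path.suffix!r}")

    def __dir__(self):                   # enables tab completion on stems
        stems = [p.stem for p in self._root.iterdir()
                 if not p.name.startswith(".")]
        return stems + [s + "_path" for s in stems]
```

Directories and unknown extensions fall through to the `_path` form, which matches the table above.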
Supported File Types¶
| Extension | Loaded as | Notes |
|---|---|---|
| `.json` | `dict` or `list` | Via `json.JSON` |
| `.csv` | DataFrame | Via `pandas.CSV` or `polars.CSV` |
| `.parquet` | DataFrame | Via `pandas.Parquet` or `polars.Parquet` |
| `.toml` | `dict` | Via `tomllib` |
| `.yaml` / `.yml` | `dict` | Via `yaml.safe_load` |
| `.txt` | `str` | Via `text.Text` |
| `.md` | `str` | Via `text.Markdown` |
| `.pkl` | any | Via `pickle.Pickle` |
| directories | N/A | Use `_path` suffix instead |
| other | N/A | Use `_path` suffix instead |
Example Directory Layout¶
data/stable/
├── glossary.json      # catalog.stable.glossary -> dict
├── prompts.yaml       # catalog.stable.prompts -> dict
├── lookup_table.csv   # catalog.stable.lookup_table -> DataFrame
├── app_config.toml    # catalog.stable.app_config -> dict
├── chroma_db/         # catalog.stable.chroma_db_path -> Path
│   ├── index.bin
│   └── metadata.json
└── .gitkeep           # hidden files are ignored
Configuration¶
The stable directory defaults to data/stable/ relative to the workspace root. It can be overridden in the workspace configuration or with an environment variable (see Workspace Configuration).
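As a hedged sketch only: the exact key and variable names below are assumptions, not confirmed API; check the Workspace Configuration reference for the authoritative names.

```yaml
# global.yaml (key name is an assumption)
stable_dir: data/stable
```

```bash
# environment variable (name is an assumption)
export LATENT_STABLE_DIR=/path/to/stable
```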
Version-Controlled Directory
Unlike other workspace directories, data/stable/ is not auto-created.
It should be checked into version control alongside your flow configurations.
Using in Tasks¶
import pandas as pd
from latent.prefect import task
from latent.datasets import get_catalog_datasets

@task("enrich")
def enrich_task(raw_data: pd.DataFrame):
    catalog = get_catalog_datasets()
    glossary = catalog.stable.glossary
    return raw_data.assign(
        label=raw_data["code"].map(glossary)
    )
Tab Completion¶
StableAccessor implements __dir__, so tab completion works in IPython and Jupyter:
>>> catalog.stable.<TAB>
glossary           glossary_path      prompts            prompts_path
chroma_db_path     lookup_table       lookup_table_path  ...
Code Style Guidelines¶
Import Order¶
"""Flow module docstring."""
# Standard library
import json
from datetime import datetime
from pathlib import Path
# Third-party
import pandas as pd
from pydantic import BaseModel
# Latent imports
from latent.prefect import flow, task, params, logger
from latent.mlflow import mlflow
# Local imports
from .agents import MyAgent
from .evaluators import custom_metric
Type Hints¶
Always use type hints for task inputs and outputs:
@task("process")
def process_task(
    data: pd.DataFrame,
    config: dict[str, Any],
    threshold: float = 0.5,
) -> pd.DataFrame:
    """Process data with given configuration.

    Args:
        data: Input DataFrame to process
        config: Processing configuration
        threshold: Minimum threshold for filtering

    Returns:
        Processed DataFrame
    """
    return data[data["score"] >= threshold]
Docstrings¶
Use Google-style docstrings:
@flow("my_evaluation")
def my_evaluation_flow():
    """Run agent evaluation pipeline.

    This flow performs the following steps:
    1. Load evaluation dataset
    2. Run agent on each example
    3. Calculate evaluation metrics
    4. Generate report

    Configuration:
        - model.name: The model to evaluate
        - evaluation.batch_size: Batch size for processing

    Outputs:
        - evaluation_results: JSON file with metrics
        - agent_responses: Parquet file with all responses
    """
    ...
Logging Best Practices¶
@task("process")
def process_task(data: pd.DataFrame) -> pd.DataFrame:
    # ✅ Good: Structured, informative logs
    logger.info(
        f"Processing {len(data)} rows "
        f"with batch_size={params.batch_size}"
    )

    result = process(data)

    logger.info(
        f"Processing complete: "
        f"input={len(data)}, output={len(result)}, "
        f"dropped={len(data) - len(result)}"
    )

    # ❌ Bad: Vague, unhelpful logs
    # logger.info("Processing...")
    # logger.info("Done")

    return result
Testing Flows¶
Unit Testing Tasks¶
# tests/unit/test_my_flow.py
import pandas as pd

from latent.testing import mock_catalog, mock_config
from flows.my_evaluation.tasks import process_task

def test_process_task():
    """Test process_task in isolation."""
    # Arrange
    input_data = pd.DataFrame({"value": [1, 2, None, 4]})
    expected = pd.DataFrame({"value": [1, 2, 4]})

    with mock_config({"batch_size": 10}):
        with mock_catalog():
            # Act
            result = process_task.fn(input_data)

    # Assert
    pd.testing.assert_frame_equal(result, expected)
Integration Testing Flows¶
# tests/integration/test_my_flow.py
import pandas as pd

from latent.testing import mock_config, mock_catalog

test_data = pd.DataFrame({"text": ["example 1", "example 2"]})  # minimal fixture

def test_full_flow():
    """Test the complete flow execution."""
    with mock_config({"model": "test-model", "batch_size": 5}):
        with mock_catalog({"input_data": test_data}):
            from flows.my_evaluation.flow import my_evaluation_flow
            result = my_evaluation_flow()

    assert result is not None
    assert "accuracy" in result
Common Pitfalls¶
Common Mistakes
1. Mismatched flow name and directory
2. Circular task dependencies
@task("a", input="b_output") # ❌ Creates cycle
def task_a(): ...
@task("b", input="a_output") # ❌ Creates cycle
def task_b(): ...
3. Missing catalog entries for I/O datasets
4. Hardcoded paths instead of using catalog
See Also¶
- Getting Started - Initial project setup
- Prefect Flows & Tasks - Decorator reference
- Workspace Configuration - Environment setup
- Examples - Complete working examples