Flow Building & Style Guide

This guide covers best practices for structuring, organizing, and building flows in Latent. Following these conventions ensures your evaluation pipelines remain maintainable, discoverable, and easy to reason about.

The Colocation Principle

Latent embraces colocation as a core design principle: everything related to a flow lives together in the same directory.

flows/
└── my_evaluation/
    ├── flow.py           # Flow and task definitions
    ├── parameters.yaml   # Configuration parameters
    ├── catalog.yaml      # Dataset definitions
    └── README.md         # Optional: flow documentation

Why Colocation?

| Benefit | Description |
|---|---|
| Discoverability | Find everything about a flow in one place |
| Self-contained | Each flow is independently understandable |
| Reduced coupling | Changes to one flow don't affect others |
| Easy onboarding | New team members can focus on one directory |
| Simple deployment | Copy a directory to deploy a flow |

Anti-patterns to Avoid

# ❌ Bad: Scattered configuration
project/
├── configs/
│   └── my_flow/
│       └── parameters.yaml
├── catalogs/
│   └── my_flow.yaml
├── flows/
│   └── my_flow.py
└── data/

# ✅ Good: Colocated structure
project/
├── flows/
│   └── my_flow/
│       ├── flow.py
│       ├── parameters.yaml
│       └── catalog.yaml
└── data/

Project Structure

my_project/
├── flows/                    # All flow definitions
│   ├── data_ingestion/       # Flow 1
│   │   ├── flow.py
│   │   ├── parameters.yaml
│   │   ├── catalog.yaml
│   │   └── utils.py          # Flow-specific utilities
│   │
│   ├── agent_evaluation/     # Flow 2
│   │   ├── flow.py
│   │   ├── parameters.yaml
│   │   ├── catalog.yaml
│   │   ├── evaluators.py     # Custom evaluators
│   │   └── agents.py         # Agent definitions
│   │
│   └── report_generation/    # Flow 3
│       ├── flow.py
│       ├── parameters.yaml
│       └── catalog.yaml
├── data/                     # Data storage (gitignored)
│   ├── data_ingestion/
│   │   ├── input/
│   │   └── output/
│   └── agent_evaluation/
│       ├── input/
│       └── output/
├── shared/                   # Shared utilities across flows
│   ├── __init__.py
│   ├── models.py
│   └── utils.py
├── global.yaml               # Global configuration
├── pyproject.toml
└── README.md

Directory Naming

| Convention | Example | Purpose |
|---|---|---|
| Flow directory | snake_case | Matches Python module naming |
| Flow name in decorator | @flow("data_ingestion") in flows/data_ingestion/ | Must match the directory name |
| Input data | data/<flow>/input/ | Input datasets (filesystem) |
| Output data | mlartifacts/<flow>/ | Output datasets (MLflow artifacts) |
| Latest symlink | data/<flow>/output/latest | Points to the latest MLflow artifacts |

Flow Name = Directory Name

The flow name in @flow("name") must match the directory name. This is how Latent locates configuration files.

# flows/my_evaluation/flow.py
@flow("my_evaluation")  # ✅ Matches directory name
def my_evaluation_flow():
    pass

@flow("evaluation")  # ❌ Won't find parameters.yaml!
def my_evaluation_flow():
    pass
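Conceptually, the lookup is just a path join on the flow name. A minimal sketch of the idea, assuming a flows/ root (illustrative only, not Latent's actual code):

```python
from pathlib import Path

# Hypothetical helper: resolve a flow's colocated configuration files
# from the name passed to @flow(...).
FLOWS_ROOT = Path("flows")


def resolve_config_paths(flow_name: str) -> dict:
    """Map a flow name onto its colocated configuration files."""
    flow_dir = FLOWS_ROOT / flow_name
    return {
        "parameters": flow_dir / "parameters.yaml",
        "catalog": flow_dir / "catalog.yaml",
    }


# @flow("my_evaluation") resolves to flows/my_evaluation/parameters.yaml;
# @flow("evaluation") would point at a directory that does not exist.
```

This is why a mismatched name fails silently at the configuration layer: the framework looks in a directory that was never created.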

Flow File Organization

Single-File Flows

For simple flows, keep everything in one file:

# flows/simple_eval/flow.py
"""Simple evaluation flow."""

from latent.prefect import flow, task
import pandas as pd


@task("load")
def load_task() -> pd.DataFrame:
    """Load the input data."""
    return pd.read_csv("data.csv")


@task("process")
def process_task(data: pd.DataFrame) -> pd.DataFrame:
    """Process the data."""
    return data.dropna()


@task("evaluate")
def evaluate_task(data: pd.DataFrame) -> dict:
    """Evaluate results."""
    return {"accuracy": 0.95}


@flow("simple_eval")
def simple_eval_flow():
    """Main flow."""
    data = load_task()
    processed = process_task(data)
    results = evaluate_task(processed)
    return results


if __name__ == "__main__":
    simple_eval_flow()

Multi-File Flows

For complex flows, split into logical modules:

flows/complex_eval/
├── flow.py           # Flow definition and orchestration
├── tasks.py          # Task definitions
├── agents.py         # Agent implementations
├── evaluators.py     # Custom evaluators
├── utils.py          # Helper functions
├── models.py         # Pydantic models / data classes
├── parameters.yaml
└── catalog.yaml

# flows/complex_eval/flow.py
"""Complex evaluation flow - orchestration only."""

from latent.prefect import flow, params, logger
from .tasks import load_task, process_task, evaluate_task
from .agents import create_agent


@flow("complex_eval")
def complex_eval_flow():
    """Main orchestration flow."""
    logger.info(f"Starting evaluation with model={params.model}")

    data = load_task()
    agent = create_agent(params.model)
    processed = process_task(data, agent)
    results = evaluate_task(processed)

    return results

# flows/complex_eval/tasks.py
"""Task definitions for complex evaluation."""

from latent.prefect import task, logger
import pandas as pd


@task("load", output="raw_data")
def load_task() -> pd.DataFrame:
    """Load evaluation data."""
    logger.info("Loading data...")
    return pd.read_csv("data.csv")


@task("process", input="raw_data", output="processed_data")
def process_task(data: pd.DataFrame, agent) -> pd.DataFrame:
    """Process data through agent."""
    logger.info(f"Processing {len(data)} rows")
    # Processing logic...
    return data


@task("evaluate", input="processed_data", output="results")
def evaluate_task(data: pd.DataFrame) -> dict:
    """Run evaluation."""
    return {"accuracy": 0.95}

Naming Conventions

Flow Names

# ✅ Good: Descriptive, snake_case
@flow("agent_evaluation")
@flow("data_preprocessing")
@flow("model_training")
@flow("results_aggregation")

# ❌ Bad: Too generic or inconsistent
@flow("flow1")
@flow("myFlow")
@flow("do-stuff")

Task Names

# ✅ Good: Action-oriented, describes what it does
@task("load_conversations")
@task("generate_responses")
@task("calculate_metrics")
@task("export_results")

# ❌ Bad: Vague or too short
@task("process")
@task("do")
@task("step1")

Dataset Names

# ✅ Good: Descriptive, indicates content and format
raw_conversations:
  type: pandas.Parquet
  path: raw_conversations.parquet
  dataset_type: input

evaluation_results:
  type: json.JSON
  path: eval_results.json
  dataset_type: output

# ❌ Bad: Generic names
data:
  type: pandas.CSV
  path: data.csv

output:
  type: json.JSON
  path: out.json

Flow Design Patterns

Pattern 1: Linear Pipeline

For straightforward data transformations:

@flow("linear_pipeline")
def linear_pipeline():
    """A → B → C → D"""
    raw = load_task()
    cleaned = clean_task(raw)
    processed = process_task(cleaned)
    results = analyze_task(processed)
    return results

┌──────┐    ┌───────┐    ┌─────────┐    ┌─────────┐
│ Load │───▶│ Clean │───▶│ Process │───▶│ Analyze │
└──────┘    └───────┘    └─────────┘    └─────────┘

Pattern 2: Fan-Out / Fan-In

For parallel processing:

@flow("parallel_eval")
def parallel_eval():
    """Load → [Eval1, Eval2, Eval3] → Aggregate"""
    data = load_task()

    # Fan out - run evaluators in parallel
    results_1 = evaluate_accuracy.submit(data)
    results_2 = evaluate_fluency.submit(data)
    results_3 = evaluate_safety.submit(data)

    # Fan in - aggregate results
    combined = aggregate_task(
        results_1.result(),
        results_2.result(),
        results_3.result()
    )
    return combined

                ┌─────────────────┐
             ┌─▶│ Eval Accuracy   │──┐
             │  └─────────────────┘  │
┌──────┐     │  ┌─────────────────┐  │  ┌───────────┐
│ Load │─────┼─▶│ Eval Fluency    │──┼─▶│ Aggregate │
└──────┘     │  └─────────────────┘  │  └───────────┘
             │  ┌─────────────────┐  │
             └─▶│ Eval Safety     │──┘
                └─────────────────┘

Pattern 3: Conditional Branching

For flows with optional steps:

@flow("conditional_flow")
def conditional_flow():
    """Different paths based on configuration."""
    data = load_task()

    if params.enable_preprocessing:
        data = preprocess_task(data)

    if params.evaluation_type == "full":
        results = full_evaluation_task(data)
    else:
        results = quick_evaluation_task(data)

    if params.generate_report:
        report_task(results)

    return results

Pattern 4: Cross-Flow Dependencies

For multi-stage pipelines:

# flows/stage_1_ingest/flow.py
@flow("stage_1_ingest", output=["raw_data"])
def ingest_flow():
    """Stage 1: Data ingestion."""
    data = fetch_data_task()
    return data

# flows/stage_2_process/flow.py
@flow("stage_2_process", input=["stage_1_ingest.raw_data"], output=["processed_data"])
def process_flow(raw_data):
    """Stage 2: Uses output from stage 1."""
    processed = transform_task(raw_data)
    return processed

# flows/stage_3_evaluate/flow.py
@flow("stage_3_evaluate", input=["stage_2_process.processed_data"])
def evaluate_flow(processed_data):
    """Stage 3: Uses output from stage 2."""
    results = evaluate_task(processed_data)
    return results

Stable Cross-Flow References with latest

When flows complete, Latent automatically maintains a latest symlink pointing to the most recent MLflow run's output artifacts:

data/
  flow_a/
    input/              # Input data (filesystem)
    output/
      latest -> ../../../mlartifacts/flow_a/<run_id>/artifacts/outputs/

mlartifacts/
  flow_a/
    <run_id_1>/
      artifacts/
        outputs/        # Previous run's outputs
    <run_id_2>/
      artifacts/
        outputs/        # Latest run's outputs
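The bookkeeping behind this is small: when a run completes, repoint one symlink at the newest run's artifacts. A hedged sketch of what a framework could do (update_latest_link is hypothetical; a real implementation would likely need a non-symlink fallback on Windows):

```python
from pathlib import Path


def update_latest_link(flow_name: str, run_id: str, root: Path = Path(".")) -> Path:
    """Point data/<flow>/output/latest at the newest run's MLflow artifacts."""
    outputs = root / "mlartifacts" / flow_name / run_id / "artifacts" / "outputs"
    outputs.mkdir(parents=True, exist_ok=True)

    link = root / "data" / flow_name / "output" / "latest"
    link.parent.mkdir(parents=True, exist_ok=True)
    if link.is_symlink() or link.exists():
        link.unlink()  # drop the previous run's link before repointing
    link.symlink_to(outputs.resolve(), target_is_directory=True)
    return link
```

Because downstream flows read through the link rather than a run-specific path, each new run is picked up without any configuration change.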

This enables stable cross-flow references that always point to the most recent data:

# flow_a/flow.py
@task("process", output="results")
def process():
    return df  # Saves to MLflow artifacts: outputs/results.csv

# flow_b/flow.py - references flow_a's latest output
@task("analyze", input="flow_a.results")
def analyze(results):
    # Automatically loads from flow_a's latest MLflow artifacts
    pass

Benefits:

  • Zero configuration - Works automatically for all flows
  • Full MLflow integration - Outputs tracked as artifacts with lineage
  • Version history - All previous runs accessible via MLflow UI
  • Development-friendly - Latest symlink always points to most recent run
  • Cross-platform - Works on Windows, macOS, and Linux

Development Workflow

The latest link is especially useful when iterating on downstream flows:

  1. Run flow_a to generate data (saves to MLflow + updates latest link)
  2. Iterate on flow_b multiple times - always loads from flow_a's latest
  3. View all runs and artifacts in MLflow UI: mlflow ui

Combined with environment = "dev" in config (which disables caching), this provides a smooth development experience with no stale data issues.

Note: The latest symlink is updated every time a flow completes, ensuring downstream flows always reference the most recent data.

Configuration Best Practices

Parameters Organization

Group related parameters:

# parameters.yaml

# Model configuration
model:
  name: gpt-4
  temperature: 0.7
  max_tokens: 1000

# Evaluation settings
evaluation:
  batch_size: 32
  num_samples: 100
  metrics:
    - accuracy
    - fluency
    - coherence

# Processing options
processing:
  enable_cache: true
  retry_failed: true
  timeout_seconds: 300

Access nested parameters:

from latent.prefect import params

model_name = params.model.name
batch_size = params.evaluation.batch_size
metrics = params.evaluation.metrics
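This dotted style can be provided by a thin wrapper over the parsed YAML dict. A minimal sketch (DotDict is illustrative; Latent's params object may be implemented differently):

```python
class DotDict:
    """Read-only attribute access over nested dicts, e.g. params.model.name."""

    def __init__(self, data: dict):
        self._data = data

    def __getattr__(self, key):
        try:
            value = self._data[key]
        except KeyError:
            raise AttributeError(key) from None
        # Wrap nested dicts so chained access keeps working
        return DotDict(value) if isinstance(value, dict) else value


# Mirrors what the parameters.yaml above would parse to
params = DotDict({
    "model": {"name": "gpt-4", "temperature": 0.7},
    "evaluation": {"batch_size": 32, "metrics": ["accuracy", "fluency"]},
})

params.model.name          # "gpt-4"
params.evaluation.metrics  # ["accuracy", "fluency"]
```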

Catalog Organization

Keep datasets organized by purpose:

# catalog.yaml

# Input datasets (loaded from data/<flow>/input/)
conversations:
  type: pandas.Parquet
  path: conversations.parquet
  dataset_type: input

evaluation_prompts:
  type: json.JSON
  path: prompts.json
  dataset_type: input

# Output datasets (saved to MLflow artifacts)
agent_responses:
  type: pandas.Parquet
  path: responses.parquet
  dataset_type: output

evaluation_results:
  type: json.JSON
  path: results.json
  dataset_type: output

# Text/Markdown outputs (for reports, summaries, etc.)
evaluation_report:
  type: text.Markdown
  path: report.md
  dataset_type: output

raw_output:
  type: text.Text
  path: output.txt
  dataset_type: output

Cross-Flow References

To reference outputs from another flow, use dot notation in the decorator:

@task("analyze", input="upstream_flow.results")
def analyze_task(results):
    # results loaded from upstream_flow's MLflow artifacts
    pass

The framework automatically loads from the source flow's output/latest symlink.

Supported Dataset Types

| Type | Extension | Description |
|---|---|---|
| pandas.CSV | .csv | CSV files loaded as DataFrames |
| pandas.Parquet | .parquet | Parquet files (recommended for large datasets) |
| pandas.JSON | .jsonl | JSON Lines format (one JSON object per line) |
| json.JSON | .json | Regular JSON files (dicts or lists) |
| pickle.Pickle | .pkl | Pickle files (for complex Python objects) |
| text.Text | .txt | Plain text files (strings) |
| text.Markdown | .md | Markdown files (strings) |

Text Outputs for Reports

Use text.Markdown for generating human-readable reports, summaries, or documentation as part of your flow outputs:

@task("generate_report", output="evaluation_report")
def generate_report(results: dict) -> str:
    """Generate a markdown report from evaluation results."""
    return f"""# Evaluation Report

## Summary

- Total samples: {results['total']}
- Accuracy: {results['accuracy']:.2%}

## Details

{results['details']}
"""

Multiple File Outputs

Return a dict[str, str] to save multiple text/markdown files in one task:

# catalog.yaml
model_reports:
  type: text.Markdown
  path: reports.md  # Will create reports/ directory

@task("generate_model_reports", output="model_reports")
def generate_model_reports(results: pd.DataFrame) -> dict[str, str]:
    """Generate a report for each model."""
    reports = {}
    for model_name, group in results.groupby('model'):
        report = f"""# {model_name}

## Performance

- Accuracy: {group['accuracy'].mean():.2%}
- Latency: {group['latency'].mean():.0f}ms

## Details

{group.describe().to_markdown()}
"""
        reports[f"{model_name.lower().replace(' ', '_')}.md"] = report
    return reports  # Saves: reports/gpt_4.md, reports/claude.md, etc.

Loading multiple files:

@task("aggregate_reports", input="model_reports")
def aggregate_reports(reports: dict[str, str]):
    """Aggregate all model reports."""
    # reports = {"gpt_4.md": "# GPT-4\n...", "claude.md": "# Claude\n..."}
    combined = "\n\n---\n\n".join(reports.values())
    return combined

Stable Artifacts

The data/stable/ directory holds curated, version-controlled deployment artifacts (glossaries, config files, ChromaDB indexes, etc.) that are shared across flows. Instead of constructing paths manually, access them through catalog.stable:

from latent.prefect import flow, task
from latent.datasets import get_catalog_datasets

@flow("my_flow")
def my_flow():
    catalog = get_catalog_datasets()

    # Load a file by stem name — type is inferred from extension
    glossary = catalog.stable.glossary           # loads data/stable/glossary.json
    config = catalog.stable.app_config           # loads data/stable/app_config.toml
    prompts = catalog.stable.system_prompts      # loads data/stable/system_prompts.yaml

    # Get the Path instead (for directories or unsupported extensions)
    chroma_path = catalog.stable.chroma_db_path  # Path to data/stable/chroma_db/
    raw_path = catalog.stable.binary_data_path   # Path to data/stable/binary_data.bin

How It Works

The accessor scans data/stable/ and maps each file's stem (filename without extension) to its path. You access files in two ways:

| Access pattern | What it does | Example |
|---|---|---|
| catalog.stable.<stem> | Loads and returns parsed data | catalog.stable.glossary returns {"term": "..."} |
| catalog.stable.<stem>_path | Returns the Path object | catalog.stable.glossary_path returns Path(".../glossary.json") |

Loaded data is cached — repeated access to the same stem returns the cached result without re-reading the file.
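An accessor with this behavior can be built on __getattr__ plus a cache. The sketch below is hypothetical (a few loaders only, minimal error handling), not Latent's actual StableAccessor:

```python
import json
from pathlib import Path


class StableAccessor:
    """Attribute access over files in a stable directory (sketch)."""

    # Only a few loaders shown; the real accessor supports more types.
    _LOADERS = {
        ".json": lambda p: json.loads(p.read_text()),
        ".txt": lambda p: p.read_text(),
        ".md": lambda p: p.read_text(),
    }

    def __init__(self, root):
        self._root = Path(root)
        self._cache = {}

    def __getattr__(self, name):
        if name.startswith("_"):
            raise AttributeError(name)
        if name.endswith("_path"):
            return self._find(name[:-len("_path")])
        if name not in self._cache:  # load once, then serve from the cache
            path = self._find(name)
            self._cache[name] = self._LOADERS[path.suffix](path)
        return self._cache[name]

    def _find(self, stem):
        for p in self._root.iterdir():
            if not p.name.startswith(".") and p.stem == stem:
                return p
        raise AttributeError(stem)

    def __dir__(self):  # enables tab completion in IPython / Jupyter
        stems = [p.stem for p in self._root.iterdir() if not p.name.startswith(".")]
        return stems + [s + "_path" for s in stems] + list(super().__dir__())
```

Note how the _path suffix bypasses loading entirely, which is what makes directories and unsupported extensions usable.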

Supported File Types

| Extension | Loaded as | Notes |
|---|---|---|
| .json | dict or list | Via json.JSON |
| .csv | DataFrame | Via pandas.CSV or polars.CSV |
| .parquet | DataFrame | Via pandas.Parquet or polars.Parquet |
| .toml | dict | Via tomllib |
| .yaml / .yml | dict | Via yaml.safe_load |
| .txt | str | Via text.Text |
| .md | str | Via text.Markdown |
| .pkl | any | Via pickle.Pickle |
| directories | N/A | Use the _path suffix instead |
| other | N/A | Use the _path suffix instead |

Example Directory Layout

data/stable/
├── glossary.json          # catalog.stable.glossary -> dict
├── prompts.yaml           # catalog.stable.prompts -> dict
├── lookup_table.csv       # catalog.stable.lookup_table -> DataFrame
├── app_config.toml        # catalog.stable.app_config -> dict
├── chroma_db/             # catalog.stable.chroma_db_path -> Path
│   ├── index.bin
│   └── metadata.json
└── .gitkeep               # hidden files are ignored

Configuration

The stable directory defaults to data/stable/ relative to the workspace root. Override it via:

config/latent.toml
[workspace]
stable_dir = "data/stable"   # or an absolute path

Or with an environment variable:

export LATENT_STABLE_DIR=/path/to/stable

Version-Controlled Directory

Unlike other workspace directories, data/stable/ is not auto-created. It should be checked into version control alongside your flow configurations.

Using in Tasks

@task("enrich")
def enrich_task(raw_data: pd.DataFrame):
    catalog = get_catalog_datasets()
    glossary = catalog.stable.glossary
    return raw_data.assign(
        label=raw_data["code"].map(glossary)
    )

Tab Completion

StableAccessor implements __dir__, so tab completion works in IPython and Jupyter:

>>> catalog.stable.<TAB>
glossary            glossary_path       prompts             prompts_path
lookup_table        lookup_table_path   chroma_db_path      ...

Code Style Guidelines

Import Order

"""Flow module docstring."""

# Standard library
import json
from datetime import datetime
from pathlib import Path

# Third-party
import pandas as pd
from pydantic import BaseModel

# Latent imports
from latent.prefect import flow, task, params, logger
from latent.mlflow import mlflow

# Local imports
from .agents import MyAgent
from .evaluators import custom_metric

Type Hints

Always use type hints for task inputs and outputs:

@task("process")
def process_task(
    data: pd.DataFrame,
    config: dict[str, Any],
    threshold: float = 0.5,
) -> pd.DataFrame:
    """Process data with given configuration.

    Args:
        data: Input DataFrame to process
        config: Processing configuration
        threshold: Minimum threshold for filtering

    Returns:
        Processed DataFrame
    """
    return data[data["score"] >= threshold]

Docstrings

Use Google-style docstrings:

@flow("my_evaluation")
def my_evaluation_flow():
    """Run agent evaluation pipeline.

    This flow performs the following steps:
    1. Load evaluation dataset
    2. Run agent on each example
    3. Calculate evaluation metrics
    4. Generate report

    Configuration:
        - model.name: The model to evaluate
        - evaluation.batch_size: Batch size for processing

    Outputs:
        - evaluation_results: JSON file with metrics
        - agent_responses: Parquet file with all responses
    """
    ...

Logging Best Practices

@task("process")
def process_task(data: pd.DataFrame) -> pd.DataFrame:
    # ✅ Good: Structured, informative logs
    logger.info(
        f"Processing {len(data)} rows "
        f"with batch_size={params.batch_size}"
    )

    result = process(data)

    logger.info(
        f"Processing complete: "
        f"input={len(data)}, output={len(result)}, "
        f"dropped={len(data) - len(result)}"
    )

    # ❌ Bad: Vague, unhelpful logs
    # logger.info("Processing...")
    # logger.info("Done")

    return result

Testing Flows

Unit Testing Tasks

# tests/unit/test_my_flow.py
from unittest.mock import patch
import pandas as pd
import pytest

from latent.testing import mock_catalog, mock_config
from flows.my_evaluation.tasks import process_task


def test_process_task():
    """Test process_task in isolation."""
    # Arrange
    input_data = pd.DataFrame({"value": [1, 2, None, 4]})
    expected = pd.DataFrame({"value": [1, 2, 4]})

    with mock_config({"batch_size": 10}):
        with mock_catalog():
            # Act
            result = process_task.fn(input_data)

            # Assert
            pd.testing.assert_frame_equal(result, expected)

Integration Testing Flows

# tests/integration/test_my_flow.py
from latent.testing import mock_config, mock_catalog


def test_full_flow():
    """Test the complete flow execution."""
    with mock_config({"model": "test-model", "batch_size": 5}):
        with mock_catalog({"input_data": test_data}):
            from flows.my_evaluation.flow import my_evaluation_flow

            result = my_evaluation_flow()

            assert result is not None
            assert "accuracy" in result

Common Pitfalls

1. Mismatched flow name and directory

# flows/my_eval/flow.py
@flow("evaluation")  # ❌ Should be "my_eval" to match the directory
def my_eval_flow():
    pass

2. Circular task dependencies

@task("a", input="b_output")  # ❌ Creates cycle
def task_a(): ...

@task("b", input="a_output")  # ❌ Creates cycle
def task_b(): ...

3. Missing catalog entries for I/O datasets

@task("process", input="missing_dataset")  # ❌ Not in catalog.yaml
def process(): ...

4. Hardcoded paths instead of using catalog

@task("load")
def load():
    return pd.read_csv("/absolute/path/data.csv")  # ❌ Use catalog

See Also