
Getting Started

This guide will help you set up Latent and create your first evaluation flow.

Installation

Using pip

pip install latent

Using uv

uv add latent

From source

git clone https://github.com/latentsp/canopy.git
cd canopy/latent
pip install -e .

Prerequisites

Latent requires:

  • Python 3.12 or higher
  • Prefect 3.0+
  • MLflow 2.10+

These are automatically installed when you install Latent.
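To confirm your interpreter meets the version requirement before installing, a quick check:

```python
import sys

# Latent requires Python 3.12+; report whether this interpreter qualifies
meets_requirement = sys.version_info >= (3, 12)
print("Python OK" if meets_requirement else f"Python too old: {sys.version.split()[0]}")
```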

Setting Up Your First Project

1. Create Project Structure

mkdir my_evaluation
cd my_evaluation
mkdir -p data/my_flow/input flows/my_flow

Your project structure should look like:

my_evaluation/
├── data/               # Data files
├── flows/              # Flow definitions
│   └── my_flow/
│       ├── catalog.yaml      # Data catalog
│       ├── parameters.yaml   # Flow parameters
│       └── flow.py           # Flow definition
└── global.yaml         # Global configuration (optional)

2. Initialize Configuration

Generate a configuration file using the CLI:

latent init

This creates config/latent.toml with sensible defaults:

# Environment mode: "production" or "dev"
environment = "production"

[workspace]
# flows_dir = "flows"
# data_dir = "data"
# logs_dir = "logs"
# mlruns_dir = "mlruns"

[mlflow]
enabled = true
litellm_autolog = true
langchain_autolog = true

[logging]
level = "INFO"

Environment Variables

You can also configure Latent via environment variables, which take precedence over the TOML config. See Workspace Configuration for details.
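For example, the MLflow tracking URI can be overridden without touching the TOML file (other LATENT_-prefixed variables follow the same pattern; see Workspace Configuration for the full list):

```shell
# Overrides [mlflow] tracking_uri from config/latent.toml
export LATENT_MLFLOW_TRACKING_URI=http://localhost:5000
```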

3. Create Flow Configuration

Create flows/my_flow/parameters.yaml:

# Flow-specific parameters
batch_size: 32
learning_rate: 0.001
max_epochs: 100
model_name: gpt-4

Create flows/my_flow/catalog.yaml:

# Input datasets (loaded from data/my_flow/input/)
input_data:
  type: pandas.CSV
  path: input.csv
  dataset_type: input

# Output datasets (saved to MLflow artifacts)
processed_data:
  type: pandas.Parquet
  path: processed.parquet
  dataset_type: output

results:
  type: json.JSON
  path: results.json
  dataset_type: output

# Text/Markdown outputs (for reports)
report:
  type: text.Markdown
  path: report.md
  dataset_type: output

Supported Dataset Types

  • pandas.CSV, pandas.Parquet, pandas.JSON - DataFrames
  • json.JSON - Dicts and lists
  • text.Text, text.Markdown - Plain strings (or dict[str, str] for multiple files)
  • pickle.Pickle - Any Python object

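In task code, the catalog type determines what a task should return. A minimal sketch of the correspondence (these are illustrative functions, not Latent API):

```python
import pandas as pd

def tabular_result() -> pd.DataFrame:
    # pandas.CSV / pandas.Parquet / pandas.JSON expect a DataFrame
    return pd.DataFrame({"id": [1, 2], "value": [10, 20]})

def json_result() -> dict:
    # json.JSON expects a dict or list
    return {"score": 0.9, "labels": ["a", "b"]}

def markdown_result() -> dict[str, str]:
    # text.Markdown: a single string, or a dict mapping filenames to contents
    return {"summary.md": "# Summary", "details.md": "# Details"}
```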
Input vs Output

  • Inputs (dataset_type: input): Loaded from data/<flow>/input/
  • Outputs (dataset_type: output): Saved to MLflow artifacts and accessible via data/<flow>/output/latest symlink

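Downstream scripts can read a flow's most recent outputs through that symlink. A minimal sketch, assuming the path layout above (`load_latest_results` is a hypothetical helper, not part of Latent):

```python
import json
from pathlib import Path

def load_latest_results(flow_data_dir: str) -> dict:
    """Read results.json from a flow's output/latest symlink."""
    latest = Path(flow_data_dir) / "output" / "latest"
    return json.loads((latest / "results.json").read_text())
```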
4. Write Your First Flow

Create flows/my_flow/flow.py:

"""My first evaluation flow."""

from latent.prefect import flow, task, params, logger
from latent.mlflow import mlflow
import pandas as pd


@task(
    "load_data",
    input="input_data",
    output="loaded_data"
)
def load_data_task(input_df: pd.DataFrame) -> pd.DataFrame:
    """Load and validate input data."""
    logger.info(f"Loading {len(input_df)} rows")

    # Validate data
    if input_df.empty:
        raise ValueError("Input data is empty")

    # Log basic stats
    mlflow.log_metric("input_rows", len(input_df))
    mlflow.log_metric("input_columns", len(input_df.columns))

    return input_df


@task(
    "process_data",
    input="loaded_data",
    output="processed_data"
)
def process_data_task(data: pd.DataFrame) -> pd.DataFrame:
    """Process the data."""
    logger.info(f"Processing with batch_size={params.batch_size}")

    # Your processing logic
    processed = data.dropna()
    processed = processed[processed['value'] > 0]

    # Log processing metrics
    mlflow.log_metric("processed_rows", len(processed))
    mlflow.log_metric("rows_dropped", len(data) - len(processed))

    return processed


@task(
    "analyze_data",
    input="processed_data",
    output="results"
)
def analyze_data_task(data: pd.DataFrame) -> dict:
    """Analyze processed data."""
    logger.info("Analyzing data")

    results = {
        "total_rows": len(data),
        "mean_value": float(data['value'].mean()),
        "std_value": float(data['value'].std()),
        "model_used": params.model_name,
    }

    # Log results to MLflow
    for key, value in results.items():
        if isinstance(value, (int, float)):
            mlflow.log_metric(key, value)
        else:
            mlflow.log_param(key, value)

    return results


@flow(
    "my_flow",
    output=["processed_data", "results"]
)
def my_evaluation_flow():
    """
    Main evaluation flow.

    This flow:
    1. Loads input data
    2. Processes and cleans the data
    3. Analyzes the results
    4. Saves outputs automatically
    """
    logger.info(f"Starting evaluation with model={params.model_name}")

    # Tasks are automatically chained based on input/output
    loaded = load_data_task()
    processed = process_data_task(loaded)
    results = analyze_data_task(processed)

    logger.info(f"Evaluation complete: {results}")

    return processed, results


if __name__ == "__main__":
    # Run the flow
    my_evaluation_flow()

5. Prepare Your Data

Create a sample input file at data/my_flow/input/input.csv (the catalog's input paths resolve relative to data/<flow>/input/):

id,value,category
1,100,A
2,200,B
3,-50,C
4,300,A
5,150,B

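Equivalently, the same file can be generated with pandas (a convenience sketch; pass the path from the step above):

```python
import pandas as pd

def write_sample_input(path: str) -> pd.DataFrame:
    """Create the five-row sample dataset and save it as CSV."""
    df = pd.DataFrame({
        "id": [1, 2, 3, 4, 5],
        "value": [100, 200, -50, 300, 150],
        "category": ["A", "B", "C", "A", "B"],
    })
    df.to_csv(path, index=False)
    return df
```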
6. Run Your Flow

cd flows/my_flow
python flow.py

What Just Happened?

When you ran the flow, Latent automatically:

  1. Loaded configuration from parameters.yaml and catalog.yaml
  2. Set up logging with flow-aware loggers
  3. Created MLflow experiment named "my_flow"
  4. Loaded input data from the catalog
  5. Executed tasks with automatic caching
  6. Tracked metrics in MLflow
  7. Saved outputs to timestamped directories

Viewing Results

MLflow UI

View your experiment tracking:

mlflow ui

Then open http://localhost:5000 in your browser.

Logs

Logs are automatically saved to logs/my_flow/.

Next Steps

Now that you have a working flow, explore the common patterns below.

Common Patterns

Loading Data from Previous Flows

Reference outputs from other flows using dot notation:

# Use the dot notation in @flow or @task decorator
@flow(
    "my_flow",
    input=["other_flow.output_dataset"]
)
def my_flow(output_dataset):
    # output_dataset is automatically loaded from other_flow's outputs
    pass

# Or in a task
@task("analyze", input="upstream_flow.results")
def analyze_task(results):
    # results loaded from upstream_flow's MLflow artifacts
    pass

Cross-flow references automatically load from the source flow's output/latest symlink, which points to the most recent MLflow artifacts.

Conditional Task Execution

@task("conditional_task")
def conditional_task(data: pd.DataFrame) -> pd.DataFrame:
    if params.enable_feature:
        logger.info("Feature enabled, processing...")
        return process_data(data)
    else:
        logger.info("Feature disabled, skipping...")
        return data

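The enable_feature flag above is an ordinary flow parameter, so it would be declared in parameters.yaml alongside the others (illustrative value):

```yaml
# flows/my_flow/parameters.yaml
enable_feature: true
```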
Parallel Task Execution

@flow("parallel_flow")
def parallel_flow():
    # Process items in parallel
    results = process_task.map(
        item=items,
        idx=list(range(len(items)))
    )

    # Wait for all tasks to complete
    completed = [r.result() for r in results]
    return completed

Multiple File Outputs

Save multiple files from a single task by returning a dict:

@task("generate_reports", output="reports")
def generate_reports(data: pd.DataFrame) -> dict[str, str]:
    """Generate multiple report files."""
    # The returned dict is saved to MLflow artifacts under outputs/reports/
    return {
        "summary.md": f"# Summary\n\nTotal rows: {len(data)}",
        "details.md": f"# Details\n\n{data.describe().to_markdown()}",
        "warnings.md": f"# Warnings\n\nMissing values: {data.isna().sum().sum()}",
    }

In your catalog:

reports:
  type: text.Markdown
  path: reports.md  # Creates reports/ directory in artifacts
  dataset_type: output

Troubleshooting

Import Errors

If you see ModuleNotFoundError: No module named 'latent':

# Make sure latent is installed
pip install latent

# Or in development
cd latent
pip install -e .

MLflow Connection Issues

If MLflow tracking fails:

# Check MLflow server
mlflow ui

# Or set tracking URI in config/latent.toml
# [mlflow]
# tracking_uri = "http://localhost:5000"

# Or via environment variable
export LATENT_MLFLOW_TRACKING_URI=http://localhost:5000

Getting Help