# Getting Started
This guide will help you set up Latent and create your first evaluation flow.
## Installation
### Using pip
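Assuming the package is published on PyPI under the name `latent` (check your internal index if not):

```bash
pip install latent
```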
### Using uv (recommended)
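With uv, again assuming the PyPI name `latent`:

```bash
# Add to a uv-managed project
uv add latent

# Or install into the current environment
uv pip install latent
```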
### From source
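A typical editable install from a checkout (the repository URL is not given here, so substitute your own):

```bash
git clone <repository-url> latent
cd latent
pip install -e .
```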
## Prerequisites
Latent requires:
- Python 3.12 or higher
- Prefect 3.0+
- MLflow 2.10+
These are automatically installed when you install Latent.
## Setting Up Your First Project
### 1. Create Project Structure
Your project structure should look like:
```
my_evaluation/
├── data/                     # Data files
├── flows/                    # Flow definitions
│   ├── my_flow/
│   │   ├── catalog.yaml      # Data catalog
│   │   ├── parameters.yaml   # Flow parameters
│   │   └── flow.py           # Flow definition
└── global.yaml               # Global configuration (optional)
```
### 2. Initialize Configuration
Generate a configuration file using the CLI:
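The exact subcommand may differ in your version of the CLI; a plausible invocation is:

```bash
latent init
```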
This creates `config/latent.toml` with sensible defaults:
```toml
# Environment mode: "production" or "dev"
environment = "production"

[workspace]
# flows_dir = "flows"
# data_dir = "data"
# logs_dir = "logs"
# mlruns_dir = "mlruns"

[mlflow]
enabled = true
litellm_autolog = true
langchain_autolog = true

[logging]
level = "INFO"
```
**Environment Variables**

You can also configure Latent via environment variables, which take precedence over the TOML config. See Workspace Configuration for details.
### 3. Create Flow Configuration
Create `flows/my_flow/parameters.yaml`:
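The flow below reads `params.batch_size` and `params.model_name` (and a later example reads `params.enable_feature`), so a minimal parameters file might look like this — the values are purely illustrative:

```yaml
batch_size: 32
model_name: gpt-4o-mini
enable_feature: true
```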
Create `flows/my_flow/catalog.yaml`:
```yaml
# Input datasets (loaded from data/my_flow/input/)
input_data:
  type: pandas.CSV
  path: input.csv
  dataset_type: input

# Output datasets (saved to MLflow artifacts)
processed_data:
  type: pandas.Parquet
  path: processed.parquet
  dataset_type: output

results:
  type: json.JSON
  path: results.json
  dataset_type: output

# Text/Markdown outputs (for reports)
report:
  type: text.Markdown
  path: report.md
  dataset_type: output
```
**Supported Dataset Types**

- `pandas.CSV`, `pandas.Parquet`, `pandas.JSON` - DataFrames
- `json.JSON` - Dicts and lists
- `text.Text`, `text.Markdown` - Plain strings (or `dict[str, str]` for multiple files)
- `pickle.Pickle` - Any Python object
**Input vs Output**

- Inputs (`dataset_type: input`): Loaded from `data/<flow>/input/`
- Outputs (`dataset_type: output`): Saved to MLflow artifacts and accessible via the `data/<flow>/output/latest` symlink
### 4. Write Your First Flow
Create `flows/my_flow/flow.py`:
"""My first evaluation flow."""
from latent.prefect import flow, task, params, logger
from latent.mlflow import mlflow
import pandas as pd
@task(
"load_data",
input="input_data",
output="loaded_data"
)
def load_data_task(input_df: pd.DataFrame) -> pd.DataFrame:
"""Load and validate input data."""
logger.info(f"Loading {len(input_df)} rows")
# Validate data
if input_df.empty:
raise ValueError("Input data is empty")
# Log basic stats
mlflow.log_metric("input_rows", len(input_df))
mlflow.log_metric("input_columns", len(input_df.columns))
return input_df
@task(
"process_data",
input="loaded_data",
output="processed_data"
)
def process_data_task(data: pd.DataFrame) -> pd.DataFrame:
"""Process the data."""
logger.info(f"Processing with batch_size={params.batch_size}")
# Your processing logic
processed = data.dropna()
processed = processed[processed['value'] > 0]
# Log processing metrics
mlflow.log_metric("processed_rows", len(processed))
mlflow.log_metric("rows_dropped", len(data) - len(processed))
return processed
@task(
"analyze_data",
input="processed_data",
output="results"
)
def analyze_data_task(data: pd.DataFrame) -> dict:
"""Analyze processed data."""
logger.info("Analyzing data")
results = {
"total_rows": len(data),
"mean_value": float(data['value'].mean()),
"std_value": float(data['value'].std()),
"model_used": params.model_name,
}
# Log results to MLflow
for key, value in results.items():
if isinstance(value, (int, float)):
mlflow.log_metric(key, value)
else:
mlflow.log_param(key, value)
return results
@flow(
"my_flow",
output=["processed_data", "results"]
)
def my_evaluation_flow():
"""
Main evaluation flow.
This flow:
1. Loads input data
2. Processes and cleans the data
3. Analyzes the results
4. Saves outputs automatically
"""
logger.info(f"Starting evaluation with model={params.model_name}")
# Tasks are automatically chained based on input/output
loaded = load_data_task()
processed = process_data_task(loaded)
results = analyze_data_task(processed)
logger.info(f"Evaluation complete: {results}")
return processed, results
if __name__ == "__main__":
# Run the flow
my_evaluation_flow()
### 5. Prepare Your Data
Create a sample input file at `data/my_flow/input/input.csv` (input datasets are loaded from `data/<flow>/input/`):
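The flow filters on a `value` column, so a minimal input file could look like this (the `id` column is illustrative):

```csv
id,value
1,0.5
2,1.2
3,-0.3
4,2.1
```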
### 6. Run Your Flow
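Since the flow file includes an `if __name__ == "__main__"` guard, you can run it directly as a script:

```bash
python flows/my_flow/flow.py
```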
## What Just Happened?
When you ran the flow, Latent automatically:
- ✅ Loaded configuration from `parameters.yaml` and `catalog.yaml`
- ✅ Set up logging with flow-aware loggers
- ✅ Created an MLflow experiment named "my_flow"
- ✅ Loaded input data from the catalog
- ✅ Executed tasks with automatic caching
- ✅ Tracked metrics in MLflow
- ✅ Saved outputs to timestamped directories
## Viewing Results
### MLflow UI
View your experiment tracking:
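Start the MLflow UI from your project root, so it picks up the local `mlruns/` directory:

```bash
mlflow ui
```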
Then open http://localhost:5000 in your browser.
### Logs
Logs are automatically saved to `logs/my_flow/`.
## Next Steps
Now that you have a working flow, explore:
- Flow Building & Style - Project organization, colocation, and best practices
- Prefect Flows & Tasks - Learn about advanced decorator options
- MLflow Tracking - Deep dive into experiment tracking
- Examples - See real-world examples
## Common Patterns
### Loading Data from Previous Flows
Reference outputs from other flows using dot notation:
```python
# Use dot notation in a @flow or @task decorator
@flow(
    "my_flow",
    input=["other_flow.output_dataset"]
)
def my_flow(output_dataset):
    # output_dataset is automatically loaded from other_flow's outputs
    pass


# Or in a task
@task("analyze", input="upstream_flow.results")
def analyze_task(results):
    # results loaded from upstream_flow's MLflow artifacts
    pass
```
Cross-flow references automatically load from the source flow's `output/latest` symlink, which points to the most recent MLflow artifacts.
### Conditional Task Execution
@task("conditional_task")
def conditional_task(data: pd.DataFrame) -> pd.DataFrame:
if params.enable_feature:
logger.info("Feature enabled, processing...")
return process_data(data)
else:
logger.info("Feature disabled, skipping...")
return data
### Parallel Task Execution
@flow("parallel_flow")
def parallel_flow():
# Process items in parallel
results = process_task.map(
item=[item for item in items],
idx=[i for i in range(len(items))]
)
# Wait for all tasks to complete
completed = [r.result() for r in results]
return completed
### Multiple File Outputs
Save multiple files from a single task by returning a dict:
@task("generate_reports", output="reports")
def generate_reports(data: pd.DataFrame) -> dict[str, str]:
"""Generate multiple report files."""
return {
"summary.md": f"# Summary\n\nTotal rows: {len(data)}",
"details.md": f"# Details\n\n{data.describe().to_markdown()}",
"warnings.md": f"# Warnings\n\nMissing values: {data.isna().sum().sum()}",
}
# Automatically saves to MLflow artifacts: outputs/reports/
In your catalog:
```yaml
reports:
  type: text.Markdown
  path: reports.md  # Creates reports/ directory in artifacts
  dataset_type: output
```
## Troubleshooting
### Import Errors
If you see `ModuleNotFoundError: No module named 'latent'`:
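Common fixes are confirming you are in the environment where Latent was installed (the PyPI name `latent` is an assumption here):

```bash
# Confirm which interpreter and package are active
which python
pip show latent

# Reinstall if needed
pip install latent
```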
### MLflow Connection Issues
If MLflow tracking fails:
```bash
# Check MLflow server
mlflow ui

# Or set tracking URI in config/latent.toml
# [mlflow]
# tracking_uri = "http://localhost:5000"

# Or via environment variable
export LATENT_MLFLOW_TRACKING_URI=http://localhost:5000
```