Prefect Flows & Tasks¶
Learn how to build evaluation workflows with Latent's Prefect integration.
Core Concepts¶
Flows¶
Flows are the top-level containers for your evaluation logic. They automatically:
- Load configuration from `parameters.yaml` and `catalog.yaml`
- Set up flow-aware logging
- Initialize MLflow experiment tracking
- Handle data loading/saving through the catalog
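For orientation, the rest of this page implies a project layout roughly like the following (the exact structure may differ in your setup; the file locations are inferred from the configuration and cross-flow sections below):

```
flows/
  my_flow/
    parameters.yaml   # flow-specific parameters
catalog.yaml          # dataset definitions
global.yaml           # shared parameters (flow-specific values take precedence)
config/
  latent.toml         # environment settings (e.g. environment = "dev")
```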
Tasks¶
Tasks are the individual units of work within a flow. They automatically:
- Cache results using hybrid strategies (file-based or row-level)
- Retry on failure (3 times by default)
- Create MLflow spans for observability
- Log automatic metrics (duration, row counts)
- Wrap errors with rich context
- Register in the task registry for visualization
- Load/save data from/to the catalog
The @flow Decorator¶
```python
from latent.prefect import flow
from pydantic import BaseModel

class FlowConfig(BaseModel):
    batch_size: int
    model_name: str

@flow(
    "flow_name",                     # Required: flow name (matches directory name)
    input=["dataset1", "dataset2"],  # Optional: input datasets from catalog
    output=["result1", "result2"],   # Optional: output datasets to save
    config_schema=FlowConfig,        # Optional: Pydantic model for typed config
    **flow_kwargs                    # Optional: additional Prefect flow options
)
def my_flow():
    # Your flow logic
    pass
```
Basic Flow¶
```python
@flow("simple_flow")
def simple_flow():
    """A simple flow with no explicit inputs/outputs."""
    logger.info("Flow started")
    # Your logic here
    return result
```
Flow with Inputs¶
```python
@flow(
    "processing_flow",
    input=["raw_data", "config_data"]
)
def processing_flow(raw_data, config_data):
    """Flow that loads datasets automatically."""
    # raw_data and config_data are loaded from the catalog
    processed = process(raw_data, config_data)
    return processed
```
Cross-Flow Data Loading¶
```python
@flow(
    "downstream_flow",
    input=["upstream_flow.output_dataset"]
)
def downstream_flow(upstream_data):
    """Load outputs from another flow."""
    # Loads output_dataset from upstream_flow
    result = analyze(upstream_data)
    return result
```
The @task Decorator¶
```python
from datetime import timedelta
from latent.prefect import task

@task(
    "task_name",                         # Required: task name
    input="input_dataset",               # Optional: input dataset name
    output="output_dataset",             # Optional: output dataset name
    cache=True,                          # Optional: enable caching (default: True)
    retry=True,                          # Optional: enable retries (default: True)
    span_type="CHAIN",                   # Optional: MLflow span type (CHAIN, AGENT, TOOL, etc.)
    cache_expiration=timedelta(days=7),  # Optional: cache duration
    **task_kwargs                        # Optional: additional Prefect task options
)
def my_task(input_data):
    # Your task logic
    return output_data
```
Basic Task¶
```python
@task("process_data")
def process_data(data):
    """Simple task using default caching and retries."""
    return data.dropna()
```
Task with Catalog I/O¶
```python
@task(
    "clean_data",
    input="raw_data",      # Loaded from catalog
    output="cleaned_data"  # Saved to catalog
)
def clean_data(raw_data: pd.DataFrame) -> pd.DataFrame:
    """Task with automatic data loading/saving."""
    cleaned = raw_data.dropna()
    cleaned = cleaned[cleaned['value'] > 0]
    return cleaned
```
Task with Custom Caching¶
```python
from datetime import timedelta

@task(
    "expensive_computation",
    input="data",
    output="results",
    cache_expiration=timedelta(days=30),  # Cache for 30 days
)
def expensive_computation(data):
    """Task with extended cache duration."""
    # Expensive processing...
    return results
```
Task without Caching¶
```python
from datetime import datetime

@task(
    "generate_timestamp",
    cache=False,  # Disable caching for non-deterministic tasks
    retry=False   # Disable retries
)
def generate_timestamp():
    """Task that always runs (no caching)."""
    return datetime.now().isoformat()
```
Typed Configuration¶
Use Pydantic models for type-safe configuration:
```python
from pydantic import BaseModel, Field
from latent.prefect import flow, params

class MyFlowConfig(BaseModel):
    batch_size: int = Field(gt=0, description="Batch size for processing")
    model_name: str
    learning_rate: float = 0.001
    max_iterations: int = Field(default=1000, le=10000)

@flow("my_flow", config_schema=MyFlowConfig)
def my_flow():
    # params is now a typed Pydantic model
    batch_size: int = params.batch_size  # IDE autocomplete works!
    model: str = params.model_name

    # Validation happens automatically on flow start
    logger.info(f"Running with batch_size={batch_size}")
```
Benefits of Typed Config¶
- IDE Support: Full autocomplete and type checking
- Validation: Automatic validation on flow start
- Documentation: Field descriptions in your config
- Defaults: Built-in default values
- Constraints: Pydantic validators (gt, le, regex, etc.)
```yaml
# parameters.yaml
batch_size: 32
model_name: "gpt-4"
learning_rate: 0.001
# max_iterations uses default (1000)
```
If validation fails, you get a clear error:
```
ValueError: Configuration validation failed for flow 'my_flow':
  batch_size: ensure this value is greater than 0 (type=value_error.number.not_gt)
```
Observability & Metrics¶
Automatic MLflow Spans¶
Tasks automatically create MLflow spans for observability:
```python
@task("process_data", span_type="CHAIN")
def process_data(data):
    # Automatically creates an MLflow span with:
    # - span.type = "CHAIN"
    # - Automatic duration logging
    # - Input/output attributes
    return processed_data
```
Span types follow MLflow conventions:
- CHAIN: Sequential processing
- AGENT: Agent-based tasks
- TOOL: Tool invocations
- RETRIEVER: Data retrieval
- LLM: Language model calls
Automatic Metrics¶
Tasks automatically log:
```python
@task("process")
def process_task(data):
    # Automatically logged:
    # - task.process.duration (in seconds)
    # - task.process.status (success/failure)
    # - Error details (if failed)
    return result
```
Enhanced Error Context¶
Task failures are wrapped with rich context:
```python
@task("process", input="data")
def process_task(data):
    raise ValueError("Processing failed!")

# Error includes:
# - Task name
# - Input arguments
# - Original exception
# - MLflow span attributes
```
Error output:
```
TaskExecutionError: Task 'process' failed.
Original error: ValueError: Processing failed!
Context: {'input': 'data', 'row_index': None}
```
Task Registry & Visualization¶
Tasks are automatically registered for visualization:
```python
from latent.registry import TaskRegistry

# After running flows, visualize the pipeline
TaskRegistry.print_ascii()
```
Output:
```
=== 🌳 Canopy Pipeline Topology ===

📦 Flow: data_pipeline
├── ⚙️ load
│   ⬆️ out: raw_data
├── ⚙️ clean
│   ⬇️ in: raw_data
│   ⬆️ out: cleaned_data
├── ⚙️ process
│   ⬇️ in: cleaned_data
│   ⬆️ out: processed_data
```
Accessing Configuration¶
Using the params Object¶
```python
from latent.prefect import params

@task("process")
def process_task(data):
    # Access parameters from parameters.yaml
    batch_size = params.batch_size
    model = params.model_name

    # Dict-style access
    lr = params["learning_rate"]

    # With default value
    timeout = params.get("timeout", 30)

    return process(data, batch_size, model, lr)
```
Cross-Flow Parameter Access¶
You can access parameters from other flows using dot notation with params.get():
```python
from latent.prefect import params

@task("analyze")
def analyze_task(data):
    # Access parameters from another flow
    upstream_model = params.get("upstream_flow.model_name")
    upstream_temp = params.get("upstream_flow.temperature", default=0.7)

    # Current flow parameters still work
    batch_size = params.batch_size

    return analyze(data, model=upstream_model, temp=upstream_temp)
```
This is useful when:
- You need to ensure consistency across flows (e.g., same model configuration)
- A downstream flow needs to know upstream configuration
- You want to reference shared settings without duplicating them
How it works:
- `params.get("other_flow.param_name")` loads `parameters.yaml` from `flows/other_flow/`
- Global parameters from `global.yaml` are merged (flow-specific values take precedence)
- If the flow or parameter doesn't exist, the default value is returned
```python
# Example: Downstream flow accessing upstream configuration
@flow("downstream_flow")
def downstream_flow():
    # Get the model that was used in upstream processing
    upstream_model = params.get("upstream_flow.model_name")

    # Use the same model for consistency
    if upstream_model:
        logger.info(f"Using upstream model: {upstream_model}")

    result = process_task()
    return result
```
Note
Cross-flow parameter access only works with params.get(), not with attribute access (params.other_flow) or dict access (params["other_flow.param"]).
Using get_config()¶
```python
from latent.prefect import get_config

@flow("my_flow")
def my_flow():
    config = get_config()

    # Access parameters
    batch_size = config["parameters"]["batch_size"]

    # Access catalog
    datasets = config["catalog"]
```
Logging¶
Using the logger Object¶
```python
from latent.prefect import logger

@task("process")
def process_task(data):
    logger.debug("Detailed debug information")
    logger.info(f"Processing {len(data)} rows")
    logger.warning("This might take a while")
    logger.error("Something went wrong")
    try:
        result = risky_operation()
    except Exception:
        logger.exception("Operation failed")
        raise
    return result
```
Using get_flow_logger()¶
```python
from latent.prefect import get_flow_logger

@task("process")
def process_task(data):
    logger = get_flow_logger()
    logger.info("Task started")
    return data
```
Advanced Patterns¶
Parallel Task Execution¶
```python
@flow("parallel_flow")
def parallel_flow():
    items = load_items()

    # Execute the task in parallel for each item
    futures = process_item_task.map(
        item=items,
        idx=range(len(items))
    )

    # Wait for all tasks to complete
    results = [f.result() for f in futures]
    return aggregate(results)

@task("process_item")
def process_item_task(item, idx):
    logger.info(f"Processing item {idx}")
    return process(item)
```
Conditional Task Execution¶
```python
@flow("conditional_flow")
def conditional_flow():
    data = load_data()

    if params.enable_preprocessing:
        data = preprocess_task(data)

    results = analyze_task(data)

    if params.save_intermediate:
        save_intermediate_task(data)

    return results
```
Dynamic Task Configuration¶
```python
@task("configurable_task")
def configurable_task(data):
    # Get task-specific config
    task_config = params.get("task_config", {})
    threshold = task_config.get("threshold", 0.5)
    method = task_config.get("method", "default")
    return process(data, threshold=threshold, method=method)
```
Error Handling¶
```python
@task("robust_task", retry=True)
def robust_task(data):
    try:
        result = process(data)
    except ValueError as e:
        logger.warning(f"Validation error: {e}, using defaults")
        result = default_result()
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        raise
    return result
```
Subflows¶
```python
@flow("subflow")
def subflow(data):
    """A reusable subflow."""
    cleaned = clean_task(data)
    processed = process_task(cleaned)
    return processed

@flow("main_flow")
def main_flow():
    """Main flow that calls subflows."""
    data1 = load_data_1()
    data2 = load_data_2()

    # Call the subflow for each dataset
    result1 = subflow(data1)
    result2 = subflow(data2)

    return combine(result1, result2)
```
Hybrid Caching Strategy¶
Latent uses two caching strategies depending on the task type:
1. File-Based Caching (Batch)¶
For tasks with explicit input datasets, uses file hashes:
```python
@task("cached_task", input="raw_data", output="processed_data")
def cached_task(raw_data):
    # Cached based on:
    # 1. The hash of raw_data
    # 2. The task code signature
    # 3. Cross-flow references (if any)
    #
    # If raw_data hasn't changed,
    # the cached result is returned immediately
    return expensive_processing(raw_data)
```
Benefits:
- Based on actual data content
- Works across flows (cross-flow dependencies)
- Invalidates automatically when data changes
2. Row-Level Caching (Granular)¶
For tasks without explicit inputs (e.g., mapped over rows):
```python
@task("process_row")
def process_row(row_data: dict) -> dict:
    # Cached based on a stable hash of row_data;
    # each unique row_data gets its own cache entry
    return expensive_computation(row_data)

@flow("my_flow")
def my_flow():
    rows = load_data()
    # Each row is cached independently
    results = process_row.map(row_data=rows)
    return results
```
Benefits:
- Fine-grained caching (per-row or per-argument)
- Efficient for mapped tasks
- Handles DataFrames, dicts, Pydantic models
- Stable hashing ensures consistency
Cache Key Generation¶
File-Based (with input): the cache key is derived from the hash of the input data, the task's code signature, and any cross-flow references.
Row-Level (without input): the cache key is a stable hash of the task's arguments (e.g. the row contents).
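To make the distinction concrete, the two strategies can be sketched roughly as follows. This is illustrative only — `file_based_key` and `row_level_key` are hypothetical names, and the hash of the compiled code stands in for the real code-signature mechanism:

```python
import hashlib
import json

def file_based_key(task_fn, input_bytes: bytes) -> str:
    """Illustrative: combine the input data hash with a code signature."""
    data_hash = hashlib.sha256(input_bytes).hexdigest()
    # Hash of the compiled bytecode as a stand-in for the task code signature
    code_hash = hashlib.sha256(task_fn.__code__.co_code).hexdigest()
    return f"{data_hash}:{code_hash}"

def row_level_key(row: dict) -> str:
    """Illustrative: stable hash of the row's contents (key order doesn't matter)."""
    payload = json.dumps(row, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode()).hexdigest()

def my_task(data):
    return data

# Same data and code -> same key; changed data -> different key
k_same = file_based_key(my_task, b"rows-v1") == file_based_key(my_task, b"rows-v1")
k_diff = file_based_key(my_task, b"rows-v1") != file_based_key(my_task, b"rows-v2")
```

Either way, the key is deterministic, so unchanged inputs reuse the cached result.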
How Caching Works¶
- Cache Key Generation: Based on strategy (file or row-level)
- Cache Lookup: Prefect checks if a cached result exists
- Cache Hit: Return cached result (no execution)
- Cache Miss: Execute task and cache result
- Cache Expiration: Default 7 days, configurable
Disabling Caching¶
```python
import random

# For non-deterministic tasks
@task("random_task", cache=False)
def random_task():
    return random.random()

# For always-fresh data
@task("fetch_latest", cache=False)
def fetch_latest():
    return api.get_latest()
```
Custom Cache Duration¶
```python
from datetime import timedelta

# Cache for 30 days
@task("expensive_task", input="data", cache_expiration=timedelta(days=30))
def expensive_task(data):
    return very_expensive_computation(data)

# Cache for 1 hour
@task("api_task", cache_expiration=timedelta(hours=1))
def api_task():
    return fetch_from_api()
```
Checkpointing for Resumability¶
While task-level caching handles completed tasks, checkpointing enables resumability within long-running tasks that iterate over many items. This is useful when network failures or other errors cause a task to fail mid-execution.
The Problem¶
```python
@task("process_documents")
def process_documents(documents: list[dict]) -> list[dict]:
    results = []
    for doc in documents:  # 10,000 documents
        # Network call that might fail
        result = llm_client.complete(doc["prompt"])
        results.append(result)
    return results

# Problem: If this fails at document 7,500...
# On retry, it starts from scratch and reprocesses all 10,000 documents!
```
The Solution: @checkpoint¶
Use the @checkpoint decorator on inner functions to cache their results persistently:
```python
from latent.prefect import task, checkpoint

@task("process_documents")
def process_documents(documents: list[dict]) -> list[dict]:
    @checkpoint  # Results persist to disk
    def expensive_llm_call(doc: dict) -> dict:
        response = llm_client.complete(doc["prompt"])
        return {"id": doc["id"], "response": response}

    results = []
    for doc in documents:
        # On retry, already-processed docs return instantly from cache
        result = expensive_llm_call(doc)
        results.append(result)
    return results

# On retry after failure at doc 7,500:
# - The first 7,500 return instantly from the checkpoint cache
# - Only the remaining 2,500 are processed
```
How It Works¶
- First call: Function executes, result saved to `.latent/cache/`
- Same arguments: Returns cached result immediately
- Different arguments: Executes and caches separately
- Flow retries: Cache persists across process restarts
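The steps above amount to disk-backed memoization. Here is a minimal, self-contained sketch of the idea — not latent's actual implementation (which uses the `diskcache` library and writes to `.latent/cache/`; this sketch uses a temp directory and pickle instead):

```python
import functools
import hashlib
import json
import os
import pickle
import tempfile

# Sketch only: the real @checkpoint persists to .latent/cache/
CACHE_DIR = tempfile.mkdtemp()

def checkpoint(fn):
    """Illustrative stand-in for @checkpoint: memoize results to disk."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        # Stable key from the function name and its arguments
        payload = json.dumps([fn.__name__, args, kwargs], sort_keys=True, default=str)
        key = hashlib.sha256(payload.encode()).hexdigest()
        path = os.path.join(CACHE_DIR, f"{fn.__name__}-{key}.pkl")
        if os.path.exists(path):          # cache hit: skip execution
            with open(path, "rb") as f:
                return pickle.load(f)
        result = fn(*args, **kwargs)      # cache miss: run and persist
        with open(path, "wb") as f:
            pickle.dump(result, f)
        return result
    return wrapper

calls = []

@checkpoint
def slow_step(x: int) -> int:
    calls.append(x)  # track real executions
    return x * 2
```

Because the cache lives on disk rather than in memory, a retried run in a fresh process still finds the earlier results — which is exactly what makes mid-task resumability work.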
Development Mode Only¶
Important: Checkpointing is only active in development mode. In production, it's a no-op.
```bash
# Enable checkpointing (development mode)
export LATENT_ENVIRONMENT=dev
```

Or in `config/latent.toml`:

```toml
environment = "dev"
```
| Environment | Checkpoint Behavior |
|---|---|
| `dev` / `development` | Enabled - caches to disk for resumability |
| `production` (default) | Disabled - no-op, runs fresh every time |
Rationale: In production, you want predictable, fresh executions. Checkpointing is for iterative development where you're debugging and want to resume after failures.
When to Use Checkpointing¶
Use @checkpoint when:
- ✅ Task iterates over many items internally (not using `.map()`)
- ✅ Each item takes significant time (API calls, model inference)
- ✅ Network failures or rate limits might cause partial failures
- ✅ You're in development/debugging mode
Use task .map() instead when:
- ✅ Items can be processed as separate task runs
- ✅ You want Prefect's built-in concurrency control
- ✅ You need visibility into individual item status in Prefect UI
Use task-level caching (cache=True) when:
- ✅ Entire task result should be cached (not individual items)
- ✅ Task is deterministic and data-driven
Multiple Checkpointed Functions¶
You can checkpoint multiple functions independently:
```python
@task("complex_pipeline")
def complex_pipeline(items: list[dict]) -> list[dict]:
    @checkpoint
    def step_1_extract(item: dict) -> dict:
        return extraction_model.run(item)

    @checkpoint
    def step_2_transform(extracted: dict) -> dict:
        return transformation_model.run(extracted)

    @checkpoint
    def step_3_validate(transformed: dict) -> dict:
        return validation_model.run(transformed)

    results = []
    for item in items:
        # Each step is cached independently
        extracted = step_1_extract(item)
        transformed = step_2_transform(extracted)
        validated = step_3_validate(transformed)
        results.append(validated)
    return results
```
Clearing Checkpoints¶
Clear checkpoints when needed by deleting the cache directory (e.g. `rm -rf .latent/cache/`).
Cache Location¶
Checkpoint data is stored in `.latent/cache/` inside your project directory.
Note: Add `.latent/` to your `.gitignore` so checkpoint data is not committed to version control.
Thread Safety and Parallel Execution¶
The @checkpoint decorator is thread-safe and can be used with parallel task execution. The underlying cache uses proper locking to handle concurrent access from multiple threads.
```python
from concurrent.futures import ThreadPoolExecutor

@task("parallel_processing")
def parallel_processing(items: list[dict]) -> list[dict]:
    @checkpoint
    def process_item(item: dict) -> dict:
        return expensive_operation(item)

    # Safe to use with ThreadPoolExecutor
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = [executor.submit(process_item, item) for item in items]
        results = [f.result() for f in futures]
    return results
```
How it works:
- The checkpoint cache is initialized with thread-safe locking (double-checked locking pattern)
- Only one cache instance is created, shared across all threads
- The underlying `diskcache` library handles concurrent read/write operations safely
Best practices for parallel checkpointing:
- Define checkpointed functions at the top level of your task - not inside loops or nested functions
- Use unique function names - each checkpointed function should have a distinct name
- Avoid redefining checkpointed functions - dynamically creating `@checkpoint`-decorated functions in parallel contexts can cause cache key collisions
```python
# ✅ Good: Define the checkpointed function once at task level
@task("process_batch")
def process_batch(items: list[dict]) -> list[dict]:
    @checkpoint
    def call_api(item: dict) -> dict:
        return api.process(item)

    # Use the same function for all items
    return [call_api(item) for item in items]
```

```python
# ❌ Avoid: Creating checkpointed functions inside parallel workers
@task("process_batch")
def process_batch(items: list[dict]) -> list[dict]:
    results = []
    for item in items:
        # Creating @checkpoint functions in a loop is inefficient
        @checkpoint
        def process_this_item(x: dict) -> dict:
            return api.process(x)

        results.append(process_this_item(item))
    return results
```
Troubleshooting parallel execution issues:
If you experience hangs or deadlocks with checkpointing during parallel execution:
- Disable checkpointing temporarily: Set `environment = "production"` in `config/latent.toml`
- Check concurrency settings: Reduce concurrency if experiencing SQLite lock contention
- Clear the cache: Run `rm -rf .latent/cache/` to remove potentially corrupted cache data
Best Practices¶
1. Small, Focused Tasks¶
```python
# ✅ Good: Single responsibility
@task("clean_data")
def clean_data(data):
    return data.dropna()

@task("transform_data")
def transform_data(data):
    return data.apply(transform_fn)

# ❌ Bad: Too much in one task
@task("process_everything")
def process_everything(data):
    data = data.dropna()
    data = data.apply(transform_fn)
    data = data.merge(other_data)
    # ... many more operations
```
2. Use Type Hints¶
```python
# ✅ Good: Clear types
@task("process")
def process_task(data: pd.DataFrame) -> pd.DataFrame:
    return data.dropna()

# ❌ Bad: No type hints
@task("process")
def process_task(data):
    return data.dropna()
```
3. Descriptive Logging¶
```python
# ✅ Good: Informative logs
@task("process")
def process_task(data):
    logger.info(f"Processing {len(data)} rows with batch_size={params.batch_size}")
    result = process(data)
    logger.info(f"Processed {len(result)} rows, dropped {len(data) - len(result)}")
    return result

# ❌ Bad: No context
@task("process")
def process_task(data):
    logger.info("Processing")
    return process(data)
```
4. Handle Errors Gracefully¶
```python
# ✅ Good: Explicit error handling
@task("api_call")
def api_call_task():
    try:
        result = api.fetch()
    except TimeoutError:
        logger.warning("API timeout, using cached data")
        result = load_cached()
    except Exception as e:
        logger.error(f"API call failed: {e}")
        raise
    return result
```
See Also¶
- MLflow Tracking - Track experiments within flows
- API Reference - Complete API documentation