Prefect Flows & Tasks

Learn how to build evaluation workflows with Latent's Prefect integration.

Core Concepts

Flows

Flows are the top-level containers for your evaluation logic. They automatically:

  • Load configuration from parameters.yaml and catalog.yaml
  • Set up flow-aware logging
  • Initialize MLflow experiment tracking
  • Handle data loading/saving through the catalog

Tasks

Tasks are the individual units of work within a flow. They automatically:

  • Cache results using hybrid strategies (file-based or row-level)
  • Retry on failure (3 times by default)
  • Create MLflow spans for observability
  • Log automatic metrics (duration, row counts)
  • Wrap errors with rich context
  • Register in the task registry for visualization
  • Load/save data from/to the catalog

The @flow Decorator

from latent.prefect import flow
from pydantic import BaseModel

class FlowConfig(BaseModel):
    batch_size: int
    model_name: str

@flow(
    "flow_name",                    # Required: flow name (matches directory name)
    input=["dataset1", "dataset2"], # Optional: input datasets from catalog
    output=["result1", "result2"],  # Optional: output datasets to save
    config_schema=FlowConfig,       # Optional: Pydantic model for typed config
    **flow_kwargs                   # Optional: additional Prefect flow options
)
def my_flow():
    # Your flow logic
    pass

Basic Flow

@flow("simple_flow")
def simple_flow():
    """A simple flow with no explicit inputs/outputs."""
    logger.info("Flow started")
    result = do_work()  # your flow logic here
    return result

Flow with Inputs

@flow(
    "processing_flow",
    input=["raw_data", "config_data"]
)
def processing_flow(raw_data, config_data):
    """Flow that loads datasets automatically."""
    # raw_data and config_data are loaded from catalog
    processed = process(raw_data, config_data)
    return processed

Cross-Flow Data Loading

@flow(
    "downstream_flow",
    input=["upstream_flow.output_dataset"]
)
def downstream_flow(upstream_data):
    """Load outputs from another flow."""
    # Loads output_dataset from upstream_flow
    result = analyze(upstream_data)
    return result

The @task Decorator

from latent.prefect import task

@task(
    "task_name",                    # Required: task name
    input="input_dataset",          # Optional: input dataset name
    output="output_dataset",        # Optional: output dataset name
    cache=True,                     # Optional: enable caching (default: True)
    retry=True,                     # Optional: enable retries (default: True)
    span_type="CHAIN",              # Optional: MLflow span type (CHAIN, AGENT, TOOL, etc.)
    cache_expiration=timedelta(days=7),  # Optional: cache duration
    **task_kwargs                   # Optional: additional Prefect task options
)
def my_task(input_data):
    # Your task logic
    return output_data

Basic Task

@task("process_data")
def process_data(data):
    """Simple task with default settings (caching and retries enabled)."""
    return data.dropna()

Task with Catalog I/O

@task(
    "clean_data",
    input="raw_data",      # Loaded from catalog
    output="cleaned_data"  # Saved to catalog
)
def clean_data(raw_data: pd.DataFrame) -> pd.DataFrame:
    """Task with automatic data loading/saving."""
    cleaned = raw_data.dropna()
    cleaned = cleaned[cleaned['value'] > 0]
    return cleaned

Task with Custom Caching

from datetime import timedelta

@task(
    "expensive_computation",
    input="data",
    output="results",
    cache_expiration=timedelta(days=30),  # Cache for 30 days
)
def expensive_computation(data):
    """Task with extended cache duration."""
    # Expensive processing...
    return results

Task without Caching

@task(
    "generate_timestamp",
    cache=False,  # Disable caching for non-deterministic tasks
    retry=False   # Disable retries
)
def generate_timestamp():
    """Task that always runs (no caching)."""
    return datetime.now().isoformat()

Typed Configuration

Use Pydantic models for type-safe configuration:

from pydantic import BaseModel, Field
from latent.prefect import flow, params

class MyFlowConfig(BaseModel):
    batch_size: int = Field(gt=0, description="Batch size for processing")
    model_name: str
    learning_rate: float = 0.001
    max_iterations: int = Field(default=1000, le=10000)

@flow("my_flow", config_schema=MyFlowConfig)
def my_flow():
    # params is now a typed Pydantic model
    batch_size: int = params.batch_size  # IDE autocomplete works!
    model: str = params.model_name

    # Validation happens automatically on flow start
    logger.info(f"Running with batch_size={batch_size}")

Benefits of Typed Config

  1. IDE Support: Full autocomplete and type checking
  2. Validation: Automatic validation on flow start
  3. Documentation: Field descriptions in your config
  4. Defaults: Built-in default values
  5. Constraints: Pydantic validators (gt, le, regex, etc.)

# parameters.yaml
batch_size: 32
model_name: "gpt-4"
learning_rate: 0.001
# max_iterations uses default (1000)

If validation fails, you get a clear error:

ValueError: Configuration validation failed for flow 'my_flow':
  batch_size: ensure this value is greater than 0 (type=value_error.number.not_gt)

Observability & Metrics

Automatic MLflow Spans

Tasks automatically create MLflow spans for observability:

@task("process_data", span_type="CHAIN")
def process_data(data):
    # Automatically creates an MLflow span with:
    # - span.type = "CHAIN"
    # - Automatic duration logging
    # - Input/output attributes
    return processed_data

Span types follow MLflow conventions:

  • CHAIN: Sequential processing
  • AGENT: Agent-based tasks
  • TOOL: Tool invocations
  • RETRIEVER: Data retrieval
  • LLM: Language model calls

Automatic Metrics

Tasks automatically log:

@task("process")
def process_task(data):
    # Automatically logged:
    # - task.process.duration (in seconds)
    # - task.process.status (success/failure)
    # - Error details (if failed)
    return result

Enhanced Error Context

Task failures are wrapped with rich context:

@task("process", input="data")
def process_task(data):
    raise ValueError("Processing failed!")

# Error includes:
# - Task name
# - Input arguments
# - Original exception
# - MLflow span attributes

Error output:

TaskExecutionError: Task 'process' failed.
Original error: ValueError: Processing failed!
Context: {'input': 'data', 'row_index': None}
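The wrapping behavior can be sketched roughly as follows. This is an illustrative stand-in, not Latent's actual internals; `TaskExecutionError` and `run_with_context` are assumed names for the purposes of the sketch:

```python
class TaskExecutionError(Exception):
    """Illustrative stand-in for the error type raised on task failure."""

def run_with_context(task_name: str, fn, **inputs):
    # Run the task; on failure, re-raise with the task name,
    # input names, and original exception attached as context.
    try:
        return fn(**inputs)
    except Exception as exc:
        context = {"inputs": list(inputs), "row_index": None}
        raise TaskExecutionError(
            f"Task '{task_name}' failed.\n"
            f"Original error: {type(exc).__name__}: {exc}\n"
            f"Context: {context}"
        ) from exc
```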

Task Registry & Visualization

Tasks are automatically registered for visualization:

from latent.registry import TaskRegistry

# After running flows, visualize the pipeline
TaskRegistry.print_ascii()

Output:

=== 🌳 Canopy Pipeline Topology ===

📦 Flow: data_pipeline
  ├── ⚙️  load
  │    ⬆️  out: raw_data
  ├── ⚙️  clean
  │    ⬇️  in: raw_data
  │    ⬆️  out: cleaned_data
  ├── ⚙️  process
  │    ⬇️  in: cleaned_data
  │    ⬆️  out: processed_data

Accessing Configuration

Using the params Object

from latent.prefect import params

@task("process")
def process_task(data):
    # Access parameters from parameters.yaml
    batch_size = params.batch_size
    model = params.model_name

    # Dict-style access
    lr = params["learning_rate"]

    # With default value
    timeout = params.get("timeout", 30)

    return process(data, batch_size, model, lr)

Cross-Flow Parameter Access

You can access parameters from other flows using dot notation with params.get():

from latent.prefect import params

@task("analyze")
def analyze_task(data):
    # Access parameters from another flow
    upstream_model = params.get("upstream_flow.model_name")
    upstream_temp = params.get("upstream_flow.temperature", default=0.7)

    # Current flow parameters still work
    batch_size = params.batch_size

    return analyze(data, model=upstream_model, temp=upstream_temp)

This is useful when:

  • You need to ensure consistency across flows (e.g., same model configuration)
  • A downstream flow needs to know upstream configuration
  • You want to reference shared settings without duplicating them

How it works:

  1. params.get("other_flow.param_name") loads parameters.yaml from flows/other_flow/
  2. Global parameters from global.yaml are merged (flow-specific takes precedence)
  3. If the flow or parameter doesn't exist, returns the default value

# Example: Downstream flow accessing upstream configuration
@flow("downstream_flow")
def downstream_flow():
    # Get the model that was used in upstream processing
    upstream_model = params.get("upstream_flow.model_name")

    # Use the same model for consistency
    if upstream_model:
        logger.info(f"Using upstream model: {upstream_model}")

    result = process_task()
    return result

Note

Cross-flow parameter access only works with params.get(), not with attribute access (params.other_flow) or dict access (params["other_flow.param"]).

Using get_config()

from latent.prefect import get_config

@flow("my_flow")
def my_flow():
    config = get_config()

    # Access parameters
    batch_size = config["parameters"]["batch_size"]

    # Access catalog
    datasets = config["catalog"]

Logging

Using the logger Object

from latent.prefect import logger

@task("process")
def process_task(data):
    logger.debug("Detailed debug information")
    logger.info(f"Processing {len(data)} rows")
    logger.warning("This might take a while")
    logger.error("Something went wrong")

    try:
        result = risky_operation()
    except Exception as e:
        logger.exception("Operation failed")
        raise

    return result

Using get_flow_logger()

from latent.prefect import get_flow_logger

@task("process")
def process_task(data):
    logger = get_flow_logger()
    logger.info("Task started")
    return data

Advanced Patterns

Parallel Task Execution

@flow("parallel_flow")
def parallel_flow():
    items = load_items()

    # Execute task in parallel for each item
    futures = process_item_task.map(
        item=items,
        idx=range(len(items))
    )

    # Wait for all tasks to complete
    results = [f.result() for f in futures]

    return aggregate(results)


@task("process_item")
def process_item_task(item, idx):
    logger.info(f"Processing item {idx}")
    return process(item)

Conditional Task Execution

@flow("conditional_flow")
def conditional_flow():
    data = load_data()

    if params.enable_preprocessing:
        data = preprocess_task(data)

    results = analyze_task(data)

    if params.save_intermediate:
        save_intermediate_task(data)

    return results

Dynamic Task Configuration

@task("configurable_task")
def configurable_task(data):
    # Get task-specific config
    task_config = params.get("task_config", {})

    threshold = task_config.get("threshold", 0.5)
    method = task_config.get("method", "default")

    return process(data, threshold=threshold, method=method)

Error Handling

@task("robust_task", retry=True)
def robust_task(data):
    try:
        result = process(data)
    except ValueError as e:
        logger.warning(f"Validation error: {e}, using defaults")
        result = default_result()
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        raise

    return result

Subflows

@flow("subflow")
def subflow(data):
    """A reusable subflow."""
    cleaned = clean_task(data)
    processed = process_task(cleaned)
    return processed


@flow("main_flow")
def main_flow():
    """Main flow that calls subflows."""
    data1 = load_data_1()
    data2 = load_data_2()

    # Call subflow for each dataset
    result1 = subflow(data1)
    result2 = subflow(data2)

    return combine(result1, result2)

Hybrid Caching Strategy

Latent uses two caching strategies depending on the task type:

1. File-Based Caching (Batch)

For tasks with explicit input datasets, uses file hashes:

@task("cached_task", input="raw_data", output="processed_data")
def cached_task(raw_data):
    # Cached based on:
    # 1. The hash of raw_data
    # 2. The task code signature
    # 3. Cross-flow references (if any)

    # If raw_data hasn't changed,
    # the cached result is returned immediately
    return expensive_processing(raw_data)

Benefits:

  • Based on actual data content
  • Works across flows (cross-flow dependencies)
  • Invalidates automatically when data changes

2. Row-Level Caching (Granular)

For tasks without explicit inputs (e.g., mapped over rows):

@task("process_row")
def process_row(row_data: dict) -> dict:
    # Cached based on stable hash of row_data
    # Each unique row_data gets its own cache entry
    return expensive_computation(row_data)

@flow("my_flow")
def my_flow():
    rows = load_data()

    # Each row is cached independently
    results = process_row.map(row_data=rows)

    return results

Benefits:

  • Fine-grained caching (per-row or per-argument)
  • Efficient for mapped tasks
  • Handles DataFrames, dicts, Pydantic models
  • Stable hashing ensures consistency

Cache Key Generation

File-Based (with input):

MD5(hash_1 + hash_2 + ... + task_signature)

Row-Level (without input):

MD5(task_name + stable_hash(arg1) + stable_hash(arg2) + ...)
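The two schemes above can be sketched in plain Python. The function names and the exact serialization used for `stable_hash` are illustrative assumptions, not Latent's implementation:

```python
import hashlib
import json

def file_based_key(input_hashes: list[str], task_signature: str) -> str:
    # File-based: MD5 over the concatenated input-data hashes plus the task signature
    return hashlib.md5("".join(input_hashes + [task_signature]).encode()).hexdigest()

def stable_hash(value) -> str:
    # Stable hash of one argument: deterministic serialization, then MD5
    return hashlib.md5(json.dumps(value, sort_keys=True, default=str).encode()).hexdigest()

def row_level_key(task_name: str, *args) -> str:
    # Row-level: MD5 over the task name plus a stable hash of each argument
    parts = [task_name] + [stable_hash(a) for a in args]
    return hashlib.md5("".join(parts).encode()).hexdigest()
```

The key property in both cases is determinism: the same inputs always produce the same key, so a re-run with unchanged data hits the cache.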

How Caching Works

  1. Cache Key Generation: Based on strategy (file or row-level)
  2. Cache Lookup: Prefect checks if a cached result exists
  3. Cache Hit: Return cached result (no execution)
  4. Cache Miss: Execute task and cache result
  5. Cache Expiration: Default 7 days, configurable

Disabling Caching

# For non-deterministic tasks
@task("random_task", cache=False)
def random_task():
    return random.random()

# For always-fresh data
@task("fetch_latest", cache=False)
def fetch_latest():
    return api.get_latest()

Custom Cache Duration

from datetime import timedelta

# Cache for 30 days
@task("expensive_task", input="data", cache_expiration=timedelta(days=30))
def expensive_task(data):
    return very_expensive_computation(data)

# Cache for 1 hour
@task("api_task", cache_expiration=timedelta(hours=1))
def api_task():
    return fetch_from_api()

Checkpointing for Resumability

While task-level caching handles completed tasks, checkpointing enables resumability within long-running tasks that iterate over many items. This is useful when network failures or other errors cause a task to fail mid-execution.

The Problem

@task("process_documents")
def process_documents(documents: list[dict]) -> list[dict]:
    results = []
    for doc in documents:  # 10,000 documents
        # Network call that might fail
        result = llm_client.complete(doc["prompt"])
        results.append(result)

    return results

# Problem: If this fails at document 7,500...
# On retry, it starts from scratch and reprocesses all 10,000 documents!

The Solution: @checkpoint

Use the @checkpoint decorator on inner functions to cache their results persistently:

from latent.prefect import task, checkpoint

@task("process_documents")
def process_documents(documents: list[dict]) -> list[dict]:

    @checkpoint  # Results persist to disk
    def expensive_llm_call(doc: dict) -> dict:
        response = llm_client.complete(doc["prompt"])
        return {"id": doc["id"], "response": response}

    results = []
    for doc in documents:
        # On retry, already-processed docs return instantly from cache
        result = expensive_llm_call(doc)
        results.append(result)

    return results

# On retry after failure at doc 7,500:
# - First 7,500 return instantly from checkpoint cache
# - Only remaining 2,500 are processed

How It Works

  1. First call: Function executes, result saved to .latent/cache/
  2. Same arguments: Returns cached result immediately
  3. Different arguments: Executes and caches separately
  4. Flow retries: Cache persists across process restarts

Development Mode Only

Important: Checkpointing is only active in development mode. In production, it's a no-op.

# Enable checkpointing (development mode)
export LATENT_ENVIRONMENT=dev

# Or in config/latent.toml
environment = "dev"

Environment           Checkpoint Behavior
dev / development     Enabled - caches to disk for resumability
production (default)  Disabled - no-op, runs fresh every time

Rationale: In production, you want predictable, fresh executions. Checkpointing is for iterative development where you're debugging and want to resume after failures.

When to Use Checkpointing

Use @checkpoint when:

  • ✅ Task iterates over many items internally (not using .map())
  • ✅ Each item takes significant time (API calls, model inference)
  • ✅ Network failures or rate limits might cause partial failures
  • ✅ You're in development/debugging mode

Use task .map() instead when:

  • ✅ Items can be processed as separate task runs
  • ✅ You want Prefect's built-in concurrency control
  • ✅ You need visibility into individual item status in Prefect UI

Use task-level caching (cache=True) when:

  • ✅ Entire task result should be cached (not individual items)
  • ✅ Task is deterministic and data-driven

Multiple Checkpointed Functions

You can checkpoint multiple functions independently:

@task("complex_pipeline")
def complex_pipeline(items: list[dict]) -> list[dict]:

    @checkpoint
    def step_1_extract(item: dict) -> dict:
        return extraction_model.run(item)

    @checkpoint
    def step_2_transform(extracted: dict) -> dict:
        return transformation_model.run(extracted)

    @checkpoint
    def step_3_validate(transformed: dict) -> dict:
        return validation_model.run(transformed)

    results = []
    for item in items:
        # Each step independently cached
        extracted = step_1_extract(item)
        transformed = step_2_transform(extracted)
        validated = step_3_validate(transformed)
        results.append(validated)

    return results

Clearing Checkpoints

Manually clear checkpoint cache when needed:

from latent.prefect import clear_checkpoints

# After successful completion
clear_checkpoints()

Or delete the cache directory:

rm -rf .latent/cache/

Cache Location

Checkpoint data is stored in:

.latent/
  cache/          # SQLite database managed by diskcache
    cache.db      # Cached function results
    ...

Note: Add .latent/ to your .gitignore so cached results are not committed to version control.

Thread Safety and Parallel Execution

The @checkpoint decorator is thread-safe and can be used with parallel task execution. The underlying cache uses proper locking to handle concurrent access from multiple threads.

@task("parallel_processing")
def parallel_processing(items: list[dict]) -> list[dict]:

    @checkpoint
    def process_item(item: dict) -> dict:
        return expensive_operation(item)

    # Safe to use with ThreadPoolExecutor
    from concurrent.futures import ThreadPoolExecutor

    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = [executor.submit(process_item, item) for item in items]
        results = [f.result() for f in futures]

    return results

How it works:

  • The checkpoint cache is initialized with thread-safe locking (double-checked locking pattern)
  • Only one cache instance is created, shared across all threads
  • The underlying diskcache library handles concurrent read/write operations safely
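The double-checked locking pattern described above can be sketched as follows. This is a simplified stand-in for the real cache initialization, with a plain dict in place of the diskcache instance:

```python
import threading

_cache = None
_cache_lock = threading.Lock()

def get_cache():
    """Return the shared cache, creating it at most once across all threads."""
    global _cache
    if _cache is None:               # first check: no lock taken (fast path)
        with _cache_lock:
            if _cache is None:       # second check: another thread may have won the race
                _cache = {}          # stand-in for the real diskcache.Cache instance
    return _cache
```

The second check under the lock is what makes the pattern safe: two threads can both pass the first check, but only one creates the instance, and the other sees it once it acquires the lock.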

Best practices for parallel checkpointing:

  1. Define checkpointed functions at the top level of your task - not inside loops or nested functions
  2. Use unique function names - each checkpointed function should have a distinct name
  3. Avoid redefining checkpointed functions - dynamically creating @checkpoint decorated functions in parallel contexts can cause cache key collisions

# ✅ Good: Define checkpointed function once at task level
@task("process_batch")
def process_batch(items: list[dict]) -> list[dict]:

    @checkpoint
    def call_api(item: dict) -> dict:
        return api.process(item)

    # Use the same function for all items
    return [call_api(item) for item in items]


# ❌ Avoid: Creating checkpointed functions inside parallel workers
@task("process_batch")
def process_batch(items: list[dict]) -> list[dict]:
    results = []
    for item in items:
        # Creating @checkpoint functions in a loop is inefficient
        @checkpoint
        def process_this_item(x: dict) -> dict:
            return api.process(x)

        results.append(process_this_item(item))
    return results

Troubleshooting parallel execution issues:

If you experience hangs or deadlocks with checkpointing during parallel execution:

  1. Disable checkpointing temporarily: Set environment = "production" in config/latent.toml
  2. Check concurrency settings: Reduce concurrency if experiencing SQLite lock contention
  3. Clear the cache: Run rm -rf .latent/cache/ to remove potentially corrupted cache data

Best Practices

1. Small, Focused Tasks

# ✅ Good: Single responsibility
@task("clean_data")
def clean_data(data):
    return data.dropna()

@task("transform_data")
def transform_data(data):
    return data.apply(transform_fn)

# ❌ Bad: Too much in one task
@task("process_everything")
def process_everything(data):
    data = data.dropna()
    data = data.apply(transform_fn)
    data = data.merge(other_data)
    # ... many more operations

2. Use Type Hints

# ✅ Good: Clear types
@task("process")
def process_task(data: pd.DataFrame) -> pd.DataFrame:
    return data.dropna()

# ❌ Bad: No type hints
@task("process")
def process_task(data):
    return data.dropna()

3. Descriptive Logging

# ✅ Good: Informative logs
@task("process")
def process_task(data):
    logger.info(f"Processing {len(data)} rows with batch_size={params.batch_size}")
    result = process(data)
    logger.info(f"Processed {len(result)} rows, dropped {len(data) - len(result)}")
    return result

# ❌ Bad: No context
@task("process")
def process_task(data):
    logger.info("Processing")
    return process(data)

4. Handle Errors Gracefully

# ✅ Good: Explicit error handling
@task("api_call")
def api_call_task():
    try:
        result = api.fetch()
    except TimeoutError:
        logger.warning("API timeout, using cached data")
        result = load_cached()
    except Exception as e:
        logger.error(f"API call failed: {e}")
        raise
    return result

See Also