Pipeline Agents

PipelineAgent is a phase-based agent with deterministic routing. Unlike free-form ReAct agents where the LLM decides what to do next, a PipelineAgent executes a directed graph of phases where code decides the transitions between them.

Use PipelineAgent when you need:

  • Multi-step workflows with predictable control flow
  • Structured LLM calls scoped to specific subtasks
  • A mix of LLM reasoning, tool execution, and Python logic in one agent
  • Guardrails, retries, and side effects at well-defined boundaries

Quick Start

from pydantic import BaseModel
from latent.agents.pipeline import PipelineAgent, phase, tool, respond, PipelineState
from latent.agents.events import Message


class Intent(BaseModel):
    category: str
    confidence: float


class SupportAgent(PipelineAgent):
    @phase(output_schema=Intent)
    def classify(self, state: PipelineState, messages: list[Message]) -> str:
        return "Classify the user's intent into: billing, technical, general."

    @tool(transitions={"respond": None})
    def fetch_context(self, state: PipelineState, messages: list[Message]) -> str:
        intent = state.outputs["classify"]  # parsed Intent from the previous phase
        return lookup_kb(intent.category)   # lookup_kb: your own KB helper

    @respond
    def respond(self, state: PipelineState, messages: list[Message]) -> str:
        return f"Based on our records: {state.outputs['fetch_context']}"


agent = SupportAgent(model="gpt-4o")
answer = agent.ask("Why was I charged twice?")

Decorator API

Four decorators define the building blocks of a pipeline. Methods are discovered in declaration order -- the first non-@respond method becomes the initial phase.

@phase -- LLM Call

Sends the method's return string as a system prompt to the LLM. Optionally parses the response into a Pydantic model via structured output.

@phase(
    model="gpt-4o",              # Override pipeline default model
    output_schema=Intent,        # Pydantic model for structured output (optional)
    temperature=0.0,             # LLM temperature (default: 0.0)
    max_tokens=4096,             # Max response tokens (default: 4096)
    transitions={"next_phase": lambda output, state: output.confidence > 0.8},
)
def classify(self, state: PipelineState, messages: list[Message]) -> str:
    return "Classify the user query."

Signature: (self, state: PipelineState, messages: list[Message]) -> str

The return value is the system prompt. The LLM response (parsed or raw) is stored in state.outputs["classify"].

@tool -- Python Function

Runs arbitrary Python code. No LLM call. Can be sync or async.

@tool(transitions={"respond": None})
def search(self, state: PipelineState, messages: list[Message]) -> dict:
    query = state.outputs["classify"].query
    return vector_db.search(query, top_k=5)

Signature: (self, state: PipelineState, messages: list[Message]) -> Any
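Since @tool methods may be async, long-running I/O can be awaited inside a phase; the decorator usage mirrors the sync form. The snippet below is a standalone sketch of the async shape only — fetch_ticket and its return data are hypothetical, and no latent imports are used so it runs on its own:

```python
import asyncio

# Hypothetical async lookup standing in for an async @tool body.
# In a pipeline the framework awaits the decorated method for you;
# here we await it directly to show the shape.
async def fetch_ticket(ticket_id: str) -> dict:
    await asyncio.sleep(0)  # stands in for an async HTTP/DB call
    return {"id": ticket_id, "status": "open"}

async def main() -> dict:
    return await fetch_ticket("T-123")

result = asyncio.run(main())
print(result["status"])  # open
```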

@respond -- Terminal Phase

Emits the return value as the final response and stops the pipeline. Every pipeline must have at least one reachable @respond phase.

# Bare decorator -- return value emitted directly
@respond
def answer(self, state: PipelineState, messages: list[Message]) -> str:
    return state.outputs["search"]["summary"]

# With model -- return value becomes a prompt, LLM generates the final response
@respond(model="gpt-4o")
def answer(self, state: PipelineState, messages: list[Message]) -> str:
    return f"Summarize this for the user: {state.outputs['search']}"

Signature: (self, state: PipelineState, messages: list[Message]) -> str

Note

@respond phases are never chosen as the initial phase, even if declared first.

@subagent -- Scoped ReAct Loop

Spawns a bounded tool-calling agent. The method returns instructions; the framework runs a LiteLLMAgent with the specified tools for up to max_iterations steps.

from latent.agents import tool as agent_tool
from latent.agents.pipeline import PipelineAgent, PipelineState, subagent
from latent.agents.events import Message

@agent_tool
def web_search(query: str) -> str:
    """Search the web."""
    return search_api(query)  # search_api: your own search client

class ResearchAgent(PipelineAgent):
    @subagent(
        tools=[web_search],
        max_iterations=5,
        model="gpt-4o",
        transitions={"summarize": None},
    )
    def research(self, state: PipelineState, messages: list[Message]) -> str:
        return "Research the user's question thoroughly using web search."

Signature: (self, state: PipelineState, messages: list[Message]) -> str

Warning

Inner agent TextDelta events are remapped to ReasoningDelta -- only the @respond phase produces the final TextDelta output.

Type System

All types are importable from latent.agents.pipeline.

PipelineState

Mutable runtime state for one pipeline execution. Passed to every phase method.

Field          Type             Description
current_phase  str              Name of the currently executing phase
outputs        dict[str, Any]   Phase-name-to-output mapping, populated after each phase completes
retry_counts   dict[str, int]   How many times each phase has been re-entered
phase_history  list[str]        Ordered list of all phases executed
metadata       dict[str, Any]   Free-form storage for cross-phase data

# Access output from a previous phase
intent = state.outputs["classify"]

# Check retry count
if state.retries("classify") > 2:
    return "fallback instructions"

# Store arbitrary data
state.metadata["user_tier"] = "premium"
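A retry is simply a transition that routes back to the same phase; re-entering a phase increments its retry_counts entry. The snippet below is a library-free simulation of that semantics (first-match-wins routing plus a retry cap), not the framework's actual implementation:

```python
# Standalone simulation of retry-via-self-transition routing.
# "transitions" mirrors the decorator shorthand: target -> predicate (None = always).

def route(output, state, transitions):
    """Return the next phase name using first-match-wins semantics."""
    for target, predicate in transitions.items():
        if predicate is None or predicate(output, state):
            return target
    raise RuntimeError("no matching transition")

state = {"retry_counts": {}, "phase_history": []}

transitions = {
    # Re-enter "classify" while confidence is low and retries remain.
    "classify": lambda out, st: out["confidence"] < 0.8
    and st["retry_counts"].get("classify", 0) < 2,
    "respond": None,  # fallback
}

# Simulate three low-confidence outputs in a row.
for _ in range(3):
    state["phase_history"].append("classify")
    nxt = route({"confidence": 0.5}, state, transitions)
    if nxt == "classify":
        state["retry_counts"]["classify"] = state["retry_counts"].get("classify", 0) + 1
    else:
        break

print(nxt)  # respond -- falls through once the retry cap is hit
```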

Transition

Routes to another phase. Placed in a phase's transitions list; first match wins.

Transition(
    to="next_phase",                                    # Target phase name
    when=lambda output, state: output.score > 0.5,     # Predicate (None = always)
    side_effect=lambda output, state: log(output),      # Optional callback
)

Respond

Terminal transition. Emits text and stops the pipeline.

Respond(
    output=lambda output, state: f"Answer: {output}",  # Produces final text
    when=lambda output, state: state.retries("x") > 2, # Predicate (None = always)
    side_effect=None,                                   # Optional callback
    instructions="Rewrite in a friendly tone.",         # Optional LLM formatting pass
    model="gpt-4o-mini",                                # Model for formatting call
)

Phase Types (Advanced)

When building pipelines programmatically instead of using decorators:

Type           Maps to            Description
PromptPhase    @phase             Single LLM call with optional structured output
FunctionPhase  @tool / @respond   Arbitrary Python function
ReActPhase     @subagent          Bounded tool-calling loop via a scoped LiteLLMAgent

Transitions and Routing

Transitions are evaluated in declaration order after each phase completes. The first transition whose when predicate returns True (or is None) wins.

Decorator Shorthand

In the decorator API, transitions are a dict[str, Callable | None]:

@phase(
    output_schema=Intent,
    transitions={
        "handle_billing": lambda output, state: output.category == "billing",
        "handle_technical": lambda output, state: output.category == "technical",
        "respond": None,  # None = always matches (fallback)
    },
)
def classify(self, state, messages):
    return "Classify the intent."

Tip

Always include a fallback transition (with None predicate) as the last entry. If no transition matches, the pipeline emits an error and stops.

Implicit Transitions

When a phase has no explicit transitions, the framework chains it to the next declared phase. If it is the last non-respond phase, it transitions to the first @respond method.
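The chaining rule above can be expressed as a small function over the declared phase order. This is a sketch of the rule as documented, not the framework's code; the phase names are hypothetical:

```python
def implicit_target(phases, respond_phases, current):
    """Next phase under implicit chaining: chain to the next declared phase;
    the last non-respond phase falls through to the first @respond method."""
    non_respond = [p for p in phases if p not in respond_phases]
    idx = non_respond.index(current)
    if idx + 1 < len(non_respond):
        return non_respond[idx + 1]
    # Last non-respond phase: route to the first declared @respond phase.
    return next(p for p in phases if p in respond_phases)

phases = ["classify", "fetch_context", "answer"]  # declaration order
print(implicit_target(phases, {"answer"}, "classify"))       # fetch_context
print(implicit_target(phases, {"answer"}, "fetch_context"))  # answer
```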

Side Effects

Both Transition and Respond accept a side_effect callback for logging, metrics, or state mutations:

@phase(transitions={
    "next": None,
})
def analyze(self, state, messages):
    ...

# Programmatic equivalent with side effect:
PromptPhase(
    name="analyze",
    instructions="...",
    transitions=[
        Transition(
            to="next",
            side_effect=lambda output, state: state.metadata.update({"analyzed": True}),
        ),
    ],
)

Execution Model

  1. The pipeline starts at the initial phase (first non-@respond method).
  2. The phase executes (@phase calls the LLM, @tool runs Python, @subagent runs a ReAct loop).
  3. The phase output is stored in state.outputs[phase_name].
  4. Transitions are evaluated in order. First match wins.
  5. If the match is a Transition, the pipeline moves to the target phase (step 2).
  6. If the match is a Respond, the output text is emitted as TextDelta and the pipeline stops.
  7. A safety limit (max_total_phases, default 20) prevents infinite loops.
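The steps above can be sketched as a minimal loop. This is a library-free simulation with plain dicts; real phases call the LLM or run tools, and real pipelines yield events rather than returning directly:

```python
# Minimal simulation of the pipeline loop: run phase -> store output ->
# first-match transition -> repeat, with a max_total_phases safety limit.
# A transition target of None stands in for a terminal Respond here.

def run_pipeline(phases, initial, max_total_phases=20):
    state = {"outputs": {}, "phase_history": []}
    current = phases[initial]
    for _ in range(max_total_phases):
        state["phase_history"].append(current["name"])
        output = current["fn"](state)                 # step 2: execute phase
        state["outputs"][current["name"]] = output    # step 3: store output
        for target, predicate in current["transitions"].items():
            if predicate is None or predicate(output, state):
                if target is None:                    # step 6: Respond -> stop
                    return output, state
                current = phases[target]              # step 5: move to target
                break
        else:
            raise RuntimeError("no matching transition")
    raise RuntimeError("max_total_phases exceeded")   # step 7: safety limit

phases = {
    "classify": {"name": "classify",
                 "fn": lambda st: "billing",
                 "transitions": {"respond": None}},
    "respond": {"name": "respond",
                "fn": lambda st: f"Routed to {st['outputs']['classify']}",
                "transitions": {None: None}},
}

final, state = run_pipeline(phases, "classify")
print(final)                   # Routed to billing
print(state["phase_history"])  # ['classify', 'respond']
```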

Event Stream

PipelineAgent yields these events during execution:

Event                      When
StepBoundary               Before each phase
PhaseStarted               Phase begins (includes retry count)
ReasoningDelta             LLM tokens from @phase and @subagent (intermediate reasoning)
PhaseCompleted             Phase finished (includes output preview)
PhaseRouted                Transition to the next phase
TextDelta                  Final response from @respond
LLMCallStart / LLMCallEnd  LLM call boundaries with token usage
Usage                      Aggregated token counts
Error                      No matching transition, or phase limit exceeded

Calling Conventions

agent = MyPipelineAgent(model="gpt-4o")

# Single-shot (sync)
result = agent.run([Message(role="user", content="Hello")])

# Conversational (maintains chat history)
result = agent.ask("Hello")
result = agent.ask("Follow up question")

# Callable (resets context each call, useful for eval)
result = agent("Hello")

# Async streaming
async for event in agent.stream([Message(role="user", content="Hello")]):
    if isinstance(event, TextDelta):
        print(event.text, end="")

Full Example: Research Assistant

An agent that classifies the query, optionally researches it, then responds through one of two terminal phases.

from pydantic import BaseModel, Field
from latent.agents.pipeline import PipelineAgent, phase, tool, subagent, respond, PipelineState
from latent.agents.events import Message
from latent.agents import tool as agent_tool


# --- Structured output schema ---

class QueryAnalysis(BaseModel):
    needs_research: bool = Field(description="Whether the query requires web research")
    topic: str = Field(description="The core topic to research")
    complexity: str = Field(description="low, medium, or high")


# --- Tools for the subagent ---

@agent_tool
def web_search(query: str) -> str:
    """Search the web for information."""
    return f"Search results for: {query}"


@agent_tool
def fetch_page(url: str) -> str:
    """Fetch and extract text from a web page."""
    return f"Page content from: {url}"


# --- The pipeline agent ---

class ResearchAssistant(PipelineAgent):
    """Classifies queries, researches when needed, responds with context."""

    @phase(
        output_schema=QueryAnalysis,
        transitions={
            "research": lambda output, state: output.needs_research,
            "direct_respond": None,
        },
    )
    def classify(self, state: PipelineState, messages: list[Message]) -> str:
        return (
            "Analyze the user's query. Determine if it requires web research "
            "or can be answered directly. Identify the core topic and complexity."
        )

    @subagent(
        tools=[web_search, fetch_page],
        max_iterations=5,
        transitions={"respond": None},
    )
    def research(self, state: PipelineState, messages: list[Message]) -> str:
        analysis = state.outputs["classify"]
        return (
            f"Research the topic: {analysis.topic}\n"
            f"Complexity: {analysis.complexity}\n"
            "Use web_search to find relevant information, then fetch_page "
            "for detailed content. Compile a thorough summary."
        )

    @respond
    def direct_respond(self, state: PipelineState, messages: list[Message]) -> str:
        analysis = state.outputs["classify"]
        return f"Topic: {analysis.topic}. This is a straightforward question."

    @respond(model="gpt-4o")
    def respond(self, state: PipelineState, messages: list[Message]) -> str:
        research = state.outputs["research"]
        return (
            f"Using the following research, provide a clear and thorough answer "
            f"to the user's question:\n\n{research}"
        )


# --- Usage ---

agent = ResearchAssistant(model="gpt-4o", name="research-assistant")
answer = agent.ask("What were the key outcomes of the latest IPCC report?")
print(answer)

Execution flow for this example

classify (PromptPhase)
    |
    +-- needs_research=True  --> research (ReActPhase) --> respond (@respond + LLM)
    |
    +-- needs_research=False --> direct_respond (@respond, raw text)

Programmatic Pipeline Construction

For dynamic pipelines or when you prefer explicit configuration over decorators:

from latent.agents.pipeline import (
    PipelineAgent, Pipeline, PromptPhase, FunctionPhase,
    Transition, Respond, PipelineState,
)

pipeline = Pipeline(
    name="classifier",
    model="gpt-4o",
    initial="classify",
    max_total_phases=10,
    phases=[
        PromptPhase(
            name="classify",
            instructions="Classify the user query as billing or technical.",
            output_schema=Intent,
            transitions=[
                Transition(to="lookup", when=lambda o, s: o.confidence > 0.7),
                Respond(output=lambda o, s: "I'm not sure how to help with that."),
            ],
        ),
        FunctionPhase(
            name="lookup",
            fn=lambda state, msgs: kb_search(state.outputs["classify"].category),
            transitions=[
                Respond(output=lambda o, s: f"Here's what I found: {o}"),
            ],
        ),
    ],
)

agent = PipelineAgent(pipeline=pipeline)

Guardrails Integration

PipelineAgent extends BaseAgent, so @guardrail methods are auto-discovered and applied to the outer stream() boundary:

from latent.guardrails import guardrail

class SafeAgent(PipelineAgent):
    @guardrail(timing="pre", outcome="active", message="Blocked: off-topic")
    def topic_filter(self, prompt: str) -> bool:
        return "unrelated_keyword" in prompt.lower()

    @phase(output_schema=Response)
    def generate(self, state, messages):
        return "Answer the user's question."

    @respond
    def answer(self, state, messages):
        return state.outputs["generate"].text

Guardrails wrap the entire pipeline execution, not individual phases.