Pipeline Agents¶
PipelineAgent is a phase-based agent with deterministic routing. Unlike free-form ReAct agents where the LLM decides what to do next, a PipelineAgent executes a directed graph of phases where code decides the transitions between them.
Use PipelineAgent when you need:
- Multi-step workflows with predictable control flow
- Structured LLM calls scoped to specific subtasks
- A mix of LLM reasoning, tool execution, and Python logic in one agent
- Guardrails, retries, and side effects at well-defined boundaries
Quick Start¶
```python
from pydantic import BaseModel
from latent.agents.pipeline import PipelineAgent, phase, tool, respond, PipelineState
from latent.agents.events import Message

class Intent(BaseModel):
    category: str
    confidence: float

class SupportAgent(PipelineAgent):
    @phase(output_schema=Intent)
    def classify(self, state: PipelineState, messages: list[Message]) -> str:
        return "Classify the user's intent into: billing, technical, general."

    @tool(transitions={"respond": None})
    def fetch_context(self, state: PipelineState, messages: list[Message]) -> str:
        intent = state.outputs["classify"]
        return lookup_kb(intent.category)  # lookup_kb: your own KB helper (not shown)

    @respond
    def respond(self, state: PipelineState, messages: list[Message]) -> str:
        return f"Based on our records: {state.outputs['fetch_context']}"

agent = SupportAgent(model="gpt-4o")
answer = agent.ask("Why was I charged twice?")
```
Decorator API¶
Four decorators define the building blocks of a pipeline. Methods are discovered in declaration order -- the first non-@respond method becomes the initial phase.
@phase -- LLM Call¶
Sends the method's return string as a system prompt to the LLM. Optionally parses the response into a Pydantic model via structured output.
```python
@phase(
    model="gpt-4o",        # Override pipeline default model
    output_schema=Intent,  # Pydantic model for structured output (optional)
    temperature=0.0,       # LLM temperature (default: 0.0)
    max_tokens=4096,       # Max response tokens (default: 4096)
    transitions={"next_phase": lambda output, state: output.confidence > 0.8},
)
def classify(self, state: PipelineState, messages: list[Message]) -> str:
    return "Classify the user query."
```
Signature: (self, state: PipelineState, messages: list[Message]) -> str
The return value is the system prompt. The LLM response (parsed or raw) is stored in state.outputs["classify"].
@tool -- Python Function¶
Runs arbitrary Python code. No LLM call. Can be sync or async.
```python
@tool(transitions={"respond": None})
def search(self, state: PipelineState, messages: list[Message]) -> dict:
    query = state.outputs["classify"].query
    return vector_db.search(query, top_k=5)  # vector_db: your vector store (not shown)
```
Signature: (self, state: PipelineState, messages: list[Message]) -> Any
@respond -- Terminal Phase¶
Emits the return value as the final response and stops the pipeline. Every pipeline must have at least one reachable @respond phase.
```python
# Bare decorator -- return value emitted directly
@respond
def answer(self, state: PipelineState, messages: list[Message]) -> str:
    return state.outputs["search"]["summary"]

# With model -- return value becomes a prompt, LLM generates the final response
@respond(model="gpt-4o")
def answer(self, state: PipelineState, messages: list[Message]) -> str:
    return f"Summarize this for the user: {state.outputs['search']}"
```
Signature: (self, state: PipelineState, messages: list[Message]) -> str
Note
@respond phases are never chosen as the initial phase, even if declared first.
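The selection rule can be sketched in plain Python (illustrative only, not the library's code): scan method names in declaration order and skip anything registered as a respond phase.

```python
def pick_initial_phase(declared: list[str], respond_phases: set[str]) -> str:
    """Return the first declared phase that is not a @respond phase."""
    for name in declared:  # declaration order
        if name not in respond_phases:
            return name
    raise ValueError("pipeline needs at least one non-respond phase")
```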
@subagent -- Scoped ReAct Loop¶
Spawns a bounded tool-calling agent. The method returns instructions; the framework runs a LiteLLMAgent with the specified tools for up to max_iterations steps.
```python
from latent.agents import tool as agent_tool
from latent.agents.pipeline import PipelineAgent, subagent, PipelineState
from latent.agents.events import Message

@agent_tool
def web_search(query: str) -> str:
    """Search the web."""
    return search_api(query)  # search_api: your search client (not shown)

class ResearchAgent(PipelineAgent):
    @subagent(
        tools=[web_search],
        max_iterations=5,
        model="gpt-4o",
        transitions={"summarize": None},
    )
    def research(self, state: PipelineState, messages: list[Message]) -> str:
        return "Research the user's question thoroughly using web search."
```
Signature: (self, state: PipelineState, messages: list[Message]) -> str
Warning
Inner agent TextDelta events are remapped to ReasoningDelta -- only the @respond phase produces the final TextDelta output.
Type System¶
All types are importable from latent.agents.pipeline.
PipelineState¶
Mutable runtime state for one pipeline execution. Passed to every phase method.
| Field | Type | Description |
|---|---|---|
| `current_phase` | `str` | Name of the currently executing phase |
| `outputs` | `dict[str, Any]` | Phase name to output mapping. Populated after each phase completes. |
| `retry_counts` | `dict[str, int]` | How many times each phase has been re-entered |
| `phase_history` | `list[str]` | Ordered list of all phases executed |
| `metadata` | `dict[str, Any]` | Free-form storage for cross-phase data |
```python
# Access output from a previous phase
intent = state.outputs["classify"]

# Check retry count
if state.retries("classify") > 2:
    return "fallback instructions"

# Store arbitrary data
state.metadata["user_tier"] = "premium"
```
Transition¶
Routes to another phase. Placed in a phase's transitions list; first match wins.
```python
Transition(
    to="next_phase",                                # Target phase name
    when=lambda output, state: output.score > 0.5,  # Predicate (None = always)
    side_effect=lambda output, state: log(output),  # Optional callback
)
```
Respond¶
Terminal transition. Emits text and stops the pipeline.
```python
Respond(
    output=lambda output, state: f"Answer: {output}",   # Produces final text
    when=lambda output, state: state.retries("x") > 2,  # Predicate (None = always)
    side_effect=None,                                   # Optional callback
    instructions="Rewrite in a friendly tone.",         # Optional LLM formatting pass
    model="gpt-4o-mini",                                # Model for formatting call
)
```
Phase Types (Advanced)¶
When building pipelines programmatically instead of using decorators:
| Type | Maps to | Description |
|---|---|---|
| `PromptPhase` | `@phase` | Single LLM call with optional structured output |
| `FunctionPhase` | `@tool` / `@respond` | Arbitrary Python function |
| `ReActPhase` | `@subagent` | Bounded tool-calling loop via scoped LiteLLMAgent |
Transitions and Routing¶
Transitions are evaluated in declaration order after each phase completes. The first transition whose when predicate returns True (or is None) wins.
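The first-match-wins rule can be sketched in plain Python (illustrative only; the `Transition` dataclass here mirrors the library's type but is not its implementation):

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional

@dataclass
class Transition:
    to: str                                            # target phase name
    when: Optional[Callable[[Any, Any], bool]] = None  # None = always matches

def route(transitions: list[Transition], output: Any, state: Any) -> Optional[str]:
    """Return the target of the first matching transition, or None (no match -> pipeline error)."""
    for t in transitions:
        if t.when is None or t.when(output, state):
            return t.to
    return None
```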
Decorator Shorthand¶
In the decorator API, transitions are a dict[str, Callable | None]:
```python
@phase(
    output_schema=Intent,
    transitions={
        "handle_billing": lambda output, state: output.category == "billing",
        "handle_technical": lambda output, state: output.category == "technical",
        "respond": None,  # None = always matches (fallback)
    },
)
def classify(self, state, messages):
    return "Classify the intent."
```
Tip
Always include a fallback transition (with None predicate) as the last entry. If no transition matches, the pipeline emits an error and stops.
Implicit Transitions¶
When a phase has no explicit transitions, the framework chains it to the next declared phase. If it is the last non-respond phase, it transitions to the first @respond method.
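The chaining rule can be sketched in plain Python (illustrative only, not the library's code):

```python
def implicit_target(name: str, declared: list[str], respond_phases: set[str]) -> str:
    """Default target for a phase declared without explicit transitions."""
    non_respond = [p for p in declared if p not in respond_phases]
    if name == non_respond[-1]:
        # last non-respond phase: fall through to the first @respond phase
        return next(p for p in declared if p in respond_phases)
    return declared[declared.index(name) + 1]  # chain to the next declared phase
```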
Side Effects¶
Both Transition and Respond accept a side_effect callback for logging, metrics, or state mutations:
```python
@phase(transitions={
    "next": None,
})
def analyze(self, state, messages):
    ...

# Programmatic equivalent with a side effect:
PromptPhase(
    name="analyze",
    instructions="...",
    transitions=[
        Transition(
            to="next",
            side_effect=lambda output, state: state.metadata.update({"analyzed": True}),
        ),
    ],
)
```
Execution Model¶
1. The pipeline starts at the initial phase (first non-`@respond` method).
2. The phase executes (`@phase` calls the LLM, `@tool` runs Python, `@subagent` runs a ReAct loop).
3. The phase output is stored in `state.outputs[phase_name]`.
4. Transitions are evaluated in order. First match wins.
    - If the match is a `Transition`, the pipeline moves to the target phase (back to step 2).
    - If the match is a `Respond`, the output text is emitted as `TextDelta` and the pipeline stops.
5. A safety limit (`max_total_phases`, default 20) prevents infinite loops.
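The loop above can be sketched in plain Python, with toy phases as callables (this mirrors the steps, not the library's implementation):

```python
def run_pipeline(phases: dict, initial: str, state: dict, max_total_phases: int = 20) -> str:
    """phases maps name -> (fn, router); router returns ("goto", name) or ("respond", text)."""
    current = initial
    for _ in range(max_total_phases):
        fn, router = phases[current]
        output = fn(state)                    # execute the phase
        state["outputs"][current] = output    # store its output
        kind, target = router(output, state)  # first matching transition
        if kind == "respond":
            return target                     # emit final text and stop
        current = target                      # move to the target phase
    raise RuntimeError("max_total_phases exceeded")
```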
Event Stream¶
PipelineAgent yields these events during execution:
| Event | When |
|---|---|
| `StepBoundary` | Before each phase |
| `PhaseStarted` | Phase begins (includes retry count) |
| `ReasoningDelta` | LLM tokens from `@phase` and `@subagent` (intermediate reasoning) |
| `PhaseCompleted` | Phase finished (includes output preview) |
| `PhaseRouted` | Transition to next phase |
| `TextDelta` | Final response from `@respond` |
| `LLMCallStart` / `LLMCallEnd` | LLM call boundaries with token usage |
| `Usage` | Aggregated token counts |
| `Error` | No matching transition or phase limit exceeded |
Calling Conventions¶
```python
agent = MyPipelineAgent(model="gpt-4o")

# Single-shot (sync)
result = agent.run([Message(role="user", content="Hello")])

# Conversational (maintains chat history)
result = agent.ask("Hello")
result = agent.ask("Follow up question")

# Callable (resets context each call, useful for eval)
result = agent("Hello")

# Async streaming
async for event in agent.stream([Message(role="user", content="Hello")]):
    if isinstance(event, TextDelta):
        print(event.text, end="")
```
Full Example: Research Assistant¶
A three-phase agent that classifies the query, optionally researches it, then responds.
```python
from pydantic import BaseModel, Field
from latent.agents.pipeline import PipelineAgent, phase, tool, subagent, respond, PipelineState
from latent.agents.events import Message
from latent.agents import tool as agent_tool

# --- Structured output schema ---
class QueryAnalysis(BaseModel):
    needs_research: bool = Field(description="Whether the query requires web research")
    topic: str = Field(description="The core topic to research")
    complexity: str = Field(description="low, medium, or high")

# --- Tools for the subagent ---
@agent_tool
def web_search(query: str) -> str:
    """Search the web for information."""
    return f"Search results for: {query}"

@agent_tool
def fetch_page(url: str) -> str:
    """Fetch and extract text from a web page."""
    return f"Page content from: {url}"

# --- The pipeline agent ---
class ResearchAssistant(PipelineAgent):
    """Classifies queries, researches when needed, responds with context."""

    @phase(
        output_schema=QueryAnalysis,
        transitions={
            "research": lambda output, state: output.needs_research,
            "direct_respond": None,
        },
    )
    def classify(self, state: PipelineState, messages: list[Message]) -> str:
        return (
            "Analyze the user's query. Determine if it requires web research "
            "or can be answered directly. Identify the core topic and complexity."
        )

    @subagent(
        tools=[web_search, fetch_page],
        max_iterations=5,
        transitions={"respond": None},
    )
    def research(self, state: PipelineState, messages: list[Message]) -> str:
        analysis = state.outputs["classify"]
        return (
            f"Research the topic: {analysis.topic}\n"
            f"Complexity: {analysis.complexity}\n"
            "Use web_search to find relevant information, then fetch_page "
            "for detailed content. Compile a thorough summary."
        )

    @respond
    def direct_respond(self, state: PipelineState, messages: list[Message]) -> str:
        analysis = state.outputs["classify"]
        return f"Topic: {analysis.topic}. This is a straightforward question."

    @respond(model="gpt-4o")
    def respond(self, state: PipelineState, messages: list[Message]) -> str:
        research = state.outputs["research"]
        return (
            f"Using the following research, provide a clear and thorough answer "
            f"to the user's question:\n\n{research}"
        )

# --- Usage ---
agent = ResearchAssistant(model="gpt-4o", name="research-assistant")
answer = agent.ask("What were the key outcomes of the latest IPCC report?")
print(answer)
```
Execution flow for this example¶
```
classify (PromptPhase)
  |
  +-- needs_research=True  --> research (ReActPhase) --> respond (@respond + LLM)
  |
  +-- needs_research=False --> direct_respond (@respond, raw text)
```
Programmatic Pipeline Construction¶
For dynamic pipelines or when you prefer explicit configuration over decorators:
```python
from latent.agents.pipeline import (
    PipelineAgent, Pipeline, PromptPhase, FunctionPhase,
    Transition, Respond, PipelineState,
)

pipeline = Pipeline(
    name="classifier",
    model="gpt-4o",
    initial="classify",
    max_total_phases=10,
    phases=[
        PromptPhase(
            name="classify",
            instructions="Classify the user query as billing or technical.",
            output_schema=Intent,
            transitions=[
                Transition(to="lookup", when=lambda o, s: o.confidence > 0.7),
                Respond(output=lambda o, s: "I'm not sure how to help with that."),
            ],
        ),
        FunctionPhase(
            name="lookup",
            # kb_search: your own KB helper (not shown)
            fn=lambda state, msgs: kb_search(state.outputs["classify"].category),
            transitions=[
                Respond(output=lambda o, s: f"Here's what I found: {o}"),
            ],
        ),
    ],
)

agent = PipelineAgent(pipeline=pipeline)
```
Guardrails Integration¶
PipelineAgent extends BaseAgent, so @guardrail methods are auto-discovered and applied to the outer stream() boundary:
```python
from latent.guardrails import guardrail

class SafeAgent(PipelineAgent):
    @guardrail(timing="pre", outcome="active", message="Blocked: off-topic")
    def topic_filter(self, prompt: str) -> bool:
        return "unrelated_keyword" in prompt.lower()

    @phase(output_schema=Response)  # Response: a Pydantic model (not shown)
    def generate(self, state, messages):
        return "Answer the user's question."

    @respond
    def answer(self, state, messages):
        return state.outputs["generate"].text
```
Guardrails wrap the entire pipeline execution, not individual phases.
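Illustratively (plain Python, not the library's code), an active pre-guardrail gates the whole run rather than firing between phases:

```python
def run_with_guardrails(pre_checks, run_pipeline, prompt: str) -> str:
    """Each check is (predicate, block_message); a tripped guardrail short-circuits the run."""
    for predicate, message in pre_checks:
        if predicate(prompt):
            return message       # blocked before any phase executes
    return run_pipeline(prompt)  # otherwise the full pipeline runs uninterrupted
```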