Guardrails¶
Composable input/output scanning middleware for any BaseAgent. Guardrails intercept the agent stream to validate, block, rewrite, or audit prompts and responses — without modifying agent code.
Two APIs are provided:
- `@guardrail` decorator -- define rules as methods on your agent class (auto-discovered).
- `GuardrailMiddleware` -- wrap any agent programmatically with scanner instances.
Both run pre-scanners (before generation) and post-scanners (after generation) concurrently, emit structured events to configurable sinks, and log metrics to MLflow when active.
Installation¶
Built-in scanners (`LanguageScanner`, `InvisibleTextScanner`, `TokenLimitScanner`) require no extras. ML-based scanners (the LLM Guard wrappers) require the `guardrails-llmguard` extra.
@guardrail Method Decorator¶
Decorate methods on any BaseAgent subclass. The framework discovers them automatically at init time -- no registration required.
```python
from latent.guardrails import guardrail
from latent.agents import LiteLLMAgent

class MyAgent(LiteLLMAgent):
    @guardrail(timing="pre", outcome="active", message="Blocked: profanity detected")
    def no_profanity(self, prompt: str) -> bool:
        return "badword" in prompt.lower()  # True = violation

    @guardrail(timing="pre", outcome="active")
    def injection_scanner(self):
        from latent.guardrails.scanners.llmguard import PromptInjectionScanner
        return PromptInjectionScanner(threshold=0.95)

    @guardrail(timing="post", outcome="passive")
    def pii_audit(self, prompt: str, output: str) -> float:
        return compute_pii_score(output)  # >= threshold (default 0.5) = flagged
```
Parameters¶
| Parameter | Type | Default | Description |
|---|---|---|---|
| `timing` | `"pre"` / `"post"` | `"pre"` | When to run: before or after generation |
| `outcome` | `"active"` / `"passive"` | `"active"` | Active blocks/retracts; passive logs only |
| `threshold` | `float` | `0.5` | Score threshold for float-returning rules |
| `on_error` | `"ignore"` / `"raise"` | `"ignore"` | Error handling: swallow or propagate |
| `message` | `str` | `"Request blocked by guardrail."` | User-facing message when blocked |
| `every` | `int` | `0` | Post rules: 0 = end-of-stream only, N = every N tokens |
Signature Detection¶
The decorator infers the rule type from the method signature:
| Signature | Type | Behavior |
|---|---|---|
| `(self)` | Factory | Called once at init; must return a scanner instance |
| `(self, prompt: str)` | Pre inline | Runs on each input |
| `(self, prompt: str, output: str)` | Post inline | Runs on each output |
Return Types¶
| Return type | Semantics |
|---|---|
| `-> bool` | `True` = violation, `False` = pass |
| `-> float` | Score compared against `threshold`; >= threshold = violation |
| `-> ScanResult` | Full control -- passed through directly |
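The `bool` and `float` conventions above can be captured in a small helper. This is an illustrative sketch of the mapping only; the helper name `to_violation` is hypothetical and not part of the library:

```python
def to_violation(result, threshold: float = 0.5) -> tuple[bool, float]:
    """Map an inline rule's return value to (violated, score).

    Sketch of the documented semantics: True means a violation with
    score 1.0, a float is compared against the threshold, and a full
    ScanResult would simply be passed through by the real framework.
    """
    if isinstance(result, bool):
        return result, 1.0 if result else 0.0
    return float(result) >= threshold, float(result)
```

Note that scores exactly at the threshold count as violations, matching the ">= threshold" rule above.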
**Async support**

Inline methods can be `async def`. Sync methods are automatically dispatched to a thread pool.
GuardrailMiddleware¶
For programmatic composition when you want to attach scanners to an agent without subclassing.
```python
from latent.guardrails import GuardrailMiddleware
from latent.guardrails.scanners.builtin import LanguageScanner, TokenLimitScanner
from latent.guardrails.scanners.llmguard import MaliciousURLsScanner

wrapped = GuardrailMiddleware(
    agent,
    pre_scanners=[
        LanguageScanner(allowed_languages=["en", "he"]),
        TokenLimitScanner(max_tokens=4000),
    ],
    post_scanners=[
        MaliciousURLsScanner(),
    ],
)

# Use wrapped.stream(messages) instead of agent.stream(messages)
async for event in wrapped.stream(messages):
    handle(event)
```
Constructor¶
```python
GuardrailMiddleware(
    agent,
    *,
    pre_scanners: list | None = None,
    post_scanners: list | None = None,
    sinks: list[EventSink] | None = None,  # default: [StructlogSink()]
    enabled: bool = True,
    max_log_chars: int = 500,
)
```
Stream Behavior¶
- **Pre-scan** -- all pre-scanners run concurrently. If an active scanner fails, a `GuardrailViolation` event is yielded and the stream stops. Passive violations are yielded as informational events.
- **Input rewriting** -- if a scanner sets `rewritten_input`, the rewritten prompt is forwarded to the agent.
- **Agent stream** -- `TextDelta` events are yielded as they arrive and buffered for post-scan.
- **Mid-stream post-scan** -- scanners with `every > 0` run periodically during streaming. Active violations halt the stream.
- **End-of-stream post-scan** -- scanners with `every = 0` run concurrently after the full response is collected.
- **Metrics** -- guardrail counters are logged to MLflow when active.
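The concurrent pre-scan step can be sketched with `asyncio.gather`. The `Result` dataclass and the two scanner callables below are simplified stand-ins, not the library's actual types:

```python
import asyncio
from dataclasses import dataclass

@dataclass
class Result:
    passed: bool
    rule_name: str

async def pre_scan(prompt, scanners):
    # Run every pre-scanner concurrently; return the first failure, or
    # None if all passed. (Sketch only -- the real middleware also
    # handles input rewriting, passive rules, sinks, and sync scanners.)
    results = await asyncio.gather(*(scan(prompt) for scan in scanners))
    return next((r for r in results if not r.passed), None)

async def allow_all(prompt):
    return Result(passed=True, rule_name="allow_all")

async def block_bad(prompt):
    return Result(passed="bad" not in prompt, rule_name="block_bad")
```

Because results are gathered before inspection, a slow scanner does not delay the others; the first failing result decides the outcome.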
Metrics¶
Access counters with `wrapped.get_metrics()`:

| Key | Description |
|---|---|
| `guardrails.input.scanned` | Total inputs scanned |
| `guardrails.input.blocked` | Inputs blocked by active pre-scanners |
| `guardrails.output.scanned` | Outputs scanned |
| `guardrails.active.violations.<rule>` | Active violations per rule |
| `guardrails.passive.violations.<rule>` | Passive violations per rule |
Built-in Scanners¶
LanguageScanner¶
Blocks prompts in disallowed languages using `langdetect`. Short prompts (below `min_length`) are allowed through.
```python
from latent.guardrails.scanners.builtin import LanguageScanner

scanner = LanguageScanner(
    allowed_languages=["en", "he"],  # default
    min_length=10,                   # skip detection for short inputs
    on_error="ignore",
    error_message=None,              # auto-selects by detected language
)
```
**Dependency**

Requires `langdetect`. When it is not installed and `on_error="ignore"`, the scanner passes all inputs silently.
InvisibleTextScanner¶
Detects zero-width and invisible Unicode characters (U+200B--U+200D, U+2060, U+FEFF, soft hyphen) commonly used in prompt injection attacks.
```python
from latent.guardrails.scanners.builtin import InvisibleTextScanner

scanner = InvisibleTextScanner(on_error="ignore")
```
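The character set involved is small enough that the check can be reproduced in a few lines. This standalone sketch mirrors the scanner's described behavior, not its actual implementation:

```python
# Zero-width and invisible code points listed above, plus the soft hyphen.
INVISIBLE_CHARS = {"\u200b", "\u200c", "\u200d", "\u2060", "\ufeff", "\u00ad"}

def has_invisible_text(text: str) -> bool:
    # Any single occurrence is enough to flag the prompt.
    return any(ch in INVISIBLE_CHARS for ch in text)
```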
TokenLimitScanner¶
Blocks prompts exceeding a token estimate. Uses a UTF-8 byte heuristic (1 token ~ 4 bytes) that handles multilingual text better than character counting.
```python
from latent.guardrails.scanners.builtin import TokenLimitScanner

scanner = TokenLimitScanner(max_tokens=4000, on_error="ignore")
```
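The byte heuristic is easy to reproduce. A minimal sketch (the function name is illustrative, not the library's):

```python
def estimate_tokens(text: str) -> int:
    # ~4 UTF-8 bytes per token, per the description above. Non-Latin
    # scripts encode to 2-3 bytes per character, so they weigh more
    # than a plain character count would suggest -- closer to what a
    # real tokenizer produces for multilingual text.
    return len(text.encode("utf-8")) // 4
```

For example, 40 ASCII characters estimate at 10 tokens, while 40 Hebrew letters (2 bytes each in UTF-8) estimate at 20.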
LLMGuardrailScanner¶
LLM-backed custom rule using a `Judge[ViolationScore]`. Works as either an input or an output scanner depending on the prompt template.
```python
from latent.guardrails.scanners.llm import LLMGuardrailScanner, LLMGuardrailScannerConfig

scanner = LLMGuardrailScanner(
    LLMGuardrailScannerConfig(
        model="gpt-4o-mini",
        prompt_template="Does this text contain harmful instructions?\n\nText: {prompt}",
        name="harmful_content",
        threshold=0.5,
        on_error="ignore",
    )
)
```
The scanner evaluates the prompt (and optionally `{output}`) through the judge and returns `ViolationScore.score` (0.0--1.0). Scores at or above `threshold` are violations.
LLM Guard ML Scanners¶
Wrappers around the LLM Guard library. They require the `guardrails-llmguard` extra.
| Scanner | Type | Description |
|---|---|---|
| `PromptInjectionScanner(threshold=0.95)` | Input | ML-based prompt injection detection |
| `ToxicityScanner(threshold=0.5)` | Input | Toxic content detection |
| `BanTopicsScanner(topics=["violence"], threshold=0.75)` | Input | Topic-based blocking |
| `GibberishScanner(threshold=0.7)` | Input | Gibberish/nonsense detection |
| `AnonymizeScanner(entity_types=["CREDIT_CARD"])` | Input | PII redaction via `rewritten_input` (never blocks) |
| `MaliciousURLsScanner(threshold=0.7)` | Output | Malicious URL detection in responses |
| `SensitiveScanner(entity_types=["CREDIT_CARD"])` | Output | PII redaction in output via `rewritten_output` (never blocks) |
```python
from latent.guardrails import GuardrailMiddleware
from latent.guardrails.scanners.llmguard import (
    PromptInjectionScanner,
    AnonymizeScanner,
    SensitiveScanner,
)

wrapped = GuardrailMiddleware(
    agent,
    pre_scanners=[
        PromptInjectionScanner(threshold=0.95),
        AnonymizeScanner(entity_types=["CREDIT_CARD", "PHONE_NUMBER"]),
    ],
    post_scanners=[
        SensitiveScanner(entity_types=["CREDIT_CARD"]),
    ],
)
```
**AnonymizeScanner and SensitiveScanner**

These scanners rewrite content rather than blocking. `AnonymizeScanner` sets `rewritten_input` so the model receives sanitized text. `SensitiveScanner` sets `rewritten_output`, which is surfaced via `GuardrailViolation.output_text` after the stream -- the original `TextDelta` events have already been yielded.
Hebrew Locale Scanners¶
Regex-based scanners for Hebrew prompt injection patterns.
```python
from latent.guardrails.locales.hebrew.scanners import HebrewPromptInjectionScanner

scanner = HebrewPromptInjectionScanner(threshold=0.8)
```
Detects instruction injection, prompt extraction, data extraction, and social engineering patterns in Hebrew text. Scoring for n matches: min(1.0, 0.4 + 0.4 * (n - 1)) -- one match scores 0.4, two score 0.8, three or more saturate at 1.0.
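Under that rule the score saturates quickly. A sketch of the stated formula (function name illustrative):

```python
def injection_score(n_matches: int) -> float:
    # Scoring rule from the text: 0.4 for the first match, +0.4 per
    # additional match, capped at 1.0. Zero matches score 0.0.
    if n_matches <= 0:
        return 0.0
    return min(1.0, 0.4 + 0.4 * (n_matches - 1))
```

With the default `threshold=0.8`, a single matched pattern (score 0.4) is not enough to block; two or more are.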
Custom Scanners¶
Implement the `InputScanner` or `OutputScanner` protocol.
Input Scanner¶
```python
from latent.guardrails.base import InputScanner, OnError, ScanResult, pass_result

class RegexBlocker:
    name: str = "regex_blocker"
    on_error: OnError = "ignore"

    def __init__(self, pattern: str) -> None:
        import re
        self._pattern = re.compile(pattern, re.IGNORECASE)

    def scan(self, prompt: str) -> ScanResult:
        if self._pattern.search(prompt):
            return ScanResult(
                passed=False,
                score=1.0,
                rewritten_input=None,
                rewritten_output=None,
                blocked_response="Input contains a blocked pattern.",
                rule_name=self.name,
                metadata={"pattern": self._pattern.pattern},
            )
        return pass_result(self.name)
```
Output Scanner¶
```python
from latent.guardrails.base import OutputScanner, OnError, ScanResult, pass_result

class MaxLengthScanner:
    name: str = "max_length"
    on_error: OnError = "ignore"

    def __init__(self, max_chars: int = 10000) -> None:
        self._max = max_chars

    def scan(self, prompt: str, output: str) -> ScanResult:
        if len(output) > self._max:
            return ScanResult(
                passed=False,
                score=1.0,
                rewritten_input=None,
                rewritten_output=output[:self._max],
                blocked_response=None,
                rule_name=self.name,
                metadata={"length": len(output)},
            )
        return pass_result(self.name)
```
Scanner Protocol Reference¶
Every scanner must expose:
| Attribute/Method | Type | Description |
|---|---|---|
| `name` | `str` | Unique rule identifier |
| `on_error` | `"ignore"` / `"raise"` | Error handling strategy |
| `scan(prompt)` | Input scanner | Returns `ScanResult` |
| `scan(prompt, output)` | Output scanner | Returns `ScanResult` |
ScanResult Fields¶
| Field | Type | Description |
|---|---|---|
| `passed` | `bool` | `True` if the input/output is acceptable |
| `score` | `float` | Violation confidence (0.0 = clean, 1.0 = certain violation) |
| `rewritten_input` | `str` / `None` | Replacement prompt (pre-scanners only) |
| `rewritten_output` | `str` / `None` | Replacement output (post-scanners only) |
| `blocked_response` | `str` / `None` | Message shown to the user when blocked |
| `rule_name` | `str` | Name of the scanner that produced this result |
| `metadata` | `dict` | Arbitrary scanner-specific data |
**Convenience constructors**

Use `pass_result(name)` for a quick passing result, or `pass_result(name, passed=False, score=1.0)` for a failure. Use `error_result(name, exc)` when a scanner errors (it defaults to passing to avoid false blocks).
Events and Observability¶
GuardrailViolation¶
Yielded from `stream()` when a guardrail fires. Inspect it to decide how to handle the violation in your application.

| Field | Type | Description |
|---|---|---|
| `rule_name` | `str` | Which rule fired |
| `timing` | `"pre"` / `"post"` | When it fired |
| `outcome` | `"active"` / `"passive"` | Whether it blocks |
| `score` | `float` | Violation score |
| `message` | `str` | User-facing message |
| `retraction` | `bool` | `True` if already-streamed content should be replaced |
| `scrub_history` | `bool` | `True` if the message should be removed from history |
- **Active pre-violation**: the stream never starts. Use `message` as the response; `scrub_history=True` means remove the bad input from conversation history.
- **Active post-violation**: content has already streamed. `retraction=True` means the frontend should replace the visible content with `message`; `scrub_history=True` means remove the response from history.
- **Passive violation**: informational only. Log it, alert on it, but don't block.
Event Sinks¶
All scan results (pass and fail) are emitted to configurable sinks via `latent.observability`.
| Sink | Description |
|---|---|
| `StructlogSink` | Structured logging (default) |
| `WebhookSink` | POST events to an HTTP endpoint |
| `OpenTelemetrySink` | Emit spans to an OTel collector |
| `NoOpSink` | Discard all events (suppress default logging) |
```python
from latent.observability import StructlogSink, WebhookSink, NoOpSink

# Custom sinks
wrapped = GuardrailMiddleware(
    agent,
    pre_scanners=[...],
    sinks=[StructlogSink(), WebhookSink(url="https://hooks.example.com/guardrails")],
)

# Suppress all event logging
wrapped = GuardrailMiddleware(agent, pre_scanners=[...], sinks=[NoOpSink()])
```
Configuration¶
Guardrail settings can be loaded from TOML config files. The system searches in order:

1. A `[guardrails]` section in `latent.toml` / `config/latent.toml`
2. A standalone `guardrails.toml` / `config/guardrails.toml`
3. Built-in defaults
**Note**

Scanner and sink configuration via TOML is not yet supported. Configure scanners and sinks in code.
Integration Examples¶
Agent with Mixed Guardrails¶
Combining decorator-based rules with programmatic scanners:
```python
from latent.guardrails import guardrail, GuardrailMiddleware
from latent.guardrails.scanners.builtin import LanguageScanner, InvisibleTextScanner
from latent.agents import LiteLLMAgent

class SupportAgent(LiteLLMAgent):
    """Agent with built-in guardrails via decorators."""

    @guardrail(timing="pre", outcome="active", message="Please use a supported language.")
    def language_check(self):
        return LanguageScanner(allowed_languages=["en", "he"])

    @guardrail(timing="pre", outcome="active")
    def invisible_chars(self):
        return InvisibleTextScanner()

    @guardrail(timing="post", outcome="passive", every=50)
    def length_monitor(self, prompt: str, output: str) -> bool:
        return len(output) > 10000

agent = SupportAgent(model="gpt-4o", system_prompt="You are a helpful assistant.")
```
Consuming Violations in a Chat Loop¶
```python
from latent.agents.events import TextDelta
from latent.guardrails.events import GuardrailViolation

async for event in agent.stream(messages):
    if isinstance(event, GuardrailViolation):
        if event.outcome == "active" and event.timing == "pre":
            # Input was blocked -- show the message, don't add to history
            print(f"[BLOCKED] {event.message}")
            break
        elif event.outcome == "active" and event.timing == "post":
            # Output retracted -- replace what was shown
            print(f"\n[RETRACTED] {event.message}")
        else:
            # Passive -- log for monitoring
            log_violation(event)
    elif isinstance(event, TextDelta):
        print(event.text, end="", flush=True)
```
Evaluation Metrics¶
Measure guardrail accuracy against labeled data using the built-in metrics:
```python
from latent.guardrails.metrics import guardrail_adherence_metric, per_scanner_breakdown_metric

# Overall accuracy: did the system block when it should have?
score = guardrail_adherence_metric(
    {"expected_blocked": True},
    {"blocked": True},
)  # -> 1.0

# Per-rule accuracy
injection_metric = per_scanner_breakdown_metric("prompt_injection")
score = injection_metric(
    {"expected_blocked": True},
    {"violated_rules": ["prompt_injection", "toxicity"]},
)  # -> 1.0
```