RAG Pipeline¶
Latent provides a modular Retrieval-Augmented Generation pipeline with pluggable backends, embedding providers, adaptive retrieval patterns, and config-driven composition.
Installation¶
pip install "latent[rag]" # Core: ChromaDB, BM25, hybrid
pip install "latent[rag-voyage]" # Voyage AI embeddings
pip install "latent[rag-bedrock]" # AWS Bedrock embeddings
pip install "latent[rag-reranker]" # CrossEncoder reranking
pip install "latent[optimizers]" # Optuna for RAGResearchAgent Phase 1
Architecture¶
Documents
|
v
MarkdownChunker --> DocumentChunk[]
|
v
EmbeddingProvider --> vectors
|
v
+---------+----------+---------+
| Chroma | BM25 | Hybrid | <-- retrieval backends
+---------+----------+---------+
|
v
[optional] QueryRewriter / HyDE
[optional] CrossEncoder / LLM Reranker
[optional] CorrectiveRetriever (CRAG)
[optional] ConfidenceGate (L-RAG)
[optional] CachedRetriever
|
v
RetrievedChunk[]
The pipeline is layered:
- Chunking -- MarkdownChunker splits documents on headers with section path tracking.
- Embedding -- Pluggable providers (OpenAI, Voyage, Bedrock, LiteLLM, SentenceTransformers).
- Retrieval -- Vector (ChromaRetriever), keyword (BM25Retriever), or fused (HybridRetriever).
- Adaptive layers -- Query expansion, reranking, corrective retrieval, confidence gating, caching.
Data Models¶
All retrievers share two core models from latent.rag.models:
from latent.rag import DocumentChunk, RetrievedChunk
# For indexing
chunk = DocumentChunk(
id=DocumentChunk.content_id("some text"), # SHA-256 of content
content="some text",
source="docs/guide.md",
section_path="Getting Started > Installation",
metadata={"language": "en"},
)
# Returned from search
result = RetrievedChunk(
id="abc123",
content="some text",
source="docs/guide.md",
section_path="Getting Started > Installation",
score=0.87,
metadata={},
)
Retrieval Backends¶
ChromaRetriever¶
Persistent vector search backed by ChromaDB.
from latent.rag import ChromaRetriever
retriever = ChromaRetriever(
collection_name="knowledge",
persist_directory="./chroma_db",
embedding_provider="openai",
embedding_model="text-embedding-3-small",
distance_fn="cosine", # "cosine", "l2", or "ip"
)
# Index
retriever.add_texts(["Document one", "Document two"])
retriever.add_chunks([chunk1, chunk2]) # pre-built DocumentChunks
# Search
results = retriever.search("query", k=5)
results = retriever.search_with_threshold("query", k=5, threshold=0.7)
results = retriever.search_multi(["query v1", "query v2"], k=5)
Embeddings are batched in groups of 32 to stay within token limits. Content-based SHA-256 IDs enable automatic deduplication on upsert.
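A minimal sketch of the content-addressed ID scheme (assuming IDs are the hex SHA-256 digest of the chunk text, as DocumentChunk.content_id suggests):

```python
import hashlib

def content_id(text: str) -> str:
    # Identical text always hashes to the same ID, so re-adding a
    # duplicate chunk upserts over the existing entry instead of
    # creating a second copy in the collection.
    return hashlib.sha256(text.encode("utf-8")).hexdigest()

content_id("Document one") == content_id("Document one")  # deterministic
```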
BM25Retriever¶
Keyword retrieval using BM25Okapi from rank-bm25.
from latent.rag import BM25Retriever
retriever = BM25Retriever(k1=1.5, b=0.75)
retriever.index(chunks) # list[DocumentChunk]
results = retriever.search("exact phrase match", k=10)
Hebrew tokenization
Pass a language-aware tokenizer for morphologically rich languages. The Hebrew tokenizer strips prefix particles and uses stanza for morphological analysis when available, falling back to rule-based stripping.
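A rule-based sketch of the fallback behavior described above (the prefix set below and the idea of wiring it in via a tokenizer parameter are illustrative assumptions, not confirmed API):

```python
def hebrew_tokenizer(text: str) -> list[str]:
    # Strip a single common Hebrew prefix particle (and/the/in/to/from/
    # as/that) from tokens long enough to leave a meaningful stem.
    prefixes = ("ו", "ה", "ב", "ל", "מ", "כ", "ש")
    tokens = []
    for tok in text.split():
        if len(tok) > 2 and tok.startswith(prefixes):
            tok = tok[1:]
        tokens.append(tok)
    return tokens
```

Such a callable would then be passed to the retriever at construction time, e.g. `BM25Retriever(tokenizer=hebrew_tokenizer)` if the constructor accepts a tokenizer argument.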
HybridRetriever¶
Combines semantic and keyword search using Reciprocal Rank Fusion (RRF).
from latent.rag import ChromaRetriever, HybridRetriever
chroma = ChromaRetriever(
embedding_provider="voyage",
embedding_model="voyage-3",
)
hybrid = HybridRetriever(
chroma=chroma,
alpha=0.7, # semantic weight (0.0 = pure BM25, 1.0 = pure vector)
rrf_k=60, # RRF smoothing constant
)
hybrid.add_texts(["doc one", "doc two"]) # indexes both backends
results = hybrid.search("my query", k=5)
The BM25 index is built lazily from the Chroma collection on first search. Multi-query search applies a 20% multiplicative boost per additional query hit, promoting chunks that match across different phrasings.
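The fusion step can be sketched as alpha-weighted Reciprocal Rank Fusion (a simplified illustration, not the library's internal code):

```python
def rrf_fuse(semantic_ids: list[str], bm25_ids: list[str],
             alpha: float = 0.7, rrf_k: int = 60) -> list[str]:
    # Each ranked list contributes 1/(rrf_k + rank) per document;
    # alpha weights the semantic list, (1 - alpha) the BM25 list.
    # A larger rrf_k flattens the gap between adjacent ranks.
    scores: dict[str, float] = {}
    for rank, doc in enumerate(semantic_ids, start=1):
        scores[doc] = scores.get(doc, 0.0) + alpha / (rrf_k + rank)
    for rank, doc in enumerate(bm25_ids, start=1):
        scores[doc] = scores.get(doc, 0.0) + (1 - alpha) / (rrf_k + rank)
    return sorted(scores, key=scores.get, reverse=True)

rrf_fuse(["a", "b", "c"], ["b", "c", "d"])  # "b" wins: ranked in both lists
```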
RAGAdapter¶
RAGAdapter is the high-level entry point that handles chunking, embedding, and backend
selection from a flat config dict.
from latent.rag import RAGAdapter
rag = RAGAdapter(
backend="hybrid", # "chroma", "hybrid", or "bm25"
embedding_provider="openai",
embedding_model="text-embedding-3-small",
collection_name="knowledge",
persist_directory="./chroma_db",
alpha=0.7,
top_k=10,
score_threshold=0.3,
chunk_max_size=1000,
chunk_overlap=100,
cache=True, # enable semantic caching
cache_threshold=0.95,
)
# Index raw markdown documents (auto-chunked)
n_chunks = rag.index_documents(
texts=["# Doc 1\nContent...", "# Doc 2\nContent..."],
sources=["doc1.md", "doc2.md"],
)
# Search
results = rag.search("my question")
results = rag.search_multi(["query v1", "query v2"])
results = rag.search_with_threshold("query", threshold=0.5)
From parameters.yaml¶
rag = RAGAdapter.from_params({
"backend": "hybrid",
"embedding_provider": "voyage",
"embedding_model": "voyage-3",
"alpha": 0.7,
"top_k": 10,
"chunk_max_size": 512,
"chunk_overlap": 100,
})
Embedding Providers¶
All providers implement the EmbeddingProvider protocol (embed_documents, embed_query).
| Provider | Class | Default Model | Extra |
|---|---|---|---|
| OpenAI | OpenAIEmbeddings | text-embedding-3-small | [rag] |
| Voyage AI | VoyageEmbeddings | voyage-3 | [rag-voyage] |
| AWS Bedrock | BedrockEmbeddings | amazon.titan-embed-text-v1 | [rag-bedrock] |
| LiteLLM | LiteLLMEmbeddings | (required) | [rag] |
| SentenceTransformers | SentenceTransformerEmbeddings | BAAI/bge-m3 | local |
Use the factory for string-based construction:
from latent.rag import create_embedding_provider
provider = create_embedding_provider("openai", model="text-embedding-3-large")
Adaptive Retrieval¶
Query Expansion¶
QueryRewriter generates alternative phrasings via LLM:
from latent.rag import QueryRewriter
rewriter = QueryRewriter(model="claude-haiku-4-5")
queries = rewriter.expand("What is RAG?")
# ["What is RAG?", "retrieval augmented generation explanation", ...]
HyDERewriter (Hypothetical Document Embeddings) generates a hypothetical answer and uses it as an additional search query:
from latent.rag import HyDERewriter
hyde = HyDERewriter(model="claude-haiku-4-5")
queries = hyde.expand("What is RAG?")
# ["What is RAG?", "RAG is a technique that combines..."]
Reranking¶
CrossEncoderReranker uses a cross-encoder model for precise relevance scoring:
from latent.rag import CrossEncoderReranker
reranker = CrossEncoderReranker(model_name="cross-encoder/ms-marco-MiniLM-L-6-v2")
reranked = reranker.rerank("query", results, top_k=5)
LLMReranker uses an LLM to score each (query, chunk) pair on a 0-10 scale:
from latent.rag import LLMReranker
reranker = LLMReranker(model="claude-haiku-4-5")
reranked = reranker.rerank("query", results, top_k=5)
Lost-in-the-Middle Reordering¶
Places the strongest chunks at the start and end of the context window:
from latent.rag import reorder_for_context
reordered = reorder_for_context(results)
# [0.9, 0.7, 0.5, 0.6, 0.8] -- strongest at edges
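One way to produce that edge-weighted ordering (a sketch consistent with the example output, not necessarily the library's exact algorithm):

```python
def reorder_for_context_sketch(scores: list[float]) -> list[float]:
    # Alternate the best remaining items between the front and the back,
    # leaving the weakest chunks in the middle of the context window.
    ranked = sorted(scores, reverse=True)
    front, back = [], []
    for i, score in enumerate(ranked):
        (front if i % 2 == 0 else back).append(score)
    return front + back[::-1]

reorder_for_context_sketch([0.9, 0.8, 0.7, 0.6, 0.5])
# -> [0.9, 0.7, 0.5, 0.6, 0.8]
```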
Corrective Retrieval (CRAG)¶
Wraps a retriever with LLM-based relevance evaluation. Chunks below the threshold are discarded and retrieval is retried with a fallback or broader query:
from latent.rag import CorrectiveRetriever
crag = CorrectiveRetriever(
retriever=hybrid,
relevance_judge=judge, # any object with .evaluate()
relevance_threshold=0.5,
fallback_retriever=bm25, # optional fallback
max_corrections=2,
)
results = crag.search("query", k=5)
Confidence Gating (L-RAG)¶
Skips retrieval entirely when the LLM is confident it can answer from parametric memory:
from latent.rag import ConfidenceGate
gate = ConfidenceGate(model="claude-haiku-4-5", threshold=0.8)
if gate.should_retrieve("What is 2+2?"):
results = retriever.search("What is 2+2?")
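The gate composes naturally with any retriever and answer step; a sketch of the full gated flow (all callables here are placeholders, not library API):

```python
def gated_answer(query, should_retrieve, retrieve, answer):
    # L-RAG pattern: consult the gate first; skip retrieval when the
    # model can answer from parametric memory, otherwise ground the
    # answer in retrieved context.
    context = retrieve(query) if should_retrieve(query) else []
    return answer(query, context)
```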
Semantic Caching¶
CachedRetriever wraps any retriever with embedding-based semantic caching using
LRU eviction:
from latent.rag import CachedRetriever
cached = CachedRetriever(
retriever=hybrid,
embedding_provider=provider,
similarity_threshold=0.95,
max_cache_size=1000,
)
PipelineBuilder¶
PipelineBuilder composes full pipelines from nested config dicts, handling adapter
construction, reranking, CRAG, confidence gating, and index caching.
from latent.rag import PipelineBuilder
builder = PipelineBuilder()
pipeline = builder.build({
"chunker": {"max_size": 512, "overlap": 100},
"embeddings": {"provider": "voyage", "model": "voyage-3"},
"backend": {"type": "hybrid", "alpha": 0.7},
"retrieval": {"top_k": 10, "score_threshold": 0.0},
"reranker": {"type": "cross_encoder"},
"post_retrieval": {"type": "reorder"},
"query_expansion": {"type": "hyde", "model": "claude-haiku-4-5"},
"confidence_gate": {"model": "claude-haiku-4-5", "threshold": 0.8},
"caching": {"threshold": 0.95},
})
pipeline.index_documents(texts, sources)
results = pipeline.search("query")
Index caching
PipelineBuilder caches indexed adapters by hash of index-time params (chunker,
embeddings, backend type, alpha). Changing only query-time params (top_k, reranker,
post-retrieval) reuses the existing index -- no re-embedding.
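A sketch of how such a cache key can be derived (which config sections participate is inferred from the description above, not confirmed internals):

```python
import hashlib
import json

def index_cache_key(config: dict) -> str:
    # Hash only the index-time sections; query-time sections such as
    # "retrieval" or "reranker" never invalidate the built index.
    index_params = {k: config.get(k) for k in ("chunker", "embeddings", "backend")}
    blob = json.dumps(index_params, sort_keys=True)
    return hashlib.sha256(blob.encode("utf-8")).hexdigest()
```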
Component Registry¶
ComponentRegistry provides the full catalogue of available components and validates
pipeline configs before construction:
from latent.rag import ComponentRegistry
# Inspect available components
components = ComponentRegistry.get_available_components()
# Validate a config (returns list of error strings, empty = valid)
errors = ComponentRegistry.validate(config)
Available component types:
| Component | Types | Key Config |
|---|---|---|
| chunker | markdown | max_size (256-1024), overlap (0-200) |
| embeddings | voyage, openai, bedrock, litellm | provider, model |
| backend | chroma, bm25, hybrid, raptor | alpha (hybrid), tree_depth (raptor) |
| retrieval | -- | top_k (3-15), score_threshold (0-0.5) |
| reranker | cross_encoder, llm | model, top_n |
| post_retrieval | crag, reorder | judge_model (crag), threshold |
| query_expansion | rewriter, hyde | model |
| confidence_gate | -- | model, threshold |
| caching | semantic | threshold |
Retrieval Metrics¶
Standalone evaluation metrics for measuring retrieval quality. All functions return a float in [0, 1].
from latent.rag import recall_at_k, precision_at_k, mrr, ndcg, context_precision
retrieved_ids = ["c1", "c3", "c5", "c2"]
relevant_ids = {"c1", "c2", "c4"}
recall_at_k(retrieved_ids, relevant_ids, k=5) # fraction of relevant found
precision_at_k(retrieved_ids, relevant_ids, k=5) # fraction of retrieved that are relevant
mrr(retrieved_ids, relevant_ids) # reciprocal rank of first hit
ndcg(retrieved_ids, relevant_ids, k=5) # normalized DCG (binary relevance)
context_precision(retrieved_ids, relevant_ids) # weighted precision by rank
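For reference, recall@k and MRR follow their standard definitions (sketches, not the library source):

```python
def recall_at_k_ref(retrieved: list[str], relevant: set[str], k: int) -> float:
    # Fraction of all relevant IDs that appear in the top-k results.
    if not relevant:
        return 0.0
    return len(set(retrieved[:k]) & relevant) / len(relevant)

def mrr_ref(retrieved: list[str], relevant: set[str]) -> float:
    # Reciprocal rank of the first relevant hit; 0.0 if none is found.
    for rank, doc in enumerate(retrieved, start=1):
        if doc in relevant:
            return 1.0 / rank
    return 0.0

recall_at_k_ref(["c1", "c3", "c5", "c2"], {"c1", "c2", "c4"}, k=5)  # 2/3
mrr_ref(["c1", "c3", "c5", "c2"], {"c1", "c2", "c4"})  # 1.0 (c1 at rank 1)
```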
RAGMetric Protocol¶
All metrics implement the RAGMetric protocol for use with eval pipelines:
from latent.rag import RecallAtKMetric, ContentRecallMetric, JudgeMetric
# ID-based metrics
recall_metric = RecallAtKMetric(k=5, relevant_ids_key="relevant_doc_ids")
# Content-overlap metrics (when chunk IDs don't match across configs)
content_recall = ContentRecallMetric(k=5, overlap_threshold=0.5)
# LLM judge metric
judge_metric = JudgeMetric(judge=my_judge, score_field="score")
| Metric Class | Match Strategy | Use Case |
|---|---|---|
| RecallAtKMetric | Chunk ID | Standard recall with known IDs |
| PrecisionAtKMetric | Chunk ID | Precision at K |
| MRRMetric | Chunk ID | First-hit ranking quality |
| NDCGMetric | Chunk ID | Ranking quality with position weighting |
| ContextPrecisionMetric | Chunk ID | Weighted precision by rank |
| ContentRecallMetric | Text overlap | Recall when IDs differ across configs |
| ContentMRRMetric | Text overlap | MRR when IDs differ across configs |
| JudgeMetric | LLM judge | Semantic relevance scoring |
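The text-overlap strategy can be illustrated with a simple token-overlap criterion (one plausible definition; the library's exact matching rule may differ):

```python
def content_match(retrieved_text: str, relevant_text: str,
                  overlap_threshold: float = 0.5) -> bool:
    # Count a retrieved chunk as a hit when enough of the relevant
    # chunk's tokens appear in it, regardless of chunk IDs.
    retrieved_tokens = set(retrieved_text.lower().split())
    relevant_tokens = set(relevant_text.lower().split())
    if not relevant_tokens:
        return False
    overlap = len(retrieved_tokens & relevant_tokens) / len(relevant_tokens)
    return overlap >= overlap_threshold
```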
Tuning and Optimization¶
Grid Search¶
rag_grid_search evaluates retrieval configs using recall@k against labeled chunk IDs --
no LLM calls needed.
from latent.rag.tune import rag_grid_search, TuneSpace
questions = [
{"query": "What is X?", "relevant_chunk_ids": ["c1", "c2"]},
{"query": "How does Y work?", "relevant_chunk_ids": ["c3"]},
]
results = rag_grid_search(
chroma_path="data/stable/chroma_db",
questions=questions,
embedding_provider="voyage",
embedding_model="voyage-3",
space=TuneSpace(
alphas=[0.3, 0.5, 0.7, 0.8],
k_values=[10, 15, 20],
rrf_k_values=[30, 40, 60],
bm25_k1=[1.2, 1.5, 2.0],
bm25_b=[0.5, 0.75, 1.0],
backends=["hybrid", "chroma"],
tokenizers=["default", "hebrew"],
),
max_configs=100,
)
# Results sorted by recall@k descending
best = results[0]
print(f"Best config: {best.config}")
print(f"Recall@K: {best.recall_at_k:.3f}, MRR: {best.mrr:.3f}")
RAGResearchAgent¶
Two-phase autonomous optimizer that combines Bayesian search with LLM validation.
Phase 1 uses Optuna to explore the full component space (chunk sizes, backends, alphas, rerankers, query expansion, post-retrieval). Documents are pre-indexed once per chunk size variant; query-time params are evaluated instantly with zero re-embedding cost.
Phase 2 dispatches an LLM agent to run full end-to-end evaluation (agent inference + judge) on the top configs from Phase 1.
from latent.rag import RAGResearchAgent, RecallAtKMetric
agent = RAGResearchAgent(
documents=["# Doc 1\n...", "# Doc 2\n..."],
eval_data=[
{"query": "What is X?", "relevant_doc_ids": ["c1", "c2"]},
],
metrics=[RecallAtKMetric(k=5)],
max_trials=30, # Phase 1 Optuna trials
max_phase2_trials=3, # Phase 2 LLM validations
embedding_provider="voyage",
embedding_model="voyage-3",
chunk_sizes=[512, 1000],
chunk_overlaps=[50, 100],
backends=["hybrid", "chroma", "bm25"],
alphas=[0.3, 0.5, 0.7],
extra_tools=[run_full_eval], # your eval tool for Phase 2
)
result = agent.run()
print(f"Best score: {result.best_score:.4f}")
print(f"Best config: {result.best_pipeline_config}")
print(f"Found in: {result.phase}")
RAPTOR Integration¶
RAPTOR (Recursive Abstractive Processing for Tree-Organized Retrieval) builds a hierarchical summary tree over documents for multi-level retrieval.
See RAPTOR for full documentation.
from latent.rag.raptor import RaptorAdapter
raptor = RaptorAdapter(
embedding_model="text-embedding-3-small",
summarization_model="gpt-4o-mini",
qa_model="gpt-4o-mini",
tb_num_layers=3,
tr_top_k=5,
)
raptor.build_tree("Your document corpus here...")
context = raptor.retrieve("What is the main theme?")
answer = raptor.answer("What is the main theme?")
raptor.save_tree("tree.pkl")
raptor.load_tree("tree.pkl")
RAPTOR is also available as a backend type in PipelineBuilder:
pipeline = builder.build({
"backend": {"type": "raptor", "tree_depth": 3, "summary_model": "gpt-4o-mini"},
})
End-to-End Example¶
Build a hybrid RAG pipeline with reranking, index a document corpus, search with quality metrics, and tune hyperparameters.
from latent.rag import (
PipelineBuilder,
RAGResearchAgent,
RecallAtKMetric,
MRRMetric,
)
# 1. Define pipeline config
config = {
"chunker": {"max_size": 512, "overlap": 100},
"embeddings": {"provider": "voyage", "model": "voyage-3"},
"backend": {"type": "hybrid", "alpha": 0.7},
"retrieval": {"top_k": 10},
"reranker": {"type": "cross_encoder"},
"post_retrieval": {"type": "reorder"},
}
# 2. Build and index
builder = PipelineBuilder()
pipeline = builder.build(config)
from pathlib import Path
documents = [Path(f).read_text() for f in doc_files]
sources = [str(f) for f in doc_files]
n_chunks = pipeline.index_documents(documents, sources)
print(f"Indexed {n_chunks} chunks")
# 3. Search
results = pipeline.search("How do I configure authentication?")
for chunk in results:
print(f" [{chunk.score:.3f}] {chunk.source} > {chunk.section_path}")
print(f" {chunk.content[:100]}...")
# 4. Evaluate retrieval quality
from latent.rag import recall_at_k, mrr
eval_questions = [
{"query": "How to configure auth?", "relevant_chunk_ids": ["a1", "a2"]},
{"query": "What are the defaults?", "relevant_chunk_ids": ["b1"]},
]
for q in eval_questions:
chunks = pipeline.search(q["query"])
ids = [c.id for c in chunks]
relevant = set(q["relevant_chunk_ids"])
print(f"Recall@10: {recall_at_k(ids, relevant, 10):.2f}")
print(f"MRR: {mrr(ids, relevant):.2f}")
# 5. Auto-tune with RAGResearchAgent
researcher = RAGResearchAgent(
documents=documents,
eval_data=eval_questions,
metrics=[RecallAtKMetric(k=10), MRRMetric()],
max_trials=30,
embedding_provider="voyage",
embedding_model="voyage-3",
)
result = researcher.run()
print(f"Best config: {result.best_pipeline_config}")
print(f"Best score: {result.best_score:.4f}")
# 6. Rebuild with optimized config
optimized_pipeline = builder.build(result.best_pipeline_config)
optimized_pipeline.index_documents(documents, sources)
# Cleanup
builder.cleanup()
Protocols¶
Implement these protocols to create custom components:
from latent.rag import Retriever, EmbeddingProvider, QueryExpander
class MyRetriever:
def search(self, query: str, k: int = 5) -> list[RetrievedChunk]: ...
def search_multi(self, queries: list[str], k: int = 5) -> list[RetrievedChunk]: ...
def search_with_threshold(self, query: str, k: int = 5, threshold: float = 0.0) -> list[RetrievedChunk]: ...
def add_texts(self, texts: list[str], metadatas: list[dict] | None = None) -> None: ...
def delete_collection(self) -> None: ...
class MyEmbeddingProvider:
def embed_documents(self, texts: list[str]) -> list[list[float]]: ...
def embed_query(self, text: str) -> list[float]: ...
class MyQueryExpander:
def expand(self, query: str) -> list[str]: ...
def expand_tokens(self, query: str) -> list[str]: ...
All protocols use @runtime_checkable for isinstance checks.
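Because the protocols are runtime_checkable, structural conformance can be verified with isinstance; a self-contained illustration (the protocol below is a local stand-in, not the library's class):

```python
from typing import Protocol, runtime_checkable

@runtime_checkable
class EmbeddingProviderProtocol(Protocol):
    # Structural typing: any object with these methods conforms,
    # no explicit inheritance required. Note that runtime_checkable
    # isinstance checks only verify method presence, not signatures.
    def embed_documents(self, texts: list[str]) -> list[list[float]]: ...
    def embed_query(self, text: str) -> list[float]: ...

class DummyProvider:
    def embed_documents(self, texts):
        return [[0.0] for _ in texts]
    def embed_query(self, text):
        return [0.0]

isinstance(DummyProvider(), EmbeddingProviderProtocol)  # True
```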