RAG Pipeline

Latent provides a modular Retrieval-Augmented Generation pipeline with pluggable backends, embedding providers, adaptive retrieval patterns, and config-driven composition.

Installation

pip install "latent[rag]"              # Core: ChromaDB, BM25, hybrid
pip install "latent[rag-voyage]"       # Voyage AI embeddings
pip install "latent[rag-bedrock]"      # AWS Bedrock embeddings
pip install "latent[rag-reranker]"     # CrossEncoder reranking
pip install "latent[optimizers]"       # Optuna for RAGResearchAgent Phase 1

Architecture

Documents
    |
    v
MarkdownChunker  -->  DocumentChunk[]
    |
    v
EmbeddingProvider  -->  vectors
    |
    v
+---------+----------+---------+
| Chroma  |  BM25    | Hybrid  |   <-- retrieval backends
+---------+----------+---------+
    |
    v
[optional] QueryRewriter / HyDE
[optional] CrossEncoder / LLM Reranker
[optional] CorrectiveRetriever (CRAG)
[optional] ConfidenceGate (L-RAG)
[optional] CachedRetriever
    |
    v
RetrievedChunk[]

The pipeline is layered:

  1. Chunking -- MarkdownChunker splits documents on headers with section path tracking.
  2. Embedding -- Pluggable providers (OpenAI, Voyage, Bedrock, LiteLLM, SentenceTransformers).
  3. Retrieval -- Vector (ChromaRetriever), keyword (BM25Retriever), or fused (HybridRetriever).
  4. Adaptive layers -- Query expansion, reranking, corrective retrieval, confidence gating, caching.
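To make the chunking layer concrete, here is a minimal sketch of header-based splitting with section-path tracking. This is not the actual MarkdownChunker implementation (which also enforces max_size and overlap); it only illustrates how a section path like "Getting Started > Installation" is derived from nested headers:

```python
# Minimal sketch of header-based chunking with section-path tracking.
# Illustrative only -- the real MarkdownChunker also handles size and overlap.

def chunk_by_headers(text: str) -> list[dict]:
    """Split markdown on headers, tracking the path of enclosing sections."""
    path: list[tuple[int, str]] = []   # stack of (header level, title)
    chunks: list[dict] = []
    buffer: list[str] = []

    def flush() -> None:
        content = "\n".join(buffer).strip()
        if content:
            chunks.append({
                "content": content,
                "section_path": " > ".join(title for _, title in path),
            })
        buffer.clear()

    for line in text.splitlines():
        if line.startswith("#"):
            flush()
            level = len(line) - len(line.lstrip("#"))
            title = line.lstrip("#").strip()
            # Pop headers at the same or deeper level, then push this one.
            while path and path[-1][0] >= level:
                path.pop()
            path.append((level, title))
        else:
            buffer.append(line)
    flush()
    return chunks
```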

Data Models

All retrievers share two core models from latent.rag.models:

from latent.rag import DocumentChunk, RetrievedChunk

# For indexing
chunk = DocumentChunk(
    id=DocumentChunk.content_id("some text"),  # SHA-256 of content
    content="some text",
    source="docs/guide.md",
    section_path="Getting Started > Installation",
    metadata={"language": "en"},
)

# Returned from search
result = RetrievedChunk(
    id="abc123",
    content="some text",
    source="docs/guide.md",
    section_path="Getting Started > Installation",
    score=0.87,
    metadata={},
)

Retrieval Backends

ChromaRetriever

Persistent vector search backed by ChromaDB.

from latent.rag import ChromaRetriever

retriever = ChromaRetriever(
    collection_name="knowledge",
    persist_directory="./chroma_db",
    embedding_provider="openai",
    embedding_model="text-embedding-3-small",
    distance_fn="cosine",  # "cosine", "l2", or "ip"
)

# Index
retriever.add_texts(["Document one", "Document two"])
retriever.add_chunks([chunk1, chunk2])  # pre-built DocumentChunks

# Search
results = retriever.search("query", k=5)
results = retriever.search_with_threshold("query", k=5, threshold=0.7)
results = retriever.search_multi(["query v1", "query v2"], k=5)

Embeddings are batched in groups of 32 to stay within token limits. Content-based SHA-256 IDs enable automatic deduplication on upsert.
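The dedup property follows from content addressing: identical text always hashes to the same ID, so a re-indexed document overwrites its previous entry instead of duplicating it. A sketch of the idea (DocumentChunk.content_id presumably works along these lines; the hex-digest format here is an assumption):

```python
import hashlib

# Content-addressed IDs: the same text always maps to the same key,
# so re-adding it upserts rather than duplicates.
def content_id(text: str) -> str:
    return hashlib.sha256(text.encode("utf-8")).hexdigest()

index: dict[str, str] = {}          # stand-in for the Chroma collection
for doc in ["Document one", "Document two", "Document one"]:
    index[content_id(doc)] = doc    # duplicate text lands on the same key

assert len(index) == 2              # the repeated document was deduplicated
```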

BM25Retriever

Keyword retrieval using BM25Okapi from rank-bm25.

from latent.rag import BM25Retriever

retriever = BM25Retriever(k1=1.5, b=0.75)
retriever.index(chunks)  # list[DocumentChunk]
results = retriever.search("exact phrase match", k=10)

Hebrew tokenization

Pass a language-aware tokenizer for morphologically rich languages:

from latent.rag.bm25 import _tokenize_hebrew

retriever = BM25Retriever(tokenizer=_tokenize_hebrew)

The Hebrew tokenizer strips prefix particles and uses stanza for morphological analysis when available, falling back to rule-based stripping.
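The rule-based fallback can be sketched roughly as follows. The prefix list and length guard here are illustrative assumptions; the real _tokenize_hebrew is more careful and prefers stanza's morphological analysis when installed:

```python
# Hedged sketch of the rule-based fallback: strip common Hebrew prefix
# particles (roughly: and/the/in/to/from/as/that) before BM25 tokenization.
# Prefix set and length threshold are illustrative, not the library's exact rules.
PREFIXES = ("ו", "ה", "ב", "ל", "מ", "כ", "ש")

def strip_prefix(token: str) -> str:
    # Only strip when enough of the word remains to be a plausible stem.
    if len(token) > 3 and token[0] in PREFIXES:
        return token[1:]
    return token

def tokenize_hebrew_fallback(text: str) -> list[str]:
    return [strip_prefix(t) for t in text.split()]
```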

HybridRetriever

Combines semantic and keyword search using Reciprocal Rank Fusion (RRF).

from latent.rag import ChromaRetriever, HybridRetriever

chroma = ChromaRetriever(
    embedding_provider="voyage",
    embedding_model="voyage-3",
)
hybrid = HybridRetriever(
    chroma=chroma,
    alpha=0.7,   # semantic weight (0.0 = pure BM25, 1.0 = pure vector)
    rrf_k=60,    # RRF smoothing constant
)

hybrid.add_texts(["doc one", "doc two"])  # indexes both backends
results = hybrid.search("my query", k=5)

The BM25 index is built lazily from the Chroma collection on first search. Multi-query search applies a 20% multiplicative boost per additional query hit, promoting chunks that match across different phrasings.
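The fusion step can be sketched with the standard RRF formula, weighted by alpha as described above. HybridRetriever's exact scoring may differ in detail; this shows the shape of the computation:

```python
# Sketch of Reciprocal Rank Fusion over two ranked ID lists, with the
# alpha weighting described above (1.0 = pure vector, 0.0 = pure BM25).
def rrf_fuse(vector_ids: list[str], bm25_ids: list[str],
             alpha: float = 0.7, rrf_k: int = 60) -> list[str]:
    scores: dict[str, float] = {}
    for rank, doc_id in enumerate(vector_ids):
        scores[doc_id] = scores.get(doc_id, 0.0) + alpha / (rrf_k + rank + 1)
    for rank, doc_id in enumerate(bm25_ids):
        scores[doc_id] = scores.get(doc_id, 0.0) + (1 - alpha) / (rrf_k + rank + 1)
    return sorted(scores, key=scores.get, reverse=True)
```

A larger rrf_k flattens the rank contribution, so deep ranks are penalized less aggressively.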

RAGAdapter

RAGAdapter is the high-level entry point that handles chunking, embedding, and backend selection from a flat config dict.

from latent.rag import RAGAdapter

rag = RAGAdapter(
    backend="hybrid",          # "chroma", "hybrid", or "bm25"
    embedding_provider="openai",
    embedding_model="text-embedding-3-small",
    collection_name="knowledge",
    persist_directory="./chroma_db",
    alpha=0.7,
    top_k=10,
    score_threshold=0.3,
    chunk_max_size=1000,
    chunk_overlap=100,
    cache=True,                # enable semantic caching
    cache_threshold=0.95,
)

# Index raw markdown documents (auto-chunked)
n_chunks = rag.index_documents(
    texts=["# Doc 1\nContent...", "# Doc 2\nContent..."],
    sources=["doc1.md", "doc2.md"],
)

# Search
results = rag.search("my question")
results = rag.search_multi(["query v1", "query v2"])
results = rag.search_with_threshold("query", threshold=0.5)

From parameters.yaml

rag = RAGAdapter.from_params({
    "backend": "hybrid",
    "embedding_provider": "voyage",
    "embedding_model": "voyage-3",
    "alpha": 0.7,
    "top_k": 10,
    "chunk_max_size": 512,
    "chunk_overlap": 100,
})

Embedding Providers

All providers implement the EmbeddingProvider protocol (embed_documents, embed_query).

| Provider             | Class                         | Default Model              | Extra         |
| -------------------- | ----------------------------- | -------------------------- | ------------- |
| OpenAI               | OpenAIEmbeddings              | text-embedding-3-small     | [rag]         |
| Voyage AI            | VoyageEmbeddings              | voyage-3                   | [rag-voyage]  |
| AWS Bedrock          | BedrockEmbeddings             | amazon.titan-embed-text-v1 | [rag-bedrock] |
| LiteLLM              | LiteLLMEmbeddings             | (required)                 | [rag]         |
| SentenceTransformers | SentenceTransformerEmbeddings | BAAI/bge-m3                | local         |

Use the factory for string-based construction:

from latent.rag import create_embedding_provider

provider = create_embedding_provider("openai", model="text-embedding-3-large")

Adaptive Retrieval

Query Expansion

QueryRewriter generates alternative phrasings via LLM:

from latent.rag import QueryRewriter

rewriter = QueryRewriter(model="claude-haiku-4-5")
queries = rewriter.expand("What is RAG?")
# ["What is RAG?", "retrieval augmented generation explanation", ...]

HyDERewriter (Hypothetical Document Embeddings) generates a hypothetical answer and uses it as an additional search query:

from latent.rag import HyDERewriter

hyde = HyDERewriter(model="claude-haiku-4-5")
queries = hyde.expand("What is RAG?")
# ["What is RAG?", "RAG is a technique that combines..."]

Reranking

CrossEncoderReranker uses a cross-encoder model for precise relevance scoring:

from latent.rag import CrossEncoderReranker

reranker = CrossEncoderReranker(model_name="cross-encoder/ms-marco-MiniLM-L-6-v2")
reranked = reranker.rerank("query", results, top_k=5)

LLMReranker uses an LLM to score each (query, chunk) pair on a 0-10 scale:

from latent.rag import LLMReranker

reranker = LLMReranker(model="claude-haiku-4-5")
reranked = reranker.rerank("query", results, top_k=5)

Lost-in-the-Middle Reordering

Places the strongest chunks at the start and end of the context window:

from latent.rag import reorder_for_context

reordered = reorder_for_context(results)
# [0.9, 0.7, 0.5, 0.6, 0.8]  -- strongest at edges
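The reordering pattern above can be reproduced by alternating the best-scored items between the front and the back, leaving the weakest in the middle. reorder_for_context may differ in implementation detail; this sketch yields the documented output:

```python
# Sketch of lost-in-the-middle reordering: alternate the best-scored items
# between the front and the back so the weakest land in the middle.
def reorder_edges(scores: list[float]) -> list[float]:
    ranked = sorted(scores, reverse=True)
    front: list[float] = []
    back: list[float] = []
    for i, s in enumerate(ranked):
        (front if i % 2 == 0 else back).append(s)
    return front + back[::-1]

reorder_edges([0.9, 0.8, 0.7, 0.6, 0.5])
# [0.9, 0.7, 0.5, 0.6, 0.8]  -- strongest at edges
```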

Corrective Retrieval (CRAG)

Wraps a retriever with LLM-based relevance evaluation. Chunks below the threshold are discarded and retrieval is retried with a fallback or broader query:

from latent.rag import CorrectiveRetriever

crag = CorrectiveRetriever(
    retriever=hybrid,
    relevance_judge=judge,           # any object with .evaluate()
    relevance_threshold=0.5,
    fallback_retriever=bm25,         # optional fallback
    max_corrections=2,
)
results = crag.search("query", k=5)

Confidence Gating (L-RAG)

Skips retrieval entirely when the LLM is confident it can answer from parametric memory:

from latent.rag import ConfidenceGate

gate = ConfidenceGate(model="claude-haiku-4-5", threshold=0.8)
if gate.should_retrieve("What is 2+2?"):
    results = retriever.search("What is 2+2?")

Semantic Caching

CachedRetriever wraps any retriever with embedding-based semantic caching using LRU eviction:

from latent.rag import CachedRetriever

cached = CachedRetriever(
    retriever=hybrid,
    embedding_provider=provider,
    similarity_threshold=0.95,
    max_cache_size=1000,
)
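The mechanism can be sketched as follows: a query hits the cache when its embedding is within similarity_threshold of a previously cached query's embedding, and eviction is LRU. CachedRetriever's internals may differ; this is a minimal stdlib illustration:

```python
from collections import OrderedDict
import math

def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(x * x for x in b)))

class SemanticCache:
    """Embedding-keyed cache with similarity-based lookup and LRU eviction."""

    def __init__(self, threshold: float = 0.95, max_size: int = 1000):
        self.threshold = threshold
        self.max_size = max_size
        self._entries: OrderedDict[tuple[float, ...], object] = OrderedDict()

    def get(self, embedding: list[float]):
        for key, value in self._entries.items():
            if cosine(embedding, list(key)) >= self.threshold:
                self._entries.move_to_end(key)   # refresh LRU position
                return value
        return None                              # cache miss

    def put(self, embedding: list[float], results: object) -> None:
        if len(self._entries) >= self.max_size:
            self._entries.popitem(last=False)    # evict least recently used
        self._entries[tuple(embedding)] = results
```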

PipelineBuilder

PipelineBuilder composes full pipelines from nested config dicts, handling adapter construction, reranking, CRAG, confidence gating, and index caching.

from latent.rag import PipelineBuilder

builder = PipelineBuilder()
pipeline = builder.build({
    "chunker": {"max_size": 512, "overlap": 100},
    "embeddings": {"provider": "voyage", "model": "voyage-3"},
    "backend": {"type": "hybrid", "alpha": 0.7},
    "retrieval": {"top_k": 10, "score_threshold": 0.0},
    "reranker": {"type": "cross_encoder"},
    "post_retrieval": {"type": "reorder"},
    "query_expansion": {"type": "hyde", "model": "claude-haiku-4-5"},
    "confidence_gate": {"model": "claude-haiku-4-5", "threshold": 0.8},
    "caching": {"threshold": 0.95},
})

pipeline.index_documents(texts, sources)
results = pipeline.search("query")

Index caching

PipelineBuilder caches indexed adapters by hash of index-time params (chunker, embeddings, backend type, alpha). Changing only query-time params (top_k, reranker, post-retrieval) reuses the existing index -- no re-embedding.
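The cache key can be sketched by hashing only the params that affect the stored vectors. The field names below are illustrative, not PipelineBuilder's exact internals:

```python
import hashlib
import json

# Sketch of the index-cache key: hash only index-time params. Query-time
# knobs (top_k, reranker, post_retrieval, ...) are excluded, so changing
# them reuses the same index. Field selection here is illustrative.
def index_cache_key(config: dict) -> str:
    index_params = {
        "chunker": config.get("chunker"),
        "embeddings": config.get("embeddings"),
        "backend_type": config.get("backend", {}).get("type"),
        "alpha": config.get("backend", {}).get("alpha"),
    }
    blob = json.dumps(index_params, sort_keys=True)
    return hashlib.sha256(blob.encode()).hexdigest()
```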

Component Registry

ComponentRegistry provides the full catalogue of available components and validates pipeline configs before construction:

from latent.rag import ComponentRegistry

# Inspect available components
components = ComponentRegistry.get_available_components()

# Validate a config (returns list of error strings, empty = valid)
errors = ComponentRegistry.validate(config)

Available component types:

| Component       | Types                           | Key Config                            |
| --------------- | ------------------------------- | ------------------------------------- |
| chunker         | markdown                        | max_size (256-1024), overlap (0-200)  |
| embeddings      | voyage, openai, bedrock, litellm | provider, model                      |
| backend         | chroma, bm25, hybrid, raptor    | alpha (hybrid), tree_depth (raptor)   |
| retrieval       | --                              | top_k (3-15), score_threshold (0-0.5) |
| reranker        | cross_encoder, llm              | model, top_n                          |
| post_retrieval  | crag, reorder                   | judge_model (crag), threshold         |
| query_expansion | rewriter, hyde                  | model                                 |
| confidence_gate | --                              | model, threshold                      |
| caching         | semantic                        | threshold                             |

Retrieval Metrics

Standalone evaluation metrics for measuring retrieval quality. All functions return a float in [0, 1].

from latent.rag import recall_at_k, precision_at_k, mrr, ndcg, context_precision

retrieved_ids = ["c1", "c3", "c5", "c2"]
relevant_ids = {"c1", "c2", "c4"}

recall_at_k(retrieved_ids, relevant_ids, k=5)       # fraction of relevant found
precision_at_k(retrieved_ids, relevant_ids, k=5)     # fraction of retrieved that are relevant
mrr(retrieved_ids, relevant_ids)                     # reciprocal rank of first hit
ndcg(retrieved_ids, relevant_ids, k=5)               # normalized DCG (binary relevance)
context_precision(retrieved_ids, relevant_ids)       # weighted precision by rank
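For reference, two of these metrics follow the standard formulations below; the library's implementations should agree on these inputs:

```python
# Standard reference definitions for recall@k and MRR (binary relevance).
def recall_at_k_ref(retrieved: list[str], relevant: set[str], k: int) -> float:
    if not relevant:
        return 0.0
    hits = sum(1 for doc_id in retrieved[:k] if doc_id in relevant)
    return hits / len(relevant)

def mrr_ref(retrieved: list[str], relevant: set[str]) -> float:
    for rank, doc_id in enumerate(retrieved, start=1):
        if doc_id in relevant:
            return 1.0 / rank
    return 0.0

retrieved_ids = ["c1", "c3", "c5", "c2"]
relevant_ids = {"c1", "c2", "c4"}
assert abs(recall_at_k_ref(retrieved_ids, relevant_ids, k=5) - 2 / 3) < 1e-9
assert mrr_ref(retrieved_ids, relevant_ids) == 1.0   # first hit at rank 1
```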

RAGMetric Protocol

All metrics implement the RAGMetric protocol for use with eval pipelines:

from latent.rag import RecallAtKMetric, ContentRecallMetric, JudgeMetric

# ID-based metrics
recall_metric = RecallAtKMetric(k=5, relevant_ids_key="relevant_doc_ids")

# Content-overlap metrics (when chunk IDs don't match across configs)
content_recall = ContentRecallMetric(k=5, overlap_threshold=0.5)

# LLM judge metric
judge_metric = JudgeMetric(judge=my_judge, score_field="score")

| Metric Class           | Match Strategy | Use Case                                |
| ---------------------- | -------------- | --------------------------------------- |
| RecallAtKMetric        | Chunk ID       | Standard recall with known IDs          |
| PrecisionAtKMetric     | Chunk ID       | Precision at K                          |
| MRRMetric              | Chunk ID       | First-hit ranking quality               |
| NDCGMetric             | Chunk ID       | Ranking quality with position weighting |
| ContextPrecisionMetric | Chunk ID       | Weighted precision by rank              |
| ContentRecallMetric    | Text overlap   | Recall when IDs differ across configs   |
| ContentMRRMetric       | Text overlap   | MRR when IDs differ across configs      |
| JudgeMetric            | LLM judge      | Semantic relevance scoring              |

Tuning and Optimization

rag_grid_search evaluates retrieval configs using recall@k against labeled chunk IDs -- no LLM calls needed.

from latent.rag.tune import rag_grid_search, TuneSpace

questions = [
    {"query": "What is X?", "relevant_chunk_ids": ["c1", "c2"]},
    {"query": "How does Y work?", "relevant_chunk_ids": ["c3"]},
]

results = rag_grid_search(
    chroma_path="data/stable/chroma_db",
    questions=questions,
    embedding_provider="voyage",
    embedding_model="voyage-3",
    space=TuneSpace(
        alphas=[0.3, 0.5, 0.7, 0.8],
        k_values=[10, 15, 20],
        rrf_k_values=[30, 40, 60],
        bm25_k1=[1.2, 1.5, 2.0],
        bm25_b=[0.5, 0.75, 1.0],
        backends=["hybrid", "chroma"],
        tokenizers=["default", "hebrew"],
    ),
    max_configs=100,
)

# Results sorted by recall@k descending
best = results[0]
print(f"Best config: {best.config}")
print(f"Recall@K: {best.recall_at_k:.3f}, MRR: {best.mrr:.3f}")

RAGResearchAgent

Two-phase autonomous optimizer that combines Bayesian search with LLM validation.

Phase 1 uses Optuna to explore the full component space (chunk sizes, backends, alphas, rerankers, query expansion, post-retrieval). Documents are pre-indexed once per chunk size variant; query-time params are evaluated instantly with zero re-embedding cost.

Phase 2 dispatches an LLM agent to run full end-to-end evaluation (agent inference + judge) on the top configs from Phase 1.

from latent.rag import RAGResearchAgent, RecallAtKMetric

agent = RAGResearchAgent(
    documents=["# Doc 1\n...", "# Doc 2\n..."],
    eval_data=[
        {"query": "What is X?", "relevant_doc_ids": ["c1", "c2"]},
    ],
    metrics=[RecallAtKMetric(k=5)],
    max_trials=30,               # Phase 1 Optuna trials
    max_phase2_trials=3,         # Phase 2 LLM validations
    embedding_provider="voyage",
    embedding_model="voyage-3",
    chunk_sizes=[512, 1000],
    chunk_overlaps=[50, 100],
    backends=["hybrid", "chroma", "bm25"],
    alphas=[0.3, 0.5, 0.7],
    extra_tools=[run_full_eval],  # your eval tool for Phase 2
)

result = agent.run()
print(f"Best score: {result.best_score:.4f}")
print(f"Best config: {result.best_pipeline_config}")
print(f"Found in: {result.phase}")

RAPTOR Integration

RAPTOR (Recursive Abstractive Processing for Tree-Organized Retrieval) builds a hierarchical summary tree over documents for multi-level retrieval.

See RAPTOR for full documentation.

from latent.rag.raptor import RaptorAdapter

raptor = RaptorAdapter(
    embedding_model="text-embedding-3-small",
    summarization_model="gpt-4o-mini",
    qa_model="gpt-4o-mini",
    tb_num_layers=3,
    tr_top_k=5,
)

raptor.build_tree("Your document corpus here...")
context = raptor.retrieve("What is the main theme?")
answer = raptor.answer("What is the main theme?")

raptor.save_tree("tree.pkl")
raptor.load_tree("tree.pkl")

RAPTOR is also available as a backend type in PipelineBuilder:

pipeline = builder.build({
    "backend": {"type": "raptor", "tree_depth": 3, "summary_model": "gpt-4o-mini"},
})

End-to-End Example

This example builds a hybrid RAG pipeline with reranking, indexes a document corpus, runs a search, evaluates retrieval quality with metrics, and tunes hyperparameters.

from latent.rag import (
    PipelineBuilder,
    RAGResearchAgent,
    RecallAtKMetric,
    MRRMetric,
)

# 1. Define pipeline config
config = {
    "chunker": {"max_size": 512, "overlap": 100},
    "embeddings": {"provider": "voyage", "model": "voyage-3"},
    "backend": {"type": "hybrid", "alpha": 0.7},
    "retrieval": {"top_k": 10},
    "reranker": {"type": "cross_encoder"},
    "post_retrieval": {"type": "reorder"},
}

# 2. Build and index
builder = PipelineBuilder()
pipeline = builder.build(config)

from pathlib import Path

documents = [Path(f).read_text() for f in doc_files]
sources = [str(f) for f in doc_files]
n_chunks = pipeline.index_documents(documents, sources)
print(f"Indexed {n_chunks} chunks")

# 3. Search
results = pipeline.search("How do I configure authentication?")
for chunk in results:
    print(f"  [{chunk.score:.3f}] {chunk.source} > {chunk.section_path}")
    print(f"  {chunk.content[:100]}...")

# 4. Evaluate retrieval quality
from latent.rag import recall_at_k, mrr

eval_questions = [
    {"query": "How to configure auth?", "relevant_chunk_ids": ["a1", "a2"]},
    {"query": "What are the defaults?", "relevant_chunk_ids": ["b1"]},
]

for q in eval_questions:
    chunks = pipeline.search(q["query"])
    ids = [c.id for c in chunks]
    relevant = set(q["relevant_chunk_ids"])
    print(f"Recall@10: {recall_at_k(ids, relevant, 10):.2f}")
    print(f"MRR: {mrr(ids, relevant):.2f}")

# 5. Auto-tune with RAGResearchAgent
researcher = RAGResearchAgent(
    documents=documents,
    eval_data=eval_questions,
    metrics=[RecallAtKMetric(k=10), MRRMetric()],
    max_trials=30,
    embedding_provider="voyage",
    embedding_model="voyage-3",
)
result = researcher.run()
print(f"Best config: {result.best_pipeline_config}")
print(f"Best score: {result.best_score:.4f}")

# 6. Rebuild with optimized config
optimized_pipeline = builder.build(result.best_pipeline_config)
optimized_pipeline.index_documents(documents, sources)

# Cleanup
builder.cleanup()

Protocols

Implement these protocols to create custom components:

from latent.rag import Retriever, EmbeddingProvider, QueryExpander

class MyRetriever:
    def search(self, query: str, k: int = 5) -> list[RetrievedChunk]: ...
    def search_multi(self, queries: list[str], k: int = 5) -> list[RetrievedChunk]: ...
    def search_with_threshold(self, query: str, k: int = 5, threshold: float = 0.0) -> list[RetrievedChunk]: ...
    def add_texts(self, texts: list[str], metadatas: list[dict] | None = None) -> None: ...
    def delete_collection(self) -> None: ...

class MyEmbeddingProvider:
    def embed_documents(self, texts: list[str]) -> list[list[float]]: ...
    def embed_query(self, text: str) -> list[float]: ...

class MyQueryExpander:
    def expand(self, query: str) -> list[str]: ...
    def expand_tokens(self, query: str) -> list[str]: ...

All protocols use @runtime_checkable for isinstance checks.
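The mechanism can be sketched with a stdlib example. This mirrors how such protocols are typically declared (the name EmbeddingProviderLike is illustrative, not the library's class); note that @runtime_checkable isinstance checks only verify method presence, not signatures:

```python
from typing import Protocol, runtime_checkable

# A runtime-checkable protocol enables isinstance checks via structural
# typing: any object with matching method names passes.
@runtime_checkable
class EmbeddingProviderLike(Protocol):
    def embed_documents(self, texts: list[str]) -> list[list[float]]: ...
    def embed_query(self, text: str) -> list[float]: ...

class DummyProvider:
    def embed_documents(self, texts: list[str]) -> list[list[float]]:
        return [[0.0] for _ in texts]

    def embed_query(self, text: str) -> list[float]:
        return [0.0]

assert isinstance(DummyProvider(), EmbeddingProviderLike)   # structural match
```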