Introduction
Retrieval-Augmented Generation (RAG) has evolved significantly from its early implementations. In 2025, RAG 2.0 represents a new generation of architectures that address the limitations of basic RAG systems. This guide covers advanced patterns for building production-ready RAG systems that deliver accurate, contextual, and reliable responses.
What Is RAG 2.0?
The Evolution from Basic RAG
┌───────────────────────────────────────────────────────────────┐
│                         RAG Evolution                         │
├───────────────────────────────────────────────────────────────┤
│                                                               │
│   RAG 1.0                      RAG 2.0                        │
│   ───────                      ───────                        │
│   • Simple retrieval           • Hybrid search                │
│   • Single vector DB           • Multi-vector storage         │
│   • Chunk + query              • Query transformation         │
│   • Basic chunking             • Intelligent chunking         │
│   • No reranking               • Cross-encoder reranking      │
│   • Flat context               • Hierarchical context         │
│   • Single modality            • Multi-modal support          │
│                                                               │
└───────────────────────────────────────────────────────────────┘
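Concretely, the RAG 2.0 stages compose into a single pipeline: transform the query, retrieve candidates, rerank them, assemble context, then generate. A minimal sketch of that flow with trivial stub components (all names here are illustrative, not a specific framework's API):

```python
class Rag2Pipeline:
    """Illustrative RAG 2.0 flow: transform -> retrieve -> rerank -> assemble -> generate."""

    def __init__(self, transform, retrieve, rerank, assemble, generate):
        self.transform = transform    # query -> expanded query
        self.retrieve = retrieve      # query -> candidate docs
        self.rerank = rerank          # (query, docs) -> ordered docs
        self.assemble = assemble      # docs -> context string
        self.generate = generate      # (query, context) -> answer

    def answer(self, query):
        expanded = self.transform(query)
        candidates = self.retrieve(expanded)
        ranked = self.rerank(query, candidates)
        context = self.assemble(ranked)
        return self.generate(query, context)

# Wiring it up with stubs makes the data flow visible end to end
pipeline = Rag2Pipeline(
    transform=lambda q: q + " (expanded)",
    retrieve=lambda q: ["doc-b", "doc-a"],
    rerank=lambda q, docs: sorted(docs),
    assemble=lambda docs: "\n".join(docs),
    generate=lambda q, ctx: f"answer using: {ctx}",
)
result = pipeline.answer("what is RAG 2.0?")
```

Each stub corresponds to one component group in the architecture above; in a real system they would be backed by a retriever, a cross-encoder, and an LLM client.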
Key Components of RAG 2.0
```yaml
# RAG 2.0 architecture
rag_architecture:
  components:
    - name: "Data Processing"
      elements: ["Document loaders", "Text splitters", "Metadata extractors"]
    - name: "Storage Layer"
      elements: ["Vector DB", "Document store", "Graph DB", "Cache"]
    - name: "Retrieval Engine"
      elements: ["Hybrid search", "Query transformation", "Reranking"]
    - name: "Generation"
      elements: ["Context assembly", "Prompt engineering", "LLM calls"]
    - name: "Evaluation"
      elements: ["Relevance scoring", "Hallucination detection", "Metrics"]
```
Advanced Retrieval Patterns
1. Hybrid Search
Combine keyword and semantic search for better results:
```python
# Hybrid search implementation
class HybridSearch:
    def __init__(self, vector_db, keyword_index, embed_fn):
        self.vector_db = vector_db          # semantic index (e.g. a Pinecone index)
        self.keyword_index = keyword_index  # lexical index (e.g. BM25)
        self.embed_fn = embed_fn            # query -> embedding vector

    def search(self, query, top_k=10, alpha=0.5):
        """
        Hybrid search combining semantic and keyword matching.
        alpha: weight for semantic results; (1 - alpha) for keyword results.
        """
        # Semantic search: over-fetch so fusion has candidates to work with
        semantic_results = self.vector_db.query(
            vector=self.embed_fn(query),
            top_k=top_k * 2
        )

        # Keyword search (BM25)
        keyword_results = self.keyword_index.search(
            query=query,
            top_k=top_k * 2
        )

        # Normalize and combine scores
        return self._fuse_results(
            semantic_results,
            keyword_results,
            alpha=alpha,
            top_k=top_k
        )

    def _fuse_results(self, semantic, keyword, alpha, top_k, k=60):
        """Weighted Reciprocal Rank Fusion (RRF) of the two result lists."""
        rrf_scores = {}
        for rank, item in enumerate(semantic['matches']):
            doc_id = item['id']
            rrf_scores[doc_id] = rrf_scores.get(doc_id, 0) + \
                alpha * (1.0 / (rank + k))
        for rank, item in enumerate(keyword):
            doc_id = item['id']
            rrf_scores[doc_id] = rrf_scores.get(doc_id, 0) + \
                (1 - alpha) * (1.0 / (rank + k))

        # Sort by fused score, descending
        return sorted(
            rrf_scores.items(),
            key=lambda x: x[1],
            reverse=True
        )[:top_k]
    ```
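The fusion step is easy to sanity-check in isolation. A minimal, self-contained sketch of weighted RRF over two ranked lists of document IDs (the function name and weights are illustrative):

```python
def rrf_fuse(semantic_ids, keyword_ids, alpha=0.5, k=60, top_k=10):
    """Weighted Reciprocal Rank Fusion over two ranked lists of doc IDs."""
    scores = {}
    for rank, doc_id in enumerate(semantic_ids):
        scores[doc_id] = scores.get(doc_id, 0.0) + alpha / (rank + k)
    for rank, doc_id in enumerate(keyword_ids):
        scores[doc_id] = scores.get(doc_id, 0.0) + (1 - alpha) / (rank + k)
    ranked = sorted(scores.items(), key=lambda x: x[1], reverse=True)
    return [doc_id for doc_id, _ in ranked][:top_k]

# A doc ranked by both retrievers outscores docs ranked by only one
fused = rrf_fuse(["a", "b", "c"], ["b", "d"], alpha=0.5)
```

Note that "b" wins here despite never being first in either list, which is exactly the behavior RRF is chosen for.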
2. Query Transformation
Transform queries to improve retrieval:
```python
# Query transformation pipeline
import json

class QueryTransformer:
    def __init__(self, llm):
        self.llm = llm

    def expand_query(self, query):
        """Expand the query with synonyms and related terms."""
        prompt = f"""Expand this query with related terms:
Query: {query}
Return only the expanded query:"""
        expanded = self.llm.generate(prompt)
        return f"{query} {expanded}"

    def decompose_query(self, query):
        """Break a complex query into simpler sub-queries."""
        prompt = f"""Decompose into simpler sub-questions:
Query: {query}
Return as a JSON array of sub-questions:"""
        return json.loads(self.llm.generate(prompt))

    def generate_hypothetical_doc(self, query):
        """HyDE: generate a hypothetical document to embed for retrieval."""
        prompt = f"""Write a brief document that would answer this query:
Query: {query}
Write 2-3 sentences:"""
        return self.llm.generate(prompt)
```
3. Parent Document Retrieval
Retrieve larger context while maintaining relevance:
```python
# Parent document retrieval
class ParentDocumentRetriever:
    def __init__(self, vector_store, document_store):
        self.vector_store = vector_store
        self.document_store = document_store

    def retrieve(self, query, child_top_k=20, parent_top_k=5):
        """
        Two-stage retrieval:
        1. Find relevant chunks (children)
        2. Retrieve full documents (parents)
        """
        # Stage 1: get the most relevant chunks
        child_results = self.vector_store.similarity_search(
            query=query,
            k=child_top_k
        )

        # Collect the distinct parent document IDs
        parent_ids = list({
            r.metadata['parent_doc_id'] for r in child_results
        })

        # Stage 2: fetch the full parent documents
        parent_docs = self.document_store.get_by_ids(parent_ids)

        # Re-rank parents by aggregated chunk relevance
        scored_parents = self._score_parents(parent_docs, child_results)
        return scored_parents[:parent_top_k]

    def _score_parents(self, parents, children):
        """Score parents by summing the scores of their child chunks."""
        child_scores = {}
        for child in children:
            pid = child.metadata['parent_doc_id']
            child_scores[pid] = child_scores.get(pid, 0) + child.score

        for parent in parents:
            parent.relevance_score = child_scores.get(parent.id, 0)
        return sorted(parents, key=lambda x: x.relevance_score, reverse=True)
```
Storage and Indexing Strategies
Multi-Vector Storage
```python
# Store multiple vectors per document
class MultiVectorIndex:
    def __init__(self, chroma_client):
        self.client = chroma_client
        # One collection per representation
        self.collections = {
            name: self.client.get_or_create_collection(name)
            for name in ('full', 'summary', 'phrases')
        }

    def index_document(self, doc_id, text, metadata):
        """Create multiple vector representations of one document."""
        # 1. Full-document embedding
        full_embedding = self.embed(text)

        # 2. Summary embedding
        summary = self.summarize(text)
        summary_embedding = self.embed(summary)

        # 3. Key-phrase embeddings
        key_phrases = self.extract_key_phrases(text)
        phrase_embeddings = [self.embed(phrase) for phrase in key_phrases]

        # Store each representation in its own collection
        self.collections['full'].add(
            ids=[doc_id],
            embeddings=[full_embedding],
            metadatas=[metadata]
        )
        self.collections['summary'].add(
            ids=[doc_id],
            embeddings=[summary_embedding],
            metadatas=[metadata]
        )
        self.collections['phrases'].add(
            ids=[f"{doc_id}_phrase_{i}" for i in range(len(key_phrases))],
            embeddings=phrase_embeddings,
            metadatas=[{**metadata, 'phrase': p} for p in key_phrases]
        )
```
Intelligent Chunking
```python
# Semantic chunking
import numpy as np

class SemanticChunker:
    def __init__(self, embedding_model, threshold=0.5):
        self.embedding_model = embedding_model
        self.threshold = threshold

    def chunk(self, text):
        """Split text at semantic boundaries."""
        # Split into sentences
        sentences = self._split_sentences(text)
        if not sentences:
            return []

        # Embed each sentence
        embeddings = [self.embedding_model.embed(s) for s in sentences]

        # Start a new chunk wherever adjacent sentences diverge semantically
        chunks = []
        current_chunk = [sentences[0]]
        for i in range(1, len(sentences)):
            similarity = self._cosine_similarity(embeddings[i - 1], embeddings[i])
            if similarity < self.threshold:
                # Semantic boundary: close the current chunk
                chunks.append(' '.join(current_chunk))
                current_chunk = [sentences[i]]
            else:
                current_chunk.append(sentences[i])

        # Flush the final chunk
        if current_chunk:
            chunks.append(' '.join(current_chunk))
        return chunks

    def _cosine_similarity(self, a, b):
        return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
```
Reranking Strategies
Cross-Encoder Reranking
```python
# Cross-encoder reranking
from sentence_transformers import CrossEncoder

class CrossEncoderReranker:
    def __init__(self, model_name='cross-encoder/ms-marco-MiniLM-L-6-v2'):
        self.model = CrossEncoder(model_name)

    def rerank(self, query, documents, top_k=5):
        """
        Re-score all documents with a cross-encoder.
        More accurate than a bi-encoder, but slower.
        """
        # Score each (query, document) pair jointly
        pairs = [(query, doc) for doc in documents]
        scores = self.model.predict(pairs)

        # Sort documents by cross-encoder score, descending
        ranked = sorted(
            zip(documents, scores),
            key=lambda x: x[1],
            reverse=True
        )
        return ranked[:top_k]
```
Learning to Rank
```python
# Learning to rank with XGBoost
import numpy as np
import xgboost

class LearningToRank:
    def __init__(self):
        self.model = xgboost.XGBRanker()

    def train(self, features, relevance_labels, query_ids):
        """
        Train a ranking model.
        features: [N, M] feature matrix
        relevance_labels: [N] relevance scores
        query_ids: [N] query identifiers (rows must be grouped by query)
        """
        # XGBoost ranking expects per-query group sizes
        group = self._get_group_sizes(query_ids)
        self.model.fit(
            features,
            relevance_labels,
            group=group
        )

    def predict(self, features):
        return self.model.predict(features)

    def _get_group_sizes(self, query_ids):
        _, counts = np.unique(query_ids, return_counts=True)
        return counts.tolist()
```
Context Assembly
Hierarchical Context
```python
# Assemble hierarchical context
class HierarchicalContextAssembler:
    def __init__(self, token_limit=4000):
        self.token_limit = token_limit

    def assemble(self, retrieved_docs, query):
        """Assemble context with decreasing levels of detail."""
        context_parts = []
        tokens_used = 0

        # 1. Top 1-2 docs in full detail (up to ~60% of the budget)
        for doc in retrieved_docs[:2]:
            if tokens_used + doc.tokens < self.token_limit * 0.6:
                context_parts.append(f"## Document: {doc.title}\n{doc.content}")
                tokens_used += doc.tokens

        # 2. Summaries for the next 3-5 docs (up to ~90% of the budget)
        for doc in retrieved_docs[2:7]:
            if tokens_used + 100 < self.token_limit * 0.9:
                context_parts.append(f"## Summary: {doc.title}\n{doc.summary}")
                tokens_used += 100

        # 3. One-line metadata for the remainder
        for doc in retrieved_docs[7:]:
            if tokens_used + 30 < self.token_limit:
                context_parts.append(
                    f"- {doc.title}: {doc.metadata.get('description', '')}"
                )
                tokens_used += 30

        return '\n\n'.join(context_parts)
```
Dynamic Context Window
```python
# Sliding-window context for long documents
class SlidingWindowContext:
    def __init__(self, window_size=2000, overlap=200):
        self.window_size = window_size
        self.overlap = overlap

    def build_context(self, query, retrieved_docs):
        """Build context, windowing any document that exceeds the window size."""
        contexts = []
        for doc in retrieved_docs:
            if doc.tokens <= self.window_size:
                contexts.append(doc.content)
            else:
                # Create overlapping windows
                windows = self._create_windows(doc.content)
                # Score windows by query relevance
                scored_windows = self._score_windows(windows, query)
                # Keep the top windows that fit within the limit
                contexts.extend(self._select_windows(scored_windows))
        return self._truncate('\n\n'.join(contexts))

    def _create_windows(self, text):
        # Whitespace tokens as a cheap proxy for model tokens
        tokens = text.split()
        step = self.window_size - self.overlap
        return [
            ' '.join(tokens[i:i + self.window_size])
            for i in range(0, len(tokens), step)
        ]
```
Evaluation and Optimization
RAG Evaluation Metrics
```python
# RAG evaluation metrics
class RAGEvaluator:
    def __init__(self, llm):
        self.llm = llm

    def evaluate(self, question, answer, retrieved_docs, ground_truth=None):
        """Comprehensive RAG evaluation."""
        metrics = {}

        # 1. Context precision
        metrics['context_precision'] = self._context_precision(
            retrieved_docs, question
        )

        # 2. Context recall (only if ground truth is available)
        if ground_truth:
            metrics['context_recall'] = self._context_recall(
                retrieved_docs, ground_truth
            )

        # 3. Answer faithfulness
        metrics['faithfulness'] = self._faithfulness(answer, retrieved_docs)

        # 4. Answer relevance
        metrics['answer_relevance'] = self._answer_relevance(answer, question)

        # 5. Harmfulness (safety check)
        metrics['harmful'] = self._check_harmful(answer)
        return metrics

    def _context_precision(self, docs, question):
        """How relevant are the retrieved docs to the question?"""
        doc_list = '\n'.join(
            f"{i + 1}. {d.content[:200]}" for i, d in enumerate(docs)
        )
        prompt = f"""Rate the relevance of each document to the question.
Question: {question}
Documents:
{doc_list}
Rate 1-5 for each document's relevance:"""
        response = self.llm.generate(prompt)
        # Parse the ratings and return their average
        return self._parse_rating(response)

    def _faithfulness(self, answer, docs):
        """Is the answer supported by the retrieved context?"""
        context = '\n'.join(d.content for d in docs)
        prompt = f"""Check if the answer is supported by the context.
Context:
{context}
Answer:
{answer}
Is the answer fully supported by the context? Answer yes or no:"""
        response = self.llm.generate(prompt).lower()
        return 1.0 if 'yes' in response else 0.0
```
Continuous Evaluation Pipeline
```yaml
# RAG evaluation pipeline
evaluation:
  stages:
    - name: "Retrieval Metrics"
      metrics:
        - "Recall@K"
        - "MRR (Mean Reciprocal Rank)"
        - "Context Precision"
    - name: "Generation Metrics"
      metrics:
        - "Answer Relevance"
        - "Faithfulness"
        - "Hallucination Rate"
    - name: "End-to-End"
      metrics:
        - "RAGAS Score"
        - "Human Evaluation"
        - "Task Completion Rate"
```
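The retrieval-stage metrics in this pipeline are cheap to compute directly. A minimal sketch (function names are illustrative), given ranked document IDs per query and the set of known-relevant IDs:

```python
def recall_at_k(ranked_ids, relevant_ids, k):
    """Fraction of relevant documents that appear in the top k results."""
    if not relevant_ids:
        return 0.0
    hits = len(set(ranked_ids[:k]) & set(relevant_ids))
    return hits / len(relevant_ids)

def mean_reciprocal_rank(ranked_lists, relevant_sets):
    """Average of 1/rank of the first relevant document, over all queries."""
    total = 0.0
    for ranked, relevant in zip(ranked_lists, relevant_sets):
        for rank, doc_id in enumerate(ranked, start=1):
            if doc_id in relevant:
                total += 1.0 / rank
                break  # only the first relevant hit counts for MRR
    return total / len(ranked_lists)
```

Tracking these per retriever variant (semantic only, keyword only, hybrid) makes it easy to attribute quality changes to the right stage.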
Production Patterns
Caching Strategy
```python
# Intelligent caching for RAG
import hashlib
import json
import time

class RAGCache:
    def __init__(self, redis_client):
        self.cache = redis_client

    def _key(self, question):
        # Stable hash; Python's built-in hash() is randomized per process
        digest = hashlib.sha256(question.encode('utf-8')).hexdigest()
        return f"rag:{digest}"

    def get_cached_response(self, question):
        """Return a cached response for this exact question, if any."""
        cached = self.cache.get(self._key(question))
        if cached:
            return json.loads(cached)
        return None

    def cache_response(self, question, answer, retrieved_docs):
        """Cache the response for one hour."""
        data = {
            'answer': answer,
            'doc_ids': [d.id for d in retrieved_docs],
            'timestamp': time.time()
        }
        self.cache.setex(self._key(question), 3600, json.dumps(data))
```
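Exact-match caching only helps when users repeat a question verbatim. A hedged sketch of a semantic cache that reuses an answer when a new question embeds close to a cached one (the class name, the linear scan, and the 0.95 threshold are all illustrative; production systems would use a vector index):

```python
import numpy as np

class SemanticCache:
    def __init__(self, embed_fn, threshold=0.95):
        self.embed_fn = embed_fn      # question -> np.ndarray embedding
        self.threshold = threshold    # minimum cosine similarity for a cache hit
        self.entries = []             # list of (embedding, answer) pairs

    def get(self, question):
        """Return a cached answer if a semantically close question exists."""
        query_vec = self.embed_fn(question)
        for vec, answer in self.entries:
            sim = np.dot(query_vec, vec) / (
                np.linalg.norm(query_vec) * np.linalg.norm(vec)
            )
            if sim >= self.threshold:
                return answer
        return None

    def put(self, question, answer):
        self.entries.append((self.embed_fn(question), answer))

# Toy embedding: one axis for "price" questions, another for everything else
toy_embed = lambda q: np.array([1.0, 0.0]) if "price" in q else np.array([0.0, 1.0])
cache = SemanticCache(toy_embed)
cache.put("what is the price?", "42 dollars")
```

The threshold is the key tuning knob: too low and the cache returns stale answers to genuinely different questions, too high and it degrades to exact matching.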
Fallback Strategies
```python
# Multi-tier fallback
import logging

log = logging.getLogger(__name__)

class RAGFallback:
    def __init__(self, primary_rag, fallback_rag, llm_only):
        self.primary = primary_rag
        self.fallback = fallback_rag
        self.llm_only = llm_only

    def query(self, question):
        """Try primary RAG, then fallback RAG, then the bare LLM."""
        try:
            result = self.primary.query(question)
            if result.confidence > 0.7:
                return result
            # Moderate confidence: see if the fallback does better
            if result.confidence > 0.3:
                fallback_result = self.fallback.query(question)
                if fallback_result.confidence > result.confidence:
                    return fallback_result
                return result
        except Exception as e:
            log.warning(f"Primary RAG failed: {e}")

        # Primary unusable or very low confidence: try the fallback RAG
        try:
            return self.fallback.query(question)
        except Exception as e:
            log.warning(f"Fallback RAG failed: {e}")

        # Last resort: answer from the LLM alone
        return self.llm_only.generate(question)
```
Common Pitfalls
1. Poor Chunking Strategy
Wrong:
```python
# Fixed-size chunking ignores semantic boundaries
chunks = [text[i:i + 1000] for i in range(0, len(text), 1000)]
# Result: broken sentences, lost context
```
Correct:
```python
# Semantic chunking preserves meaning
chunker = SemanticChunker(embedding_model, threshold=0.5)
chunks = chunker.chunk(text)
# Result: coherent, meaningful chunks
```
2. Ignoring Query Analysis
Wrong:
```python
# Direct retrieval without query understanding
results = vector_store.query(user_query)
# Result: misses intent, poor recall
```
Correct:
```python
# Transform and expand the query first
expanded = query_transformer.expand_query(user_query)
decomposed = query_transformer.decompose_query(user_query)
results = hybrid_search.multi_way_search(decomposed)
# Result: better intent matching, higher recall
```
3. No Reranking
Wrong:
```python
# Trusting initial retrieval scores
results = vector_store.query(query, top_k=10)
# Result: suboptimal ranking, missing best docs
```
Correct:
```python
# Re-rank a larger candidate set with a cross-encoder
initial = vector_store.query(query, top_k=50)
reranked = cross_encoder.rerank(query, initial, top_k=10)
# Result: best documents at the top
```
Key Takeaways
- Hybrid search combines semantic and keyword for better results
- Query transformation improves recall for complex queries
- Parent document retrieval provides fuller context
- Multi-vector storage captures different aspects of documents
- Cross-encoder reranking significantly improves relevance
- Hierarchical context maximizes information within token limits
- Evaluation is critical - measure retrieval and generation separately
- Caching and fallbacks ensure production reliability