
RAG 2.0: Advanced Retrieval-Augmented Generation in 2026

Introduction

Retrieval-Augmented Generation (RAG) has evolved significantly since its early implementations. In 2026, RAG 2.0 describes a new generation of architectures that address the limitations of basic RAG systems. This guide covers advanced patterns for building production-ready RAG systems that deliver accurate, contextual, and reliable responses.


What Is RAG 2.0?

The Evolution from Basic RAG

┌─────────────────────────────────────────────────────────────┐
│                       RAG Evolution                         │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  RAG 1.0                  RAG 2.0                           │
│  ─────────                ─────────                         │
│  • Simple retrieval       • Hybrid search                   │
│  • Single vector DB       • Multi-vector storage            │
│  • Chunk + query          • Query transformation            │
│  • Basic chunking         • Intelligent chunking            │
│  • No reranking           • Cross-encoder reranking         │
│  • Flat context           • Hierarchical context            │
│  • Single modality        • Multi-modal support             │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Key Components of RAG 2.0

# RAG 2.0 Architecture
rag_architecture:
  components:
    - name: "Data Processing"
      elements: ["Document loaders", "Text splitters", "Metadata extractors"]
      
    - name: "Storage Layer"
      elements: ["Vector DB", "Document store", "Graph DB", "Cache"]
      
    - name: "Retrieval Engine"
      elements: ["Hybrid search", "Query transformation", "Reranking"]
      
    - name: "Generation"
      elements: ["Context assembly", "Prompt engineering", "LLM calls"]
      
    - name: "Evaluation"
      elements: ["Relevance scoring", "Hallucination detection", "Metrics"]

Advanced Retrieval Patterns

1. Hybrid Search

Combine keyword and semantic search for better results:

# Hybrid search implementation
from pinecone import Pinecone
import numpy as np

class HybridSearch:
    def __init__(self, vector_db, keyword_index):
        self.vector_db = vector_db
        self.keyword_index = keyword_index
    
    def search(self, query, top_k=10, alpha=0.5):
        """
        Hybrid search combining semantic and keyword matching
        
        alpha: weight for semantic scores; keyword scores get (1 - alpha)
        """
        # Semantic search
        semantic_results = self.vector_db.query(
            vector=self.embed_query(query),
            top_k=top_k * 2,
            include_scores=True
        )
        
        # Keyword search (BM25)
        keyword_results = self.keyword_index.search(
            query=query,
            top_k=top_k * 2
        )
        
        # Normalize and combine scores
        combined = self._fuse_results(
            semantic_results,
            keyword_results,
            alpha=alpha,
            top_k=top_k
        )
        
        return combined
    
    def _fuse_results(self, semantic, keyword, alpha, top_k):
        """Weighted Reciprocal Rank Fusion of both result lists"""
        rrf_scores = {}
        
        # Semantic results weighted by alpha
        for rank, item in enumerate(semantic['matches']):
            doc_id = item['id']
            rrf_scores[doc_id] = rrf_scores.get(doc_id, 0) + \
                (1.0 / (rank + 60)) * alpha
        
        # Keyword results weighted by (1 - alpha)
        for rank, item in enumerate(keyword):
            doc_id = item['id']
            rrf_scores[doc_id] = rrf_scores.get(doc_id, 0) + \
                (1.0 / (rank + 60)) * (1 - alpha)
        
        # Sort by combined score
        sorted_results = sorted(
            rrf_scores.items(),
            key=lambda x: x[1],
            reverse=True
        )[:top_k]
        
        return sorted_results
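The fusion step can be demonstrated in isolation. Below is a standalone, unweighted RRF sketch of the same idea (the k = 60 constant follows common RRF practice); it drops the alpha weighting for simplicity:

```python
# Standalone Reciprocal Rank Fusion: each ranked list contributes
# 1 / (rank + k) per document, and documents are re-sorted by the sum.

def rrf_fuse(ranked_lists, k=60):
    """Fuse several ranked lists of doc IDs into one ranking."""
    scores = {}
    for ranking in ranked_lists:
        for rank, doc_id in enumerate(ranking):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (rank + k)
    return [doc_id for doc_id, _ in
            sorted(scores.items(), key=lambda x: x[1], reverse=True)]

semantic = ["d3", "d1", "d2"]   # ranked by embedding similarity
keyword = ["d1", "d4", "d3"]    # ranked by BM25
fused = rrf_fuse([semantic, keyword])
```

Here "d1" wins: appearing near the top of both lists beats topping only one, which is exactly the behavior that makes RRF a robust default fusion method.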

2. Query Transformation

Transform queries to improve retrieval:

# Query transformation pipeline
import json

class QueryTransformer:
    def __init__(self, llm):
        self.llm = llm
    
    def expand_query(self, query):
        """Expand query with synonyms and related terms"""
        prompt = f"""Expand this query with related terms:
        
Query: {query}

Return only the expanded query:"""
        
        expanded = self.llm.generate(prompt)
        return f"{query} {expanded}"
    
    def decompose_query(self, query):
        """Break complex query into sub-queries"""
        prompt = f"""Decompose into simpler sub-questions:
        
Query: {query}

Return as JSON array of sub-questions:"""
        
        sub_questions = json.loads(self.llm.generate(prompt))
        return sub_questions
    
    def generate_hypothetical_doc(self, query):
        """Generate hypothetical document for retrieval"""
        prompt = f"""Write a brief document that would answer this query:
        
Query: {query}

Write 2-3 sentences:"""
        
        hypothetical = self.llm.generate(prompt)
        return hypothetical

3. Parent Document Retrieval

Retrieve larger context while maintaining relevance:

# Parent document retrieval
class ParentDocumentRetriever:
    def __init__(self, vector_store, document_store):
        self.vector_store = vector_store
        self.document_store = document_store
    
    def retrieve(self, query, child_top_k=20, parent_top_k=5):
        """
        Two-stage retrieval:
        1. Find relevant chunks (children)
        2. Retrieve full documents (parents)
        """
        # Stage 1: Get relevant chunks
        child_results = self.vector_store.similarity_search(
            query=query,
            k=child_top_k
        )
        
        # Get parent document IDs
        parent_ids = list(set([
            r.metadata['parent_doc_id'] 
            for r in child_results
        ]))
        
        # Stage 2: Get full parent documents
        parent_docs = self.document_store.get_by_ids(parent_ids)
        
        # Re-rank by chunk relevance
        scored_parents = self._score_parents(
            parent_docs, 
            child_results
        )
        
        return scored_parents[:parent_top_k]
    
    def _score_parents(self, parents, children):
        """Score parents based on child relevance"""
        child_scores = {}
        for child in children:
            pid = child.metadata['parent_doc_id']
            child_scores[pid] = child_scores.get(pid, 0) + child.score
        
        for parent in parents:
            parent.relevance_score = child_scores.get(parent.id, 0)
        
        return sorted(parents, key=lambda x: x.relevance_score, reverse=True)

Storage and Indexing Strategies

Multi-Vector Storage

# Store multiple vectors per document
class MultiVectorIndex:
    def __init__(self, chroma_client):
        self.client = chroma_client
        # One Chroma collection per representation type
        self.collections = {
            name: chroma_client.get_or_create_collection(name)
            for name in ('full', 'summary', 'phrases')
        }
    
    def index_document(self, doc_id, text, metadata):
        """Create multiple vector representations"""
        
        # 1. Full document embedding
        full_embedding = self.embed(text)
        
        # 2. Summary embedding (if available)
        summary = self.summarize(text)
        summary_embedding = self.embed(summary)
        
        # 3. Key phrases embeddings
        key_phrases = self.extract_key_phrases(text)
        phrase_embeddings = [self.embed(phrase) for phrase in key_phrases]
        
        # Store all in separate collections
        self.collections['full'].add(
            ids=[doc_id],
            embeddings=[full_embedding],
            metadatas=[metadata]
        )
        
        self.collections['summary'].add(
            ids=[doc_id],
            embeddings=[summary_embedding],
            metadatas=[metadata]
        )
        
        self.collections['phrases'].add(
            ids=[f"{doc_id}_phrase_{i}" for i in range(len(phrase_embeddings))],
            embeddings=phrase_embeddings,
            metadatas=[{**metadata, 'phrase': p} for p in key_phrases]
        )

Intelligent Chunking

# Semantic chunking
import numpy as np

class SemanticChunker:
    def __init__(self, embedding_model, threshold=0.5):
        self.embedding_model = embedding_model
        self.threshold = threshold
    
    def chunk(self, text):
        """Split text at semantic boundaries"""
        
        # Split into sentences
        sentences = self._split_sentences(text)
        if not sentences:
            return []
        
        # Create embeddings for each sentence
        embeddings = [self.embedding_model.embed(s) for s in sentences]
        
        # Find semantic boundaries
        chunks = []
        current_chunk = [sentences[0]]
        
        for i in range(1, len(sentences)):
            # Calculate similarity to previous sentence
            similarity = self._cosine_similarity(
                embeddings[i-1], 
                embeddings[i]
            )
            
            if similarity < self.threshold:
                # New chunk
                chunks.append(' '.join(current_chunk))
                current_chunk = [sentences[i]]
            else:
                current_chunk.append(sentences[i])
        
        # Add final chunk
        if current_chunk:
            chunks.append(' '.join(current_chunk))
        
        return chunks
    
    def _cosine_similarity(self, a, b):
        return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
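The boundary-detection loop can be tried on toy data without a real embedding model. The sketch below uses hand-made 2-D vectors as stand-in embeddings: a similarity drop below the threshold starts a new chunk.

```python
import math

# Toy run of semantic-boundary chunking with 2-D stand-in embeddings.

def cosine(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return dot / (na * nb)

def chunk_by_similarity(sentences, embeddings, threshold=0.5):
    chunks, current = [], [sentences[0]]
    for i in range(1, len(sentences)):
        if cosine(embeddings[i - 1], embeddings[i]) < threshold:
            chunks.append(" ".join(current))   # topic shift: close the chunk
            current = [sentences[i]]
        else:
            current.append(sentences[i])
    chunks.append(" ".join(current))
    return chunks

sents = ["Cats purr.", "Cats nap.", "GPUs are fast."]
vecs = [[1.0, 0.0], [0.9, 0.1], [0.0, 1.0]]  # last vector points elsewhere
chunks = chunk_by_similarity(sents, vecs)
```

The first two vectors are nearly parallel, so their sentences merge; the third points in a different direction and starts a second chunk.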

Reranking Strategies

Cross-Encoder Reranking

# Cross-encoder reranking
from sentence_transformers import CrossEncoder

class CrossEncoderReranker:
    def __init__(self, model_name='cross-encoder/ms-marco-MiniLM-L-6-v2'):
        self.model = CrossEncoder(model_name)
    
    def rerank(self, query, documents, top_k=5):
        """
        Re-score all documents with cross-encoder
        
        More accurate than bi-encoder but slower
        """
        # Create pairs for scoring
        pairs = [(query, doc) for doc in documents]
        
        # Get cross-encoder scores
        scores = self.model.predict(pairs)
        
        # Sort by score
        ranked = sorted(
            zip(documents, scores),
            key=lambda x: x[1],
            reverse=True
        )
        
        return ranked[:top_k]

Learning to Rank

# LTR with XGBoost
import numpy as np
import xgboost

class LearningToRank:
    def __init__(self):
        self.model = xgboost.XGBRanker()
    
    def train(self, features, relevance_labels, query_ids):
        """
        Train a ranking model
        
        features: [N, M] feature matrix
        relevance_labels: [N] relevance scores
        query_ids: [N] query identifiers
        """
        # Group by query for XGBoost ranking
        group = self._get_group_sizes(query_ids)
        
        self.model.fit(
            features, 
            relevance_labels,
            group=group
        )
    
    def predict(self, features):
        return self.model.predict(features)
    
    def _get_group_sizes(self, query_ids):
        # Assumes rows are already sorted by query_id, since XGBRanker
        # expects contiguous groups
        unique, counts = np.unique(query_ids, return_counts=True)
        return counts.tolist()

Context Assembly

Hierarchical Context

# Assemble hierarchical context
class HierarchicalContextAssembler:
    def __init__(self, token_limit=4000):
        self.token_limit = token_limit
    
    def assemble(self, retrieved_docs, query):
        """
        Assemble context with different levels of detail
        """
        context_parts = []
        tokens_used = 0
        
        # 1. Add top 1-2 docs with full detail
        for doc in retrieved_docs[:2]:
            if tokens_used + doc.tokens < self.token_limit * 0.6:
                context_parts.append(f"## Document: {doc.title}\n{doc.content}")
                tokens_used += doc.tokens
        
        # 2. Add summaries for next 3-5 docs
        for doc in retrieved_docs[2:7]:
            if tokens_used + 100 < self.token_limit * 0.9:
                context_parts.append(f"## Summary: {doc.title}\n{doc.summary}")
                tokens_used += 100
        
        # 3. Add metadata for remaining
        for doc in retrieved_docs[7:]:
            if tokens_used + 30 < self.token_limit:
                context_parts.append(f"- {doc.title}: {doc.metadata.get('description', '')}")
                tokens_used += 30
        
        return '\n\n'.join(context_parts)

Dynamic Context Window

# Sliding window context
class SlidingWindowContext:
    def __init__(self, window_size=2000, overlap=200):
        self.window_size = window_size
        self.overlap = overlap
    
    def build_context(self, query, retrieved_docs):
        """Build context with sliding windows for long documents"""
        contexts = []
        
        for doc in retrieved_docs:
            if doc.tokens <= self.window_size:
                contexts.append(doc.content)
            else:
                # Create overlapping windows
                windows = self._create_windows(doc.content)
                
                # Score windows by query relevance
                scored_windows = self._score_windows(windows, query)
                
                # Take top windows that fit in limit
                contexts.extend(self._select_windows(scored_windows))
        
        return self._truncate('\n\n'.join(contexts))
    
    def _create_windows(self, text):
        tokens = text.split()
        windows = []
        
        for i in range(0, len(tokens), self.window_size - self.overlap):
            window = ' '.join(tokens[i:i + self.window_size])
            windows.append(window)
        
        return windows

Evaluation and Optimization

RAG Evaluation Metrics

# RAG metrics
class RAGEvaluator:
    def __init__(self, llm):
        self.llm = llm
    
    def evaluate(self, question, answer, retrieved_docs, ground_truth=None):
        """
        Comprehensive RAG evaluation
        """
        metrics = {}
        
        # 1. Context Precision
        metrics['context_precision'] = self._context_precision(
            retrieved_docs, 
            question
        )
        
        # 2. Context Recall (if ground truth available)
        if ground_truth:
            metrics['context_recall'] = self._context_recall(
                retrieved_docs,
                ground_truth
            )
        
        # 3. Answer Faithfulness
        metrics['faithfulness'] = self._faithfulness(
            answer,
            retrieved_docs
        )
        
        # 4. Answer Relevance
        metrics['answer_relevance'] = self._answer_relevance(
            answer,
            question
        )
        
        # 5. Harmfulness (safety check)
        metrics['harmful'] = self._check_harmful(answer)
        
        return metrics
    
    def _context_precision(self, docs, question):
        """How relevant are retrieved docs to the question?"""
        # Use LLM to rate relevance
        prompt = f"""Rate relevance of each document to the question.

Question: {question}

Documents:
{chr(10).join([f"{i+1}. {d.content[:200]}" for i, d in enumerate(docs)])}

Rate 1-5 for each document's relevance:"""
        
        response = self.llm.generate(prompt)
        # Parse and calculate average
        return self._parse_rating(response)
    
    def _faithfulness(self, answer, docs):
        """Does the answer match the retrieved context?"""
        prompt = f"""Check if the answer is supported by the context.

Context:
{chr(10).join([d.content for d in docs])}

Answer:
{answer}

Is the answer fully supported by the context? Answer yes or no:"""
        
        response = self.llm.generate(prompt).lower()
        return 1.0 if 'yes' in response else 0.0

Continuous Evaluation Pipeline

# RAG evaluation pipeline
evaluation:
  stages:
    - name: "Retrieval Metrics"
      metrics:
        - "Recall@K"
        - "MRR (Mean Reciprocal Rank)"
        - "Context Precision"
        
    - name: "Generation Metrics"
      metrics:
        - "Answer Relevance"
        - "Faithfulness"
        - "Hallucination Rate"
        
    - name: "End-to-End"
      metrics:
        - "RAGAS Score"
        - "Human Evaluation"
        - "Task Completion Rate"

Production Patterns

Caching Strategy

# Exact-match caching for RAG
import hashlib
import json
import time

class RAGCache:
    def __init__(self, redis_client):
        self.cache = redis_client
    
    def _hash(self, question):
        # Stable digest; Python's built-in hash() varies across processes
        return hashlib.sha256(question.encode()).hexdigest()
    
    def get_cached_response(self, question):
        """Check cache for an identical question"""
        cached = self.cache.get(f"rag:{self._hash(question)}")
        
        if cached:
            return json.loads(cached)
        
        return None
    
    def cache_response(self, question, answer, retrieved_docs):
        """Cache the response"""
        question_hash = self._hash(question)
        
        data = {
            'answer': answer,
            'doc_ids': [d.id for d in retrieved_docs],
            'timestamp': time.time()
        }
        
        # Cache for 1 hour
        self.cache.setex(
            f"rag:{question_hash}",
            3600,
            json.dumps(data)
        )
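Hashing only catches identical questions, so paraphrases miss the cache. A common extension is a semantic cache that compares query embeddings instead. A minimal in-memory sketch, where `toy_embed` is a hypothetical stand-in for a real embedding model:

```python
import math

# Semantic cache sketch: reuse a cached answer when the incoming
# question's embedding is close enough to a cached one.

class SemanticCache:
    def __init__(self, embed, threshold=0.9):
        self.embed = embed
        self.threshold = threshold
        self.entries = []  # list of (embedding, answer) pairs

    @staticmethod
    def _cosine(a, b):
        dot = sum(x * y for x, y in zip(a, b))
        na = math.sqrt(sum(x * x for x in a))
        nb = math.sqrt(sum(y * y for y in b))
        return dot / (na * nb) if na and nb else 0.0

    def get(self, question):
        qv = self.embed(question)
        for ev, answer in self.entries:
            if self._cosine(qv, ev) >= self.threshold:
                return answer
        return None

    def put(self, question, answer):
        self.entries.append((self.embed(question), answer))

VOCAB = ["what", "is", "rag", "retrieval", "weather"]

def toy_embed(q):
    """Hypothetical bag-of-words 'embedding' over a fixed vocabulary."""
    words = q.lower().split()
    return [float(words.count(w)) for w in VOCAB]

cache = SemanticCache(toy_embed, threshold=0.9)
cache.put("what is rag", "RAG is retrieval-augmented generation")
```

In production the linear scan would be replaced by a vector index, and the threshold tuned so near-duplicates hit while genuinely different questions miss.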

Fallback Strategies

# Multi-tier fallback
import logging

log = logging.getLogger(__name__)

class RAGFallback:
    def __init__(self, primary_rag, fallback_rag, llm_only):
        self.primary = primary_rag
        self.fallback = fallback_rag
        self.llm_only = llm_only
    
    def query(self, question):
        """Try primary, then fallback, then LLM only"""
        
        # Try primary RAG
        try:
            result = self.primary.query(question)
            
            if result.confidence > 0.7:
                return result
            
            # Low confidence, try fallback
            if result.confidence > 0.3:
                fallback_result = self.fallback.query(question)
                if fallback_result.confidence > result.confidence:
                    return fallback_result
        except Exception as e:
            log.warning(f"Primary RAG failed: {e}")
        
        # Try fallback RAG
        try:
            return self.fallback.query(question)
        except Exception as e:
            log.warning(f"Fallback RAG failed: {e}")
        
        # Final fallback: LLM only
        return self.llm_only.generate(question)

Common Pitfalls

1. Poor Chunking Strategy

Wrong:

# Fixed-size chunking ignores semantic boundaries
chunks = [text[i:i+1000] for i in range(0, len(text), 1000)]
# Result: Broken sentences, lost context

Correct:

# Semantic chunking preserves meaning
chunker = SemanticChunker(threshold=0.5)
chunks = chunker.chunk(text)
# Result: Coherent, meaningful chunks

2. Ignoring Query Analysis

Wrong:

# Direct retrieval without query understanding
results = vector_store.query(user_query)
# Result: Misses intent, poor recall

Correct:

# Transform and expand query
expanded = query_transformer.expand_query(user_query)
decomposed = query_transformer.decompose_query(user_query)
results = hybrid_search.multi_way_search(decomposed)
# Result: Better intent matching, higher recall

3. No Reranking

Wrong:

# Trust initial retrieval scores
results = vector_store.query(query, top_k=10)
# Result: Suboptimal ranking, missing best docs

Correct:

# Re-rank with cross-encoder
initial = vector_store.query(query, top_k=50)
reranked = cross_encoder.rerank(query, initial, top_k=10)
# Result: Best documents at top

Key Takeaways

  • Hybrid search combines semantic and keyword for better results
  • Query transformation improves recall for complex queries
  • Parent document retrieval provides fuller context
  • Multi-vector storage captures different aspects of documents
  • Cross-encoder reranking significantly improves relevance
  • Hierarchical context maximizes information within token limits
  • Evaluation is critical - measure retrieval and generation separately
  • Caching and fallbacks ensure production reliability
