⚡ Calmops

Advanced RAG Optimization: Production-Ready Retrieval Systems

Introduction

Retrieval-Augmented Generation (RAG) has transformed how we build AI systems that need access to external knowledge. However, moving from a basic RAG prototype to a production-ready system requires careful optimization across multiple dimensions: retrieval quality, latency, relevance, and scalability.

Advanced RAG optimization encompasses techniques beyond simple embedding-based retrieval: intelligent document chunking, query transformations, hybrid search, reranking models, and caching strategies. Together, these optimizations can dramatically improve the quality and efficiency of RAG systems.

In 2026, building production RAG systems requires a comprehensive understanding of these techniques. This guide explores advanced optimization strategies that can take your RAG system from prototype to production.

Retrieval Quality Optimization

1. Intelligent Document Chunking

The foundation of good retrieval is proper document segmentation:

import re
from typing import List, Dict

class SemanticChunker:
    """
    Semantic chunking using embedding similarity.
    
    Splits documents at semantically coherent boundaries.
    """
    
    def __init__(self, encoder, min_chunk_size=100, max_chunk_size=1000):
        self.encoder = encoder
        self.min_chunk_size = min_chunk_size
        self.max_chunk_size = max_chunk_size
        
    def chunk_by_sentence(self, text: str) -> List[str]:
        """
        Split by sentences, then combine into chunks.
        """
        # Split into sentences
        sentences = re.split(r'(?<=[.!?])\s+', text)
        
        chunks = []
        current_chunk = []
        current_size = 0
        
        for sentence in sentences:
            sentence_size = len(sentence)
            
            if current_size + sentence_size > self.max_chunk_size and current_size >= self.min_chunk_size:
                # Start new chunk
                chunks.append(' '.join(current_chunk))
                current_chunk = [sentence]
                current_size = sentence_size
            else:
                current_chunk.append(sentence)
                current_size += sentence_size
                
        # Add remaining
        if current_chunk:
            chunks.append(' '.join(current_chunk))
            
        return chunks
    
    def chunk_by_embedding(self, text: str) -> List[str]:
        """
        Split using embedding-based boundary detection.
        """
        # Split into overlapping word windows (note: max_chunk_size is
        # treated as a word count here, vs. characters in chunk_by_sentence)
        words = text.split()
        segments = []
        
        for i in range(0, len(words), self.max_chunk_size // 2):
            segment = ' '.join(words[i:i + self.max_chunk_size])
            segments.append(segment)
            
        # Compute embeddings
        embeddings = self.encoder.encode(segments)
        
        # Find boundaries where similarity drops
        boundaries = [0]
        
        for i in range(1, len(segments)):
            similarity = self.cosine_similarity(embeddings[i-1], embeddings[i])
            
            if similarity < 0.7:  # Threshold
                boundaries.append(i)
                
        # Create chunks
        chunks = []
        for i in range(len(boundaries)):
            start = boundaries[i]
            end = boundaries[i + 1] if i + 1 < len(boundaries) else len(segments)
            chunk = ' '.join(segments[start:end])
            chunks.append(chunk)
            
        return chunks
    
    @staticmethod
    def cosine_similarity(a, b):
        import numpy as np
        return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b) + 1e-10)


class MarkdownChunker:
    """
    Chunk by markdown structure (headings, code blocks, etc.)
    """
    
    def __init__(self, min_chunk_size=100, max_chunk_size=1000):
        self.min_chunk_size = min_chunk_size
        self.max_chunk_size = max_chunk_size
        
    def chunk_markdown(self, markdown_text: str) -> List[Dict]:
        """
        Split markdown into structured chunks.
        """
        chunks = []
        
        # Split by headers
        sections = re.split(r'(^#+\s+.+$)', markdown_text, flags=re.MULTILINE)
        
        current_section = ""
        current_heading = "Introduction"
        
        for section in sections:
            if section.startswith('#'):
                # Save previous section
                if current_section.strip():
                    chunks.append({
                        'heading': current_heading,
                        'content': current_section.strip()
                    })
                    
                current_heading = section.strip()
                current_section = ""
            else:
                current_section += section
                
        # Add final section
        if current_section.strip():
            chunks.append({
                'heading': current_heading,
                'content': current_section.strip()
            })
            
        # Further split large chunks
        final_chunks = []
        for chunk in chunks:
            if len(chunk['content']) > self.max_chunk_size:
                # Split by paragraphs
                paragraphs = chunk['content'].split('\n\n')
                subchunk = ""
                
                for para in paragraphs:
                    if len(subchunk) + len(para) > self.max_chunk_size:
                        final_chunks.append({
                            'heading': chunk['heading'],
                            'content': subchunk
                        })
                        subchunk = para
                    else:
                        # Avoid a leading blank line on the first paragraph
                        subchunk = f"{subchunk}\n\n{para}" if subchunk else para
                        
                if subchunk:
                    final_chunks.append({
                        'heading': chunk['heading'],
                        'content': subchunk
                    })
            else:
                final_chunks.append(chunk)
                
        return final_chunks
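As a quick sanity check, the header-splitting regex used above can be exercised on a tiny document (a standalone sketch, independent of the class):

```python
import re

doc = """# Intro
Some intro text.

## Details
More detail here."""

# Same header-splitting approach as MarkdownChunker.chunk_markdown:
# the capturing group keeps each heading as its own list element.
sections = re.split(r'(^#+\s+.+$)', doc, flags=re.MULTILINE)
headings = [s.strip() for s in sections if s.startswith('#')]
print(headings)  # ['# Intro', '## Details']
```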

2. Query Transformations

Transform queries to improve retrieval:

class QueryTransformer:
    """
    Transform queries to improve retrieval quality.
    """
    
    def __init__(self, llm=None):
        self.llm = llm
        
    def expand_query(self, query: str) -> List[str]:
        """
        Expand query with synonyms and related terms.
        """
        expansions = [
            query,
            query.lower(),
            query.upper(),
        ]
        
        # Add common variations
        word_mappings = {
            'buy': ['purchase', 'get', 'acquire'],
            'find': ['search', 'locate', 'discover'],
            'help': ['assist', 'support', 'aid'],
            'info': ['information', 'details', 'data'],
        }
        
        words = query.lower().split()
        for word in words:
            if word in word_mappings:
                for syn in word_mappings[word]:
                    expanded = query.lower().replace(word, syn)
                    expansions.append(expanded)
                    
        return list(set(expansions))
    
    def decompose_query(self, query: str) -> List[str]:
        """
        Decompose complex query into sub-queries.
        """
        if self.llm:
            prompt = f"""Decompose this complex question into simpler sub-questions:

Question: {query}

Return sub-questions, one per line:"""
            
            result = self.llm.generate(prompt)
            sub_questions = result.split('\n')
            
            return [query] + [q.strip() for q in sub_questions if q.strip()]
            
        # Rule-based decomposition
        sub_queries = [query]
        
        # Split on "and", "or", "," (case-insensitive, preserving original casing)
        for pattern in (r'\s+and\s+', r'\s+or\s+', r',\s*'):
            parts = re.split(pattern, query, flags=re.IGNORECASE)
            if len(parts) > 1:
                sub_queries = [q.strip() for q in parts if q.strip()]
                break
                
        return sub_queries
    
    def rewrite_for_retrieval(self, query: str) -> str:
        """
        Rewrite query to be more retrieval-friendly.
        """
        if self.llm:
            prompt = f"""Rewrite this query to be better for semantic search:

Original: {query}

Rewrite to include key concepts and be self-contained:"""
            
            return self.llm.generate(prompt)
            
        return query


class SubQueryRetriever:
    """
    Retrieve using multiple sub-queries and combine results.
    """
    
    def __init__(self, retriever, query_transformer):
        self.retriever = retriever
        self.transformer = query_transformer
        
    def retrieve(self, query: str, top_k=5):
        """
        Decompose, retrieve, and merge.
        """
        # Get sub-queries
        sub_queries = self.transformer.decompose_query(query)
        
        all_results = []
        
        # Retrieve for each sub-query
        for sq in sub_queries:
            results = self.retriever.retrieve(sq, top_k=top_k)
            all_results.extend(results)
            
        # Deduplicate and re-rank
        unique_results = self.deduplicate(all_results)
        
        return unique_results[:top_k]
    
    def deduplicate(self, results):
        """
        Remove duplicate results.
        """
        seen = set()
        unique = []
        
        for r in results:
            if r['id'] not in seen:
                seen.add(r['id'])
                unique.append(r)
                
        return unique

3. Hybrid Search

Combine multiple retrieval methods:

class HybridRetriever:
    """
    Combine vector search with keyword (BM25) search.
    """
    
    def __init__(self, vector_store, keyword_index, alpha=0.5):
        self.vector_store = vector_store
        self.keyword_index = keyword_index
        self.alpha = alpha  # Weight for vector search
        
    def retrieve(self, query: str, top_k=10):
        """
        Combine vector and keyword retrieval.
        """
        # Vector search
        vector_results = self.vector_store.search(query, top_k=top_k*2)
        
        # Keyword search
        keyword_results = self.keyword_index.search(query, top_k=top_k*2)
        
        # Normalize scores
        vector_scores = self.normalize_scores(vector_results)
        keyword_scores = self.normalize_scores(keyword_results)
        
        # Merge results (index the originals by id so we can attach data;
        # the raw results are lists, so they have no .get method)
        vector_by_id = {r['id']: r for r in vector_results}
        keyword_by_id = {r['id']: r for r in keyword_results}
        
        merged = {}
        
        for doc_id, score in vector_scores.items():
            merged[doc_id] = {'score': self.alpha * score,
                              'data': vector_by_id[doc_id]}
            
        for doc_id, score in keyword_scores.items():
            if doc_id not in merged:
                merged[doc_id] = {'score': 0.0, 'data': keyword_by_id[doc_id]}
            merged[doc_id]['score'] += (1 - self.alpha) * score
            
        # Sort by combined score
        ranked = sorted(merged.values(), key=lambda x: x['score'], reverse=True)
        
        return ranked[:top_k]
    
    def normalize_scores(self, results):
        """
        Min-max normalize scores to [0, 1].
        """
        if not results:
            return {}
            
        scores = [r['score'] for r in results]
        min_s, max_s = min(scores), max(scores)
        
        if max_s - min_s < 1e-10:
            return {r['id']: 0.5 for r in results}
            
        return {
            r['id']: (r['score'] - min_s) / (max_s - min_s)
            for r in results
        }

4. Reranking

Improve initial retrieval with reranking:

class CrossEncoderReranker:
    """
    Use cross-encoder for precise reranking.
    """
    
    def __init__(self, model_name='cross-encoder/ms-marco-MiniLM-L-6-v2'):
        from sentence_transformers import CrossEncoder
        self.cross_encoder = CrossEncoder(model_name)
        
    def rerank(self, query: str, results: List[Dict], top_k=5):
        """
        Rerank results using cross-encoder.
        """
        if not results:
            return []
            
        # Create query-document pairs
        pairs = [(query, r['content']) for r in results]
        
        # Get relevance scores
        scores = self.cross_encoder.predict(pairs)
        
        # Add scores and sort
        for result, score in zip(results, scores):
            result['rerank_score'] = float(score)
            
        # Sort by rerank score
        reranked = sorted(results, key=lambda x: x['rerank_score'], reverse=True)
        
        return reranked[:top_k]


class LLMReranker:
    """
    Use LLM for intelligent reranking.
    """
    
    def __init__(self, llm):
        self.llm = llm
        
    def rerank_with_llm(self, query: str, results: List[Dict], top_k=5):
        """
        Use LLM to score and reorder results.
        """
        if not results:
            return []
            
        # Score each result with LLM
        scored_results = []
        
        for result in results:
            relevance = self.score_relevance(query, result['content'])
            result['llm_score'] = relevance
            scored_results.append(result)
            
        # Sort by LLM score
        reranked = sorted(scored_results, key=lambda x: x['llm_score'], reverse=True)
        
        return reranked[:top_k]
    
    def score_relevance(self, query: str, document: str) -> float:
        """
        Score query-document relevance with LLM.
        """
        prompt = f"""On a scale of 1-10, how relevant is this document to the query?

Query: {query}

Document: {document[:500]}...

Relevance score:"""
        
        try:
            score = float(self.llm.generate(prompt).strip())
            return max(0.0, min(1.0, score / 10.0))  # Normalize to [0, 1]
        except ValueError:
            return 0.5  # Default when the LLM reply isn't numeric

Complete RAG Pipeline

class AdvancedRAGPipeline:
    """
    Production-ready RAG pipeline with optimizations.
    """
    
    def __init__(self, config):
        self.config = config
        
        # Components
        self.chunker = SemanticChunker(
            encoder=config.encoder,
            min_chunk_size=config.min_chunk_size,
            max_chunk_size=config.max_chunk_size
        )
        
        self.query_transformer = QueryTransformer(llm=config.llm)
        self.vector_store = config.vector_store
        self.keyword_index = config.keyword_index
        self.reranker = CrossEncoderReranker() if config.use_reranker else None
        
        # Hybrid search
        self.hybrid = HybridRetriever(
            self.vector_store, 
            self.keyword_index,
            alpha=config.hybrid_alpha
        ) if config.use_hybrid else None
        
    def index_documents(self, documents: List[Dict]):
        """
        Index documents with optimal chunking.
        """
        for doc in documents:
            # Chunk document (markdown chunks are dicts; semantic chunks are strings)
            if doc.get('type') == 'markdown':
                texts = [c['content'] for c in MarkdownChunker().chunk_markdown(doc['content'])]
            else:
                texts = self.chunker.chunk_by_embedding(doc['content'])
                
            # Embed and store
            for i, text in enumerate(texts):
                embedding = self.config.encoder.encode(text)
                
                self.vector_store.add({
                    'id': f"{doc['id']}_{i}",
                    'content': text,
                    'embedding': embedding,
                    'metadata': doc.get('metadata', {})
                })
                
                # Also add to keyword index
                self.keyword_index.add(text)
                
    def retrieve(self, query: str, top_k=10):
        """
        Optimized retrieval with multiple techniques.
        """
        # Transform query
        expanded_queries = self.query_transformer.expand_query(query)
        
        all_results = []
        
        # Retrieve for each expanded query
        for q in expanded_queries:
            if self.hybrid:
                results = self.hybrid.retrieve(q, top_k=top_k)
            else:
                results = self.vector_store.search(q, top_k=top_k)
                
            all_results.extend(results)
            
        # Deduplicate
        unique_results = self.deduplicate(all_results)
        
        # Rerank if enabled
        if self.reranker:
            unique_results = self.reranker.rerank(query, unique_results, top_k=top_k)
            
        return unique_results[:top_k]
    
    def generate(self, query: str, context_results: List[Dict]) -> str:
        """
        Generate response with retrieved context.
        """
        # Build context from results
        context = "\n\n".join([
            f"[{i+1}] {r['content']}" 
            for i, r in enumerate(context_results[:5])
        ])
        
        prompt = f"""Use the following context to answer the question.

Context:
{context}

Question: {query}

Answer based on the context:"""
        
        return self.config.llm.generate(prompt)
    
    def query(self, query: str) -> Dict:
        """
        Full RAG query pipeline.
        """
        # Retrieve
        results = self.retrieve(query, top_k=10)
        
        # Generate
        answer = self.generate(query, results)
        
        return {
            'answer': answer,
            'sources': [
                {'content': r['content'][:200], 'score': r.get('score', 0)}
                for r in results[:3]
            ]
        }
    
    def deduplicate(self, results):
        """Remove duplicates."""
        seen = set()
        unique = []
        
        for r in results:
            if r.get('id') not in seen:
                seen.add(r.get('id'))
                unique.append(r)
                
        return unique

Production Optimizations

1. Caching Strategy

class RAGCaching:
    """
    Intelligent caching for RAG systems.
    """
    
    def __init__(self, vector_store, cache_ttl=3600):
        self.vector_store = vector_store
        self.cache = {}
        self.cache_ttl = cache_ttl
        import time
        self.time = time
        
    def get_cached_results(self, query: str):
        """
        Check cache for query results.
        """
        query_hash = hash(query)
        
        if query_hash in self.cache:
            timestamp, results = self.cache[query_hash]
            
            if self.time.time() - timestamp < self.cache_ttl:
                return results
                
        return None
    
    def cache_results(self, query: str, results: List[Dict]):
        """
        Cache retrieval results.
        """
        query_hash = hash(query)
        self.cache[query_hash] = (self.time.time(), results)
        
    def retrieve_with_cache(self, query: str, retriever):
        """
        Retrieve with caching.
        """
        # Check cache
        cached = self.get_cached_results(query)
        if cached:
            return cached
            
        # Retrieve fresh
        results = retriever(query)
        
        # Cache
        self.cache_results(query, results)
        
        return results
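One caveat with the in-memory sketch above: Python's built-in `hash()` is salted per process for strings, so keys will not match across restarts or worker processes. For a shared or persistent cache, a content hash is a safer key (a minimal sketch):

```python
import hashlib

def stable_cache_key(query: str) -> str:
    """Deterministic cache key across processes, unlike built-in hash()."""
    return hashlib.sha256(query.encode('utf-8')).hexdigest()

print(stable_cache_key("what is rag?"))
```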

2. Query Planning

class QueryRouter:
    """
    Route queries to appropriate retrieval strategies.
    """
    
    def __init__(self, llm):
        self.llm = llm
        
    def classify_query(self, query: str) -> str:
        """
        Classify query type to select strategy.
        """
        # Simple keyword-based
        if any(word in query.lower() for word in ['compare', 'difference', 'vs']):
            return 'comparison'
        elif any(word in query.lower() for word in ['list', 'all', 'show']):
            return 'list'
        elif query.lower().startswith(('how', 'what', 'why', 'when', 'where')):
            return 'factual'
        else:
            return 'general'
            
    def route(self, query: str) -> Dict:
        """
        Determine retrieval strategy.
        """
        query_type = self.classify_query(query)
        
        strategies = {
            'comparison': {
                'use_hybrid': True,
                'use_reranker': True,
                'top_k': 15,
                'expand_query': True
            },
            'list': {
                'use_hybrid': False,
                'use_reranker': False,
                'top_k': 20,
                'expand_query': True
            },
            'factual': {
                'use_hybrid': True,
                'use_reranker': True,
                'top_k': 5,
                'expand_query': False
            },
            'general': {
                'use_hybrid': True,
                'use_reranker': False,
                'top_k': 10,
                'expand_query': False
            }
        }
        
        return strategies.get(query_type, strategies['general'])

3. Evaluation

class RAGEvaluator:
    """
    Evaluate RAG system quality.
    """
    
    def __init__(self, llm):
        self.llm = llm
        
    def evaluate_retrieval(self, query: str, retrieved_docs: List[Dict], 
                          ground_truth: List[str]) -> Dict:
        """
        Evaluate retrieval quality.
        """
        relevant_ids = set(ground_truth)
        
        # Precision@K and Recall@K computed over the top-k retrieved docs
        precision_at_k = {}
        recall_at_k = {}
        
        for k in [1, 3, 5, 10]:
            top_k_ids = set(r['id'] for r in retrieved_docs[:k])
            hits = len(top_k_ids & relevant_ids)
            precision_at_k[f'P@{k}'] = hits / k
            recall_at_k[f'R@{k}'] = hits / len(relevant_ids) if relevant_ids else 0.0
        
        # MRR
        mrr = 0
        for i, doc in enumerate(retrieved_docs[:10]):
            if doc['id'] in relevant_ids:
                mrr = 1 / (i + 1)
                break
                
        return {
            **precision_at_k,
            **recall_at_k,
            'MRR': mrr
        }
    
    def evaluate_generation(self, query: str, response: str, 
                          context: List[Dict]) -> Dict:
        """
        Evaluate generation quality.
        """
        # Build a text context from the retrieved chunks
        context_text = "\n\n".join(c.get('content', str(c)) for c in context)
        
        # Context relevance (LLM-based)
        context_relevance = self.llm.evaluate(
            f"""Rate how well the context supports the answer from 1-5:

Context: {context_text[:1000]}
Answer: {response}

Relevance:"""
        )
        
        # Faithfulness (LLM-based)
        faithfulness = self.llm.evaluate(
            f"""Rate how faithful the answer is to the context from 1-5:

Context: {context_text[:2000]}
Answer: {response}

Faithfulness:"""
        )
        
        return {
            'context_relevance': context_relevance,
            'faithfulness': faithfulness
        }

Best Practices

Chunking Strategies

  1. Small chunks (256-512 tokens): Better precision, but more chunks to search
  2. Large chunks (1024+ tokens): More context per chunk, but may include noise
  3. Overlap: Use 10-20% overlap so content at chunk boundaries isn't lost
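The overlap recommendation can be sketched as a simple sliding window. This is illustrative only: sizes here are in words, whereas real systems usually count tokens:

```python
def chunk_with_overlap(text: str, chunk_size: int = 256, overlap_ratio: float = 0.15):
    """Sliding-window chunking with ~10-20% overlap between adjacent chunks."""
    words = text.split()
    step = max(1, int(chunk_size * (1 - overlap_ratio)))  # advance less than a full chunk
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(' '.join(words[start:start + chunk_size]))
        if start + chunk_size >= len(words):
            break
    return chunks

chunks = chunk_with_overlap(' '.join(str(i) for i in range(500)), chunk_size=200)
print(len(chunks))  # 3 chunks; adjacent chunks share 30 words (15% of 200)
```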

Retrieval Optimization

  1. Hybrid search: Combine vector + keyword retrieval for the best of both
  2. Query expansion: Especially useful for short or ambiguous queries
  3. Reranking: Rerank in production whenever the latency budget allows
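A common alternative to the min-max score fusion used by `HybridRetriever` is Reciprocal Rank Fusion (RRF), which merges ranked lists using positions only, so vector and keyword scores never need to be directly comparable (a sketch; k=60 is the conventional smoothing constant):

```python
def reciprocal_rank_fusion(ranked_lists, k=60):
    """Fuse several ranked lists of doc ids; higher fused score ranks first."""
    scores = {}
    for ranking in ranked_lists:
        for rank, doc_id in enumerate(ranking):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank + 1)
    return sorted(scores, key=scores.get, reverse=True)

vector_hits = ['d1', 'd2', 'd3']   # ranked by vector similarity
keyword_hits = ['d3', 'd1', 'd4']  # ranked by BM25
print(reciprocal_rank_fusion([vector_hits, keyword_hits]))  # ['d1', 'd3', 'd2', 'd4']
```

Documents appearing near the top of both lists (like d1) dominate, without any score normalization step.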

Latency Optimization

class LatencyOptimizer:
    """
    Optimize RAG latency.
    """
    
    @staticmethod
    def async_retrieval(query, retrievers):
        """
        Run retrievers in parallel threads.
        """
        import asyncio
        
        async def run_all():
            # to_thread offloads each (synchronous) retriever to a worker thread
            tasks = [asyncio.to_thread(r, query) for r in retrievers]
            return await asyncio.gather(*tasks)
            
        # gather() must be awaited inside a running event loop
        return asyncio.run(run_all())
    
    @staticmethod
    def prefetch_common(queries, vector_store):
        """
        Warm caches by issuing common queries ahead of time.
        """
        for q in queries:
            vector_store.search(q, top_k=1)

Future Directions in 2026

Emerging Techniques

  1. Adaptive Retrieval: Retrieve more or less based on query complexity
  2. Self-RAG: Train models to know when to retrieve
  3. Graph RAG: Use knowledge graphs for better retrieval
  4. Multimodal RAG: Handle images, audio alongside text
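As a flavor of adaptive retrieval, retrieval depth can be varied with a cheap query-complexity heuristic before falling back to an LLM-based router (a hypothetical sketch; the weights and thresholds are arbitrary):

```python
def adaptive_top_k(query: str, base_k: int = 5, max_k: int = 20) -> int:
    """Heuristic: longer, multi-clause queries retrieve more context."""
    n_words = len(query.split())
    # Count connective patterns as a rough proxy for clause count
    n_clauses = 1 + sum(query.lower().count(c) for c in (' and ', ' or ', ','))
    k = base_k + 2 * n_clauses + n_words // 10
    return min(k, max_k)

print(adaptive_top_k("What is RAG?"))  # simple query -> small k
print(adaptive_top_k("Compare hybrid search and reranking, and list trade-offs"))
```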

Conclusion

Advanced RAG optimization is essential for production systems. The techniques explored here (intelligent chunking, query transformations, hybrid search, reranking, and caching) work together to create a retrieval system that is both accurate and efficient.

The key is to start with proper document chunking, then layer on query transformations and hybrid search, and finally use reranking to polish results. Throughout, monitor latency and cache aggressively for production.

As RAG systems continue to evolve, expect more sophisticated techniques like adaptive retrieval and multimodal support to become standard. The future of AI is retrieval-augmented, and optimizing these systems is crucial for success.
