
MongoDB for AI: Vector Search, RAG, and Machine Learning

Introduction

Databases and artificial intelligence increasingly meet in the same application. MongoDB fits AI workloads well: Atlas offers native vector search, integrations exist for popular ML frameworks, and the document model supports the Retrieval-Augmented Generation (RAG) pattern by storing embeddings next to the content they describe.

This guide shows how to use MongoDB for AI applications. It covers vector search, RAG pipelines, ML feature stores, semantic caching, and integration with frameworks like LangChain.

Vector search enables semantic similarity matching, going beyond traditional keyword search to understand meaning and context.

What are Embeddings?

Embeddings are numerical representations of data that capture semantic meaning. They are generated by machine learning models and represented as arrays of floating-point numbers.

# Generate embeddings using OpenAI
from openai import OpenAI

client = OpenAI(api_key='your-api-key')

response = client.embeddings.create(
    input="The quick brown fox jumps over the lazy dog",
    model="text-embedding-3-small"
)

embedding = response.data[0].embedding
print(f"Dimension: {len(embedding)}")
# Output: Dimension: 1536

# Generate embeddings using Hugging Face
from sentence_transformers import SentenceTransformer

model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
embedding = model.encode("The quick brown fox")
print(f"Dimension: {len(embedding)}")
# Output: Dimension: 384
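
To see what "semantic similarity" means numerically, the sketch below compares vectors with cosine similarity, one of the similarity functions commonly used for embeddings. The three-dimensional vectors are hand-picked toy values, not real model output, and `cosine_similarity` is a helper name introduced here for illustration.

```python
import math

def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same dimension")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        raise ValueError("cosine similarity is undefined for a zero vector")
    return dot / (norm_a * norm_b)

# Toy "embeddings" (hypothetical values chosen by hand)
cat = [0.9, 0.1, 0.0]
kitten = [0.8, 0.2, 0.1]
invoice = [0.0, 0.1, 0.9]

sim_close = cosine_similarity(cat, kitten)
sim_far = cosine_similarity(cat, invoice)
print(sim_close > sim_far)  # True
```

Real embeddings behave the same way, only with hundreds or thousands of dimensions: texts with related meaning end up with a higher similarity than unrelated ones.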

MongoDB Atlas provides native vector search with the $vectorSearch aggregation stage.

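// Create an Atlas Vector Search index (mongosh)
// 'vector_index' is a placeholder name; numDimensions must match the embedding model
db.articles.createSearchIndex(
  'vector_index',
  'vectorSearch',
  {
    fields: [
      {
        type: 'vector',
        path: 'embedding',
        numDimensions: 1536,
        similarity: 'cosine'
      }
    ]
  }
)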

// Insert document with embedding
db.articles.insertOne({
  title: "Introduction to Machine Learning",
  content: "Machine learning is a subset of artificial intelligence...",
  embedding: [0.123, -0.456, 0.789, ...],  // 1536-dim vector
  author: "Dr. Smith",
  published_at: new Date("2026-01-15"),
  tags: ["AI", "ML", "Tutorial"]
})
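
A vector index is defined for a fixed number of dimensions, so an embedding of the wrong length usually means the document cannot be matched through that index. A small guard before insert can catch this early. This is only a sketch: `validate_embedding` and `EXPECTED_DIM` are hypothetical names, and which checks are worth running is an assumption.

```python
import math

EXPECTED_DIM = 1536  # assumed to match the index definition

def validate_embedding(vector, expected_dim=EXPECTED_DIM):
    """Return the vector as a list of floats, or raise ValueError."""
    values = [float(v) for v in vector]
    if len(values) != expected_dim:
        raise ValueError(f"expected {expected_dim} dimensions, got {len(values)}")
    if any(math.isnan(v) or math.isinf(v) for v in values):
        raise ValueError("embedding contains NaN or infinite values")
    return values
```

Calling it right before the insert keeps malformed vectors out of the collection instead of surfacing later as missing search results.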

Building RAG Pipelines

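Retrieval-Augmented Generation (RAG) retrieves relevant documents from your own data at query time and passes them to an LLM as context, so answers are grounded in that data rather than only in what the model learned during training.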

RAG Architecture

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Query     │────▶│  Retrieval  │────▶│     LLM     │
└─────────────┘     └─────────────┘     └─────────────┘
                          │
                    ┌─────┴─────┐
                    │ MongoDB   │
                    │ (Context) │
                    └───────────┘

Complete RAG Implementation

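from pymongo import MongoClient
from openai import OpenAI
from sentence_transformers import SentenceTransformer

# Initialize clients
# $vectorSearch needs a deployment with Atlas Vector Search; replace the URI with yours
mongo_client = MongoClient('mongodb://localhost:27017')
openai_client = OpenAI(api_key='your-api-key')
# all-MiniLM-L6-v2 produces 384-dim vectors, so 'content_index' must use numDimensions: 384
embed_model = SentenceTransformer('all-MiniLM-L6-v2')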

db = mongo_client['knowledge_base']
collection = db['documents']

def retrieve_context(query, top_k=5):
    """Retrieve relevant documents for a query."""
    # Generate query embedding
    query_embedding = embed_model.encode(query).tolist()
    
    # Perform vector search
    pipeline = [
        {
            '$vectorSearch': {
                'index': 'content_index',
                'path': 'embedding',
                'queryVector': query_embedding,
                'numCandidates': 20,
                'limit': top_k
            }
        },
        {
            '$project': {
                'title': 1,
                'content': 1,
                'source': 1,
                'score': {'$meta': 'vectorSearchScore'}
            }
        }
    ]
    
    results = list(collection.aggregate(pipeline))
    return results

def generate_response(query, context):
    """Generate response using retrieved context."""
    # Build prompt with context
    context_text = "\n\n".join([
        f"Source: {doc['title']}\n{doc['content'][:500]}"
        for doc in context
    ])
    
    prompt = f"""Based on the following context, answer the question.

Context:
{context_text}

Question: {query}

Answer:"""
    
    # Generate response
    response = openai_client.chat.completions.create(
        model='gpt-4o',
        messages=[{'role': 'user', 'content': prompt}],
        temperature=0.7
    )
    
    return response.choices[0].message.content

def rag_pipeline(query):
    """Complete RAG pipeline."""
    # Step 1: Retrieve
    context = retrieve_context(query)
    
    # Step 2: Generate
    response = generate_response(query, context)
    
    return {
        'response': response,
        'sources': [doc['title'] for doc in context]
    }

# Execute RAG pipeline
result = rag_pipeline("What is machine learning?")
print(result['response'])
print(f"\nSources: {result['sources']}")
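
The example above truncates every document to 500 characters. An alternative, sketched here under the assumption that retrieved documents arrive sorted by score, is to fill a fixed character budget in rank order so the prompt never exceeds a known size. `build_context` and its parameters are hypothetical names.

```python
def build_context(docs, max_chars=2000, per_doc_chars=500):
    """Concatenate documents in rank order until the budget is spent."""
    parts = []
    used = 0
    for doc in docs:
        block = f"Source: {doc['title']}\n{doc['content'][:per_doc_chars]}"
        if used + len(block) > max_chars:
            break  # keep whole blocks only; lower-ranked docs are dropped
        parts.append(block)
        used += len(block) + 2  # account for the blank-line separator
    return "\n\n".join(parts)

sample_docs = [
    {"title": "A", "content": "x" * 100},
    {"title": "B", "content": "y" * 100},
]
small = build_context(sample_docs, max_chars=150)
full = build_context(sample_docs)
```

A character budget is only a rough proxy for tokens; a tokenizer-based count would be more precise if the model's context limit is tight.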

Combine vector search with metadata filtering to narrow results. The filter runs as a pre-filter inside $vectorSearch, and the filtered fields must be declared as filter fields in the vector index. A $text match cannot follow $vectorSearch, because $text is only allowed in the first stage of a pipeline. Keyword relevance is therefore typically obtained from a separate Atlas Search ($search) query, and the two ranked lists are merged afterwards.

// Vector search with a metadata pre-filter
const searchResults = await db.products.aggregate([
  {
    $vectorSearch: {
      index: 'product_embedding_index',
      path: 'embedding',
      queryVector: queryEmbedding,
      numCandidates: 100,
      limit: 10,
      filter: {
        category: 'Electronics',
        price: { $gte: 100, $lte: 2000 }
      }
    }
  },
  {
    $project: {
      name: 1,
      description: 1,
      price: 1,
      vectorScore: { $meta: 'vectorSearchScore' }
    }
  }
]).toArray()
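
One common way to combine a vector-ranked list with a keyword-ranked list is reciprocal rank fusion (RRF), which uses only rank positions and so avoids mixing scores on different scales. The sketch below is plain Python over lists of IDs. The function name, the sample IDs, and the choice of `k=60` (a commonly used constant) are assumptions for illustration.

```python
def reciprocal_rank_fusion(ranked_lists, k=60):
    """Fuse several ranked lists of IDs into one list, best first.

    Each item scores sum(1 / (k + rank)) over the lists it appears in,
    with rank starting at 1.
    """
    scores = {}
    for ranking in ranked_lists:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores, key=lambda doc_id: scores[doc_id], reverse=True)

vector_hits = ["p3", "p1", "p7"]   # hypothetical IDs from a vector query
keyword_hits = ["p1", "p9", "p3"]  # hypothetical IDs from a keyword query
fused = reciprocal_rank_fusion([vector_hits, keyword_hits])
print(fused)  # ['p1', 'p3', 'p9', 'p7']
```

Items that appear near the top of both lists rise to the front, while items found by only one query still make it into the fused list.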

Integration with LangChain

LangChain's MongoDB integration exposes an Atlas collection as a vector store, so documents can be loaded, split, embedded, and queried through LangChain's standard interfaces.

MongoDB as Document Store

from langchain_community.document_loaders import TextLoader
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import MongoDBAtlasVectorSearch
from langchain.text_splitter import RecursiveCharacterTextSplitter
from pymongo import MongoClient

# Initialize MongoDB
client = MongoClient('mongodb://localhost:27017')
db = client['langchain']
collection = db['documents']

# Load and split documents
loader = TextLoader('article.txt')
documents = loader.load()

text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=100
)
splits = text_splitter.split_documents(documents)

# Create embeddings
embeddings = HuggingFaceEmbeddings(
    model_name='sentence-transformers/all-MiniLM-L6-v2'
)

# Create vector store
vector_store = MongoDBAtlasVectorSearch.from_documents(
    documents=splits,
    embedding=embeddings,
    collection=collection,
    index_name='content_index'
)

# Perform similarity search
query = "What is machine learning?"
docs = vector_store.similarity_search(query)
print(docs[0].page_content)

RAG Chain with MongoDB

from langchain_openai import ChatOpenAI
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate

# Initialize components
llm = ChatOpenAI(model='gpt-4o', temperature=0)
vector_store = MongoDBAtlasVectorSearch(...)  # From previous example

# Create retriever
retriever = vector_store.as_retriever(
    search_type='similarity',
    search_kwargs={'k': 5}
)

# Create QA chain
prompt_template = """Use the following context to answer the question.
If the answer is not in the context, say so.

Context: {context}

Question: {question}

Answer:"""

prompt = PromptTemplate(
    template=prompt_template,
    input_variables=['context', 'question']
)

qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type='stuff',
    retriever=retriever,
    chain_type_kwargs={'prompt': prompt},  # use the custom prompt defined above
    return_source_documents=True
)

# Execute
result = qa_chain.invoke({'query': "What is machine learning?"})
print(result['result'])

# Check sources
for doc in result['source_documents']:
    print(f"Source: {doc.metadata.get('source', 'Unknown')}")

ML Feature Store

MongoDB can serve as a feature store for machine learning workflows.

Feature Store Architecture

from pymongo import MongoClient
import pandas as pd
from datetime import datetime, timedelta

client = MongoClient('mongodb://localhost:27017')
db = client['feature_store']

# Define feature schema
feature_schema = {
    'user_id': str,
    'timestamp': datetime,
    'features': {
        'account_age_days': float,
        'total_purchases': int,
        'avg_order_value': float,
        'last_login_days_ago': int,
        'device_mobile': bool,
        'session_count_30d': int
    },
    'label': int  # For supervised learning
}

def store_features(user_id, features, label=None):
    """Store computed features for a user."""
    document = {
        'user_id': user_id,
        'timestamp': datetime.utcnow(),
        'features': features,
    }
    
    if label is not None:
        document['label'] = label
    
    db.features.insert_one(document)

def get_features(user_id, window_days=30):
    """Retrieve features for a user within time window."""
    cutoff = datetime.utcnow() - timedelta(days=window_days)
    
    cursor = db.features.find({
        'user_id': user_id,
        'timestamp': {'$gte': cutoff}
    }).sort('timestamp', -1).limit(1)
    
    return list(cursor)

def compute_training_data(feature_ids):
    """Prepare training dataset."""
    data = []
    
    for feature_doc in db.features.find({
        '_id': {'$in': feature_ids}
    }):
        row = {
            **feature_doc['features'],
            'label': feature_doc.get('label')
        }
        data.append(row)
    
    return pd.DataFrame(data)

Real-Time Feature Computation

from datetime import datetime

def update_user_features(user_id, event_data):
    """Update features based on user event."""
    # Get latest features
    latest = db.features.find_one(
        {'user_id': user_id},
        sort=[('timestamp', -1)]
    )
    
    features = latest['features'] if latest else {
        'account_age_days': 0,
        'total_purchases': 0,
        'avg_order_value': 0,
        'last_login_days_ago': 0,
        'device_mobile': False,
        'session_count_30d': 0
    }
    
    # Update based on event
    if event_data['type'] == 'purchase':
        features['total_purchases'] += 1
        features['avg_order_value'] = (
            (features['avg_order_value'] * (features['total_purchases'] - 1) + 
             event_data['amount']) / features['total_purchases']
        )
    elif event_data['type'] == 'login':
        features['last_login_days_ago'] = 0
        features['session_count_30d'] += 1
    
    # Store updated features
    store_features(user_id, features)
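
The average-order-value update above is an incremental mean: the new mean is derived from the old mean, the old count, and the new value, so past orders never need to be re-read. Isolated as a pure function (the name `update_mean` is hypothetical), it is easy to check against a direct calculation:

```python
def update_mean(old_mean, old_count, new_value):
    """Return the mean after adding new_value to old_count observations."""
    new_count = old_count + 1
    return (old_mean * old_count + new_value) / new_count

amounts = [20.0, 40.0, 90.0]
mean, count = 0.0, 0
for amount in amounts:
    mean = update_mean(mean, count, amount)
    count += 1

print(mean)  # 50.0
```

The result equals `sum(amounts) / len(amounts)`, which is the property the feature update relies on.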

Semantic Caching

Reduce LLM costs and latency with semantic caching.

Implementation

from datetime import datetime

from sentence_transformers import SentenceTransformer
from pymongo import MongoClient

client = MongoClient('mongodb://localhost:27017')
db = client['cache']
embed_model = SentenceTransformer('all-MiniLM-L6-v2')

def get_cache_key(query, threshold=0.95):
    """Return (response, original_query) for a semantically similar cached query."""
    embedding = embed_model.encode(query).tolist()
    
    # Search for the most similar query in the cache
    cached = list(db.cache.aggregate([
        {
            '$vectorSearch': {
                'index': 'cache_embedding_index',
                'path': 'embedding',
                'queryVector': embedding,
                'numCandidates': 5,
                'limit': 1
            }
        },
        {
            # Expose the similarity score as a field so it can be filtered on
            '$project': {
                'query': 1,
                'response': 1,
                'score': {'$meta': 'vectorSearchScore'}
            }
        },
        {
            '$match': {
                'score': {'$gte': threshold}
            }
        }
    ]))
    
    if cached:
        return cached[0]['response'], cached[0]['query']
    
    return None, None

def cache_response(query, response):
    """Cache query-response pair."""
    embedding = embed_model.encode(query).tolist()
    
    db.cache.insert_one({
        'query': query,
        'response': response,
        'embedding': embedding,
        'timestamp': datetime.utcnow()
    })

def semantic_search_with_cache(query, llm_func):
    """Search with semantic caching."""
    # Check cache
    cached_response, original_query = get_cache_key(query)
    
    if cached_response is not None:
        print(f"Cache hit! Original query: {original_query}")
        return cached_response
    
    # Generate response
    response = llm_func(query)
    
    # Cache result
    cache_response(query, response)
    
    return response
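
The threshold logic is easier to reason about without a database in the way. The sketch below applies the same idea to an in-memory list: return the cached response of the most similar stored query, but only if its cosine similarity reaches the threshold. The vectors are toy values and `lookup` is a hypothetical helper. The numeric score a vector index returns depends on its similarity function, so a threshold tuned here does not transfer directly.

```python
import math

def cosine(a, b):
    """Cosine similarity of two non-zero, equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))

def lookup(entries, query_vec, threshold=0.95):
    """Return (response, score) for the best match at or above threshold,
    otherwise (None, best score seen)."""
    best_entry, best_score = None, -1.0
    for entry in entries:
        score = cosine(entry["embedding"], query_vec)
        if score > best_score:
            best_entry, best_score = entry, score
    if best_entry is not None and best_score >= threshold:
        return best_entry["response"], best_score
    return None, best_score

entries = [
    {"query": "what is ml", "embedding": [1.0, 0.0],
     "response": "Machine learning is a subset of AI."},
    {"query": "reset password", "embedding": [0.0, 1.0],
     "response": "Use the reset link on the sign-in page."},
]
hit, hit_score = lookup(entries, [0.99, 0.05])   # close to the first entry
miss, miss_score = lookup(entries, [0.7, 0.7])   # equally far from both
```

A high threshold trades hit rate for safety: a threshold that is too low returns a cached answer to a question that only looks similar.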

Document Chunking Strategies

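Chunk size and boundaries determine what the retriever can return: chunks that are too large dilute the embedding with unrelated text, while chunks that are too small lose the context needed to answer a question.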

Strategies

from langchain.text_splitter import (
    RecursiveCharacterTextSplitter,
    MarkdownHeaderTextSplitter,
    PythonCodeTextSplitter
)

# Strategy 1: Recursive character splitting
recursive_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=100,
    separators=['\n\n', '\n', ' ', '']
)

# Strategy 2: Markdown-aware splitting
markdown_splitter = MarkdownHeaderTextSplitter(
    headers_to_split_on=[
        ('#', 'Header 1'),
        ('##', 'Header 2'),
        ('###', 'Header 3'),
    ]
)

# Strategy 3: Code-aware splitting
code_splitter = PythonCodeTextSplitter(
    chunk_size=500,
    chunk_overlap=50
)

def smart_chunk(document, content_type='markdown'):
    """Choose a chunking strategy and return a list of text chunks."""
    if content_type == 'markdown':
        # MarkdownHeaderTextSplitter returns Document objects; keep only the text
        docs = [d.page_content for d in markdown_splitter.split_text(document)]
    elif content_type == 'code':
        docs = code_splitter.split_text(document)
    else:
        docs = recursive_splitter.split_text(document)
    
    return docs
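
To make the roles of `chunk_size` and `chunk_overlap` concrete, here is a dependency-free sliding-window chunker. It is a simplified illustration, not how the LangChain splitters work internally (they prefer to break on separators). `chunk_text` is a hypothetical name.

```python
def chunk_text(text, chunk_size=1000, chunk_overlap=100):
    """Split text into fixed-size character windows that overlap."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in [0, chunk_size)")
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the last window already reaches the end
    return chunks

chunks = chunk_text("abcdefghij", chunk_size=4, chunk_overlap=1)
print(chunks)  # ['abcd', 'defg', 'ghij']
```

Each chunk repeats the last character of the previous one, which is the overlap at work: text that straddles a boundary still appears intact in at least one chunk when the overlap is large enough.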

Evaluation and Monitoring

Measure retrieval quality offline with labeled test queries, and track latency and LLM call volume in production.

RAG Evaluation

def evaluate_rag_system(test_queries, ground_truth, rag_func):
    """Evaluate RAG system with test queries."""
    results = []
    
    for query in test_queries:
        # Get RAG response
        result = rag_func(query)
        
        # Calculate metrics
        retrieved_docs = result['sources']
        response = result['response']
        
        # Precision: fraction of retrieved documents that appear in the ground truth
        expected = set(ground_truth.get(query, []))
        relevant = sum(1 for doc in retrieved_docs if doc in expected)
        
        results.append({
            'query': query,
            'response': response[:100],
            'retrieved': retrieved_docs,
            'relevant_count': relevant,
            'precision': relevant / len(retrieved_docs) if retrieved_docs else 0
        })
    
    # Mean precision across queries (guard against an empty test set)
    avg_precision = (
        sum(r['precision'] for r in results) / len(results) if results else 0
    )
    
    return {
        'results': results,
        'average_precision': avg_precision
    }
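
Precision alone does not show whether relevant documents were missed or how high the first useful result ranks. Two complementary retrieval metrics, recall@k and reciprocal rank, can be sketched as follows. The function names and sample titles are hypothetical.

```python
def recall_at_k(retrieved, relevant, k):
    """Fraction of relevant items that appear in the top k results."""
    relevant_set = set(relevant)
    if not relevant_set:
        return 0.0
    top_k = set(retrieved[:k])
    return len(top_k & relevant_set) / len(relevant_set)

def reciprocal_rank(retrieved, relevant):
    """1 / rank of the first relevant result, or 0.0 if none is retrieved."""
    relevant_set = set(relevant)
    for rank, item in enumerate(retrieved, start=1):
        if item in relevant_set:
            return 1.0 / rank
    return 0.0

retrieved = ["Intro to ML", "Cooking Basics", "Neural Networks"]
relevant = ["Neural Networks", "Intro to ML", "Gradient Descent"]

r_at_2 = recall_at_k(retrieved, relevant, k=2)   # 1 of 3 relevant docs in the top 2
r_at_3 = recall_at_k(retrieved, relevant, k=3)   # 2 of 3 relevant docs in the top 3
rr = reciprocal_rank(retrieved, relevant)        # first result is relevant
```

Averaging `reciprocal_rank` over all test queries gives mean reciprocal rank (MRR).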

Monitoring

from prometheus_client import Counter, Histogram
import time

# Define metrics
QUERY_LATENCY = Histogram(
    'rag_query_latency_seconds',
    'Query latency in seconds',
    ['stage']
)

CACHE_HITS = Counter(
    'rag_cache_hits_total',
    'Total cache hits'
)

LLM_CALLS = Counter(
    'rag_llm_calls_total',
    'Total LLM API calls'
)

def monitored_retrieve(query):
    """Monitored retrieval."""
    with QUERY_LATENCY.labels(stage='retrieve').time():
        return retrieve_context(query)

def monitored_generate(query, context):
    """Monitored generation."""
    LLM_CALLS.inc()
    with QUERY_LATENCY.labels(stage='generate').time():
        return generate_response(query, context)
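
For local debugging without a Prometheus server, the same per-stage timing idea can be sketched with the standard library. `StageTimer` is a hypothetical helper, not part of `prometheus_client`, and the `time.sleep` calls stand in for real retrieval work.

```python
import time
from collections import defaultdict
from contextlib import contextmanager

class StageTimer:
    """Collect wall-clock durations per named stage."""

    def __init__(self):
        self.samples = defaultdict(list)

    @contextmanager
    def track(self, stage):
        start = time.perf_counter()
        try:
            yield
        finally:
            # Record the duration even if the stage raised
            self.samples[stage].append(time.perf_counter() - start)

    def summary(self):
        """Return {stage: (call count, mean seconds)}."""
        return {
            stage: (len(values), sum(values) / len(values))
            for stage, values in self.samples.items()
        }

timer = StageTimer()
with timer.track("retrieve"):
    time.sleep(0.01)  # stand-in for a retrieval call
with timer.track("retrieve"):
    time.sleep(0.01)

summary = timer.summary()
```

Recording in a `finally` block means failed calls are timed too, which matters when slow failures (timeouts) are the thing being investigated.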

Production Best Practices

Deploy AI applications reliably.

Scalability

# Connection pooling
from pymongo import MongoClient

client = MongoClient(
    'mongodb://localhost:27017',
    maxPoolSize=50,
    minPoolSize=10,
    maxIdleTimeMS=30000
)

# The collection object is bound to the pooled client above, so the
# LangChain vector store reuses the same connection pool
collection = client['langchain']['documents']
vector_store = MongoDBAtlasVectorSearch.from_documents(
    documents=docs,
    embedding=embeddings,
    collection=collection,
    index_name='index'
)

Error Handling

import logging
from tenacity import retry, stop_after_attempt, wait_exponential

logger = logging.getLogger(__name__)

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10)
)
def robust_generate(prompt):
    """Generate with retry logic."""
    try:
        response = openai_client.chat.completions.create(
            model='gpt-4o',
            messages=[{'role': 'user', 'content': prompt}]
        )
        return response.choices[0].message.content
    
    except Exception as e:
        logger.error(f"Generation failed: {e}")
        raise

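def build_prompt(query, context):
    """Format retrieved documents and the question into a single prompt."""
    context_text = "\n\n".join(
        f"Source: {doc['title']}\n{doc['content'][:500]}" for doc in context
    )
    return f"Context:\n{context_text}\n\nQuestion: {query}\n\nAnswer:"

def safe_rag_pipeline(query):
    """RAG pipeline with error handling."""
    try:
        context = retrieve_context(query)
    except Exception as e:
        logger.error(f"Retrieval failed: {e}")
        context = []  # fall back to answering without retrieved context
    
    try:
        response = robust_generate(build_prompt(query, context))
    except Exception as e:
        logger.error(f"Generation failed: {e}")
        response = "I apologize, but I encountered an error processing your query."
    
    return {'response': response, 'sources': [doc['title'] for doc in context]}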
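
The retry behavior can also be illustrated without a library. The sketch below retries a callable with a doubling, capped delay. It shows the general exponential backoff pattern and does not reproduce tenacity's exact timing. `retry_with_backoff` and its parameters are hypothetical, and the `sleep` argument exists only so the waits can be observed without actually pausing.

```python
import time

def retry_with_backoff(func, attempts=3, base_delay=2.0, max_delay=10.0, sleep=time.sleep):
    """Call func until it succeeds or attempts run out.

    Waits base_delay after the first failure, then doubles the wait each
    time, capped at max_delay. The last exception is re-raised if every
    attempt fails.
    """
    delay = base_delay
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except Exception:
            if attempt == attempts:
                raise
            sleep(min(delay, max_delay))
            delay *= 2

# Usage with a function that fails twice, then succeeds
calls = {"count": 0}

def flaky():
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

waits = []
result = retry_with_backoff(flaky, sleep=waits.append)  # record waits instead of sleeping
print(result, waits)  # ok [2.0, 4.0]
```

In production it is usually better to retry only on errors known to be transient (timeouts, rate limits) rather than on every exception.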

Conclusion

MongoDB provides a robust foundation for AI applications. With native vector search, excellent integration with LangChain, and support for feature stores, MongoDB enables developers to build sophisticated AI-powered applications.

Key takeaways:

  • Vector search enables semantic similarity matching
  • RAG pipelines combine LLM power with your data
  • LangChain integration simplifies AI application development
  • Proper chunking and caching are crucial for performance

By leveraging these capabilities, you can build intelligent applications that understand context, provide accurate responses, and scale to meet demand.

In the final article of this series, we will explore real-world production use cases for MongoDB across various industries and application types.
