Introduction
The intersection of databases and artificial intelligence has created new opportunities for building intelligent applications. MongoDB has positioned itself as an excellent choice for AI applications with native vector search capabilities, integration with popular ML frameworks, and support for the Retrieval-Augmented Generation (RAG) pattern.
This comprehensive guide explores how to leverage MongoDB for AI applications. We will cover vector search, building RAG pipelines, ML feature stores, and integration with frameworks like LangChain.
Understanding Vector Search
Vector search enables semantic similarity matching, going beyond traditional keyword search to understand meaning and context.
What are Embeddings?
Embeddings are numerical representations of data that capture semantic meaning. They are generated by machine learning models and represented as arrays of floating-point numbers.
# Generate embeddings using OpenAI
from openai import OpenAI
client = OpenAI(api_key='your-api-key')
response = client.embeddings.create(
input="The quick brown fox jumps over the lazy dog",
model="text-embedding-3-small"
)
embedding = response.data[0].embedding
print(f"Dimension: {len(embedding)}")
# Output: Dimension: 1536
# Generate embeddings using Hugging Face
from sentence_transformers import SentenceTransformer
model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
embedding = model.encode("The quick brown fox")
print(f"Dimension: {len(embedding)}")
# Output: Dimension: 384
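Similarity between embeddings is usually measured with cosine similarity, which is also what vector search ranks by. A minimal sketch with numpy (the sentence pair is purely illustrative):
# Compare two embeddings with cosine similarity (illustrative sketch)
import numpy as np
from sentence_transformers import SentenceTransformer

model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
vec_a = model.encode("The quick brown fox jumps over the lazy dog")
vec_b = model.encode("A fast auburn fox leaps above a sleepy canine")

# Cosine similarity: dot product of the two vectors divided by their norms
similarity = np.dot(vec_a, vec_b) / (np.linalg.norm(vec_a) * np.linalg.norm(vec_b))
print(f"Cosine similarity: {similarity:.3f}")  # Values near 1.0 indicate similar meaning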
MongoDB Vector Search
MongoDB Atlas provides native vector search with the $vectorSearch aggregation stage.
// Create a vector search index (Atlas Vector Search)
db.articles.createSearchIndex(
  "article_vector_index",
  "vectorSearch",
  {
    fields: [
      {
        type: "vector",
        path: "embedding",
        numDimensions: 1536,
        similarity: "cosine"
      }
    ]
  }
)
// Insert document with embedding
db.articles.insertOne({
title: "Introduction to Machine Learning",
content: "Machine learning is a subset of artificial intelligence...",
embedding: [0.123, -0.456, 0.789, ...], // 1536-dim vector
author: "Dr. Smith",
published_at: new Date("2026-01-15"),
tags: ["AI", "ML", "Tutorial"]
})
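With the index built and embedded documents stored, querying is a single aggregation with $vectorSearch. Here is a minimal sketch from Python; the database name and query text are illustrative, and the query vector must come from the same model used at insert time:
# Run a vector search from Python (sketch; database name is illustrative)
from openai import OpenAI
from pymongo import MongoClient

openai_client = OpenAI(api_key='your-api-key')
articles = MongoClient('mongodb://localhost:27017')['content']['articles']

# Embed the query with the same model used for the stored documents
query_vector = openai_client.embeddings.create(
    input="beginner machine learning tutorials",
    model="text-embedding-3-small"
).data[0].embedding

results = articles.aggregate([
    {
        '$vectorSearch': {
            'index': 'article_vector_index',
            'path': 'embedding',
            'queryVector': query_vector,
            'numCandidates': 100,
            'limit': 5
        }
    },
    {
        '$project': {
            'title': 1,
            'score': {'$meta': 'vectorSearchScore'}
        }
    }
])

for doc in results:
    print(doc['title'], round(doc['score'], 3))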
Building RAG Pipelines
Retrieval-Augmented Generation (RAG) grounds LLM responses in your own data: relevant documents are retrieved at query time and passed to the model as context.
RAG Architecture
┌───────────┐      ┌───────────┐      ┌───────────┐
│   Query   │─────▶│ Retrieval │─────▶│    LLM    │
└───────────┘      └───────────┘      └───────────┘
                         │
                   ┌─────┴─────┐
                   │  MongoDB  │
                   │ (Context) │
                   └───────────┘
Complete RAG Implementation
from pymongo import MongoClient
from openai import OpenAI
from sentence_transformers import SentenceTransformer
import numpy as np
# Initialize clients (note: $vectorSearch requires a MongoDB Atlas cluster or a local Atlas deployment)
mongo_client = MongoClient('mongodb://localhost:27017')
openai_client = OpenAI(api_key='your-api-key')
embed_model = SentenceTransformer('all-MiniLM-L6-v2')
db = mongo_client['knowledge_base']
collection = db['documents']
def retrieve_context(query, top_k=5):
"""Retrieve relevant documents for a query."""
# Generate query embedding
query_embedding = embed_model.encode(query).tolist()
# Perform vector search
pipeline = [
{
'$vectorSearch': {
'index': 'content_index',
'path': 'embedding',
'queryVector': query_embedding,
'numCandidates': 20,
'limit': top_k
}
},
{
'$project': {
'title': 1,
'content': 1,
'source': 1,
'score': {'$meta': 'vectorSearchScore'}
}
}
]
results = list(collection.aggregate(pipeline))
return results
def generate_response(query, context):
"""Generate response using retrieved context."""
# Build prompt with context
context_text = "\n\n".join([
f"Source: {doc['title']}\n{doc['content'][:500]}"
for doc in context
])
prompt = f"""Based on the following context, answer the question.
Context:
{context_text}
Question: {query}
Answer:"""
# Generate response
response = openai_client.chat.completions.create(
model='gpt-4o',
messages=[{'role': 'user', 'content': prompt}],
temperature=0.7
)
return response.choices[0].message.content
def rag_pipeline(query):
"""Complete RAG pipeline."""
# Step 1: Retrieve
context = retrieve_context(query)
# Step 2: Generate
response = generate_response(query, context)
return {
'response': response,
'sources': [doc['title'] for doc in context]
}
# Execute RAG pipeline
result = rag_pipeline("What is machine learning?")
print(result['response'])
print(f"\nSources: {result['sources']}")
Hybrid Search
Combine vector search with traditional filtering for better results.
// Hybrid search: vector search + metadata filter + keyword re-filter
// Note: $text can only appear as the first stage of a pipeline, so the keyword
// component below uses a simple case-insensitive regex; fully scored hybrid
// search is usually done by running a keyword search and a vector search
// separately and merging the results (see the fusion sketch below).
const searchResults = await db.products.aggregate([
  {
    $vectorSearch: {
      index: 'product_embedding_index',
      path: 'embedding',
      queryVector: queryEmbedding,
      numCandidates: 100,
      limit: 20,
      // Fields used here must be indexed as "filter" fields in the vector index
      filter: {
        category: 'Electronics',
        price: { $gte: 100, $lte: 2000 }
      }
    }
  },
  {
    // Materialize the similarity score so later stages can reference it
    $addFields: {
      vectorScore: { $meta: 'vectorSearchScore' }
    }
  },
  {
    // Keyword component: keep candidates whose description mentions the query
    $match: {
      description: { $regex: query, $options: 'i' }
    }
  },
  { $sort: { vectorScore: -1 } },
  { $limit: 10 },
  {
    $project: {
      name: 1,
      description: 1,
      price: 1,
      vectorScore: 1
    }
  }
]).toArray()
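When you need a genuinely blended keyword-plus-vector ranking, a common pattern is to run the text search and the vector search as separate queries and merge the two ranked lists with reciprocal rank fusion (RRF). A minimal, driver-agnostic sketch (both inputs are assumed to be lists of documents with an _id field, ordered best-first by their own scores):
# Reciprocal rank fusion: merge two ranked result lists (illustrative sketch)
def reciprocal_rank_fusion(vector_results, text_results, k=60):
    """Combine two ranked lists of documents into one RRF-scored ranking."""
    scores = {}
    docs_by_id = {}
    for results in (vector_results, text_results):
        for rank, doc in enumerate(results):
            doc_id = doc['_id']
            docs_by_id[doc_id] = doc
            # Each list contributes 1 / (k + rank); documents ranked highly in
            # both lists accumulate the largest combined score
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank + 1)
    ranked_ids = sorted(scores, key=scores.get, reverse=True)
    return [docs_by_id[doc_id] for doc_id in ranked_ids]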
Integration with LangChain
LangChain provides excellent integration with MongoDB for building AI applications.
MongoDB as Document Store
from langchain_community.document_loaders import TextLoader
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import MongoDBAtlasVectorSearch
from langchain.text_splitter import RecursiveCharacterTextSplitter
from pymongo import MongoClient
# Initialize MongoDB
client = MongoClient('mongodb://localhost:27017')
db = client['langchain']
collection = db['documents']
# Load and split documents
loader = TextLoader('article.txt')
documents = loader.load()
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=100
)
splits = text_splitter.split_documents(documents)
# Create embeddings
embeddings = HuggingFaceEmbeddings(
model_name='sentence-transformers/all-MiniLM-L6-v2'
)
# Create vector store
vector_store = MongoDBAtlasVectorSearch.from_documents(
documents=splits,
embedding=embeddings,
collection=collection,
index_name='content_index'
)
# Perform similarity search
query = "What is machine learning?"
docs = vector_store.similarity_search(query)
print(docs[0].page_content)
RAG Chain with MongoDB
from langchain_openai import ChatOpenAI
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
# Initialize components
llm = ChatOpenAI(model='gpt-4o', temperature=0)
vector_store = MongoDBAtlasVectorSearch(...) # From previous example
# Create retriever
retriever = vector_store.as_retriever(
search_type='similarity',
search_kwargs={'k': 5}
)
# Create QA chain
prompt_template = """Use the following context to answer the question.
If the answer is not in the context, say so.
Context: {context}
Question: {question}
Answer:"""
prompt = PromptTemplate(
template=prompt_template,
input_variables=['context', 'question']
)
qa_chain = RetrievalQA.from_chain_type(
llm=llm,
chain_type='stuff',
retriever=retriever,
return_source_documents=True
)
# Execute
result = qa_chain.invoke("What is machine learning?")
print(result['result'])
# Check sources
for doc in result['source_documents']:
print(f"Source: {doc.metadata.get('source', 'Unknown')}")
ML Feature Store
MongoDB can serve as a feature store for machine learning workflows.
Feature Store Architecture
from pymongo import MongoClient
import pandas as pd
from datetime import datetime, timedelta
client = MongoClient('mongodb://localhost:27017')
db = client['feature_store']
# Define feature schema
feature_schema = {
'user_id': str,
'timestamp': datetime,
'features': {
'account_age_days': float,
'total_purchases': int,
'avg_order_value': float,
'last_login_days_ago': int,
'device_mobile': bool,
'session_count_30d': int
},
'label': int # For supervised learning
}
def store_features(user_id, features, label=None):
"""Store computed features for a user."""
document = {
'user_id': user_id,
'timestamp': datetime.utcnow(),
'features': features,
}
if label is not None:
document['label'] = label
db.features.insert_one(document)
def get_features(user_id, window_days=30):
"""Retrieve features for a user within time window."""
cutoff = datetime.utcnow() - timedelta(days=window_days)
cursor = db.features.find({
'user_id': user_id,
'timestamp': {'$gte': cutoff}
}).sort('timestamp', -1).limit(1)
return list(cursor)
def compute_training_data(feature_ids):
"""Prepare training dataset."""
data = []
for feature_doc in db.features.find({
'_id': {'$in': feature_ids}
}):
row = {
**feature_doc['features'],
'label': feature_doc.get('label')
}
data.append(row)
return pd.DataFrame(data)
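Putting the helpers together: features are written as they are computed, the latest snapshot is read back for online scoring, and a training frame is assembled offline. A short usage sketch with illustrative values:
# Example usage of the feature store helpers (values are illustrative)
store_features(
    user_id='user_123',
    features={
        'account_age_days': 120.0,
        'total_purchases': 8,
        'avg_order_value': 42.5,
        'last_login_days_ago': 1,
        'device_mobile': True,
        'session_count_30d': 14
    },
    label=1
)

# Online: fetch the latest feature snapshot for scoring
latest = get_features('user_123', window_days=30)

# Offline: build a training DataFrame from labeled feature documents
feature_ids = [doc['_id'] for doc in db.features.find({'label': {'$exists': True}}, {'_id': 1})]
training_df = compute_training_data(feature_ids)
print(training_df.head())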
Real-Time Feature Computation
from datetime import datetime
def update_user_features(user_id, event_data):
"""Update features based on user event."""
# Get latest features
latest = db.features.find_one(
{'user_id': user_id},
sort=[('timestamp', -1)]
)
features = latest['features'] if latest else {
'account_age_days': 0,
'total_purchases': 0,
'avg_order_value': 0,
'last_login_days_ago': 0,
'device_mobile': False,
'session_count_30d': 0
}
# Update based on event
if event_data['type'] == 'purchase':
features['total_purchases'] += 1
features['avg_order_value'] = (
(features['avg_order_value'] * (features['total_purchases'] - 1) +
event_data['amount']) / features['total_purchases']
)
elif event_data['type'] == 'login':
features['last_login_days_ago'] = 0
features['session_count_30d'] += 1
# Store updated features
store_features(user_id, features)
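In production, these updates are often driven by raw events written to their own collection, with a change stream applying them as they arrive. A sketch of that wiring (assumes events land in db.events and carry a user_id field; change streams require a replica set or Atlas):
# Apply feature updates as events arrive, via a MongoDB change stream (sketch)
def watch_events():
    """Consume newly inserted events and fold them into user features."""
    pipeline = [{'$match': {'operationType': 'insert'}}]
    with db.events.watch(pipeline) as stream:
        for change in stream:
            event = change['fullDocument']
            update_user_features(event['user_id'], event)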
Semantic Caching
Reduce LLM costs and latency with semantic caching: when a new query is semantically close to one answered before, the cached response is returned instead of calling the model again.
Implementation
from datetime import datetime

from pymongo import MongoClient
from sentence_transformers import SentenceTransformer
client = MongoClient('mongodb://localhost:27017')
db = client['cache']
embed_model = SentenceTransformer('all-MiniLM-L6-v2')
def get_cached_response(query, threshold=0.95):
    """Return a cached response for a semantically similar query, if any."""
    embedding = embed_model.encode(query).tolist()

    # Search for a similar query in the cache
    cached = list(db.cache.aggregate([
        {
            '$vectorSearch': {
                'index': 'cache_embedding_index',
                'path': 'embedding',
                'queryVector': embedding,
                'numCandidates': 5,
                'limit': 1
            }
        },
        {
            # Materialize the similarity score so it can be filtered on
            '$addFields': {
                'score': {'$meta': 'vectorSearchScore'}
            }
        },
        {
            '$match': {
                'score': {'$gte': threshold}
            }
        },
        {
            '$project': {
                'query': 1,
                'response': 1,
                'score': 1
            }
        }
    ]))

    if cached:
        return cached[0]['response'], cached[0]['query']
    return None, None
def cache_response(query, response):
"""Cache query-response pair."""
embedding = embed_model.encode(query).tolist()
db.cache.insert_one({
'query': query,
'response': response,
'embedding': embedding,
'timestamp': datetime.utcnow()
})
def semantic_search_with_cache(query, llm_func):
"""Search with semantic caching."""
# Check cache
    cached_response, original_query = get_cached_response(query)
if cached_response:
print(f"Cache hit! Original query: {original_query}")
return cached_response
# Generate response
response = llm_func(query)
# Cache result
cache_response(query, response)
return response
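Wired into the earlier RAG pipeline, the cache sits in front of the expensive retrieval-plus-generation step; a short usage sketch reusing rag_pipeline from above:
# Use the semantic cache in front of the RAG pipeline (sketch)
answer = semantic_search_with_cache(
    "What is machine learning?",
    lambda q: rag_pipeline(q)['response']
)
print(answer)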
Document Chunking Strategies
Effective chunking is crucial for RAG performance.
Strategies
from langchain.text_splitter import (
RecursiveCharacterTextSplitter,
MarkdownHeaderTextSplitter,
PythonCodeTextSplitter
)
# Strategy 1: Recursive character splitting
recursive_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=100,
separators=['\n\n', '\n', ' ', '']
)
# Strategy 2: Markdown-aware splitting
markdown_splitter = MarkdownHeaderTextSplitter(
headers_to_split_on=[
('#', 'Header 1'),
('##', 'Header 2'),
('###', 'Header 3'),
]
)
# Strategy 3: Code-aware splitting
code_splitter = PythonCodeTextSplitter(
chunk_size=500,
chunk_overlap=50
)
def smart_chunk(document, content_type='markdown'):
"""Choose appropriate chunking strategy."""
if content_type == 'markdown':
docs = markdown_splitter.split_text(document)
elif content_type == 'code':
docs = code_splitter.split_text(document)
else:
docs = recursive_splitter.split_text(document)
return docs
Evaluation and Monitoring
Ensure your AI application performs well.
RAG Evaluation
def evaluate_rag_system(test_queries, ground_truth, rag_func):
"""Evaluate RAG system with test queries."""
results = []
for query in test_queries:
# Get RAG response
result = rag_func(query)
# Calculate metrics
retrieved_docs = result['sources']
response = result['response']
# Simple relevance score
relevant = sum([
any(doc in gt for doc in retrieved_docs)
for gt in ground_truth.get(query, [])
])
results.append({
'query': query,
'response': response[:100],
'retrieved': retrieved_docs,
'relevant_count': relevant,
'precision': relevant / len(retrieved_docs) if retrieved_docs else 0
})
# Calculate average metrics
avg_precision = sum(r['precision'] for r in results) / len(results)
return {
'results': results,
'average_precision': avg_precision
}
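A small smoke test of the pipeline might look like the following; the queries and expected source titles are illustrative placeholders:
# Evaluate the RAG pipeline on a handful of test queries (illustrative data)
test_queries = [
    "What is machine learning?",
    "How does vector search work?"
]
ground_truth = {
    "What is machine learning?": ["Introduction to Machine Learning"],
    "How does vector search work?": ["Vector Search Basics"]
}

report = evaluate_rag_system(test_queries, ground_truth, rag_pipeline)
print(f"Average precision: {report['average_precision']:.2f}")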
Monitoring
from prometheus_client import Counter, Histogram
import time
# Define metrics
QUERY_LATENCY = Histogram(
'rag_query_latency_seconds',
'Query latency in seconds',
['stage']
)
CACHE_HITS = Counter(
'rag_cache_hits_total',
'Total cache hits'
)
LLM_CALLS = Counter(
'rag_llm_calls_total',
'Total LLM API calls'
)
def monitored_retrieve(query):
"""Monitored retrieval."""
with QUERY_LATENCY.labels(stage='retrieve').time():
return retrieve_context(query)
def monitored_generate(query, context):
    """Monitored generation."""
    LLM_CALLS.inc()
    with QUERY_LATENCY.labels(stage='generate').time():
        return generate_response(query, context)
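Metrics are exposed for Prometheus to scrape by starting the client library's built-in HTTP endpoint alongside the application (the port choice is illustrative):
# Expose metrics for Prometheus to scrape (port choice is illustrative)
from prometheus_client import start_http_server

start_http_server(8000)  # Metrics served at http://localhost:8000/metrics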
Production Best Practices
Deploy AI applications reliably.
Scalability
# Connection pooling
from pymongo import MongoClient
client = MongoClient(
'mongodb://localhost:27017',
maxPoolSize=50,
minPoolSize=10,
maxIdleTimeMS=30000
)
# Reuse the pooled client in LangChain: a collection handle obtained from the
# pooled client above carries its connection pool settings
collection = client['langchain']['documents']

vector_store = MongoDBAtlasVectorSearch.from_documents(
    documents=docs,
    embedding=embeddings,
    collection=collection,
    index_name='index'
)
Error Handling
import logging
from tenacity import retry, stop_after_attempt, wait_exponential
logger = logging.getLogger(__name__)
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=10)
)
def robust_generate(prompt):
"""Generate with retry logic."""
try:
response = openai_client.chat.completions.create(
model='gpt-4o',
messages=[{'role': 'user', 'content': prompt}]
)
return response.choices[0].message.content
except Exception as e:
logger.error(f"Generation failed: {e}")
raise
def safe_rag_pipeline(query):
"""RAG pipeline with error handling."""
try:
context = retrieve_context(query)
except Exception as e:
logger.error(f"Retrieval failed: {e}")
context = []
try:
        # build_prompt is assumed to assemble the same context-plus-question
        # prompt used by generate_response above
        response = robust_generate(build_prompt(query, context))
    except Exception as e:
        logger.error(f"Generation failed: {e}")
        response = "I apologize, but I encountered an error processing your query."

    return {'response': response, 'sources': [doc.get('title', 'Unknown') for doc in context]}
External Resources
- MongoDB Atlas Vector Search
- LangChain MongoDB Integration
- OpenAI Embeddings
- Hugging Face Sentence Transformers
Conclusion
MongoDB provides a robust foundation for AI applications. With native vector search, excellent integration with LangChain, and support for feature stores, MongoDB enables developers to build sophisticated AI-powered applications.
Key takeaways:
- Vector search enables semantic similarity matching
- RAG pipelines combine LLM power with your data
- LangChain integration simplifies AI application development
- Proper chunking and caching are crucial for performance
By leveraging these capabilities, you can build intelligent applications that understand context, provide accurate responses, and scale to meet demand.
In the final article of this series, we will explore real-world production use cases for MongoDB across various industries and application types.