Skip to main content
โšก Calmops

Redis for AI and Vector Search: Building Intelligent Applications

Introduction

Redis has emerged as a critical component in AI applications, serving as a vector database, semantic cache, and session store for LLM-powered applications. This comprehensive guide explores how to leverage Redis for building intelligent AI systems in 2026.


Why Redis for AI?

Advantages

Feature Benefit
Vector Search Native similarity search
Low Latency Sub-ms response for real-time AI
Rich Data Types Store embeddings + metadata
Redis Stack Built-in search, JSON, AI modules
Mature Ecosystem Well-documented, wide client support

Vector Search with Redis

Understanding Vector Embeddings

# Embeddings are numerical representations of data
# Text, images, audio converted to vectors

# Example: 384-dimensional embedding
# NOTE: the literal `...` below is illustrative (elided values), not runnable code
embedding = [0.023, -0.145, 0.892, ..., 0.001]  # 384 floats

# Similar items have similar vectors
# Cosine similarity measures similarity
# Euclidean distance also common

Setting Up Redis Stack

# Start Redis with modules
# -p 6379: Redis server port; -p 8001: RedisInsight web UI
docker run -d --name redis-stack \
  -p 6379:6379 \
  -p 8001:8001 \
  redis/redis-stack:latest

# Verify vector search module
# Expect a "search" entry in the module list if RediSearch is loaded
redis-cli INFO modules | grep -i search

Creating Vector Index

import numpy as np
import redis
from redis.commands.search.field import TextField, VectorField, NumericField
from redis.commands.search.indexDefinition import IndexDefinition

# decode_responses=True returns str instead of bytes for string replies.
# NOTE(review): binary vector payloads are stored on these keys too —
# confirm decode_responses does not corrupt reading them back.
r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# Create vector index over all hashes whose key starts with 'doc:'
try:
    r.ft('idx:docs').create_index(
        [
            TextField('title'),
            TextField('content'),
            TextField('metadata'),
            VectorField(
                'embedding',
                'FLAT',  # Algorithm: FLAT (exact scan) or HNSW (approximate)
                {
                    'TYPE': 'FLOAT32',
                    'DIM': 384,  # Embedding dimension — must match the model's output
                    'DISTANCE_METRIC': 'COSINE'  # or EUCLIDEAN, IP
                }
            )
        ],
        definition=IndexDefinition(prefix=['doc:'])
    )
    print("Index created successfully")
except Exception as e:
    # FT.CREATE raises if the index already exists; treated as benign here.
    print(f"Index may already exist: {e}")

Indexing Documents

def get_embedding(text):
    """Return the FLOAT32 byte string embedding for *text*.

    The bytes are in the layout the Redis VectorField expects
    (384 float32 values for all-MiniLM-L6-v2).
    """
    from sentence_transformers import SentenceTransformer
    # Load the model once and reuse it: constructing SentenceTransformer on
    # every call re-reads the weights from disk and dominates runtime.
    model = getattr(get_embedding, '_model', None)
    if model is None:
        model = SentenceTransformer('all-MiniLM-L6-v2')
        get_embedding._model = model
    embedding = model.encode(text)
    return embedding.astype(np.float32).tobytes()

# Index a document
def index_document(doc_id, title, content, metadata=None):
    """Embed *content* and store the document under the doc:<doc_id> key.

    NOTE(review): ft(...).add_document() was deprecated and removed in
    recent redis-py releases — confirm the client version, or switch to
    an HSET on the 'doc:' key.
    NOTE(review): str(metadata) produces a Python repr, not JSON; any
    downstream json.loads of this field will fail — verify intent.
    """
    embedding = get_embedding(content)
    
    r.ft('idx:docs').add_document(
        f'doc:{doc_id}',
        title=title,
        content=content,
        metadata=str(metadata or {}),
        embedding=embedding
    )

# Index multiple documents
# NOTE(review): `documents` is assumed to be defined in an earlier snippet.
for doc in documents:
    index_document(doc['id'], doc['title'], doc['content'], doc.get('meta'))
def semantic_search(query, top_k=5):
    """Return the *top_k* indexed documents most similar to *query*.

    Each result is a dict with id, title, a 200-char content preview, and
    the vector score reported by Redis (for COSINE this is a distance —
    lower means closer).
    """
    # get_embedding() already returns a FLOAT32 byte string, so it is
    # passed straight through as the query vector.  The original wrapped
    # these bytes in np.array(...).astype(...).tobytes() again, which
    # produces a corrupt blob (np.array over bytes yields a string dtype).
    query_vector = get_embedding(query)

    results = r.ft('idx:docs').search(
        f'*=>[KNN {top_k} @embedding $vector AS score]',
        query_params={'vector': query_vector},
        sortby='score',
        include_scores=True
    )

    return [
        {
            'id': doc.id,
            'title': doc.title,
            'content': doc.content[:200],  # preview only
            'score': float(doc.score)
        }
        for doc in results.docs
    ]

# Usage
results = semantic_search("How to implement caching in Python")
# Loop variable must NOT be `r`: that would shadow the module-level Redis
# client that semantic_search() depends on (a bug in the original).
for hit in results:
    print(f"Score: {hit['score']:.3f} - {hit['title']}")
# Combine vector search with keyword filtering
def hybrid_search(query, category=None, top_k=10):
    """KNN vector search, optionally restricted by an @category filter.

    Returns the raw redis-py search result object.
    """
    # The pre-filter must abut the `=>` operator: "(@category:x)=>[KNN ...]".
    # The original inserted a trailing space between the filter and `=>`,
    # producing an invalid hybrid query string when a category was given.
    pre_filter = f'(@category:{category})' if category else '*'
    query_embedding = get_embedding(query)

    search_query = f'{pre_filter}=>[KNN {top_k} @embedding $vector AS score]'

    results = r.ft('idx:hybrid').search(
        search_query,
        query_params={'vector': query_embedding},
        sortby='score',
        include_scores=True,
        with_payloads=True
    )

    return results

# Range filtering
def search_with_filter(query, min_score=0.5, max_score=1.0):
    """KNN search with the score constrained to [min_score, max_score].

    NOTE(review): redis-py's ft().search() does not accept a `filter=`
    keyword — score ranges are normally expressed inside the query string
    (a range on the score alias) or applied client-side after the KNN.
    Verify against the redis-py version in use before relying on this.
    """
    results = r.ft('idx:docs').search(
        f'*=>[KNN 10 @embedding $vector AS score]',
        query_params={'vector': get_embedding(query)},
        filter=f'@score=[{min_score} {max_score}]'
    )
    return results

RAG Pipeline Implementation

Retrieval-Augmented Generation

from typing import List, Dict, Any

class RAGPipeline:
    """Index documents into Redis and retrieve context for LLM prompting.

    NOTE(review): relies on module-level `json` and `np` (numpy), which
    are imported in other snippets of this article rather than here, and
    on ft(...).add_document(), which is deprecated/removed in recent
    redis-py releases — confirm the client version.
    """

    def __init__(self, redis_client, embedding_model):
        # embedding_model must expose .encode(text) -> numpy array
        # (e.g. a sentence-transformers model)
        self.redis = redis_client
        self.model = embedding_model
        self.index_name = 'rag:documents'
    
    def index_document(self, doc_id: str, content: str, metadata: Dict):
        """Index a document for retrieval"""
        embedding = self.model.encode(content)
        
        self.redis.ft(self.index_name).add_document(
            f'doc:{doc_id}',
            content=content,
            metadata=json.dumps(metadata),
            source=metadata.get('source', 'unknown'),
            embedding=embedding.astype(np.float32).tobytes()
        )
    
    def retrieve(self, query: str, top_k: int = 5) -> List[Dict]:
        """Retrieve relevant documents"""
        query_embedding = self.model.encode(query)
        
        results = self.redis.ft(self.index_name).search(
            f'*=>[KNN {top_k} @embedding $vector AS score]',
            query_params={
                'vector': query_embedding.astype(np.float32).tobytes()
            },
            include_scores=True
        )
        
        # score is the vector distance reported by Redis
        # (lower = closer for COSINE)
        return [
            {
                'content': doc.content,
                'metadata': json.loads(doc.metadata),
                'score': float(doc.score)
            }
            for doc in results.docs
        ]
    
    def generate_response(self, query: str, llm_client) -> Dict:
        """RAG: retrieve + generate"""
        # 1. Retrieve relevant context
        context_docs = self.retrieve(query, top_k=5)
        
        # 2. Build prompt
        context = '\n\n'.join([d['content'] for d in context_docs])
        prompt = f"""Answer based on the following context:

Context:
{context}

Question: {query}

Answer:"""
        
        # 3. Generate response
        # llm_client must expose .complete(prompt) -> str
        response = llm_client.complete(prompt)
        
        return {
            'answer': response,
            'sources': [d['metadata'] for d in context_docs]
        }

Complete RAG Example

# Complete implementation
import os

from sentence_transformers import SentenceTransformer
import openai

# Setup
embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
redis_client = redis.Redis(host='localhost', port=6379)
# Fix: the original used os.getenv without importing `os` in this snippet.
openai.api_key = os.getenv('OPENAI_API_KEY')

# Create pipeline
rag = RAGPipeline(redis_client, embedding_model)

# Index sample documents
documents = [
    {
        'id': '1',
        'content': 'Redis is an in-memory data structure store...',
        'metadata': {'source': 'redis-docs', 'topic': 'redis'}
    },
    {
        'id': '2',
        'content': 'Vector embeddings represent data as numerical vectors...',
        'metadata': {'source': 'ml-basics', 'topic': 'embeddings'}
    }
]

for doc in documents:
    rag.index_document(doc['id'], doc['content'], doc['metadata'])

# Query
# NOTE(review): RAGPipeline.generate_response calls llm_client.complete(),
# which the openai module does not expose — confirm the intended adapter.
result = rag.generate_response(
    query="What is Redis and how does it work?",
    llm_client=openai
)

print(result['answer'])
print("\nSources:", result['sources'])

Semantic Caching for LLMs

Cache LLM Responses

import hashlib
import json

class LLMCache:
    """Exact-match cache for LLM responses, keyed by a SHA-256 of the prompt."""

    def __init__(self, redis_client, ttl=3600):
        self.redis = redis_client
        self.ttl = ttl
        self.prefix = 'llm:cache:'

    def _make_key(self, prompt: str) -> str:
        """Derive a fixed-length cache key from *prompt*."""
        digest = hashlib.sha256(prompt.encode()).hexdigest()
        return self.prefix + digest

    def get(self, prompt: str):
        """Return the cached response for *prompt*, or None on a miss."""
        payload = self.redis.get(self._make_key(prompt))
        return json.loads(payload) if payload else None

    def set(self, prompt: str, response: str):
        """Store *response* under the prompt's key with the configured TTL."""
        self.redis.setex(self._make_key(prompt), self.ttl, json.dumps(response))

    def clear(self):
        """Delete every key under the cache prefix via incremental SCAN."""
        cursor = 0
        while True:
            cursor, batch = self.redis.scan(
                cursor, match=self.prefix + '*', count=100
            )
            if batch:
                self.redis.delete(*batch)
            if cursor == 0:
                break


# Usage
cache = LLMCache(redis_client, ttl=3600)  # 1 hour TTL

def generate_with_cache(prompt: str) -> str:
    """Return the LLM completion for *prompt*, serving repeats from cache.

    NOTE(review): uses the legacy openai.Completion API (pre-1.0 SDK) —
    confirm the installed openai package version.
    """
    # Check cache first
    cached = cache.get(prompt)
    if cached:
        print("Cache hit!")
        return cached
    
    # Cache miss - call LLM
    print("Cache miss - calling LLM...")
    response = openai.Completion.create(
        model='gpt-4',
        prompt=prompt
    )['choices'][0]['text']
    
    # Cache the response
    cache.set(prompt, response)
    
    return response

Semantic Cache (Vector-Based)

class SemanticCache:
    """Cache keyed on semantic similarity of prompts rather than exact match."""

    def __init__(self, redis_client, embedding_model, threshold=0.95):
        self.redis = redis_client
        self.model = embedding_model
        self.threshold = threshold  # minimum cosine similarity to count as a hit
        self.index_name = 'semantic:cache'

    def _init_index(self):
        """Create the cache index; an existing index is treated as benign."""
        try:
            self.redis.ft(self.index_name).create_index(
                [
                    TextField('prompt'),
                    VectorField(
                        'embedding',
                        'FLAT',
                        {'TYPE': 'FLOAT32', 'DIM': 384, 'DISTANCE_METRIC': 'COSINE'}
                    )
                ],
                definition=IndexDefinition(prefix=['cache:'])
            )
        except Exception:
            # Index already exists (or search module missing).  A bare
            # `except:` as in the original would also swallow
            # KeyboardInterrupt/SystemExit.
            pass

    def get_or_none(self, prompt: str):
        """Return the cached response for the most similar prompt, or None.

        The original passed filter='@score>=[...]', which is not valid
        query syntax; instead we fetch the single nearest neighbor and
        apply the similarity threshold client-side.
        """
        embedding = self.model.encode(prompt)

        results = self.redis.ft(self.index_name).search(
            '*=>[KNN 1 @embedding $vector AS score]',
            query_params={
                'vector': embedding.astype(np.float32).tobytes()
            }
        )

        if results.docs:
            # Redis reports cosine *distance* (lower = closer), so convert
            # to similarity before comparing against the threshold.
            similarity = 1.0 - float(results.docs[0].score)
            if similarity >= self.threshold:
                cached_prompt = results.docs[0].prompt
                return self.redis.get(f'cache:response:{cached_prompt}')

        return None

    def set(self, prompt: str, response: str):
        """Cache *response*, indexing the prompt's embedding for lookup."""
        embedding = self.model.encode(prompt)

        # NOTE(review): embedding the raw prompt in the key allows
        # arbitrarily long keys; consider hashing as LLMCache does.
        self.redis.ft(self.index_name).add_document(
            f'cache:{prompt}',
            prompt=prompt,
            embedding=embedding.astype(np.float32).tobytes()
        )

        self.redis.set(f'cache:response:{prompt}', response, ex=3600)

Session Management for AI Applications

Chat Session Storage

import json
from datetime import datetime, timedelta

class AIChatSession:
    """Redis-backed chat session store with a sliding TTL.

    Each session is one JSON blob under chat:session:<id>; every write
    re-arms the TTL so active conversations stay alive.  Requires the
    `datetime` class and `json` at module scope.
    """

    def __init__(self, redis_client, session_ttl=3600):
        self.redis = redis_client
        self.ttl = session_ttl
        self.prefix = 'chat:session:'

    def create_session(self, user_id: str) -> str:
        """Create a new chat session for *user_id* and return its id."""
        import uuid
        session_id = str(uuid.uuid4())

        session_data = {
            'id': session_id,
            'user_id': user_id,
            'messages': [],
            'created_at': str(datetime.utcnow())
        }

        key = f"{self.prefix}{session_id}"
        self.redis.setex(key, self.ttl, json.dumps(session_data))

        return session_id

    def add_message(self, session_id: str, role: str, content: str):
        """Append a message to the session and refresh its TTL.

        Raises KeyError when the session is missing or expired.  (The
        original crashed with an opaque TypeError via json.loads(None).)
        """
        key = f"{self.prefix}{session_id}"
        raw = self.redis.get(key)
        if raw is None:
            raise KeyError(f"chat session not found or expired: {session_id}")
        session = json.loads(raw)

        session['messages'].append({
            'role': role,
            'content': content,
            'timestamp': str(datetime.utcnow())
        })

        self.redis.setex(key, self.ttl, json.dumps(session))

    def get_messages(self, session_id: str) -> List[Dict]:
        """Return all session messages, or [] if missing/expired."""
        key = f"{self.prefix}{session_id}"
        session = self.redis.get(key)

        if session:
            return json.loads(session)['messages']
        return []

    def build_context(self, session_id: str, max_tokens=4000):
        """Build an LLM context window from the newest messages that fit.

        Token counts are roughly approximated as len(content) // 4.
        """
        messages = self.get_messages(session_id)

        context = []
        total_tokens = 0

        # Walk newest-first so the most recent turns survive when the
        # budget runs out; insert(0, ...) restores chronological order.
        for msg in reversed(messages):
            msg_tokens = len(msg['content']) // 4  # Rough estimate
            if total_tokens + msg_tokens > max_tokens:
                break

            context.insert(0, msg)
            total_tokens += msg_tokens

        return context

Token Usage Tracking

class TokenTracker:
    """Per-user token accounting in Redis: 30-day daily counters + lifetime total."""

    def __init__(self, redis_client):
        self.redis = redis_client

    def track_usage(self, user_id: str, prompt_tokens: int, completion_tokens: int):
        """Add prompt+completion tokens to today's counter and the running total."""
        today = datetime.utcnow().strftime('%Y-%m-%d')

        # Daily usage — one counter per calendar day, retained 30 days
        daily_key = f"tokens:daily:{user_id}:{today}"
        self.redis.incrby(daily_key, prompt_tokens + completion_tokens)
        self.redis.expire(daily_key, 86400 * 30)  # 30 days

        # Lifetime usage (never expires)
        total_key = f"tokens:total:{user_id}"
        self.redis.incrby(total_key, prompt_tokens + completion_tokens)

    def get_daily_usage(self, user_id: str) -> int:
        """Return today's token usage (0 if no counter exists)."""
        today = datetime.utcnow().strftime('%Y-%m-%d')
        daily_key = f"tokens:daily:{user_id}:{today}"
        return int(self.redis.get(daily_key) or 0)

    def get_monthly_usage(self, user_id: str) -> int:
        """Sum the last 30 daily counters.

        Uses a single MGET round trip instead of the original's 30
        sequential GET calls.
        """
        now = datetime.utcnow()
        keys = [
            f"tokens:daily:{user_id}:{(now - timedelta(days=i)).strftime('%Y-%m-%d')}"
            for i in range(30)
        ]
        values = self.redis.mget(keys)
        return sum(int(v) for v in values if v)

Feature Store for ML

Store Precomputed Features

class FeatureStore:
    """Redis-backed store for precomputed, JSON-serialized user features.

    Entries live under features:user:<id> with a one-hour TTL.
    """

    def __init__(self, redis_client):
        self.redis = redis_client
        self.prefix = 'features:'

    def _key(self, user_id: str) -> str:
        """Redis key for a user's feature blob."""
        return f"{self.prefix}user:{user_id}"

    def store_user_features(self, user_id: str, features: Dict):
        """Cache *features* for one hour."""
        self.redis.setex(self._key(user_id), 3600, json.dumps(features))

    def get_user_features(self, user_id: str) -> Dict:
        """Return the cached features, or None when absent or expired."""
        payload = self.redis.get(self._key(user_id))
        return json.loads(payload) if payload else None

    def store_batch_features(self, user_ids: List[str], feature_dict: Dict):
        """Write features for many users in one pipelined round trip."""
        pipe = self.redis.pipeline()
        for uid in user_ids:
            pipe.setex(self._key(uid), 3600, json.dumps(feature_dict.get(uid, {})))
        pipe.execute()


# Example: User embedding features
def compute_user_features(user_id: str) -> Dict:
    """Return a dict of demo ML features for *user_id*.

    A real implementation would query the feature-computation pipeline;
    the values below are static placeholders.
    """
    return dict(
        login_frequency=10,
        avg_session_duration=300,
        items_viewed=50,
        purchase_count=5,
        last_activity_days=1,
    )

# Store features
# NOTE(review): `user_batch` and `redis_client` are assumed to be defined
# in earlier snippets of this article.
feature_store = FeatureStore(redis_client)
for user_id in user_batch:
    features = compute_user_features(user_id)
    feature_store.store_user_features(user_id, features)

RedisVL: Official Vector Library

Using RedisVL

# Install first: pip install redisvl
from redisvl import SearchIndex
from redisvl.schema import Schema, Field, TextField, VectorField
from redisvl.query import VectorQuery

# NOTE(review): these import paths and the Schema(...) constructor do not
# match current redisvl releases (SearchIndex lives in redisvl.index and
# schemas are defined from dicts/YAML) — verify against the installed
# redisvl version before using.

# Define schema
schema = Schema(
    TextField("content"),
    TextField("metadata"),
    VectorField(
        "embedding",
        "flat",
        {"type": "float32", "dim": 384, "distance_metric": "cosine"}
    )
)

# Create index
index = SearchIndex.from_schema(schema, name="my-index", redis_client=r)
index.create()

# Index documents
index.load([
    {"content": "Redis is fast", "embedding": [0.1] * 384},
    {"content": "Vectors are cool", "embedding": [0.2] * 384}
])

# Search
# NOTE(review): VectorQuery normally takes a query *vector*, not raw
# text — confirm whether the library embeds the string automatically.
results = index.query(
    VectorQuery(
        "search text",
        "embedding",
        num_results=5,
        distance_metric="cosine"
    )
)

Best Practices

Performance Tips

# 1. Use appropriate vector dimensions
# Match your embedding model's dimensions exactly

# 2. Choose correct distance metric
# COSINE: Best for normalized vectors
# IP (Inner Product): Best for unnormalized
# EUCLIDEAN: Best for absolute distances

# 3. Index configuration
# HNSW: Faster search, more memory
# FLAT: Slower but exact results

# 4. Batch indexing — pipelining cuts network round trips
# NOTE(review): `chunks()` is assumed to be a helper yielding slices of
# `documents`; the `...` argument is a placeholder, not runnable code.
for batch in chunks(documents, 100):
    pipeline = r.pipeline()
    for doc in batch:
        pipeline.ft('idx').add_document(...)
    pipeline.execute()

Security

# Enable authentication
# redis.conf: requirepass your_password

# Use TLS
# NOTE(review): a production client would also pass username/password and
# a CA bundle (ssl_ca_certs) — confirm deployment requirements.
r = redis.Redis(
    host='localhost',
    port=6379,
    ssl=True,
    ssl_cert_reqs='required'
)

# Limit key patterns
# Use ACL for fine-grained permissions

Architecture Examples

Complete AI Architecture

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”     โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”     โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚   Frontend  โ”‚โ”€โ”€โ”€โ”€โ–ถโ”‚  API Server โ”‚โ”€โ”€โ”€โ”€โ–ถโ”‚    Redis    โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜     โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜     โ”‚  (Vectors) โ”‚
                          โ”‚              โ”‚  (Cache)   โ”‚
                          โ–ผ              โ”‚  (Session) โ”‚
                    โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”     โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
                    โ”‚     LLM     โ”‚
                    โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

Redis serves:
- Vector search for RAG
- Semantic cache for LLM responses  
- Session storage for chat history
- Token usage tracking

Multi-Model RAG

# Switch embedding models based on use case
class MultiModelRAG:
    """Route indexing through one of several named embedding models.

    NOTE(review): index_with_model is unfinished in the article — it
    computes the embedding but never writes anything to Redis.
    """

    def __init__(self, redis_client):
        # Loading all three models here is eager and memory-heavy;
        # presumably intentional for the demo — verify for production.
        self.redis = redis_client
        self.models = {
            'fast': SentenceTransformer('all-MiniLM-L6-v2'),
            'accurate': SentenceTransformer('all-mpnet-base-v2'),
            'multilingual': SentenceTransformer('paraphrase-multilingual-mpnet')
        }
    
    def index_with_model(self, content: str, model_name: str = 'fast'):
        # Raises KeyError for an unknown model_name
        model = self.models[model_name]
        embedding = model.encode(content)
        # Store in Redis with model tag

Resources


Conclusion

Redis has become essential for AI applications, providing the performance and flexibility needed for modern LLM-powered systems. From vector search enabling RAG pipelines to semantic caching reducing LLM costs, Redis serves as the data backbone for intelligent applications.

The combination of Redis Stack, native vector support, and existing Redis patterns makes it an excellent choice for building production AI systems in 2026 and beyond.

Comments