⚡ Calmops

Vector Search at Scale: Building Semantic Search Systems

Introduction

Vector search retrieves items by meaning rather than by exact keywords: text is mapped into a high-dimensional embedding space where semantically similar items land close together. From RAG systems to recommendation engines, knowing how to build and scale vector search is essential for modern AI applications.

Key Figures (commonly cited; treat as rough, workload-dependent numbers):

  • Approximate nearest-neighbor (ANN) indexes can cut query latency by roughly 90% versus brute-force scans
  • Semantic search can outperform keyword search by around 30% on complex, paraphrased queries
  • Leading vector databases advertise support for 1B+ vectors
  • Typical embedding dimensionality: 1,536 (e.g., OpenAI text-embedding-3-small)
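
Closeness in embedding space is usually measured with cosine similarity. A minimal pure-Python version, for intuition only; real systems use the optimized ANN indexes discussed below:

```python
import math

def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors (1.0 = same direction)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)
```

Parallel vectors score 1.0, orthogonal vectors 0.0; in practice, query and document embeddings from the same model are compared this way.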

Vector Search Architecture

┌──────────────────────────────────────────────────────────────┐
│                    Vector Search Pipeline                    │
├──────────────────────────────────────────────────────────────┤
│                                                              │
│  ┌─────────┐   ┌────────────┐   ┌─────────┐   ┌─────────┐    │
│  │  Data   │──▶│  Generate  │──▶│  Store  │──▶│  Query  │    │
│  │ (Text)  │   │ Embeddings │   │ Vector  │   │ Search  │    │
│  └─────────┘   └─────┬──────┘   │   DB    │   └────┬────┘    │
│                      │          └─────────┘        │         │
│                      ▼                             ▼         │
│         Embedding Models               ┌──────────────────┐  │
│         ├── OpenAI text-embedding-3    │  Top-K Results   │  │
│         ├── Cohere embed-multilingual  └──────────────────┘  │
│         └── Open-source (BGE, MPNet)                         │
│                                                              │
└──────────────────────────────────────────────────────────────┘

Embedding Generation

#!/usr/bin/env python3
"""Embedding generation."""

from typing import Dict, List

from openai import OpenAI

class EmbeddingGenerator:
    """Generate text embeddings."""
    
    def __init__(self, model: str = "text-embedding-3-small"):
        self.client = OpenAI()
        self.model = model
        self.dimensions = 1536  # output size of text-embedding-3-small
    
    def embed_text(self, text: str) -> List[float]:
        """Generate embedding for single text."""
        
        response = self.client.embeddings.create(
            model=self.model,
            input=text
        )
        
        return response.data[0].embedding
    
    def embed_batch(self, texts: List[str], 
                   batch_size: int = 100) -> List[List[float]]:
        """Generate embeddings for batch of texts."""
        
        all_embeddings = []
        
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i + batch_size]
            
            response = self.client.embeddings.create(
                model=self.model,
                input=batch
            )
            
            embeddings = [item.embedding for item in response.data]
            all_embeddings.extend(embeddings)
        
        return all_embeddings
    
    def embed_documents(self, documents: List[Dict]) -> List[Dict]:
        """Embed documents with metadata."""
        
        results = []
        
        for doc in documents:
            # Combine text fields for embedding
            text = f"{doc.get('title', '')} {doc.get('content', '')}"
            
            embedding = self.embed_text(text)
            
            results.append({
                'id': doc.get('id'),
                'embedding': embedding,
                'text': text,
                'metadata': {
                    k: v for k, v in doc.items()
                    if k not in ['id', 'content']
                }
            })
        
        return results

# Open-source alternative
class OpenSourceEmbedder:
    """Using sentence-transformers."""
    
    def __init__(self, model_name: str = "BAAI/bge-base-en-v1.5"):
        from sentence_transformers import SentenceTransformer
        self.model = SentenceTransformer(model_name)
        self.dimensions = self.model.get_sentence_embedding_dimension()
    
    def embed_text(self, text: str) -> List[float]:
        return self.model.encode(text).tolist()
    
    def embed_batch(self, texts: List[str]) -> List[List[float]]:
        return self.model.encode(texts).tolist()
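
Re-embedding the same text wastes API calls and GPU time. A sketch of a memoizing wrapper that works with either embedder above (`CachingEmbedder` is a hypothetical helper, not part of either library; a production version would bound the cache or persist it):

```python
class CachingEmbedder:
    """Wrap any object exposing embed_text() and memoize its results."""

    def __init__(self, embedder):
        self.embedder = embedder
        self._cache = {}

    def embed_text(self, text):
        # Only call the underlying model on a cache miss
        if text not in self._cache:
            self._cache[text] = self.embedder.embed_text(text)
        return self._cache[text]
```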

Vector Databases

Pinecone

#!/usr/bin/env python3
"""Pinecone vector database."""

from typing import Dict, List

from pinecone import Pinecone, ServerlessSpec

class PineconeVectorStore:
    """Pinecone vector operations."""
    
    def __init__(self, api_key: str, index_name: str):
        self.client = Pinecone(api_key=api_key)
        self.index_name = index_name
        self.index = None
    
    def create_index(self, dimension: int = 1536):
        """Create index if not exists."""
        
        if self.index_name not in self.client.list_indexes().names():
            self.client.create_index(
                name=self.index_name,
                dimension=dimension,
                metric='cosine',
                spec=ServerlessSpec(
                    cloud='aws',
                    region='us-east-1'
                )
            )
        
        self.index = self.client.Index(self.index_name)
    
    def upsert_vectors(self, vectors: List[Dict]):
        """Insert or update vectors."""
        
        self.index.upsert(
            vectors=[
                {
                    'id': v['id'],
                    'values': v['embedding'],
                    # Keep the source text in metadata so results can be
                    # displayed and reranked without a second lookup
                    'metadata': {'text': v.get('text', ''), **v.get('metadata', {})}
                }
                for v in vectors
            ],
            namespace='default'
        )
    
    def search(self, query_vector: List[float], 
              top_k: int = 10,
              filter_dict: Dict = None,
              include_metadata: bool = True) -> List[Dict]:
        """Search for similar vectors."""
        
        results = self.index.query(
            vector=query_vector,
            top_k=top_k,
            filter=filter_dict,
            include_metadata=include_metadata,
            include_values=False
        )
        
        return [
            {
                'id': match['id'],
                'score': match['score'],
                'metadata': match.get('metadata', {})
            }
            for match in results['matches']
        ]
    
    def delete_vectors(self, ids: List[str]):
        """Delete vectors by ID."""
        
        self.index.delete(ids=ids)

Milvus

#!/usr/bin/env python3
"""Milvus vector database."""

import json
from typing import Dict, List

from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType

class MilvusVectorStore:
    """Milvus vector operations."""
    
    def __init__(self, host: str = 'localhost', port: int = 19530):
        connections.connect(host=host, port=port)
        self.collection = None
    
    def create_collection(self, name: str, dimension: int = 1536):
        """Create collection with schema."""
        
        fields = [
            FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
            FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=dimension),
            FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535),
            FieldSchema(name="metadata", dtype=DataType.VARCHAR, max_length=4096)
        ]
        
        schema = CollectionSchema(fields, description=f"Collection {name}")
        self.collection = Collection(name, schema)
        
        # IVF_FLAT index; inner product (IP) ranks like cosine similarity
        # only when vectors are unit-normalized
        index_params = {
            "metric_type": "IP",
            "index_type": "IVF_FLAT",
            "params": {"nlist": 128}
        }
        
        self.collection.create_index(
            field_name="vector",
            index_params=index_params
        )
    
    def insert(self, vectors: List[Dict]):
        """Insert vectors."""
        
        data = [
            [v['id'] for v in vectors],
            [v['embedding'] for v in vectors],
            [v['text'][:65535] for v in vectors],
            # Serialize metadata as JSON rather than str(dict), so it can
            # be parsed back without resorting to eval()
            [json.dumps(v.get('metadata', {}))[:4096] for v in vectors]
        ]
        
        self.collection.insert(data)
        self.collection.flush()
    
    def search(self, query_vector: List[float], 
               top_k: int = 10) -> List[Dict]:
        """Search vectors."""
        
        self.collection.load()
        
        results = self.collection.search(
            data=[query_vector],
            anns_field="vector",
            param={"metric_type": "IP", "params": {"nprobe": 10}},
            limit=top_k,
            output_fields=["text", "metadata"]
        )
        
        return [
            {
                'id': hit.id,
                'score': hit.distance,
                'text': hit.entity.get('text'),
                'metadata': json.loads(hit.entity.get('metadata'))
            }
            for hit in results[0]
        ]
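
Because the Milvus configuration above uses inner product (IP), embeddings should be unit-normalized before both insert and query; otherwise longer vectors dominate the ranking regardless of direction. (Some providers, such as OpenAI, already return unit-length embeddings.) A minimal sketch:

```python
import math

def normalize(vector):
    """Scale a vector to unit length; return zero vectors unchanged."""
    norm = math.sqrt(sum(x * x for x in vector))
    if norm == 0.0:
        return list(vector)
    return [x / norm for x in vector]
```

On unit vectors, inner product and cosine similarity produce identical rankings, and IP is cheaper to compute.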

Semantic Search Application

#!/usr/bin/env python3
"""Semantic search application."""

from typing import Dict, List

class SemanticSearchApp:
    """End-to-end semantic search."""
    
    def __init__(self, embedding_generator, vector_store):
        self.embedder = embedding_generator
        self.store = vector_store
    
    def index_documents(self, documents: List[Dict]):
        """Index documents for search."""
        
        # Generate embeddings
        embedded_docs = self.embedder.embed_documents(documents)
        
        # Store in vector DB
        self.store.upsert_vectors(embedded_docs)
        
        return len(embedded_docs)
    
    def search(self, query: str, top_k: int = 5) -> List[Dict]:
        """Search documents."""
        
        # Generate query embedding
        query_embedding = self.embedder.embed_text(query)
        
        # Search vector DB
        results = self.store.search(query_embedding, top_k=top_k)
        
        return results
    
    def search_with_rerank(self, query: str, 
                         top_k: int = 20, 
                         rerank_to: int = 5) -> List[Dict]:
        """Search with reranking."""
        
        # Initial search
        results = self.search(query, top_k=top_k)
        
        # Rerank using a cross-encoder (load once and reuse in production)
        from sentence_transformers import CrossEncoder
        
        cross_encoder = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')
        
        # Create query-doc pairs; fall back to metadata for stores that
        # return the text there (e.g., Pinecone) rather than top-level
        pairs = [
            (query, r.get('text') or r.get('metadata', {}).get('text', ''))
            for r in results
        ]
        
        # Score each pair jointly (slower but more accurate than bi-encoders)
        scores = cross_encoder.predict(pairs)
        
        # Reorder by score
        results = [
            {**r, 'rerank_score': float(scores[i])}
            for i, r in enumerate(results)
        ]
        
        results.sort(key=lambda x: x['rerank_score'], reverse=True)
        
        return results[:rerank_to]
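
For tests and small corpora, SemanticSearchApp can be wired to an in-memory store that mimics the upsert_vectors/search interface of the Pinecone wrapper above. A brute-force cosine sketch (`InMemoryVectorStore` is a hypothetical class, not from any library):

```python
import math

class InMemoryVectorStore:
    """Brute-force cosine search with the same interface as the DB wrappers."""

    def __init__(self):
        self._vectors = []

    def upsert_vectors(self, vectors):
        self._vectors.extend(vectors)

    def search(self, query_vector, top_k=10):
        def cosine(a, b):
            dot = sum(x * y for x, y in zip(a, b))
            na = math.sqrt(sum(x * x for x in a))
            nb = math.sqrt(sum(x * x for x in b))
            return dot / (na * nb) if na and nb else 0.0

        # Score every stored vector, then keep the top_k best matches
        scored = [
            {'id': v['id'], 'score': cosine(query_vector, v['embedding']),
             'text': v.get('text', ''), 'metadata': v.get('metadata', {})}
            for v in self._vectors
        ]
        scored.sort(key=lambda r: r['score'], reverse=True)
        return scored[:top_k]
```

Brute force is O(n) per query, which is fine up to roughly tens of thousands of vectors; beyond that, the ANN indexes above are the point.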

Scaling Strategies

#!/usr/bin/env python3
"""Scaling vector search."""

from typing import Dict, List

class VectorSearchScaler:
    """Scale vector search horizontally."""
    
    def __init__(self):
        self.shards = []
    
    def partition_data(self, vectors: List[Dict], 
                      num_shards: int) -> List[List[Dict]]:
        """Partition vectors across shards."""
        
        # Simple round-robin partitioning
        shards = [[] for _ in range(num_shards)]
        
        for i, v in enumerate(vectors):
            shard_idx = i % num_shards
            shards[shard_idx].append(v)
        
        return shards
    
    def federated_search(self, query: List[float], 
                        shards: List,
                        top_k: int = 10) -> List[Dict]:
        """Search across shards and merge results."""
        
        # Search each shard
        all_results = []
        for shard in shards:
            results = shard.search(query, top_k=top_k)
            all_results.extend(results)
        
        # Merge by score (descending) and dedupe by ID
        seen_ids = set()
        merged = []
        
        for r in sorted(all_results, key=lambda x: x['score'], reverse=True):
            if r['id'] not in seen_ids:
                seen_ids.add(r['id'])
                merged.append(r)
        
        return merged[:top_k]
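
The loop above queries shards one at a time, so total latency grows with shard count. Since shard queries are independent, they can run concurrently; a sketch with a thread pool, assuming each shard object exposes the same search() method:

```python
from concurrent.futures import ThreadPoolExecutor

def parallel_federated_search(query, shards, top_k=10):
    """Query all shards concurrently, then merge by score and dedupe by ID."""
    with ThreadPoolExecutor(max_workers=len(shards) or 1) as pool:
        result_lists = list(pool.map(lambda s: s.search(query, top_k=top_k), shards))

    seen_ids, merged = set(), []
    for r in sorted((r for rs in result_lists for r in rs),
                    key=lambda x: x['score'], reverse=True):
        if r['id'] not in seen_ids:
            seen_ids.add(r['id'])
            merged.append(r)
    return merged[:top_k]
```

Threads suffice here because shard queries are I/O-bound network calls; end-to-end latency approaches that of the slowest single shard.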
