Introduction
Vector search powers semantic understanding. From RAG systems to recommendation engines, understanding how to build and scale vector search is essential for modern AI applications.
Key Statistics:
- Vector search reduces search latency by 90%
- Semantic search outperforms keyword search by 30% on complex queries
- Top vector databases handle 1B+ vectors
- Embedding models: 1536 dimensions typical
Vector Search Architecture
+------------------------------------------------------------------+
|                      Vector Search Pipeline                      |
+------------------------------------------------------------------+
|                                                                  |
|  +---------+   +------------+   +---------+   +---------+       |
|  |  Data   |-->|  Generate  |-->|  Store  |-->|  Query  |       |
|  | (Text)  |   | Embeddings |   | VectorDB|   | Search  |       |
|  +---------+   +-----+------+   +---------+   +----+----+       |
|                      |                              |            |
|                      |                              v            |
|                      |                   +-------------------+   |
|                      |                   |  Top K Results    |   |
|                      |                   +-------------------+   |
|                      v                                           |
|               Embedding Models                                   |
|               +-- OpenAI text-embedding-3                        |
|               +-- Cohere embed-multilingual                      |
|               +-- Open-source (BGE, MPNet)                       |
|                                                                  |
+------------------------------------------------------------------+
Embedding Generation
#!/usr/bin/env python3
"""Embedding generation."""
from openai import OpenAI
import numpy as np
from typing import List, Dict
class EmbeddingGenerator:
    """Generate text embeddings via the OpenAI embeddings API."""

    # Output dimensionality of the known OpenAI embedding models.
    _MODEL_DIMENSIONS = {
        "text-embedding-3-small": 1536,
        "text-embedding-3-large": 3072,
        "text-embedding-ada-002": 1536,
    }

    def __init__(self, model: str = "text-embedding-3-small"):
        self.client = OpenAI()
        self.model = model
        # Look up the model's true dimension instead of hard-coding 1536
        # (text-embedding-3-large produces 3072-dim vectors). Unknown
        # models fall back to the common 1536.
        self.dimensions = self._MODEL_DIMENSIONS.get(model, 1536)

    def embed_text(self, text: str) -> List[float]:
        """Generate the embedding vector for a single text."""
        response = self.client.embeddings.create(
            model=self.model,
            input=text,
        )
        return response.data[0].embedding

    def embed_batch(self, texts: List[str],
                    batch_size: int = 100) -> List[List[float]]:
        """Generate embeddings for a list of texts, batch_size per request."""
        all_embeddings: List[List[float]] = []
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i + batch_size]
            response = self.client.embeddings.create(
                model=self.model,
                input=batch,
            )
            all_embeddings.extend(item.embedding for item in response.data)
        return all_embeddings

    def embed_documents(self, documents: List[Dict]) -> List[Dict]:
        """Embed documents (title + content) and attach their metadata.

        Uses batched API requests (via embed_batch) instead of one request
        per document, which is substantially faster and cheaper in
        per-request overhead for large document sets.
        """
        # Combine the searchable text fields for every document up front.
        texts = [
            f"{doc.get('title', '')} {doc.get('content', '')}"
            for doc in documents
        ]
        embeddings = self.embed_batch(texts)
        return [
            {
                'id': doc.get('id'),
                'embedding': embedding,
                'text': text,
                # Keep everything except the raw content (and the id,
                # stored separately) as filterable metadata.
                'metadata': {
                    k: v for k, v in doc.items()
                    if k not in ('id', 'content')
                },
            }
            for doc, text, embedding in zip(documents, texts, embeddings)
        ]
# Open-source alternative
class OpenSourceEmbedder:
    """Embedding generation backed by a local sentence-transformers model."""

    def __init__(self, model_name: str = "BAAI/bge-base-en-v1.5"):
        # Imported lazily so environments without torch can still use the
        # API-based generator above.
        from sentence_transformers import SentenceTransformer
        self.model = SentenceTransformer(model_name)
        self.dimensions = self.model.get_sentence_embedding_dimension()

    def embed_text(self, text: str) -> List[float]:
        """Encode one text into a plain list of floats."""
        vector = self.model.encode(text)
        return vector.tolist()

    def embed_batch(self, texts: List[str]) -> List[List[float]]:
        """Encode many texts in a single forward pass."""
        vectors = self.model.encode(texts)
        return vectors.tolist()
Vector Database
Pinecone
#!/usr/bin/env python3
"""Pinecone vector database."""
from pinecone import Pinecone, ServerlessSpec
import numpy as np
class PineconeVectorStore:
    """Vector CRUD and similarity search on a Pinecone serverless index."""

    def __init__(self, api_key: str, index_name: str):
        self.client = Pinecone(api_key=api_key)
        self.index_name = index_name
        self.index = None  # opened by create_index()

    def create_index(self, dimension: int = 1536):
        """Create the index if it does not exist, then open a handle to it."""
        if self.index_name not in self.client.list_indexes().names():
            self.client.create_index(
                name=self.index_name,
                dimension=dimension,
                metric='cosine',
                spec=ServerlessSpec(
                    cloud='aws',
                    region='us-east-1',
                ),
            )
        self.index = self.client.Index(self.index_name)

    def upsert_vectors(self, vectors: list[dict],
                       namespace: str = 'default',
                       batch_size: int = 100):
        """Insert or update vectors in batches.

        Pinecone rejects oversized upsert requests (the service caps
        request payload size), so the payload is split into batch_size
        chunks instead of being sent in one call.
        """
        payload = [
            {
                'id': v['id'],
                'values': v['embedding'],
                'metadata': v.get('metadata', {}),
            }
            for v in vectors
        ]
        for i in range(0, len(payload), batch_size):
            self.index.upsert(
                vectors=payload[i:i + batch_size],
                namespace=namespace,
            )

    def search(self, query_vector: list[float],
               top_k: int = 10,
               filter_dict: dict = None,
               include_metadata: bool = True) -> list[dict]:
        """Return the top_k nearest matches as id/score/metadata dicts."""
        results = self.index.query(
            vector=query_vector,
            top_k=top_k,
            filter=filter_dict,
            include_metadata=include_metadata,
            include_values=False,  # raw vectors are not needed by callers
        )
        return [
            {
                'id': match['id'],
                'score': match['score'],
                'metadata': match.get('metadata', {}),
            }
            for match in results['matches']
        ]

    def delete_vectors(self, ids: list[str]):
        """Delete vectors by ID."""
        self.index.delete(ids=ids)
Milvus
#!/usr/bin/env python3
"""Milvus vector database."""
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
class MilvusVectorStore:
    """Vector CRUD and similarity search on a Milvus collection."""

    def __init__(self, host: str = 'localhost', port: int = 19530):
        connections.connect(host=host, port=port)
        self.collection = None  # set by create_collection()

    def create_collection(self, name: str, dimension: int = 1536):
        """Create a collection with an id/vector/text/metadata schema."""
        fields = [
            FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
            FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=dimension),
            FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535),
            FieldSchema(name="metadata", dtype=DataType.VARCHAR, max_length=4096),
        ]
        schema = CollectionSchema(fields, description=f"Collection {name}")
        self.collection = Collection(name, schema)
        # Inner-product metric with an IVF_FLAT index (nlist=128 buckets).
        index_params = {
            "metric_type": "IP",
            "index_type": "IVF_FLAT",
            "params": {"nlist": 128},
        }
        self.collection.create_index(
            field_name="vector",
            index_params=index_params,
        )

    def insert(self, vectors: list[dict]):
        """Insert vectors; metadata is stored as a JSON string column."""
        import json
        data = [
            [v['id'] for v in vectors],
            [v['embedding'] for v in vectors],
            [v['text'][:65535] for v in vectors],
            # JSON instead of str(dict) so search() can parse it back
            # without eval().
            [json.dumps(v.get('metadata', {}))[:4096] for v in vectors],
        ]
        self.collection.insert(data)
        self.collection.flush()

    def search(self, query_vector: list[float],
               top_k: int = 10) -> list[dict]:
        """Return the top_k nearest hits with their stored text/metadata."""
        import json
        self.collection.load()
        results = self.collection.search(
            data=[query_vector],
            anns_field="vector",
            param={"metric_type": "IP", "params": {"nprobe": 10}},
            limit=top_k,
            output_fields=["id", "text", "metadata"],
        )
        hits = []
        for hit in results[0]:
            # json.loads replaces the original eval(), which would execute
            # arbitrary expressions embedded in stored metadata (code
            # injection). Truncated or legacy str(dict) payloads fall back
            # to an empty dict rather than crashing.
            try:
                metadata = json.loads(hit['metadata'])
            except (ValueError, TypeError):
                metadata = {}
            hits.append({
                'id': hit['id'],
                'score': hit['distance'],
                'text': hit['text'],
                'metadata': metadata,
            })
        return hits
Semantic Search Application
#!/usr/bin/env python3
"""Complete RAG application."""
class SemanticSearchApp:
    """End-to-end semantic search: embed, store, query, optionally rerank."""

    def __init__(self, embedding_generator, vector_store):
        self.embedder = embedding_generator
        self.store = vector_store
        self._cross_encoder = None  # lazily created by search_with_rerank()

    def index_documents(self, documents: list[dict]) -> int:
        """Embed documents and store them; returns how many were indexed."""
        embedded_docs = self.embedder.embed_documents(documents)
        self.store.upsert_vectors(embedded_docs)
        return len(embedded_docs)

    def search(self, query: str, top_k: int = 5) -> list[dict]:
        """Embed the query text and return the top_k nearest documents."""
        query_embedding = self.embedder.embed_text(query)
        return self.store.search(query_embedding, top_k=top_k)

    def search_with_rerank(self, query: str,
                           top_k: int = 20,
                           rerank_to: int = 5) -> list[dict]:
        """Vector-search top_k candidates, then rerank down to rerank_to.

        The cross-encoder scores each (query, document) pair jointly —
        slower than vector search but more accurate, hence the
        retrieve-wide-then-rerank pattern.
        """
        results = self.search(query, top_k=top_k)
        if self._cross_encoder is None:
            # Load the model once and reuse it: construction is far more
            # expensive than scoring, so reloading on every call (as the
            # original did) would dominate query latency.
            from sentence_transformers import CrossEncoder
            self._cross_encoder = CrossEncoder(
                'cross-encoder/ms-marco-MiniLM-L-6-v2')
        # NOTE(review): assumes each result dict carries a 'text' field —
        # the Pinecone store's search() returns only id/score/metadata;
        # confirm which store this app is wired to.
        pairs = [(query, r['text']) for r in results]
        scores = self._cross_encoder.predict(pairs)
        reranked = [
            {**r, 'rerank_score': float(score)}
            for r, score in zip(results, scores)
        ]
        reranked.sort(key=lambda x: x['rerank_score'], reverse=True)
        return reranked[:rerank_to]
Scaling Strategies
#!/usr/bin/env python3
"""Scaling vector search."""
class VectorSearchScaler:
    """Scale vector search horizontally by sharding and federating queries."""

    def __init__(self):
        self.shards = []

    def partition_data(self, vectors: list[dict],
                       num_shards: int) -> list[list[dict]]:
        """Split vectors across num_shards by round-robin.

        Round-robin keeps shard sizes within one vector of each other
        without inspecting the vectors' contents.
        """
        shards = [[] for _ in range(num_shards)]
        for i, vector in enumerate(vectors):
            shards[i % num_shards].append(vector)
        return shards

    def federated_search(self, query: list[float],
                         shards: list,
                         top_k: int = 10) -> list[dict]:
        """Query every shard, then merge, dedupe, and return the global top_k.

        Each shard returns its own local top_k; the true global top_k must
        appear in the union of those local lists, so merging them is
        sufficient. (The original hard-coded 10; top_k generalizes it with
        the same default, and the unused heapq import is dropped.)
        """
        all_results = []
        for shard in shards:
            all_results.extend(shard.search(query, top_k=top_k))
        # Highest score first; keep only the first occurrence of each id.
        seen_ids = set()
        merged = []
        for result in sorted(all_results, key=lambda r: r['score'], reverse=True):
            if result['id'] not in seen_ids:
                seen_ids.add(result['id'])
                merged.append(result)
        return merged[:top_k]
Comments