Skip to main content
⚡ Calmops

DuckDB for AI: Vector Search, ML Pipelines, and RAG Implementation

Introduction

DuckDB’s support for vector similarity search through the vss extension enables AI applications directly within your analytical workflows. Combined with its excellent Python integration, DuckDB becomes a powerful tool for building ML pipelines, feature engineering, and retrieval-augmented generation (RAG) systems.

This comprehensive guide explores how to leverage DuckDB for AI applications, from embedding storage to production ML pipelines.


Setting Up VSS Extension

-- Fetch the vss (vector similarity search) extension, then load it
INSTALL vss;
LOAD vss;

-- Confirm vss shows up in the extension catalog
SELECT *
FROM duckdb_extensions()
WHERE extension_name = 'vss';

Creating Vector Tables

-- Table that stores each text next to its embedding vector
CREATE TABLE document_embeddings (
    id           INTEGER PRIMARY KEY,
    document_id  INTEGER,
    text_content TEXT,
    embedding    FLOAT[384]  -- fixed-size array: 384 dimensions for MiniLM
);

-- HNSW index on the vector column for fast search
CREATE INDEX idx_embedding
ON document_embeddings USING HNSW (embedding)
WITH (ef_construction = 200, ef_search = 50);

Inserting Vectors

import duckdb
import numpy as np
from sentence_transformers import SentenceTransformer

# Connect to DuckDB and make the vss extension available
con = duckdb.connect('vectors.db')
con.execute("INSTALL vss")
con.execute("LOAD vss")
# 'vectors.db' is an on-disk database: vss only allows HNSW indexes there
# when its experimental persistence option is enabled first.
con.execute("SET hnsw_enable_experimental_persistence = true")

# Create table
con.execute("""
    CREATE TABLE IF NOT EXISTS embeddings (
        id INTEGER,
        text TEXT,
        embedding FLOAT[384]
    )
""")

# Create index. The metric is set to cosine so that it matches the
# array_cosine_distance() searches run against this table.
con.execute("""
    CREATE INDEX IF NOT EXISTS idx_embedding
    ON embeddings USING HNSW (embedding)
    WITH (metric = 'cosine')
""")

# Generate embeddings using sentence-transformers
model = SentenceTransformer('all-MiniLM-L6-v2')

documents = [
    (1, "Python is a high-level programming language"),
    (2, "Machine learning is a subset of AI"),
    (3, "Data science combines statistics and programming"),
    (4, "Neural networks are inspired by biological brains"),
    (5, "Natural language processing deals with text data"),
]

# Encode every text in one batched call instead of one call per document
vectors = model.encode([text for _, text in documents])

# Convert each numpy vector to a plain list so DuckDB can bind it,
# then insert all rows with a single executemany() call
rows = [
    (doc_id, text, vector.tolist())
    for (doc_id, text), vector in zip(documents, vectors)
]
con.executemany("INSERT INTO embeddings VALUES (?, ?, ?)", rows)

print("Inserted embeddings")

Vector Similarity Search

-- In every query below, $1 is the query vector bound as a prepared-statement
-- parameter. It is cast to FLOAT[384] so that it has the same fixed-size
-- array type as the embedding column, which the array_* distance functions
-- require.

-- Cosine distance search
SELECT 
    id,
    text,
    array_cosine_distance(embedding, $1::FLOAT[384]) as distance
FROM embeddings
ORDER BY distance
LIMIT 5;

-- Euclidean distance
SELECT 
    id,
    text,
    array_distance(embedding, $1::FLOAT[384]) as distance
FROM embeddings
ORDER BY distance
LIMIT 5;

-- Filtered search: keep only rows closer than 0.5 to the query vector
SELECT 
    id,
    text,
    array_cosine_distance(embedding, $1::FLOAT[384]) as distance
FROM embeddings
WHERE array_cosine_distance(embedding, $1::FLOAT[384]) < 0.5
ORDER BY distance
LIMIT 5;

Building RAG Pipelines

RAG Architecture with DuckDB

import duckdb
import numpy as np
from sentence_transformers import SentenceTransformer
import openai

class DuckDBRAG:
    """RAG pipeline using DuckDB for vector storage.

    Documents are split into fixed-size character chunks, embedded with the
    'all-MiniLM-L6-v2' sentence-transformers model and stored in a ``chunks``
    table. Questions are answered by retrieving the closest chunks by cosine
    distance and passing them as context to an OpenAI chat model.
    """
    
    def __init__(self, db_path='rag.db'):
        """Open the database, load vss, create the schema and load the model.

        Args:
            db_path: Path of the DuckDB database file to open.
        """
        self.con = duckdb.connect(db_path)
        self.con.execute("INSTALL vss")
        self.con.execute("LOAD vss")
        # An HNSW index in an on-disk database is only permitted by vss when
        # its experimental persistence option is enabled.
        self.con.execute("SET hnsw_enable_experimental_persistence = true")
        self._init_schema()
        self.model = SentenceTransformer('all-MiniLM-L6-v2')
    
    def _init_schema(self):
        """Initialize database schema."""
        self.con.execute("""
            CREATE TABLE IF NOT EXISTS chunks (
                id INTEGER PRIMARY KEY,
                document_id INTEGER,
                chunk_text TEXT,
                chunk_index INTEGER,
                embedding FLOAT[384]
            )
        """)
        
        # chunks.id is a primary key (so it cannot be NULL) and is not
        # supplied by callers; this sequence generates it at insert time.
        self.con.execute("CREATE SEQUENCE IF NOT EXISTS chunks_id_seq")
        
        self.con.execute("""
            CREATE TABLE IF NOT EXISTS documents (
                id INTEGER PRIMARY KEY,
                title TEXT,
                source TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        
        # Create HNSW index; the metric matches the cosine distance used
        # by retrieve().
        self.con.execute("""
            CREATE INDEX IF NOT EXISTS idx_chunks_embedding 
            ON chunks USING HNSW (embedding)
            WITH (metric = 'cosine')
        """)
    
    def ingest_document(self, document_id, title, source, text, chunk_size=500):
        """Ingest document with chunking and embeddings.

        Args:
            document_id: Primary key to store the document under.
            title: Document title stored in ``documents``.
            source: Document source stored in ``documents``.
            text: Full document text; it is split every ``chunk_size``
                characters.
            chunk_size: Number of characters per chunk; must be positive.

        Returns:
            The number of chunks stored.

        Raises:
            ValueError: If ``chunk_size`` is not positive.
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be a positive integer")
        
        # Split into chunks
        chunks = [text[i:i + chunk_size]
                  for i in range(0, len(text), chunk_size)]
        
        # Embed all chunks in one batched call (skipped for empty text)
        embeddings = self.model.encode(chunks) if chunks else []
        rows = [
            (document_id, chunk, index, vector.tolist())
            for index, (chunk, vector) in enumerate(zip(chunks, embeddings))
        ]
        
        # Write metadata and chunks in one transaction so that a failure
        # cannot leave a document with only some of its chunks.
        self.con.begin()
        try:
            self.con.execute(
                "INSERT INTO documents (id, title, source) VALUES (?, ?, ?)",
                (document_id, title, source)
            )
            if rows:
                self.con.executemany("""
                    INSERT INTO chunks (id, document_id, chunk_text, chunk_index, embedding)
                    VALUES (nextval('chunks_id_seq'), ?, ?, ?, ?)
                """, rows)
            self.con.commit()
        except Exception:
            self.con.rollback()
            raise
        
        return len(chunks)
    
    def retrieve(self, query, top_k=5):
        """Retrieve relevant chunks for query.

        Args:
            query: Text to embed and search for.
            top_k: Maximum number of chunks to return.

        Returns:
            A list of ``(chunk_text, document_id, distance)`` tuples ordered
            by ascending cosine distance.
        """
        query_embedding = self.model.encode(query)
        
        # The bound Python list is cast to FLOAT[384] so it has the same
        # fixed-size array type as the embedding column.
        results = self.con.execute("""
            SELECT 
                chunk_text,
                document_id,
                array_cosine_distance(embedding, ?::FLOAT[384]) as distance
            FROM chunks
            ORDER BY distance
            LIMIT ?
        """, (query_embedding.tolist(), top_k)).fetchall()
        
        return results
    
    def answer(self, question, max_context=2000):
        """Answer question using RAG.

        Args:
            question: The question to answer.
            max_context: Character budget for the retrieved context; chunks
                are added in retrieval order until the next one would exceed
                it.

        Returns:
            The text content of the model's first choice.
        """
        # Retrieve relevant chunks
        chunks = self.retrieve(question, top_k=10)
        
        # Build context; separators are not counted against the budget of
        # the chunk being tested, only chunks already added.
        parts = []
        used = 0
        for chunk, _doc_id, _distance in chunks:
            if used + len(chunk) > max_context:
                break
            parts.append(chunk + "\n\n")
            used += len(chunk) + 2
        context = "".join(parts)
        
        # Generate answer using LLM
        prompt = f"""Based on the following context, answer the question.

Context:
{context}

Question: {question}

Answer:"""
        
        messages = [{"role": "user", "content": prompt}]
        if hasattr(openai, "OpenAI"):
            # openai>=1.0 removed openai.ChatCompletion; use the client API.
            response = openai.OpenAI().chat.completions.create(
                model="gpt-4",
                messages=messages,
                temperature=0
            )
        else:
            # Legacy openai<1.0 interface.
            response = openai.ChatCompletion.create(
                model="gpt-4",
                messages=messages,
                temperature=0
            )
        
        return response.choices[0].message.content
    
    def close(self):
        """Close connection."""
        self.con.close()

# Usage
rag = DuckDBRAG('knowledge.db')

# try/finally guarantees the connection is closed even if ingestion or
# answering raises.
try:
    # Ingest documents (explicit encoding so the read does not depend on
    # the platform's default locale)
    with open('article.txt', encoding='utf-8') as f:
        rag.ingest_document(1, 'AI Article', 'article.txt', f.read())

    # Ask questions
    answer = rag.answer("What is machine learning?")
    print(answer)
finally:
    rag.close()

Feature Engineering

ML Feature Creation

import duckdb
import numpy as np

class FeatureEngineer:
    """Feature engineering with DuckDB.

    ``create_user_features`` registers the events DataFrame under the name
    ``events``; the other methods query that name, so call it first.
    """
    
    def __init__(self, db_path='features.db'):
        """Open (or create) the DuckDB database at ``db_path``."""
        self.con = duckdb.connect(db_path)
    
    def create_user_features(self, events_df):
        """Create user-level features from events.

        Args:
            events_df: DataFrame of events. The query reads the columns
                user_id, session_id, duration, event_time, action and amount.

        Returns:
            A DataFrame with one row per user_id.
        """
        # Register the DataFrame on the connection instead of creating a
        # view over the local variable name: a view defined as
        # "SELECT * FROM events_df" cannot be resolved from the other
        # methods, where no variable called events_df exists.
        self.con.register("events", events_df)
        
        # Create features (OR REPLACE so the method can be re-run)
        self.con.execute("""
            CREATE OR REPLACE TABLE user_features AS
            SELECT 
                user_id,
                COUNT(*) as total_events,
                COUNT(DISTINCT session_id) as unique_sessions,
                AVG(duration) as avg_duration,
                MIN(event_time) as first_event,
                MAX(event_time) as last_event,
                SUM(CASE WHEN action = 'purchase' THEN 1 ELSE 0 END) as purchase_count,
                SUM(CASE WHEN action = 'add_to_cart' THEN 1 ELSE 0 END) as cart_adds,
                AVG(CASE WHEN action = 'purchase' THEN amount ELSE NULL END) as avg_purchase_amount,
                DATEDIFF('day', MIN(event_time), MAX(event_time)) as active_days
            FROM events
            GROUP BY user_id
        """)
        
        return self.con.execute("SELECT * FROM user_features").df()
    
    def create_time_features(self):
        """Create time-based features.

        Counts events per (user_id, hour of day, day of week, month) and
        stores the result in the ``time_features`` table.
        """
        self.con.execute("""
            CREATE OR REPLACE TABLE time_features AS
            SELECT 
                user_id,
                EXTRACT(HOUR FROM event_time) as hour_of_day,
                EXTRACT(DOW FROM event_time) as day_of_week,
                EXTRACT(MONTH FROM event_time) as month,
                COUNT(*) as event_count
            FROM events
            GROUP BY 1, 2, 3, 4
        """)
    
    def create_aggregation_features(self):
        """Create aggregation features.

        Builds per-event window features (rolling counts and averages,
        running total, previous/next amount) for events with a non-NULL
        amount and stores them in the ``agg_features`` table.
        """
        # The rolling windows use RANGE frames with day intervals so that
        # they cover a span of time, as the column names state. A ROWS frame
        # would instead cover the last N events regardless of their dates.
        # NOTE(review): assumes event_time is a DATE/TIMESTAMP column, as the
        # DATEDIFF/EXTRACT calls elsewhere in this class also do - confirm.
        self.con.execute("""
            CREATE OR REPLACE TABLE agg_features AS
            SELECT 
                user_id,
                -- Rolling aggregations
                COUNT(*) OVER (
                    PARTITION BY user_id 
                    ORDER BY event_time 
                    RANGE BETWEEN INTERVAL 7 DAYS PRECEDING AND CURRENT ROW
                ) as rolling_7day_events,
                
                AVG(amount) OVER (
                    PARTITION BY user_id 
                    ORDER BY event_time 
                    RANGE BETWEEN INTERVAL 30 DAYS PRECEDING AND CURRENT ROW
                ) as rolling_30day_avg_amount,
                
                -- Running total
                SUM(amount) OVER (
                    PARTITION BY user_id 
                    ORDER BY event_time
                ) as running_total,
                
                -- Lead/lag
                LAG(amount, 1) OVER (
                    PARTITION BY user_id 
                    ORDER BY event_time
                ) as prev_amount,
                
                LEAD(amount, 1) OVER (
                    PARTITION BY user_id 
                    ORDER BY event_time
                ) as next_amount
            FROM events
            WHERE amount IS NOT NULL
        """)

# Usage
engineer = FeatureEngineer('ml_features.db')

# Create features from DataFrame
features = engineer.create_user_features(events_df)

# Export to ML pipeline
# NOTE: the user_features query does not produce a 'target' column; the
# label has to be joined onto `features` before this step.
# 'target' is dropped from X as well as 'user_id' so the label does not
# leak into the feature matrix.
X = features.drop(['user_id', 'target'], axis=1).values
y = features['target'].values

Model Evaluation

Query-Level Features

import duckdb

class ModelFeatures:
    """Generate features for model evaluation.

    Reads precomputed per-query features from the ``query_features`` table.
    """
    
    def __init__(self, db_path='model_features.db'):
        """Open the DuckDB database at ``db_path``."""
        self.con = duckdb.connect(db_path)
    
    def get_prediction_features(self, query):
        """Get features for a specific query.

        Args:
            query: Query text to look up (exact match on the ``query``
                column).

        Returns:
            A tuple ``(query_length, num_filters, has_join, has_aggregation,
            num_ctes, complexity_score)``, or ``None`` when no row matches.
        """
        result = self.con.execute("""
            SELECT 
                query_length,
                num_filters,
                has_join,
                has_aggregation,
                num_ctes,
                complexity_score
            FROM query_features
            WHERE query = ?
        """, [query]).fetchone()
        
        return result
    
    def batch_features(self, queries):
        """Get features for multiple queries.

        Args:
            queries: Iterable of query texts to look up.

        Returns:
            A dict mapping each query found to the tuple ``(query_length,
            num_filters, has_join, has_aggregation, complexity_score)``.
            Queries with no matching row are absent; an empty input yields
            an empty dict.
        """
        # Materialize once: the input is used both to count placeholders
        # and as the bound parameters, which would exhaust a generator.
        queries = list(queries)
        
        # "IN ()" is not valid SQL, so there is nothing to ask the database.
        if not queries:
            return {}
        
        placeholders = ','.join('?' for _ in queries)
        results = self.con.execute(f"""
            SELECT 
                query,
                query_length,
                num_filters,
                has_join,
                has_aggregation,
                complexity_score
            FROM query_features
            WHERE query IN ({placeholders})
        """, queries).fetchall()
        
        return {r[0]: r[1:] for r in results}

# Usage: open the feature store and look up the features of one query
feature_store = ModelFeatures('ml.db')
features = feature_store.get_prediction_features("SELECT * FROM users WHERE age > 25")

Data Preparation for ML

Train/Test Split

import duckdb
from sklearn.model_selection import train_test_split

con = duckdb.connect('ml.db')

# Get all data
data = con.execute("""
    SELECT 
        feature1,
        feature2,
        feature3,
        target
    FROM ml_table
""").fetchall()

# The last selected column is the target; everything before it is a feature
X = [row[:-1] for row in data]
y = [row[-1] for row in data]

# Split
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Create train/test tables.
# Each row is assigned to a side exactly once. Filtering ml_table twice with
# independent RANDOM() calls draws a fresh number per query, so a row could
# land in both tables (leakage) or in neither.
# Note this SQL split is separate from the scikit-learn split above.
con.execute("""
    CREATE OR REPLACE TEMP TABLE ml_split AS
    SELECT *, RANDOM() < 0.8 AS __is_train
    FROM ml_table
""")
con.execute("""
    CREATE OR REPLACE TABLE train_data AS
    SELECT * EXCLUDE (__is_train) FROM ml_split WHERE __is_train
""")
con.execute("""
    CREATE OR REPLACE TABLE test_data AS
    SELECT * EXCLUDE (__is_train) FROM ml_split WHERE NOT __is_train
""")
con.execute("DROP TABLE ml_split")

Cross-Validation

import duckdb
import numpy as np
from sklearn.model_selection import KFold

con = duckdb.connect('ml.db')

# Make sure the results table exists before the loop inserts into it
con.execute("CREATE TABLE IF NOT EXISTS cv_results (fold INTEGER, score DOUBLE)")

# Get data
data = con.execute("SELECT * FROM ml_table").df()
X = data.drop('target', axis=1).values
y = data['target'].values

# K-Fold cross-validation
kf = KFold(n_splits=5, shuffle=True, random_state=42)

scores = []
for fold, (train_idx, val_idx) in enumerate(kf.split(X)):
    X_train, X_val = X[train_idx], X[val_idx]
    y_train, y_val = y[train_idx], y[val_idx]
    
    # Train model (example with simple model). Until an estimator is plugged
    # in here, fold_score stays None and is stored as NULL.
    fold_score = None
    # model.fit(X_train, y_train)
    # fold_score = model.score(X_val, y_val)
    
    if fold_score is not None:
        fold_score = float(fold_score)
        scores.append(fold_score)
    
    # Store this fold's own score in DuckDB (not the running mean)
    con.execute("""
        INSERT INTO cv_results VALUES (?, ?)
    """, (fold, fold_score))

# np.mean([]) is NaN and emits a warning, so only report when scores exist
if scores:
    print(f"Mean CV Score: {np.mean(scores):.4f}")
else:
    print("Mean CV Score: n/a (no fold scores recorded)")

Production ML Integration

Batch Prediction

import duckdb
import joblib

class BatchPredictor:
    """Batch prediction using DuckDB.

    Reads feature rows from the ``prediction_input`` table, scores them with
    a joblib-loaded model and writes the results back to the same rows.
    """
    
    def __init__(self, db_path, model_path):
        """Open the database and load the model.

        Args:
            db_path: Path of the DuckDB database file.
            model_path: Path of the model file to load with joblib.
        """
        self.con = duckdb.connect(db_path)
        self.model = joblib.load(model_path)
    
    def _fetch_inputs(self):
        """Return ``(row_ids, features)`` for every row of prediction_input."""
        rows = self.con.execute("""
            SELECT 
                rowid,
                feature1,
                feature2,
                feature3
            FROM prediction_input
        """).fetchall()
        return [row[0] for row in rows], [row[1:] for row in rows]
    
    def _write_back(self, sql, params):
        """Run one UPDATE per parameter tuple inside a single transaction."""
        self.con.begin()
        try:
            self.con.executemany(sql, params)
            self.con.commit()
        except Exception:
            self.con.rollback()
            raise
    
    @staticmethod
    def _to_python(value):
        """Convert a NumPy scalar/array to a plain Python value for binding."""
        return value.tolist() if hasattr(value, "tolist") else value
    
    def predict(self):
        """Run batch prediction.

        Returns:
            The model's predictions, in the order the rows were read, or an
            empty list when ``prediction_input`` has no rows.
        """
        # Get features together with each row's actual rowid
        row_ids, features = self._fetch_inputs()
        if not features:
            return []
        
        # Predict
        predictions = self.model.predict(features)
        
        # Store predictions, keyed on the rowid that was read for each row
        # rather than on an assumed 1-based position.
        self._write_back(
            """
                UPDATE prediction_input
                SET prediction = ?
                WHERE rowid = ?
            """,
            [
                (self._to_python(pred), row_id)
                for pred, row_id in zip(predictions, row_ids)
            ],
        )
        
        return predictions
    
    def predict_proba(self):
        """Get prediction probabilities.

        Scores the same feature columns as ``predict`` (not ``SELECT *``,
        which would also feed the output columns to the model) and stores
        the first two class probabilities on each row.

        Returns:
            The model's probability output, or an empty list when
            ``prediction_input`` has no rows.
        """
        row_ids, features = self._fetch_inputs()
        if not features:
            return []
        
        probas = self.model.predict_proba(features)
        
        # Store probabilities
        self._write_back(
            """
                UPDATE prediction_input
                SET prob_class_0 = ?, prob_class_1 = ?
                WHERE rowid = ?
            """,
            [
                (self._to_python(proba[0]), self._to_python(proba[1]), row_id)
                for proba, row_id in zip(probas, row_ids)
            ],
        )
        
        return probas

# Usage: load the model, then score every row of prediction_input
predictor = BatchPredictor(db_path='predictions.db', model_path='model.joblib')
predictions = predictor.predict()

Best Practices

Vector Search Optimization

-- Choose appropriate dimensions
-- MiniLM: 384
-- ada-002: 1536
-- BGE: 768

-- Index tuning
-- ("embeddings" stands for your own table name; the bare word "table" is a
-- reserved keyword and cannot be used as an identifier here)
CREATE INDEX idx_embedding ON embeddings USING HNSW (embedding)
WITH (ef_construction = 200, ef_search = 50);

ML Pipeline Tips

# Column type guidance:
#   INTEGER - categorical values
#   DOUBLE  - continuous values

# Batch processing:
# walk the data in fixed-size slices to avoid memory issues
chunk_size = 100000
for start in range(0, len(data), chunk_size):
    process(data[start:start + chunk_size])

Resources


Conclusion

DuckDB provides excellent support for AI applications through its vector search capabilities and seamless Python integration. From RAG pipelines to ML feature engineering, DuckDB offers a unified platform for analytics and AI workflows.

In the next article, we’ll explore real-world DuckDB use cases, including production patterns and implementation strategies.

Comments