
DuckDB for AI: Vector Search, ML Pipelines, and RAG Implementation

Created: March 5, 2026 · CalmOps · 8 min read

Introduction

DuckDB’s support for vector similarity search through the vss extension enables AI applications directly within your analytical workflows. Combined with its excellent Python integration, DuckDB becomes a powerful tool for ML pipelines, feature engineering, and retrieval-augmented generation (RAG) systems.

This comprehensive guide explores how to leverage DuckDB for AI applications, from embedding storage to production ML pipelines.


Setting Up VSS Extension

-- Install and load vector search extension
INSTALL vss;
LOAD vss;

-- Check that vss is available
SELECT * FROM duckdb_extensions() WHERE extension_name = 'vss';

Creating Vector Tables

-- Create table with vector column
CREATE TABLE document_embeddings (
    id INTEGER PRIMARY KEY,
    document_id INTEGER,
    text_content TEXT,
    embedding FLOAT[384]  -- 384 dimensions for MiniLM
);

-- HNSW persistence in file-backed databases is experimental and opt-in
SET hnsw_enable_experimental_persistence = true;

-- Create HNSW index for fast search
-- metric can be 'l2sq' (default), 'cosine', or 'ip'; match it to the
-- distance function you query with
CREATE INDEX idx_embedding ON document_embeddings 
USING HNSW (embedding)
WITH (metric = 'cosine', ef_construction = 200, ef_search = 50);

Inserting Vectors

import duckdb
import numpy as np
from sentence_transformers import SentenceTransformer

# Connect to DuckDB
con = duckdb.connect('vectors.db')
con.execute("INSTALL vss")
con.execute("LOAD vss")

# Create table
con.execute("""
    CREATE TABLE IF NOT EXISTS embeddings (
        id INTEGER,
        text TEXT,
        embedding FLOAT[384]
    )
""")

# HNSW persistence in file-backed databases is experimental and opt-in
con.execute("SET hnsw_enable_experimental_persistence = true")

# Create index
con.execute("""
    CREATE INDEX IF NOT EXISTS idx_embedding 
    ON embeddings USING HNSW (embedding)
""")

# Generate embeddings using sentence-transformers
model = SentenceTransformer('all-MiniLM-L6-v2')

documents = [
    (1, "Python is a high-level programming language"),
    (2, "Machine learning is a subset of AI"),
    (3, "Data science combines statistics and programming"),
    (4, "Neural networks are inspired by biological brains"),
    (5, "Natural language processing deals with text data")
]

for doc_id, text in documents:
    embedding = model.encode(text)
    # Convert numpy array to a plain list; cast to FLOAT[384] on insert
    embedding_list = embedding.tolist()
    con.execute(
        "INSERT INTO embeddings VALUES (?, ?, ?::FLOAT[384])",
        (doc_id, text, embedding_list)
    )

print("Inserted embeddings")

Vector Similarity Search

-- Cosine distance search (cast the query vector so the HNSW index can be used)
SELECT 
    id,
    text,
    array_cosine_distance(embedding, [0.1, 0.2, ...]::FLOAT[384]) as distance
FROM embeddings
ORDER BY distance
LIMIT 5;

-- Euclidean distance
SELECT 
    id,
    text,
    array_distance(embedding, [0.1, 0.2, ...]::FLOAT[384]) as distance
FROM embeddings
ORDER BY distance
LIMIT 5;

-- Filtered search (note: the distance filter may prevent the index scan
-- and fall back to a sequential scan)
SELECT 
    id,
    text,
    array_cosine_distance(embedding, [0.1, 0.2, ...]::FLOAT[384]) as distance
FROM embeddings
WHERE array_cosine_distance(embedding, [0.1, 0.2, ...]::FLOAT[384]) < 0.5
ORDER BY distance
LIMIT 5;
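
The same search can be run from Python, binding the query embedding as a parameter; a short sketch against the embeddings table and model from the earlier example:

query = "What is machine learning?"
query_vec = model.encode(query).tolist()

results = con.execute("""
    SELECT id, text,
           array_cosine_distance(embedding, ?::FLOAT[384]) AS distance
    FROM embeddings
    ORDER BY distance
    LIMIT 3
""", (query_vec,)).fetchall()

for doc_id, text, distance in results:
    print(f"{distance:.4f}  {text}")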

Building RAG Pipelines

RAG Architecture with DuckDB

import duckdb
import numpy as np
from sentence_transformers import SentenceTransformer
import openai

class DuckDBRAG:
    """RAG pipeline using DuckDB for vector storage."""
    
    def __init__(self, db_path='rag.db'):
        self.con = duckdb.connect(db_path)
        self.con.execute("INSTALL vss")
        self.con.execute("LOAD vss")
        # HNSW persistence in file-backed databases is experimental
        self.con.execute("SET hnsw_enable_experimental_persistence = true")
        self._init_schema()
        self.model = SentenceTransformer('all-MiniLM-L6-v2')
    
    def _init_schema(self):
        """Initialize database schema."""
        # An INTEGER PRIMARY KEY does not auto-increment in DuckDB,
        # so chunk ids come from a sequence
        self.con.execute("CREATE SEQUENCE IF NOT EXISTS chunk_id_seq")
        self.con.execute("""
            CREATE TABLE IF NOT EXISTS chunks (
                id INTEGER PRIMARY KEY DEFAULT nextval('chunk_id_seq'),
                document_id INTEGER,
                chunk_text TEXT,
                chunk_index INTEGER,
                embedding FLOAT[384]
            )
        """)
        
        self.con.execute("""
            CREATE TABLE IF NOT EXISTS documents (
                id INTEGER PRIMARY KEY,
                title TEXT,
                source TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        
        # Create HNSW index
        self.con.execute("""
            CREATE INDEX IF NOT EXISTS idx_chunks_embedding 
            ON chunks USING HNSW (embedding)
        """)
    
    def ingest_document(self, document_id, title, source, text, chunk_size=500):
        """Ingest document with chunking and embeddings."""
        # Split into chunks
        chunks = [text[i:i+chunk_size] 
                  for i in range(0, len(text), chunk_size)]
        
        # Insert document metadata
        self.con.execute(
            "INSERT INTO documents (id, title, source) VALUES (?, ?, ?)",
            (document_id, title, source)
        )
        
        # Generate embeddings and insert chunks (id comes from the sequence)
        for i, chunk in enumerate(chunks):
            embedding = self.model.encode(chunk)
            self.con.execute("""
                INSERT INTO chunks (document_id, chunk_text, chunk_index, embedding)
                VALUES (?, ?, ?, ?::FLOAT[384])
            """, (document_id, chunk, i, embedding.tolist()))
        
        self.con.commit()
        return len(chunks)
    
    def retrieve(self, query, top_k=5):
        """Retrieve relevant chunks for query."""
        query_embedding = self.model.encode(query)
        
        results = self.con.execute("""
            SELECT 
                chunk_text,
                document_id,
                array_cosine_distance(embedding, ?::FLOAT[384]) as distance
            FROM chunks
            ORDER BY distance
            LIMIT ?
        """, (query_embedding.tolist(), top_k)).fetchall()
        
        return results
    
    def answer(self, question, max_context=2000):
        """Answer question using RAG."""
        # Retrieve relevant chunks
        chunks = self.retrieve(question, top_k=10)
        
        # Build context
        context = ""
        for chunk, doc_id, distance in chunks:
            if len(context) + len(chunk) > max_context:
                break
            context += chunk + "\n\n"
        
        # Generate answer using LLM
        prompt = f"""Based on the following context, answer the question.

Context:
{context}

Question: {question}

Answer:"""
        
        # openai>=1.0 client API
        client = openai.OpenAI()
        response = client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            temperature=0
        )
        
        return response.choices[0].message.content
    
    def close(self):
        """Close connection."""
        self.con.close()

# Usage
rag = DuckDBRAG('knowledge.db')

# Ingest documents
with open('article.txt') as f:
    rag.ingest_document(1, 'AI Article', 'article.txt', f.read())

# Ask questions
answer = rag.answer("What is machine learning?")
print(answer)

rag.close()
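
The fixed-width slicing in ingest_document can split a sentence across chunk boundaries. A common refinement is overlapping chunks, so text near a boundary appears in two chunks; a minimal sketch (the overlap parameter is our addition, not part of the class above):

def chunk_text(text, chunk_size=500, overlap=50):
    """Split text into fixed-size chunks that share `overlap` characters."""
    step = chunk_size - overlap
    chunks = [text[i:i + chunk_size] for i in range(0, len(text), step)]
    # Drop a trailing fragment fully contained in the previous chunk
    if len(chunks) > 1 and len(chunks[-1]) <= overlap:
        chunks.pop()
    return chunks

# 1200 chars -> chunks covering 0-500, 450-950, 900-1200
print(len(chunk_text("x" * 1200)))  # 3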

Feature Engineering

ML Feature Creation

import duckdb
import numpy as np

class FeatureEngineer:
    """Feature engineering with DuckDB."""
    
    def __init__(self, db_path='features.db'):
        self.con = duckdb.connect(db_path)
    
    def create_user_features(self, events_df):
        """Create user-level features from events."""
        # Register the DataFrame explicitly (more robust than relying on
        # DuckDB's implicit scan of local variables)
        self.con.register('events_df', events_df)
        self.con.execute("CREATE OR REPLACE VIEW events AS SELECT * FROM events_df")
        
        # Create features
        self.con.execute("""
            CREATE TABLE user_features AS
            SELECT 
                user_id,
                COUNT(*) as total_events,
                COUNT(DISTINCT session_id) as unique_sessions,
                AVG(duration) as avg_duration,
                MIN(event_time) as first_event,
                MAX(event_time) as last_event,
                SUM(CASE WHEN action = 'purchase' THEN 1 ELSE 0 END) as purchase_count,
                SUM(CASE WHEN action = 'add_to_cart' THEN 1 ELSE 0 END) as cart_adds,
                AVG(CASE WHEN action = 'purchase' THEN amount ELSE NULL END) as avg_purchase_amount,
                DATEDIFF('day', MIN(event_time), MAX(event_time)) as active_days
            FROM events
            GROUP BY user_id
        """)
        
        return self.con.execute("SELECT * FROM user_features").df()
    
    def create_time_features(self):
        """Create time-based features."""
        self.con.execute("""
            CREATE TABLE time_features AS
            SELECT 
                user_id,
                EXTRACT(HOUR FROM event_time) as hour_of_day,
                EXTRACT(DOW FROM event_time) as day_of_week,
                EXTRACT(MONTH FROM event_time) as month,
                COUNT(*) as event_count
            FROM events
            GROUP BY 1, 2, 3, 4
        """)
    
    def create_aggregation_features(self):
        """Create aggregation features."""
        self.con.execute("""
            CREATE TABLE agg_features AS
            SELECT 
                user_id,
                -- Rolling aggregations (row-based windows: the last N events,
                -- not calendar days; use RANGE with an INTERVAL for time-based windows)
                COUNT(*) OVER (
                    PARTITION BY user_id 
                    ORDER BY event_time 
                    ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
                ) as rolling_7_events,
                
                AVG(amount) OVER (
                    PARTITION BY user_id 
                    ORDER BY event_time 
                    ROWS BETWEEN 29 PRECEDING AND CURRENT ROW
                ) as rolling_30_avg_amount,
                
                -- Running total
                SUM(amount) OVER (
                    PARTITION BY user_id 
                    ORDER BY event_time
                ) as running_total,
                
                -- Lead/lag
                LAG(amount, 1) OVER (
                    PARTITION BY user_id 
                    ORDER BY event_time
                ) as prev_amount,
                
                LEAD(amount, 1) OVER (
                    PARTITION BY user_id 
                    ORDER BY event_time
                ) as next_amount
            FROM events
            WHERE amount IS NOT NULL
        """)

# Usage
engineer = FeatureEngineer('ml_features.db')

# Create features from DataFrame
features = engineer.create_user_features(events_df)

# Export to ML pipeline
# Note: user_features has no label column; join a label (e.g. churned,
# converted) onto the features before this step
X = features.drop(['user_id', 'target'], axis=1).values
y = features['target'].values
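
DuckDB can also hand the feature table back to Python efficiently; a quick sketch of the pandas and Arrow result APIs:

# Fetch as a pandas DataFrame
features_df = engineer.con.execute("SELECT * FROM user_features").df()

# Or as an Arrow table, convenient for handoff to other tools
features_arrow = engineer.con.execute("SELECT * FROM user_features").arrow()
print(features_arrow.num_rows, features_arrow.column_names)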

Model Evaluation

Query-Level Features

import duckdb

class ModelFeatures:
    """Generate features for model evaluation."""
    
    def __init__(self, db_path='model_features.db'):
        self.con = duckdb.connect(db_path)
    
    def get_prediction_features(self, query):
        """Get features for a specific query."""
        result = self.con.execute("""
            SELECT 
                query_length,
                num_filters,
                has_join,
                has_aggregation,
                num_ctes,
                complexity_score
            FROM query_features
            WHERE query = ?
        """, [query]).fetchone()
        
        return result
    
    def batch_features(self, queries):
        """Get features for multiple queries."""
        placeholders = ','.join(['?' for _ in queries])
        results = self.con.execute(f"""
            SELECT 
                query,
                query_length,
                num_filters,
                has_join,
                has_aggregation,
                complexity_score
            FROM query_features
            WHERE query IN ({placeholders})
        """, queries).fetchall()
        
        return {r[0]: r[1:] for r in results}

# Usage
feature_store = ModelFeatures('ml.db')
features = feature_store.get_prediction_features(
    "SELECT * FROM users WHERE age > 25"
)
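
The queries above assume a populated query_features table. A minimal sketch of how such a table might be filled with crude lexical features (the feature definitions here are illustrative assumptions, not a fixed schema):

import duckdb
import re

con = duckdb.connect('ml.db')
con.execute("""
    CREATE TABLE IF NOT EXISTS query_features (
        query TEXT,
        query_length INTEGER,
        num_filters INTEGER,
        has_join BOOLEAN,
        has_aggregation BOOLEAN,
        num_ctes INTEGER,
        complexity_score DOUBLE
    )
""")

def extract_features(sql: str):
    """Crude lexical features from a SQL string (illustrative only)."""
    upper = sql.upper()
    num_filters = upper.count(' AND ') + upper.count(' OR ') + upper.count('WHERE')
    has_join = 'JOIN' in upper
    has_agg = bool(re.search(r'\b(SUM|AVG|COUNT|MIN|MAX)\s*\(', upper))
    num_ctes = len(re.findall(r'\bWITH\b', upper))  # crude proxy
    score = len(sql) / 100 + num_filters + 2 * has_join + num_ctes
    return (sql, len(sql), num_filters, has_join, has_agg, num_ctes, score)

con.execute(
    "INSERT INTO query_features VALUES (?, ?, ?, ?, ?, ?, ?)",
    extract_features("SELECT * FROM users WHERE age > 25")
)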

Data Preparation for ML

Train/Test Split

import duckdb
from sklearn.model_selection import train_test_split

con = duckdb.connect('ml.db')

# Get all data
data = con.execute("""
    SELECT 
        feature1,
        feature2,
        feature3,
        target
    FROM ml_table
""").fetchall()

X = [row[:-1] for row in data]
y = [row[-1] for row in data]

# Split
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Create train/test tables. Filtering ml_table twice with RANDOM() would
# give inconsistent splits (RANDOM() is re-evaluated per query), so
# materialize the random draw once and filter on it
con.execute("CREATE OR REPLACE TABLE ml_split AS SELECT *, RANDOM() AS split_r FROM ml_table")
con.execute("CREATE TABLE train_data AS SELECT * EXCLUDE (split_r) FROM ml_split WHERE split_r < 0.8")
con.execute("CREATE TABLE test_data AS SELECT * EXCLUDE (split_r) FROM ml_split WHERE split_r >= 0.8")

Cross-Validation

import duckdb
import numpy as np
from sklearn.model_selection import KFold

con = duckdb.connect('ml.db')

# Get data
data = con.execute("SELECT * FROM ml_table").df()
X = data.drop('target', axis=1).values
y = data['target'].values

# K-Fold cross-validation
kf = KFold(n_splits=5, shuffle=True, random_state=42)

# Ensure a results table exists
con.execute("CREATE TABLE IF NOT EXISTS cv_results (fold INTEGER, score DOUBLE)")

scores = []
for fold, (train_idx, val_idx) in enumerate(kf.split(X)):
    X_train, X_val = X[train_idx], X[val_idx]
    y_train, y_val = y[train_idx], y[val_idx]
    
    # Train and score your model here, e.g.:
    # model.fit(X_train, y_train)
    # score = model.score(X_val, y_val)
    score = 0.0  # placeholder until a model is plugged in
    scores.append(score)
    
    # Store per-fold result in DuckDB
    con.execute("INSERT INTO cv_results VALUES (?, ?)", (fold, score))

print(f"Mean CV Score: {np.mean(scores):.4f}")

Production ML Integration

Batch Prediction

import duckdb
import joblib

class BatchPredictor:
    """Batch prediction using DuckDB."""
    
    def __init__(self, db_path, model_path):
        self.con = duckdb.connect(db_path)
        self.model = joblib.load(model_path)
    
    def predict(self):
        """Run batch prediction."""
        # Fetch rowid alongside the features so the write-back targets
        # the correct rows regardless of scan order
        rows = self.con.execute("""
            SELECT 
                rowid,
                feature1,
                feature2,
                feature3
            FROM prediction_input
        """).fetchall()
        
        row_ids = [r[0] for r in rows]
        features = [r[1:] for r in rows]
        
        # Predict
        predictions = self.model.predict(features)
        
        # Store predictions
        for rid, pred in zip(row_ids, predictions):
            self.con.execute("""
                UPDATE prediction_input
                SET prediction = ?
                WHERE rowid = ?
            """, (pred, rid))
        
        self.con.commit()
        return predictions
    
    def predict_proba(self):
        """Get prediction probabilities."""
        # Select only the feature columns; SELECT * would also feed the
        # prediction/probability columns into the model
        rows = self.con.execute("""
            SELECT rowid, feature1, feature2, feature3
            FROM prediction_input
        """).fetchall()
        probas = self.model.predict_proba([r[1:] for r in rows])
        
        # Store probabilities
        for (rid, *_), proba in zip(rows, probas):
            self.con.execute("""
                UPDATE prediction_input
                SET prob_class_0 = ?, prob_class_1 = ?
                WHERE rowid = ?
            """, (proba[0], proba[1], rid))

# Usage
predictor = BatchPredictor('predictions.db', 'model.joblib')
predictions = predictor.predict()
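
Row-by-row UPDATEs become a bottleneck on large batches. A set-based alternative is to register the predictions as a DataFrame and apply a single UPDATE ... FROM; a sketch reusing the predictor above:

import pandas as pd

# Build a predictions DataFrame keyed by rowid
rows = predictor.con.execute(
    "SELECT rowid, feature1, feature2, feature3 FROM prediction_input"
).fetchall()
preds_df = pd.DataFrame({
    "rid": [r[0] for r in rows],
    "prediction": predictor.model.predict([r[1:] for r in rows]),
})

# One joined UPDATE instead of one statement per row
predictor.con.register("preds_df", preds_df)
predictor.con.execute("""
    UPDATE prediction_input
    SET prediction = preds_df.prediction
    FROM preds_df
    WHERE prediction_input.rowid = preds_df.rid
""")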

Best Practices

Vector Search Optimization

-- Choose appropriate dimensions
-- MiniLM: 384
-- ada-002: 1536
-- BGE: 768

-- Index tuning: higher ef_construction improves recall at build time,
-- higher ef_search improves recall at query time (both cost speed)
CREATE INDEX idx_embedding ON my_table USING HNSW (embedding)
WITH (ef_construction = 200, ef_search = 50);

ML Pipeline Tips

# Use appropriate data types
# INTEGER for categorical
# DOUBLE for continuous

# Batch processing
# Process in chunks to avoid memory issues
chunk_size = 100000
for i in range(0, len(data), chunk_size):
    chunk = data[i:i+chunk_size]
    process(chunk)  # process() is your per-chunk handler
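
When the data lives in DuckDB itself, you can stream it out in Arrow record batches instead of materializing everything; a sketch using the Python client's fetch_record_batch:

import duckdb

con = duckdb.connect('ml.db')

# Stream results in Arrow record batches of ~100k rows
reader = con.execute("SELECT * FROM ml_table").fetch_record_batch(100_000)
for batch in reader:
    df_chunk = batch.to_pandas()
    # process(df_chunk)  # your per-chunk handler
    print(len(df_chunk))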

Conclusion

DuckDB provides excellent support for AI applications through its vector search capabilities and seamless Python integration. From RAG pipelines to ML feature engineering, DuckDB offers a unified platform for analytics and AI workflows.

In the next article, we’ll explore real-world DuckDB use cases, including production patterns and implementation strategies.
