Introduction
DuckDB’s support for vector similarity search through the vss extension enables AI applications directly within your analytical workflows. Combined with its excellent Python integration, DuckDB becomes a powerful tool for building ML pipelines, feature engineering, and retrieval-augmented generation (RAG) systems.
This comprehensive guide explores how to leverage DuckDB for AI applications, from embedding storage to production ML pipelines.
Vector Similarity Search
Setting Up VSS Extension
-- Install and load vector search extension
INSTALL vss;
LOAD vss;
-- Check vss is available
-- (duckdb_extensions() lists every known extension with its install/load state)
SELECT * FROM duckdb_extensions() WHERE extension_name = 'vss';
Creating Vector Tables
-- Create table with a fixed-size vector column
CREATE TABLE document_embeddings (
    id INTEGER PRIMARY KEY,
    document_id INTEGER,
    text_content TEXT,
    embedding FLOAT[384] -- 384 dimensions for MiniLM
);
-- Create HNSW index for fast approximate nearest-neighbor search.
-- Declare the metric you will query with: the default is Euclidean,
-- so cosine-distance queries would not use the index without it.
CREATE INDEX idx_embedding ON document_embeddings
USING HNSW (embedding)
WITH (metric = 'cosine', ef_construction = 200, ef_search = 50);
Inserting Vectors
import duckdb
import numpy as np
from sentence_transformers import SentenceTransformer

# Connect to a file-backed DuckDB database.
con = duckdb.connect('vectors.db')
con.execute("INSTALL vss")
con.execute("LOAD vss")
# HNSW indexes on disk-backed databases require this experimental flag;
# without it CREATE INDEX ... USING HNSW only works in-memory.
con.execute("SET hnsw_enable_experimental_persistence = true")

# Table stores fixed-size 384-d vectors (all-MiniLM-L6-v2 output size).
con.execute("""
    CREATE TABLE IF NOT EXISTS embeddings (
        id INTEGER,
        text TEXT,
        embedding FLOAT[384]
    )
""")

# Create HNSW index for approximate nearest-neighbor search.
con.execute("""
    CREATE INDEX IF NOT EXISTS idx_embedding
    ON embeddings USING HNSW (embedding)
""")

# Generate embeddings using sentence-transformers.
model = SentenceTransformer('all-MiniLM-L6-v2')
documents = [
    (1, "Python is a high-level programming language"),
    (2, "Machine learning is a subset of AI"),
    (3, "Data science combines statistics and programming"),
    (4, "Neural networks are inspired by biological brains"),
    (5, "Natural language processing deals with text data")
]

# Encode all texts in ONE batch call (much faster than per-row encode),
# then bulk-insert with executemany.
texts = [text for _, text in documents]
vectors = model.encode(texts)
rows = [
    (doc_id, text, vector.tolist())
    for (doc_id, text), vector in zip(documents, vectors)
]
con.executemany("INSERT INTO embeddings VALUES (?, ?, ?)", rows)
print("Inserted embeddings")
Running Similarity Search Queries
-- Cosine distance search
-- (the [0.1, 0.2, ...] literals are placeholders: supply a full
--  384-element query vector matching the embedding column's size)
SELECT
    id,
    text,
    array_cosine_distance(embedding, [0.1, 0.2, ...]) as distance
FROM embeddings
ORDER BY distance
LIMIT 5;
-- Euclidean distance
SELECT
    id,
    text,
    array_distance(embedding, [0.1, 0.2, ...]) as distance
FROM embeddings
ORDER BY distance
LIMIT 5;
-- Filtered search: keep only matches closer than the 0.5 threshold.
-- NOTE(review): the distance expression is repeated in WHERE; DuckDB
-- also allows reusing the SELECT alias (WHERE distance < 0.5) -- verify
-- on your DuckDB version if you prefer the shorter form.
SELECT
    id,
    text,
    array_cosine_distance(embedding, [0.1, 0.2, ...]) as distance
FROM embeddings
WHERE array_cosine_distance(embedding, [0.1, 0.2, ...]) < 0.5
ORDER BY distance
LIMIT 5;
Building RAG Pipelines
RAG Architecture with DuckDB
import duckdb
import numpy as np
from sentence_transformers import SentenceTransformer
import openai
class DuckDBRAG:
    """RAG pipeline using DuckDB for vector storage.

    Documents are split into fixed-size chunks, embedded with
    all-MiniLM-L6-v2 (384-d vectors), stored in DuckDB, and retrieved
    by cosine distance to answer questions via an LLM.
    """

    def __init__(self, db_path='rag.db'):
        self.con = duckdb.connect(db_path)
        self.con.execute("INSTALL vss")
        self.con.execute("LOAD vss")
        # HNSW indexes on file-backed databases require this experimental
        # flag; without it the CREATE INDEX below fails outside :memory:.
        self.con.execute("SET hnsw_enable_experimental_persistence = true")
        self._init_schema()
        self.model = SentenceTransformer('all-MiniLM-L6-v2')

    def _init_schema(self):
        """Initialize tables, the chunk-id sequence, and the HNSW index."""
        # The INSERT in ingest_document supplies no id, and a PRIMARY KEY
        # column rejects NULL -- default the id from a sequence instead.
        self.con.execute("CREATE SEQUENCE IF NOT EXISTS chunk_id_seq")
        self.con.execute("""
            CREATE TABLE IF NOT EXISTS chunks (
                id INTEGER PRIMARY KEY DEFAULT nextval('chunk_id_seq'),
                document_id INTEGER,
                chunk_text TEXT,
                chunk_index INTEGER,
                embedding FLOAT[384]
            )
        """)
        self.con.execute("""
            CREATE TABLE IF NOT EXISTS documents (
                id INTEGER PRIMARY KEY,
                title TEXT,
                source TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        # Declare the cosine metric so retrieve()'s array_cosine_distance
        # queries can actually use the index (the default metric is not cosine).
        self.con.execute("""
            CREATE INDEX IF NOT EXISTS idx_chunks_embedding
            ON chunks USING HNSW (embedding)
            WITH (metric = 'cosine')
        """)

    def ingest_document(self, document_id, title, source, text, chunk_size=500):
        """Ingest a document: store metadata, then chunk, embed, and insert.

        Returns the number of chunks written.
        """
        # Fixed-size character chunking (no overlap).
        chunks = [text[i:i + chunk_size]
                  for i in range(0, len(text), chunk_size)]
        self.con.execute(
            "INSERT INTO documents (id, title, source) VALUES (?, ?, ?)",
            (document_id, title, source)
        )
        for i, chunk in enumerate(chunks):
            embedding = self.model.encode(chunk)
            self.con.execute("""
                INSERT INTO chunks (document_id, chunk_text, chunk_index, embedding)
                VALUES (?, ?, ?, ?)
            """, (document_id, chunk, i, embedding.tolist()))
        self.con.commit()
        return len(chunks)

    def retrieve(self, query, top_k=5):
        """Return (chunk_text, document_id, distance) for the top_k closest chunks."""
        query_embedding = self.model.encode(query)
        # Cast the bound parameter to a fixed-size array:
        # array_cosine_distance requires FLOAT[384], while a plain Python
        # list binds as a variable-length FLOAT[].
        results = self.con.execute("""
            SELECT
                chunk_text,
                document_id,
                array_cosine_distance(embedding, ?::FLOAT[384]) as distance
            FROM chunks
            ORDER BY distance
            LIMIT ?
        """, (query_embedding.tolist(), top_k)).fetchall()
        return results

    def answer(self, question, max_context=2000):
        """Answer *question* from retrieved context via the OpenAI API."""
        chunks = self.retrieve(question, top_k=10)
        # Pack chunks (already ordered best-first) until the budget is hit.
        context = ""
        for chunk, doc_id, distance in chunks:
            if len(context) + len(chunk) > max_context:
                break
            context += chunk + "\n\n"
        prompt = f"""Based on the following context, answer the question.
Context:
{context}
Question: {question}
Answer:"""
        # NOTE(review): openai.ChatCompletion is the pre-1.0 SDK interface;
        # openai>=1.0 removed it in favor of OpenAI().chat.completions.create.
        response = openai.ChatCompletion.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            temperature=0
        )
        return response.choices[0].message.content

    def close(self):
        """Close the DuckDB connection."""
        self.con.close()
# Example: build a small knowledge base and query it.
pipeline = DuckDBRAG('knowledge.db')
# Load one document from disk and index it.
with open('article.txt') as source:
    body = source.read()
pipeline.ingest_document(1, 'AI Article', 'article.txt', body)
# Retrieve context and generate an answer.
reply = pipeline.answer("What is machine learning?")
print(reply)
pipeline.close()
Feature Engineering
ML Feature Creation
import duckdb
import numpy as np
class FeatureEngineer:
    """Feature engineering with DuckDB."""

    def __init__(self, db_path='features.db'):
        self.con = duckdb.connect(db_path)

    def create_user_features(self, events_df):
        """Create user-level features from an events DataFrame.

        Registers *events_df* on the connection under the name 'events'
        so create_time_features / create_aggregation_features can query
        it after this method returns.
        """
        # register() pins the DataFrame to the connection. A view defined
        # as "SELECT * FROM events_df" relies on DuckDB's replacement scan
        # of a Python local by that exact name, which stops working as
        # soon as the local goes out of scope.
        self.con.register('events', events_df)
        # CREATE OR REPLACE keeps the method re-runnable.
        self.con.execute("""
            CREATE OR REPLACE TABLE user_features AS
            SELECT
                user_id,
                COUNT(*) as total_events,
                COUNT(DISTINCT session_id) as unique_sessions,
                AVG(duration) as avg_duration,
                MIN(event_time) as first_event,
                MAX(event_time) as last_event,
                SUM(CASE WHEN action = 'purchase' THEN 1 ELSE 0 END) as purchase_count,
                SUM(CASE WHEN action = 'add_to_cart' THEN 1 ELSE 0 END) as cart_adds,
                AVG(CASE WHEN action = 'purchase' THEN amount ELSE NULL END) as avg_purchase_amount,
                DATEDIFF('day', MIN(event_time), MAX(event_time)) as active_days
            FROM events
            GROUP BY user_id
        """)
        return self.con.execute("SELECT * FROM user_features").df()

    def create_time_features(self):
        """Create hour/day/month activity counts per user."""
        self.con.execute("""
            CREATE OR REPLACE TABLE time_features AS
            SELECT
                user_id,
                EXTRACT(HOUR FROM event_time) as hour_of_day,
                EXTRACT(DOW FROM event_time) as day_of_week,
                EXTRACT(MONTH FROM event_time) as month,
                COUNT(*) as event_count
            FROM events
            GROUP BY 1, 2, 3, 4
        """)

    def create_aggregation_features(self):
        """Create rolling, running-total, and lead/lag features per user."""
        self.con.execute("""
            CREATE OR REPLACE TABLE agg_features AS
            SELECT
                user_id,
                -- Rolling aggregations (windows count EVENTS, not calendar days)
                COUNT(*) OVER (
                    PARTITION BY user_id
                    ORDER BY event_time
                    ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
                ) as rolling_7day_events,
                AVG(amount) OVER (
                    PARTITION BY user_id
                    ORDER BY event_time
                    ROWS BETWEEN 29 PRECEDING AND CURRENT ROW
                ) as rolling_30day_avg_amount,
                -- Running total
                SUM(amount) OVER (
                    PARTITION BY user_id
                    ORDER BY event_time
                ) as running_total,
                -- Previous / next amounts
                LAG(amount, 1) OVER (
                    PARTITION BY user_id
                    ORDER BY event_time
                ) as prev_amount,
                LEAD(amount, 1) OVER (
                    PARTITION BY user_id
                    ORDER BY event_time
                ) as next_amount
            FROM events
            WHERE amount IS NOT NULL
        """)
# Usage
engineer = FeatureEngineer('ml_features.db')
# Create features from DataFrame
# NOTE(review): assumes an `events_df` DataFrame with user_id, session_id,
# event_time, duration, action, and amount columns is defined above.
features = engineer.create_user_features(events_df)
# Export to ML pipeline
# NOTE(review): user_features as built above has no 'target' column --
# a label must be joined in before the last line will work.
X = features.drop('user_id', axis=1).values
y = features['target'].values
Model Evaluation
Query-Level Features
import duckdb
class ModelFeatures:
    """Generate features for model evaluation."""

    def __init__(self, db_path='model_features.db'):
        self.con = duckdb.connect(db_path)

    def get_prediction_features(self, query):
        """Return the stored feature row for *query*, or None if not found."""
        result = self.con.execute("""
            SELECT
                query_length,
                num_filters,
                has_join,
                has_aggregation,
                num_ctes,
                complexity_score
            FROM query_features
            WHERE query = ?
        """, [query]).fetchone()
        return result

    def batch_features(self, queries):
        """Return {query: feature_tuple} for each query found in the store."""
        # Guard: an empty list would render "IN ()", a SQL syntax error.
        if not queries:
            return {}
        placeholders = ','.join('?' * len(queries))
        results = self.con.execute(f"""
            SELECT
                query,
                query_length,
                num_filters,
                has_join,
                has_aggregation,
                complexity_score
            FROM query_features
            WHERE query IN ({placeholders})
        """, list(queries)).fetchall()
        return {r[0]: r[1:] for r in results}
# Usage
feature_store = ModelFeatures('ml.db')
# Look up stored features for one SQL query string; returns None when the
# query is not present in the query_features table.
features = feature_store.get_prediction_features(
    "SELECT * FROM users WHERE age > 25"
)
Data Preparation for ML
Train/Test Split
import duckdb
from sklearn.model_selection import train_test_split

con = duckdb.connect('ml.db')

# Pull features + target in one pass.
data = con.execute("""
    SELECT
        feature1,
        feature2,
        feature3,
        target
    FROM ml_table
""").fetchall()
X = [row[:-1] for row in data]
y = [row[-1] for row in data]

# Reproducible 80/20 split in Python.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# SQL-side split: assign each row a random draw ONCE, then derive both
# tables from it. Calling RANDOM() in two separate CREATE TABLE queries
# re-evaluates it per query, so rows can land in BOTH splits or NEITHER.
con.execute("""
    CREATE TABLE split_assignment AS
    SELECT *, RANDOM() AS split_r FROM ml_table
""")
con.execute(
    "CREATE TABLE train_data AS "
    "SELECT * EXCLUDE (split_r) FROM split_assignment WHERE split_r < 0.8"
)
con.execute(
    "CREATE TABLE test_data AS "
    "SELECT * EXCLUDE (split_r) FROM split_assignment WHERE split_r >= 0.8"
)
Cross-Validation
import duckdb
import numpy as np
from sklearn.model_selection import KFold

con = duckdb.connect('ml.db')

# Get data
data = con.execute("SELECT * FROM ml_table").df()
X = data.drop('target', axis=1).values
y = data['target'].values

# K-Fold cross-validation
kf = KFold(n_splits=5, shuffle=True, random_state=42)
# Results table must exist before the per-fold INSERTs below.
con.execute("CREATE TABLE IF NOT EXISTS cv_results (fold INTEGER, score DOUBLE)")

scores = []
for fold, (train_idx, val_idx) in enumerate(kf.split(X)):
    X_train, X_val = X[train_idx], X[val_idx]
    y_train, y_val = y[train_idx], y[val_idx]
    # Train and score a real model here, e.g.:
    # model.fit(X_train, y_train)
    # score = model.score(X_val, y_val)
    score = 0.0  # placeholder until a model is plugged in
    scores.append(score)
    # Store THIS fold's score -- inserting np.mean(scores) here would
    # record the running mean of all folds so far instead.
    con.execute("INSERT INTO cv_results VALUES (?, ?)", (fold, score))

# Guard: np.mean([]) is nan and emits a RuntimeWarning.
if scores:
    print(f"Mean CV Score: {np.mean(scores):.4f}")
Production ML Integration
Batch Prediction
import duckdb
import joblib
class BatchPredictor:
    """Batch prediction using DuckDB."""

    def __init__(self, db_path, model_path):
        self.con = duckdb.connect(db_path)
        self.model = joblib.load(model_path)

    def predict(self):
        """Score every row of prediction_input and store the predictions."""
        # Fetch rowid alongside the features: without ORDER BY the result
        # order is not guaranteed, and DuckDB rowids start at 0, so pairing
        # predictions with "i + 1" would silently mislabel rows.
        rows = self.con.execute("""
            SELECT rowid, feature1, feature2, feature3
            FROM prediction_input
            ORDER BY rowid
        """).fetchall()
        rowids = [r[0] for r in rows]
        features = [r[1:] for r in rows]
        predictions = self.model.predict(features)
        for rowid, pred in zip(rowids, predictions):
            self.con.execute(
                "UPDATE prediction_input SET prediction = ? WHERE rowid = ?",
                (pred, rowid),
            )
        self.con.commit()
        return predictions

    def predict_proba(self):
        """Store class-0/class-1 probabilities for every input row."""
        # Select only the model's feature columns: "SELECT *" would feed
        # the prediction/probability columns back into the model.
        rows = self.con.execute("""
            SELECT rowid, feature1, feature2, feature3
            FROM prediction_input
            ORDER BY rowid
        """).fetchall()
        rowids = [r[0] for r in rows]
        features = [r[1:] for r in rows]
        probas = self.model.predict_proba(features)
        for rowid, proba in zip(rowids, probas):
            self.con.execute("""
                UPDATE prediction_input
                SET prob_class_0 = ?, prob_class_1 = ?
                WHERE rowid = ?
            """, (proba[0], proba[1], rowid))
        self.con.commit()  # the original never committed these updates
        return probas
# Usage
# NOTE(review): assumes predictions.db contains a prediction_input table
# with feature1..feature3 plus a prediction column, and model.joblib exists.
predictor = BatchPredictor('predictions.db', 'model.joblib')
predictions = predictor.predict()
Best Practices
Vector Search Optimization
-- Choose appropriate dimensions for your embedding model
-- MiniLM: 384
-- ada-002: 1536
-- BGE: 768
-- Index tuning ("table" is a reserved word -- use your real table name).
-- Declare the metric you will query with; otherwise cosine searches
-- cannot use the index.
CREATE INDEX idx_embedding ON my_table USING HNSW (embedding)
WITH (metric = 'cosine', ef_construction = 200, ef_search = 50);
ML Pipeline Tips
# Use appropriate data types
# INTEGER for categorical
# DOUBLE for continuous

# Batch processing: process in chunks to avoid memory issues.
# NOTE(review): `data` and `process` are illustrative placeholders;
# assumes `data` supports len() and slicing (e.g. list or DataFrame).
chunk_size = 100000
for i in range(0, len(data), chunk_size):
    chunk = data[i:i+chunk_size]
    process(chunk)
Resources
Conclusion
DuckDB provides excellent support for AI applications through its vector search capabilities and seamless Python integration. From RAG pipelines to ML feature engineering, DuckDB offers a unified platform for analytics and AI workflows.
In the next article, we’ll explore real-world DuckDB use cases, including production patterns and implementation strategies.
Comments