Introduction
ClickHouse’s recent addition of vector similarity search enables AI applications directly within your analytical infrastructure. Combined with its existing strengths in analytics, ClickHouse becomes a powerful platform for building AI-powered applications including retrieval-augmented generation (RAG) systems.
This guide explores how to leverage ClickHouse for AI applications, from embedding storage to production ML pipelines.
Vector Similarity Search
Getting Started
-- Create table with vector column (25.x)
-- Embeddings are stored inline as Array(Float32); ClickHouse does not fix the
-- array length in the type, so the application must insert consistent sizes.
CREATE TABLE embeddings (
id UInt64,
document_id UInt64,
text String,
embedding Array(Float32) -- e.g., 384 dimensions
) ENGINE = MergeTree()
ORDER BY document_id;
-- Insert vector data (the trailing "..." are placeholders for the remaining
-- vector components, not valid SQL — substitute full-length vectors)
INSERT INTO embeddings VALUES
(1, 1, 'Python tutorial', [0.1, 0.2, 0.3, ...]),
(2, 1, 'Learn Python', [0.15, 0.25, 0.35, ...]),
(3, 2, 'JavaScript guide', [0.8, 0.1, 0.05, ...]);
Vector Index
-- Add a vector similarity (HNSW) index. ClickHouse takes the index method and
-- the distance function as separate string arguments:
--   vector_similarity('hnsw', '<distanceFunction>')
-- The original 'metric=cosine' argument is not valid syntax.
-- NOTE(review): some 24.x/25.x releases also require
--   SET allow_experimental_vector_similarity_index = 1;
-- and newer releases accept/require a third dimension argument — check the
-- docs for the deployed server version.
ALTER TABLE embeddings
ADD INDEX vec_idx embedding
TYPE vector_similarity('hnsw', 'cosineDistance')
GRANULARITY 1;
-- Or without an index (brute-force full scan; exact but O(rows))
SELECT
id,
text,
arrayCosineDistance(embedding, [0.1, 0.2, ...]) as distance
FROM embeddings
ORDER BY distance
LIMIT 5;
Python Integration
import clickhouse_connect
import numpy as np
from sentence_transformers import SentenceTransformer
class ClickHouseVectorStore:
    """Minimal vector store backed by a ClickHouse MergeTree table.

    Documents are embedded with sentence-transformers and stored as
    Array(Float32) rows; search is a brute-force cosine-distance scan.
    """

    #: Embedding model name; loaded once in __init__ instead of being
    #: re-instantiated on every add/search call as the original did.
    _MODEL_NAME = 'all-MiniLM-L6-v2'

    def __init__(self, host='localhost', port=8123):
        """Connect to ClickHouse and ensure the backing table exists."""
        self.client = clickhouse_connect.get_client(
            host=host,
            port=port
        )
        self.model = SentenceTransformer(self._MODEL_NAME)
        self._create_table()

    def _create_table(self):
        """Create the `vectors` table if it does not already exist."""
        self.client.command("""
            CREATE TABLE IF NOT EXISTS vectors (
                id UInt64,
                text String,
                embedding Array(Float32)
            ) ENGINE = MergeTree()
            ORDER BY id
        """)

    def add_documents(self, documents):
        """Embed *documents* (iterable of strings) and insert them in one batch.

        Uses ``client.insert``: the original called
        ``client.command("INSERT INTO vectors VALUES", rows)``, but ``command``
        does not accept row data, and row-at-a-time inserts create one
        MergeTree part per row.
        """
        documents = list(documents)
        if not documents:
            return
        # Encode all texts in a single model call (batched on the model side).
        embeddings = self.model.encode(documents)
        rows = [
            [i, text, vec.tolist()]
            for i, (text, vec) in enumerate(zip(documents, embeddings))
        ]
        self.client.insert('vectors', rows,
                           column_names=['id', 'text', 'embedding'])

    def search(self, query, top_k=5):
        """Return up to *top_k* documents nearest to *query* by cosine distance.

        The query vector and limit are bound server-side via the
        ``{name:Type}`` placeholder syntax instead of being interpolated into
        the SQL string (the original f-string interpolation is injectable and
        fragile for large vectors).
        """
        query_vector = self.model.encode(query).tolist()
        results = self.client.query(
            """
            SELECT id, text,
                arrayCosineDistance(embedding, {qv:Array(Float32)}) AS distance
            FROM vectors
            ORDER BY distance
            LIMIT {k:UInt32}
            """,
            parameters={'qv': query_vector, 'k': top_k},
        )
        return [
            {'id': r[0], 'text': r[1], 'distance': r[2]}
            for r in results.result_rows
        ]
# Usage example: index a few sample documents, then run a similarity search.
store = ClickHouseVectorStore()
sample_docs = [
    "Python is a great programming language",
    "Machine learning is fascinating",
    "Data science combines programming and statistics",
]
store.add_documents(sample_docs)
results = store.search("What is programming?")
for hit in results:
    print(f"Text: {hit['text']}, Distance: {hit['distance']:.4f}")
Building RAG Pipelines
Complete RAG Implementation
import clickhouse_connect
from sentence_transformers import SentenceTransformer
import openai
class ClickHouseRAG:
    """RAG (retrieval-augmented generation) pipeline backed by ClickHouse.

    Documents are chunked, embedded with sentence-transformers, and stored in
    a ``chunks`` table; retrieval is a brute-force cosine-distance scan and
    answer generation is delegated to the OpenAI chat API.
    """

    def __init__(self, config):
        """Connect, load the embedding model once, and create the schema.

        config: keyword arguments forwarded to clickhouse_connect.get_client,
        e.g. {'host': 'localhost', 'port': 8123}.
        """
        self.client = clickhouse_connect.get_client(**config)
        self.model = SentenceTransformer('all-MiniLM-L6-v2')
        self._init_schema()

    def _init_schema(self):
        """Create the chunks and documents tables if they do not exist."""
        self.client.command("""
            CREATE TABLE IF NOT EXISTS chunks (
                id UInt64,
                document_id UInt64,
                chunk_text String,
                chunk_index UInt32,
                embedding Array(Float32)
            ) ENGINE = MergeTree()
            ORDER BY (document_id, chunk_index)
        """)
        self.client.command("""
            CREATE TABLE IF NOT EXISTS documents (
                id UInt64,
                title String,
                source String,
                created_at DateTime DEFAULT now()
            ) ENGINE = MergeTree()
            ORDER BY id
        """)

    def ingest_document(self, document_id, title, source, text, chunk_size=500):
        """Split *text* into fixed-size chunks, embed them, and store them.

        Fixes over the original:
        - chunk ``id`` was the bare chunk index, which collides between
          documents; it is now derived from (document_id, chunk_index);
        - ``client.command("INSERT ...", rows)`` does not accept row data —
          ``client.insert`` is used, and all chunks go in one batch.
        """
        chunks = [text[i:i + chunk_size]
                  for i in range(0, len(text), chunk_size)]
        self.client.insert(
            'documents',
            [[document_id, title, source]],
            column_names=['id', 'title', 'source'])
        if not chunks:
            return
        # One batched encode call instead of one model call per chunk.
        embeddings = self.model.encode(chunks)
        rows = [
            # Pack (document_id, chunk_index) into a unique UInt64 id;
            # assumes fewer than 2**32 chunks per document.
            [(document_id << 32) | i, document_id, chunk, i, vec.tolist()]
            for i, (chunk, vec) in enumerate(zip(chunks, embeddings))
        ]
        self.client.insert(
            'chunks', rows,
            column_names=['id', 'document_id', 'chunk_text',
                          'chunk_index', 'embedding'])

    def retrieve(self, query, top_k=5):
        """Return the *top_k* chunks nearest to *query* by cosine distance.

        The query vector and limit are bound server-side rather than
        interpolated into the SQL text (injection-safe, and robust for
        large float arrays).
        """
        query_vector = self.model.encode(query).tolist()
        results = self.client.query(
            """
            SELECT
                chunk_text,
                document_id,
                arrayCosineDistance(embedding, {qv:Array(Float32)}) AS distance
            FROM chunks
            ORDER BY distance
            LIMIT {k:UInt32}
            """,
            parameters={'qv': query_vector, 'k': top_k},
        )
        return [
            {'text': r[0], 'doc_id': r[1], 'distance': r[2]}
            for r in results.result_rows
        ]

    def answer(self, question, max_context=3000):
        """Answer *question* using retrieved context and the OpenAI chat API.

        max_context caps the total characters of context packed into the
        prompt; chunks are added greedily in distance order.
        """
        chunks = self.retrieve(question, top_k=10)
        # Greedily pack nearest chunks until the character budget is spent.
        context = ""
        for chunk in chunks:
            if len(context) + len(chunk['text']) > max_context:
                break
            context += chunk['text'] + "\n\n"
        prompt = f"""Based on the following context, answer the question.
Context:
{context}
Question: {question}
Answer:"""
        # NOTE(review): openai.ChatCompletion is the legacy (<1.0) OpenAI SDK
        # surface; migrate to OpenAI().chat.completions.create when the
        # project upgrades the openai package.
        response = openai.ChatCompletion.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            temperature=0
        )
        return response.choices[0].message.content

    def close(self):
        """Close the ClickHouse connection."""
        self.client.close()
# Usage: ingest one document from disk, then query it.
rag = ClickHouseRAG(dict(host='localhost', port=8123))
# Ingest document
with open('article.txt') as f:
    rag.ingest_document(1, 'Article', 'article.txt', f.read())
# Ask a question grounded in the ingested text
print(rag.answer("What is the main topic?"))
rag.close()
ML Feature Engineering
Feature Creation
import clickhouse_connect
class FeatureEngineering:
    """Builds ML feature tables in ClickHouse from a raw `events` table.

    Assumes `events` has at least: user_id, session_id, duration, event_time,
    action, amount — TODO confirm against the actual schema.
    """

    def __init__(self, config):
        """config: keyword arguments for clickhouse_connect.get_client."""
        self.client = clickhouse_connect.get_client(**config)

    def create_user_features(self):
        """Materialize per-user aggregate features.

        ClickHouse requires an explicit engine on CREATE TABLE ... AS SELECT
        (unless default_table_engine is set); the original statement omitted
        it. `countIf` replaces the equivalent but verbose sum(case when ...).
        """
        self.client.command("""
            CREATE TABLE IF NOT EXISTS user_features
            ENGINE = MergeTree() ORDER BY user_id
            AS SELECT
                user_id,
                count() as total_events,
                uniqExact(session_id) as unique_sessions,
                avg(duration) as avg_duration,
                min(event_time) as first_event,
                max(event_time) as last_event,
                countIf(action = 'purchase') as purchase_count,
                sum(amount) as total_amount,
                stddevPop(amount) as amount_stddev
            FROM events
            GROUP BY user_id
        """)

    def create_time_features(self):
        """Materialize per-user activity counts bucketed by hour/day/month."""
        self.client.command("""
            CREATE TABLE IF NOT EXISTS time_features
            ENGINE = MergeTree() ORDER BY (user_id, month, day_of_week, hour)
            AS SELECT
                user_id,
                toHour(event_time) as hour,
                toDayOfWeek(event_time) as day_of_week,
                toMonth(event_time) as month,
                count() as event_count
            FROM events
            GROUP BY user_id, hour, day_of_week, month
        """)

    def create_aggregation_features(self):
        """Materialize conditional aggregates and funnel counts per user.

        Fixes over the original:
        - groupArray(1)(amount) kept only a single element; groupArray(100)
          keeps a useful (but bounded) sequence per user;
        - sequenceCount's first positional argument must be the timestamp
          column, not the `action` string column.
        """
        self.client.command("""
            CREATE TABLE IF NOT EXISTS agg_features
            ENGINE = MergeTree() ORDER BY user_id
            AS SELECT
                user_id,
                -- Conditional aggregations
                anyLastIf(amount, action = 'view') as last_viewed_amount,
                sumIf(amount, action = 'purchase') as total_purchases,
                avgIf(amount, action = 'view') as avg_viewed_amount,
                -- Sequence features (capped at 100 amounts per user)
                groupArray(100)(amount) as amounts,
                sequenceCount('(?1)(?2)(?3)')(
                    event_time,
                    action = 'view',
                    action = 'add_to_cart',
                    action = 'purchase'
                ) as conversion_funnel
            FROM events
            GROUP BY user_id
        """)
# Usage: build the per-user feature table from raw events.
feature_builder = FeatureEngineering({'host': 'localhost'})
feature_builder.create_user_features()
Export for ML
def export_for_ml(self, table, target_column):
    """Export a feature table as a pandas DataFrame for ML training.

    *table* and *target_column* are SQL identifiers, which cannot be bound as
    query parameters, so they are validated as plain identifiers before being
    interpolated (guards against SQL injection). Uses ``query_df``, the
    clickhouse_connect helper that returns a DataFrame directly — the
    original ``.result_set.to_pandas()`` is not part of the client API.

    Raises ValueError if either name is not a valid identifier.
    """
    for ident in (table, target_column):
        if not isinstance(ident, str) or not ident.isidentifier():
            raise ValueError(f"invalid identifier: {ident!r}")
    return self.client.query_df(f"""
        SELECT * FROM {table}
        WHERE {target_column} IS NOT NULL
    """)
# Export: export_for_ml is written as a method (it reads self.client), so it
# must be called with an object carrying a ClickHouse client — the original
# bare call export_for_ml('user_features', ...) raises TypeError.
fe = FeatureEngineering({'host': 'localhost'})
df = export_for_ml(fe, 'user_features', 'purchase_count')
X = df.drop(columns=['purchase_count', 'user_id'])
y = df['purchase_count']
Recommendation Systems
Collaborative Filtering
-- User-item interactions
CREATE TABLE user_items (
user_id UInt32,
item_id UInt32,
rating Float32,
timestamp DateTime
) ENGINE = MergeTree()
ORDER BY (user_id, timestamp);
-- Similar users
-- NOTE(review): this query appears incomplete as written:
--  * `my_ratings` is never produced by any subquery, so the reference on the
--    correlation line cannot resolve;
--  * `correlation` is not a ClickHouse function — the Pearson aggregate is
--    corr(x, y), and it takes two numeric columns, not two arrays.
-- Treat this as a sketch of the intended shape; verify a working version
-- before use.
SELECT
user_id,
neighbor_user_id,
distance
FROM (
-- The target user itself, at distance 0
SELECT
user_id,
user_id as neighbor_user_id,
0 as distance
FROM user_items
WHERE user_id = 123
UNION ALL
-- Candidate neighbors: users who rated any item the target user rated
SELECT
123 as user_id,
user_id as neighbor_user_id,
1 - correlation(other_ratings, my_ratings) as distance
FROM (
SELECT
user_id,
groupArray(rating) as other_ratings
FROM user_items
WHERE item_id IN (
SELECT item_id FROM user_items WHERE user_id = 123
)
GROUP BY user_id
)
)
ORDER BY distance
LIMIT 10;
Item-Based Recommendations
-- Item-item cosine similarity over co-occurrence.
-- For each item pair: number of users who interacted with both, normalized
-- by the geometric mean of each item's total interaction count.
-- (The original referenced the non-aggregated per-(item, user) counts
-- a.count / b.count inside a GROUP BY — invalid — and those counts are not a
-- meaningful normalizer anyway.)
WITH item_counts AS (
    SELECT item_id, count() as cnt
    FROM user_items
    GROUP BY item_id
)
SELECT
    a.item_id as item1,
    b.item_id as item2,
    count() / sqrt(any(ca.cnt) * any(cb.cnt)) as similarity
FROM user_items a
JOIN user_items b ON a.user_id = b.user_id
JOIN item_counts ca ON ca.item_id = a.item_id
JOIN item_counts cb ON cb.item_id = b.item_id
WHERE a.item_id < b.item_id
GROUP BY a.item_id, b.item_id
ORDER BY similarity DESC
LIMIT 100;
Anomaly Detection
Statistical Methods
-- Z-score based anomaly detection
-- Window functions compute each user's mean/stddev over all their rows; the
-- outer filter keeps transactions more than 3 standard deviations from the
-- user's mean. ClickHouse permits referencing the z_score alias in WHERE.
-- NOTE(review): a user whose amounts have zero stddev yields nan/inf
-- z-scores — confirm those rows are meant to be excluded.
SELECT
user_id,
amount,
avg_amount,
stddev_amount,
(amount - avg_amount) / stddev_amount as z_score
FROM (
SELECT
user_id,
amount,
avg(amount) OVER (PARTITION BY user_id) as avg_amount,
stddevPop(amount) OVER (PARTITION BY user_id) as stddev_amount
FROM transactions
)
WHERE abs(z_score) > 3;
-- Moving average deviation
-- Rolling mean over each user's last 101 rows (100 preceding + current),
-- ordered by timestamp; flags rows deviating by more than 50% of that mean.
SELECT
user_id,
timestamp,
amount,
avg_amount,
amount - avg_amount as deviation
FROM (
SELECT
user_id,
timestamp,
amount,
avg(amount) OVER (
PARTITION BY user_id
ORDER BY timestamp
ROWS BETWEEN 100 PRECEDING AND CURRENT ROW
) as avg_amount
FROM transactions
)
WHERE abs(amount - avg_amount) > avg_amount * 0.5;
Best Practices
Vector Search Optimization
-- Use appropriate vector dimensions
-- Smaller = faster, larger = more precise
-- Batch inserts: value tuples must match the embeddings schema
-- (id, document_id, text, embedding) — the original tuples omitted
-- document_id and would fail against the table defined above.
INSERT INTO embeddings VALUES
(1, 1, 'text1', [0.1, ...]),
(2, 1, 'text2', [0.2, ...]),
...; -- Batch 1000 at a time
Query Optimization
-- Use LIMIT with vector search
-- NOTE(review): `query` below is a placeholder for a literal Array(Float32)
-- query vector — substitute a real value or a bound parameter.
SELECT * FROM embeddings
ORDER BY arrayCosineDistance(embedding, query)
LIMIT 10; -- Always limit
-- Filter before distance calculation: the WHERE clause prunes rows so the
-- distance function runs on fewer candidates.
-- NOTE(review): `category` is not in the embeddings schema shown earlier;
-- this assumes an extended table.
SELECT * FROM embeddings
WHERE category = 'tech'
ORDER BY arrayCosineDistance(embedding, query)
LIMIT 10;
Integration with ML Libraries
Scikit-learn
import clickhouse_connect
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split

# Get features: query_df returns a pandas DataFrame directly. The original
# `.result_set.to_pandas()` chain is not part of the clickhouse_connect
# client API.
client = clickhouse_connect.get_client()
df = client.query_df("""
    SELECT * FROM user_features
""")

# Split features/target (assumes a `target` column exists — TODO confirm).
X = df.drop(columns=['target', 'user_id'])
y = df['target']
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Train model
model = RandomForestRegressor()
model.fit(X_train, y_train)

# Predict on the held-out split
predictions = model.predict(X_test)
TensorFlow/PyTorch
import torch
import numpy as np
import clickhouse_connect

# Load embeddings as tensors. query_df returns a DataFrame (QueryResult has
# no `.to_pandas()` method); the `embedding` column holds Python lists
# (dtype=object), which torch.tensor cannot consume directly, so the lists
# are stacked into a dense float32 matrix first.
client = clickhouse_connect.get_client()
df = client.query_df("SELECT embedding FROM vectors")
embedding_matrix = torch.tensor(
    np.array(df['embedding'].tolist(), dtype=np.float32)
)

# Use in neural network
# embedding_layer = torch.nn.Embedding.from_pretrained(embedding_matrix)
Resources
Conclusion
ClickHouse provides a powerful platform for AI applications, combining analytical capabilities with vector search. From RAG pipelines to ML feature engineering, ClickHouse offers a unified solution for modern AI-powered applications.
In the next article, we’ll explore real-world ClickHouse use cases, including production patterns and implementation strategies.
Comments