Skip to main content

ClickHouse for AI: Vector Search, RAG Pipelines, and ML Integration

Created: March 5, 2026 CalmOps 7 min read

Introduction

ClickHouse’s recent addition of vector similarity search enables AI applications directly within your analytical infrastructure. Combined with its existing strengths in analytics, ClickHouse becomes a powerful platform for building AI-powered applications including retrieval-augmented generation (RAG) systems.

This guide explores how to leverage ClickHouse for AI applications, from embedding storage to production ML pipelines.


Getting Started

-- Create table with vector column (25.x)
-- NOTE: ClickHouse stores vectors as plain Array(Float32); every row is
-- expected to carry the same number of dimensions as the embedding model.
CREATE TABLE embeddings (
    id UInt64,
    document_id UInt64,
    text String,
    embedding Array(Float32)  -- e.g., 384 dimensions
) ENGINE = MergeTree()
-- Sorting by document_id clusters a document's rows together on disk.
ORDER BY document_id;

-- Insert vector data
-- (the "..." below are placeholders for the remaining vector components,
-- not valid SQL -- real inserts supply the full fixed-length array)
INSERT INTO embeddings VALUES
    (1, 1, 'Python tutorial', [0.1, 0.2, 0.3, ...]),
    (2, 1, 'Learn Python', [0.15, 0.25, 0.35, ...]),
    (3, 2, 'JavaScript guide', [0.8, 0.1, 0.05, ...]);

Vector Index

-- Add vector similarity index (25.x)
-- Signature is vector_similarity('<method>', '<distance>', <dimensions>);
-- requires SET allow_experimental_vector_similarity_index = 1.
ALTER TABLE embeddings
ADD INDEX vec_idx embedding
TYPE vector_similarity('hnsw', 'cosineDistance', 384)
GRANULARITY 1;

-- Or without index (brute-force full scan)
-- The built-in distance function is cosineDistance(a, b);
-- ORDER BY distance + LIMIT is the pattern the index accelerates.
SELECT 
    id,
    text,
    cosineDistance(embedding, [0.1, 0.2, ...]) as distance
FROM embeddings
ORDER BY distance
LIMIT 5;

Python Integration

import clickhouse_connect
import numpy as np
from sentence_transformers import SentenceTransformer

class ClickHouseVectorStore:
    """Minimal vector store backed by a ClickHouse `vectors` table.

    Documents are embedded with sentence-transformers and stored as
    Array(Float32); search is a brute-force cosineDistance scan.
    """

    def __init__(self, host='localhost', port=8123):
        self.client = clickhouse_connect.get_client(
            host=host,
            port=port
        )
        # Load the embedding model once; re-instantiating it on every
        # add/search call is very slow (model load dominates).
        self.model = SentenceTransformer('all-MiniLM-L6-v2')
        self._create_table()

    def _create_table(self):
        """Create the vector table if it does not already exist."""
        self.client.command("""
            CREATE TABLE IF NOT EXISTS vectors (
                id UInt64,
                text String,
                embedding Array(Float32)
            ) ENGINE = MergeTree()
            ORDER BY id
        """)

    def add_documents(self, documents):
        """Embed and insert documents in one batch.

        Args:
            documents: iterable of strings; positions become row ids.
        """
        rows = [
            [i, text, self.model.encode(text).tolist()]
            for i, text in enumerate(documents)
        ]
        # client.insert() is the clickhouse-connect bulk-insert API;
        # command() does not accept row data, so the original call failed.
        self.client.insert(
            'vectors', rows,
            column_names=['id', 'text', 'embedding'],
        )

    def search(self, query, top_k=5):
        """Return the top_k nearest documents by cosine distance.

        Returns a list of {'id', 'text', 'distance'} dicts, nearest first.
        """
        query_vector = self.model.encode(query).tolist()
        # Bind values via parameters instead of interpolating user input
        # into the SQL string. cosineDistance is the correct built-in
        # (arrayCosineDistance does not exist).
        result = self.client.query(
            """
            SELECT id, text,
                cosineDistance(embedding, %(qv)s) AS distance
            FROM vectors
            ORDER BY distance
            LIMIT %(k)s
            """,
            parameters={'qv': query_vector, 'k': int(top_k)},
        )
        return [
            {'id': r[0], 'text': r[1], 'distance': r[2]}
            for r in result.result_rows
        ]

# Usage example: index three documents, then run a similarity query.
store = ClickHouseVectorStore()

corpus = [
    "Python is a great programming language",
    "Machine learning is fascinating",
    "Data science combines programming and statistics"
]
store.add_documents(corpus)

for hit in store.search("What is programming?"):
    print(f"Text: {hit['text']}, Distance: {hit['distance']:.4f}")

Building RAG Pipelines

Complete RAG Implementation

import clickhouse_connect
from sentence_transformers import SentenceTransformer
import openai

class ClickHouseRAG:
    """Retrieval-augmented generation pipeline backed by ClickHouse.

    Stores document metadata plus embedded text chunks, retrieves the
    nearest chunks for a question, and asks an LLM to answer from them.
    """

    def __init__(self, config):
        """config: keyword arguments forwarded to clickhouse_connect.get_client()."""
        self.client = clickhouse_connect.get_client(**config)
        self.model = SentenceTransformer('all-MiniLM-L6-v2')
        self._init_schema()

    def _init_schema(self):
        """Create the chunks and documents tables if they are missing."""
        # Chunks table
        self.client.command("""
            CREATE TABLE IF NOT EXISTS chunks (
                id UInt64,
                document_id UInt64,
                chunk_text String,
                chunk_index UInt32,
                embedding Array(Float32)
            ) ENGINE = MergeTree()
            ORDER BY (document_id, chunk_index)
        """)

        # Documents table
        self.client.command("""
            CREATE TABLE IF NOT EXISTS documents (
                id UInt64,
                title String,
                source String,
                created_at DateTime DEFAULT now()
            ) ENGINE = MergeTree()
            ORDER BY id
        """)

    def ingest_document(self, document_id, title, source, text, chunk_size=500):
        """Split text into fixed-size chunks, embed them, and store everything.

        Args:
            document_id: caller-assigned unique document id.
            title/source: document metadata.
            text: full document body.
            chunk_size: characters per chunk (naive fixed-width split).
        """
        # Split into chunks
        chunks = [text[i:i + chunk_size]
                  for i in range(0, len(text), chunk_size)]

        # Name the columns so created_at falls back to its DEFAULT now();
        # a bare VALUES insert would require all four columns.
        self.client.insert(
            'documents', [[document_id, title, source]],
            column_names=['id', 'title', 'source'],
        )

        if not chunks:
            return

        # Batch-encode: one model call for all chunks is far faster than
        # one encode() per chunk.
        embeddings = self.model.encode(chunks)
        rows = [
            # Make the chunk id unique across documents; the original
            # reused the bare chunk index, which collides between docs.
            [document_id * 1_000_000 + i, document_id, chunk, i,
             [float(v) for v in vec]]
            for i, (chunk, vec) in enumerate(zip(chunks, embeddings))
        ]
        self.client.insert(
            'chunks', rows,
            column_names=['id', 'document_id', 'chunk_text',
                          'chunk_index', 'embedding'],
        )

    def retrieve(self, query, top_k=5):
        """Return the top_k chunks nearest to the query embedding."""
        query_vector = self.model.encode(query).tolist()

        # Parameter binding instead of f-string interpolation, and
        # cosineDistance (arrayCosineDistance does not exist).
        result = self.client.query(
            """
            SELECT
                chunk_text,
                document_id,
                cosineDistance(embedding, %(qv)s) AS distance
            FROM chunks
            ORDER BY distance
            LIMIT %(k)s
            """,
            parameters={'qv': query_vector, 'k': int(top_k)},
        )
        return [
            {'text': r[0], 'doc_id': r[1], 'distance': r[2]}
            for r in result.result_rows
        ]

    def answer(self, question, max_context=3000):
        """Answer a question using retrieved context (RAG)."""
        # Retrieve a generous candidate set, then pack nearest-first
        # into the context budget.
        chunks = self.retrieve(question, top_k=10)

        context = ""
        for chunk in chunks:
            if len(context) + len(chunk['text']) > max_context:
                break
            context += chunk['text'] + "\n\n"

        prompt = f"""Based on the following context, answer the question.

Context:
{context}

Question: {question}

Answer:"""

        # openai>=1.0 interface; openai.ChatCompletion was removed from
        # the SDK. Pin the SDK version if the legacy API is required.
        response = openai.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            temperature=0
        )

        return response.choices[0].message.content

    def close(self):
        """Close the ClickHouse connection."""
        self.client.close()

# Usage example: ingest one document, then query it.
rag = ClickHouseRAG({'host': 'localhost', 'port': 8123})

# Ingest document from disk.
with open('article.txt') as f:
    article_text = f.read()
rag.ingest_document(1, 'Article', 'article.txt', article_text)

# Answer a question grounded in the ingested text.
print(rag.answer("What is the main topic?"))

rag.close()

ML Feature Engineering

Feature Creation

import clickhouse_connect

class FeatureEngineering:
    """Feature engineering with ClickHouse: materializes feature tables
    from a raw `events` table for downstream ML training."""

    def __init__(self, config):
        """config: keyword arguments forwarded to clickhouse_connect.get_client()."""
        self.client = clickhouse_connect.get_client(**config)

    def create_user_features(self):
        """Create user-level aggregate features from events."""
        # CREATE TABLE ... AS SELECT needs an explicit ENGINE in
        # ClickHouse (unless default_table_engine is set).
        self.client.command("""
            CREATE TABLE IF NOT EXISTS user_features
            ENGINE = MergeTree() ORDER BY user_id AS
            SELECT
                user_id,
                count() AS total_events,
                uniqExact(session_id) AS unique_sessions,
                avg(duration) AS avg_duration,
                min(event_time) AS first_event,
                max(event_time) AS last_event,
                countIf(action = 'purchase') AS purchase_count,
                sum(amount) AS total_amount,
                stddevPop(amount) AS amount_stddev
            FROM events
            GROUP BY user_id
        """)

    def create_time_features(self):
        """Create per-user time-of-activity features."""
        self.client.command("""
            CREATE TABLE IF NOT EXISTS time_features
            ENGINE = MergeTree() ORDER BY user_id AS
            SELECT
                user_id,
                toHour(event_time) AS hour,
                toDayOfWeek(event_time) AS day_of_week,
                toMonth(event_time) AS month,
                count() AS event_count
            FROM events
            GROUP BY user_id, hour, day_of_week, month
        """)

    def create_aggregation_features(self):
        """Create conditional aggregates and funnel-sequence features."""
        self.client.command("""
            CREATE TABLE IF NOT EXISTS agg_features
            ENGINE = MergeTree() ORDER BY user_id AS
            SELECT
                user_id,
                -- Conditional aggregations
                anyLastIf(amount, action = 'view') AS last_viewed_amount,
                sumIf(amount, action = 'purchase') AS total_purchases,
                avgIf(amount, action = 'view') AS avg_viewed_amount,

                -- Keep a bounded sample of amounts per user
                -- (the original groupArray(1) kept only one element).
                groupArray(100)(amount) AS amounts,

                -- sequenceCount's first argument must be the timestamp
                -- column; the match conditions follow it.
                sequenceCount('(?1)(?2)(?3)')(
                    event_time,
                    action = 'view',
                    action = 'add_to_cart',
                    action = 'purchase'
                ) AS conversion_funnel
            FROM events
            GROUP BY user_id
        """)

# Usage: materialize the user-level feature table.
feature_builder = FeatureEngineering({'host': 'localhost'})
feature_builder.create_user_features()

Export for ML

def export_for_ml(client, table, target_column):
    """Export a feature table as a pandas DataFrame for ML training.

    Args:
        client: a clickhouse_connect client (the original took `self` but
            was called as a free function -- pass the client explicitly).
        table: feature table name. Trusted identifier only: it is
            interpolated into the SQL, never pass user-supplied input.
        target_column: label column; rows with a NULL label are dropped.

    Returns:
        pandas.DataFrame with all columns of `table`.
    """
    # query_df() is the clickhouse-connect DataFrame API; QueryResult
    # has no `.result_set.to_pandas()` attribute chain.
    return client.query_df(f"""
        SELECT * FROM {table}
        WHERE {target_column} IS NOT NULL
    """)

# Export: split features (X) from the label (y).
client = clickhouse_connect.get_client(host='localhost')
df = export_for_ml(client, 'user_features', 'purchase_count')
X = df.drop(columns=['purchase_count', 'user_id'])
y = df['purchase_count']

Recommendation Systems

Collaborative Filtering

-- User-item interactions
-- One row per rating event; the sort key (user_id, timestamp)
-- makes per-user history scans cheap.
CREATE TABLE user_items (
    user_id UInt32,
    item_id UInt32,
    rating Float32,
    timestamp DateTime
) ENGINE = MergeTree()
ORDER BY (user_id, timestamp);

-- Similar users: Pearson correlation of ratings over the items both
-- users rated. ClickHouse's aggregate is corr(x, y) and it runs over
-- joined rows -- there is no `correlation` function over arrays, and
-- the original query referenced an undefined `my_ratings` column.
SELECT
    b.user_id AS neighbor_user_id,
    corr(toFloat64(a.rating), toFloat64(b.rating)) AS similarity
FROM user_items AS a
INNER JOIN user_items AS b ON a.item_id = b.item_id
WHERE a.user_id = 123
  AND b.user_id != 123
GROUP BY b.user_id
HAVING count() >= 3          -- require overlap for a stable estimate
ORDER BY similarity DESC
LIMIT 10;

Item-Based Recommendations

-- Item similarity (cosine over co-occurring users).
-- The denominator needs per-item totals from an item-level aggregate;
-- the original divided by per-(item, user) counts, which are not valid
-- aggregate expressions at the outer GROUP BY (a.item_id, b.item_id).
WITH item_totals AS (
    SELECT item_id, count() AS total
    FROM user_items
    GROUP BY item_id
)
SELECT
    a.item_id AS item1,
    b.item_id AS item2,
    count() / sqrt(any(ta.total) * any(tb.total)) AS similarity
FROM user_items AS a
JOIN user_items AS b ON a.user_id = b.user_id
JOIN item_totals AS ta ON ta.item_id = a.item_id
JOIN item_totals AS tb ON tb.item_id = b.item_id
WHERE a.item_id < b.item_id      -- each unordered pair once
GROUP BY a.item_id, b.item_id
ORDER BY similarity DESC
LIMIT 100;

Anomaly Detection

Statistical Methods

-- Z-score based anomaly detection.
-- nullIf() guards against division by zero when a user's amounts are
-- all identical (stddev = 0); such rows yield NULL and fail the filter.
SELECT 
    user_id,
    amount,
    avg_amount,
    stddev_amount,
    (amount - avg_amount) / nullIf(stddev_amount, 0) as z_score
FROM (
    SELECT 
        user_id,
        amount,
        avg(amount) OVER (PARTITION BY user_id) as avg_amount,
        stddevPop(amount) OVER (PARTITION BY user_id) as stddev_amount
    FROM transactions
)
WHERE abs(z_score) > 3;

-- Moving average deviation: flag amounts that drift far from the
-- trailing 100-row average for the same user.
SELECT 
    user_id,
    timestamp,
    amount,
    avg_amount,
    amount - avg_amount as deviation
FROM (
    SELECT 
        user_id,
        timestamp,
        amount,
        avg(amount) OVER (
            PARTITION BY user_id 
            ORDER BY timestamp 
            ROWS BETWEEN 100 PRECEDING AND CURRENT ROW
        ) as avg_amount
    FROM transactions
)
WHERE abs(amount - avg_amount) > avg_amount * 0.5;

Best Practices

Vector Search Optimization

-- Use appropriate vector dimensions
-- Smaller = faster, larger = more precise

-- Batch inserts (rows must match the 4-column embeddings schema:
-- id, document_id, text, embedding)
INSERT INTO embeddings VALUES
    (1, 1, 'text1', [0.1, ...]),
    (2, 1, 'text2', [0.2, ...]),
    ...;  -- Batch 1000 at a time

Query Optimization

-- Use LIMIT with vector search
-- (cosineDistance is the built-in; arrayCosineDistance does not exist)
SELECT * FROM embeddings
ORDER BY cosineDistance(embedding, query)
LIMIT 10;  -- Always limit

-- Filter before distance calculation
SELECT * FROM embeddings
WHERE category = 'tech'
ORDER BY cosineDistance(embedding, query)
LIMIT 10;

Integration with ML Libraries

Scikit-learn

import clickhouse_connect
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor

# Fetch features as a DataFrame. query_df() is the clickhouse-connect
# API for this; QueryResult has no `.result_set.to_pandas()`.
client = clickhouse_connect.get_client()
df = client.query_df("""
    SELECT * FROM user_features
""")

# Separate the features (X) from the label (y); drop the id column.
X = df.drop(columns=['target', 'user_id'])
y = df['target']

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Train model
model = RandomForestRegressor()
model.fit(X_train, y_train)

# Predict
predictions = model.predict(X_test)

TensorFlow/PyTorch

import torch
import numpy as np
import clickhouse_connect

# Load the Array(Float32) embedding column as a 2-D tensor.
client = clickhouse_connect.get_client()
result = client.query("SELECT embedding FROM vectors")

# Each row holds a Python list; stack them into one float32 matrix first.
# (QueryResult has no .to_pandas(), and a column of list objects cannot
# be handed to torch.tensor directly.)
matrix = np.array([row[0] for row in result.result_rows], dtype=np.float32)
embedding_matrix = torch.from_numpy(matrix)

# Use in neural network
# embedding_layer = torch.nn.Embedding.from_pretrained(embedding_matrix)

Resources


Conclusion

ClickHouse provides a powerful platform for AI applications, combining analytical capabilities with vector search. From RAG pipelines to ML feature engineering, ClickHouse offers a unified solution for modern AI-powered applications.

In the next article, we’ll explore real-world ClickHouse use cases, including production patterns and implementation strategies.

Comments

Share this article

Scan to read on mobile