Skip to main content
⚡ Calmops

ClickHouse for AI: Vector Search, RAG Pipelines, and ML Integration

Introduction

ClickHouse’s recent addition of vector similarity search enables AI applications directly within your analytical infrastructure. Combined with its existing strengths in analytics, ClickHouse becomes a powerful platform for building AI-powered applications including retrieval-augmented generation (RAG) systems.

This guide explores how to leverage ClickHouse for AI applications, from embedding storage to production ML pipelines.


Getting Started

-- Create table with vector column (25.x)
CREATE TABLE embeddings
(
    id          UInt64,
    document_id UInt64,
    text        String,
    embedding   Array(Float32)  -- e.g., 384 dimensions
)
ENGINE = MergeTree
ORDER BY document_id;

-- Insert vector data (columns named explicitly, in table order)
INSERT INTO embeddings (id, document_id, text, embedding) VALUES
    (1, 1, 'Python tutorial', [0.1, 0.2, 0.3, ...]),
    (2, 1, 'Learn Python', [0.15, 0.25, 0.35, ...]),
    (3, 2, 'JavaScript guide', [0.8, 0.1, 0.05, ...]);

Vector Index

-- Add vector similarity index (25.x)
-- Arguments: index type, distance function, number of vector dimensions.
-- The dimension count must match the length of the stored embeddings
-- (384 here, matching the example dimension noted on the table).
-- GRANULARITY is left at its default on purpose: a very small value builds
-- many tiny HNSW graphs and defeats the purpose of the index.
-- NOTE(review): on releases where the index is still experimental, enable it
-- first with SET allow_experimental_vector_similarity_index = 1 — confirm
-- against your server version.
ALTER TABLE embeddings
ADD INDEX vec_idx embedding
TYPE vector_similarity('hnsw', 'cosineDistance', 384);

-- Build the index for parts that were written before the index existed
ALTER TABLE embeddings MATERIALIZE INDEX vec_idx;

-- Or without index (full scan)
-- cosineDistance is the ClickHouse distance function; smaller = more similar.
SELECT
    id,
    text,
    cosineDistance(embedding, [0.1, 0.2, ...]) AS distance
FROM embeddings
ORDER BY distance
LIMIT 5;

Python Integration

import clickhouse_connect
import numpy as np
from sentence_transformers import SentenceTransformer

class ClickHouseVectorStore:
    """Vector store backed by a ClickHouse ``vectors`` table.

    Texts are embedded with a SentenceTransformer model and stored next to
    their embedding; ``search`` ranks stored rows by cosine distance to the
    embedded query.
    """

    def __init__(self, host='localhost', port=8123,
                 model_name='all-MiniLM-L6-v2'):
        """Connect to ClickHouse and make sure the ``vectors`` table exists.

        Args:
            host: ClickHouse server host.
            port: ClickHouse HTTP port.
            model_name: SentenceTransformer model used for all embeddings.
        """
        self.client = clickhouse_connect.get_client(
            host=host,
            port=port
        )
        self.model_name = model_name
        # Loaded lazily on first use and then reused; loading the model is
        # expensive, so it must not happen on every add/search call.
        self._model = None
        self._create_table()

    def _create_table(self):
        """Create vector table."""
        self.client.command("""
            CREATE TABLE IF NOT EXISTS vectors (
                id UInt64,
                text String,
                embedding Array(Float32)
            ) ENGINE = MergeTree()
            ORDER BY id
        """)

    def _get_model(self):
        """Return the embedding model, loading it on first use."""
        if self._model is None:
            self._model = SentenceTransformer(self.model_name)
        return self._model

    def add_documents(self, documents):
        """Embed ``documents`` and insert them in a single batch.

        Args:
            documents: Iterable of text strings. An empty iterable is a no-op.
        """
        texts = list(documents)
        if not texts:
            return

        # One batched encode call instead of one model call per document.
        embeddings = self._get_model().encode(texts).tolist()

        # Continue numbering after the rows already stored so repeated calls
        # do not reuse ids 0..n-1. Not safe against concurrent writers.
        first_id = int(self.client.command(
            "SELECT if(count() = 0, 0, max(id) + 1) FROM vectors"
        ))

        rows = [
            [first_id + offset, text, vector]
            for offset, (text, vector) in enumerate(zip(texts, embeddings))
        ]
        # ``command`` does not accept row data; ``insert`` is the bulk API.
        self.client.insert(
            'vectors',
            rows,
            column_names=['id', 'text', 'embedding']
        )

    def search(self, query, top_k=5):
        """Return the ``top_k`` stored texts closest to ``query``.

        Args:
            query: Text to embed and search for.
            top_k: Maximum number of rows to return.

        Returns:
            List of dicts with ``id``, ``text`` and ``distance`` keys, ordered
            by ascending cosine distance.
        """
        query_vector = self._get_model().encode(query).tolist()

        # Values are bound as typed query parameters rather than being
        # formatted into the SQL text.
        results = self.client.query(
            """
            SELECT id, text,
                cosineDistance(embedding, {query_vector:Array(Float32)}) AS distance
            FROM vectors
            ORDER BY distance
            LIMIT {top_k:UInt32}
            """,
            parameters={'query_vector': query_vector, 'top_k': int(top_k)}
        )

        return [
            {'id': row[0], 'text': row[1], 'distance': row[2]}
            for row in results.result_rows
        ]

# Usage: index a few sentences, then print the closest matches to a question
sample_documents = [
    "Python is a great programming language",
    "Machine learning is fascinating",
    "Data science combines programming and statistics",
]

store = ClickHouseVectorStore()
store.add_documents(sample_documents)

results = store.search("What is programming?")
for hit in results:
    text, distance = hit['text'], hit['distance']
    print(f"Text: {text}, Distance: {distance:.4f}")

Building RAG Pipelines

Complete RAG Implementation

import clickhouse_connect
from sentence_transformers import SentenceTransformer
import openai

class ClickHouseRAG:
    """Retrieval-augmented generation pipeline backed by ClickHouse.

    Documents are split into fixed-size character chunks, embedded with a
    SentenceTransformer model and stored in the ``chunks`` table; questions
    are answered by retrieving the nearest chunks and passing them to an
    OpenAI chat model as context.
    """

    def __init__(self, config):
        """Connect using ``config`` (keyword arguments for ``get_client``)."""
        self.client = clickhouse_connect.get_client(**config)
        self.model = SentenceTransformer('all-MiniLM-L6-v2')
        self._init_schema()

    def __enter__(self):
        """Allow ``with ClickHouseRAG(cfg) as rag:`` usage."""
        return self

    def __exit__(self, exc_type, exc, tb):
        """Close the connection when the ``with`` block exits."""
        self.close()

    def _init_schema(self):
        """Initialize database schema."""
        # Chunks table
        self.client.command("""
            CREATE TABLE IF NOT EXISTS chunks (
                id UInt64,
                document_id UInt64,
                chunk_text String,
                chunk_index UInt32,
                embedding Array(Float32)
            ) ENGINE = MergeTree()
            ORDER BY (document_id, chunk_index)
        """)

        # Documents table
        self.client.command("""
            CREATE TABLE IF NOT EXISTS documents (
                id UInt64,
                title String,
                source String,
                created_at DateTime DEFAULT now()
            ) ENGINE = MergeTree()
            ORDER BY id
        """)

    @staticmethod
    def _split_text(text, chunk_size):
        """Split ``text`` into consecutive pieces of ``chunk_size`` characters.

        Raises:
            ValueError: If ``chunk_size`` is smaller than 1.
        """
        if chunk_size < 1:
            raise ValueError("chunk_size must be a positive integer")
        return [
            text[start:start + chunk_size]
            for start in range(0, len(text), chunk_size)
        ]

    @staticmethod
    def _build_context(chunks, max_context):
        """Join chunk texts in order until ``max_context`` would be exceeded.

        Each accepted chunk is followed by a blank line; the separator counts
        towards the running length when later chunks are checked.
        """
        pieces = []
        used = 0
        for chunk in chunks:
            text = chunk['text']
            if used + len(text) > max_context:
                break
            pieces.append(text + "\n\n")
            used += len(text) + 2
        return "".join(pieces)

    def ingest_document(self, document_id, title, source, text, chunk_size=500):
        """Store document metadata plus its embedded chunks.

        Args:
            document_id: Identifier written to ``documents.id`` and
                ``chunks.document_id``.
            title: Document title.
            source: Where the document came from.
            text: Full document text.
            chunk_size: Number of characters per chunk (must be >= 1).

        Raises:
            ValueError: If ``chunk_size`` is smaller than 1.
        """
        # Validate and split before writing anything.
        chunks = self._split_text(text, chunk_size)

        # Insert document metadata; created_at is filled by its DEFAULT, so
        # the column list must name only the three supplied columns.
        self.client.insert(
            'documents',
            [[document_id, title, source]],
            column_names=['id', 'title', 'source']
        )

        if not chunks:
            return

        # One batched encode and one batched insert instead of one round
        # trip per chunk.
        embeddings = self.model.encode(chunks).tolist()

        # Chunk ids continue after the rows already stored, so chunks of
        # different documents never share an id. Not safe against
        # concurrent writers.
        first_id = int(self.client.command(
            "SELECT if(count() = 0, 0, max(id) + 1) FROM chunks"
        ))
        rows = [
            [first_id + index, document_id, chunk, index, vector]
            for index, (chunk, vector) in enumerate(zip(chunks, embeddings))
        ]
        self.client.insert(
            'chunks',
            rows,
            column_names=['id', 'document_id', 'chunk_text',
                          'chunk_index', 'embedding']
        )

    def retrieve(self, query, top_k=5):
        """Return the ``top_k`` chunks closest to ``query``.

        Returns:
            List of dicts with ``text``, ``doc_id`` and ``distance`` keys,
            ordered by ascending cosine distance.
        """
        query_vector = self.model.encode(query).tolist()

        # Typed query parameters keep values out of the SQL text.
        results = self.client.query(
            """
            SELECT
                chunk_text,
                document_id,
                cosineDistance(embedding, {query_vector:Array(Float32)}) AS distance
            FROM chunks
            ORDER BY distance
            LIMIT {top_k:UInt32}
            """,
            parameters={'query_vector': query_vector, 'top_k': int(top_k)}
        )

        return [
            {'text': row[0], 'doc_id': row[1], 'distance': row[2]}
            for row in results.result_rows
        ]

    def answer(self, question, max_context=3000):
        """Answer ``question`` using retrieved chunks as context.

        Args:
            question: The user question.
            max_context: Character budget for the retrieved context.

        Returns:
            The text content of the model's first choice.
        """
        # Retrieve relevant chunks
        chunks = self.retrieve(question, top_k=10)

        # Build context
        context = self._build_context(chunks, max_context)

        # Generate answer
        prompt = (
            "Based on the following context, answer the question.\n\n"
            f"Context:\n{context}\n\n"
            f"Question: {question}\n\n"
            "Answer:"
        )
        messages = [{"role": "user", "content": prompt}]

        # openai>=1.0 removed ChatCompletion; prefer the current interface
        # and fall back to the legacy one on older installations.
        if hasattr(openai, "chat"):
            response = openai.chat.completions.create(
                model="gpt-4",
                messages=messages,
                temperature=0
            )
        else:
            response = openai.ChatCompletion.create(
                model="gpt-4",
                messages=messages,
                temperature=0
            )

        return response.choices[0].message.content

    def close(self):
        """Close connection."""
        self.client.close()

# Usage
rag = ClickHouseRAG({'host': 'localhost', 'port': 8123})

try:
    # Ingest document (explicit encoding so the read does not depend on the
    # platform default; assumes the article is UTF-8 text)
    with open('article.txt', encoding='utf-8') as f:
        rag.ingest_document(1, 'Article', 'article.txt', f.read())

    # Answer question
    answer = rag.answer("What is the main topic?")
    print(answer)
finally:
    # Release the connection even if ingestion or generation fails
    rag.close()

ML Feature Engineering

Feature Creation

import clickhouse_connect

class FeatureEngineering:
    """Build ML feature tables in ClickHouse from the ``events`` table.

    Each method materializes one feature table with
    ``CREATE TABLE IF NOT EXISTS ... AS SELECT``; an existing table is left
    untouched.
    """

    def __init__(self, config):
        """Connect using ``config`` (keyword arguments for ``get_client``)."""
        self.client = clickhouse_connect.get_client(**config)

    def create_user_features(self):
        """Create one row of aggregate features per user."""
        # ENGINE and ORDER BY are stated explicitly so the statement does not
        # depend on a server-side default table engine.
        self.client.command("""
            CREATE TABLE IF NOT EXISTS user_features
            ENGINE = MergeTree()
            ORDER BY user_id
            AS
            SELECT 
                user_id,
                count() as total_events,
                uniqExact(session_id) as unique_sessions,
                avg(duration) as avg_duration,
                min(event_time) as first_event,
                max(event_time) as last_event,
                countIf(action = 'purchase') as purchase_count,
                sum(amount) as total_amount,
                stddevPop(amount) as amount_stddev
            FROM events
            GROUP BY user_id
        """)

    def create_time_features(self):
        """Create event counts per user and hour / weekday / month."""
        self.client.command("""
            CREATE TABLE IF NOT EXISTS time_features
            ENGINE = MergeTree()
            ORDER BY (user_id, month, day_of_week, hour)
            AS
            SELECT 
                user_id,
                toHour(event_time) as hour,
                toDayOfWeek(event_time) as day_of_week,
                toMonth(event_time) as month,
                count() as event_count
            FROM events
            GROUP BY user_id, hour, day_of_week, month
        """)

    def create_aggregation_features(self):
        """Create conditional aggregates and funnel counts per user."""
        self.client.command("""
            CREATE TABLE IF NOT EXISTS agg_features
            ENGINE = MergeTree()
            ORDER BY user_id
            AS
            SELECT 
                user_id,
                -- Conditional aggregations
                anyLastIf(amount, action = 'view') as last_viewed_amount,
                sumIf(amount, action = 'purchase') as total_purchases,
                avgIf(amount, action = 'view') as avg_viewed_amount,
                
                -- Sequence features
                -- groupArray(1) keeps at most one amount per user
                groupArray(1)(amount) as amounts,
                -- sequenceCount needs the event timestamp as its first
                -- argument; the conditions follow it
                sequenceCount('(?1)(?2)(?3)')(
                    event_time, 
                    action = 'view', 
                    action = 'add_to_cart',
                    action = 'purchase'
                ) as conversion_funnel
            FROM events
            GROUP BY user_id
        """)

# Usage: connect to a local server and build the per-user feature table
connection_config = {'host': 'localhost'}
features = FeatureEngineering(connection_config)
features.create_user_features()

Export for ML

def export_for_ml(self, table, target_column):
    """Export rows of a feature table whose target is not NULL.

    Args:
        self: Object exposing a clickhouse_connect client as ``self.client``.
        table: Table name, optionally qualified as ``database.table``.
        target_column: Column that must be non-NULL in exported rows.

    Returns:
        pandas.DataFrame with every column of ``table``.

    Raises:
        ValueError: If ``table`` or ``target_column`` is not a plain
            identifier. Both are placed into the SQL text (identifiers
            cannot be bound as values), so they are validated first.
    """
    import re

    identifier = r"[A-Za-z_][A-Za-z0-9_]*"
    if not re.fullmatch(rf"{identifier}(\.{identifier})?", table):
        raise ValueError(f"invalid table name: {table!r}")
    if not re.fullmatch(identifier, target_column):
        raise ValueError(f"invalid column name: {target_column!r}")

    # query_df is the client call that returns a DataFrame directly.
    return self.client.query_df(
        f"SELECT * FROM {table} WHERE {target_column} IS NOT NULL"
    )

# Export
# export_for_ml takes the object holding the ClickHouse client as its first
# argument, so pass the FeatureEngineering instance created above.
df = export_for_ml(features, 'user_features', 'purchase_count')
X = df.drop(columns=['purchase_count', 'user_id'])
y = df['purchase_count']

Recommendation Systems

Collaborative Filtering

-- User-item interactions: a rating given by a user to an item at a point in time
CREATE TABLE user_items
(
    user_id   UInt32,
    item_id   UInt32,
    rating    Float32,
    timestamp DateTime
)
ENGINE = MergeTree
ORDER BY (user_id, timestamp);

-- Similar users
-- Distance = 1 - Pearson correlation of ratings on the items both users rated.
-- User 123 is listed once as its own nearest neighbour (distance 0).
SELECT
    toUInt32(123) AS user_id,
    neighbor_user_id,
    distance
FROM (
    SELECT
        toUInt32(123) AS neighbor_user_id,
        toFloat64(0) AS distance

    UNION ALL

    SELECT
        theirs.user_id AS neighbor_user_id,
        1 - corr(mine.rating, theirs.rating) AS distance
    FROM user_items AS mine
    INNER JOIN user_items AS theirs ON mine.item_id = theirs.item_id
    WHERE mine.user_id = 123
      AND theirs.user_id != 123
    GROUP BY theirs.user_id
    -- A correlation needs at least two co-rated items
    HAVING count() >= 2
)
-- corr() is NaN when either user's ratings have zero variance
WHERE isFinite(distance)
ORDER BY distance
LIMIT 10;

Item-Based Recommendations

-- Item similarity
-- Cosine similarity on co-occurrence:
--   users who interacted with both items / sqrt(users of item1 * users of item2)
WITH
    -- One row per (item, user), regardless of how many interactions exist
    item_users AS (
        SELECT item_id, user_id
        FROM user_items
        GROUP BY item_id, user_id
    ),
    -- Number of distinct users per item
    item_counts AS (
        SELECT item_id, count() AS user_count
        FROM item_users
        GROUP BY item_id
    )
SELECT
    a.item_id AS item1,
    b.item_id AS item2,
    -- user_count is constant within each (item1, item2) group, so any() is exact
    count() / sqrt(any(ca.user_count) * any(cb.user_count)) AS similarity
FROM item_users AS a
INNER JOIN item_users AS b ON a.user_id = b.user_id
INNER JOIN item_counts AS ca ON ca.item_id = a.item_id
INNER JOIN item_counts AS cb ON cb.item_id = b.item_id
-- Keep each unordered pair once and drop self-pairs
WHERE a.item_id < b.item_id
GROUP BY a.item_id, b.item_id
ORDER BY similarity DESC
LIMIT 100;

Anomaly Detection

Statistical Methods

-- Z-score based anomaly detection
-- Flags transactions more than 3 standard deviations from the user's mean.
WITH per_user_stats AS (
    SELECT
        user_id,
        amount,
        avg(amount) OVER user_window AS avg_amount,
        stddevPop(amount) OVER user_window AS stddev_amount
    FROM transactions
    WINDOW user_window AS (PARTITION BY user_id)
)
SELECT
    user_id,
    amount,
    avg_amount,
    stddev_amount,
    (amount - avg_amount) / stddev_amount AS z_score
FROM per_user_stats
WHERE abs(z_score) > 3;

-- Moving average deviation
-- Compares each amount with the user's average over the current row and the
-- 100 rows before it, and keeps rows deviating by more than 50% of that average.
WITH rolling AS (
    SELECT
        user_id,
        timestamp,
        amount,
        avg(amount) OVER recent_rows AS avg_amount
    FROM transactions
    WINDOW recent_rows AS (
        PARTITION BY user_id
        ORDER BY timestamp
        ROWS BETWEEN 100 PRECEDING AND CURRENT ROW
    )
)
SELECT
    user_id,
    timestamp,
    amount,
    avg_amount,
    amount - avg_amount AS deviation
FROM rolling
WHERE abs(amount - avg_amount) > avg_amount * 0.5;

Best Practices

Vector Search Optimization

-- Use appropriate vector dimensions
-- Smaller = faster, larger = more precise

-- Batch inserts
-- Column list matches the embeddings table: id, document_id, text, embedding
INSERT INTO embeddings (id, document_id, text, embedding) VALUES
    (1, 1, 'text1', [0.1, ...]),
    (2, 1, 'text2', [0.2, ...]),
    ...;  -- Batch 1000 at a time

Query Optimization

-- Use LIMIT with vector search
-- {query_vector:Array(Float32)} is a query parameter supplied by the client
SELECT * FROM embeddings
ORDER BY cosineDistance(embedding, {query_vector:Array(Float32)})
LIMIT 10;  -- Always limit

-- Filter before distance calculation
-- (filters on a column of the embeddings table so fewer rows are scored)
SELECT * FROM embeddings
WHERE document_id IN (1, 2)
ORDER BY cosineDistance(embedding, {query_vector:Array(Float32)})
LIMIT 10;

Integration with ML Libraries

Scikit-learn

import clickhouse_connect
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split

# Get features (query_df returns a pandas DataFrame directly)
client = clickhouse_connect.get_client()
df = client.query_df("SELECT * FROM user_features")

# purchase_count is the label column of user_features; user_id is an
# identifier, not a feature.
target_column = 'purchase_count'
y = df[target_column]

# Keep numeric columns only: the DateTime columns (first_event, last_event)
# cannot be passed to the regressor as-is.
X = df.drop(columns=[target_column, 'user_id']).select_dtypes(include='number')

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Train model
model = RandomForestRegressor()
model.fit(X_train, y_train)

# Predict
predictions = model.predict(X_test)

TensorFlow/PyTorch

import clickhouse_connect
import torch

# Load embeddings as tensors
client = clickhouse_connect.get_client()
result = client.query("SELECT embedding FROM vectors")

# Each result row holds one value: the embedding array of that row.
embeddings = [row[0] for row in result.result_rows]

# Explicit float32 matches the Array(Float32) column type.
# Assumes every stored embedding has the same length — required for a
# rectangular tensor.
embedding_matrix = torch.tensor(embeddings, dtype=torch.float32)

# Use in neural network
# embedding_layer = torch.nn.Embedding.from_pretrained(embedding_matrix)

Resources


Conclusion

ClickHouse provides a powerful platform for AI applications, combining analytical capabilities with vector search. From RAG pipelines to ML feature engineering, ClickHouse offers a unified solution for modern AI-powered applications.

In the next article, we’ll explore real-world ClickHouse use cases, including production patterns and implementation strategies.

Comments