Skip to main content
โšก Calmops

Cassandra for AI and Machine Learning Applications

Introduction

Cassandra’s high write throughput and linear scalability make it ideal for AI and machine learning applications. This article explores how Cassandra powers AI data pipelines, feature stores, and real-time analytics.


Time-Series Data Storage

Cassandra excels at storing time-series data:

-- Time-series table: one partition per sensor, newest readings first.
-- TimeWindowCompactionStrategy groups SSTables into 1-day windows, which
-- suits append-mostly time-series workloads.
CREATE TABLE sensor_data (
    sensor_id TEXT,
    timestamp TIMESTAMP,
    temperature DECIMAL,
    humidity DECIMAL,
    pressure DECIMAL,
    PRIMARY KEY (sensor_id, timestamp)
) WITH CLUSTERING ORDER BY (timestamp DESC)
AND compaction = {
    'class': 'TimeWindowCompactionStrategy',
    'compaction_window_unit': 'DAYS',
    'compaction_window_size': 1
};

-- Query the last 24 hours for one sensor.
-- now() returns a TIMEUUID, so it must be converted with toTimestamp()
-- before duration arithmetic; "timestamp > now() - 24h" is not valid CQL.
-- Requires Cassandra 4.0+ (operators on timestamps/durations).
SELECT * FROM sensor_data
WHERE sensor_id = 'sensor-001'
AND timestamp > toTimestamp(now()) - 24h;

IoT Data Ingestion

from cassandra.cluster import Cluster
from cassandra.query import BatchStatement  # was used below but never imported
from datetime import datetime
import time

cluster = Cluster(['127.0.0.1'])
session = cluster.connect('iot_db')

# Prepare once: prepared statements are parsed server-side a single time, and
# they are the only statement form that accepts '?' placeholders in the
# Python driver (plain query strings use '%s').
insert_stmt = session.prepare(
    """
    INSERT INTO sensor_data
    (sensor_id, timestamp, temperature, humidity, pressure)
    VALUES (?, ?, ?, ?, ?)
    """
)


def insert_sensor_data(sensor_id, readings):
    """Write one batch of sensor readings for a single sensor.

    Each reading is a dict with 'temp', 'humidity' and 'pressure' keys; an
    optional 'timestamp' key overrides the ingestion time. Because timestamp
    is the clustering key, readings that share the exact same timestamp
    overwrite each other — per-reading timestamps are strongly preferred
    over the ingestion-time fallback.
    """
    batch = BatchStatement()
    for reading in readings:
        ts = reading.get('timestamp', datetime.now())
        batch.add(
            insert_stmt,
            (sensor_id, ts, reading['temp'], reading['humidity'], reading['pressure'])
        )
    session.execute(batch)


if __name__ == '__main__':
    # Simulate high-throughput ingestion: one batch of readings every 100 ms
    # (10 batches per second, not 10 individual writes).
    while True:
        readings = generate_readings()  # NOTE(review): assumed provided elsewhere
        insert_sensor_data('sensor-001', readings)
        time.sleep(0.1)

Feature Store with Cassandra

Cassandra can serve as an ML feature store:

-- Feature store table: one partition per (feature_key, user_id) pair,
-- one row per feature name.
CREATE TABLE user_features (
    feature_key TEXT,
    user_id UUID,
    feature_name TEXT,
    feature_value TEXT,
    updated_at TIMESTAMP,
    PRIMARY KEY ((feature_key, user_id), feature_name)
);

-- Store features.
-- now() yields a TIMEUUID; toTimestamp(now()) converts it to the TIMESTAMP
-- type that updated_at expects (a bare now() here is a type error).
INSERT INTO user_features
(feature_key, user_id, feature_name, feature_value, updated_at)
VALUES ('user_profile', 123e4567-e89b-12d3-a456-426614174000, 'age', '30', toTimestamp(now()));

INSERT INTO user_features
(feature_key, user_id, feature_name, feature_value, updated_at)
VALUES ('user_profile', 123e4567-e89b-12d3-a456-426614174000, 'country', 'USA', toTimestamp(now()));

INSERT INTO user_features
(feature_key, user_id, feature_name, feature_value, updated_at)
VALUES ('user_profile', 123e4567-e89b-12d3-a456-426614174000, 'last_login_days', '5', toTimestamp(now()));

Python Feature Store

class FeatureStore:
    """Thin feature-store facade over a Cassandra session.

    Features live one row per (user, feature name) under the fixed
    partition prefix 'user_features' in the user_features table.
    """

    def __init__(self, session):
        self.session = session

    def store_features(self, user_id, features):
        """Store precomputed features for one user.

        `features` maps feature name -> value; values are stringified
        because the feature_value column is TEXT.
        """
        # Local import: BatchStatement lives in cassandra.query and was not
        # imported at the top of the file.
        from cassandra.query import BatchStatement

        batch = BatchStatement()
        for name, value in features.items():
            # '%s' placeholders: the Python driver only accepts '?' with
            # prepared statements; plain query strings use '%s'.
            # toTimestamp(now()) converts the TIMEUUID to a TIMESTAMP.
            batch.add(
                """
                INSERT INTO user_features
                (feature_key, user_id, feature_name, feature_value, updated_at)
                VALUES ('user_features', %s, %s, %s, toTimestamp(now()))
                """,
                (user_id, name, str(value))
            )
        self.session.execute(batch)

    def get_features(self, user_id):
        """Return {feature_name: feature_value} for one user (for inference)."""
        rows = self.session.execute(
            """
            SELECT feature_name, feature_value
            FROM user_features
            WHERE feature_key = 'user_features' AND user_id = %s
            """,
            (user_id,)
        )
        return {row.feature_name: row.feature_value for row in rows}

    def get_batch_features(self, user_ids):
        """Return {user_id: {feature_name: feature_value}} for many users.

        Issued as one query per user: each (feature_key, user_id) pair is
        its own partition, and a multi-partition IN query would funnel the
        whole fan-out through a single coordinator. (The previous version
        %-formatted '?' markers into the query string, which the driver
        cannot execute.)
        """
        return {user_id: self.get_features(user_id) for user_id in user_ids}

Real-Time Analytics

Streaming Data Processing

from cassandra.cluster import Cluster
import json


class AnalyticsProcessor:
    """Consumes raw events and maintains per-user and global event counters."""

    def __init__(self):
        self.cluster = Cluster(['127.0.0.1'])
        self.session = self.cluster.connect('analytics')

    def process_event(self, event):
        """Process one incoming event dict with 'user_id', 'type', 'timestamp' keys."""
        from datetime import datetime

        # Extract fields from the raw event.
        user_id = event['user_id']
        event_type = event['type']
        timestamp = datetime.fromtimestamp(event['timestamp'])

        # count is a COUNTER column: counters can only be modified via
        # UPDATE ... SET c = c + n — INSERT into a counter table is invalid.
        self.session.execute(
            """
            UPDATE user_events
            SET count = count + 1
            WHERE user_id = %s AND event_type = %s AND timestamp = %s
            """,
            (user_id, event_type, timestamp)
        )

        # toDate(now()) buckets by current day and matches the DATE type of
        # the bucket column; dateof() is deprecated and returns a TIMESTAMP.
        self.session.execute(
            """
            UPDATE event_metrics
            SET count = count + 1
            WHERE event_type = %s AND bucket = toDate(now())
            """,
            (event_type,)
        )

    def get_user_stats(self, user_id):
        """Return {event_type: total count} for one user.

        user_events is partitioned by (user_id, event_type), so filtering on
        user_id alone needs ALLOW FILTERING, and totals are summed
        client-side (CQL GROUP BY requires the full partition key).
        NOTE(review): ALLOW FILTERING scans; fine for small data, but a
        lookup table keyed by user_id alone would scale better.
        """
        rows = self.session.execute(
            """
            SELECT event_type, count
            FROM user_events
            WHERE user_id = %s
            ALLOW FILTERING
            """,
            (user_id,)
        )

        stats = {}
        for row in rows:
            stats[row.event_type] = stats.get(row.event_type, 0) + row.count
        return stats

Aggregations Table

-- Event metrics
-- Per-day counter bucketed by event type. 'count' is a COUNTER column, so
-- it may only be changed with UPDATE ... SET count = count + n (never INSERT),
-- and all non-primary-key columns in this table must be counters.
CREATE TABLE event_metrics (
    event_type TEXT,
    bucket DATE,
    count COUNTER,
    PRIMARY KEY ((event_type), bucket)
);

-- User events aggregation
-- One counter row per (user, event type, timestamp); the composite
-- partition key (user_id, event_type) keeps individual partitions small.
CREATE TABLE user_events (
    user_id UUID,
    event_type TEXT,
    timestamp TIMESTAMP,
    count COUNTER,
    PRIMARY KEY ((user_id, event_type), timestamp)
);

ML Model Storage

Model Metadata

-- Model registry.
-- NOTE: CQL has no DEFAULT clause for columns, so "created_at TIMESTAMP
-- DEFAULT now()" is invalid DDL — the writing client must populate
-- created_at itself (e.g. with toTimestamp(now()) in the INSERT).
CREATE TABLE ml_models (
    model_id UUID PRIMARY KEY,
    model_name TEXT,
    model_type TEXT,
    version TEXT,
    framework TEXT,
    parameters MAP<TEXT, TEXT>,
    metrics MAP<TEXT, DOUBLE>,
    trained_at TIMESTAMP,
    is_production BOOLEAN,
    created_at TIMESTAMP
);

-- Model artifacts (metadata only, files in object storage).
-- created_at set by the client for the same reason as above.
CREATE TABLE model_artifacts (
    model_id UUID PRIMARY KEY,
    model_name TEXT,
    artifact_path TEXT,
    created_at TIMESTAMP
);

Python: Model Registry

class ModelRegistry:
    """Registry facade over the ml_models table."""

    def __init__(self, session):
        self.session = session

    def register_model(self, name, model_type, version, framework, parameters, metrics, is_production=False):
        """Register a new model version and return its generated UUID.

        `parameters` (dict[str, str]) and `metrics` (dict[str, float]) are
        passed through as dicts: the driver maps Python dicts onto CQL MAP
        columns, whereas str(dict) would fail the MAP type check.
        """
        import uuid

        model_id = uuid.uuid4()

        # toTimestamp(now()) — now() alone is a TIMEUUID, not a TIMESTAMP.
        self.session.execute(
            """
            INSERT INTO ml_models
            (model_id, model_name, model_type, version, framework, parameters, metrics, trained_at, is_production)
            VALUES (%s, %s, %s, %s, %s, %s, %s, toTimestamp(now()), %s)
            """,
            (model_id, name, model_type, version, framework,
             parameters, metrics, is_production)
        )

        return model_id

    def get_production_model(self, model_name):
        """Return the current production row for a model name, or None.

        model_id is the primary key, so filtering on model_name requires
        ALLOW FILTERING — acceptable for a small registry table, but a
        secondary index or a lookup table keyed by model_name scales better.
        """
        rows = self.session.execute(
            """
            SELECT * FROM ml_models
            WHERE model_name = %s AND is_production = true
            ALLOW FILTERING
            """,
            (model_name,)
        )
        return rows.one()

    def get_model_versions(self, model_name):
        """Return all versions of a model, newest (trained_at) first.

        CQL ORDER BY only works on clustering columns and trained_at is a
        regular column here, so the sort happens client-side. (This also
        fixes the original, where the 'trained_at DESC' text had been
        garbled into the parameter tuple.)
        """
        rows = self.session.execute(
            """
            SELECT * FROM ml_models
            WHERE model_name = %s
            ALLOW FILTERING
            """,
            (model_name,)
        )
        return sorted(rows, key=lambda row: row.trained_at, reverse=True)

Real-Time Recommendations

User Activity Tracking

-- User activity stream: one partition per (user, session), newest first.
CREATE TABLE user_activity (
    user_id UUID,
    session_id UUID,
    timestamp TIMESTAMP,
    event_type TEXT,
    item_id UUID,
    metadata MAP<TEXT, TEXT>,
    PRIMARY KEY ((user_id, session_id), timestamp, event_type)
) WITH CLUSTERING ORDER BY (timestamp DESC);

-- Item recommendations, clustered by score descending.
-- item_id is part of the clustering key so that two items with the same
-- score do not overwrite each other (with PRIMARY KEY (user_id, score)
-- score alone identified the row and equal-scored items collided).
CREATE TABLE user_recommendations (
    user_id UUID,
    item_id UUID,
    score DOUBLE,
    model_version TEXT,
    updated_at TIMESTAMP,
    PRIMARY KEY (user_id, score, item_id)
) WITH CLUSTERING ORDER BY (score DESC, item_id ASC);

Python: Recommendation Engine

class RecommendationEngine:
    """Tracks user activity and serves precomputed per-user recommendations."""

    def __init__(self, session):
        self.session = session

    def track_activity(self, user_id, session_id, event_type, item_id, metadata=None):
        """Record one activity event.

        `metadata` is an optional dict[str, str], passed through as-is: the
        metadata column is MAP<TEXT, TEXT>, so a json.dumps string would
        fail the type check. The driver converts Python dicts to CQL maps.
        """
        from datetime import datetime

        # '%s' placeholders: the Python driver only supports '?' with
        # prepared statements.
        self.session.execute(
            """
            INSERT INTO user_activity
            (user_id, session_id, timestamp, event_type, item_id, metadata)
            VALUES (%s, %s, %s, %s, %s, %s)
            """,
            (user_id, session_id, datetime.now(), event_type, item_id, metadata)
        )

    def get_recommendations(self, user_id, limit=10):
        """Return up to `limit` (item_id, score) pairs, highest score first."""
        rows = self.session.execute(
            """
            SELECT item_id, score
            FROM user_recommendations
            WHERE user_id = %s
            LIMIT %s
            """,
            (user_id, limit)
        )
        return [(row.item_id, row.score) for row in rows]

    def update_recommendations(self, user_id, recommendations, model_version):
        """Replace a user's recommendations with a freshly scored list.

        The partition delete is executed on its own, before the batch:
        putting the DELETE and the INSERTs for the same partition in one
        batch would give them the same write timestamp, and the delete's
        tombstone would shadow the new rows.
        """
        from cassandra.query import BatchStatement

        # Clear old recommendations first, as a separate statement.
        self.session.execute(
            "DELETE FROM user_recommendations WHERE user_id = %s",
            (user_id,)
        )

        # Insert the new recommendation set in one batch.
        batch = BatchStatement()
        for item_id, score in recommendations:
            batch.add(
                """
                INSERT INTO user_recommendations
                (user_id, item_id, score, model_version, updated_at)
                VALUES (%s, %s, %s, %s, toTimestamp(now()))
                """,
                (user_id, item_id, score, model_version)
            )
        self.session.execute(batch)

Monitoring ML Workloads

-- Track prediction latency
-- One partition per model, one row per prediction timestamp.
CREATE TABLE prediction_metrics (
    model_name TEXT,
    timestamp TIMESTAMP,
    latency_ms DOUBLE,
    success BOOLEAN,
    PRIMARY KEY ((model_name), timestamp)
);

-- Feature store metrics
-- One partition per feature, one row per sampling timestamp; requests,
-- hits and misses are plain BIGINTs written by the client (not counters).
CREATE TABLE feature_store_metrics (
    feature_name TEXT,
    timestamp TIMESTAMP,
    requests BIGINT,
    hits BIGINT,
    misses BIGINT,
    PRIMARY KEY ((feature_name), timestamp)
);

Best Practices

Performance Optimization

-- Batch non-logged operations.
-- UNLOGGED skips the batch log, trading the logged batch's atomicity
-- guarantee for lower write latency; a plain BEGIN BATCH is logged.
BEGIN UNLOGGED BATCH
INSERT INTO events (user_id, event) VALUES (1, 'click');
INSERT INTO events (user_id, event) VALUES (2, 'view');
APPLY BATCH;

-- Use prepared statements
# Prepared once (parsed server-side a single time); later executions send
# only the bind values, and '?' placeholders are valid here.
prepared = session.prepare("""
    INSERT INTO events (user_id, event) VALUES (?, ?)
""")

Data Modeling for ML

-- Design for query patterns
-- Denormalize if needed
-- Pre-compute aggregations
-- Use appropriate compaction

Conclusion

Cassandra’s high write throughput and distributed architecture make it ideal for AI applications. From time-series data ingestion to feature stores and real-time recommendations, Cassandra powers modern ML data pipelines.

In the final article, we’ll explore real-world production use cases for Cassandra across different industries.

Comments