Introduction
Cassandra’s high write throughput and linear scalability make it ideal for AI and machine learning applications. This article explores how Cassandra powers AI data pipelines, feature stores, and real-time analytics.
Time-Series Data Storage
Cassandra excels at storing time-series data:
-- Time-series table: partitioned by sensor, rows ordered newest-first
-- within each partition so "latest readings" queries are sequential reads.
CREATE TABLE sensor_data (
    sensor_id   TEXT,
    timestamp   TIMESTAMP,
    temperature DECIMAL,
    humidity    DECIMAL,
    pressure    DECIMAL,
    PRIMARY KEY (sensor_id, timestamp)
) WITH CLUSTERING ORDER BY (timestamp DESC)
  AND compaction = {
    'class': 'TimeWindowCompactionStrategy',
    'compaction_window_unit': 'DAYS',
    'compaction_window_size': 1
  };

-- Query the last 24 hours for one sensor.
-- NOTE: the original `now() - 24h` is not valid CQL — now() yields a
-- TIMEUUID and `24h` is not a literal. Convert with toTimestamp() and
-- subtract a duration (supported since Cassandra 4.0).
SELECT * FROM sensor_data
WHERE sensor_id = 'sensor-001'
  AND timestamp > toTimestamp(now()) - 1d;
IoT Data Ingestion
from cassandra.cluster import Cluster
from cassandra.query import BatchStatement  # was missing: BatchStatement was undefined
from datetime import datetime, timezone
import time

cluster = Cluster(['127.0.0.1'])
session = cluster.connect('iot_db')

# Prepare once, execute many: the cassandra-driver only accepts '?'
# placeholders on *prepared* statements (plain query strings use '%s').
INSERT_READING = session.prepare(
    """
    INSERT INTO sensor_data
    (sensor_id, timestamp, temperature, humidity, pressure)
    VALUES (?, ?, ?, ?, ?)
    """
)


def insert_sensor_data(sensor_id, readings):
    """Batch-insert sensor readings for one sensor (a single partition).

    readings: iterable of dicts with 'temp', 'humidity' and 'pressure' keys.
    """
    batch = BatchStatement()
    # Timezone-aware timestamp; naive datetimes are ambiguous across nodes.
    now = datetime.now(timezone.utc)
    for reading in readings:
        batch.add(
            INSERT_READING,
            (sensor_id, now, reading['temp'], reading['humidity'], reading['pressure']),
        )
    session.execute(batch)


# Simulate high-throughput ingestion.
# NOTE(review): generate_readings() is not defined in this file — it is
# assumed to be provided elsewhere; confirm before running.
while True:
    readings = generate_readings()
    insert_sensor_data('sensor-001', readings)
    time.sleep(0.1)  # ~10 batches per second
Feature Store with Cassandra
Cassandra can serve as an ML feature store:
-- Feature store table: one partition per (feature_key, user_id);
-- feature_name is the clustering key, so all of a user's features for a
-- given key live together and can be read as a single slice.
CREATE TABLE user_features (
    feature_key TEXT,
    user_id UUID,
    feature_name TEXT,
    feature_value TEXT,
    updated_at TIMESTAMP,
    PRIMARY KEY ((feature_key, user_id), feature_name)
);

-- Store features.
-- NOTE: now() yields a TIMEUUID; it must be converted with toTimestamp()
-- before writing into the TIMESTAMP column (the original bare now() is a
-- type error).
INSERT INTO user_features
(feature_key, user_id, feature_name, feature_value, updated_at)
VALUES ('user_profile', 123e4567-e89b-12d3-a456-426614174000, 'age', '30', toTimestamp(now()));
INSERT INTO user_features
(feature_key, user_id, feature_name, feature_value, updated_at)
VALUES ('user_profile', 123e4567-e89b-12d3-a456-426614174000, 'country', 'USA', toTimestamp(now()));
INSERT INTO user_features
(feature_key, user_id, feature_name, feature_value, updated_at)
VALUES ('user_profile', 123e4567-e89b-12d3-a456-426614174000, 'last_login_days', '5', toTimestamp(now()));
Python Feature Store
class FeatureStore:
    """Read/write precomputed ML features stored in the user_features table."""

    def __init__(self, session):
        self.session = session
        # Prepare statements once: the cassandra-driver only accepts '?'
        # placeholders on prepared statements (the originals used '?' on
        # plain query strings, which the driver rejects).
        self._insert = session.prepare(
            """
            INSERT INTO user_features
            (feature_key, user_id, feature_name, feature_value, updated_at)
            VALUES ('user_features', ?, ?, ?, toTimestamp(now()))
            """
        )
        self._select_one = session.prepare(
            """
            SELECT feature_name, feature_value
            FROM user_features
            WHERE feature_key = 'user_features' AND user_id = ?
            """
        )
        # A prepared `IN ?` binds an entire list as one parameter — no
        # string interpolation of placeholders as in the original.
        self._select_many = session.prepare(
            """
            SELECT user_id, feature_name, feature_value
            FROM user_features
            WHERE feature_key = 'user_features' AND user_id IN ?
            """
        )

    def store_features(self, user_id, features):
        """Store precomputed features for one user in a single batch.

        features: mapping of feature_name -> value (values stringified).
        """
        from cassandra.query import BatchStatement  # local import matches file style
        batch = BatchStatement()
        for name, value in features.items():
            # The original bound 3 values against 4 '?' markers; updated_at
            # is now set server-side so the parameter counts line up.
            batch.add(self._insert, (user_id, name, str(value)))
        self.session.execute(batch)

    def get_features(self, user_id):
        """Retrieve one user's features for inference as {name: value}."""
        rows = self.session.execute(self._select_one, (user_id,))
        return {row.feature_name: row.feature_value for row in rows}

    def get_batch_features(self, user_ids):
        """Batch-retrieve features as {user_id: {name: value}}."""
        rows = self.session.execute(self._select_many, (list(user_ids),))
        features = {}
        for row in rows:
            features.setdefault(row.user_id, {})[row.feature_name] = row.feature_value
        return features
Real-Time Analytics
Streaming Data Processing
from cassandra.cluster import Cluster
import json
class AnalyticsProcessor:
def __init__(self):
self.cluster = Cluster(['127.0.0.1'])
self.session = self.cluster.connect('analytics')
def process_event(self, event):
"""Process incoming event"""
import time
from datetime import datetime
# Extract features
user_id = event['user_id']
event_type = event['type']
timestamp = datetime.fromtimestamp(event['timestamp'])
# Update aggregations
self.session.execute(
"""
INSERT INTO user_events (user_id, event_type, timestamp, count)
VALUES (%s, %s, %s, 1)
""",
(user_id, event_type, timestamp)
)
# Update real-time metrics
self.session.execute(
"""
INSERT INTO event_metrics (event_type, bucket, count)
VALUES (%s, dateof(now()), 1)
""",
(event_type,)
)
def get_user_stats(self, user_id):
"""Get user statistics"""
rows = self.session.execute(
"""
SELECT event_type, count(*) as total
FROM user_events
WHERE user_id = ?
GROUP BY event_type
""",
(user_id,)
)
return {row.event_type: row.total for row in rows}
Aggregations Table
-- Event metrics
-- Counter table: every non-primary-key column must be a COUNTER, and rows
-- are mutated with UPDATE ... SET count = count + 1, never INSERT.
CREATE TABLE event_metrics (
event_type TEXT,
bucket DATE,
count COUNTER,
PRIMARY KEY ((event_type), bucket)
);
-- User events aggregation
-- Partitioned by (user_id, event_type); timestamp is the clustering key,
-- so each partition holds one counter per event timestamp.
CREATE TABLE user_events (
user_id UUID,
event_type TEXT,
timestamp TIMESTAMP,
count COUNTER,
PRIMARY KEY ((user_id, event_type), timestamp)
);
ML Model Storage
Model Metadata
-- Model registry.
-- NOTE: CQL has no DEFAULT clause for columns — the original
-- `created_at TIMESTAMP DEFAULT now()` is invalid syntax. Clients must
-- supply created_at explicitly (e.g. toTimestamp(now()) at insert time).
CREATE TABLE ml_models (
    model_id UUID PRIMARY KEY,
    model_name TEXT,
    model_type TEXT,
    version TEXT,
    framework TEXT,
    parameters MAP<TEXT, TEXT>,
    metrics MAP<TEXT, DOUBLE>,
    trained_at TIMESTAMP,
    is_production BOOLEAN,
    created_at TIMESTAMP
);

-- Model artifacts (metadata only, files in object storage)
CREATE TABLE model_artifacts (
    model_id UUID PRIMARY KEY,
    model_name TEXT,
    artifact_path TEXT,
    created_at TIMESTAMP
);
Python: Model Registry
class ModelRegistry:
    """Registry of ML model metadata backed by the ml_models table."""

    def __init__(self, session):
        self.session = session

    def register_model(self, name, model_type, version, framework, parameters, metrics, is_production=False):
        """Register a new model version and return its generated model_id."""
        import uuid
        model_id = uuid.uuid4()
        self.session.execute(
            """
            INSERT INTO ml_models
            (model_id, model_name, model_type, version, framework, parameters, metrics, trained_at, is_production)
            VALUES (%s, %s, %s, %s, %s, %s, %s, toTimestamp(now()), %s)
            """,
            # parameters/metrics are CQL MAP columns: bind dicts directly —
            # the original str(...) would fail against MAP<TEXT, ...> columns.
            # toTimestamp(now()) replaces bare now(), which is a TIMEUUID.
            (model_id, name, model_type, version, framework,
             dict(parameters), dict(metrics), is_production)
        )
        return model_id

    def get_production_model(self, model_name):
        """Get the current production model row, or None if there is none."""
        # model_name is not the partition key (model_id is), so this query
        # requires ALLOW FILTERING — acceptable for a small registry table.
        rows = self.session.execute(
            """
            SELECT * FROM ml_models
            WHERE model_name = %s AND is_production = true
            ALLOW FILTERING
            """,
            (model_name,)
        )
        return rows.one()

    def get_model_versions(self, model_name):
        """Get all versions of a model, newest first."""
        # The original query text was garbled ("ORDER BY trained" split
        # across the parameter tuple) and ORDER BY is only valid on
        # clustering columns anyway — fetch and sort client-side.
        rows = self.session.execute(
            """
            SELECT * FROM ml_models
            WHERE model_name = %s
            ALLOW FILTERING
            """,
            (model_name,)
        )
        return sorted(rows, key=lambda r: r.trained_at, reverse=True)
Real-Time Recommendations
User Activity Tracking
-- User activity stream: one partition per (user, session), newest first.
CREATE TABLE user_activity (
    user_id UUID,
    session_id UUID,
    timestamp TIMESTAMP,
    event_type TEXT,
    item_id UUID,
    metadata MAP<TEXT, TEXT>,
    PRIMARY KEY ((user_id, session_id), timestamp, event_type)
) WITH CLUSTERING ORDER BY (timestamp DESC);

-- Item recommendations, ranked by score within each user partition.
-- item_id is part of the clustering key: with the original
-- PRIMARY KEY (user_id, score), two items with the same score would
-- silently overwrite each other.
CREATE TABLE user_recommendations (
    user_id UUID,
    score DOUBLE,
    item_id UUID,
    model_version TEXT,
    updated_at TIMESTAMP,
    PRIMARY KEY (user_id, score, item_id)
) WITH CLUSTERING ORDER BY (score DESC, item_id ASC);
Python: Recommendation Engine
class RecommendationEngine:
    """Track user activity and serve precomputed recommendations."""

    def __init__(self, session):
        self.session = session

    def track_activity(self, user_id, session_id, event_type, item_id, metadata=None):
        """Record one activity event in the user_activity stream."""
        from datetime import datetime
        self.session.execute(
            """
            INSERT INTO user_activity
            (user_id, session_id, timestamp, event_type, item_id, metadata)
            VALUES (%s, %s, %s, %s, %s, %s)
            """,
            # metadata maps to a CQL MAP<TEXT, TEXT> column: bind the dict
            # itself — the original json.dumps() string would be rejected.
            # '?' markers only work on prepared statements, hence '%s'.
            (user_id, session_id, datetime.now(), event_type, item_id,
             metadata if metadata else None)
        )

    def get_recommendations(self, user_id, limit=10):
        """Return up to `limit` (item_id, score) pairs, highest score first."""
        rows = self.session.execute(
            """
            SELECT item_id, score
            FROM user_recommendations
            WHERE user_id = %s
            LIMIT %s
            """,
            (user_id, limit)
        )
        return [(row.item_id, row.score) for row in rows]

    def update_recommendations(self, user_id, recommendations, model_version):
        """Replace a user's recommendation list in one batch."""
        from cassandra.query import BatchStatement  # was undefined in the original
        batch = BatchStatement()
        # Clear old recommendations (same partition as the inserts below).
        # NOTE(review): all statements in a batch share one write timestamp,
        # and deletes win ties — this DELETE may shadow the INSERTs below.
        # Consider explicit USING TIMESTAMP ordering; confirm in testing.
        batch.add(
            "DELETE FROM user_recommendations WHERE user_id = %s",
            (user_id,)
        )
        for item_id, score in recommendations:
            batch.add(
                """
                INSERT INTO user_recommendations
                (user_id, item_id, score, model_version, updated_at)
                VALUES (%s, %s, %s, %s, toTimestamp(now()))
                """,
                (user_id, item_id, score, model_version)
            )
        self.session.execute(batch)
Monitoring ML Workloads
-- Track prediction latency
-- One partition per model; timestamp clustering gives time-ordered scans.
CREATE TABLE prediction_metrics (
model_name TEXT,
timestamp TIMESTAMP,
latency_ms DOUBLE,
success BOOLEAN,
PRIMARY KEY ((model_name), timestamp)
);
-- Feature store metrics
-- One partition per feature; request/hit/miss counts sampled over time.
CREATE TABLE feature_store_metrics (
feature_name TEXT,
timestamp TIMESTAMP,
requests BIGINT,
hits BIGINT,
misses BIGINT,
PRIMARY KEY ((feature_name), timestamp)
);
Best Practices
Performance Optimization
-- Batch non-logged operations.
-- Plain BEGIN BATCH is a *logged* batch (pays a batchlog round-trip);
-- UNLOGGED matches the "non-logged" intent stated above.
BEGIN UNLOGGED BATCH
INSERT INTO events (user_id, event) VALUES (1, 'click');
INSERT INTO events (user_id, event) VALUES (2, 'view');
APPLY BATCH;
-- Use prepared statements
# Prepare once, execute many: prepared statements are parsed server-side a
# single time and are the only statement form that accepts '?' bind markers.
prepared = session.prepare("""
INSERT INTO events (user_id, event) VALUES (?, ?)
""")
Data Modeling for ML
-- Design for query patterns
-- Denormalize if needed
-- Pre-compute aggregations
-- Use appropriate compaction
Conclusion
Cassandra’s high write throughput and distributed architecture make it ideal for AI applications. From time-series data ingestion to feature stores and real-time recommendations, Cassandra powers modern ML data pipelines.
In the final article, we’ll explore real-world production use cases for Cassandra across different industries.
Comments