Introduction
Cassandra powers production systems across virtually every industry requiring high write throughput and global distribution. This article explores real-world use cases with practical implementation patterns.
IoT Platforms
Time-Series Sensor Data
-- Sensor readings table
-- One partition per device; rows clustered newest-first by reading time.
-- TWCS with 24-hour windows groups SSTables by write time — a good fit for
-- append-only time-series data that is expired or queried by recency.
-- NOTE(review): partitions grow without bound per device — for long-lived
-- devices consider adding a time bucket (e.g. day) to the partition key.
CREATE TABLE sensor_readings (
device_id TEXT,
timestamp TIMESTAMP,
temperature DECIMAL,
humidity DECIMAL,
battery_level DECIMAL,
signal_strength INT,
PRIMARY KEY (device_id, timestamp)
) WITH CLUSTERING ORDER BY (timestamp DESC)
AND compaction = {
'class': 'TimeWindowCompactionStrategy',
'compaction_window_unit': 'HOURS',
'compaction_window_size': 24
};
-- Device metadata
-- Single-row-per-device lookup table; last_seen/status are refreshed by the
-- ingestion path via CQL upsert (INSERT over the same primary key).
CREATE TABLE device_metadata (
device_id TEXT PRIMARY KEY,
device_type TEXT,
firmware_version TEXT,
location TEXT,
last_seen TIMESTAMP,
status TEXT
);
-- Alerts
-- One partition per device, newest alert first (TIMEUUID sorts by time).
-- Fixed: CQL has no column DEFAULT clause, so `acknowledged BOOLEAN DEFAULT
-- false` was invalid. The column starts as null; the application must set it
-- explicitly (treat null as not-acknowledged).
CREATE TABLE device_alerts (
device_id TEXT,
alert_id TIMEUUID,
timestamp TIMESTAMP,
alert_type TEXT,
severity TEXT,
message TEXT,
acknowledged BOOLEAN,
PRIMARY KEY ((device_id), alert_id)
) WITH CLUSTERING ORDER BY (alert_id DESC);
IoT Python Application
from cassandra.cluster import Cluster
from datetime import datetime
import json
class IoTPlatform:
    """Ingestion and query helpers for the IoT sensor keyspace."""

    def __init__(self):
        # Contact points only; the driver discovers the rest of the ring.
        self.cluster = Cluster(['cassandra-node1', 'cassandra-node2'])
        self.session = self.cluster.connect('iot_platform')

    def ingest_sensor_data(self, device_id, reading):
        """Ingest one sensor reading and refresh the device's metadata row.

        reading: mapping with 'temperature', 'humidity', 'battery', 'signal'.
        """
        # Capture one timestamp so both writes agree on the reading time.
        now = datetime.now()
        self.session.execute(
            """
            INSERT INTO sensor_readings
            (device_id, timestamp, temperature, humidity, battery_level, signal_strength)
            VALUES (%s, %s, %s, %s, %s, %s)
            """,
            (device_id, now,
             reading['temperature'], reading['humidity'],
             reading['battery'], reading['signal'])
        )
        # CQL INSERT is an upsert: only the listed columns are touched, so the
        # device's other metadata columns survive this write.
        self.session.execute(
            """
            INSERT INTO device_metadata
            (device_id, last_seen, status)
            VALUES (%s, %s, 'active')
            """,
            (device_id, now)
        )

    def get_device_readings(self, device_id, hours=24):
        """Return readings for a device newer than `hours` hours ago.

        The cutoff is computed client-side: CQL has no interval arithmetic
        like `now() - 24h`, so a concrete timestamp is bound instead
        (the original `%dh` placeholder was not valid).
        """
        from datetime import timedelta
        cutoff = datetime.now() - timedelta(hours=hours)
        rows = self.session.execute(
            """
            SELECT * FROM sensor_readings
            WHERE device_id = %s
            AND timestamp > %s
            """,
            (device_id, cutoff)
        )
        return list(rows)

    def create_alert(self, device_id, alert_type, severity, message):
        """Create an alert row for a device."""
        import uuid
        # alert_id is a client-generated TIMEUUID. CQL's now() also yields a
        # timeuuid, but it cannot be inserted into the TIMESTAMP column (the
        # original used now() for both), so both values are generated
        # driver-side.
        self.session.execute(
            """
            INSERT INTO device_alerts
            (device_id, alert_id, timestamp, alert_type, severity, message)
            VALUES (%s, %s, %s, %s, %s, %s)
            """,
            (device_id, uuid.uuid1(), datetime.now(),
             alert_type, severity, message)
        )
Messaging Systems
Message Storage
-- Messages table
CREATE TABLE messages (
message_id TIMEUUID,
conversation_id UUID,
sender_id UUID,
recipient_id UUID,
content TEXT,
created_at TIMESTAMP,
read_at TIMESTAMP,
deleted_at TIMESTAMP,
PRIMARY KEY (conversation_id, message_id)
) WITH CLUSTERING ORDER BY (message_id DESC);
-- Conversations
CREATE TABLE conversations (
conversation_id UUID PRIMARY KEY,
participants SET<UUID>,
last_message_at TIMESTAMP,
last_message_preview TEXT,
unread_count MAP<UUID, INT>
);
-- User conversations
CREATE TABLE user_conversations (
user_id UUID,
conversation_id UUID,
last_read_message_id TIMEUUID,
added_at TIMESTAMP,
PRIMARY KEY (user_id, last_message_at)
) WITH CLUSTERING ORDER BY (last_message_at DESC);
Message Operations
class MessagingPlatform:
    """Message send/read helpers for the messaging keyspace."""

    def __init__(self):
        self.cluster = Cluster(['cassandra-node1', 'cassandra-node2'])
        self.session = self.cluster.connect('messaging')

    def send_message(self, sender_id, recipient_id, content, conversation_id=None):
        """Send a message and return its conversation id.

        conversation_id: pass an existing id to append to that conversation;
        when omitted a new conversation is created (backward-compatible with
        the original always-create behavior).
        """
        import uuid
        from datetime import datetime
        if conversation_id is None:
            conversation_id = uuid.uuid4()
        # TIMEUUID: encodes send time, so messages sort chronologically.
        message_id = uuid.uuid1()
        now = datetime.now()
        # Upsert the conversation summary. The participants set is bound as a
        # single parameter — the driver serializes a Python set to a CQL set.
        # (The original spliced %s placeholders inside a {...} literal, which
        # is not a valid way to bind collection elements.)
        self.session.execute(
            """
            INSERT INTO conversations
            (conversation_id, participants, last_message_at, last_message_preview)
            VALUES (%s, %s, %s, %s)
            """,
            (conversation_id, {sender_id, recipient_id}, now, content[:50])
        )
        self.session.execute(
            """
            INSERT INTO messages
            (message_id, conversation_id, sender_id, recipient_id, content, created_at)
            VALUES (%s, %s, %s, %s, %s, %s)
            """,
            (message_id, conversation_id, sender_id, recipient_id, content, now)
        )
        return conversation_id

    def get_conversation(self, conversation_id, limit=50):
        """Return up to `limit` messages (newest first per table clustering)."""
        rows = self.session.execute(
            """
            SELECT * FROM messages
            WHERE conversation_id = %s
            LIMIT %s
            """,
            (conversation_id, limit)
        )
        return list(rows)
User Activity Tracking
Event Logging
-- User events
CREATE TABLE user_events (
user_id UUID,
event_type TEXT,
event_id TIMEUUID,
timestamp TIMESTAMP,
properties MAP<TEXT, TEXT>,
session_id UUID,
PRIMARY KEY ((user_id, event_type), event_id)
) WITH CLUSTERING ORDER BY (event_id DESC);
-- Daily active users
CREATE TABLE daily_active_users (
date DATE,
user_id UUID,
first_seen TIMESTAMP,
last_seen TIMESTAMP,
event_count INT,
PRIMARY KEY (date, user_id)
);
-- Event aggregations
CREATE TABLE event_aggregations (
event_type TEXT,
date DATE,
count COUNTER,
unique_users COUNTER,
PRIMARY KEY ((event_type), date)
);
Analytics Pipeline
class AnalyticsTracker:
def __init__(self):
self.cluster = Cluster(['cassandra-node1', 'cassandra-node2'])
self.session = self.cluster.connect('analytics')
def track_event(self, user_id, event_type, properties=None, session_id=None):
"""Track user event"""
import json
from datetime import datetime
# Insert event
self.session.execute(
"""
INSERT INTO user_events
(user_id, event_type, event_id, timestamp, properties, session_id)
VALUES (%, event_type%, %, %, %, %)
""",
(user_id, event_type, datetime.now(),
json.dumps(properties) if properties else None,
session_id)
)
def get_user_journey(self, user_id, event_type=None, limit=100):
"""Get user event history"""
if event_type:
rows = self.session.execute(
"""
SELECT * FROM user_events
WHERE user_id = %s AND event_type = %s
LIMIT %s
""",
(user_id, event_type, limit)
)
else:
rows = self.session.execute(
"""
SELECT * FROM user_events
WHERE user_id = %s
LIMIT %s
""",
(user_id, limit)
)
return list(rows)
Gaming Applications
Player Data
-- Player profiles
CREATE TABLE player_profiles (
player_id UUID PRIMARY KEY,
username TEXT,
email TEXT,
level INT,
experience BIGINT,
currency MAP<TEXT, INT>,
created_at TIMESTAMP,
last_login TIMESTAMP
);
-- Player inventory
CREATE TABLE player_inventory (
player_id UUID,
item_id UUID,
quantity INT,
acquired_at TIMESTAMP,
PRIMARY KEY ((player_id), item_id)
);
-- Player scores
CREATE TABLE player_scores (
game_id TEXT,
player_id UUID,
score INT,
achieved_at TIMESTAMP,
PRIMARY KEY ((game_id), score, player_id)
) WITH CLUSTERING ORDER BY (score DESC);
-- Leaderboards
CREATE TABLE leaderboard_daily (
game_id TEXT,
date DATE,
player_id UUID,
score INT,
rank INT,
PRIMARY KEY ((game_id, date), rank)
);
Game Server
class GameBackend:
    """Score, leaderboard, and inventory operations for the gaming keyspace."""

    def __init__(self):
        self.cluster = Cluster(['cassandra-node1', 'cassandra-node2'])
        self.session = self.cluster.connect('gaming')

    def record_score(self, game_id, player_id, score):
        """Record a player's score for a game.

        Fixed: the original column list and bind tuple were garbled
        (3 columns, 4 placeholders, malformed parameters).
        """
        from datetime import datetime
        self.session.execute(
            """
            INSERT INTO player_scores
            (game_id, player_id, score, achieved_at)
            VALUES (%s, %s, %s, %s)
            """,
            (game_id, player_id, score, datetime.now())
        )

    def get_leaderboard(self, game_id, limit=10):
        """Top scores for a game (table clustering returns highest first)."""
        rows = self.session.execute(
            """
            SELECT * FROM player_scores
            WHERE game_id = %s
            LIMIT %s
            """,
            (game_id, limit)
        )
        return list(rows)

    def update_inventory(self, player_id, item_id, quantity):
        """Add `quantity` (may be negative) to a player's item count.

        player_inventory.quantity is a plain INT, so CQL cannot do
        `quantity = quantity + ?` — that syntax is only valid on counter
        columns. Read-modify-write instead.
        NOTE(review): not atomic under concurrent updates — use a counter
        table or a lightweight transaction if that matters.
        """
        rows = list(self.session.execute(
            """
            SELECT quantity FROM player_inventory
            WHERE player_id = %s AND item_id = %s
            """,
            (player_id, item_id)
        ))
        # Missing row or null column both mean "currently zero".
        current = rows[0].quantity if rows and rows[0].quantity is not None else 0
        self.session.execute(
            """
            UPDATE player_inventory
            SET quantity = %s
            WHERE player_id = %s AND item_id = %s
            """,
            (current + quantity, player_id, item_id)
        )
Financial Applications
Transaction Logging
-- Transactions
CREATE TABLE transactions (
transaction_id TIMEUUID,
account_id UUID,
transaction_type TEXT,
amount DECIMAL,
balance_after DECIMAL,
timestamp TIMESTAMP,
status TEXT,
metadata MAP<TEXT, TEXT>,
PRIMARY KEY (account_id, timestamp, transaction_id)
) WITH CLUSTERING ORDER BY (timestamp DESC);
-- Account balances
CREATE TABLE account_balances (
account_id UUID PRIMARY KEY,
balance DECIMAL,
currency TEXT,
last_transaction_id TIMEUUID,
updated_at TIMESTAMP
);
-- Transaction history
CREATE TABLE transaction_history (
account_id UUID,
year INT,
month INT,
transaction_id TIMEUUID,
amount DECIMAL,
PRIMARY KEY ((account_id, year, month), transaction_id)
) WITH CLUSTERING ORDER BY (transaction_id DESC);
Best Practices Summary
| Use Case | Key Features | Cassandra Solution |
|---|---|---|
| IoT | High write throughput | TWCS compaction |
| Messaging | Low latency reads | Time clustering |
| Gaming | Real-time scores | Counter tables |
| Analytics | Aggregation | Materialized views |
| Financial | Audit trail | Time partitioning |
Performance Optimization
Write-Heavy Workloads
-- Unlogged batch for writes to the SAME partition (applied as one mutation).
-- Fixed: the original comment said "non-logged" but used BEGIN BATCH, which
-- is a LOGGED batch and adds coordinator/batchlog overhead.
BEGIN UNLOGGED BATCH
INSERT INTO events (user_id, event) VALUES (1, 'event1');
INSERT INTO events (user_id, event) VALUES (1, 'event2');
APPLY BATCH;
-- Avoid batches that span different partitions —
-- use individual asynchronous writes instead.
Read Optimization
-- Denormalize for read patterns
-- Create separate tables for queries
-- Use appropriate clustering order
-- Limit partition size
Conclusion
Cassandra excels in production environments requiring high write throughput, linear scalability, and fault tolerance. From IoT platforms to gaming leaderboards, Cassandra powers real-time applications across industries.
With proper data modeling and cluster management, Cassandra can handle millions of writes per second while maintaining low latency reads.
Comments