Skip to main content
โšก Calmops

Cassandra in Production: Real-World Patterns and Best Practices

Introduction

Cassandra powers production systems across virtually every industry requiring high write throughput and global distribution. This article explores real-world use cases with practical implementation patterns.


IoT Platforms

Time-Series Sensor Data

-- Sensor readings: one partition per device, rows clustered newest-first.
-- TimeWindowCompactionStrategy with 24h windows suits append-only,
-- TTL/time-series data: whole SSTable windows expire and drop together.
CREATE TABLE sensor_readings (
    device_id TEXT,
    timestamp TIMESTAMP,
    temperature DECIMAL,
    humidity DECIMAL,
    battery_level DECIMAL,
    signal_strength INT,
    PRIMARY KEY (device_id, timestamp)
) WITH CLUSTERING ORDER BY (timestamp DESC)
AND compaction = {
    'class': 'TimeWindowCompactionStrategy',
    'compaction_window_unit': 'HOURS',
    'compaction_window_size': 24
};

-- Device metadata: single-row lookup table keyed by device id.
-- NOTE(review): an unbounded partition per device in sensor_readings can
-- grow very large for long-lived devices; consider a time bucket in the
-- partition key (e.g. (device_id, day)) for high-frequency sensors.
CREATE TABLE device_metadata (
    device_id TEXT PRIMARY KEY,
    device_type TEXT,
    firmware_version TEXT,
    location TEXT,
    last_seen TIMESTAMP,
    status TEXT
);

-- Alerts: one partition per device, newest first (TIMEUUID is time-ordered,
-- so clustering on alert_id DESC yields reverse-chronological order).
-- Fix: CQL has no column DEFAULT clause -- the previous
-- `acknowledged BOOLEAN DEFAULT false` is a syntax error. Unset columns
-- read back as null; treat null as "not acknowledged" in application code.
CREATE TABLE device_alerts (
    device_id TEXT,
    alert_id TIMEUUID,
    timestamp TIMESTAMP,
    alert_type TEXT,
    severity TEXT,
    message TEXT,
    acknowledged BOOLEAN,
    PRIMARY KEY ((device_id), alert_id)
) WITH CLUSTERING ORDER BY (alert_id DESC);

IoT Python Application

import json
from datetime import datetime, timedelta

from cassandra.cluster import Cluster

class IoTPlatform:
    """Data-access layer for the iot_platform keyspace.

    Owns a driver Cluster/Session; all timestamps are stored in UTC.
    """

    def __init__(self):
        self.cluster = Cluster(['cassandra-node1', 'cassandra-node2'])
        self.session = self.cluster.connect('iot_platform')

    def ingest_sensor_data(self, device_id, reading):
        """Insert one sensor reading and refresh the device metadata row.

        :param device_id: device partition key (str)
        :param reading: dict with 'temperature', 'humidity', 'battery',
            'signal' keys -- TODO confirm against the ingest payload schema.
        """
        now = datetime.utcnow()  # single timestamp for both writes
        self.session.execute(
            """
            INSERT INTO sensor_readings
            (device_id, timestamp, temperature, humidity, battery_level, signal_strength)
            VALUES (%s, %s, %s, %s, %s, %s)
            """,
            (device_id, now,
             reading['temperature'], reading['humidity'],
             reading['battery'], reading['signal'])
        )

        # INSERT is an upsert in Cassandra: this overwrites last_seen/status
        # without a prior read, leaving other metadata columns untouched.
        self.session.execute(
            """
            INSERT INTO device_metadata
            (device_id, last_seen, status)
            VALUES (%s, %s, 'active')
            """,
            (device_id, now)
        )

    def get_device_readings(self, device_id, hours=24):
        """Return readings from the last `hours` hours, newest first.

        Fix: CQL has no `now() - %dh` interval arithmetic in WHERE clauses
        (and `%d` is not a driver placeholder); compute the cutoff
        client-side and bind it as a normal parameter.
        """
        cutoff = datetime.utcnow() - timedelta(hours=hours)
        rows = self.session.execute(
            """
            SELECT * FROM sensor_readings
            WHERE device_id = %s
            AND timestamp > %s
            """,
            (device_id, cutoff)
        )
        return list(rows)

    def create_alert(self, device_id, alert_type, severity, message):
        """Insert an alert row for a device.

        Fix: now() yields a TIMEUUID -- fine for alert_id, but the TIMESTAMP
        column must be populated with toTimestamp(now()).
        """
        self.session.execute(
            """
            INSERT INTO device_alerts
            (device_id, alert_id, timestamp, alert_type, severity, message)
            VALUES (%s, now(), toTimestamp(now()), %s, %s, %s)
            """,
            (device_id, alert_type, severity, message)
        )

Messaging Systems

Message Storage

-- Messages: one partition per conversation; clustering on the TIMEUUID
-- message_id DESC gives newest-first reads without an extra timestamp sort.
CREATE TABLE messages (
    message_id TIMEUUID,
    conversation_id UUID,
    sender_id UUID,
    recipient_id UUID,
    content TEXT,
    created_at TIMESTAMP,
    read_at TIMESTAMP,
    deleted_at TIMESTAMP,
    PRIMARY KEY (conversation_id, message_id)
) WITH CLUSTERING ORDER BY (message_id DESC);

-- Conversation summary row: denormalized preview + per-user unread counts.
-- NOTE(review): unread_count is a plain MAP<UUID, INT>, so increments
-- require read-modify-write; a separate counter table would avoid that.
CREATE TABLE conversations (
    conversation_id UUID PRIMARY KEY,
    participants SET<UUID>,
    last_message_at TIMESTAMP,
    last_message_preview TEXT,
    unread_count MAP<UUID, INT>
);

-- Per-user conversation index, ordered by most recent activity.
-- Fix: the original clustered on last_message_at without declaring it as a
-- column (invalid DDL). It is added here, with conversation_id as a final
-- clustering column so two conversations touched at the same millisecond
-- both keep their rows.
CREATE TABLE user_conversations (
    user_id UUID,
    last_message_at TIMESTAMP,
    conversation_id UUID,
    last_read_message_id TIMEUUID,
    added_at TIMESTAMP,
    PRIMARY KEY (user_id, last_message_at, conversation_id)
) WITH CLUSTERING ORDER BY (last_message_at DESC, conversation_id ASC);

Message Operations

class MessagingPlatform:
    """Data-access layer for the messaging keyspace."""

    def __init__(self):
        self.cluster = Cluster(['cassandra-node1', 'cassandra-node2'])
        self.session = self.cluster.connect('messaging')

    def send_message(self, sender_id, recipient_id, content):
        """Send a message; returns the conversation_id it was stored under.

        Fixes:
        - conversation_id is now derived deterministically (uuid5 over the
          sorted participant pair) so repeat messages between the same two
          users land in ONE conversation instead of a new uuid4 per call.
        - participants is bound as a Python set; the driver serializes it to
          CQL SET<UUID>. Never splice set literals like {%s, %s} into the
          statement text -- that is invalid and defeats parameterization.
        """
        import uuid
        from datetime import datetime

        participant_key = ':'.join(sorted(str(p) for p in (sender_id, recipient_id)))
        conversation_id = uuid.uuid5(uuid.NAMESPACE_OID, participant_key)
        message_id = uuid.uuid1()  # TIMEUUID: time-ordered message id
        now = datetime.utcnow()

        # INSERT is an upsert: re-running for an existing conversation just
        # refreshes last_message_at and the preview.
        self.session.execute(
            """
            INSERT INTO conversations
            (conversation_id, participants, last_message_at, last_message_preview)
            VALUES (%s, %s, %s, %s)
            """,
            (conversation_id, {sender_id, recipient_id}, now, content[:50])
        )

        self.session.execute(
            """
            INSERT INTO messages
            (message_id, conversation_id, sender_id, recipient_id, content, created_at)
            VALUES (%s, %s, %s, %s, %s, %s)
            """,
            (message_id, conversation_id, sender_id, recipient_id,
             content, now)
        )

        return conversation_id

    def get_conversation(self, conversation_id, limit=50):
        """Return up to `limit` most recent messages (message_id DESC order
        comes from the table's clustering, no ORDER BY needed)."""
        rows = self.session.execute(
            """
            SELECT * FROM messages
            WHERE conversation_id = %s
            LIMIT %s
            """,
            (conversation_id, limit)
        )
        return list(rows)

User Activity Tracking

Event Logging

-- User events: composite partition key (user_id, event_type) keeps any one
-- partition bounded to a single event stream; TIMEUUID clustering gives
-- time order for free.
CREATE TABLE user_events (
    user_id UUID,
    event_type TEXT,
    event_id TIMEUUID,
    timestamp TIMESTAMP,
    properties MAP<TEXT, TEXT>,
    session_id UUID,
    PRIMARY KEY ((user_id, event_type), event_id)
) WITH CLUSTERING ORDER BY (event_id DESC);

-- Daily active users: one partition per day, one row per active user.
CREATE TABLE daily_active_users (
    date DATE,
    user_id UUID,
    first_seen TIMESTAMP,
    last_seen TIMESTAMP,
    event_count INT,
    PRIMARY KEY (date, user_id)
);

-- Per-day event counters (counter tables may hold ONLY counter columns
-- outside the primary key, which this satisfies).
-- NOTE(review): a COUNTER cannot track *unique* users by itself -- callers
-- must only increment unique_users on a user's first event of the day
-- (e.g. gated by daily_active_users); confirm the pipeline does this.
CREATE TABLE event_aggregations (
    event_type TEXT,
    date DATE,
    count COUNTER,
    unique_users COUNTER,
    PRIMARY KEY ((event_type), date)
);

Analytics Pipeline

class AnalyticsTracker:
    """Data-access layer for the analytics keyspace."""

    def __init__(self):
        self.cluster = Cluster(['cassandra-node1', 'cassandra-node2'])
        self.session = self.cluster.connect('analytics')

    def track_event(self, user_id, event_type, properties=None, session_id=None):
        """Record one user event.

        Fixes:
        - the original VALUES clause had garbled placeholders
          `(%, event_type%, %, %, %, %)`; every bound value uses %s.
        - `properties` targets a MAP<TEXT, TEXT> column, so a dict is bound
          directly; json.dumps would bind a TEXT string and fail. Values are
          assumed to already be strings -- TODO confirm at call sites.
        """
        from datetime import datetime

        self.session.execute(
            """
            INSERT INTO user_events
            (user_id, event_type, event_id, timestamp, properties, session_id)
            VALUES (%s, %s, now(), %s, %s, %s)
            """,
            (user_id, event_type, datetime.utcnow(),
             properties if properties else None,
             session_id)
        )

    def get_user_journey(self, user_id, event_type=None, limit=100):
        """Return up to `limit` events for a user, newest first.

        With event_type given this is an efficient single-partition read.
        """
        if event_type:
            rows = self.session.execute(
                """
                SELECT * FROM user_events
                WHERE user_id = %s AND event_type = %s
                LIMIT %s
                """,
                (user_id, event_type, limit)
            )
        else:
            # Partition key is (user_id, event_type): filtering on user_id
            # alone is rejected without ALLOW FILTERING. This scans -- fine
            # for ad-hoc debugging, but hot paths should use a user-keyed
            # table instead.
            rows = self.session.execute(
                """
                SELECT * FROM user_events
                WHERE user_id = %s
                LIMIT %s
                ALLOW FILTERING
                """,
                (user_id, limit)
            )
        return list(rows)

Gaming Applications

Player Data

-- Player profiles: single-row lookup keyed by player id.
CREATE TABLE player_profiles (
    player_id UUID PRIMARY KEY,
    username TEXT,
    email TEXT,
    level INT,
    experience BIGINT,
    currency MAP<TEXT, INT>,
    created_at TIMESTAMP,
    last_login TIMESTAMP
);

-- Player inventory: one partition per player, one row per item.
CREATE TABLE player_inventory (
    player_id UUID,
    item_id UUID,
    quantity INT,
    acquired_at TIMESTAMP,
    PRIMARY KEY ((player_id), item_id)
);

-- Player scores: one partition per game; clustering (score DESC) keeps the
-- partition pre-sorted so a leaderboard is a plain LIMIT read. player_id in
-- the clustering key breaks ties and keeps one row per (score, player).
CREATE TABLE player_scores (
    game_id TEXT,
    player_id UUID,
    score INT,
    achieved_at TIMESTAMP,
    PRIMARY KEY ((game_id), score, player_id)
) WITH CLUSTERING ORDER BY (score DESC);

-- Precomputed daily leaderboard: partitioned per (game, day), clustered by
-- rank so "top N" is the first N rows of the partition.
CREATE TABLE leaderboard_daily (
    game_id TEXT,
    date DATE,
    player_id UUID,
    score INT,
    rank INT,
    PRIMARY KEY ((game_id, date), rank)
);

Game Server

class GameBackend:
    """Data-access layer for the gaming keyspace."""

    def __init__(self):
        self.cluster = Cluster(['cassandra-node1', 'cassandra-node2'])
        self.session = self.cluster.connect('gaming')

    def record_score(self, game_id, player_id, score):
        """Record a player's score.

        Fix: the original statement was garbled (mismatched column list,
        four placeholders, mangled parameter tuple); columns and bound
        values now line up one-to-one.
        """
        from datetime import datetime

        self.session.execute(
            """
            INSERT INTO player_scores
            (game_id, player_id, score, achieved_at)
            VALUES (%s, %s, %s, %s)
            """,
            (game_id, player_id, score, datetime.utcnow())
        )

    def get_leaderboard(self, game_id, limit=10):
        """Return the top `limit` scores for a game (the table's
        CLUSTERING ORDER BY (score DESC) makes this a sorted partition read)."""
        rows = self.session.execute(
            """
            SELECT * FROM player_scores
            WHERE game_id = %s
            LIMIT %s
            """,
            (game_id, limit)
        )
        return list(rows)

    def update_inventory(self, player_id, item_id, quantity):
        """Add `quantity` (may be negative) to a player's item count.

        Fix: CQL only allows `col = col + ?` increments on COUNTER columns;
        `quantity` is a plain INT, so the original UPDATE was invalid. This
        does a client-side read-modify-write instead.
        NOTE(review): not safe under concurrent writers to the same item --
        use a counter column or a lightweight transaction if that matters.
        """
        row = self.session.execute(
            """
            SELECT quantity FROM player_inventory
            WHERE player_id = %s AND item_id = %s
            """,
            (player_id, item_id)
        ).one()
        current = row.quantity if row is not None and row.quantity is not None else 0

        self.session.execute(
            """
            UPDATE player_inventory
            SET quantity = %s
            WHERE player_id = %s AND item_id = %s
            """,
            (current + quantity, player_id, item_id)
        )

Financial Applications

Transaction Logging

-- Transactions: one partition per account, newest first. timestamp comes
-- before transaction_id in the clustering key so ordering is by time, with
-- the TIMEUUID as a uniqueness tie-breaker.
-- NOTE(review): the per-account partition is unbounded; long-lived accounts
-- should be read via transaction_history below, which buckets by month.
CREATE TABLE transactions (
    transaction_id TIMEUUID,
    account_id UUID,
    transaction_type TEXT,
    amount DECIMAL,
    balance_after DECIMAL,
    timestamp TIMESTAMP,
    status TEXT,
    metadata MAP<TEXT, TEXT>,
    PRIMARY KEY (account_id, timestamp, transaction_id)
) WITH CLUSTERING ORDER BY (timestamp DESC);

-- Current balance snapshot: single-row lookup per account; DECIMAL (never
-- floating point) for money.
CREATE TABLE account_balances (
    account_id UUID PRIMARY KEY,
    balance DECIMAL,
    currency TEXT,
    last_transaction_id TIMEUUID,
    updated_at TIMESTAMP
);

-- Month-bucketed transaction history: (account, year, month) partitions
-- keep partition sizes bounded for audit reads over long periods.
CREATE TABLE transaction_history (
    account_id UUID,
    year INT,
    month INT,
    transaction_id TIMEUUID,
    amount DECIMAL,
    PRIMARY KEY ((account_id, year, month), transaction_id)
) WITH CLUSTERING ORDER BY (transaction_id DESC);

Best Practices Summary

| Use Case  | Key Requirement       | Cassandra Solution            |
|-----------|-----------------------|-------------------------------|
| IoT       | High write throughput | TWCS compaction               |
| Messaging | Low-latency reads     | Time-ordered clustering       |
| Gaming    | Real-time scores      | Score-ordered clustering      |
| Analytics | Aggregation           | Counter tables                |
| Financial | Audit trail           | Time-bucketed partitioning    |

Performance Optimization

Write-Heavy Workloads

-- Unlogged batch for statements that hit the SAME partition: applied as a
-- single mutation with no batch-log overhead. Fix: a bare BEGIN BATCH is a
-- LOGGED batch (slower, writes a batch log), contradicting the intent here.
BEGIN UNLOGGED BATCH
INSERT INTO events (user_id, event) VALUES (1, 'event1');
INSERT INTO events (user_id, event) VALUES (1, 'event2');
APPLY BATCH;

-- Avoid batch for different partitions
-- Use asynchronous writes instead

Read Optimization

-- Denormalize for read patterns
-- Create separate tables for queries
-- Use appropriate clustering order
-- Limit partition size

Conclusion

Cassandra excels in production environments requiring high write throughput, linear scalability, and fault tolerance. From IoT platforms to gaming leaderboards, Cassandra powers real-time applications across industries.

With proper data modeling and cluster management, Cassandra can handle millions of writes per second while maintaining low latency reads.

Resources

Comments