Skip to main content

Cassandra in Production: Real-World Patterns and Best Practices

Created: March 5, 2026 CalmOps 6 min read

Introduction

Cassandra powers production systems across virtually every industry requiring high write throughput and global distribution. This article explores real-world use cases with practical implementation patterns.


IoT Platforms

Time-Series Sensor Data

-- Sensor readings table
-- One partition per device; rows clustered newest-first so the most
-- recent readings are the cheapest slice to read.
-- TWCS with 24-hour windows groups SSTables by day, which suits
-- append-only time-series data that expires or is queried by recency.
-- NOTE(review): partitions grow without bound per device; if devices are
-- long-lived, consider adding a time bucket (e.g. day) to the partition
-- key -- confirm expected per-device write volume.
CREATE TABLE sensor_readings (
    device_id TEXT,
    timestamp TIMESTAMP,
    temperature DECIMAL,
    humidity DECIMAL,
    battery_level DECIMAL,
    signal_strength INT,
    PRIMARY KEY (device_id, timestamp)
) WITH CLUSTERING ORDER BY (timestamp DESC)
AND compaction = {
    'class': 'TimeWindowCompactionStrategy',
    'compaction_window_unit': 'HOURS',
    'compaction_window_size': 24
};

-- Device metadata
-- Single-row-per-device lookup table; INSERTs act as upserts, so
-- ingest code can refresh last_seen/status without a prior read.
CREATE TABLE device_metadata (
    device_id TEXT PRIMARY KEY,
    device_type TEXT,
    firmware_version TEXT,
    location TEXT,
    last_seen TIMESTAMP,
    status TEXT
);

-- Alerts
-- One partition per device, newest alerts first (TIMEUUIDs sort by the
-- time they were generated).
CREATE TABLE device_alerts (
    device_id TEXT,
    alert_id TIMEUUID,
    timestamp TIMESTAMP,
    alert_type TEXT,
    severity TEXT,
    message TEXT,
    -- CQL has no column DEFAULT clause (the previous `DEFAULT false`
    -- would be rejected). Unset columns read back as null, so treat
    -- null as "not acknowledged" in application code.
    acknowledged BOOLEAN,
    PRIMARY KEY ((device_id), alert_id)
) WITH CLUSTERING ORDER BY (alert_id DESC);

IoT Python Application

from cassandra.cluster import Cluster
from datetime import datetime
import json

class IoTPlatform:
    """Thin wrapper around a Cassandra session for the `iot_platform` keyspace.

    Covers raw sensor ingestion, device liveness metadata, and alerting.
    """

    def __init__(self):
        # Two contact points; the driver discovers the rest of the ring.
        self.cluster = Cluster(['cassandra-node1', 'cassandra-node2'])
        self.session = self.cluster.connect('iot_platform')

    def ingest_sensor_data(self, device_id, reading):
        """Write one sensor reading and refresh the device's last-seen marker.

        `reading` is expected to carry 'temperature', 'humidity', 'battery'
        and 'signal' keys -- TODO confirm against the ingest producers.
        """
        now = datetime.now()
        self.session.execute(
            """
            INSERT INTO sensor_readings 
            (device_id, timestamp, temperature, humidity, battery_level, signal_strength)
            VALUES (%s, %s, %s, %s, %s, %s)
            """,
            (device_id, now,
             reading['temperature'], reading['humidity'],
             reading['battery'], reading['signal'])
        )

        # INSERT is an upsert in Cassandra, so this refreshes the metadata
        # row without a prior read.
        self.session.execute(
            """
            INSERT INTO device_metadata 
            (device_id, last_seen, status)
            VALUES (%s, %s, 'active')
            """,
            (device_id, now)
        )

    def get_device_readings(self, device_id, hours=24):
        """Return readings for one device newer than `hours` ago.

        The cutoff is computed client-side: the previous `now() - %dh` is
        not valid CQL (no interval arithmetic, and `%dh` is not a driver
        placeholder); CQL's now() is also a TIMEUUID, not a TIMESTAMP.
        """
        from datetime import timedelta

        cutoff = datetime.now() - timedelta(hours=hours)
        rows = self.session.execute(
            """
            SELECT * FROM sensor_readings 
            WHERE device_id = %s 
            AND timestamp > %s
            """,
            (device_id, cutoff)
        )
        return list(rows)

    def create_alert(self, device_id, alert_type, severity, message):
        """Insert a new alert row keyed by a client-generated TIMEUUID.

        The `timestamp` column is a TIMESTAMP, so the previous
        `VALUES (%s, now(), now(), ...)` would fail: now() yields a
        TIMEUUID. Both values are generated client-side instead.
        """
        import uuid

        self.session.execute(
            """
            INSERT INTO device_alerts 
            (device_id, alert_id, timestamp, alert_type, severity, message)
            VALUES (%s, %s, %s, %s, %s, %s)
            """,
            (device_id, uuid.uuid1(), datetime.now(),
             alert_type, severity, message)
        )

Messaging Systems

Message Storage

-- Messages table
-- One partition per conversation; the TIMEUUID message_id clusters
-- newest-first, so "latest N messages" is a single partition slice.
-- NOTE(review): created_at duplicates the time already encoded in the
-- TIMEUUID message_id -- confirm whether both are needed.
CREATE TABLE messages (
    message_id TIMEUUID,
    conversation_id UUID,
    sender_id UUID,
    recipient_id UUID,
    content TEXT,
    created_at TIMESTAMP,
    read_at TIMESTAMP,
    deleted_at TIMESTAMP,
    PRIMARY KEY (conversation_id, message_id)
) WITH CLUSTERING ORDER BY (message_id DESC);

-- Conversations
-- Summary row per conversation. The participants SET and the per-user
-- unread_count MAP keep membership and badge counts in one row.
CREATE TABLE conversations (
    conversation_id UUID PRIMARY KEY,
    participants SET<UUID>,
    last_message_at TIMESTAMP,
    last_message_preview TEXT,
    unread_count MAP<UUID, INT>
);

-- User conversations
-- Inbox view: one partition per user, conversations ordered by recency.
-- Fixed: the previous definition clustered on last_message_at, a column
-- the table never declared. conversation_id is appended to the clustering
-- key so two conversations updated at the same instant both survive.
-- NOTE(review): because last_message_at is part of the primary key,
-- bumping it on a new message means delete-old-row + insert-new-row.
CREATE TABLE user_conversations (
    user_id UUID,
    last_message_at TIMESTAMP,
    conversation_id UUID,
    last_read_message_id TIMEUUID,
    added_at TIMESTAMP,
    PRIMARY KEY (user_id, last_message_at, conversation_id)
) WITH CLUSTERING ORDER BY (last_message_at DESC, conversation_id ASC);

Message Operations

class MessagingPlatform:
    """Cassandra-backed direct-messaging operations (`messaging` keyspace)."""

    def __init__(self):
        self.cluster = Cluster(['cassandra-node1', 'cassandra-node2'])
        self.session = self.cluster.connect('messaging')

    def send_message(self, sender_id, recipient_id, content, conversation_id=None):
        """Insert a message and upsert the conversation summary.

        Returns the conversation id. Pass an existing `conversation_id`
        to append to that conversation; the previous revision generated a
        fresh uuid4 on every call, fragmenting each thread into
        single-message conversations.
        """
        import uuid
        from datetime import datetime

        if conversation_id is None:
            conversation_id = uuid.uuid4()
        message_id = uuid.uuid1()  # TIMEUUID: sorts by creation time
        now = datetime.now()

        # Bind the participants as one Python set -- the driver serializes
        # it as a CQL SET<UUID>. The previous `{%s, %s}` mixed placeholders
        # into a collection literal, which is not valid parameter binding.
        self.session.execute(
            """
            INSERT INTO conversations 
            (conversation_id, participants, last_message_at, last_message_preview)
            VALUES (%s, %s, %s, %s)
            """,
            (conversation_id, {sender_id, recipient_id}, now, content[:50])
        )

        self.session.execute(
            """
            INSERT INTO messages 
            (message_id, conversation_id, sender_id, recipient_id, content, created_at)
            VALUES (%s, %s, %s, %s, %s, %s)
            """,
            (message_id, conversation_id, sender_id, recipient_id,
             content, now)
        )

        return conversation_id

    def get_conversation(self, conversation_id, limit=50):
        """Return up to `limit` newest messages (table clusters DESC)."""
        rows = self.session.execute(
            """
            SELECT * FROM messages 
            WHERE conversation_id = %s
            LIMIT %s
            """,
            (conversation_id, limit)
        )
        return list(rows)

User Activity Tracking

Event Logging

-- User events
-- Composite partition key (user_id, event_type) spreads one user's
-- activity across partitions per event type; the TIMEUUID event_id
-- clusters newest-first. Queries must supply BOTH partition key columns
-- (user_id alone needs ALLOW FILTERING).
CREATE TABLE user_events (
    user_id UUID,
    event_type TEXT,
    event_id TIMEUUID,
    timestamp TIMESTAMP,
    properties MAP<TEXT, TEXT>,
    session_id UUID,
    PRIMARY KEY ((user_id, event_type), event_id)
) WITH CLUSTERING ORDER BY (event_id DESC);

-- Daily active users
-- One partition per calendar day; each active user is one row, so a
-- day's DAU is a single-partition scan.
-- NOTE(review): very large user bases put a whole day in one partition;
-- confirm expected DAU against partition-size limits.
CREATE TABLE daily_active_users (
    date DATE,
    user_id UUID,
    first_seen TIMESTAMP,
    last_seen TIMESTAMP,
    event_count INT,
    PRIMARY KEY (date, user_id)
);

-- Event aggregations
-- Counter table: all non-key columns must be COUNTERs, and rows are
-- modified only via UPDATE ... SET c = c + n.
-- NOTE(review): a counter cannot deduplicate -- "unique_users" will
-- overcount unless the writer increments only on first sight of a user
-- per day; confirm the pipeline enforces that.
CREATE TABLE event_aggregations (
    event_type TEXT,
    date DATE,
    count COUNTER,
    unique_users COUNTER,
    PRIMARY KEY ((event_type), date)
);

Analytics Pipeline

class AnalyticsTracker:
    """Event tracking on top of the `analytics` keyspace."""

    def __init__(self):
        self.cluster = Cluster(['cassandra-node1', 'cassandra-node2'])
        self.session = self.cluster.connect('analytics')

    def track_event(self, user_id, event_type, properties=None, session_id=None):
        """Record one user event.

        Fixes vs. the previous revision:
        * the VALUES clause used malformed placeholders (`%`, `event_type%`);
        * event_id -- a TIMEUUID clustering column -- was never bound;
        * `properties` targets a MAP<TEXT, TEXT> column, so the dict is
          bound directly instead of being JSON-encoded into one string.
        """
        import uuid
        from datetime import datetime

        self.session.execute(
            """
            INSERT INTO user_events 
            (user_id, event_type, event_id, timestamp, properties, session_id)
            VALUES (%s, %s, %s, %s, %s, %s)
            """,
            (user_id, event_type, uuid.uuid1(), datetime.now(),
             properties if properties else None,
             session_id)
        )

    def get_user_journey(self, user_id, event_type=None, limit=100):
        """Return a user's recent events, optionally for one event type.

        The partition key is (user_id, event_type); filtering on user_id
        alone therefore requires ALLOW FILTERING, which scans partitions
        and should be avoided on hot paths -- prefer passing event_type.
        """
        if event_type:
            rows = self.session.execute(
                """
                SELECT * FROM user_events 
                WHERE user_id = %s AND event_type = %s
                LIMIT %s
                """,
                (user_id, event_type, limit)
            )
        else:
            rows = self.session.execute(
                """
                SELECT * FROM user_events 
                WHERE user_id = %s
                LIMIT %s
                ALLOW FILTERING
                """,
                (user_id, limit)
            )
        return list(rows)

Gaming Applications

Player Data

-- Player profiles
-- One row per player; the currency MAP holds per-currency balances
-- (e.g. coins, gems) without needing a separate table.
CREATE TABLE player_profiles (
    player_id UUID PRIMARY KEY,
    username TEXT,
    email TEXT,
    level INT,
    experience BIGINT,
    currency MAP<TEXT, INT>,
    created_at TIMESTAMP,
    last_login TIMESTAMP
);

-- Player inventory
-- One partition per player, one row per item.
-- NOTE(review): quantity is a plain INT; counter-style increments
-- (SET quantity = quantity + n) are only valid on COUNTER columns --
-- confirm how application code updates this.
CREATE TABLE player_inventory (
    player_id UUID,
    item_id UUID,
    quantity INT,
    acquired_at TIMESTAMP,
    PRIMARY KEY ((player_id), item_id)
);

-- Player scores
-- Partitioned per game; highest scores first, so a leaderboard is the
-- first N rows of the partition. player_id is part of the clustering
-- key so two players may hold the same score.
-- Fixed: CLUSTERING ORDER BY must list every clustering column --
-- omitting player_id raises "Missing CLUSTERING ORDER for column".
CREATE TABLE player_scores (
    game_id TEXT,
    player_id UUID,
    score INT,
    achieved_at TIMESTAMP,
    PRIMARY KEY ((game_id), score, player_id)
) WITH CLUSTERING ORDER BY (score DESC, player_id ASC);

-- Leaderboards
-- Pre-computed daily ranking: partition per (game, day), clustered by
-- rank so "top N for a day" is a contiguous slice.
CREATE TABLE leaderboard_daily (
    game_id TEXT,
    date DATE,
    player_id UUID,
    score INT,
    rank INT,
    PRIMARY KEY ((game_id, date), rank)
);

Game Server

class GameBackend:
    """Game-server persistence: scores, leaderboards, inventory."""

    def __init__(self):
        self.cluster = Cluster(['cassandra-node1', 'cassandra-node2'])
        self.session = self.cluster.connect('gaming')

    def record_score(self, game_id, player_id, score):
        """Record a player's score.

        Fixed: the previous revision had a garbled column list
        (`(game, score, achieved_at)`) and a mangled bind tuple that
        would not even parse.
        """
        from datetime import datetime

        self.session.execute(
            """
            INSERT INTO player_scores 
            (game_id, player_id, score, achieved_at)
            VALUES (%s, %s, %s, %s)
            """,
            (game_id, player_id, score, datetime.now())
        )

    def get_leaderboard(self, game_id, limit=10):
        """Top scores for a game; the table clusters score DESC, so the
        first `limit` rows are already the highest scores."""
        rows = self.session.execute(
            """
            SELECT * FROM player_scores 
            WHERE game_id = %s
            LIMIT %s
            """,
            (game_id, limit)
        )
        return list(rows)

    def update_inventory(self, player_id, item_id, quantity):
        """Increment a player's inventory quantity.

        NOTE(review): `quantity = quantity + %s` is only valid CQL when
        quantity is a COUNTER column, but the player_inventory DDL
        declares it INT -- either the column must become a counter or
        this needs a read-modify-write; confirm the intended schema.
        """
        self.session.execute(
            """
            UPDATE player_inventory 
            SET quantity = quantity + %s
            WHERE player_id = %s AND item_id = %s
            """,
            (quantity, player_id, item_id)
        )

Financial Applications

Transaction Logging

-- Transactions
-- One partition per account, newest transactions first; transaction_id
-- breaks ties when two transactions share a timestamp.
-- Fixed: CLUSTERING ORDER BY must list every clustering column --
-- the previous definition omitted transaction_id.
CREATE TABLE transactions (
    transaction_id TIMEUUID,
    account_id UUID,
    transaction_type TEXT,
    amount DECIMAL,
    balance_after DECIMAL,
    timestamp TIMESTAMP,
    status TEXT,
    metadata MAP<TEXT, TEXT>,
    PRIMARY KEY (account_id, timestamp, transaction_id)
) WITH CLUSTERING ORDER BY (timestamp DESC, transaction_id DESC);

-- Account balances
-- Current-state row per account; last_transaction_id links the balance
-- back to the transaction that produced it.
CREATE TABLE account_balances (
    account_id UUID PRIMARY KEY,
    balance DECIMAL,
    currency TEXT,
    last_transaction_id TIMEUUID,
    updated_at TIMESTAMP
);

-- Transaction history
-- Month-bucketed partitions ((account, year, month)) keep partition
-- sizes bounded for long-lived accounts; TIMEUUID clustering orders
-- transactions newest-first within each month.
CREATE TABLE transaction_history (
    account_id UUID,
    year INT,
    month INT,
    transaction_id TIMEUUID,
    amount DECIMAL,
    PRIMARY KEY ((account_id, year, month), transaction_id)
) WITH CLUSTERING ORDER BY (transaction_id DESC);

Best Practices Summary

| Use Case  | Key Features          | Cassandra Solution  |
|-----------|-----------------------|---------------------|
| IoT       | High write throughput | TWCS compaction     |
| Messaging | Low latency reads     | Time clustering     |
| Gaming    | Real-time scores      | Counter tables      |
| Analytics | Aggregation           | Materialized views  |
| Financial | Audit trail           | Time partitioning   |

Performance Optimization

Write-Heavy Workloads

-- Batch non-logged for same partition
-- UNLOGGED skips the batch log: fast and safe when every statement
-- targets the same partition (the plain BEGIN BATCH used previously is
-- the logged, atomic -- and slower -- variant, contradicting the intent).
BEGIN UNLOGGED BATCH
INSERT INTO events (user_id, event) VALUES (1, 'event1');
INSERT INTO events (user_id, event) VALUES (1, 'event2');
APPLY BATCH;

-- Avoid batch for different partitions
-- Use asynchronous writes instead

Read Optimization

-- Denormalize for read patterns
-- Create separate tables for queries
-- Use appropriate clustering order
-- Limit partition size

Conclusion

Cassandra excels in production environments requiring high write throughput, linear scalability, and fault tolerance. From IoT platforms to gaming leaderboards, Cassandra powers real-time applications across industries.

With proper data modeling and cluster management, Cassandra can handle millions of writes per second while maintaining low latency reads.

Resources

Comments

Share this article

Scan to read on mobile