
Real-Time Data Pipelines: Kafka, Flink, and Spark Streaming

Introduction

Real-time data pipelines are critical for modern data-driven applications. Processing data as it arrives enables real-time analytics, fraud detection, recommendations, and monitoring. This guide covers building production-grade real-time pipelines using Kafka for ingestion, Flink/Spark for processing, and best practices for scaling to millions of events per second.

Why Real-Time Matters:

  • Fraud and anomaly detection latency drops from hours (batch) to milliseconds
  • Dashboards, alerts, and recommendations reflect current data, not yesterday's batch
  • Sustaining 1M+ events/second requires deliberate partitioning and scaling
  • Business-critical pipelines typically carry 99.9%+ uptime targets

Core Concepts & Terminology

1. Stream Processing

Processing continuous, unbounded data flows as events arrive, rather than in periodic batches.

2. Event

Individual data point flowing through the pipeline (transaction, click, sensor reading).

3. Topic

Named, append-only log of events in Kafka. Unlike a queue, reads are non-destructive: multiple consumer groups can read the same topic independently.

4. Partition

Subdivision of a topic for parallel processing and scalability.

5. Consumer Group

Set of consumers that together process all partitions of a topic.

6. Windowing

Grouping events into time-based or count-based windows for aggregation.

7. Stateful Processing

Maintaining state across events (e.g., running totals, user sessions).

8. Exactly-Once Semantics

Guarantee that each event is processed exactly once, not skipped or duplicated.

9. Backpressure

Mechanism to slow down producers when consumers can’t keep up.

10. Checkpointing

Saving processing state to enable recovery from failures.
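
The concepts above, windowing in particular, can be illustrated without any framework. Below is a tumbling (fixed-size, non-overlapping) window counter in plain Python, a sketch of the kind of keyed state Flink and Spark maintain internally; the function and field names are illustrative, not a framework API.

```python
from collections import defaultdict

def tumbling_window_counts(events, window_size_s=60):
    """Group events into fixed, non-overlapping (tumbling) windows,
    keyed by window start time, and count events per window."""
    windows = defaultdict(int)
    for event in events:
        # Floor the event timestamp to the start of its window
        window_start = (event["ts"] // window_size_s) * window_size_s
        windows[window_start] += 1
    return dict(windows)

events = [{"ts": 5}, {"ts": 59}, {"ts": 61}, {"ts": 130}]
print(tumbling_window_counts(events))  # {0: 2, 60: 1, 120: 1}
```

A real stream processor does the same bucketing, but incrementally and with checkpointed state, so the counts survive restarts.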


Real-Time Pipeline Architecture

┌────────────────────────────────────────────────────────────┐
│                    Data Sources                            │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐      │
│  │ Applications │  │ IoT Devices  │  │ Databases    │      │
│  │ (Events)     │  │ (Sensors)    │  │ (CDC)        │      │
│  └──────────────┘  └──────────────┘  └──────────────┘      │
└──────────────────────┬─────────────────────────────────────┘
                       │
┌──────────────────────▼─────────────────────────────────────┐
│              Message Broker (Kafka)                        │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐      │
│  │ Topic 1      │  │ Topic 2      │  │ Topic N      │      │
│  │ (Partitions) │  │ (Partitions) │  │ (Partitions) │      │
│  └──────────────┘  └──────────────┘  └──────────────┘      │
└──────────────────────┬─────────────────────────────────────┘
                       │
┌──────────────────────▼─────────────────────────────────────┐
│          Stream Processing (Flink/Spark)                   │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐      │
│  │ Filtering    │  │ Aggregation  │  │ Enrichment   │      │
│  │ Transform    │  │ Windowing    │  │ Joins        │      │
│  └──────────────┘  └──────────────┘  └──────────────┘      │
└──────────────────────┬─────────────────────────────────────┘
                       │
          ┌────────────┴────────────┐
          ▼                         ▼
┌──────────────────┐    ┌──────────────────┐
│ Real-Time Store  │    │ Data Warehouse   │
│ (Redis, DynamoDB)│    │ (S3, BigQuery)   │
└──────────────────┘    └──────────────────┘

Kafka Setup & Configuration

Kafka Cluster Configuration

# docker-compose.yml for local development
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
      KAFKA_LOG_RETENTION_HOURS: 168
      KAFKA_LOG_SEGMENT_BYTES: 1073741824

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    depends_on:
      - kafka
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181

Topic Creation & Configuration

from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError

class KafkaTopicManager:
    """Manage Kafka topics"""
    
    def __init__(self, bootstrap_servers: list[str]):
        self.admin_client = KafkaAdminClient(
            bootstrap_servers=bootstrap_servers,
            client_id='topic-manager'
        )
    
    def create_topic(self, topic_name: str, 
                    num_partitions: int = 3,
                    replication_factor: int = 3) -> bool:
        """Create Kafka topic"""
        
        topic = NewTopic(
            name=topic_name,
            num_partitions=num_partitions,
            replication_factor=replication_factor,
            topic_configs={
                'retention.ms': str(7 * 24 * 60 * 60 * 1000),  # 7 days
                'compression.type': 'snappy',
                'min.insync.replicas': '2'
            }
        )
        
        try:
            self.admin_client.create_topics([topic], validate_only=False)
            print(f"Topic {topic_name} created successfully")
            return True
        except TopicAlreadyExistsError:
            print(f"Topic {topic_name} already exists")
            return False
    
    def list_topics(self) -> list:
        """List all topic names"""
        return self.admin_client.list_topics()
    
    def delete_topic(self, topic_name: str) -> bool:
        """Delete topic"""
        try:
            self.admin_client.delete_topics([topic_name])
            print(f"Topic {topic_name} deleted")
            return True
        except Exception as e:
            print(f"Error deleting topic: {e}")
            return False

# Usage
manager = KafkaTopicManager(['localhost:9092'])
manager.create_topic('events', num_partitions=6, replication_factor=3)
manager.create_topic('transactions', num_partitions=12, replication_factor=3)
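
The partition counts above (6 and 12) follow a common rule of thumb: divide target throughput by measured per-partition throughput, add headroom for growth and rebalancing, and round up. A sketch of that arithmetic (the throughput figures below are hypothetical, not benchmarks):

```python
import math

def required_partitions(target_events_per_s: int,
                        per_partition_events_per_s: int,
                        headroom: float = 0.5) -> int:
    """Rule-of-thumb partition count: target throughput divided by
    per-partition capacity, plus headroom, rounded up."""
    return math.ceil(target_events_per_s * (1 + headroom)
                     / per_partition_events_per_s)

# e.g. 100k events/s target, ~15k events/s measured per partition
print(required_partitions(100_000, 15_000))  # 10
```

Increasing partitions later is possible but changes key-to-partition assignment, so it pays to size generously up front.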

Kafka Producer Implementation

High-Performance Producer

from kafka import KafkaProducer
from kafka.errors import KafkaError
import json
import logging
import time
from datetime import datetime
from typing import Callable, Optional

logger = logging.getLogger(__name__)

class ReliableKafkaProducer:
    """Production-grade Kafka producer"""
    
    def __init__(self, bootstrap_servers: list[str]):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            acks='all',  # Wait for all replicas
            retries=3,
            max_in_flight_requests_per_connection=5,  # >1 can reorder messages on retry; use 1 if strict ordering matters
            compression_type='snappy',
            batch_size=16384,
            linger_ms=10,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None
        )
        self.metrics = {
            'sent': 0,
            'failed': 0,
            'latency_ms': []
        }
    
    def send_event(self, topic: str, event: dict, 
                  key: Optional[str] = None,
                  callback: Optional[Callable] = None) -> bool:
        """Send event to Kafka"""
        
        try:
            future = self.producer.send(
                topic,
                value=event,
                key=key,
                timestamp_ms=int(time.time() * 1000)
            )
            
            # Add callback
            future.add_callback(self._on_send_success)
            future.add_errback(self._on_send_error)
            
            self.metrics['sent'] += 1
            return True
        
        except Exception as e:
            logger.error(f"Error sending event: {e}")
            self.metrics['failed'] += 1
            return False
    
    def send_batch(self, topic: str, events: list[dict], 
                  key_field: Optional[str] = None) -> int:
        """Send batch of events"""
        
        sent_count = 0
        for event in events:
            key = event.get(key_field) if key_field else None
            if self.send_event(topic, event, key):
                sent_count += 1
        
        return sent_count
    
    def _on_send_success(self, record_metadata):
        """Callback on successful send"""
        logger.debug(f"Message sent to {record_metadata.topic} "
                    f"partition {record_metadata.partition} "
                    f"offset {record_metadata.offset}")
    
    def _on_send_error(self, exc):
        """Callback on send error"""
        logger.error(f"Error sending message: {exc}")
        self.metrics['failed'] += 1
    
    def flush(self, timeout_ms: int = 30000):
        """Flush pending messages"""
        self.producer.flush(timeout_ms)
    
    def close(self):
        """Close producer"""
        self.producer.close()
    
    def get_metrics(self) -> dict:
        """Get producer metrics"""
        return self.metrics

# Usage
producer = ReliableKafkaProducer(['localhost:9092'])

# Send single event
event = {
    'user_id': 'user_123',
    'action': 'purchase',
    'amount': 99.99,
    'timestamp': datetime.now().isoformat()
}

producer.send_event('events', event, key='user_123')

# Send batch
events = [
    {'user_id': f'user_{i}', 'action': 'click', 'timestamp': datetime.now().isoformat()}
    for i in range(1000)
]

producer.send_batch('events', events, key_field='user_id')
producer.flush()
producer.close()
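
The producer needs a counterpart consumer. A minimal sketch using kafka-python, matching the topic and JSON serialization of the examples above; the group id analytics-group is illustrative. Committing offsets manually after processing gives at-least-once delivery (duplicates are possible after a crash, so processing should be idempotent):

```python
import json

def parse_event(raw: bytes) -> dict:
    """Deserialize a JSON-encoded message value (mirrors the producer's serializer)."""
    return json.loads(raw.decode('utf-8'))

def run_consumer():
    from kafka import KafkaConsumer  # kafka-python, imported lazily
    consumer = KafkaConsumer(
        'events',
        bootstrap_servers=['localhost:9092'],
        group_id='analytics-group',
        enable_auto_commit=False,       # commit manually, after processing
        auto_offset_reset='earliest',
        value_deserializer=parse_event
    )
    for message in consumer:
        event = message.value
        # Handle the event, then commit its offset. Committing after
        # processing = at-least-once; committing before = at-most-once.
        print(f"partition={message.partition} offset={message.offset}: {event}")
        consumer.commit()
```

Running several copies of run_consumer with the same group id spreads the topic's partitions across them automatically.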

Flink Stream Processing

Flink Streaming Job

from pyflink.common import Time
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode
from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer
from pyflink.datastream.functions import MapFunction, FilterFunction, AggregateFunction
from pyflink.datastream.window import TumblingProcessingTimeWindows
import json
from datetime import datetime

class EventParser(MapFunction):
    """Parse JSON events"""
    
    def map(self, value):
        try:
            event = json.loads(value)
            event['parsed_time'] = datetime.now().timestamp()
            return event
        except Exception as e:
            print(f"Error parsing event: {e}")
            return None

class TransactionFilter(FilterFunction):
    """Filter for transaction events"""
    
    def filter(self, event):
        if event is None:  # drop events that failed to parse
            return False
        return event.get('type') == 'transaction' and event.get('amount', 0) > 0

class TransactionAggregator(AggregateFunction):
    """Aggregate transactions by user"""
    
    def create_accumulator(self):
        return {
            'count': 0,
            'total_amount': 0.0,
            'min_amount': float('inf'),
            'max_amount': 0.0
        }
    
    def add(self, value, accumulator):
        accumulator['count'] += 1
        accumulator['total_amount'] += value.get('amount', 0)
        accumulator['min_amount'] = min(accumulator['min_amount'], value.get('amount', 0))
        accumulator['max_amount'] = max(accumulator['max_amount'], value.get('amount', 0))
        return accumulator
    
    def get_result(self, accumulator):
        return {
            'count': accumulator['count'],
            'total': accumulator['total_amount'],
            'average': accumulator['total_amount'] / accumulator['count'] if accumulator['count'] > 0 else 0,
            'min': accumulator['min_amount'],
            'max': accumulator['max_amount']
        }
    
    def merge(self, a, b):
        return {
            'count': a['count'] + b['count'],
            'total_amount': a['total_amount'] + b['total_amount'],
            'min_amount': min(a['min_amount'], b['min_amount']),
            'max_amount': max(a['max_amount'], b['max_amount'])
        }

def create_flink_job():
    """Create Flink streaming job"""
    
    env = StreamExecutionEnvironment.get_execution_environment()
    
    # Set parallelism
    env.set_parallelism(4)
    
    # Enable checkpointing for fault-tolerant state recovery
    env.enable_checkpointing(60000)  # every 60 seconds
    env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE)
    
    # Read from Kafka
    kafka_source = env.add_source(
        FlinkKafkaConsumer(
            topics=['events'],
            deserialization_schema=SimpleStringSchema(),
            properties={
                'bootstrap.servers': 'localhost:9092',
                'group.id': 'flink-group',
                'auto.offset.reset': 'earliest'
            }
        )
    )
    
    # Process stream
    result = (kafka_source
        .map(EventParser())
        .filter(TransactionFilter())
        .key_by(lambda x: x.get('user_id'))
        .window(TimeWindow.of(60000))  # 60 second window
        .aggregate(TransactionAggregator())
        .map(lambda x: json.dumps(x))
    )
    
    # Write to Kafka
    result.add_sink(
        FlinkKafkaProducer(
            topic='aggregated-transactions',
            serialization_schema=SimpleStringSchema(),
            producer_config={
                'bootstrap.servers': 'localhost:9092'
            }
        )
    )
    
    # Execute
    env.execute("Transaction Aggregation Job")

# Run job
if __name__ == '__main__':
    create_flink_job()

Spark Structured Streaming

Spark Streaming Job

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

def create_spark_streaming_job():
    """Create Spark Structured Streaming job"""
    
    spark = SparkSession.builder \
        .appName("RealTimeAnalytics") \
        .getOrCreate()
    # Note: the Kafka source requires launching with
    # --packages org.apache.spark:spark-sql-kafka-0-10_2.12:<spark-version>
    
    # Set log level
    spark.sparkContext.setLogLevel("WARN")
    
    # Define schema
    schema = StructType([
        StructField("user_id", StringType()),
        StructField("action", StringType()),
        StructField("amount", DoubleType()),
        StructField("timestamp", TimestampType())
    ])
    
    # Read from Kafka
    df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "events") \
        .option("startingOffsets", "latest") \
        .load()
    
    # Parse JSON
    parsed_df = df.select(
        from_json(col("value").cast("string"), schema).alias("data")
    ).select("data.*")
    
    # Filter transactions
    transactions = parsed_df.filter(col("action") == "transaction")
    
    # Aggregate by user in 1-minute windows
    aggregated = transactions \
        .withWatermark("timestamp", "10 minutes") \
        .groupBy(
            window(col("timestamp"), "1 minute"),
            col("user_id")
        ) \
        .agg(
            count("*").alias("transaction_count"),
            sum("amount").alias("total_amount"),
            avg("amount").alias("avg_amount"),
            min("amount").alias("min_amount"),
            max("amount").alias("max_amount")
        )
    
    # Write to Kafka
    query = aggregated \
        .select(to_json(struct("*")).alias("value")) \
        .writeStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("topic", "aggregated-transactions") \
        .option("checkpointLocation", "/tmp/checkpoint") \
        .start()
    
    # Also write to console for debugging
    console_query = aggregated \
        .writeStream \
        .outputMode("update") \
        .format("console") \
        .option("truncate", False) \
        .start()
    
    # Wait for termination
    spark.streams.awaitAnyTermination()

# Run job
if __name__ == '__main__':
    create_spark_streaming_job()

Best Practices

  1. Exactly-Once Semantics: Ensure no data loss or duplication
  2. Checkpointing: Save state regularly for recovery
  3. Backpressure Handling: Slow down producers when needed
  4. Monitoring: Track latency, throughput, and errors
  5. Scaling: Partition topics and scale consumers
  6. Error Handling: Implement dead letter queues
  7. Testing: Test with realistic data volumes
  8. Documentation: Document data schemas and pipelines
  9. Security: Encrypt data in transit and at rest
  10. Cost Optimization: Monitor resource usage
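
Item 6 (dead letter queues) deserves a sketch: events that fail parsing or processing are forwarded to a separate topic with error context, instead of being dropped or blocking the consumer. The topic name events.dlq and the handler below are illustrative; a list stands in for the DLQ topic so the sketch is self-contained.

```python
import json

def process_with_dlq(raw_message: bytes, handler, dlq_publish):
    """Try to parse and handle a message; on any failure, forward the
    raw payload plus the error to a dead-letter sink rather than
    dropping it or crashing the consumer loop."""
    try:
        event = json.loads(raw_message.decode('utf-8'))
        handler(event)
        return 'processed'
    except Exception as exc:
        dlq_publish({
            'payload': raw_message.decode('utf-8', errors='replace'),
            'error': repr(exc)
        })
        return 'dead-lettered'

# In a real pipeline dlq_publish would wrap producer.send('events.dlq', ...);
# here a list stands in for the DLQ topic:
dlq = []
print(process_with_dlq(b'not json', print, dlq.append))  # dead-lettered
print(len(dlq))  # 1
```

A separate consumer can then replay or inspect the DLQ topic without disturbing the main pipeline.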

Common Pitfalls

  1. Data Loss: Not using acks='all' in the producer
  2. Duplicates: Not handling retries properly
  3. Latency: Not tuning batch sizes and windows
  4. Scaling Issues: Not partitioning topics enough
  5. State Management: Losing state on failures
  6. Monitoring Gaps: No visibility into pipeline health
  7. Schema Evolution: Not handling schema changes
  8. Resource Leaks: Not closing connections properly
  9. Unordered Processing: Assuming order across partitions
  10. No Backpressure: Overwhelming downstream systems
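
Pitfall 2 (duplicates) is usually addressed on the consumer side: with at-least-once delivery, retries can redeliver an event, so processing must be idempotent. A minimal dedup sketch keyed on a unique event id; in production the seen-set would live in Redis or in the sink itself, with a TTL, rather than in process memory.

```python
class IdempotentProcessor:
    """Skip events whose id was already processed, so that
    at-least-once redelivery does not double-count."""

    def __init__(self, handler):
        self.handler = handler
        self.seen = set()             # in production: e.g. Redis SETNX with a TTL

    def process(self, event: dict) -> bool:
        event_id = event['event_id']
        if event_id in self.seen:
            return False              # duplicate delivery: skip
        self.handler(event)           # apply the side effect first...
        self.seen.add(event_id)       # ...then mark as seen
        return True

totals = {'amount': 0.0}

def apply(event):
    totals['amount'] += event['amount']

proc = IdempotentProcessor(apply)
proc.process({'event_id': 'e1', 'amount': 10.0})
proc.process({'event_id': 'e1', 'amount': 10.0})  # redelivered duplicate, ignored
print(totals['amount'])  # 10.0
```

Marking the id only after a successful handle means a crash mid-handle causes a retry, not a lost event.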

Performance Comparison Table

Framework        Latency    Throughput  Complexity  Maturity
---------        -------    ----------  ----------  --------
Kafka Streams    Low        High        Low         High
Flink            Very Low   Very High   Medium      High
Spark Streaming  Medium     Very High   Medium      High
Storm            Low        Medium      High        Medium

Conclusion

Building real-time data pipelines requires careful architecture, proper tooling, and operational discipline. By using Kafka for reliable ingestion, Flink or Spark for processing, and implementing best practices for monitoring and scaling, you can build pipelines that process millions of events per second with sub-second latency. The key is starting simple and gradually adding complexity as requirements grow.

Next Steps:

  1. Set up Kafka cluster
  2. Implement producer and consumer
  3. Build first streaming job
  4. Add monitoring and alerting
  5. Scale and optimize
