
Stream Processing with Kafka and Flink

Published: February 23, 2026 Updated: May 24, 2026 Larry Qu 17 min read

Introduction

In today's data-driven world, the ability to process data in real time is critical for fraud detection, monitoring, personalization, and operational intelligence. Apache Kafka, a durable and partitioned event log, and Apache Flink, a stateful stream processor, are a widely used combination for building scalable, fault-tolerant real-time data pipelines.

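In this guide, we'll explore event streaming with Kafka, stream processing with Flink (with Spark Structured Streaming for comparison), windowing, watermarks, and exactly-once semantics, and how these pieces combine into end-to-end real-time data systems.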


Understanding Event Streaming

What is Event Streaming?

Event streaming is the practice of capturing data in real time as a continuous stream of events. Unlike batch processing, which collects data and processes it in discrete chunks on a schedule, event streaming handles each event as it arrives.

┌─────────────────────────────────────────────────────────────────────┐
│                     BATCH VS STREAM PROCESSING                      │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  BATCH PROCESSING                   STREAM PROCESSING               │
│  ────────────────                   ─────────────────               │
│                                                                     │
│  ┌─────────────────┐                ┌─────────────────┐             │
│  │ Collect data    │                │ Process events  │             │
│  │ for a period    │                │ immediately     │             │
│  └────────┬────────┘                └────────┬────────┘             │
│           │                                  │                      │
│           ▼                                  ▼                      │
│  ┌─────────────────┐                ┌─────────────────┐             │
│  │ Process at      │                │ Continuous      │             │
│  │ scheduled time  │                │ processing      │             │
│  └────────┬────────┘                └────────┬────────┘             │
│           │                                  │                      │
│           ▼                                  ▼                      │
│  ┌─────────────────┐                ┌─────────────────┐             │
│  │ Results at end  │                │ Results in      │             │
│  │ of batch        │                │ real time       │             │
│  └─────────────────┘                └─────────────────┘             │
│                                                                     │
│  Examples:                          Examples:                       │
│  • Daily reports                    • Fraud detection               │
│  • Monthly analytics                • Real-time dashboards          │
│  • ETL jobs                         • Alerting systems              │
│  • Data warehouse load              • Live personalization          │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
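To make the contrast concrete, here is a toy sketch in plain Python (no Kafka involved; the `amount` field and the sample events are made up for illustration). The batch function produces one answer after all of the period's data is in, while the streaming generator emits an updated answer after every event.

```python
from typing import Iterable, Iterator


def batch_total(events: list[dict]) -> float:
    """Batch: wait until the period's data is complete, then compute once."""
    return sum(e["amount"] for e in events)


def streaming_totals(events: Iterable[dict]) -> Iterator[float]:
    """Stream: update the result as each event arrives and emit it immediately."""
    running = 0.0
    for event in events:
        running += event["amount"]
        yield running  # an up-to-date result after every event


events = [{"amount": 10.0}, {"amount": 5.0}, {"amount": 2.5}]
print(batch_total(events))             # 17.5
print(list(streaming_totals(events)))  # [10.0, 15.0, 17.5]
```

Both reach the same final value; the difference is when results become available.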

Core Streaming Terminology

1. Stream Processing

Processing continuous data flows as events arrive, rather than in scheduled batches.

2. Event

Individual data point flowing through the pipeline (transaction, click, sensor reading).

3. Topic

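Named stream of events in Kafka, stored as an append-only log. Unlike a queue, reading an event does not remove it, so multiple consumer groups can read the same topic independently.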

4. Partition

Subdivision of a topic for parallel processing and scalability. Kafka guarantees ordering only within a partition, not across a whole topic.

5. Consumer Group

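Set of consumers that share the work of reading a topic: each partition is assigned to exactly one consumer in the group, so consumers beyond the partition count sit idle.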

6. Windowing

Grouping events into time-based or count-based windows for aggregation.

7. Stateful Processing

Maintaining state across events (e.g., running totals, user sessions).

8. Exactly-Once Semantics

Guarantee that each event affects state and output exactly once, neither skipped nor duplicated, even when failures force events to be replayed.

9. Backpressure

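Flow control that slows upstream operators, and ultimately the sources, when downstream operators cannot keep up. With Kafka as the source, unread events simply accumulate as consumer lag.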

10. Checkpointing

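Periodically snapshotting operator state together with source positions (such as Kafka offsets) so a job can recover from failures without losing or double-counting events.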
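Two of these terms, partitions and consumer groups, come down to simple arithmetic. The sketch below is a simplified model, not Kafka's implementation: `partition_for` uses CRC32 where Kafka's default partitioner uses murmur2, and `assign_round_robin` only approximates Kafka's built-in assignors. Both function names are hypothetical.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    """Same key -> same partition, which is what preserves per-key ordering.
    Simplified stand-in: Kafka's default partitioner hashes with murmur2, not CRC32."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def assign_round_robin(num_partitions: int, consumers: list[str]) -> dict[str, list[int]]:
    """Hand each partition to exactly one consumer in the group, in turn."""
    assignment: dict[str, list[int]] = {c: [] for c in consumers}
    for p in range(num_partitions):
        assignment[consumers[p % len(consumers)]].append(p)
    return assignment


print(partition_for("ORD-12345", 6) == partition_for("ORD-12345", 6))  # True
print(assign_round_robin(6, ["c1", "c2", "c3"]))
# {'c1': [0, 3], 'c2': [1, 4], 'c3': [2, 5]}
print(assign_round_robin(2, ["c1", "c2", "c3"]))
# {'c1': [0], 'c2': [1], 'c3': []}  <- the third consumer sits idle
```

The second assignment shows why adding consumers beyond the partition count does not add parallelism.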


Apache Kafka: Event Streaming Platform

Core Concepts

Kafka is a distributed event streaming platform that can publish, subscribe to, store, and process streams of records.

┌─────────────────────────────────────────────────────────────────────┐
│                         KAFKA ARCHITECTURE                          │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │                         KAFKA CLUSTER                         │  │
│  │                                                               │  │
│  │   ┌───────────────┐   ┌───────────────┐   ┌───────────────┐   │  │
│  │   │   Broker 1    │   │   Broker 2    │   │   Broker N    │   │  │
│  │   └───────────────┘   └───────────────┘   └───────────────┘   │  │
│  │                                                               │  │
│  │   Each partition: 1 leader + followers on other brokers       │  │
│  │                                                               │  │
│  │   ┌───────────────┐   ┌───────────────┐   ┌───────────────┐   │  │
│  │   │ Orders        │   │ Clicks        │   │ Logs          │   │  │
│  │   │ ┌─┬─┬─┬─┬─┬─┐ │   │ ┌─┬─┬─┬─┬─┐   │   │ ┌─┬─┬─┬─┬─┐   │   │  │
│  │   │ │0│1│2│3│4│5│ │   │ │0│1│2│3│4│   │   │ │0│1│2│3│4│   │   │  │
│  │   │ └─┴─┴─┴─┴─┴─┘ │   │ └─┴─┴─┴─┴─┘   │   │ └─┴─┴─┴─┴─┘   │   │  │
│  │   │ partitions    │   │ partitions    │   │ partitions    │   │  │
│  │   └───────────────┘   └───────────────┘   └───────────────┘   │  │
│  │                                                               │  │
│  └───────────────────────────────────────────────────────────────┘  │
│                                                                     │
│  PRODUCERS ───────────► KAFKA CLUSTER ───────────► CONSUMERS        │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

Key Terminology

  • Topic: A category/feed name to which messages are published
  • Partition: Topics are split into partitions for parallelism
  • Producer: Publishes messages to topics
  • Consumer: Subscribes to topics and processes messages
  • Broker: Kafka server that stores messages
  • Consumer Group: Group of consumers that share partitions
  • Offset: Sequential ID of a record within a partition; each consumer group tracks its position by committing offsets
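The topic, partition, and offset relationship can be modeled in a few lines. `PartitionLog` below is a hypothetical in-memory class, not a Kafka API. It shows that offsets are positions in an append-only log, that each consumer group commits the offset of the next record it will read, and that consuming does not delete records.

```python
from dataclasses import dataclass, field


@dataclass
class PartitionLog:
    """Toy model of one partition: an append-only list whose index is the offset."""
    records: list[str] = field(default_factory=list)
    committed: dict[str, int] = field(default_factory=dict)  # group -> next offset to read

    def append(self, value: str) -> int:
        self.records.append(value)
        return len(self.records) - 1  # offset assigned to the new record

    def poll(self, group: str, max_records: int = 10) -> list[tuple[int, str]]:
        start = self.committed.get(group, 0)
        return [(start + i, v) for i, v in enumerate(self.records[start:start + max_records])]

    def commit(self, group: str, next_offset: int) -> None:
        self.committed[group] = next_offset


log = PartitionLog()
for value in ["a", "b", "c"]:
    log.append(value)

batch = log.poll("billing")
log.commit("billing", batch[-1][0] + 1)  # commit the offset of the *next* record
print(log.poll("billing"))    # []
print(log.poll("analytics"))  # [(0, 'a'), (1, 'b'), (2, 'c')]
```

The `analytics` group still sees every record because the `billing` group's commit only moved its own position.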

Kafka Cluster Docker Setup

# docker-compose.yml for local development
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
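      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      # Single broker: the transaction state log also needs replication factor 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"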
      KAFKA_LOG_RETENTION_HOURS: 168
      KAFKA_LOG_SEGMENT_BYTES: 1073741824

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    depends_on:
      - kafka
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181

Topic Management

from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError

class KafkaTopicManager:
    """Manage Kafka topics"""

    def __init__(self, bootstrap_servers: list[str]):
        self.admin_client = KafkaAdminClient(
            bootstrap_servers=bootstrap_servers,
            client_id='topic-manager'
        )

    def create_topic(self, topic_name: str,
                    num_partitions: int = 3,
                    replication_factor: int = 3) -> bool:
        """Create Kafka topic"""

        topic = NewTopic(
            name=topic_name,
            num_partitions=num_partitions,
            replication_factor=replication_factor,
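            topic_configs={
                'retention.ms': str(7 * 24 * 60 * 60 * 1000),  # 7 days
                'compression.type': 'snappy',
                # acks='all' writes fail if fewer replicas than this are in sync,
                # so it must not exceed the replication factor
                'min.insync.replicas': str(min(2, replication_factor))
            }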
        )

        try:
            self.admin_client.create_topics([topic], validate_only=False)
            print(f"Topic {topic_name} created successfully")
            return True
        except TopicAlreadyExistsError:
            print(f"Topic {topic_name} already exists")
            return False

    def list_topics(self) -> list[str]:
        """List all topic names"""
        return self.admin_client.list_topics()

    def delete_topic(self, topic_name: str) -> bool:
        """Delete topic"""
        try:
            self.admin_client.delete_topics([topic_name])
            print(f"Topic {topic_name} deleted")
            return True
        except Exception as e:
            print(f"Error deleting topic: {e}")
            return False

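# Usage (the single-broker Docker setup above supports only replication_factor=1;
# use 3 on a cluster with at least three brokers)
manager = KafkaTopicManager(['localhost:9092'])
manager.create_topic('events', num_partitions=6, replication_factor=1)
manager.create_topic('transactions', num_partitions=12, replication_factor=1)
# Topics used later in this guide (automatic topic creation is disabled above)
for name in ['orders', 'order-aggregates', 'aggregated-transactions']:
    manager.create_topic(name, replication_factor=1)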

Kafka Producer Implementation

from kafka import KafkaProducer
import json
import time

class OrderProducer:
    """
    Kafka producer for order events
    """
    
    def __init__(self, bootstrap_servers: list):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None,
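            acks='all',  # Wait for all in-sync replicas
            retries=3,
            # With retries and several in-flight requests per connection, a retried
            # batch can land after a later one; set this to 1 for strict ordering
            max_in_flight_requests_per_connection=5,
            compression_type='gzip'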
        )
        
        self.topic = 'orders'
    
    def send_order(self, order_data: dict):
        """
        Send order event to Kafka
        """
        
        # Create order event
        event = {
            'event_type': 'order_created',
            'timestamp': time.time(),
            'order_id': order_data['order_id'],
            'customer_id': order_data['customer_id'],
            'total_amount': order_data['total_amount'],
            'items': order_data['items'],
            'status': 'created'
        }
        
        # Send with order ID as key for partitioning
        future = self.producer.send(
            self.topic,
            key=order_data['order_id'],
            value=event
        )
        
        # Wait for confirmation
        record_metadata = future.get(timeout=10)
        
        return {
            'topic': record_metadata.topic,
            'partition': record_metadata.partition,
            'offset': record_metadata.offset
        }
    
    def send_batch(self, orders: list):
        """
        Send batch of orders
        """
        
        futures = []
        for order in orders:
            event = {
                'event_type': 'order_created',
                'timestamp': time.time(),
                **order
            }
            future = self.producer.send(
                self.topic,
                key=order['order_id'],
                value=event
            )
            futures.append(future)
        
        # Wait for all to complete
        for future in futures:
            metadata = future.get(timeout=10)
            print(f"Sent to {metadata.topic}:{metadata.partition}:{metadata.offset}")
    
    def close(self):
        self.producer.flush()
        self.producer.close()


# Usage
producer = OrderProducer(['localhost:9092'])

order = {
    'order_id': 'ORD-12345',
    'customer_id': 'CUST-001',
    'total_amount': 99.99,
    'items': ['item1', 'item2']
}

result = producer.send_order(order)
print(f"Order sent: {result}")
producer.close()

High-Performance Kafka Producer

from kafka import KafkaProducer
import json
import logging
import time
from datetime import datetime
from typing import Callable, Optional

logger = logging.getLogger(__name__)

class ReliableKafkaProducer:
    """Production-grade Kafka producer"""

    def __init__(self, bootstrap_servers: list[str]):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            acks='all',
            retries=3,
            max_in_flight_requests_per_connection=5,
            compression_type='snappy',  # needs a snappy codec library installed
            batch_size=16384,           # max bytes buffered per partition batch
            linger_ms=10,               # wait up to 10 ms for a batch to fill
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None
        )
        self.metrics = {
            'queued': 0,  # accepted into the producer's buffer
            'sent': 0,    # acknowledged by the broker
            'failed': 0   # rejected locally or by the broker
        }

    def send_event(self, topic: str, event: dict,
                   key: Optional[str] = None,
                   callback: Optional[Callable] = None) -> bool:
        """Queue an event; delivery is confirmed asynchronously by callbacks"""

        try:
            future = self.producer.send(
                topic,
                value=event,
                key=key,
                timestamp_ms=int(time.time() * 1000)
            )

            future.add_callback(self._on_send_success)
            future.add_errback(self._on_send_error)
            if callback is not None:
                future.add_callback(callback)  # receives the RecordMetadata

            self.metrics['queued'] += 1
            return True

        except Exception as e:
            logger.error(f"Error sending event: {e}")
            self.metrics['failed'] += 1
            return False

    def send_batch(self, topic: str, events: list[dict],
                   key_field: Optional[str] = None) -> int:
        """Send batch of events; returns how many were queued"""

        sent_count = 0
        for event in events:
            key = event.get(key_field) if key_field else None
            if self.send_event(topic, event, key):
                sent_count += 1

        return sent_count

    def _on_send_success(self, record_metadata):
        """Callback on successful send"""
        self.metrics['sent'] += 1
        logger.debug(f"Message sent to {record_metadata.topic} "
                     f"partition {record_metadata.partition} "
                     f"offset {record_metadata.offset}")

    def _on_send_error(self, exc):
        """Callback on send error"""
        logger.error(f"Error sending message: {exc}")
        self.metrics['failed'] += 1

    def flush(self, timeout: float = 30.0):
        """Block until buffered messages are sent (timeout in seconds)"""
        self.producer.flush(timeout=timeout)

    def close(self):
        """Close producer"""
        self.producer.close()

    def get_metrics(self) -> dict:
        """Get a snapshot of producer metrics"""
        return dict(self.metrics)

# Usage
producer = ReliableKafkaProducer(['localhost:9092'])

event = {
    'user_id': 'user_123',
    'action': 'purchase',
    'amount': 99.99,
    'timestamp': datetime.now().isoformat()
}

producer.send_event('events', event, key='user_123')

events = [
    {'user_id': f'user_{i}', 'action': 'click', 'timestamp': datetime.now().isoformat()}
    for i in range(1000)
]

producer.send_batch('events', events, key_field='user_id')
producer.flush()
producer.close()
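The `batch_size=16384` and `linger_ms=10` settings above trade a few milliseconds of latency for larger, more compressible requests. `ToyBatcher` is a hypothetical, simplified model of that rule for a single partition, with the clock passed in explicitly so the behaviour is deterministic; the real client applies additional conditions that are not modeled here.

```python
from typing import Optional


class ToyBatcher:
    """Simplified model of producer-side batching for one partition (not the
    client's real internals): a batch is sent once it reaches batch_size bytes
    or once linger_ms has passed since its first record."""

    def __init__(self, batch_size: int = 16384, linger_ms: int = 10):
        self.batch_size = batch_size
        self.linger_ms = linger_ms
        self.pending: list[bytes] = []
        self.first_at_ms: Optional[float] = None
        self.sent_batches: list[list[bytes]] = []

    def append(self, record: bytes, now_ms: float) -> None:
        if not self.pending:
            self.first_at_ms = now_ms
        self.pending.append(record)
        if sum(len(r) for r in self.pending) >= self.batch_size:
            self._send()  # size limit reached: send immediately

    def tick(self, now_ms: float) -> None:
        """Called periodically; sends a partial batch once it has lingered long enough."""
        if self.pending and now_ms - self.first_at_ms >= self.linger_ms:
            self._send()

    def _send(self) -> None:
        self.sent_batches.append(self.pending)
        self.pending = []
        self.first_at_ms = None


batcher = ToyBatcher(batch_size=100, linger_ms=10)
batcher.append(b"x" * 60, now_ms=0)
batcher.append(b"x" * 60, now_ms=1)  # 120 bytes >= 100: sent as one batch
batcher.append(b"y" * 10, now_ms=2)
batcher.tick(now_ms=5)               # lingered 3 ms: still pending
batcher.tick(now_ms=12)              # lingered 10 ms: partial batch sent
print([len(b) for b in batcher.sent_batches])  # [2, 1]
```

A larger `linger_ms` lets more records share a request under light load, at the cost of added latency for each record.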

Kafka Consumer Implementation

from kafka import KafkaConsumer
import json

class OrderConsumer:
    """
    Kafka consumer for order events
    """
    
    def __init__(self, bootstrap_servers: list, group_id: str):
        self.consumer = KafkaConsumer(
            'orders',
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            value_deserializer=lambda v: json.loads(v.decode('utf-8')),
            key_deserializer=lambda k: k.decode('utf-8') if k else None,
            auto_offset_reset='earliest',
            # Commits offsets on a timer, whether or not process_order() has finished
            enable_auto_commit=True,
            auto_commit_interval_ms=5000,
            max_poll_records=100,
            max_poll_interval_ms=300000
        )
    
    def consume_orders(self):
        """
        Consume order events
        """
        
        print("Starting to consume orders...")
        
        for message in self.consumer:
            print(f"\n--- New Message ---")
            print(f"Topic: {message.topic}")
            print(f"Partition: {message.partition}")
            print(f"Offset: {message.offset}")
            print(f"Key: {message.key}")
            print(f"Value: {message.value}")
            
            # Process the order
            self.process_order(message.value)
    
    def process_order(self, order: dict):
        """
        Process order event
        """
        
        event_type = order.get('event_type')
        
        if event_type == 'order_created':
            print(f"Processing new order: {order['order_id']}")
            # Add business logic here
            
        elif event_type == 'order_updated':
            print(f"Order updated: {order['order_id']}")
            
        elif event_type == 'order_cancelled':
            print(f"Order cancelled: {order['order_id']}")
    
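    def consume_with_partition_handling(self):
        """
        Handle each partition's records separately
        """

        while True:
            # poll() returns {TopicPartition: [ConsumerRecord, ...]}; partitions are
            # assigned lazily, so assignment() is empty before the first poll
            batches = self.consumer.poll(timeout_ms=1000)
            for tp, records in batches.items():
                for message in records:
                    self.process_order(message.value)
                print(f"Processed {len(records)} records from partition {tp.partition}")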
    
    def close(self):
        self.consumer.close()


# Usage
consumer = OrderConsumer(['localhost:9092'], 'order-processor-group')
try:
    consumer.consume_orders()
finally:
    consumer.close()
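With `enable_auto_commit=True`, offsets are committed on a timer rather than after `process_order()` completes, so a crash can cause records to be skipped or delivered again. A common alternative is to disable auto-commit and call `commit()` only after processing, which gives at-least-once delivery; the handler must then tolerate replays. The sketch below simulates that in plain Python. `consume`, `charge_once`, and the order records are hypothetical, and the in-memory `seen` set stands in for durable deduplication storage such as a unique key in a database.

```python
from typing import Callable, Optional


def consume(log: list[dict], committed: int, handle: Callable[[dict], None],
            crash_after: Optional[int] = None) -> int:
    """Process records from the committed offset, then commit.
    Returns the new committed offset; a simulated crash commits nothing."""
    for i, record in enumerate(log[committed:]):
        if crash_after is not None and i == crash_after:
            return committed  # crashed before committing this batch
        handle(record)
    return len(log)  # commit: offset of the next record to read


orders = [{"order_id": "ORD-1"}, {"order_id": "ORD-2"}, {"order_id": "ORD-3"}]

# Non-idempotent handler: the replay after the crash duplicates side effects
charges: list[str] = []
offset = consume(orders, 0, lambda r: charges.append(r["order_id"]), crash_after=2)
offset = consume(orders, offset, lambda r: charges.append(r["order_id"]))
print(charges)  # ['ORD-1', 'ORD-2', 'ORD-1', 'ORD-2', 'ORD-3']

# Idempotent handler: skip order_ids that were already handled
charged: list[str] = []
seen: set[str] = set()


def charge_once(record: dict) -> None:
    if record["order_id"] in seen:
        return
    seen.add(record["order_id"])
    charged.append(record["order_id"])


offset = consume(orders, 0, charge_once, crash_after=2)
offset = consume(orders, offset, charge_once)
print(charged)  # ['ORD-1', 'ORD-2', 'ORD-3']
```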

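Apache Flink: Stream Processing Framework

Apache Flink is a distributed stream processing framework that provides:

  • Exactly-once state consistency: Checkpoints keep operator state consistent across failures (end-to-end exactly-once also needs replayable sources and transactional sinks)
  • Event-time processing: Results are based on when events occurred, with watermarks to handle out-of-order and late-arriving data
  • Windowing: Aggregate data over tumbling, sliding, or session windows
  • Stateful processing: Maintain state across events
  • Fault tolerance: Checkpoints for automatic recovery and savepoints for upgrades and migrations
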
┌─────────────────────────────────────────────────────────────────────┐
│                         FLINK ARCHITECTURE                          │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │                       FLINK APPLICATION                       │  │
│  │                                                               │  │
│  │   ┌─────────────┐    ┌─────────────┐    ┌─────────────┐       │  │
│  │   │   Source    │───►│   Process   │───►│    Sink     │       │  │
│  │   │   (Kafka)   │    │ (Operators) │    │ (Kafka/DB)  │       │  │
│  │   └─────────────┘    └─────────────┘    └─────────────┘       │  │
│  │                                                               │  │
│  │   ┌───────────────────────────────────────────────────────┐   │  │
│  │   │ STATE & TIME                                          │   │  │
│  │   │ • Managed state (heap or RocksDB state backend)       │   │  │
│  │   │ • Event time / processing time                        │   │  │
│  │   │ • Watermarks                                          │   │  │
│  │   └───────────────────────────────────────────────────────┘   │  │
│  └───────────────────────────────┬───────────────────────────────┘  │
│                                  │ runs on                          │
│  ┌───────────────────────────────┴───────────────────────────────┐  │
│  │                         FLINK CLUSTER                         │  │
│  │   ┌─────────────┐    ┌─────────────┐    ┌─────────────┐       │  │
│  │   │ JobManager  │    │ TaskManager │    │ TaskManager │       │  │
│  │   │(coordinator)│    │  (worker)   │    │  (worker)   │       │  │
│  │   └─────────────┘    └─────────────┘    └─────────────┘       │  │
│  └───────────────────────────────────────────────────────────────┘  │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
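
Flink DataStream Job

import json
from datetime import datetime, timezone

from pyflink.common import Duration, Types, WatermarkStrategy
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.time import Time
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.base import DeliveryGuarantee
from pyflink.datastream.connectors.kafka import (
    KafkaOffsetsInitializer,
    KafkaRecordSerializationSchema,
    KafkaSink,
    KafkaSource,
)
from pyflink.datastream.functions import FilterFunction, MapFunction, WindowFunction
from pyflink.datastream.window import TumblingEventTimeWindows

class OrderMapFunction(MapFunction):
    """
    Parse the JSON order events written by OrderProducer
    """

    def map(self, value):
        order = json.loads(value)
        # OrderProducer writes 'timestamp' as epoch seconds (time.time())
        ts = float(order['timestamp'])
        return {
            'order_id': order['order_id'],
            'customer_id': order['customer_id'],
            'total_amount': float(order['total_amount']),
            'timestamp': ts,
            'hour': datetime.fromtimestamp(ts, tz=timezone.utc).hour
        }

class OrderTimestampAssigner(TimestampAssigner):
    """
    Use the order's own timestamp, in epoch milliseconds, as its event time
    """

    def extract_timestamp(self, value, record_timestamp):
        return int(value['timestamp'] * 1000)

class HighValueFilter(FilterFunction):
    """
    Filter high-value orders
    """

    def filter(self, value):
        return value['total_amount'] > 100.0

class OrderAggregator(WindowFunction):
    """
    Aggregate one customer's orders in a window into a JSON summary
    """

    def apply(self, key, window, inputs):
        values = list(inputs)
        total = sum(v['total_amount'] for v in values)
        # window.end is the exclusive window end in epoch milliseconds
        return [json.dumps({
            'customer_id': key,
            'window_end': window.end,
            'order_count': len(values),
            'total_amount': total
        })]

def process_orders():
    """
    Main Flink job for order processing
    """

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(4)
    # The Kafka connector JAR must be on the classpath, for example via
    # env.add_jars("file:///path/to/flink-sql-connector-kafka.jar")

    # Source: read raw JSON strings from Kafka
    kafka_source = (
        KafkaSource.builder()
        .set_bootstrap_servers('localhost:9092')
        .set_topics('orders')
        .set_group_id('flink-order-processor')
        .set_starting_offsets(KafkaOffsetsInitializer.earliest())
        .set_value_only_deserializer(SimpleStringSchema())
        .build()
    )

    # Event time is assigned after parsing, so the source emits no watermarks
    orders = env.from_source(kafka_source, WatermarkStrategy.no_watermarks(), 'orders-source')

    # Transform: parse JSON, then assign timestamps and watermarks.
    # with_idleness keeps idle source subtasks from stalling the watermark.
    watermark_strategy = (
        WatermarkStrategy
        .for_bounded_out_of_orderness(Duration.of_seconds(5))
        .with_timestamp_assigner(OrderTimestampAssigner())
        .with_idleness(Duration.of_seconds(30))
    )
    transformed = (
        orders
        .map(OrderMapFunction())
        .assign_timestamps_and_watermarks(watermark_strategy)
    )

    # Filter: High-value orders only
    high_value = transformed.filter(HighValueFilter())

    # Window: Tumbling event-time window (5 minutes) per customer
    windowed = (
        high_value
        .key_by(lambda x: x['customer_id'], key_type=Types.STRING())
        .window(TumblingEventTimeWindows.of(Time.minutes(5)))
        .apply(OrderAggregator(), output_type=Types.STRING())
    )

    # Sink: Write the JSON summaries to Kafka
    kafka_sink = (
        KafkaSink.builder()
        .set_bootstrap_servers('localhost:9092')
        .set_record_serializer(
            KafkaRecordSerializationSchema.builder()
            .set_topic('order-aggregates')
            .set_value_serialization_schema(SimpleStringSchema())
            .build()
        )
        .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build()
    )

    windowed.sink_to(kafka_sink)

    # Execute
    env.execute("Order Processing Job")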

Flink SQL

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

def flink_sql_processing():
    """
    Use Flink SQL for stream processing
    """

    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)

    # Source table backed by Kafka. This schema assumes each JSON event has an
    # ISO-8601 `order_time` field; OrderProducer above writes epoch seconds
    # under `timestamp`, so one side must be adapted to match.
    t_env.execute_sql("""
        CREATE TABLE orders (
            order_id STRING,
            customer_id STRING,
            total_amount DECIMAL(10, 2),
            order_time TIMESTAMP(3),
            WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'orders',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'flink-sql-processor',
            'format' = 'json',
            'json.timestamp-format.standard' = 'ISO-8601'
        )
    """)

    # Sink table backed by Kafka
    t_env.execute_sql("""
        CREATE TABLE order_aggregates (
            window_start TIMESTAMP(3),
            window_end TIMESTAMP(3),
            customer_id STRING,
            order_count BIGINT,
            total_amount DECIMAL(15, 2)
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'order-aggregates',
            'properties.bootstrap.servers' = 'localhost:9092',
            'format' = 'json'
        )
    """)

    # Tumbling window aggregation (legacy group-window syntax; newer Flink
    # versions also offer windowing table-valued functions). execute_sql()
    # submits the INSERT as a continuous job and returns immediately.
    insert_result = t_env.execute_sql("""
        INSERT INTO order_aggregates
        SELECT
            TUMBLE_START(order_time, INTERVAL '5' MINUTE) AS window_start,
            TUMBLE_END(order_time, INTERVAL '5' MINUTE) AS window_end,
            customer_id,
            COUNT(*) AS order_count,
            SUM(total_amount) AS total_amount
        FROM orders
        GROUP BY
            TUMBLE(order_time, INTERVAL '5' MINUTE),
            customer_id
    """)

    # Session window aggregation: a session ends after 10 minutes without orders.
    # sql_query() only defines the query; run it with session_totals.execute().print()
    session_totals = t_env.sql_query("""
        SELECT
            SESSION_START(order_time, INTERVAL '10' MINUTE) AS session_start,
            SESSION_END(order_time, INTERVAL '10' MINUTE) AS session_end,
            customer_id,
            COUNT(*) AS session_orders,
            SUM(total_amount) AS session_total
        FROM orders
        GROUP BY
            SESSION(order_time, INTERVAL '10' MINUTE),
            customer_id
    """)

    # Running totals with OVER windows
    running_totals = t_env.sql_query("""
        SELECT
            order_id,
            customer_id,
            total_amount,
            order_time,
            SUM(total_amount) OVER (
                PARTITION BY customer_id
                ORDER BY order_time
                ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
            ) AS running_total
        FROM orders
    """)

    # Block while the streaming INSERT job runs (it runs until cancelled)
    insert_result.wait()

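The SESSION query above groups each customer's orders into bursts of activity separated by at least 10 minutes of silence. As a rough model of that grouping (a hypothetical `session_windows` helper in plain Python, assuming one customer's timestamps are already in hand rather than streaming in), each order opens a window `[t, t + gap)` and overlapping windows merge, so the session end is the last order time plus the gap.

```python
from datetime import datetime, timedelta


def session_windows(times: list[datetime], gap: timedelta) -> list[tuple[datetime, datetime, int]]:
    """Group one customer's order times into sessions (simplified model).
    Each event opens a window [t, t + gap); overlapping windows merge.
    Returns (session_start, session_end, order_count) per session."""
    sessions: list[tuple[datetime, datetime, int]] = []
    for t in sorted(times):
        if sessions and t < sessions[-1][1]:
            start, _, count = sessions[-1]
            sessions[-1] = (start, t + gap, count + 1)  # extend the open session
        else:
            sessions.append((t, t + gap, 1))             # no order within gap: new session
    return sessions


base = datetime(2026, 1, 1, 12, 0)
times = [base, base + timedelta(minutes=4), base + timedelta(minutes=30)]
for start, end, n in session_windows(times, timedelta(minutes=10)):
    print(start.time(), end.time(), n)
# 12:00:00 12:14:00 2
# 12:30:00 12:40:00 1
```

Unlike tumbling windows, session boundaries depend on the data itself, which is why a streaming engine must be able to merge windows as new events arrive.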
Spark Structured Streaming

Spark Streaming Job

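# Requires the Kafka connector on the classpath, e.g.
# spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_<scala-version>:<spark-version>
from pyspark.sql import SparkSession
# Explicit imports; note that sum, min, and max shadow Python's builtins here
from pyspark.sql.functions import (
    avg, col, count, from_json, max, min, struct, sum, to_json, window
)
from pyspark.sql.types import (
    DoubleType, StringType, StructField, StructType, TimestampType
)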

def create_spark_streaming_job():
    """Create Spark Structured Streaming job"""

    spark = SparkSession.builder \
        .appName("RealTimeAnalytics") \
        .getOrCreate()

    # Set log level
    spark.sparkContext.setLogLevel("WARN")

    # Define schema
    schema = StructType([
        StructField("user_id", StringType()),
        StructField("action", StringType()),
        StructField("amount", DoubleType()),
        StructField("timestamp", TimestampType())
    ])

    # Read from Kafka
    df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "events") \
        .option("startingOffsets", "latest") \
        .load()

    # Parse JSON
    parsed_df = df.select(
        from_json(col("value").cast("string"), schema).alias("data")
    ).select("data.*")

    # Filter transactions
    transactions = parsed_df.filter(col("action") == "transaction")

    # Aggregate by user in 1-minute windows
    aggregated = transactions \
        .withWatermark("timestamp", "10 minutes") \
        .groupBy(
            window(col("timestamp"), "1 minute"),
            col("user_id")
        ) \
        .agg(
            count("*").alias("transaction_count"),
            sum("amount").alias("total_amount"),
            avg("amount").alias("avg_amount"),
            min("amount").alias("min_amount"),
            max("amount").alias("max_amount")
        )

    # Write to Kafka
    query = aggregated \
        .select(to_json(struct("*")).alias("value")) \
        .writeStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("topic", "aggregated-transactions") \
        .option("checkpointLocation", "/tmp/checkpoint") \
        .start()

    # Also write to console for debugging
    console_query = aggregated \
        .writeStream \
        .outputMode("update") \
        .format("console") \
        .option("truncate", False) \
        .start()

    # Wait for termination
    spark.streams.awaitAnyTermination()

# Run job
if __name__ == '__main__':
    create_spark_streaming_job()

Windowing Concepts

Types of Windows

┌─────────────────────────────────────────────────────────────────────┐
│                            WINDOW TYPES                             │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  TUMBLING WINDOW                    SLIDING WINDOW                  │
│  ───────────────                    ──────────────                  │
│  size 5, no overlap                 size 5, slide 2                 │
│  [0,5)[5,10)[10,15)                 [0,5)                           │
│                                       [2,7)                         │
│                                         [4,9)                       │
│                                                                     │
│  Non-overlapping, fixed size        Overlapping, fixed size         │
│                                                                     │
│  SESSION WINDOW                     COUNT WINDOW                    │
│  ──────────────                     ────────────                    │
│  [e e e]   gap   [e e]              tumbling: [1,2,3][4,5,6]        │
│                                     sliding:  [1,2,3][2,3,4][3,4,5] │
│                                                                     │
│  Activity-based: closes after       Fixed number of events          │
│  a gap with no events                                               │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
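Time-based windows are assigned with simple modular arithmetic. The sketch below assumes integer timestamps, a zero window offset, and non-negative times; `tumbling_window` and `sliding_windows` are hypothetical helpers that show which window or windows a single timestamp falls into. With size 5 and slide 2, for example, the timestamp 7 belongs to two overlapping sliding windows.

```python
def tumbling_window(ts: int, size: int) -> tuple[int, int]:
    """The one window [start, end) containing ts (non-negative ts assumed)."""
    start = ts - ts % size
    return (start, start + size)


def sliding_windows(ts: int, size: int, slide: int) -> list[tuple[int, int]]:
    """Every window [start, start + size) containing ts, with starts every `slide`."""
    last_start = ts - ts % slide
    # Walk back one slide at a time while the window still covers ts
    return [(s, s + size) for s in range(last_start, ts - size, -slide)]


print(tumbling_window(7, 5))     # (5, 10)
print(sliding_windows(7, 5, 2))  # [(6, 11), (4, 9)]
```

Because a sliding window assigns each event to roughly `size / slide` windows, small slides multiply the state a stream processor has to keep.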

Handling Late Data with Watermarks

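from pyflink.common import Duration, Types, WatermarkStrategy
from pyflink.common.time import Time
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream import OutputTag
from pyflink.datastream.functions import ProcessWindowFunction
from pyflink.datastream.window import TumblingEventTimeWindows

# Event time is the default since Flink 1.12; no TimeCharacteristic call is needed.

class OrderTimestampAssigner(TimestampAssigner):
    """Event time in epoch milliseconds, from an order 'timestamp' in epoch seconds"""

    def extract_timestamp(self, value, record_timestamp):
        return int(value['timestamp'] * 1000)

class CountOrders(ProcessWindowFunction):
    """Emit (customer_id, window_end, order_count) each time a window fires"""

    def process(self, key, context, elements):
        return [(key, context.window().end, len(list(elements)))]

# Define watermark strategy: the watermark trails the highest timestamp seen by 10 s
watermark_strategy = (
    WatermarkStrategy
    .for_bounded_out_of_orderness(Duration.of_seconds(10))
    .with_timestamp_assigner(OrderTimestampAssigner())
)

# Apply watermark strategy (`orders` is a stream of parsed order dicts)
orders_with_watermarks = orders.assign_timestamps_and_watermarks(watermark_strategy)

# Events up to 10 s out of order are simply on time. Events behind the watermark
# still update their window for another 30 s (allowed lateness); anything later
# is emitted to this side output instead of being dropped silently.
late_data_side_output = OutputTag("late-data", Types.PICKLED_BYTE_ARRAY())

windowed = (
    orders_with_watermarks
    .key_by(lambda x: x['customer_id'])
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .allowed_lateness(30_000)  # milliseconds
    .side_output_late_data(late_data_side_output)
    .process(CountOrders(), Types.TUPLE([Types.STRING(), Types.LONG(), Types.INT()]))
)

# Get late data
late_orders = windowed.get_side_output(late_data_side_output)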
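The interaction between the out-of-orderness bound, allowed lateness, and the side output can be simulated without Flink. `classify` below is a hypothetical, simplified model based on our understanding of Flink's rules: the watermark trails the highest timestamp seen, a window fires once the watermark reaches its last millisecond, late events still update the window until the watermark passes that point plus the allowed lateness, and anything later goes to the side output. Unlike Flink, it recomputes the watermark after every event instead of periodically.

```python
from typing import Optional


def classify(arrivals_ms: list[int], out_of_orderness_ms: int,
             window_ms: int, allowed_lateness_ms: int) -> list[str]:
    """Label each event, in arrival order, for a tumbling event-time window."""
    max_ts: Optional[int] = None
    labels: list[str] = []
    for ts in arrivals_ms:
        # Bounded out-of-orderness: watermark trails the max timestamp seen so far
        watermark = float("-inf") if max_ts is None else max_ts - out_of_orderness_ms - 1
        window_max = ts - ts % window_ms + window_ms - 1  # last ms in the event's window
        if window_max > watermark:
            labels.append("on-time")        # window has not fired yet
        elif window_max + allowed_lateness_ms > watermark:
            labels.append("late-accepted")  # updates the window and fires it again
        else:
            labels.append("side-output")    # window state is already gone
        max_ts = ts if max_ts is None else max(max_ts, ts)
    return labels


arrivals = [100_000, 305_000, 290_000, 320_000, 295_000, 345_000, 299_000]
print(classify(arrivals, out_of_orderness_ms=10_000, window_ms=300_000,
               allowed_lateness_ms=30_000))
# ['on-time', 'on-time', 'on-time', 'on-time', 'late-accepted', 'on-time', 'side-output']
```

The event at 290 000 ms arrives out of order but within the 10-second bound, so it is still on time; the ones at 295 000 ms and 299 000 ms arrive after the first window has fired and differ only in whether the allowed lateness has run out.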

Exactly-Once Semantics

What is Exactly-Once?

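Exactly-once processing guarantees that each event affects state and output exactly once, even if a failure forces it to be read and processed again. This requires:

  1. Idempotent producers: Retried writes do not create duplicates in Kafka
  2. Transactional commits: Consumed offsets are committed atomically with the results they produced
  3. Checkpointing: Operator state and source offsets are periodically snapshotted together
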
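Requirement 1 is what Kafka's idempotent producer (`enable.idempotence`) provides: each producer gets an id, each batch carries a per-partition sequence number, and the broker discards a retry it has already written. `ToyPartition` below is a hypothetical, heavily simplified model of that check; real brokers also validate sequence gaps and track several recent batches.

```python
class ToyPartition:
    """Simplified model of broker-side deduplication for an idempotent producer:
    the broker remembers the last sequence number written per producer id and
    ignores a retry that repeats it."""

    def __init__(self) -> None:
        self.log: list[str] = []
        self.last_seq: dict[int, int] = {}

    def append(self, producer_id: int, seq: int, value: str) -> bool:
        if seq <= self.last_seq.get(producer_id, -1):
            return False  # duplicate of a write that already succeeded
        self.log.append(value)
        self.last_seq[producer_id] = seq
        return True


partition = ToyPartition()
partition.append(producer_id=1, seq=0, value="ORD-1")  # written, but the ack is lost
partition.append(producer_id=1, seq=0, value="ORD-1")  # retry of the same write: ignored
partition.append(producer_id=1, seq=1, value="ORD-2")
print(partition.log)  # ['ORD-1', 'ORD-2']
```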
┌─────────────────────────────────────────────────────────────────────┐
│                       EXACTLY-ONCE PROCESSING                       │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  Checkpointing flow:                                                │
│                                                                     │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │                       FLINK CHECKPOINT                        │  │
│  │                                                               │  │
│  │   ┌─────────────┐    ┌─────────────┐    ┌─────────────┐       │  │
│  │   │  Source 1   │───►│  Operator   │───►│    Sink     │       │  │
│  │   │  offset=5   │    │    state    │    │ pre-commit  │       │  │
│  │   └──────┬──────┘    └──────┬──────┘    └──────┬──────┘       │  │
│  │          └──────────────────┼──────────────────┘              │  │
│  │                             ▼                                 │  │
│  │                  ┌─────────────────────┐                      │  │
│  │                  │ Checkpoint barrier  │                      │  │
│  │                  └──────────┬──────────┘                      │  │
│  │                             │                                 │  │
│  │                             ▼                                 │  │
│  │                  ┌─────────────────────┐                      │  │
│  │                  │ State backend       │                      │  │
│  │                  │ (e.g. RocksDB)      │                      │  │
│  │                  └──────────┬──────────┘                      │  │
│  │                             │                                 │  │
│  │                             ▼                                 │  │
│  │                  ┌─────────────────────┐                      │  │
│  │                  │ Durable storage     │                      │  │
│  │                  │ (S3 / HDFS)         │                      │  │
│  │                  └─────────────────────┘                      │  │
│  └───────────────────────────────────────────────────────────────┘  │
│                                                                     │
│  On failure recovery:                                               │
│  1. Restore state from the last completed checkpoint                │
│  2. Rewind sources to the offsets saved in that checkpoint          │
│  3. Replay events after those offsets                               │
│  4. Result: state matches a failure-free run; external              │
│     effects also need transactional or idempotent sinks             │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
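from pyflink.common import WatermarkStrategy
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import CheckpointingMode, StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import (
    KafkaOffsetResetStrategy, KafkaOffsetsInitializer, KafkaSource)
from pyflink.datastream.state_backend import EmbeddedRocksDBStateBackend

env = StreamExecutionEnvironment.get_execution_environment()

# Enable checkpointing
env.enable_checkpointing(60000)  # every minute (interval in ms)
checkpoint_config = env.get_checkpoint_config()
checkpoint_config.set_min_pause_between_checkpoints(30000)  # at least 30 s between checkpoints
checkpoint_config.set_checkpoint_timeout(600000)  # abort checkpoints that take over 10 min
checkpoint_config.set_max_concurrent_checkpoints(1)

# Set exactly-once mode
checkpoint_config.set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE)

# Unaligned checkpoints let barriers overtake buffered records,
# which keeps checkpoints short under backpressure
checkpoint_config.enable_unaligned_checkpoints()

# Configure the RocksDB state backend; snapshots go to the checkpoint directory
# (use an s3:// or hdfs:// path in production)
env.set_state_backend(EmbeddedRocksDBStateBackend())
checkpoint_config.set_checkpoint_storage_dir("file:///tmp/flink/checkpoints")

# Configure the Kafka source for exactly-once: Flink stores offsets in its
# checkpoints and only commits them back to Kafka when a checkpoint completes
kafka_source = (
    KafkaSource.builder()
    .set_bootstrap_servers('localhost:9092')
    .set_topics('orders')
    .set_group_id('flink-processor')
    .set_starting_offsets(
        KafkaOffsetsInitializer.committed_offsets(KafkaOffsetResetStrategy.EARLIEST))
    .set_value_only_deserializer(SimpleStringSchema())  # raw JSON strings
    .set_property('isolation.level', 'read_committed')  # only read committed transactions
    .build()
)
orders = env.from_source(kafka_source, WatermarkStrategy.no_watermarks(), 'orders')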
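The `isolation.level` setting only matters when the topic is written by transactional producers, such as a Flink job with an exactly-once Kafka sink. The sketch below is a simplified model, not the broker's implementation: each record carries the id of the transaction that wrote it, and a dict records whether that transaction committed, aborted, or is still open. A `read_committed` consumer stops at the first record of an open transaction (the last stable offset) and skips aborted records, while `read_uncommitted` returns everything. The `poll` function and the sample data are hypothetical.

```python
def poll(log: list[tuple[str, str]], txn_status: dict[str, str], isolation_level: str) -> list[str]:
    """Return the values a consumer would see from `log` (simplified model).

    log: (transaction_id, value) records in offset order.
    txn_status: transaction_id -> "committed" | "aborted" | "open".
    """
    if isolation_level == "read_uncommitted":
        return [value for _, value in log]

    visible = []
    for txn_id, value in log:
        status = txn_status[txn_id]
        if status == "open":
            break  # last stable offset: nothing past an open transaction is returned
        if status == "committed":
            visible.append(value)
        # records from aborted transactions are filtered out
    return visible


log = [("t1", "order-1"), ("t2", "order-2"), ("t1", "order-3"), ("t3", "order-4"), ("t2", "order-5")]
status = {"t1": "committed", "t2": "aborted", "t3": "open"}
print(poll(log, status, "read_uncommitted"))
print(poll(log, status, "read_committed"))
```

This also explains why transactional sinks add latency for downstream `read_committed` readers: records become visible only after the writing job's checkpoint completes and its transaction commits.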

Building Complete Stream Pipeline

Architecture Overview

┌─────────────────────────────────────────────────────────────────────┐
│              COMPLETE STREAM PROCESSING PIPELINE                      │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐   │
│  │  Source  │    │  Ingest  │    │ Process  │    │   Sink   │   │
│  │  Systems │    │  (Kafka) │    │ (Flink)  │    │  Targets  │   │
│  └────┬─────┘    └────┬─────┘    └────┬─────┘    └────┬─────┘   │
│       │               │               │               │          │
│       ▼               ▼               ▼               ▼          │
│  ┌─────────────────────────────────────────────────────────────┐  │
│  │                                                              │  │
│  │  Mobile App ──► Orders Topic ──► Flink ──► Analytics     │  │
│  │                          │              │                   │  │
│  │                          │              ├──► Alerts         │  │
│  │                          │              │                   │  │
│  │                          │              ├──► Database       │  │
│  │                          │              │                   │  │
│  │                          │              └──► Kafka (out)   │  │
│  │                                                              │  │
│  └─────────────────────────────────────────────────────────────┘  │
│                                                                     │
│  Monitoring: Prometheus + Grafana                                    │
│  Logging: ELK Stack                                                 │
│  Deployment: Kubernetes                                              │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

Complete Example

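import json

from pyflink.common import Duration, Row, Types, WatermarkStrategy
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.time import Time
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream import CheckpointingMode, StreamExecutionEnvironment
from pyflink.datastream.connectors.base import DeliveryGuarantee
from pyflink.datastream.connectors.jdbc import JdbcConnectionOptions, JdbcSink
from pyflink.datastream.connectors.kafka import (
    KafkaOffsetResetStrategy, KafkaOffsetsInitializer,
    KafkaRecordSerializationSchema, KafkaSink, KafkaSource)
from pyflink.datastream.state_backend import EmbeddedRocksDBStateBackend
from pyflink.datastream.window import SlidingEventTimeWindows

# OrderValidator, ValidOrderFilter, InvalidOrderFilter, CustomerEnricher and
# HourlyAggregation are user-defined functions; orders are assumed to be dicts
# with a 'timestamp' field in epoch milliseconds.
# The Kafka and JDBC connector JARs must be on the classpath (e.g. env.add_jars(...)).


class OrderTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, value, record_timestamp):
        return int(value['timestamp'])  # event time = the order's own timestamp


class OrderProcessingPipeline:
    """
    Complete order processing pipeline with Kafka and Flink
    """

    def __init__(self, config: dict):
        self.config = config
        self.env = self._create_environment()

    def _create_environment(self):
        env = StreamExecutionEnvironment.get_execution_environment()

        # Checkpoint every 60 s with exactly-once state consistency
        env.enable_checkpointing(60000, CheckpointingMode.EXACTLY_ONCE)
        env.get_checkpoint_config().set_min_pause_between_checkpoints(30000)

        # State backend: RocksDB for working state, checkpoint directory for snapshots
        env.set_state_backend(EmbeddedRocksDBStateBackend())
        env.get_checkpoint_config().set_checkpoint_storage_dir("file:///tmp/checkpoints")

        # Parallelism
        env.set_parallelism(self.config.get('parallelism', 4))

        return env

    def _kafka_sink(self, topic, guarantee):
        builder = (
            KafkaSink.builder()
            .set_bootstrap_servers(self.config['kafka_bootstrap'])
            .set_record_serializer(
                KafkaRecordSerializationSchema.builder()
                .set_topic(topic)
                .set_value_serialization_schema(SimpleStringSchema())
                .build())
            .set_delivery_guarantee(guarantee)
        )
        if guarantee == DeliveryGuarantee.EXACTLY_ONCE:
            # Transactions commit when a checkpoint completes; the timeout must not
            # exceed the broker's transaction.max.timeout.ms (15 minutes by default)
            builder = (
                builder
                .set_transactional_id_prefix(topic)
                .set_property('transaction.timeout.ms', '900000')
            )
        return builder.build()

    def build_pipeline(self):
        # Source: Kafka (offsets are stored in Flink checkpoints)
        source = (
            KafkaSource.builder()
            .set_bootstrap_servers(self.config['kafka_bootstrap'])
            .set_topics('orders')
            .set_group_id('order-processor')
            .set_starting_offsets(
                KafkaOffsetsInitializer.committed_offsets(KafkaOffsetResetStrategy.EARLIEST))
            .set_value_only_deserializer(SimpleStringSchema())
            .build()
        )
        orders = self.env.from_source(source, WatermarkStrategy.no_watermarks(), 'orders')

        # Parse and validate
        validated = orders.map(OrderValidator())

        # Split valid and invalid orders
        valid_orders = validated.filter(ValidOrderFilter())
        invalid_orders = validated.filter(InvalidOrderFilter())

        # Enrich valid orders with customer data, then assign event-time watermarks
        # (event-time windows never fire without them)
        enriched = (
            valid_orders
            .map(CustomerEnricher(self.config['customer_api']))
            .assign_timestamps_and_watermarks(
                WatermarkStrategy
                .for_bounded_out_of_orderness(Duration.of_seconds(10))
                .with_timestamp_assigner(OrderTimestampAssigner()))
        )

        # Window aggregation: 1-hour windows sliding every 5 minutes, per customer
        hourly_metrics = (
            enriched
            .key_by(lambda x: x['customer_id'])
            .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)))
            .process(HourlyAggregation())
        )

        # Sink: metrics to Kafka with transactional (exactly-once) writes
        (
            hourly_metrics
            .map(json.dumps, output_type=Types.STRING())
            .sink_to(self._kafka_sink('order-metrics', DeliveryGuarantee.EXACTLY_ONCE))
        )

        # Sink: valid orders to PostgreSQL. JdbcSink.sink is at-least-once, so a
        # replay after failure can repeat inserts; an upsert keyed on order_id
        # makes the write idempotent.
        order_row = Types.ROW([Types.STRING(), Types.STRING(), Types.DOUBLE(),
                               Types.LONG(), Types.STRING()])  # assumed column types
        (
            enriched
            .map(lambda x: Row(x['order_id'], x['customer_id'], float(x['total_amount']),
                               int(x['timestamp']), x['status']),
                 output_type=order_row)
            .add_sink(JdbcSink.sink(
                "INSERT INTO orders VALUES (?, ?, ?, ?, ?)",
                order_row,
                JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .with_url(self.config['db_url'])
                .with_driver_name('org.postgresql.Driver')
                .build()))
        )

        # Sink: invalid orders to a dead letter topic (at-least-once)
        (
            invalid_orders
            .map(json.dumps, output_type=Types.STRING())
            .sink_to(self._kafka_sink('orders-dlq', DeliveryGuarantee.AT_LEAST_ONCE))
        )

        # Return env for execution
        return self.env

    def run(self):
        env = self.build_pipeline()
        env.execute("Order Processing Pipeline")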
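The `SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5))` assigner in the example places every order in several overlapping windows. The sketch below reproduces that assignment arithmetic in plain Python, assuming millisecond timestamps, non-negative event times, and a window offset of 0; `sliding_windows` is a hypothetical helper, not part of PyFlink.

```python
def sliding_windows(ts: int, size_ms: int, slide_ms: int) -> list[tuple[int, int]]:
    """Return every [start, end) sliding window that contains timestamp ts."""
    start = ts - (ts % slide_ms)  # latest window start at or before ts
    windows = []
    while start > ts - size_ms:  # a window still covers ts while start > ts - size
        windows.append((start, start + size_ms))
        start -= slide_ms
    return windows


HOUR, FIVE_MIN = 60 * 60 * 1000, 5 * 60 * 1000
wins = sliding_windows(3_900_000, HOUR, FIVE_MIN)  # an order at 01:05:00
print(len(wins), wins[0], wins[-1])
```

Each order lands in size / slide = 60 / 5 = 12 windows, so with a `ProcessWindowFunction` such as `HourlyAggregation` each order is buffered in 12 windows' state. A larger slide reduces that duplication at the cost of coarser updates.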

Common Pitfalls

1. Not Handling Backpressure

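Flink propagates backpressure automatically: when an operator falls behind, its bounded network buffers fill up and upstream tasks, including the Kafka source, slow down instead of exhausting memory. The pitfall is leaving the bottleneck in place, which shows up as rising latency, growing consumer lag, and slow checkpoints.

# Anti-pattern: a slow operator at the default parallelism throttles everything upstream
# (UnboundedSource and ExpensiveOperation are placeholders)
def bad_backpressure(env):
    source = env.add_source(UnboundedSource())
    result = source.map(ExpensiveOperation())  # sustained backpressure starts here
    return result

# Good pattern: scale the bottleneck and tune buffering
def good_backpressure(env):
    # Flush network buffers at least every 100 ms (bounds latency when traffic is light)
    env.set_buffer_timeout(100)

    source = env.add_source(UnboundedSource())
    # Give the expensive step more parallel instances than the rest of the job
    result = source.map(ExpensiveOperation()).set_parallelism(8)
    return result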
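A bounded buffer is what turns a slow operator into backpressure instead of unbounded memory growth. The tick-based model below is a simplification of Flink's credit-based flow control, not its implementation: the source may only send as many records as the downstream buffer has free space for, so its effective rate falls to the operator's rate. `simulate`, `SimResult`, and the rates used are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class SimResult:
    emitted: int        # records the source managed to send
    processed: int      # records the slow operator finished
    peak_buffered: int  # largest number of in-flight records
    blocked_ticks: int  # ticks in which the source was throttled


def simulate(ticks: int, source_rate: int, operator_rate: int, capacity: int) -> SimResult:
    buffered = emitted = processed = peak = blocked = 0
    for _ in range(ticks):
        credit = capacity - buffered         # free buffer slots announced downstream
        sent = min(source_rate, credit)      # the source cannot exceed the credit
        if sent < source_rate:
            blocked += 1
        buffered += sent
        emitted += sent
        peak = max(peak, buffered)
        done = min(operator_rate, buffered)  # the slow operator drains at its own pace
        buffered -= done
        processed += done
    return SimResult(emitted, processed, peak, blocked)


print(simulate(ticks=100, source_rate=1000, operator_rate=200, capacity=500))
```

Memory stays bounded by `capacity`, but the source only achieves the operator's 200 records per tick; in a Kafka-backed job the unread remainder accumulates as consumer lag. That is why the fix is to speed up or scale the bottleneck rather than enlarge buffers.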

2. Ignoring State Size

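import json

from pyflink.common import Types
from pyflink.common.time import Time
from pyflink.datastream import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import StateTtlConfig, ValueStateDescriptor

# Anti-pattern: Unbounded state
class BadProcessFunction(KeyedProcessFunction):
    def open(self, runtime_context: RuntimeContext):
        self.state = runtime_context.get_state(ValueStateDescriptor("all", Types.STRING()))

    def process_element(self, value, ctx):
        # Appends every event to per-key state and never clears it: state grows forever
        current = self.state.value() or ""
        self.state.update(current + json.dumps(value))
        yield value

# Good pattern: State with TTL
def good_state():
    # Expire entries one hour after they were last written
    ttl_config = (
        StateTtlConfig.new_builder(Time.hours(1))
        .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .cleanup_in_rocksdb_compact_filter(1000)  # also purge expired entries during RocksDB compaction
        .build()
    )
    state_descriptor = ValueStateDescriptor("recent", Types.STRING())
    state_descriptor.enable_time_to_live(ttl_config)
    return state_descriptor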
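State TTL semantics can be modeled with an explicit clock. In Flink's `StateTtlConfig`, the `OnCreateAndWrite` update type refreshes an entry's timestamp only on writes, and `NeverReturnExpired` makes expired entries invisible even before background cleanup removes them. The class below is a simplified sketch of those two rules, not Flink's implementation; `TtlValueState` is hypothetical and treats an entry as expired once `last_write + ttl <= now`.

```python
from typing import Any, Optional


class TtlValueState:
    """Hypothetical per-key value state with a time-to-live (simplified model)."""

    def __init__(self, ttl_ms: int):
        self.ttl_ms = ttl_ms
        self._entries: dict[str, tuple[Any, int]] = {}  # key -> (value, last write time)

    def update(self, key: str, value: Any, now_ms: int) -> None:
        # OnCreateAndWrite: creating or writing an entry restarts its TTL
        self._entries[key] = (value, now_ms)

    def value(self, key: str, now_ms: int) -> Optional[Any]:
        entry = self._entries.get(key)
        if entry is None:
            return None
        value, written_ms = entry
        if written_ms + self.ttl_ms <= now_ms:
            # NeverReturnExpired: hide the expired value and drop it on read
            del self._entries[key]
            return None
        return value  # reads do not refresh the TTL

    def size(self) -> int:
        return len(self._entries)


HOUR = 60 * 60 * 1000
state = TtlValueState(ttl_ms=HOUR)
state.update("customer-1", "recent", now_ms=0)
print(state.value("customer-1", now_ms=HOUR - 1))
print(state.value("customer-1", now_ms=HOUR))
```

In this model an expired entry disappears only when it is read; Flink additionally removes entries that are never read again through the cleanup strategies configured on `StateTtlConfig`, such as compaction-time cleanup with RocksDB, which is what keeps state from growing without bound.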

Best Practices

1. Use Appropriate Serialization

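# In PyFlink, declare output types so Flink uses its typed serializers;
# without output_type, each record is pickled as an opaque byte array
# (raw_orders and parse_order are placeholders; parse_order returns a Row)
from pyflink.common import Types

order_type = Types.ROW_NAMED(
    ['order_id', 'customer_id', 'total_amount'],
    [Types.STRING(), Types.STRING(), Types.DOUBLE()]
)
orders = raw_orders.map(parse_order, output_type=order_type)

# For Java types without a Flink serializer, register a custom Kryo serializer
# (class names are placeholders)
env.get_config().add_default_kryo_serializer(
    'com.example.MyCustomClass',
    'com.example.MyCustomClassSerializer'
)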

2. Monitor and Alert

# Key metrics to monitor:
# - Checkpoint duration and size
# - Latency (end-to-end, processing)
# - Backpressure
# - State size
# - Kafka consumer lag
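Kafka consumer lag is the gap between each partition's log-end offset and the offset the consumer group has committed; with Flink's Kafka source, offsets are committed when checkpoints complete, so committed-offset lag also trails by up to one checkpoint interval. A minimal sketch of the calculation, assuming the offsets have already been fetched (for example from `kafka-consumer-groups.sh --describe` or a client library); `consumer_lag` and the sample offsets are hypothetical.

```python
def consumer_lag(end_offsets: dict[int, int], committed: dict[int, int]) -> dict[int, int]:
    """Per-partition lag = log-end offset - committed offset.

    A partition with no committed offset is treated as unread from offset 0
    (a simplification: retention may already have deleted early records).
    """
    return {p: end - committed.get(p, 0) for p, end in end_offsets.items()}


end = {0: 1_500, 1: 980, 2: 2_000}
committed = {0: 1_450, 1: 980}
lag = consumer_lag(end, committed)
print(lag, sum(lag.values()))
```

Watching the trend of total lag is generally more informative than a single reading: lag that keeps growing means the job cannot keep up with the input rate.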

Performance Comparison

Framework        Latency    Throughput   Complexity   Maturity
Kafka Streams    Low        High         Low          High
Flink            Very Low   Very High    Medium       High
Spark Streaming  Medium     Very High    Medium       High
Storm            Low        Medium       High         Medium


Conclusion

Building real-time data pipelines with Kafka and Flink enables organizations to process data as it arrives, unlocking use cases from fraud detection to real-time personalization.

Key takeaways:

  • Kafka provides durable, scalable event streaming
  • Flink offers exactly-once state consistency and event-time processing
  • Use windowing to aggregate streaming data
  • Enable checkpointing for fault tolerance
  • Monitor lag, latency, and state size
  • Handle late data with watermarks and allowed lateness

Together, Kafka and Flink form the foundation of modern stream processing architectures, enabling organizations to build responsive, data-driven applications.
