⚡ Calmops

Stream Processing with Kafka and Flink

Introduction

In today’s data-driven world, the ability to process data in real time is critical for fraud detection, monitoring, personalization, and operational intelligence. Apache Kafka and Apache Flink form the backbone of modern stream processing systems, enabling organizations to build robust, scalable real-time data pipelines.

In this guide, we’ll explore event streaming with Kafka, stream processing with Flink, and how to build end-to-end real-time data systems.


Understanding Event Streaming

What is Event Streaming?

Event streaming is the practice of capturing data in real-time as a continuous stream of events. Unlike batch processing, which processes data in discrete chunks, event streaming handles data as it arrives.
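The distinction is easy to demonstrate in miniature. Below is a minimal pure-Python sketch of the two processing models (no Kafka or Flink involved): the batch function waits for all the data before producing one result, while the stream function emits an updated result on every event.

```python
# Batch model: collect everything, then compute once at the end
def batch_total(events):
    collected = list(events)              # wait for the whole period's data
    return sum(e["amount"] for e in collected)

# Stream model: update the result as each event arrives
def stream_totals(events):
    running = 0.0
    for e in events:                      # a result is available after every event
        running += e["amount"]
        yield running

events = [{"amount": 10.0}, {"amount": 5.0}, {"amount": 2.5}]
print(batch_total(events))          # one result at the end: 17.5
print(list(stream_totals(events)))  # intermediate results: [10.0, 15.0, 17.5]
```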

┌────────────────────────────────────────────────────────────────┐
│                   BATCH VS STREAM PROCESSING                   │
├────────────────────────────────────────────────────────────────┤
│                                                                │
│  BATCH PROCESSING                STREAM PROCESSING             │
│  ----------------                -----------------             │
│  Collect data for a period       Process events immediately    │
│            |                             |                     │
│            v                             v                     │
│  Process at a scheduled time     Continuous processing         │
│            |                             |                     │
│            v                             v                     │
│  Results at end of batch         Results in real time          │
│                                                                │
│  Examples:                       Examples:                     │
│  • Daily reports                 • Fraud detection             │
│  • Monthly analytics             • Real-time dashboards        │
│  • ETL jobs                      • Alerting systems            │
│  • Data warehouse loads          • Live personalization        │
│                                                                │
└────────────────────────────────────────────────────────────────┘

Apache Kafka: Event Streaming Platform

Core Concepts

Kafka is a distributed event streaming platform that can publish, subscribe to, store, and process streams of records.

┌────────────────────────────────────────────────────────────────┐
│                       KAFKA ARCHITECTURE                       │
├────────────────────────────────────────────────────────────────┤
│                                                                │
│  KAFKA CLUSTER                                                 │
│   ┌──────────┐   ┌──────────┐   ┌──────────┐                   │
│   │ Broker 1 │   │ Broker 2 │   │ Broker N │                   │
│   │ (Leader) │   │(Follower)│   │(Follower)│                   │
│   └──────────┘   └──────────┘   └──────────┘                   │
│                                                                │
│  TOPICS (each split into numbered partitions)                  │
│  Orders: [0|1|2|3|4|5]   Clicks: [0|1|2|3|4]  Logs: [0|1|2|3|4]│
│                                                                │
│  PRODUCERS ──────────► KAFKA ──────────► CONSUMERS             │
│                                                                │
└────────────────────────────────────────────────────────────────┘

Key Terminology

  • Topic: A category/feed name to which messages are published
  • Partition: Topics are split into partitions for parallelism
  • Producer: Publishes messages to topics
  • Consumer: Subscribes to topics and processes messages
  • Broker: Kafka server that stores messages
  • Consumer Group: Group of consumers that share partitions
  • Offset: Position of consumer in a partition

Kafka Producer Implementation

from kafka import KafkaProducer
import json
import time

class OrderProducer:
    """
    Kafka producer for order events
    """
    
    def __init__(self, bootstrap_servers: list):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None,
            acks='all',  # Wait for all replicas
            retries=3,
            max_in_flight_requests_per_connection=5,
            compression_type='gzip'
        )
        
        self.topic = 'orders'
    
    def send_order(self, order_data: dict):
        """
        Send order event to Kafka
        """
        
        # Create order event
        event = {
            'event_type': 'order_created',
            'timestamp': time.time(),
            'order_id': order_data['order_id'],
            'customer_id': order_data['customer_id'],
            'total_amount': order_data['total_amount'],
            'items': order_data['items'],
            'status': 'created'
        }
        
        # Send with order ID as key for partitioning
        future = self.producer.send(
            self.topic,
            key=order_data['order_id'],
            value=event
        )
        
        # Wait for confirmation
        record_metadata = future.get(timeout=10)
        
        return {
            'topic': record_metadata.topic,
            'partition': record_metadata.partition,
            'offset': record_metadata.offset
        }
    
    def send_batch(self, orders: list):
        """
        Send batch of orders
        """
        
        futures = []
        for order in orders:
            event = {
                'event_type': 'order_created',
                'timestamp': time.time(),
                **order
            }
            future = self.producer.send(
                self.topic,
                key=order['order_id'],
                value=event
            )
            futures.append(future)
        
        # Wait for all to complete
        for future in futures:
            metadata = future.get(timeout=10)
            print(f"Sent to {metadata.topic}:{metadata.partition}:{metadata.offset}")
    
    def close(self):
        self.producer.flush()
        self.producer.close()


# Usage
producer = OrderProducer(['localhost:9092'])

order = {
    'order_id': 'ORD-12345',
    'customer_id': 'CUST-001',
    'total_amount': 99.99,
    'items': ['item1', 'item2']
}

result = producer.send_order(order)
print(f"Order sent: {result}")

Kafka Consumer Implementation

from kafka import KafkaConsumer
import json

class OrderConsumer:
    """
    Kafka consumer for order events
    """
    
    def __init__(self, bootstrap_servers: list, group_id: str):
        self.consumer = KafkaConsumer(
            'orders',
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            value_deserializer=lambda v: json.loads(v.decode('utf-8')),
            key_deserializer=lambda k: k.decode('utf-8') if k else None,
            auto_offset_reset='earliest',
            enable_auto_commit=True,
            auto_commit_interval_ms=5000,
            max_poll_records=100,
            max_poll_interval_ms=300000
        )
    
    def consume_orders(self):
        """
        Consume order events
        """
        
        print("Starting to consume orders...")
        
        for message in self.consumer:
            print(f"\n--- New Message ---")
            print(f"Topic: {message.topic}")
            print(f"Partition: {message.partition}")
            print(f"Offset: {message.offset}")
            print(f"Key: {message.key}")
            print(f"Value: {message.value}")
            
            # Process the order
            self.process_order(message.value)
    
    def process_order(self, order: dict):
        """
        Process order event
        """
        
        event_type = order.get('event_type')
        
        if event_type == 'order_created':
            print(f"Processing new order: {order['order_id']}")
            # Add business logic here
            
        elif event_type == 'order_updated':
            print(f"Order updated: {order['order_id']}")
            
        elif event_type == 'order_cancelled':
            print(f"Order cancelled: {order['order_id']}")
    
    def consume_with_partition_handling(self):
        """
        Handle each partition separately
        """
        
        # Get partition assignments
        print(f"Assigned partitions: {self.consumer.assignment()}")
        
        for message in self.consumer:
            # Process message
            self.process_order(message.value)
    
    def close(self):
        self.consumer.close()


# Usage
consumer = OrderConsumer(['localhost:9092'], 'order-processor-group')
consumer.consume_orders()
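Within one consumer group, each partition is owned by exactly one consumer, which is how Kafka parallelizes consumption without duplicating work. A simplified sketch of the idea (Kafka's real assignors, such as range and round-robin, are more involved):

```python
def assign_partitions(partitions: list, consumers: list) -> dict:
    """Round-robin assignment of partitions to consumers in one group.

    Simplified sketch of what Kafka's group coordinator does: every
    partition goes to exactly one consumer in the group.
    """
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment

print(assign_partitions([0, 1, 2, 3, 4, 5], ["c1", "c2"]))
# → {'c1': [0, 2, 4], 'c2': [1, 3, 5]}
```

Adding a third consumer to the group would trigger a rebalance: the same partitions are redistributed so each consumer owns fewer of them.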

Apache Flink: Stream Processing Framework

Apache Flink is a distributed stream processing framework that provides:

  • Exactly-once processing: Strong guarantee for state consistency
  • Event-time processing: Handle late-arriving data correctly
  • Windowing: Aggregate data over tumbling, sliding, or session windows
  • Stateful processing: Maintain state across events
  • Fault tolerance: Checkpoints and savepoints

┌────────────────────────────────────────────────────────────────┐
│                       FLINK ARCHITECTURE                       │
├────────────────────────────────────────────────────────────────┤
│                                                                │
│  FLINK APPLICATION                                             │
│  ┌─────────────┐   ┌─────────────┐   ┌─────────────┐           │
│  │   Source    │──►│   Process   │──►│    Sink     │           │
│  │   (Kafka)   │   │ (Operators) │   │ (Kafka/DB)  │           │
│  └─────────────┘   └─────────────┘   └─────────────┘           │
│                                                                │
│  STATE & TIME                                                  │
│  • Managed state (RocksDB)                                     │
│  • Event time / processing time                                │
│  • Watermarks                                                  │
│                                                                │
│  FLINK CLUSTER                                                 │
│  ┌────────────┐   ┌─────────────┐   ┌─────────────┐            │
│  │ JobManager │   │ TaskManager │   │ TaskManager │            │
│  │  (Leader)  │   │  (Worker)   │   │  (Worker)   │            │
│  └────────────┘   └─────────────┘   └─────────────┘            │
│                                                                │
└────────────────────────────────────────────────────────────────┘

Flink Job Implementation

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer
from pyflink.datastream.functions import MapFunction, FilterFunction, WindowFunction
from pyflink.datastream.window import TumblingEventTimeWindows
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.time import Time
from pyflink.common.typeinfo import Types

class OrderMapFunction(MapFunction):
    """
    Transform order events
    """
    
    def map(self, value):
        # Parse JSON
        import json
        order = json.loads(value)
        
        # Transform
        return {
            'order_id': order['order_id'],
            'customer_id': order['customer_id'],
            'total_amount': float(order['total_amount']),
            'timestamp': order['timestamp'],
            'hour': int(order['timestamp'].split(':')[0])
        }

class HighValueFilter(FilterFunction):
    """
    Filter high-value orders
    """
    
    def filter(self, value):
        return value['total_amount'] > 100.0

class OrderAggregator(WindowFunction):
    """
    Aggregate orders in a window
    """
    
    def apply(self, key, window, values):
        total = sum(v['total_amount'] for v in values)
        count = len(values)
        
        return [(key, window.end, count, total)]

def process_orders():
    """
    Main Flink job for order processing
    """
    
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(4)
    
    # Source: Read from Kafka
    kafka_source = (
        FlinkKafkaConsumer(
            topics='orders',
            deserialization_schema=SimpleStringSchema(),
            properties={
                'bootstrap.servers': 'localhost:9092',
                'group.id': 'flink-order-processor'
            }
        )
    )
    
    orders = env.add_source(kafka_source)
    
    # Transform: Map to structured data
    transformed = orders.map(OrderMapFunction())
    
    # Filter: High-value orders only
    high_value = transformed.filter(HighValueFilter())
    
    # Window: Tumbling window (5 minutes)
    windowed = (
        high_value
        .key_by(lambda x: x['customer_id'])
        .window(TumblingEventTimeWindows.of(Time.minutes(5)))
        .apply(OrderAggregator())
    )
    
    # Sink: Write aggregates back to Kafka as strings
    kafka_sink = FlinkKafkaProducer(
        topic='order-aggregates',
        serialization_schema=SimpleStringSchema(),
        producer_config={
            'bootstrap.servers': 'localhost:9092'
        }
    )
    
    windowed.map(lambda agg: str(agg), output_type=Types.STRING()).add_sink(kafka_sink)
    
    # Execute
    env.execute("Order Processing Job")

Flink SQL Implementation

from pyflink.table import StreamTableEnvironment

def flink_sql_processing():
    """
    Use Flink SQL for stream processing
    """
    
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    
    # Create source table (Kafka)
    t_env.execute_sql("""
        CREATE TEMPORARY TABLE orders (
            order_id STRING,
            customer_id STRING,
            total_amount DECIMAL(10, 2),
            order_time TIMESTAMP(3),
            WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'orders',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'flink-sql-processor',
            'format' = 'json',
            'json.timestamp-format.standard' = 'ISO-8601'
        )
    """)
    
    # Create sink table (Kafka)
    t_env.execute_sql("""
        CREATE TEMPORARY TABLE order_aggregates (
            window_start TIMESTAMP,
            window_end TIMESTAMP,
            customer_id STRING,
            order_count BIGINT,
            total_amount DECIMAL(15, 2)
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'order-aggregates',
            'properties.bootstrap.servers' = 'localhost:9092',
            'format' = 'json'
        )
    """)
    
    # Tumbling window aggregation
    t_env.execute_sql("""
        INSERT INTO order_aggregates
        SELECT
            TUMBLE_START(order_time, INTERVAL '5' MINUTE) AS window_start,
            TUMBLE_END(order_time, INTERVAL '5' MINUTE) AS window_end,
            customer_id,
            COUNT(*) AS order_count,
            SUM(total_amount) AS total_amount
        FROM orders
        GROUP BY
            TUMBLE(order_time, INTERVAL '5' MINUTE),
            customer_id
    """)
    
    # Session window aggregation
    t_env.execute_sql("""
        SELECT
            SESSION_START(order_time, INTERVAL '10' MINUTE) AS session_start,
            SESSION_END(order_time, INTERVAL '10' MINUTE) AS session_end,
            customer_id,
            COUNT(*) AS session_orders,
            SUM(total_amount) AS session_total
        FROM orders
        GROUP BY
            SESSION(order_time, INTERVAL '10' MINUTE),
            customer_id
    """)
    
    # Running totals with OVER windows
    t_env.execute_sql("""
        SELECT
            order_id,
            customer_id,
            total_amount,
            order_time,
            SUM(total_amount) OVER (
                PARTITION BY customer_id
                ORDER BY order_time
                ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
            ) AS running_total
        FROM orders
    """)

Windowing Concepts

Types of Windows

┌────────────────────────────────────────────────────────────────┐
│                          WINDOW TYPES                          │
├────────────────────────────────────────────────────────────────┤
│                                                                │
│  TUMBLING WINDOW                 SLIDING WINDOW                │
│  ---------------                 --------------                │
│  [----|----|----|----]           [--|----|----|----|--]        │
│   0-5  5-10 10-15                 0-5  3-8  6-11               │
│                                                                │
│  Non-overlapping, fixed size     Overlapping, fixed size       │
│                                                                │
│  SESSION WINDOW                  COUNT WINDOW                  │
│  [----]  [--]  [----]            [1,2,3][2,3,4][3,4,5]         │
│                                                                │
│  Activity-based, gap-based       Fixed number of events        │
│                                                                │
└────────────────────────────────────────────────────────────────┘
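The assignment rules behind the first two window types can be written down directly: a timestamp belongs to exactly one tumbling window, but to up to size/slide sliding windows. A small pure-Python sketch (timestamps and window bounds in the same arbitrary time unit):

```python
def tumbling_window(ts: int, size: int) -> tuple:
    """The single tumbling window [start, end) that contains ts."""
    start = (ts // size) * size
    return (start, start + size)

def sliding_windows(ts: int, size: int, slide: int) -> list:
    """All sliding windows [start, start + size) that contain ts,
    newest window first. Window starts are multiples of `slide`."""
    last_start = (ts // slide) * slide
    starts = range(last_start, ts - size, -slide)
    return [(s, s + size) for s in starts if s <= ts < s + size]

print(tumbling_window(7, 5))        # → (5, 10): exactly one window
print(sliding_windows(7, 5, 2))     # size 5, slide 2: event 7 is in two windows
```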

Handling Late Data with Watermarks

from pyflink.datastream import TimeCharacteristic, OutputTag
from pyflink.common import Duration, Types
from pyflink.common.watermark_strategy import WatermarkStrategy, TimestampAssigner

# Use event time (the default since Flink 1.12)
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

# Extract the event-time timestamp from each event
class OrderTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, event, record_timestamp):
        return event['timestamp']

# Watermark strategy: tolerate up to 10 seconds of out-of-order events
watermark_strategy = (
    WatermarkStrategy
    .for_bounded_out_of_orderness(Duration.of_seconds(10))
    .with_timestamp_assigner(OrderTimestampAssigner())
)

# Apply watermark strategy
orders_with_watermarks = orders.assign_timestamps_and_watermarks(watermark_strategy)

# Events that arrive after the allowed lateness go to a side output
late_data_side_output = OutputTag("late-data", Types.PICKLED_BYTE_ARRAY())

windowed = (
    orders_with_watermarks
    .key_by(lambda x: x['customer_id'])
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .allowed_lateness(30000)  # accept events up to 30 seconds late (milliseconds)
    .side_output_late_data(late_data_side_output)
    .process(WindowProcessFunction())  # a user-defined ProcessWindowFunction
)

# Get late data
late_orders = windowed.get_side_output(late_data_side_output)
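The watermark mechanics themselves fit in a few lines of plain Python: a bounded-out-of-orderness watermark trails the maximum event time seen so far, and an event counts as late once its timestamp falls at or below the current watermark. A sketch, not the Flink API:

```python
class BoundedOutOfOrdernessWatermark:
    """Sketch of a watermark generator: watermark = max event time - max_delay."""

    def __init__(self, max_delay: int):
        self.max_delay = max_delay
        self.max_ts = float("-inf")

    def on_event(self, ts: int) -> float:
        """Advance on each event; return the current watermark."""
        self.max_ts = max(self.max_ts, ts)
        return self.max_ts - self.max_delay

    def is_late(self, ts: int) -> bool:
        """An event is late if it is at or below the current watermark."""
        return ts <= self.max_ts - self.max_delay

wm = BoundedOutOfOrdernessWatermark(max_delay=10)
wm.on_event(100)         # watermark is now 90
print(wm.is_late(95))    # → False: within the 10-unit tolerance
print(wm.is_late(85))    # → True: more than 10 units behind the max event time
```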

Exactly-Once Semantics

What is Exactly-Once?

Exactly-once processing guarantees that each event is processed exactly once, even in case of failures. This requires:

  1. Idempotent producers: Retries produce same result
  2. Transactional consumers: Offsets committed with processing
  3. Checkpointing: State is periodically saved
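The first requirement, idempotence, can be simulated without any infrastructure: if each event id is applied at most once, replaying a stream after a failure leaves the final state unchanged. A pure-Python sketch:

```python
def process(events, state=None, seen=None):
    """Idempotent processing sketch: replaying events does not change the result."""
    state = {"total": 0.0} if state is None else state
    seen = set() if seen is None else seen
    for e in events:
        if e["id"] in seen:        # duplicate delivery from a retry/replay: skip
            continue
        seen.add(e["id"])
        state["total"] += e["amount"]
    return state, seen

events = [{"id": 1, "amount": 10.0}, {"id": 2, "amount": 5.0}]
state, seen = process(events)

# Simulate a crash followed by a full replay of the same events
state, seen = process(events, state, seen)
print(state)   # → {'total': 15.0}: same as a single failure-free run
```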

┌────────────────────────────────────────────────────────────────┐
│                    EXACTLY-ONCE PROCESSING                     │
├────────────────────────────────────────────────────────────────┤
│                                                                │
│  Checkpointing flow:                                           │
│                                                                │
│  ┌──────────┐    ┌──────────┐    ┌───────────┐                 │
│  │ Source   │───►│ Operator │───►│   Sink    │                 │
│  │ offset=5 │    │  state   │    │ committed │                 │
│  └──────────┘    └──────────┘    └───────────┘                 │
│        │               │               │                       │
│        └───────────────┴───────────────┘                       │
│                        │                                       │
│                        ▼                                       │
│              ┌────────────────────┐                            │
│              │ checkpoint barrier │                            │
│              └────────────────────┘                            │
│                        │                                       │
│                        ▼                                       │
│              ┌────────────────────┐                            │
│              │  RocksDB (state)   │                            │
│              └────────────────────┘                            │
│                        │                                       │
│                        ▼                                       │
│              ┌────────────────────┐                            │
│              │ S3 / HDFS (durable)│                            │
│              └────────────────────┘                            │
│                                                                │
│  On failure recovery:                                          │
│  1. Restore state from the last checkpoint                     │
│  2. Resume from the saved offsets                              │
│  3. Replay events since that checkpoint                        │
│  4. Result: same as if no failure occurred                     │
│                                                                │
└────────────────────────────────────────────────────────────────┘

from pyflink.datastream import CheckpointingMode

# Enable checkpointing
env.enable_checkpointing(interval=60000)  # Every minute
env.get_checkpoint_config().set_min_pause_between_checkpoints(30000)
env.get_checkpoint_config().set_checkpoint_timeout(600000)
env.get_checkpoint_config().set_max_concurrent_checkpoints(1)

# Set exactly-once mode
env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE)

# Enable unaligned checkpoints for better performance
env.get_checkpoint_config().enable_unaligned_checkpoints()

# Configure the RocksDB state backend
from pyflink.datastream.state_backend import RocksDBStateBackend

state_backend = RocksDBStateBackend("file:///tmp/flink/checkpoints")
env.set_state_backend(state_backend)

# Configure the Kafka source for exactly-once
kafka_source = FlinkKafkaConsumer(
    topics='orders',
    deserialization_schema=SimpleStringSchema(),
    properties={
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'flink-processor',
        'enable.auto.commit': 'false',  # Flink manages offsets via checkpoints
        'isolation.level': 'read_committed'  # only read committed transactions
    }
)

Building a Complete Stream Pipeline

Architecture Overview

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚              COMPLETE STREAM PROCESSING PIPELINE                      โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚                                                                     โ”‚
โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”    โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”    โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”    โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”   โ”‚
โ”‚  โ”‚  Source  โ”‚    โ”‚  Ingest  โ”‚    โ”‚ Process  โ”‚    โ”‚   Sink   โ”‚   โ”‚
โ”‚  โ”‚  Systems โ”‚    โ”‚  (Kafka) โ”‚    โ”‚ (Flink)  โ”‚    โ”‚  Targets  โ”‚   โ”‚
โ”‚  โ””โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”˜    โ””โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”˜    โ””โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”˜    โ””โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”˜   โ”‚
โ”‚       โ”‚               โ”‚               โ”‚               โ”‚          โ”‚
โ”‚       โ–ผ               โ–ผ               โ–ผ               โ–ผ          โ”‚
โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”‚
โ”‚  โ”‚                                                              โ”‚  โ”‚
โ”‚  โ”‚  Mobile App โ”€โ”€โ–บ Orders Topic โ”€โ”€โ–บ Flink โ”€โ”€โ–บ Analytics     โ”‚  โ”‚
โ”‚  โ”‚                          โ”‚              โ”‚                   โ”‚  โ”‚
โ”‚  โ”‚                          โ”‚              โ”œโ”€โ”€โ–บ Alerts         โ”‚  โ”‚
โ”‚  โ”‚                          โ”‚              โ”‚                   โ”‚  โ”‚
โ”‚  โ”‚                          โ”‚              โ”œโ”€โ”€โ–บ Database       โ”‚  โ”‚
โ”‚  โ”‚                          โ”‚              โ”‚                   โ”‚  โ”‚
โ”‚  โ”‚                          โ”‚              โ””โ”€โ”€โ–บ Kafka (out)   โ”‚  โ”‚
โ”‚  โ”‚                                                              โ”‚  โ”‚
โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ”‚
โ”‚                                                                     โ”‚
โ”‚  Monitoring: Prometheus + Grafana                                    โ”‚
โ”‚  Logging: ELK Stack                                                 โ”‚
โ”‚  Deployment: Kubernetes                                              โ”‚
โ”‚                                                                     โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

Complete Example

# Note: OrderValidator, the filters, CustomerEnricher, HourlyAggregation,
# and the JSON (de)serialization schemas below are application-defined
# helpers, sketched here to show the overall pipeline shape.
class OrderProcessingPipeline:
    """
    Complete order processing pipeline with Kafka and Flink
    """
    
    def __init__(self, config: dict):
        self.config = config
        self.env = self._create_environment()
    
    def _create_environment(self):
        env = StreamExecutionEnvironment.get_execution_environment()
        
        # Checkpoint configuration
        env.enable_checkpointing(60000)
        env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE)
        env.get_checkpoint_config().set_min_pause_between_checkpoints(30000)
        
        # State backend (RocksDB keeps large keyed state off the JVM heap)
        env.set_state_backend(EmbeddedRocksDBStateBackend())
        env.get_checkpoint_config().set_checkpoint_storage_dir("file:///tmp/checkpoints")
        
        # Parallelism
        env.set_parallelism(self.config.get('parallelism', 4))
        
        return env
    
    def build_pipeline(self):
        # Source: Kafka
        orders = self.env.add_source(
            FlinkKafkaConsumer(
                topics='orders',
                deserialization_schema=JsonDeserializationSchema(),
                properties={
                    'bootstrap.servers': self.config['kafka_bootstrap'],
                    'group.id': 'order-processor',
                    'enable.auto.commit': 'false'
                }
            )
        )
        
        # Parse and validate
        validated = orders.map(OrderValidator())
        
        # Filter invalid orders
        valid_orders = validated.filter(ValidOrderFilter())
        invalid_orders = validated.filter(InvalidOrderFilter())
        
        # Process valid orders: enrich with customer data
        enriched = valid_orders.map(CustomerEnricher(self.config['customer_api']))
        
        # Window aggregation (event-time windows also require a
        # watermark strategy on the source, omitted here for brevity)
        hourly_metrics = (
            enriched
            .key_by(lambda x: x['customer_id'])
            .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)))
            .process(HourlyAggregation())
        )
        
        # Sink: metrics to Kafka (transactional, exactly-once)
        hourly_metrics.add_sink(
            FlinkKafkaProducer(
                topic='order-metrics',
                serialization_schema=JsonSerializationSchema(),
                producer_config={
                    'bootstrap.servers': self.config['kafka_bootstrap'],
                    # must not exceed the broker's transaction.max.timeout.ms
                    # (15 minutes by default)
                    'transaction.timeout.ms': '900000'
                },
                semantic=Semantic.EXACTLY_ONCE  # transactional producer
            )
        )
        
        # Sink: Valid orders to database
        enriched.add_sink(
            JdbcSink(
                sql="INSERT INTO orders VALUES (?, ?, ?, ?, ?)",
                parameter_mapping=lambda x: (
                    x['order_id'],
                    x['customer_id'],
                    x['total_amount'],
                    x['timestamp'],
                    x['status']
                ),
                driver="org.postgresql.Driver",
                url=self.config['db_url']
            )
        )
        
        # Sink: Invalid orders to dead letter queue
        invalid_orders.add_sink(
            FlinkKafkaProducer(
                topic='orders-dlq',
                serialization_schema=JsonSerializationSchema(),
                producer_config={
                    'bootstrap.servers': self.config['kafka_bootstrap']
                }
            )
        )
        
        # Return env for execution
        return self.env
    
    def run(self):
        env = self.build_pipeline()
        env.execute("Order Processing Pipeline")
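
A hypothetical launch script for the pipeline above might build its config like this; every key matches a lookup in `_create_environment` or `build_pipeline`, and the endpoint values are placeholders.

```python
# Config consumed by OrderProcessingPipeline; all values are placeholders.
config = {
    'kafka_bootstrap': 'kafka-1:9092,kafka-2:9092',    # Kafka brokers
    'customer_api': 'http://customer-service/api/v1',  # enrichment endpoint
    'db_url': 'jdbc:postgresql://orders-db:5432/orders',
    'parallelism': 8,                                  # default is 4 if omitted
}

# The job would then be submitted with:
#   OrderProcessingPipeline(config).run()
```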

Common Pitfalls

1. Not Handling Backpressure

# Anti-pattern: ignoring backpressure
def bad_backpressure(env):
    # A high-throughput source feeding one expensive operator fills the
    # in-flight network buffers and can stall or OOM the job
    source = env.add_source(UnboundedSource())
    result = source.map(ExpensiveOperation())  # single hot operator instance
    return result

# Better pattern: give Flink room to absorb bursts
def good_backpressure(env):
    # Flush network buffers on a timer instead of waiting for them to fill
    env.set_buffer_timeout(100)  # milliseconds

    # key_by spreads records across parallel subtasks, so one slow
    # operator instance does not throttle the whole stream
    source = env.add_source(UnboundedSource())
    result = source.key_by(lambda x: x['key']).map(ExpensiveOperation())
    return result
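
At its core, backpressure is bounded buffering: when a downstream stage cannot keep up, upstream producers block instead of piling up unbounded work. A toy illustration with a bounded queue (plain Python, not Flink API):

```python
import queue
import threading

buf = queue.Queue(maxsize=10)  # bounded buffer between two stages

def producer(n):
    for i in range(n):
        buf.put(i)       # blocks while the buffer is full -> backpressure
    buf.put(None)        # sentinel: end of stream

def consumer(out):
    while True:
        item = buf.get() # the slow stage drains at its own pace
        if item is None:
            break
        out.append(item * 2)

out = []
t = threading.Thread(target=producer, args=(100,))
t.start()
consumer(out)
t.join()
# All 100 records arrive, but never more than 10 were in flight at once
```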

2. Ignoring State Size

# Anti-pattern: unbounded state
class BadProcessFunction(KeyedProcessFunction):
    def open(self, runtime_context):
        # State handles are created once, in open()
        self.state = runtime_context.get_state(
            ValueStateDescriptor("all", Types.STRING()))

    def process_element(self, value, ctx):
        # Appends forever and never clears -- state grows without bound!
        current = self.state.value() or ""
        self.state.update(current + json.dumps(value))
        yield value

# Good pattern: state with a TTL so stale entries are cleaned up
from pyflink.common import Types
from pyflink.common.time import Time
from pyflink.datastream.state import StateTtlConfig, ValueStateDescriptor

ttl_config = (
    StateTtlConfig.new_builder(Time.hours(1))
    .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build()
)
state_descriptor = ValueStateDescriptor("recent", Types.STRING())
state_descriptor.enable_time_to_live(ttl_config)
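
The TTL semantics can be modeled in a few lines of plain Python (illustrative only; Flink enforces this inside the state backend, and the class below is not a Flink API):

```python
class TtlValueState:
    """Toy value state that expires a fixed TTL after the last write."""
    def __init__(self, ttl_seconds, clock):
        self.ttl = ttl_seconds
        self.clock = clock          # injectable clock, for testing
        self._value, self._written = None, None

    def update(self, value):
        self._value, self._written = value, self.clock()

    def value(self):
        if self._written is None or self.clock() - self._written > self.ttl:
            return None             # expired entries read as absent
        return self._value

# With a fake clock, expiry is easy to demonstrate:
now = [0]
state = TtlValueState(ttl_seconds=3600, clock=lambda: now[0])
state.update("recent event")
now[0] = 1800
assert state.value() == "recent event"   # still within the one-hour TTL
now[0] = 7200
assert state.value() is None             # expired, reads as absent
```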

Best Practices

1. Use Appropriate Serialization

# Flink serializes records with its built-in serializers (Row, tuple,
# POJO) when it can; types it cannot analyze fall back to the much
# slower generic Kryo path. Disabling generic types makes the job
# fail fast whenever a type silently falls back to Kryo:
env.get_config().disable_generic_types()

2. Monitor and Alert

# Key metrics to monitor:
# - Checkpoint duration and size
# - Latency (end-to-end, processing)
# - Backpressure
# - State size
# - Kafka consumer lag
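
Consumer lag in particular is just the gap between the log end offset and the group's committed offset, per partition. A minimal helper (plain Python; in practice these numbers come from kafka-consumer-groups.sh or the admin API):

```python
def consumer_lag(end_offsets, committed_offsets):
    """Per-partition lag: log end offset minus last committed offset."""
    return {
        partition: end_offsets[partition] - committed_offsets.get(partition, 0)
        for partition in end_offsets
    }

# Example: partition 0 is caught up, partition 1 is 1500 records behind.
lag = consumer_lag({0: 5000, 1: 8000}, {0: 5000, 1: 6500})
# Alert when any partition's lag grows without bound
```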

Conclusion

Building real-time data pipelines with Kafka and Flink enables organizations to process data as it arrives, unlocking use cases from fraud detection to real-time personalization.

Key takeaways:

  • Kafka provides durable, scalable event streaming
  • Flink offers exactly-once processing with event-time semantics
  • Use windowing to aggregate streaming data
  • Implement checkpoints for fault tolerance
  • Monitor lag, latency, and state size
  • Handle late data with watermarks and allowed lateness

Together, Kafka and Flink form the foundation of modern stream processing architectures, enabling organizations to build responsive, data-driven applications.
