Introduction
Fraud detection, monitoring, personalization, and operational intelligence all depend on processing data as it arrives rather than hours later. Apache Kafka and Apache Flink are a common pairing for this work: Kafka stores and transports the event streams, and Flink runs the computations over them.
In this guide, we’ll explore event streaming with Kafka, stream processing with Flink (with a short look at Spark Structured Streaming), and how to build end-to-end real-time data systems.
Understanding Event Streaming
What is Event Streaming?
Event streaming is the practice of capturing data in real-time as a continuous stream of events. Unlike batch processing, which processes data in discrete chunks, event streaming handles data as it arrives.
┌─────────────────────────────────────────────────────────────────────┐
│ BATCH VS STREAM PROCESSING │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ BATCH PROCESSING STREAM PROCESSING │
│ ───────────────── ────────────────── │
│ │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Collect data │ │ Process events │ │
│ │ for period │ │ immediately │ │
│ └────────┬────────┘ └────────┬────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Process at │ │ Continuous │ │
│ │ scheduled time │ │ processing │ │
│ └────────┬────────┘ └────────┬────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Results at │ │ Results in │ │
│ │ end of batch │ │ real-time │ │
│ └─────────────────┘ └─────────────────┘ │
│ │
│ Examples: Examples: │
│ • Daily reports • Fraud detection │
│ • Monthly analytics • Real-time dashboards │
│ • ETL jobs • Alerting systems │
│ • Data warehouse load • Live personalization │
│ │
└─────────────────────────────────────────────────────────────────────┘
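The contrast in the diagram can be sketched in plain Python, with no Kafka involved. The function names and sample amounts below are made up for illustration: the batch version needs the whole list before it produces anything, while the streaming version emits an updated result for every event.

```python
from typing import Iterable, Iterator


def batch_total(amounts: list[float]) -> float:
    """Batch: wait until the whole period's data is collected, then compute once."""
    return sum(amounts)


def streaming_totals(amounts: Iterable[float]) -> Iterator[float]:
    """Streaming: update and emit the result as each event arrives."""
    running = 0.0
    for amount in amounts:
        running += amount
        yield running


events = [10.0, 20.0, 5.0]  # illustrative order amounts
print(batch_total(events))             # one result at the end of the batch
print(list(streaming_totals(events)))  # one result per event
```

Because `streaming_totals` is a generator, it also works on an unbounded source, which is the situation stream processors are built for.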
Core Streaming Terminology
1. Stream Processing
Processing continuous data flows in real-time rather than batch.
2. Event
Individual data point flowing through the pipeline (transaction, click, sensor reading).
3. Topic
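Named, append-only log of events in Kafka. Unlike a traditional queue, reading an event does not remove it, so several consumer groups can read the same topic independently.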
4. Partition
Subdivision of a topic for parallel processing and scalability.
5. Consumer Group
Set of consumers that together process all partitions of a topic.
6. Windowing
Grouping events into time-based or count-based windows for aggregation.
7. Stateful Processing
Maintaining state across events (e.g., running totals, user sessions).
8. Exactly-Once Semantics
Guarantee that each event affects the results exactly once, even if it is re-read after a failure; nothing is skipped or counted twice.
9. Backpressure
Mechanism to slow down producers when consumers cannot keep up.
10. Checkpointing
Saving processing state to enable recovery from failures.
Apache Kafka: Event Streaming Platform
Core Concepts
Kafka is a distributed event streaming platform that can publish, subscribe to, store, and process streams of records.
┌─────────────────────────────────────────────────────────────────────┐
│ KAFKA ARCHITECTURE │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ KAFKA CLUSTER │ │
│ │ │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ Broker 1│ │ Broker 2│ │ Broker N│ │ │
│ │ │(Leader) │ │(Follower)│ │(Follower)│ │ │
│ │ └────┬────┘ └────┬────┘ └────┬────┘ │ │
│ │ │ │ │ │ │
│ └─────────┼──────────────┼──────────────┼──────────────────────┘ │
│ │ │ │ │
│ ┌─────────┴──────────────┴──────────────┴──────────────────────┐ │
│ │ TOPICS │ │
│ │ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ │ │
│ │ │ Orders │ │ Clicks │ │ Logs │ │ │
│ │ │ ┌─┬─┬─┬─┬─┐ │ │ ┌─┬─┬─┬─┬─┐ │ │ ┌─┬─┬─┬─┬─┐ │ │ │
│ │ │ │0│1│2│3│4│5│ │ │ │0│1│2│3│4│ │ │ │0│1│2│3│4│ │ │ │
│ │ │ └─┴─┴─┴─┴─┘ │ │ └─┴─┴─┴─┴─┘ │ │ └─┴─┴─┴─┴─┘ │ │ │
│ │ │ Partitions │ │ Partitions │ │ Partitions │ │ │
│ │ └───────────────┘ └───────────────┘ └───────────────┘ │ │
│ └──────────────────────────────────────────────────────────────┘ │
│ │
│ PRODUCERS ──────────────────────► KAFKA ──────────────────────► │
│ CONSUMERS │
│ │
└─────────────────────────────────────────────────────────────────────┘
Key Terminology
- Topic: A category/feed name to which messages are published
- Partition: Topics are split into partitions for parallelism
- Producer: Publishes messages to topics
- Consumer: Subscribes to topics and processes messages
- Broker: Kafka server that stores messages
- Consumer Group: Group of consumers that share partitions
- Offset: Position of a record within a partition; each consumer group tracks the offset it has committed per partition
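To make partitions and consumer groups concrete, here is a small simulation in plain Python. It is a sketch under stated assumptions: `partition_for` uses CRC32 as a stand-in hash (Kafka's default partitioner uses murmur2), and `assign_partitions` is a simple round-robin rather than any of Kafka's actual assignor strategies.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    """Map a key to a partition by hashing it (CRC32 is a stand-in hash here)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def assign_partitions(partitions: list[int], consumers: list[str]) -> dict[str, list[int]]:
    """Round-robin assignment: each partition goes to exactly one consumer in the group."""
    assignment: dict[str, list[int]] = {c: [] for c in consumers}
    for i, partition in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(partition)
    return assignment


# The same key always maps to the same partition, which preserves per-key ordering.
print(partition_for("ORD-12345", 6) == partition_for("ORD-12345", 6))

# Two partitions shared by three consumers: one consumer is left idle.
print(assign_partitions([0, 1], ["c1", "c2", "c3"]))
```

The second call shows why adding consumers beyond the partition count does not add throughput: a partition is never split between members of one group.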
Kafka Cluster Docker Setup
# docker-compose.yml for local development
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
      KAFKA_LOG_RETENTION_HOURS: 168
      KAFKA_LOG_SEGMENT_BYTES: 1073741824
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    depends_on:
      - kafka
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
Topic Management
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError


class KafkaTopicManager:
    """Manage Kafka topics"""

    def __init__(self, bootstrap_servers: list[str]):
        self.admin_client = KafkaAdminClient(
            bootstrap_servers=bootstrap_servers,
            client_id='topic-manager'
        )

    def create_topic(self, topic_name: str,
                     num_partitions: int = 3,
                     replication_factor: int = 3) -> bool:
        """Create Kafka topic"""
        topic = NewTopic(
            name=topic_name,
            num_partitions=num_partitions,
            replication_factor=replication_factor,
            topic_configs={
                'retention.ms': str(7 * 24 * 60 * 60 * 1000),  # 7 days
                'compression.type': 'snappy',
                # Must not exceed the replication factor, or acks='all' writes are rejected
                'min.insync.replicas': str(min(2, replication_factor))
            }
        )
        try:
            self.admin_client.create_topics([topic], validate_only=False)
            print(f"Topic {topic_name} created successfully")
            return True
        except TopicAlreadyExistsError:
            print(f"Topic {topic_name} already exists")
            return False

    def list_topics(self) -> list[str]:
        """List all topic names"""
        return self.admin_client.list_topics()

    def delete_topic(self, topic_name: str) -> bool:
        """Delete topic"""
        try:
            self.admin_client.delete_topics([topic_name])
            print(f"Topic {topic_name} deleted")
            return True
        except Exception as e:
            print(f"Error deleting topic: {e}")
            return False


# Usage
# The single-broker Docker setup above can only host replication_factor=1;
# use 3 on a cluster with at least three brokers.
# Topics must be created explicitly because auto-creation is disabled in that setup.
manager = KafkaTopicManager(['localhost:9092'])
manager.create_topic('orders', num_partitions=6, replication_factor=1)
manager.create_topic('events', num_partitions=6, replication_factor=1)
manager.create_topic('transactions', num_partitions=12, replication_factor=1)
Kafka Producer Implementation
from kafka import KafkaProducer
import json
import time
class OrderProducer:
"""
Kafka producer for order events
"""
def __init__(self, bootstrap_servers: list):
self.producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
key_serializer=lambda k: k.encode('utf-8') if k else None,
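acks='all', # Wait for all in-sync replicas
retries=3,
max_in_flight_requests_per_connection=1, # With retries enabled, more than one in-flight request can reorder records within a partition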
compression_type='gzip'
)
self.topic = 'orders'
def send_order(self, order_data: dict):
"""
Send order event to Kafka
"""
# Create order event
event = {
'event_type': 'order_created',
'timestamp': time.time(),
'order_id': order_data['order_id'],
'customer_id': order_data['customer_id'],
'total_amount': order_data['total_amount'],
'items': order_data['items'],
'status': 'created'
}
# Send with order ID as key for partitioning
future = self.producer.send(
self.topic,
key=order_data['order_id'],
value=event
)
# Wait for confirmation
record_metadata = future.get(timeout=10)
return {
'topic': record_metadata.topic,
'partition': record_metadata.partition,
'offset': record_metadata.offset
}
def send_batch(self, orders: list):
"""
Send batch of orders
"""
futures = []
for order in orders:
event = {
'event_type': 'order_created',
'timestamp': time.time(),
**order
}
future = self.producer.send(
self.topic,
key=order['order_id'],
value=event
)
futures.append(future)
# Wait for all to complete
for future in futures:
metadata = future.get(timeout=10)
print(f"Sent to {metadata.topic}:{metadata.partition}:{metadata.offset}")
def close(self):
self.producer.flush()
self.producer.close()
# Usage
producer = OrderProducer(['localhost:9092'])
order = {
'order_id': 'ORD-12345',
'customer_id': 'CUST-001',
'total_amount': 99.99,
'items': ['item1', 'item2']
}
result = producer.send_order(order)
print(f"Order sent: {result}")
High-Performance Kafka Producer
from kafka import KafkaProducer
from kafka.errors import KafkaError
import json
import logging
import time
from datetime import datetime
from typing import Callable, Optional
logger = logging.getLogger(__name__)
class ReliableKafkaProducer:
"""Production-grade Kafka producer"""
def __init__(self, bootstrap_servers: list[str]):
self.producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
acks='all',
retries=3,
max_in_flight_requests_per_connection=5,
compression_type='snappy',
batch_size=16384,
linger_ms=10,
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
key_serializer=lambda k: k.encode('utf-8') if k else None
)
self.metrics = {
'sent': 0,
'failed': 0,
'latency_ms': []
}
def send_event(self, topic: str, event: dict,
key: Optional[str] = None,
callback: Optional[Callable] = None) -> bool:
"""Send event to Kafka"""
try:
future = self.producer.send(
topic,
value=event,
key=key,
timestamp_ms=int(time.time() * 1000)
)
future.add_callback(self._on_send_success)
future.add_errback(self._on_send_error)
self.metrics['sent'] += 1
return True
except Exception as e:
logger.error(f"Error sending event: {e}")
self.metrics['failed'] += 1
return False
def send_batch(self, topic: str, events: list[dict],
key_field: Optional[str] = None) -> int:
"""Send batch of events"""
sent_count = 0
for event in events:
key = event.get(key_field) if key_field else None
if self.send_event(topic, event, key):
sent_count += 1
return sent_count
def _on_send_success(self, record_metadata):
"""Callback on successful send"""
logger.debug(f"Message sent to {record_metadata.topic} "
f"partition {record_metadata.partition} "
f"offset {record_metadata.offset}")
def _on_send_error(self, exc):
"""Callback on send error"""
logger.error(f"Error sending message: {exc}")
self.metrics['failed'] += 1
def flush(self, timeout_ms: int = 30000):
"""Flush pending messages"""
self.producer.flush(timeout_ms)
def close(self):
"""Close producer"""
self.producer.close()
def get_metrics(self) -> dict:
"""Get producer metrics"""
return self.metrics
# Usage
producer = ReliableKafkaProducer(['localhost:9092'])
event = {
'user_id': 'user_123',
'action': 'purchase',
'amount': 99.99,
'timestamp': datetime.now().isoformat()
}
producer.send_event('events', event, key='user_123')
events = [
{'user_id': f'user_{i}', 'action': 'click', 'timestamp': datetime.now().isoformat()}
for i in range(1000)
]
producer.send_batch('events', events, key_field='user_id')
producer.flush()
producer.close()
Kafka Consumer Implementation
from kafka import KafkaConsumer
import json


class OrderConsumer:
    """
    Kafka consumer for order events
    """

    def __init__(self, bootstrap_servers: list, group_id: str):
        self.consumer = KafkaConsumer(
            'orders',
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            value_deserializer=lambda v: json.loads(v.decode('utf-8')),
            key_deserializer=lambda k: k.decode('utf-8') if k else None,
            auto_offset_reset='earliest',
            # Auto-commit runs on a timer, independent of process_order(), so a crash
            # can re-deliver or skip records relative to what was actually processed
            enable_auto_commit=True,
            auto_commit_interval_ms=5000,
            max_poll_records=100,
            max_poll_interval_ms=300000
        )

    def consume_orders(self):
        """
        Consume order events
        """
        print("Starting to consume orders...")
        for message in self.consumer:
            print("\n--- New Message ---")
            print(f"Topic: {message.topic}")
            print(f"Partition: {message.partition}")
            print(f"Offset: {message.offset}")
            print(f"Key: {message.key}")
            print(f"Value: {message.value}")
            # Process the order
            self.process_order(message.value)

    def process_order(self, order: dict):
        """
        Process order event
        """
        event_type = order.get('event_type')
        if event_type == 'order_created':
            print(f"Processing new order: {order['order_id']}")
            # Add business logic here
        elif event_type == 'order_updated':
            print(f"Order updated: {order['order_id']}")
        elif event_type == 'order_cancelled':
            print(f"Order cancelled: {order['order_id']}")

    def consume_with_partition_handling(self):
        """
        Handle each partition separately
        """
        while True:
            # poll() triggers the group join; assignment() is empty before the first poll.
            # It returns a dict mapping each TopicPartition to its list of records.
            batches = self.consumer.poll(timeout_ms=1000)
            for topic_partition, messages in batches.items():
                print(f"Partition {topic_partition.partition}: {len(messages)} messages")
                for message in messages:
                    self.process_order(message.value)

    def close(self):
        self.consumer.close()


# Usage
consumer = OrderConsumer(['localhost:9092'], 'order-processor-group')
consumer.consume_orders()
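When an offset is committed relative to when the record is processed decides what happens after a crash. The simulation below is a sketch in plain Python, not kafka-python code: `run_consumer`, `crash_at`, and `commit_first` are hypothetical names, and the log is an in-memory list. A timer-based auto-commit can land on either side of the processing step, which is why its behavior after a failure is hard to reason about.

```python
from typing import Optional


def run_consumer(log: list[str], start: int, crash_at: Optional[int],
                 commit_first: bool) -> tuple[list[str], int]:
    """Process log[start:] and return (processed records, committed offset).

    crash_at is the offset at which the consumer dies mid-processing (None = no crash).
    """
    processed: list[str] = []
    committed = start
    for offset in range(start, len(log)):
        if commit_first:
            committed = offset + 1   # offset committed before the work is done
        if offset == crash_at:
            break                    # crash: this record's processing never finished
        processed.append(log[offset])
        if not commit_first:
            committed = offset + 1   # offset committed only after the work is done
    return processed, committed


log = ["a", "b", "c"]

# Commit first: the crash at offset 1 loses "b" after the restart (at-most-once).
done, committed = run_consumer(log, 0, crash_at=1, commit_first=True)
lost_run = done + run_consumer(log, committed, None, True)[0]

# Commit after: the restart resumes at "b", so nothing is lost (at-least-once).
done, committed = run_consumer(log, 0, crash_at=1, commit_first=False)
safe_run = done + run_consumer(log, committed, None, False)[0]

print(lost_run)  # "b" is missing
print(safe_run)  # all three records
```

Committing after processing never loses a record, but a crash between the processing step and the commit would deliver that record a second time, so downstream logic should tolerate duplicates.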
Apache Flink: Stream Processing
What is Apache Flink?
Apache Flink is a distributed stream processing framework that provides:
- Exactly-once processing: Strong guarantee for state consistency
- Event-time processing: Handle late-arriving data correctly
- Windowing: Aggregate data over tumbling, sliding, or session windows
- Stateful processing: Maintain state across events
- Fault tolerance: Checkpoints and savepoints
┌─────────────────────────────────────────────────────────────────────┐
│ FLINK ARCHITECTURE │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ FLINK APPLICATION │ │
│ │ │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Source │──►│ Process │──►│ Sink │ │ │
│ │ │ (Kafka) │ │ (Operators)│ │ (Kafka/DB) │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ │ │ │
│ │ ┌─────────────────────────────────────────────────────┐ │ │
│ │ │ STATE & TIME │ │ │
│ │ │ • Managed state (RocksDB) │ │ │
│ │ │ • Event time / Processing time │ │ │
│ │ │ • Watermarks │ │ │
│ │ └─────────────────────────────────────────────────────┘ │ │
│ └──────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌───────────────────────────┴─────────────────────────────────┐ │
│ │ FLINK CLUSTER │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ JobManager │ │ TaskManager │ │TaskManager │ │ │
│ │ │ (Leader) │ │ (Worker) │ │ (Worker) │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
Flink DataStream API
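import json
from datetime import datetime, timezone

from pyflink.common import Duration
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.time import Time
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import TimestampAssigner, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
# Import path for PyFlink 1.16+; earlier releases expose these classes from
# pyflink.datastream.connectors. Newer code should prefer KafkaSource/KafkaSink.
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer, FlinkKafkaProducer
from pyflink.datastream.functions import MapFunction, FilterFunction, WindowFunction
from pyflink.datastream.window import TumblingEventTimeWindows, SlidingEventTimeWindows


class OrderMapFunction(MapFunction):
    """
    Transform order events
    """
    def map(self, value):
        # Parse JSON
        order = json.loads(value)
        # The producer above writes 'timestamp' as epoch seconds (time.time())
        ts = float(order['timestamp'])
        # Transform
        return {
            'order_id': order['order_id'],
            'customer_id': order['customer_id'],
            'total_amount': float(order['total_amount']),
            'timestamp': ts,
            'hour': datetime.fromtimestamp(ts, tz=timezone.utc).hour
        }


class HighValueFilter(FilterFunction):
    """
    Filter high-value orders
    """
    def filter(self, value):
        return value['total_amount'] > 100.0


class OrderTimestampAssigner(TimestampAssigner):
    """
    Extract the event time in milliseconds, as Flink expects
    """
    def extract_timestamp(self, value, record_timestamp):
        return int(value['timestamp'] * 1000)


class OrderAggregator(WindowFunction):
    """
    Aggregate orders in a window
    """
    def apply(self, key, window, inputs):
        orders = list(inputs)  # inputs is an iterable, so materialize it before len()
        total = sum(o['total_amount'] for o in orders)
        # Emit a JSON string so the Kafka sink can use SimpleStringSchema
        return [json.dumps({
            'customer_id': key,
            'window_end': window.end,
            'order_count': len(orders),
            'total_amount': total
        })]


def process_orders():
    """
    Main Flink job for order processing
    """
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(4)
    # The Kafka connector JAR must be available to the job (for example via env.add_jars)

    # Source: Read from Kafka
    kafka_source = FlinkKafkaConsumer(
        topics='orders',
        deserialization_schema=SimpleStringSchema(),
        properties={
            'bootstrap.servers': 'localhost:9092',
            'group.id': 'flink-order-processor'
        }
    )
    orders = env.add_source(kafka_source)

    # Transform: Map to structured data
    transformed = orders.map(OrderMapFunction())

    # Filter: High-value orders only
    high_value = transformed.filter(HighValueFilter())

    # Event-time windows only fire once watermarks advance past the window end
    watermark_strategy = (
        WatermarkStrategy
        .for_bounded_out_of_orderness(Duration.of_seconds(10))
        .with_timestamp_assigner(OrderTimestampAssigner())
    )

    # Window: Tumbling window (5 minutes)
    windowed = (
        high_value
        .assign_timestamps_and_watermarks(watermark_strategy)
        .key_by(lambda x: x['customer_id'])
        .window(TumblingEventTimeWindows.of(Time.minutes(5)))
        .apply(OrderAggregator(), output_type=Types.STRING())
    )

    # Sink: Write to Kafka
    kafka_sink = FlinkKafkaProducer(
        topic='order-aggregates',
        serialization_schema=SimpleStringSchema(),
        producer_config={
            'bootstrap.servers': 'localhost:9092'
        }
    )
    windowed.add_sink(kafka_sink)

    # Execute
    env.execute("Order Processing Job")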
Flink SQL for Stream Processing
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
def flink_sql_processing():
"""
Use Flink SQL for stream processing
"""
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
# Create source table (Kafka)
t_env.execute_sql("""
CREATE TABLE orders (
order_id STRING,
customer_id STRING,
total_amount DECIMAL(10, 2),
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'flink-sql-processor',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601'
)
""")
# Create sink table (Kafka)
t_env.execute_sql("""
CREATE TABLE order_aggregates (
window_start TIMESTAMP(3),
window_end TIMESTAMP(3),
customer_id STRING,
order_count BIGINT,
total_amount DECIMAL(15, 2)
) WITH (
'connector' = 'kafka',
'topic' = 'order-aggregates',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json'
)
""")
# Tumbling window aggregation
t_env.execute_sql("""
INSERT INTO order_aggregates
SELECT
TUMBLE_START(order_time, INTERVAL '5' MINUTE) AS window_start,
TUMBLE_END(order_time, INTERVAL '5' MINUTE) AS window_end,
customer_id,
COUNT(*) AS order_count,
SUM(total_amount) AS total_amount
FROM orders
GROUP BY
TUMBLE(order_time, INTERVAL '5' MINUTE),
customer_id
""")
# Session window aggregation
t_env.execute_sql("""
SELECT
SESSION_START(order_time, INTERVAL '10' MINUTE) AS session_start,
SESSION_END(order_time, INTERVAL '10' MINUTE) AS session_end,
customer_id,
COUNT(*) AS session_orders,
SUM(total_amount) AS session_total
FROM orders
GROUP BY
SESSION(order_time, INTERVAL '10' MINUTE),
customer_id
""")
# Running totals with OVER windows
t_env.execute_sql("""
SELECT
order_id,
customer_id,
total_amount,
order_time,
SUM(total_amount) OVER (
PARTITION BY customer_id
ORDER BY order_time
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) AS running_total
FROM orders
""" )
Spark Structured Streaming
Spark Streaming Job
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
def create_spark_streaming_job():
"""Create Spark Structured Streaming job"""
spark = SparkSession.builder \
.appName("RealTimeAnalytics") \
.getOrCreate()
# Set log level
spark.sparkContext.setLogLevel("WARN")
# Define schema
schema = StructType([
StructField("user_id", StringType()),
StructField("action", StringType()),
StructField("amount", DoubleType()),
StructField("timestamp", TimestampType())
])
# Read from Kafka
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "events") \
.option("startingOffsets", "latest") \
.load()
# Parse JSON
parsed_df = df.select(
from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")
# Filter transactions
transactions = parsed_df.filter(col("action") == "transaction")
# Aggregate by user in 1-minute windows
aggregated = transactions \
.withWatermark("timestamp", "10 minutes") \
.groupBy(
window(col("timestamp"), "1 minute"),
col("user_id")
) \
.agg(
count("*").alias("transaction_count"),
sum("amount").alias("total_amount"),
avg("amount").alias("avg_amount"),
min("amount").alias("min_amount"),
max("amount").alias("max_amount")
)
# Write to Kafka
query = aggregated \
.select(to_json(struct("*")).alias("value")) \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("topic", "aggregated-transactions") \
.option("checkpointLocation", "/tmp/checkpoint") \
.start()
# Also write to console for debugging
console_query = aggregated \
.writeStream \
.outputMode("update") \
.format("console") \
.option("truncate", False) \
.start()
# Wait for termination
spark.streams.awaitAnyTermination()
# Run job
if __name__ == '__main__':
create_spark_streaming_job()
Windowing Concepts
Types of Windows
┌─────────────────────────────────────────────────────────────────────┐
│ WINDOW TYPES │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ TUMBLING WINDOW SLIDING WINDOW │
│ ─────────────── ─────────────── │
│ [----|----|----|----] [--|----|----|----|--] │
│ 0-5 5-10 10-15 0-5 2-7 4-9 │
│ │
│ Non-overlapping, Overlapping, │
│ fixed size fixed size │
│ │
│ SESSION WINDOW COUNT WINDOW │
│ ───────────── ──────────── │
│ [----] [--] [----] [1,2,3][2,3,4][3,4,5] │
│ │
│ Activity-based, Fixed number of events │
│ gap-based │
│ │
└─────────────────────────────────────────────────────────────────────┘
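The time-based window types above come down to a rule for which window (or windows) a timestamp belongs to. The functions below are a plain-Python sketch of those rules with integer timestamps; the names are made up for this example and this is not how Flink implements window assigners internally.

```python
def tumbling_window(ts: int, size: int) -> tuple[int, int]:
    """The single [start, end) window of length `size` that contains ts."""
    start = ts - (ts % size)
    return (start, start + size)


def sliding_windows(ts: int, size: int, slide: int) -> list[tuple[int, int]]:
    """Every [start, end) window of length `size`, started every `slide`, that contains ts."""
    last_start = ts - (ts % slide)
    # Walk backwards while the window still covers ts (start > ts - size).
    starts = range(last_start, ts - size, -slide)
    return sorted((s, s + size) for s in starts)


def session_windows(timestamps: list[int], gap: int) -> list[tuple[int, int]]:
    """Group timestamps into sessions separated by more than `gap` of inactivity.

    Each session is reported as (first event time, last event time).
    """
    sessions: list[tuple[int, int]] = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][1] <= gap:
            sessions[-1] = (sessions[-1][0], ts)   # extend the current session
        else:
            sessions.append((ts, ts))              # start a new session
    return sessions


print(tumbling_window(7, size=5))
print(sliding_windows(7, size=10, slide=5))
print(session_windows([1, 2, 3, 20, 21, 50], gap=5))
```

An event belongs to exactly one tumbling window but to `size / slide` sliding windows, which is why sliding windows with a small slide multiply the state and output volume.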
Handling Late Data with Watermarks
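from pyflink.common import Duration
from pyflink.common.time import Time
from pyflink.common.watermark_strategy import TimestampAssigner, WatermarkStrategy
from pyflink.datastream import OutputTag
from pyflink.datastream.window import TumblingEventTimeWindows

# Event time is the default since Flink 1.12, so no time characteristic needs to be set.
# The side-output calls below assume PyFlink 1.16 or later.


class EventTimeAssigner(TimestampAssigner):
    """PyFlink expects a TimestampAssigner instance rather than a bare lambda"""
    def extract_timestamp(self, value, record_timestamp):
        # Assumes event['timestamp'] holds epoch seconds; Flink expects milliseconds
        return int(value['timestamp'] * 1000)


# Define watermark strategy: tolerate events up to 10 seconds out of order
watermark_strategy = (
    WatermarkStrategy
    .for_bounded_out_of_orderness(Duration.of_seconds(10))
    .with_timestamp_assigner(EventTimeAssigner())
)

# Apply watermark strategy (orders is assumed to be a stream of parsed dict events)
orders_with_watermarks = orders.assign_timestamps_and_watermarks(watermark_strategy)

# Events that arrive after the watermark has passed their window still update the
# result for another 30 seconds (allowed lateness). Anything later than that is
# emitted to a side output for special handling.
late_data_side_output = OutputTag("late-data")
windowed = (
    orders_with_watermarks
    .key_by(lambda x: x['customer_id'])
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .allowed_lateness(30 * 1000)  # Allowed lateness in milliseconds
    .side_output_late_data(late_data_side_output)
    .process(WindowProcessFunction())  # User-defined ProcessWindowFunction, not shown
)

# Get late data
late_orders = windowed.get_side_output(late_data_side_output)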
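The interaction between the watermark, the window end, and allowed lateness is easier to follow with numbers. The function below is a simplified, plain-Python model of it: `classify_events` is a hypothetical helper, timestamps are small integers, and the one-millisecond offsets that Flink applies to watermarks and window ends are ignored.

```python
def classify_events(timestamps: list[int], max_out_of_orderness: int,
                    window_size: int, allowed_lateness: int = 0) -> list[str]:
    """Replay timestamps in arrival order and label how a tumbling window treats each."""
    watermark = float("-inf")
    max_seen = float("-inf")
    labels = []
    for ts in timestamps:
        window_end = ts - (ts % window_size) + window_size
        if watermark < window_end:
            labels.append("on_time")         # window has not fired yet
        elif watermark < window_end + allowed_lateness:
            labels.append("late_accepted")   # window re-fires with the late event
        else:
            labels.append("side_output")     # too late: routed to the side output
        # Bounded out-of-orderness: the watermark trails the largest timestamp seen.
        max_seen = max(max_seen, ts)
        watermark = max_seen - max_out_of_orderness
    return labels


arrivals = [1, 8, 13, 9, 21, 7]  # event timestamps in arrival order
print(classify_events(arrivals, max_out_of_orderness=2, window_size=10, allowed_lateness=5))
```

In this run the event with timestamp 9 arrives after the watermark has reached 11, so its window [0, 10) has already fired, but it is still within the allowed lateness. The event with timestamp 7 arrives when the watermark is 19 and ends up in the side output.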
Exactly-Once Semantics
What is Exactly-Once?
Exactly-once processing guarantees that each event is processed exactly once, even in case of failures. This requires:
- Idempotent producers: A retried write does not create a duplicate record
- Transactional writes: Output and consumed offsets are committed together, so neither becomes visible without the other
- Checkpointing: State is periodically saved
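The idempotent-producer requirement can be illustrated with a toy broker that remembers the last sequence number it accepted from each producer. This is a sketch of the idea only: `ToyBroker` is a made-up class, and the real Kafka protocol tracks sequence numbers per partition and per batch.

```python
class ToyBroker:
    """Toy broker that drops retries by tracking the last sequence number per producer."""

    def __init__(self) -> None:
        self.log: list[str] = []
        self.last_seq: dict[str, int] = {}

    def append(self, producer_id: str, seq: int, value: str) -> bool:
        """Append value unless this (producer, sequence) was already written."""
        if seq <= self.last_seq.get(producer_id, -1):
            return False                 # duplicate retry: acknowledge but do not rewrite
        self.log.append(value)
        self.last_seq[producer_id] = seq
        return True


broker = ToyBroker()
broker.append("producer-1", 0, "order-1")
broker.append("producer-1", 0, "order-1")   # retry after a lost acknowledgement
broker.append("producer-1", 1, "order-2")
print(broker.log)
```

The retry is safe because the broker, not the producer, decides whether the write already happened.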
┌─────────────────────────────────────────────────────────────────────┐
│ EXACTLY-ONCE PROCESSING │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ Checkpointing Flow: │
│ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ FLINK CHECKPOINT │ │
│ │ │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │Source 1 │ │Operator │ │ Sink │ │ │
│ │ │Offset=5 │──►│ State │──►│ Committed│ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ │ │
│ │ │ │ │ │ │
│ │ └─────────────┴─────────────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌──────────────┐ │ │
│ │ │ Checkpoint │ │ │
│ │ │ Barrier │ │ │
│ │ └──────────────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌──────────────┐ │ │
│ │ │ RocksDB │ │ │
│ │ │ (State) │ │ │
│ │ └──────────────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌──────────────┐ │ │
│ │ │ S3/ │ │ │
│ │ │ HDFS │ │ │
│ │ └──────────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
│ On Failure Recovery: │
│ 1. Restore state from checkpoint │
│ 2. Resume from saved offset │
│ 3. Replay events │
│ 4. Result: Same as if no failure occurred │
│ │
└─────────────────────────────────────────────────────────────────────┘
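The recovery steps in the diagram can be reproduced with a toy job in plain Python. `CountingJob` is a hypothetical class for this sketch: it stores the source offset and the operator state in the same snapshot, which is the property that makes replay produce the same result as a failure-free run.

```python
import copy


class CountingJob:
    """Toy stateful job: counts events per key and checkpoints offset and state together."""

    def __init__(self, log: list[str]) -> None:
        self.log = log
        self.offset = 0
        self.counts: dict[str, int] = {}
        self.checkpoint: tuple[int, dict[str, int]] = (0, {})

    def step(self) -> None:
        key = self.log[self.offset]
        self.counts[key] = self.counts.get(key, 0) + 1
        self.offset += 1

    def take_checkpoint(self) -> None:
        self.checkpoint = (self.offset, copy.deepcopy(self.counts))

    def restore(self) -> None:
        self.offset, saved = self.checkpoint
        self.counts = copy.deepcopy(saved)

    def run_to_end(self) -> dict[str, int]:
        while self.offset < len(self.log):
            self.step()
        return self.counts


log = ["a", "b", "a", "c", "a"]
expected = CountingJob(log).run_to_end()

job = CountingJob(log)
job.step(); job.step()
job.take_checkpoint()     # snapshot holds offset 2 and the counts so far
job.step(); job.step()    # progress made after the checkpoint...
job.restore()             # ...is discarded when the job fails and restarts
recovered = job.run_to_end()
print(recovered == expected)
```

If the offset and the counts were saved separately, a failure between the two saves would replay events into state that had already counted them.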
Enabling Checkpointing in Flink
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import CheckpointingMode
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
from pyflink.datastream.state_backend import EmbeddedRocksDBStateBackend

# Enable checkpointing
env.enable_checkpointing(60000)  # Every minute
env.get_checkpoint_config().set_min_pause_between_checkpoints(30000)
env.get_checkpoint_config().set_checkpoint_timeout(600000)
env.get_checkpoint_config().set_max_concurrent_checkpoints(1)

# Set exactly-once mode
env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE)

# Unaligned checkpoints keep checkpoint times short when the job is under backpressure
env.get_checkpoint_config().enable_unaligned_checkpoints()

# Configure RocksDB state backend; checkpoints go to a separate storage directory
# (use durable storage such as S3 or HDFS outside local development)
env.set_state_backend(EmbeddedRocksDBStateBackend())
env.get_checkpoint_config().set_checkpoint_storage_dir("file:///tmp/flink/checkpoints")

# Configure Kafka exactly-once
kafka_source = FlinkKafkaConsumer(
    topics='orders',
    deserialization_schema=SimpleStringSchema(),  # Parse the JSON in a later map() step
    properties={
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'flink-processor',
        'enable.auto.commit': 'false',  # Flink manages offsets
        'isolation.level': 'read_committed'  # Only read committed
    }
)
Building Complete Stream Pipeline
Architecture Overview
┌─────────────────────────────────────────────────────────────────────┐
│ COMPLETE STREAM PROCESSING PIPELINE │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Source │ │ Ingest │ │ Process │ │ Sink │ │
│ │ Systems │ │ (Kafka) │ │ (Flink) │ │ Targets │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │ │
│ ▼ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ Mobile App ──► Orders Topic ──► Flink ──► Analytics │ │
│ │ │ │ │ │
│ │ │ ├──► Alerts │ │
│ │ │ │ │ │
│ │ │ ├──► Database │ │
│ │ │ │ │ │
│ │ │ └──► Kafka (out) │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
│ Monitoring: Prometheus + Grafana │
│ Logging: ELK Stack │
│ Deployment: Kubernetes │
│ │
└─────────────────────────────────────────────────────────────────────┘
Complete Example
class OrderProcessingPipeline:
"""
Complete order processing pipeline with Kafka and Flink
"""
def __init__(self, config: dict):
self.config = config
self.env = self._create_environment()
def _create_environment(self):
env = StreamExecutionEnvironment.get_execution_environment()
# Checkpoint configuration
env.enable_checkpointing(60000)
env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE)
env.get_checkpoint_config().set_min_pause_between_checkpoints(30000)
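# State backend (RocksDB), with checkpoints stored in a separate directory
env.set_state_backend(EmbeddedRocksDBStateBackend())
env.get_checkpoint_config().set_checkpoint_storage_dir("file:///tmp/checkpoints")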
# Parallelism
env.set_parallelism(self.config.get('parallelism', 4))
return env
def build_pipeline(self):
# Source: Kafka
orders = self.env.add_source(
FlinkKafkaConsumer(
topics='orders',
deserialization_schema=JsonDeserializationSchema(),
properties={
'bootstrap.servers': self.config['kafka_bootstrap'],
'group.id': 'order-processor',
'enable.auto.commit': 'false'
}
)
)
# Parse and validate
validated = orders.map(OrderValidator())
# Filter invalid orders
valid_orders = validated.filter(ValidOrderFilter())
invalid_orders = validated.filter(InvalidOrderFilter())
# Process valid orders: enrich with customer data
enriched = valid_orders.map(CustomerEnricher(self.config['customer_api']))
# Window aggregation
hourly_metrics = (
enriched
.key_by(lambda x: x['customer_id'])
.window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)))
.process(HourlyAggregation())
)
# Sink: Metrics to Kafka
hourly_metrics.add_sink(
FlinkKafkaProducer(
topic='order-metrics',
serialization_schema=JsonSerializationSchema(),
producer_config={
'bootstrap.servers': self.config['kafka_bootstrap'],
'transaction.timeout.ms': '900000',
'enable.idempotence': 'true'
}
)
)
# Sink: Valid orders to database
enriched.add_sink(
JdbcSink(
sql="INSERT INTO orders VALUES (?, ?, ?, ?, ?)",
parameter_mapping=lambda x: (
x['order_id'],
x['customer_id'],
x['total_amount'],
x['timestamp'],
x['status']
),
driver="org.postgresql.Driver",
url=self.config['db_url']
)
)
# Sink: Invalid orders to dead letter queue
invalid_orders.add_sink(
FlinkKafkaProducer(
topic='orders-dlq',
serialization_schema=JsonSerializationSchema(),
producer_config={
'bootstrap.servers': self.config['kafka_bootstrap']
}
)
)
# Return env for execution
return self.env
def run(self):
env = self.build_pipeline()
env.execute("Order Processing Pipeline")
Common Pitfalls
1. Not Handling Backpressure
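# Anti-pattern: Ignoring backpressure
def bad_backpressure():
    # Flink throttles the source automatically when a downstream operator is slow,
    # so the job keeps running, but consumer lag and end-to-end latency keep growing
    source = env.add_source(UnboundedSource())
    result = source.map(ExpensiveOperation())  # Single slow operator becomes the bottleneck
    return result


# Good pattern: Give the bottleneck more capacity and monitor backpressure
def good_backpressure():
    # Buffer timeout trades latency for throughput; it does not remove backpressure
    env.set_buffer_timeout(100)  # Flush network buffers at least every 100ms
    # Key the stream to distribute load, and run the expensive operator with more
    # parallel subtasks (8 is an illustrative value)
    source = env.add_source(UnboundedSource())
    result = source.key_by(lambda x: x['key']).map(ExpensiveOperation()).set_parallelism(8)
    return result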
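Backpressure itself is just a bounded buffer saying "no" to its producer. The sketch below shows that mechanism with the standard library's `queue.Queue`; it is an analogy for the bounded network buffers between Flink operators, not Flink code, and `try_emit` is a made-up helper.

```python
import queue

# Bounded buffer between a fast source and a slow operator.
buffer: queue.Queue = queue.Queue(maxsize=3)


def try_emit(event: int) -> bool:
    """Try to hand an event downstream; False means the source has to wait."""
    try:
        buffer.put_nowait(event)
        return True
    except queue.Full:
        return False   # this refusal is the backpressure signal


accepted = [try_emit(i) for i in range(5)]   # the buffer fills after three events
buffer.get_nowait()                          # the slow operator finishes one event
accepted.append(try_emit(5))                 # so one more event fits
print(accepted)
```

With an unbounded buffer the source would never be refused, and the backlog would grow in memory instead of staying in Kafka where it is cheap to hold.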
2. Ignoring State Size
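import json

from pyflink.common.time import Time
from pyflink.common.typeinfo import Types
from pyflink.datastream.functions import KeyedProcessFunction
from pyflink.datastream.state import StateTtlConfig, ValueStateDescriptor


# Anti-pattern: Unbounded state
class BadProcessFunction(KeyedProcessFunction):
    def open(self, runtime_context):
        # Storing all events in state
        self.all_events = runtime_context.get_state(
            ValueStateDescriptor("all", Types.STRING())
        )

    def process_element(self, value, ctx):
        # Never clears state!
        current = self.all_events.value() or ""
        self.all_events.update(current + json.dumps(value))
        yield value


# Good pattern: State with TTL
def good_state():
    # Entries expire one hour after they were last written
    ttl_config = (
        StateTtlConfig
        .new_builder(Time.hours(1))
        .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build()
    )
    state_descriptor = ValueStateDescriptor("recent", Types.STRING())
    state_descriptor.enable_time_to_live(ttl_config)
    return state_descriptor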
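What a TTL buys you is easiest to see in a toy version. The class below is a plain-Python sketch of keyed state whose entries expire a fixed time after their last write; `TtlState` and its explicit `now` argument are made up for this example, and Flink's own cleanup strategies are more involved.

```python
from typing import Any, Optional


class TtlState:
    """Toy keyed state: an entry expires `ttl` time units after its last write."""

    def __init__(self, ttl: int) -> None:
        self.ttl = ttl
        self._data: dict[str, tuple[Any, int]] = {}   # key -> (value, last write time)

    def update(self, key: str, value: Any, now: int) -> None:
        self._data[key] = (value, now)

    def value(self, key: str, now: int) -> Optional[Any]:
        entry = self._data.get(key)
        if entry is None or now - entry[1] >= self.ttl:
            self._data.pop(key, None)   # lazy cleanup when an expired entry is read
            return None
        return entry[0]

    def size(self) -> int:
        return len(self._data)


state = TtlState(ttl=60)
state.update("user_123", {"clicks": 3}, now=0)
print(state.value("user_123", now=30))   # still within the TTL
print(state.value("user_123", now=60))   # expired, and removed on access
print(state.size())
```

Without the expiry check, every key ever seen would stay in state forever, which is the unbounded growth the anti-pattern describes.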
Best Practices
1. Use Appropriate Serialization
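# In PyFlink, records without a declared type are pickled. Declaring the output
# type lets Flink use its built-in typed serializers instead.
from pyflink.common.typeinfo import Types
# Configure (transformed is the stream of parsed order dicts from the DataStream example)
amounts = transformed.map(
    lambda order: order['total_amount'],
    output_type=Types.DOUBLE()
)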
2. Monitor and Alert
# Key metrics to monitor:
# - Checkpoint duration and size
# - Latency (end-to-end, processing)
# - Backpressure
# - State size
# - Kafka consumer lag
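Of these metrics, Kafka consumer lag is the simplest to compute yourself: per partition, it is the log end offset minus the offset the consumer group has committed. The sketch below uses plain dictionaries in place of the values a monitoring tool would fetch from the cluster; `consumer_lag`, `should_alert`, and the threshold are hypothetical.

```python
def consumer_lag(end_offsets: dict[int, int], committed: dict[int, int]) -> dict[int, int]:
    """Lag per partition: log end offset minus the group's committed offset."""
    return {p: end - committed.get(p, 0) for p, end in end_offsets.items()}


def should_alert(lag: dict[int, int], threshold: int) -> bool:
    """Alert when the total lag across partitions exceeds the threshold."""
    return sum(lag.values()) > threshold


end_offsets = {0: 1500, 1: 1200, 2: 900}   # latest offset per partition
committed = {0: 1400, 1: 1200}             # partition 2 has no commit yet

lag = consumer_lag(end_offsets, committed)
print(lag)
print(should_alert(lag, threshold=500))
```

A lag that stays flat is usually fine; a lag that grows steadily means the consumers are processing more slowly than the producers are writing.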
Performance Comparison
| Framework | Latency | Throughput | Complexity | Maturity |
|---|---|---|---|---|
| Kafka Streams | Low | High | Low | High |
| Flink | Very Low | Very High | Medium | High |
| Spark Structured Streaming | Medium | Very High | Medium | High |
| Storm | Low | Medium | High | Medium |
Conclusion
Building real-time data pipelines with Kafka and Flink enables organizations to process data as it arrives, unlocking use cases from fraud detection to real-time personalization.
Key takeaways:
- Kafka provides durable, scalable event streaming
- Flink offers exactly-once processing with event-time semantics
- Use windowing to aggregate streaming data
- Implement checkpoints for fault tolerance
- Monitor lag, latency, and state size
- Handle late data with watermarks and allowed lateness
Together, Kafka and Flink form the foundation of modern stream processing architectures, enabling organizations to build responsive, data-driven applications.