Introduction
The ability to process data in real time is critical for fraud detection, monitoring, personalization, and operational intelligence. Apache Kafka and Apache Flink form the backbone of modern stream processing, enabling organizations to build robust, scalable real-time data pipelines.
In this guide, we’ll explore event streaming with Kafka, stream processing with Flink, and how to build end-to-end real-time data systems.
Understanding Event Streaming
What is Event Streaming?
Event streaming is the practice of capturing data in real time as a continuous stream of events. Unlike batch processing, which handles data in discrete chunks on a schedule, event streaming processes data as it arrives.
BATCH VS STREAM PROCESSING

  BATCH PROCESSING                   STREAM PROCESSING
  ----------------                   -----------------
  Collect data for a period          Process events immediately
            |                                  |
            v                                  v
  Process at a scheduled time        Continuous processing
            |                                  |
            v                                  v
  Results at the end of the batch    Results in real time

  Examples:                          Examples:
  - Daily reports                    - Fraud detection
  - Monthly analytics                - Real-time dashboards
  - ETL jobs                         - Alerting systems
  - Data warehouse loads             - Live personalization
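The contrast above can be sketched in plain Python: a batch job produces one result after all the data has been collected, while a streaming computation emits an updated result per event. The event values here are invented for illustration.

```python
events = [
    {"id": 1, "amount": 50.0},
    {"id": 2, "amount": 120.0},
    {"id": 3, "amount": 80.0},
]

# Batch: collect the full data set first, then compute once.
def batch_total(collected):
    return sum(e["amount"] for e in collected)

# Stream: update the result incrementally as each event arrives.
def stream_totals(stream):
    running = 0.0
    for event in stream:
        running += event["amount"]
        yield running  # a fresh result per event, no waiting for the batch

print(batch_total(events))          # 250.0, available only at the end
print(list(stream_totals(events)))  # [50.0, 170.0, 250.0], available as data arrives
```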
Apache Kafka: Event Streaming Platform
Core Concepts
Kafka is a distributed event streaming platform that can publish, subscribe to, store, and process streams of records.
KAFKA ARCHITECTURE

  PRODUCERS ---> KAFKA CLUSTER ---> CONSUMERS

  KAFKA CLUSTER
  +-----------+  +------------+  +------------+
  | Broker 1  |  | Broker 2   |  | Broker N   |
  | (Leader)  |  | (Follower) |  | (Follower) |
  +-----------+  +------------+  +------------+

  TOPICS (each split into ordered, numbered partitions)
    Orders: |0|1|2|3|4|5|
    Clicks: |0|1|2|3|4|
    Logs:   |0|1|2|3|4|
Key Terminology
- Topic: A category/feed name to which messages are published
- Partition: Topics are split into partitions for parallelism
- Producer: Publishes messages to topics
- Consumer: Subscribes to topics and processes messages
- Broker: Kafka server that stores messages
- Consumer Group: Group of consumers that share partitions
- Offset: Position of consumer in a partition
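To make the key-to-partition relationship concrete, here is a toy stand-in for Kafka's partitioner. The real default partitioner hashes the key bytes with Murmur2; the byte-sum hash below is only an illustration of the idea.

```python
def pick_partition(key: str, num_partitions: int) -> int:
    # Kafka routes a keyed message by hashing the key, so the same key
    # always lands on the same partition and per-key ordering is preserved.
    # (Kafka's actual default uses a Murmur2 hash; this is a stand-in.)
    return sum(key.encode("utf-8")) % num_partitions

# The same order ID always maps to the same partition:
for order_id in ["ORD-12345", "ORD-67890", "ORD-12345"]:
    print(order_id, "->", pick_partition(order_id, 6))
```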
Kafka Producer Implementation
from kafka import KafkaProducer
import json
import time
class OrderProducer:
"""
Kafka producer for order events
"""
def __init__(self, bootstrap_servers: list):
self.producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
key_serializer=lambda k: k.encode('utf-8') if k else None,
            acks='all',  # wait for acknowledgement from all in-sync replicas
            retries=3,
            # kafka-python has no idempotent producer, so keep a single
            # in-flight request to prevent retries from reordering messages
            max_in_flight_requests_per_connection=1,
compression_type='gzip'
)
self.topic = 'orders'
def send_order(self, order_data: dict):
"""
Send order event to Kafka
"""
# Create order event
event = {
'event_type': 'order_created',
'timestamp': time.time(),
'order_id': order_data['order_id'],
'customer_id': order_data['customer_id'],
'total_amount': order_data['total_amount'],
'items': order_data['items'],
'status': 'created'
}
# Send with order ID as key for partitioning
future = self.producer.send(
self.topic,
key=order_data['order_id'],
value=event
)
# Wait for confirmation
record_metadata = future.get(timeout=10)
return {
'topic': record_metadata.topic,
'partition': record_metadata.partition,
'offset': record_metadata.offset
}
def send_batch(self, orders: list):
"""
Send batch of orders
"""
futures = []
for order in orders:
event = {
'event_type': 'order_created',
'timestamp': time.time(),
**order
}
future = self.producer.send(
self.topic,
key=order['order_id'],
value=event
)
futures.append(future)
# Wait for all to complete
for future in futures:
metadata = future.get(timeout=10)
print(f"Sent to {metadata.topic}:{metadata.partition}:{metadata.offset}")
def close(self):
self.producer.flush()
self.producer.close()
# Usage
producer = OrderProducer(['localhost:9092'])
order = {
'order_id': 'ORD-12345',
'customer_id': 'CUST-001',
'total_amount': 99.99,
'items': ['item1', 'item2']
}
result = producer.send_order(order)
print(f"Order sent: {result}")
Kafka Consumer Implementation
from kafka import KafkaConsumer
import json
class OrderConsumer:
"""
Kafka consumer for order events
"""
def __init__(self, bootstrap_servers: list, group_id: str):
self.consumer = KafkaConsumer(
'orders',
bootstrap_servers=bootstrap_servers,
group_id=group_id,
value_deserializer=lambda v: json.loads(v.decode('utf-8')),
key_deserializer=lambda k: k.decode('utf-8') if k else None,
auto_offset_reset='earliest',
            # Auto-commit is convenient, but a crash between commit and
            # processing can skip or repeat messages; commit manually after
            # processing for stronger guarantees
            enable_auto_commit=True,
            auto_commit_interval_ms=5000,
max_poll_records=100,
max_poll_interval_ms=300000
)
def consume_orders(self):
"""
Consume order events
"""
print("Starting to consume orders...")
for message in self.consumer:
print(f"\n--- New Message ---")
print(f"Topic: {message.topic}")
print(f"Partition: {message.partition}")
print(f"Offset: {message.offset}")
print(f"Key: {message.key}")
print(f"Value: {message.value}")
# Process the order
self.process_order(message.value)
def process_order(self, order: dict):
"""
Process order event
"""
event_type = order.get('event_type')
if event_type == 'order_created':
print(f"Processing new order: {order['order_id']}")
# Add business logic here
elif event_type == 'order_updated':
print(f"Order updated: {order['order_id']}")
elif event_type == 'order_cancelled':
print(f"Order cancelled: {order['order_id']}")
    def consume_with_partition_handling(self):
        """
        Consume while reporting partition assignments

        Note: assignment() returns an empty set until the first poll
        has triggered the initial group rebalance.
        """
        print(f"Assigned partitions: {self.consumer.assignment()}")
        for message in self.consumer:
            self.process_order(message.value)
def close(self):
self.consumer.close()
# Usage
consumer = OrderConsumer(['localhost:9092'], 'order-processor-group')
consumer.consume_orders()
Apache Flink: Stream Processing
What is Apache Flink?
Apache Flink is a distributed stream processing framework that provides:
- Exactly-once processing: Strong guarantee for state consistency
- Event-time processing: Handle late-arriving data correctly
- Windowing: Aggregate data over tumbling, sliding, or session windows
- Stateful processing: Maintain state across events
- Fault tolerance: Checkpoints and savepoints
FLINK ARCHITECTURE

  FLINK APPLICATION
  +----------+    +-------------+    +------------+
  | Source   |--->| Process     |--->| Sink       |
  | (Kafka)  |    | (Operators) |    | (Kafka/DB) |
  +----------+    +-------------+    +------------+

  STATE & TIME
  - Managed state (e.g. RocksDB)
  - Event time / processing time
  - Watermarks

  FLINK CLUSTER
  +------------+  +-------------+  +-------------+
  | JobManager |  | TaskManager |  | TaskManager |
  | (Leader)   |  | (Worker)    |  | (Worker)    |
  +------------+  +-------------+  +-------------+
Flink DataStream API
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import MapFunction, FilterFunction, WindowFunction
from pyflink.datastream.window import TumblingEventTimeWindows, SlidingEventTimeWindows
from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.time import Time
from pyflink.common.typeinfo import Types
class OrderMapFunction(MapFunction):
"""
Transform order events
"""
def map(self, value):
# Parse JSON
import json
order = json.loads(value)
# Transform
return {
'order_id': order['order_id'],
'customer_id': order['customer_id'],
'total_amount': float(order['total_amount']),
'timestamp': order['timestamp'],
            'hour': int(order['timestamp'] // 3600 % 24)  # UTC hour from the epoch timestamp
}
class HighValueFilter(FilterFunction):
"""
Filter high-value orders
"""
def filter(self, value):
return value['total_amount'] > 100.0
class OrderAggregator(WindowFunction):
"""
Aggregate orders in a window
"""
def apply(self, key, window, values):
total = sum(v['total_amount'] for v in values)
count = len(values)
return [(key, window.end, count, total)]
def process_orders():
"""
Main Flink job for order processing
"""
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)
# Source: Read from Kafka
kafka_source = (
FlinkKafkaConsumer(
topics='orders',
deserialization_schema=SimpleStringSchema(),
properties={
'bootstrap.servers': 'localhost:9092',
'group.id': 'flink-order-processor'
}
)
)
orders = env.add_source(kafka_source)
# Transform: Map to structured data
transformed = orders.map(OrderMapFunction())
# Filter: High-value orders only
high_value = transformed.filter(HighValueFilter())
# Window: Tumbling window (5 minutes)
windowed = (
high_value
.key_by(lambda x: x['customer_id'])
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.apply(OrderAggregator())
)
    # Sink: serialize window aggregates as JSON strings and write to Kafka
    import json
    kafka_sink = FlinkKafkaProducer(
        topic='order-aggregates',
        serialization_schema=SimpleStringSchema(),
        producer_config={
            'bootstrap.servers': 'localhost:9092'
        }
    )
    windowed.map(lambda agg: json.dumps(agg), output_type=Types.STRING()).add_sink(kafka_sink)
# Execute
env.execute("Order Processing Job")
Flink SQL for Stream Processing
from pyflink.table import StreamTableEnvironment
def flink_sql_processing():
"""
Use Flink SQL for stream processing
"""
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
# Create source table (Kafka)
t_env.execute_sql("""
        CREATE TEMPORARY TABLE orders (
order_id STRING,
customer_id STRING,
total_amount DECIMAL(10, 2),
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'flink-sql-processor',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601'
)
""")
# Create sink table (Kafka)
t_env.execute_sql("""
        CREATE TEMPORARY TABLE order_aggregates (
window_start TIMESTAMP,
window_end TIMESTAMP,
customer_id STRING,
order_count BIGINT,
total_amount DECIMAL(15, 2)
) WITH (
'connector' = 'kafka',
'topic' = 'order-aggregates',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json'
)
""")
# Tumbling window aggregation
t_env.execute_sql("""
INSERT INTO order_aggregates
SELECT
TUMBLE_START(order_time, INTERVAL '5' MINUTE) AS window_start,
TUMBLE_END(order_time, INTERVAL '5' MINUTE) AS window_end,
customer_id,
COUNT(*) AS order_count,
SUM(total_amount) AS total_amount
FROM orders
GROUP BY
TUMBLE(order_time, INTERVAL '5' MINUTE),
customer_id
""")
# Session window aggregation
t_env.execute_sql("""
SELECT
SESSION_START(order_time, INTERVAL '10' MINUTE) AS session_start,
SESSION_END(order_time, INTERVAL '10' MINUTE) AS session_end,
customer_id,
COUNT(*) AS session_orders,
SUM(total_amount) AS session_total
FROM orders
GROUP BY
SESSION(order_time, INTERVAL '10' MINUTE),
customer_id
""")
# Running totals with OVER windows
t_env.execute_sql("""
SELECT
order_id,
customer_id,
total_amount,
order_time,
SUM(total_amount) OVER (
PARTITION BY customer_id
ORDER BY order_time
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) AS running_total
FROM orders
""")
Windowing Concepts
Types of Windows
WINDOW TYPES

  TUMBLING WINDOW                SLIDING WINDOW
  [----|----|----|----]          [--|----|----|----|--]
   0-5  5-10 10-15                overlapping intervals
  Non-overlapping, fixed size    Overlapping, fixed size

  SESSION WINDOW                 COUNT WINDOW
  [----]  [--]  [----]           [1,2,3][2,3,4][3,4,5]
  Activity-based, closed by      Fixed number of events
  an inactivity gap
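The window math itself is simple. This standalone sketch shows which windows a given event timestamp falls into for the tumbling and sliding cases (time units are arbitrary):

```python
def tumbling_window(ts, size):
    # Every timestamp belongs to exactly one window [start, start + size)
    start = ts - (ts % size)
    return (start, start + size)

def sliding_windows(ts, size, slide):
    # Every timestamp belongs to size/slide overlapping windows
    last_start = ts - (ts % slide)
    starts = []
    start = last_start
    while start > ts - size:
        starts.append(start)
        start -= slide
    return [(s, s + size) for s in reversed(starts)]

print(tumbling_window(7, 5))      # (5, 10): exactly one window
print(sliding_windows(7, 10, 5))  # [(0, 10), (5, 15)]: two overlapping windows
```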
Handling Late Data with Watermarks
from pyflink.common import Duration
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import WatermarkStrategy, TimestampAssigner
from pyflink.datastream import OutputTag
from pyflink.datastream.window import TumblingEventTimeWindows
from pyflink.common.time import Time

# Since Flink 1.12, event time is the default time characteristic,
# so no explicit set_stream_time_characteristic() call is needed.

class OrderTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, value, record_timestamp):
        # Event timestamps must be epoch milliseconds
        return int(value['timestamp'] * 1000)

# Watermarks trail the maximum event time seen so far by 10 seconds,
# tolerating events that arrive up to 10 seconds out of order
watermark_strategy = (
    WatermarkStrategy
    .for_bounded_out_of_orderness(Duration.of_seconds(10))
    .with_timestamp_assigner(OrderTimestampAssigner())
)

orders_with_watermarks = orders.assign_timestamps_and_watermarks(watermark_strategy)

# Events later than watermark + allowed lateness go to a side output
late_data_tag = OutputTag("late-data", Types.STRING())

windowed = (
    orders_with_watermarks
    .key_by(lambda x: x['customer_id'])
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .allowed_lateness(30000)              # 30 seconds, in milliseconds
    .side_output_late_data(late_data_tag)
    .process(WindowProcessFunction())     # user-defined ProcessWindowFunction
)

# Retrieve the late events for special handling
late_orders = windowed.get_side_output(late_data_tag)
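Outside of Flink, the bounded-out-of-orderness rule is easy to simulate: the watermark trails the maximum event time seen so far by the configured delay, and an event whose timestamp is at or below the current watermark counts as late. The timestamps below are invented.

```python
def watermark_run(event_times, max_out_of_orderness):
    # Returns (event_time, watermark_after_event, was_late) per event
    max_seen = float("-inf")
    results = []
    for ts in event_times:
        # Late = the watermark had already passed this event's timestamp
        is_late = ts <= max_seen - max_out_of_orderness
        max_seen = max(max_seen, ts)
        results.append((ts, max_seen - max_out_of_orderness, is_late))
    return results

for ts, wm, late in watermark_run([1, 5, 3, 20, 8], max_out_of_orderness=10):
    print(f"event_time={ts:2}  watermark={wm}  late={late}")
# The event at t=8 arrives after t=20 pushed the watermark to 10, so it is late.
```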
Exactly-Once Semantics
What is Exactly-Once?
Exactly-once processing guarantees that each event is processed exactly once, even in case of failures. This requires:
- Idempotent producers: retries produce the same result, not duplicates
- Transactional consumers: offsets are committed atomically with processing results
- Checkpointing: operator state is periodically saved so it can be restored after failure
EXACTLY-ONCE PROCESSING

  Checkpointing flow:

  +----------+     +----------+     +-----------+
  | Source   |---->| Operator |---->| Sink      |
  | Offset=5 |     | State    |     | Committed |
  +----------+     +----------+     +-----------+
        \               |               /
         +-- Checkpoint barrier flows -+
                        |
                        v
           Local state snapshot (RocksDB)
                        |
                        v
           Durable storage (S3 / HDFS)

  On failure recovery:
  1. Restore state from the latest completed checkpoint
  2. Resume from the saved source offsets
  3. Replay events since the checkpoint
  4. Result: the same output as if no failure occurred
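The end-to-end guarantee can also be approximated at the application level with idempotent processing: if replayed events are detected and skipped, reprocessing after a failure has no visible effect. A minimal sketch (the event IDs and amounts are hypothetical):

```python
class IdempotentProcessor:
    # Replays happen when a consumer crashes after processing an event but
    # before committing its offset; deduplicating by event ID makes the
    # replay a no-op, so the observed result is "exactly once".
    def __init__(self):
        self.processed_ids = set()
        self.total = 0.0

    def apply(self, event):
        if event["event_id"] in self.processed_ids:
            return self.total  # duplicate replay: state unchanged
        self.total += event["amount"]
        self.processed_ids.add(event["event_id"])
        return self.total

p = IdempotentProcessor()
p.apply({"event_id": "e1", "amount": 100.0})
p.apply({"event_id": "e2", "amount": 50.0})
# Replay of e2 after a simulated failure recovery:
print(p.apply({"event_id": "e2", "amount": 50.0}))  # 150.0, not 200.0
```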
Enabling Checkpointing in Flink
from pyflink.datastream import CheckpointingMode
# Enable checkpointing
env.enable_checkpointing(interval=60000) # Every minute
env.get_checkpoint_config().set_min_pause_between_checkpoints(30000)
env.get_checkpoint_config().set_checkpoint_timeout(600000)
env.get_checkpoint_config().set_max_concurrent_checkpoints(1)
# Set exactly-once mode
env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE)
# Enable unaligned checkpoints for better performance
env.get_checkpoint_config().enable_unaligned_checkpoints()
# Configure the RocksDB state backend (the import path varies by Flink
# version; 1.13+ exposes EmbeddedRocksDBStateBackend)
from pyflink.datastream import EmbeddedRocksDBStateBackend

env.set_state_backend(EmbeddedRocksDBStateBackend())
env.get_checkpoint_config().set_checkpoint_storage_dir("file:///tmp/flink/checkpoints")
# Configure Kafka exactly-once
kafka_source = (
FlinkKafkaConsumer(
topics='orders',
        deserialization_schema=SimpleStringSchema(),  # parse JSON downstream
properties={
'bootstrap.servers': 'localhost:9092',
'group.id': 'flink-processor',
'enable.auto.commit': 'false', # Flink manages offsets
'isolation.level': 'read_committed' # Only read committed
}
)
)
Building Complete Stream Pipeline
Architecture Overview
COMPLETE STREAM PROCESSING PIPELINE

  Source Systems --> Ingest (Kafka) --> Process (Flink) --> Sink Targets

  Mobile App --> Orders Topic --> Flink --> Analytics
                                      +---> Alerts
                                      +---> Database
                                      +---> Kafka (out)

  Monitoring: Prometheus + Grafana
  Logging:    ELK Stack
  Deployment: Kubernetes
Complete Example
class OrderProcessingPipeline:
"""
Complete order processing pipeline with Kafka and Flink
"""
def __init__(self, config: dict):
self.config = config
self.env = self._create_environment()
def _create_environment(self):
env = StreamExecutionEnvironment.get_execution_environment()
# Checkpoint configuration
env.enable_checkpointing(60000)
env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE)
env.get_checkpoint_config().set_min_pause_between_checkpoints(30000)
# State backend
env.set_state_backend(RocksDBStateBackend("file:///tmp/checkpoints"))
# Parallelism
env.set_parallelism(self.config.get('parallelism', 4))
return env
def build_pipeline(self):
# Source: Kafka
orders = self.env.add_source(
FlinkKafkaConsumer(
topics='orders',
deserialization_schema=JsonDeserializationSchema(),
properties={
'bootstrap.servers': self.config['kafka_bootstrap'],
'group.id': 'order-processor',
'enable.auto.commit': 'false'
}
)
)
# Parse and validate
validated = orders.map(OrderValidator())
# Filter invalid orders
valid_orders = validated.filter(ValidOrderFilter())
invalid_orders = validated.filter(InvalidOrderFilter())
# Process valid orders: enrich with customer data
enriched = valid_orders.map(CustomerEnricher(self.config['customer_api']))
# Window aggregation
hourly_metrics = (
enriched
.key_by(lambda x: x['customer_id'])
.window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)))
.process(HourlyAggregation())
)
# Sink: Metrics to Kafka
hourly_metrics.add_sink(
FlinkKafkaProducer(
topic='order-metrics',
serialization_schema=JsonSerializationSchema(),
producer_config={
'bootstrap.servers': self.config['kafka_bootstrap'],
'transaction.timeout.ms': '900000',
'enable.idempotence': 'true'
}
)
)
# Sink: Valid orders to database
enriched.add_sink(
JdbcSink(
sql="INSERT INTO orders VALUES (?, ?, ?, ?, ?)",
parameter_mapping=lambda x: (
x['order_id'],
x['customer_id'],
x['total_amount'],
x['timestamp'],
x['status']
),
driver="org.postgresql.Driver",
url=self.config['db_url']
)
)
# Sink: Invalid orders to dead letter queue
invalid_orders.add_sink(
FlinkKafkaProducer(
topic='orders-dlq',
serialization_schema=JsonSerializationSchema(),
producer_config={
'bootstrap.servers': self.config['kafka_bootstrap']
}
)
)
# Return env for execution
return self.env
def run(self):
env = self.build_pipeline()
env.execute("Order Processing Pipeline")
Common Pitfalls
1. Not Handling Backpressure
# Anti-pattern: No backpressure handling
def bad_backpressure():
# High throughput source can overwhelm downstream
source = env.add_source(UnboundedSource())
result = source.map(ExpensiveOperation()) # Can cause OOM!
return result
# Better pattern: work with Flink's built-in backpressure
def good_backpressure():
    # A bounded buffer timeout trades a little throughput for latency;
    # Flink's credit-based flow control then slows the source down
    # automatically when downstream operators cannot keep up
    env.set_buffer_timeout(100)  # flush network buffers every 100 ms
    # Keying distributes the expensive work across parallel subtasks
    source = env.add_source(UnboundedSource())
    result = source.key_by(lambda x: x['key']).map(ExpensiveOperation())
    return result
2. Ignoring State Size
# Anti-pattern: unbounded state
def bad_state():
    # Appending every event to keyed state and never clearing it
    class BadProcessFunction(KeyedProcessFunction):
        def open(self, runtime_context):
            self.state = runtime_context.get_state(
                ValueStateDescriptor("all", Types.STRING()))

        def process_element(self, value, ctx):
            # State grows without bound -- it is never cleared!
            current = self.state.value() or ""
            self.state.update(current + json.dumps(value))
            yield value
# Good pattern: state with TTL so stale entries are cleaned up automatically
def good_state():
    ttl_config = (
        StateTtlConfig
        .new_builder(Time.hours(1))
        .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .cleanup_in_rocksdb_compact_filter(1000)
        .build()
    )
    state_descriptor = ValueStateDescriptor("recent", Types.STRING())
    state_descriptor.enable_time_to_live(ttl_config)
Best Practices
1. Use Appropriate Serialization
# In PyFlink, declare explicit type information so Flink uses its efficient
# built-in serializers instead of falling back to generic pickle/Kryo
# serialization for unrecognized Python objects
from pyflink.common.typeinfo import Types

transformed = orders.map(
    OrderMapFunction(),
    output_type=Types.MAP(Types.STRING(), Types.STRING())
)
2. Monitor and Alert
# Key metrics to monitor:
# - Checkpoint duration and size
# - Latency (end-to-end, processing)
# - Backpressure
# - State size
# - Kafka consumer lag
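Consumer lag in particular is easy to reason about: it is the log-end offset minus the committed offset, summed over partitions. In production the offsets would come from the broker (for example via kafka-python's `KafkaConsumer.end_offsets()` and `committed()`); in this sketch they are stubbed with static values.

```python
def total_lag(end_offsets, committed_offsets):
    # Lag per partition = latest offset in the log - last committed offset;
    # partitions with no committed offset count as fully unconsumed.
    return sum(
        end - committed_offsets.get(tp, 0)
        for tp, end in end_offsets.items()
    )

# Stubbed (topic, partition) -> offset maps:
end = {("orders", 0): 1500, ("orders", 1): 900}
done = {("orders", 0): 1480, ("orders", 1): 900}
print(total_lag(end, done))  # 20 messages behind across the topic
```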
External Resources
- Apache Kafka Documentation
- Apache Flink Documentation
- Kafka Streams Documentation
- Flink SQL Documentation
- Kafka Summit Talks
- Flink Forward Talks
Conclusion
Building real-time data pipelines with Kafka and Flink enables organizations to process data as it arrives, unlocking use cases from fraud detection to real-time personalization.
Key takeaways:
- Kafka provides durable, scalable event streaming
- Flink offers exactly-once processing with event-time semantics
- Use windowing to aggregate streaming data
- Implement checkpoints for fault tolerance
- Monitor lag, latency, and state size
- Handle late data with watermarks and allowed lateness
Together, Kafka and Flink form the foundation of modern stream processing architectures, enabling organizations to build responsive, data-driven applications.