Introduction
Real-time data pipelines are critical for modern data-driven applications. Processing data as it arrives enables real-time analytics, fraud detection, recommendations, and monitoring. This guide covers building production-grade real-time pipelines using Kafka for ingestion, Flink/Spark for processing, and best practices for scaling to millions of events per second.
At a Glance:
- Real-time analytics shortens the data-to-decision loop from hours to seconds
- Fraud detection latency drops from hours (batch) to milliseconds
- Sustaining 1M+ events/second is achievable, but requires careful architecture
- Critical pipelines typically target 99.99% uptime
Core Concepts & Terminology
1. Stream Processing
Processing continuous data flows in real time, rather than in discrete batches.
2. Event
Individual data point flowing through the pipeline (transaction, click, sensor reading).
3. Topic
A named, append-only log of events in Kafka. Unlike a traditional queue, events are retained for a configurable period and can be re-read by multiple consumer groups.
4. Partition
Subdivision of a topic for parallel processing and scalability.
5. Consumer Group
Set of consumers that together process all partitions of a topic.
6. Windowing
Grouping events into time-based or count-based windows for aggregation.
7. Stateful Processing
Maintaining state across events (e.g., running totals, user sessions).
8. Exactly-Once Semantics
Guarantee that each event is processed exactly once, not skipped or duplicated.
9. Backpressure
Mechanism to slow down producers when consumers can’t keep up.
10. Checkpointing
Saving processing state to enable recovery from failures.
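To make windowing (concept 6) concrete before the framework code below, here is a minimal, framework-free sketch of tumbling-window aggregation. The `tumbling_counts` helper is purely illustrative, not part of any library; Flink and Spark implement the same idea with watermarks and managed state.

```python
from collections import defaultdict

def tumbling_counts(events, window_ms):
    """Group (timestamp_ms, key) events into fixed-size tumbling windows.

    Returns {window_start_ms: {key: count}}. Each event lands in exactly
    one window, determined by integer division of its timestamp.
    """
    windows = defaultdict(lambda: defaultdict(int))
    for ts, key in events:
        window_start = (ts // window_ms) * window_ms
        windows[window_start][key] += 1
    return {w: dict(counts) for w, counts in windows.items()}

# Two events in the first 60s window, one in the second
events = [(1000, "user_a"), (1500, "user_b"), (61000, "user_a")]
print(tumbling_counts(events, 60000))
# {0: {'user_a': 1, 'user_b': 1}, 60000: {'user_a': 1}}
```

A real stream processor additionally needs watermarks to decide when a window is complete, since events can arrive out of order.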
Real-Time Pipeline Architecture
```
┌─────────────────────────────────────────────────────────┐
│                      Data Sources                       │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐   │
│  │ Applications │  │ IoT Devices  │  │  Databases   │   │
│  │   (Events)   │  │  (Sensors)   │  │    (CDC)     │   │
│  └──────────────┘  └──────────────┘  └──────────────┘   │
└────────────────────────────┬────────────────────────────┘
                             │
┌────────────────────────────▼────────────────────────────┐
│                 Message Broker (Kafka)                  │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐   │
│  │   Topic 1    │  │   Topic 2    │  │   Topic N    │   │
│  │ (Partitions) │  │ (Partitions) │  │ (Partitions) │   │
│  └──────────────┘  └──────────────┘  └──────────────┘   │
└────────────────────────────┬────────────────────────────┘
                             │
┌────────────────────────────▼────────────────────────────┐
│             Stream Processing (Flink/Spark)             │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐   │
│  │  Filtering   │  │ Aggregation  │  │  Enrichment  │   │
│  │Transformation│  │  Windowing   │  │    Joins     │   │
│  └──────────────┘  └──────────────┘  └──────────────┘   │
└────────────────────────────┬────────────────────────────┘
                             │
              ┌──────────────┴──────────────┐
              ▼                             ▼
    ┌───────────────────┐         ┌───────────────────┐
    │  Real-Time Store  │         │  Data Warehouse   │
    │ (Redis, DynamoDB) │         │  (S3, BigQuery)   │
    └───────────────────┘         └───────────────────┘
```
Kafka Setup & Configuration
Kafka Cluster Configuration
```yaml
# docker-compose.yml for local development
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
      KAFKA_LOG_RETENTION_HOURS: 168
      KAFKA_LOG_SEGMENT_BYTES: 1073741824

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    depends_on:
      - kafka
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
```
Topic Creation & Configuration
```python
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError

class KafkaTopicManager:
    """Manage Kafka topics"""

    def __init__(self, bootstrap_servers: list[str]):
        self.admin_client = KafkaAdminClient(
            bootstrap_servers=bootstrap_servers,
            client_id='topic-manager'
        )

    def create_topic(self, topic_name: str,
                     num_partitions: int = 3,
                     replication_factor: int = 3) -> bool:
        """Create a Kafka topic with production-oriented defaults"""
        topic = NewTopic(
            name=topic_name,
            num_partitions=num_partitions,
            replication_factor=replication_factor,
            topic_configs={
                'retention.ms': str(7 * 24 * 60 * 60 * 1000),  # 7 days
                'compression.type': 'snappy',
                'min.insync.replicas': '2'
            }
        )
        try:
            self.admin_client.create_topics([topic], validate_only=False)
            print(f"Topic {topic_name} created successfully")
            return True
        except TopicAlreadyExistsError:
            print(f"Topic {topic_name} already exists")
            return False

    def list_topics(self) -> list[str]:
        """List all topic names"""
        return self.admin_client.list_topics()

    def delete_topic(self, topic_name: str) -> bool:
        """Delete a topic"""
        try:
            self.admin_client.delete_topics([topic_name])
            print(f"Topic {topic_name} deleted")
            return True
        except Exception as e:
            print(f"Error deleting topic: {e}")
            return False

# Usage
manager = KafkaTopicManager(['localhost:9092'])
manager.create_topic('events', num_partitions=6, replication_factor=3)
manager.create_topic('transactions', num_partitions=12, replication_factor=3)
```
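Partition counts like 6 and 12 above should come from a throughput estimate rather than guesswork. A common rule of thumb is target throughput divided by per-partition throughput, with headroom for growth. The helper and the 10 MB/s per-partition figure below are illustrative assumptions, not benchmarks; measure your own cluster.

```python
import math

def suggest_partitions(target_mb_per_s: float,
                       per_partition_mb_per_s: float = 10.0,
                       headroom: float = 2.0) -> int:
    """Rough partition count: target throughput over per-partition
    throughput, multiplied by a headroom factor for growth and
    rebalancing, rounded up to a whole partition."""
    return math.ceil(target_mb_per_s / per_partition_mb_per_s * headroom)

# 30 MB/s target, 10 MB/s per partition, 2x headroom -> 6 partitions
print(suggest_partitions(30))  # 6
```

Remember that partitions can be added to a topic later but never removed, and adding them changes key-to-partition mapping, so it is cheaper to over-provision modestly up front.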
Kafka Producer Implementation
High-Performance Producer
```python
from kafka import KafkaProducer
from kafka.errors import KafkaError
import json
import logging
import time
from datetime import datetime
from typing import Callable, Optional

logger = logging.getLogger(__name__)

class ReliableKafkaProducer:
    """Production-grade Kafka producer"""

    def __init__(self, bootstrap_servers: list[str]):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            acks='all',  # wait for all in-sync replicas
            retries=3,
            # With retries > 0, multiple in-flight requests can reorder
            # messages within a partition; drop this to 1 if strict
            # per-key ordering matters more than throughput
            max_in_flight_requests_per_connection=5,
            compression_type='snappy',
            batch_size=16384,
            linger_ms=10,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None
        )
        self.metrics = {
            'sent': 0,
            'failed': 0
        }

    def send_event(self, topic: str, event: dict,
                   key: Optional[str] = None,
                   callback: Optional[Callable] = None) -> bool:
        """Send event to Kafka (asynchronously; call flush() to block)"""
        try:
            future = self.producer.send(
                topic,
                value=event,
                key=key,
                timestamp_ms=int(time.time() * 1000)
            )
            # Add callbacks
            future.add_callback(self._on_send_success)
            future.add_errback(self._on_send_error)
            if callback:
                future.add_callback(callback)
            self.metrics['sent'] += 1
            return True
        except Exception as e:
            logger.error(f"Error sending event: {e}")
            self.metrics['failed'] += 1
            return False

    def send_batch(self, topic: str, events: list[dict],
                   key_field: Optional[str] = None) -> int:
        """Send batch of events"""
        sent_count = 0
        for event in events:
            key = event.get(key_field) if key_field else None
            if self.send_event(topic, event, key):
                sent_count += 1
        return sent_count

    def _on_send_success(self, record_metadata):
        """Callback on successful send"""
        logger.debug(f"Message sent to {record_metadata.topic} "
                     f"partition {record_metadata.partition} "
                     f"offset {record_metadata.offset}")

    def _on_send_error(self, exc):
        """Callback on send error"""
        logger.error(f"Error sending message: {exc}")
        self.metrics['failed'] += 1

    def flush(self, timeout_ms: int = 30000):
        """Flush pending messages (kafka-python's flush() takes seconds)"""
        self.producer.flush(timeout=timeout_ms / 1000)

    def close(self):
        """Close producer"""
        self.producer.close()

    def get_metrics(self) -> dict:
        """Get producer metrics"""
        return self.metrics

# Usage
producer = ReliableKafkaProducer(['localhost:9092'])

# Send single event
event = {
    'user_id': 'user_123',
    'action': 'purchase',
    'amount': 99.99,
    'timestamp': datetime.now().isoformat()
}
producer.send_event('events', event, key='user_123')

# Send batch
events = [
    {'user_id': f'user_{i}', 'action': 'click', 'timestamp': datetime.now().isoformat()}
    for i in range(1000)
]
producer.send_batch('events', events, key_field='user_id')

producer.flush()
producer.close()
```
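The guide covers the producer side; a matching consumer-group consumer is the other half of the ingestion story. The sketch below uses kafka-python with manual offset commits, so offsets are acknowledged only after processing succeeds (at-least-once delivery). The `parse_event` helper and topic/group names are illustrative assumptions.

```python
import json

def parse_event(raw: bytes):
    """Deserialize a message value; return None on malformed JSON
    instead of crashing the consumer loop."""
    try:
        return json.loads(raw.decode("utf-8"))
    except (ValueError, UnicodeDecodeError):
        return None

def run_consumer(bootstrap_servers, topic, group_id="analytics-group"):
    # kafka-python imported locally so parse_event stays usable without it
    from kafka import KafkaConsumer
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        group_id=group_id,          # consumers sharing this id split the partitions
        enable_auto_commit=False,   # commit manually, only after processing
        auto_offset_reset="earliest",
    )
    for message in consumer:
        event = parse_event(message.value)
        if event is not None:
            print(f"partition={message.partition} "
                  f"offset={message.offset} event={event}")
        consumer.commit()  # at-least-once: a crash before commit means reprocessing

# Usage (requires a running broker):
# run_consumer(["localhost:9092"], "events")
```

Running several copies of this process with the same `group_id` is how you scale out: Kafka rebalances the topic's partitions across the group members automatically.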
Apache Flink Stream Processing
Flink Job Implementation
```python
import json
from datetime import datetime

from pyflink.common import Time
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode
from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer
from pyflink.datastream.functions import MapFunction, FilterFunction, AggregateFunction
from pyflink.datastream.window import TumblingProcessingTimeWindows

class EventParser(MapFunction):
    """Parse JSON events"""
    def map(self, value):
        try:
            event = json.loads(value)
            event['parsed_time'] = datetime.now().timestamp()
            return event
        except Exception as e:
            print(f"Error parsing event: {e}")
            return None

class TransactionFilter(FilterFunction):
    """Filter for transaction events (also drops unparseable Nones)"""
    def filter(self, event):
        return (event is not None
                and event.get('type') == 'transaction'
                and event.get('amount', 0) > 0)

class TransactionAggregator(AggregateFunction):
    """Aggregate transactions by user"""
    def create_accumulator(self):
        return {
            'count': 0,
            'total_amount': 0.0,
            'min_amount': float('inf'),
            'max_amount': 0.0
        }

    def add(self, value, accumulator):
        accumulator['count'] += 1
        accumulator['total_amount'] += value.get('amount', 0)
        accumulator['min_amount'] = min(accumulator['min_amount'], value.get('amount', 0))
        accumulator['max_amount'] = max(accumulator['max_amount'], value.get('amount', 0))
        return accumulator

    def get_result(self, accumulator):
        return {
            'count': accumulator['count'],
            'total': accumulator['total_amount'],
            'average': accumulator['total_amount'] / accumulator['count'] if accumulator['count'] > 0 else 0,
            'min': accumulator['min_amount'],
            'max': accumulator['max_amount']
        }

    def merge(self, a, b):
        return {
            'count': a['count'] + b['count'],
            'total_amount': a['total_amount'] + b['total_amount'],
            'min_amount': min(a['min_amount'], b['min_amount']),
            'max_amount': max(a['max_amount'], b['max_amount'])
        }

def create_flink_job():
    """Create Flink streaming job"""
    env = StreamExecutionEnvironment.get_execution_environment()

    # Set parallelism
    env.set_parallelism(4)

    # Enable checkpointing with exactly-once guarantees
    env.enable_checkpointing(60000)  # every 60 seconds
    env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE)

    # Read from Kafka
    kafka_source = env.add_source(
        FlinkKafkaConsumer(
            topics='events',
            deserialization_schema=SimpleStringSchema(),
            properties={
                'bootstrap.servers': 'localhost:9092',
                'group.id': 'flink-group',
                'auto.offset.reset': 'earliest'
            }
        )
    )

    # Process stream: parse, filter, key by user, 60-second tumbling windows
    result = (kafka_source
        .map(EventParser())
        .filter(TransactionFilter())
        .key_by(lambda x: x.get('user_id'))
        .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
        .aggregate(TransactionAggregator())
        .map(lambda x: json.dumps(x))
    )

    # Write to Kafka
    result.add_sink(
        FlinkKafkaProducer(
            topic='aggregated-transactions',
            serialization_schema=SimpleStringSchema(),
            producer_config={
                'bootstrap.servers': 'localhost:9092'
            }
        )
    )

    # Execute
    env.execute("Transaction Aggregation Job")

# Run job
if __name__ == '__main__':
    create_flink_job()
```
Spark Structured Streaming
Spark Streaming Job
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

def create_spark_streaming_job():
    """Create Spark Structured Streaming job"""
    spark = SparkSession.builder \
        .appName("RealTimeAnalytics") \
        .getOrCreate()

    # Set log level
    spark.sparkContext.setLogLevel("WARN")

    # Define schema
    schema = StructType([
        StructField("user_id", StringType()),
        StructField("action", StringType()),
        StructField("amount", DoubleType()),
        StructField("timestamp", TimestampType())
    ])

    # Read from Kafka
    df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "events") \
        .option("startingOffsets", "latest") \
        .load()

    # Parse JSON
    parsed_df = df.select(
        from_json(col("value").cast("string"), schema).alias("data")
    ).select("data.*")

    # Filter transactions
    transactions = parsed_df.filter(col("action") == "transaction")

    # Aggregate by user in 1-minute windows
    aggregated = transactions \
        .withWatermark("timestamp", "10 minutes") \
        .groupBy(
            window(col("timestamp"), "1 minute"),
            col("user_id")
        ) \
        .agg(
            count("*").alias("transaction_count"),
            sum("amount").alias("total_amount"),
            avg("amount").alias("avg_amount"),
            min("amount").alias("min_amount"),
            max("amount").alias("max_amount")
        )

    # Write to Kafka
    query = aggregated \
        .select(to_json(struct("*")).alias("value")) \
        .writeStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("topic", "aggregated-transactions") \
        .option("checkpointLocation", "/tmp/checkpoint") \
        .start()

    # Also write to console for debugging
    console_query = aggregated \
        .writeStream \
        .outputMode("update") \
        .format("console") \
        .option("truncate", False) \
        .start()

    # Wait for termination
    spark.streams.awaitAnyTermination()

# Run job
if __name__ == '__main__':
    create_spark_streaming_job()
```
Best Practices
- Exactly-Once Semantics: Ensure no data loss or duplication
- Checkpointing: Save state regularly for recovery
- Backpressure Handling: Slow down producers when needed
- Monitoring: Track latency, throughput, and errors
- Scaling: Partition topics and scale consumers
- Error Handling: Implement dead letter queues
- Testing: Test with realistic data volumes
- Documentation: Document data schemas and pipelines
- Security: Encrypt data in transit and at rest
- Cost Optimization: Monitor resource usage
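The dead-letter-queue practice above can be sketched as a small routing wrapper around event processing. The `route_event` helper, the `events.dlq` topic name, and the transform functions are illustrative, not from any library.

```python
import json

def route_event(raw: bytes, transform):
    """Apply `transform` to a decoded event; return ("ok", result) on
    success or ("dlq", error_record) on any failure. The error record
    keeps the original payload and the failure reason so bad events can
    be inspected and replayed from the dead-letter topic later."""
    try:
        return ("ok", transform(json.loads(raw.decode("utf-8"))))
    except Exception as exc:
        return ("dlq", {
            "raw": raw.decode("utf-8", errors="replace"),
            "error": f"{type(exc).__name__}: {exc}",
        })

# In the consumer loop, "ok" payloads go to the output topic and "dlq"
# records to a side topic such as "events.dlq", so one poison message
# cannot stall the whole pipeline.
print(route_event(b'{"amount": 5}', lambda e: e["amount"] * 2))  # ('ok', 10)
print(route_event(b"not json", lambda e: e))                     # routed to DLQ
```

Alert on the DLQ's growth rate: a sudden spike usually means an upstream schema change rather than a handful of genuinely bad events.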
Common Pitfalls
- Data Loss: Not using acks='all' in producer
- Duplicates: Not handling retries properly
- Latency: Not tuning batch sizes and windows
- Scaling Issues: Not partitioning topics enough
- State Management: Losing state on failures
- Monitoring Gaps: No visibility into pipeline health
- Schema Evolution: Not handling schema changes
- Resource Leaks: Not closing connections properly
- Unordered Processing: Assuming order across partitions
- No Backpressure: Overwhelming downstream systems
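The duplicates pitfall is commonly handled with consumer-side deduplication keyed on a stable event ID. Below is a minimal sketch; the bounded in-memory LRU stands in for a shared store such as Redis, which you would need once consumers are distributed.

```python
from collections import OrderedDict

class Deduplicator:
    """Drop events whose ID was already seen, keeping only a bounded
    window of recent IDs (an LRU) so memory stays flat under load."""

    def __init__(self, max_ids: int = 100_000):
        self.max_ids = max_ids
        self.seen = OrderedDict()

    def is_duplicate(self, event_id: str) -> bool:
        if event_id in self.seen:
            self.seen.move_to_end(event_id)  # refresh recency
            return True
        self.seen[event_id] = True
        if len(self.seen) > self.max_ids:
            self.seen.popitem(last=False)    # evict the oldest ID
        return False

dedup = Deduplicator(max_ids=3)
print([dedup.is_duplicate(i) for i in ["e1", "e2", "e1", "e3"]])
# [False, False, True, False] -- the repeated e1 is flagged
```

This gives effectively-once processing for retried deliveries inside the ID window; true end-to-end exactly-once requires transactional producers or idempotent sinks as well.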
Performance Comparison Table
| Framework | Latency | Throughput | Complexity | Maturity |
|---|---|---|---|---|
| Kafka Streams | Low | High | Low | High |
| Flink | Very Low | Very High | Medium | High |
| Spark Streaming | Medium | Very High | Medium | High |
| Storm | Low | Medium | High | Medium |
External Resources
- Apache Kafka Documentation
- Apache Flink Guide
- Spark Structured Streaming
- Kafka Best Practices
- Real-Time Data Pipelines Book
Conclusion
Building real-time data pipelines requires careful architecture, proper tooling, and operational discipline. By using Kafka for reliable ingestion, Flink or Spark for processing, and implementing best practices for monitoring and scaling, you can build pipelines that process millions of events per second with sub-second latency. The key is starting simple and gradually adding complexity as requirements grow.
Next Steps:
- Set up Kafka cluster
- Implement producer and consumer
- Build first streaming job
- Add monitoring and alerting
- Scale and optimize