
Streaming Architecture: Building Real-Time Data Pipelines with Kafka and Flink

Created: March 16, 2026 Larry Qu 16 min read

Introduction

Modern applications require processing data in real-time rather than in batch jobs. From fraud detection to personalized recommendations, organizations need immediate insights from their data streams. Streaming architecture, powered by Apache Kafka and Apache Flink, enables building robust real-time data pipelines that process millions of events per second with fault tolerance and exactly-once semantics.

As of 2026, both platforms have reached major milestones. Kafka 4.x runs KRaft natively without ZooKeeper and introduces share groups for queue-style workloads. Flink 2.x brings native ML_PREDICT for LLM inference and VECTOR_SEARCH for real-time vector similarity search directly in SQL. Together, they form the foundation of modern data streaming platforms that span operational workloads, analytics, automation, and AI.

This article explores streaming architecture patterns, the latest capabilities of Kafka and Flink, implementation strategies, and best practices for building production-ready real-time systems in 2026.

Understanding Streaming Architecture

Batch vs. Stream Processing

Traditional batch processing collects data over time and processes it in scheduled jobs. While simple, this approach introduces latency — data collected today might not be available until tomorrow.

Stream processing handles data as it arrives:

  • Event-by-event processing: Each event is processed individually as it arrives
  • Windowing: Events are grouped into time windows for aggregations
  • Continuous computation: Pipelines run 24/7 without scheduled jobs
  • Low latency: Results are available in milliseconds to seconds

Event-Driven Architecture vs. Stream Processing

These two patterns are complementary but distinct:

Event-driven architecture treats every data change as an immutable event stored in a distributed log. Kafka is the most common implementation. Producers write events to topics, and multiple consumers can read from those topics independently. This pattern excels at decoupling producers from consumers and supporting multiple downstream use cases from a single data stream.

Stream processing architecture adds a computation layer on top of event streaming. Frameworks like Flink read from the event log, apply stateful transformations, and write results to new topics or external systems. This pattern is necessary when you need windowed aggregations, complex event processing, or joins across multiple streams.

Most production systems combine both patterns: Kafka for durable event storage and distribution, paired with Flink for stateful processing.

Kappa Architecture

Kappa Architecture processes both batch and real-time data through a single stream processing pipeline. It removes the need for the separate batch and speed layers that define Lambda Architecture. Kafka provides the durable, replayable log, and Flink handles all computations: batch workloads are simply stream processing over bounded datasets, and reprocessing means replaying the log from an earlier offset. By 2026, Kappa has become a common default for new streaming systems because it simplifies operations and eliminates the cost of maintaining two code paths.

Core Components

A streaming architecture typically includes:

  1. Event Sources: Databases (via CDC), IoT devices, application logs, user interactions, webhooks
  2. Message Broker: Kafka serves as the backbone for event transport with KRaft consensus
  3. Stream Processing Engine: Flink processes and transforms streams with stateful operations
  4. State Storage: RocksDB, Flink state backends, embedded key-value stores
  5. Sink Systems: Data warehouses, databases, lakehouses, notification systems, feature stores
  6. Schema Registry: Manages schema evolution for event formats (Avro, Protobuf, JSON Schema)

Apache Kafka: The Event Backbone

Apache Kafka is a distributed event streaming platform that serves as the central nervous system for real-time data architectures. The 4.x line, starting with Kafka 4.0 released in March 2025, represents the most significant architectural shift in Kafka’s history.

KRaft Mode: ZooKeeper Removed

Kafka 4.0 makes KRaft the only supported metadata mode, fully removing the ZooKeeper dependency. KRaft (KIP-500) uses Kafka’s own Raft-based quorum controller, eliminating the operational burden of managing a separate ZooKeeper ensemble:

  • No separate ensemble: Metadata is managed by Kafka’s own controllers, which can even share a JVM with a broker (combined mode) in small or development clusters
  • Simpler operations: Fewer components to monitor, configure, and upgrade
  • Scalable metadata: Controllers can handle more partitions and topics
  • Faster failover: Controller election completes in seconds instead of minutes

KIP-853, available since Kafka 3.9, adds dynamic voter management, so controllers can be added to or removed from the voter set without cluster downtime. KIP-1312 (in 4.3) adds support for unregistering controllers, preventing stale metadata registrations from blocking cluster upgrades.

Core Concepts

Topics and Partitions

  • Topics are named streams of events
  • Partitions provide parallelism and fault tolerance
  • Each partition is an ordered, immutable sequence of events
  • KRaft is designed to scale a single cluster to millions of partitions

Producers and Consumers

  • Producers publish events to topics with configurable acknowledgment guarantees
  • Consumers subscribe to topics and process events within consumer groups
  • Consumer groups enable parallel processing with coordinated offset management

Replication and Fault Tolerance

  • Events are replicated across brokers with configurable replication factor
  • ISR (In-Sync Replicas) ensure durability guarantees
  • Automatic failover through KRaft controller-managed partition leadership
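
The durability guarantee behind ISR can be sketched as a simple admission check. With acks=all, Kafka refuses a write when the partition’s in-sync replica set is smaller than min.insync.replicas; with acks=0 or acks=1 that check does not apply. The class below is a hypothetical, library-free model of that rule (names are illustrative, and unclean leader election is assumed to be disabled), not broker code:

```java
/** Hypothetical model of the broker's durability check for a produce request. */
class IsrCheck {
    /**
     * @param acks      producer acks setting: "0", "1", or "all" (alias "-1")
     * @param isrSize   current in-sync replica count, leader included
     * @param minInsync the topic's min.insync.replicas
     * @return whether the write would be accepted
     */
    static boolean acceptsWrite(String acks, int isrSize, int minInsync) {
        boolean ackAll = acks.equals("all") || acks.equals("-1");
        if (!ackAll) {
            return isrSize >= 1;          // only a live leader is needed
        }
        return isrSize >= minInsync;      // acks=all: enough in-sync replicas must hold the write
    }

    /** Broker failures an acknowledged acks=all write survives (unclean election disabled). */
    static int toleratedFailures(int minInsync) {
        return minInsync - 1;             // the record exists on at least minInsync replicas
    }
}
```

With replication factor 3 and min.insync.replicas=2, acks=all writes keep succeeding with one replica down and are refused, rather than silently losing redundancy, with two down.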

Next-Generation Consumer Rebalance Protocol

KIP-848, generally available since Kafka 4.0, fundamentally changed how consumer groups rebalance. In the classic protocol, a “stop-the-world” rebalance pauses all consumers while partitions are reassigned. The new protocol shifts coordination to the broker side:

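// Opt in to the KIP-848 protocol (GA since Kafka 4.0) on the client
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "order-processor");
props.put("group.protocol", "consumer");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", OrderDeserializer.class.getName()); // your Order deserializer

KafkaConsumer<String, Order> consumer = new KafkaConsumer<>(props);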

Benefits include:

  • Incremental rebalancing: Only the affected consumers pause, not the entire group
  • Faster assignments: Broker-driven coordination reduces rebalance time from minutes to seconds
  • Stable under load: No cascading rebalances during high-throughput periods
  • Predictable behavior: Consistent assignment strategy across all clients

Share Groups (Kafka Queues)

KIP-932 introduces share groups, a new consumption model that decouples consumer parallelism from partition count. Production-ready since Kafka 4.2 (February 2026), share groups enable queue-style message processing:

# Create a topic for queue-style consumption
kafka-topics.sh --create \
  --topic order-notifications \
  --partitions 6 \
  --replication-factor 3 \
  --bootstrap-server localhost:9092
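
// Share consumer: multiple consumers can process records from the same partitions
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "notification-workers");
props.put("share.acquire.mode", "record_limit");
props.put("share.acknowledgement.mode", "explicit"); // acknowledge each record ourselves
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", NotificationDeserializer.class.getName()); // your deserializer

KafkaShareConsumer<String, Notification> consumer =
    new KafkaShareConsumer<>(props);
consumer.subscribe(Collections.singleton("order-notifications"));

while (true) {
    // A share consumer's poll() returns ordinary ConsumerRecords
    ConsumerRecords<String, Notification> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, Notification> record : records) {
        try {
            processNotification(record.value());
            consumer.acknowledge(record);                          // ACCEPT
        } catch (Exception e) {
            consumer.acknowledge(record, AcknowledgeType.REJECT);  // do not redeliver
        }
    }
}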

Share groups offer per-record acknowledgment and delivery counting — records that fail can be retried individually without blocking the partition offset. The RENEW acknowledgement type (KIP-1222) extends the message lock timer for long-running tasks. Strict fetch enforcement (KIP-1206) forces the broker to honor max.poll.records exactly, preventing memory spikes.
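
A rough, in-memory model of the per-record lifecycle described above: a record is acquired by one consumer, then accepted, released for redelivery, or rejected, and it is archived once its delivery count reaches a limit (KIP-932 exposes this limit as group.share.delivery.count.limit). The class, enum, and method names are assumptions for illustration, and the acquisition-lock timeout, which releases a record automatically, is omitted:

```java
/** Hypothetical in-memory model of one share-group record's lifecycle (not broker code). */
class ShareRecordState {
    enum State { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }
    enum Ack { ACCEPT, RELEASE, REJECT }

    private final int deliveryLimit; // stands in for group.share.delivery.count.limit
    private State state = State.AVAILABLE;
    private int deliveryCount = 0;

    ShareRecordState(int deliveryLimit) { this.deliveryLimit = deliveryLimit; }

    /** A consumer fetches the record; only an AVAILABLE record can be acquired. */
    boolean acquire() {
        if (state != State.AVAILABLE) return false;
        state = State.ACQUIRED;
        deliveryCount++;
        return true;
    }

    /** The consumer holding the record reports the outcome. */
    void acknowledge(Ack ack) {
        if (state != State.ACQUIRED) throw new IllegalStateException("record is not acquired");
        if (ack == Ack.ACCEPT) {
            state = State.ACKNOWLEDGED;              // done, never redelivered
        } else if (ack == Ack.REJECT) {
            state = State.ARCHIVED;                  // unprocessable, never redelivered
        } else if (deliveryCount >= deliveryLimit) {
            state = State.ARCHIVED;                  // RELEASE, but out of attempts
        } else {
            state = State.AVAILABLE;                 // RELEASE: eligible for redelivery
        }
    }

    State state() { return state; }
    int deliveryCount() { return deliveryCount; }
}
```

Because acknowledgement is per record, a poison record only burns its own delivery attempts; it never holds back a partition offset the way it would in a classic consumer group.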

Kafka Streams with Dead Letter Queues

Kafka 4.2 introduced native dead letter queue support for Kafka Streams (KIP-1034). Records that fail processing are routed to a DLQ topic instead of causing the application to crash or stall:

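Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-processor");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Continue past failed records; with a DLQ topic configured, the built-in
// exception handlers also write the failed record to that topic
props.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
    LogAndContinueProcessingExceptionHandler.class.getName());
props.put(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "orders-dlq");

KafkaStreams streams = new KafkaStreams(topology, props);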

KIP-1295 extends DLQ support to GlobalKTable processing, enabling poison-pill record handling in lookup tables as well.

Performance Characteristics

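| Metric | Value |
|---|---|
| Throughput | Millions of events/second per cluster |
| Latency | Can reach under 5 ms end-to-end (p99) with well-tuned producer, broker, and consumer settings |
| Durability | Configurable replication (a replication factor of 3 is the usual production choice) |
| Scalability | Horizontal partition scaling across 100+ brokers |
| Partition count | Designed for millions with KRaft, far beyond what ZooKeeper-based clusters handled |
| Java requirement | Java 17 for brokers; Java 11+ for clients and Kafka Streams (Kafka 4.0+) |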

Apache Flink: The Stream Processing Engine

Apache Flink is a distributed stream processing framework that provides sophisticated event-time processing, windowing, and exactly-once guarantees. The 2.x line — Flink 2.0 (March 2025), 2.1 (July 2025), and 2.2 (December 2025) — brings AI-native capabilities alongside core streaming improvements.

Core Capabilities

Event Time Processing

  • Processes events based on occurrence time, not arrival time
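  • Watermarks track event-time progress so results can be emitted despite out-of-order arrivals; windows can additionally accept late events up to a configurable allowed lateness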
  • Window functions operate on event time for correct temporal aggregations
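
To make the watermark idea concrete, the following library-free sketch mimics bounded out-of-orderness watermarking: the watermark trails the largest timestamp seen so far by a fixed bound (Flink’s built-in generator subtracts one extra millisecond), and an event at or below the current watermark is behind event-time progress. The class and method names are illustrative, and the watermark is computed on demand here, whereas Flink emits it periodically:

```java
/** Illustrative bounded-out-of-orderness watermark tracker (not Flink's class). */
class WatermarkTracker {
    private final long maxOutOfOrdernessMs;
    private long maxTimestamp = Long.MIN_VALUE;

    WatermarkTracker(long maxOutOfOrdernessMs) { this.maxOutOfOrdernessMs = maxOutOfOrdernessMs; }

    /** Records an event's timestamp and reports whether it arrived behind the watermark. */
    boolean onEvent(long eventTimestamp) {
        boolean late = eventTimestamp <= currentWatermark();
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        return late;
    }

    /** Watermark = max timestamp seen, minus the bound, minus 1 ms. */
    long currentWatermark() {
        if (maxTimestamp == Long.MIN_VALUE) return Long.MIN_VALUE; // nothing seen yet
        return maxTimestamp - maxOutOfOrdernessMs - 1;
    }
}
```

With a 2-second bound, events at 1000, 5000, and then 3000 ms are all on time (3000 arrives out of order but within the bound), while a subsequent event at 2500 ms falls behind the 2999 ms watermark.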

Windowing

  • Tumbling windows: Fixed-size, non-overlapping (e.g., every 5 minutes)
  • Sliding windows: Fixed-size, overlapping (e.g., 10-minute window sliding every 1 minute)
  • Session windows: Dynamic windows based on activity gaps between events
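  • Cumulative windows: Windows that grow by a fixed step up to a maximum size (e.g., a daily total updated every hour), available in Flink SQL as the CUMULATE windowing table-valued function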
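
The arithmetic behind the first three assigners is small. This sketch follows the same idea Flink uses for window starts (the timestamp rounded down to a multiple of the window size or slide; a zero offset is assumed here) and merges session windows whose [ts, ts + gap) intervals overlap. It is an illustration with hypothetical names, not Flink’s implementation, and cumulative windows are omitted:

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative window-assignment arithmetic (zero offset, millisecond timestamps). */
class WindowMath {
    /** Start of the tumbling window that contains ts. */
    static long tumblingStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    /** Starts of all sliding windows [start, start + size) that contain ts, latest first. */
    static List<Long> slidingStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        for (long start = ts - Math.floorMod(ts, slide); start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    /** Merges sorted events into sessions: each event opens [ts, ts + gap); overlaps merge. */
    static List<long[]> sessions(List<Long> sortedTimestamps, long gap) {
        List<long[]> result = new ArrayList<>();
        for (long ts : sortedTimestamps) {
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && ts < last[1]) {
                last[1] = Math.max(last[1], ts + gap);   // extend the current session
            } else {
                result.add(new long[] {ts, ts + gap});   // start a new session
            }
        }
        return result;
    }
}
```

A 10-minute window sliding every minute places each event in size / slide = 10 windows, which is why sliding aggregations cost more state than tumbling ones.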

Exactly-Once Semantics

  • Checkpointing ensures exactly-once processing across failures
  • Two-phase commit for transactional sinks (Kafka, JDBC, etc.)
  • Stateful operators maintain processing state with configurable backends
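
The two-phase commit contract can be sketched in a few lines. This hypothetical in-memory sink buffers writes in an open transaction, seals (pre-commits) that transaction when a checkpoint barrier arrives, and makes the data visible only once the checkpoint is confirmed complete; a failure before the next barrier discards the open transaction, since the source replays those records after recovery. It mirrors the idea behind Flink’s transactional sinks, not their actual API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical two-phase-commit sink: data becomes visible only after its checkpoint completes. */
class TwoPhaseCommitSink {
    private final List<String> committed = new ArrayList<>();        // what readers can see
    private final Map<Long, List<String>> pending = new HashMap<>(); // sealed, keyed by checkpoint id
    private List<String> current = new ArrayList<>();                // open transaction

    void write(String record) { current.add(record); }

    /** Phase 1: on the checkpoint barrier, seal the open transaction and start a new one. */
    void preCommit(long checkpointId) {
        pending.put(checkpointId, current);
        current = new ArrayList<>();
    }

    /** Phase 2: once the checkpoint has completed everywhere, commit its transaction. */
    void notifyCheckpointComplete(long checkpointId) {
        List<String> txn = pending.remove(checkpointId);
        if (txn != null) committed.addAll(txn);
    }

    /** Failure before the next barrier: drop the open transaction (the source will replay it). */
    void abortOpen() { current = new ArrayList<>(); }

    List<String> visible() { return List.copyOf(committed); }
}
```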

ML_PREDICT for LLM Inference

Flink 2.1 introduced the ML_PREDICT function for LLM inference in SQL. Flink 2.2 extends this to the Table API, enabling model inference directly in data processing pipelines:

// Create a model for LLM inference
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tEnv = TableEnvironment.create(settings);

tEnv.createModel("classifier", ModelDescriptor
    .forProvider("openai")
    .inputSchema(Schema.newBuilder()
        .column("input", DataTypes.STRING()).build())
    .outputSchema(Schema.newBuilder()
        .column("output", DataTypes.STRING()).build())
    .option("endpoint", "https://api.openai.com/v1/chat/completions")
    .option("model", "gpt-4.1")
    .option("system-prompt",
        "Classify this customer message as: complaint, question, or feedback")
    .option("api-key", "${OPENAI_API_KEY}")
    .build());

Table input = tEnv.from("customer_messages");
Table classified = tEnv.fromModel("classifier")
    .predict(input, ColumnList.of("message_text"));

-- SQL equivalent for the same classification pipeline
CREATE MODEL message_classifier
  INPUT (input STRING)
  OUTPUT (output STRING)
  WITH (
    'provider' = 'openai',
    'endpoint' = 'https://api.openai.com/v1/chat/completions',
    'model' = 'gpt-4.1',
    'system-prompt' = 'Classify this message: complaint, question, or feedback',
    'api-key' = '${OPENAI_API_KEY}'  -- placeholder for your key
  );

-- ML_PREDICT is a table function: it returns the input rows plus the model's OUTPUT columns
SELECT message_id, output AS category
FROM ML_PREDICT(
  TABLE customer_messages,
  MODEL message_classifier,
  DESCRIPTOR(message_text)
);

VECTOR_SEARCH for Real-Time Similarity Search

Flink 2.2 introduces VECTOR_SEARCH for streaming vector similarity search directly within Flink SQL. This enables real-time Retrieval-Augmented Generation (RAG), with document retrieval expressed as part of the streaming query itself:

-- Real-time vector similarity search
CREATE TABLE doc_embeddings (
  doc_id INT,
  embedding ARRAY<FLOAT>,
  content STRING,
  INDEX idx_embedding USING VECTOR (embedding)
);

SELECT
  query.query_id,
  doc.content,
  VECTOR_DISTANCE(query.vector, doc.embedding, 'COSINE') AS similarity
FROM query_stream AS query,
LATERAL TABLE(VECTOR_SEARCH(
  TABLE doc_embeddings,
  query.vector,
  DESCRIPTOR(idx_embedding),
  5  -- top-k results
)) AS doc;
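
Conceptually, a top-k vector search ranks candidate embeddings by a similarity score and keeps the best k. This brute-force Java sketch uses cosine similarity over in-memory vectors (cosine distance, where used, is conventionally 1 minus this similarity). Production vector indexes use approximate structures to avoid scanning every candidate, zero vectors are not handled, and the class and method names are illustrative:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

/** Brute-force cosine top-k search over an in-memory map of id -> embedding. */
class CosineTopK {
    static double cosineSimilarity(float[] a, float[] b) {
        double dot = 0, normA = 0, normB = 0;
        for (int i = 0; i < a.length; i++) {
            dot += a[i] * b[i];
            normA += a[i] * a[i];
            normB += b[i] * b[i];
        }
        return dot / (Math.sqrt(normA) * Math.sqrt(normB));
    }

    /** Returns the ids of the k most similar documents, best first. */
    static List<String> topK(float[] query, Map<String, float[]> docs, int k) {
        List<String> ids = new ArrayList<>(docs.keySet());
        ids.sort(Comparator.comparingDouble(
                (String id) -> cosineSimilarity(query, docs.get(id))).reversed());
        return ids.subList(0, Math.min(k, ids.size()));
    }
}
```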

Materialized Tables

Materialized Tables simplify stream and batch pipelines by letting the engine derive schemas and refresh pipelines automatically:

CREATE MATERIALIZED TABLE daily_order_summary
PARTITIONED BY (order_date)
DISTRIBUTED INTO 64 BUCKETS
FRESHNESS = INTERVAL '5' MINUTE
AS
SELECT
  order_date,
  customer_tier,
  COUNT(*) AS order_count,
  SUM(total_amount) AS revenue
FROM orders
GROUP BY order_date, customer_tier;

Flink 2.2 extends Materialized Tables with an enricher interface for custom default logic, DISTRIBUTED BY/INTO for bucketing, and SHOW MATERIALIZED TABLES for discovery.

Delta Joins

Delta joins replace the large state of regular joins with bidirectional lookup-based joins. In Flink 2.2, delta joins support CDC sources without DELETE operations, allow projection and filter operations after the source, and include caching to reduce external storage requests:

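import static org.apache.flink.table.api.Expressions.$;

// Delta join: source tables from Apache Fluss (Incubating). The query is written as a
// regular join; the planner applies a delta join when the sources and join shape allow it
Table orders = tEnv.from("fluss_orders");
Table customers = tEnv.from("fluss_customers")
    .select($("id").as("cust_id"), $("name"));   // rename to avoid ambiguous column names

Table enriched = orders.join(customers)
    .where($("customer_id").isEqual($("cust_id")))
    .select($("order_id"), $("name"), $("total_amount"));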

Balanced Tasks Scheduling

Flink 2.2 introduces balanced tasks scheduling (FLIP-370) to distribute tasks evenly across TaskManagers, reducing job bottlenecks:

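# config.yaml (Flink 2.x no longer reads the legacy flink-conf.yaml)
taskmanager.load-balance.mode: tasks   # balanced tasks scheduling (FLIP-370)
jobmanager.scheduler: adaptive
jobmanager.adaptive-scheduler.prefer-minimal-taskmanagers: true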

The scheduler actively minimizes the number of active TaskManagers during downscaling by maximizing utilization before freeing resources.

DataStream API

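KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
    .setBootstrapServers("localhost:9092").setTopics("orders")
    .setValueOnlyDeserializer(new SimpleStringSchema()).build();
DataStream<Order> totals = env
    .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "orders")
    .map(json -> parseOrder(json))   // parseOrder: your JSON -> Order mapping
    .assignTimestampsAndWatermarks(WatermarkStrategy.<Order>forBoundedOutOfOrderness(
        Duration.ofSeconds(10)).withTimestampAssigner((o, ts) -> o.getEventTime()))
    .keyBy(Order::getCustomerId)
    .window(TumblingEventTimeWindows.of(Duration.ofMinutes(5)))
    .sum("totalAmount");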

Table API / SQL

SELECT
    TUMBLE_START(event_time, INTERVAL '5' MINUTE) AS window_start,
    customer_id,
    SUM(total_amount) AS total,
    COUNT(*) AS order_count
FROM orders
GROUP BY
    TUMBLE(event_time, INTERVAL '5' MINUTE),
    customer_id;

Building Real-Time Pipelines

Pattern 1: Event Aggregation

Continuously aggregate metrics across time windows:

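DataStream<Metric> metrics = env.fromSource(
    metricsSource, metricsWatermarks, "metrics");   // event-time source with watermarks
metrics
    .keyBy(Metric::getSensorId)
    .window(SlidingEventTimeWindows.of(Duration.ofMinutes(10), Duration.ofMinutes(1)))
    // Return a new object: with a heap state backend the same element can sit in
    // several overlapping windows' state, so mutating it would corrupt other windows
    .reduce((a, b) -> new Metric(a.getSensorId(), a.getValue() + b.getValue()))
    .sinkTo(metricsSink);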

Pattern 2: Pattern Detection with CEP

Detect sequences of events matching patterns:

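Pattern<Transaction, ?> pattern = Pattern.<Transaction>begin("start")
    .where(SimpleCondition.of(evt -> evt.getAmount() > 10000))
    .next("suspicious")
    .where(SimpleCondition.of(evt -> evt.getLocation() != null &&
                  !evt.getLocation().equals(evt.getPreviousLocation())))
    .within(Duration.ofMinutes(1));

// Key by customer so a match only combines one customer's transactions;
// select() receives the matched events grouped by pattern name
DataStream<Alert> alerts = CEP.pattern(transactions.keyBy(Transaction::getCustomerId), pattern)
    .select(match -> new Alert(
        "Possible fraud: " + match.get("start").get(0).getCustomerId()));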
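
Under the hood, a pattern like this behaves like a small per-customer state machine. The sketch below hand-codes the same rule in plain Java: a transaction above 10,000 followed immediately by one whose location differs from its previous location, within one minute. It assumes one customer’s transactions arrive in timestamp order (the keyed case), treats "within" as an inclusive bound, and uses a Txn record that is an illustrative stand-in for the Transaction type:

```java
import java.util.ArrayList;
import java.util.List;

/** Hand-coded version of the fraud pattern for one customer's time-ordered events. */
class FraudPatternSketch {
    /** Hypothetical transaction shape used only for this sketch. */
    record Txn(String customerId, double amount, long timestampMs,
               String location, String previousLocation) {}

    static final long WITHIN_MS = 60_000;

    static List<String> detect(List<Txn> txns) {
        List<String> alerts = new ArrayList<>();
        Txn start = null; // a pending "start" match, if any
        for (Txn t : txns) {
            if (start != null
                    && t.timestampMs() - start.timestampMs() <= WITHIN_MS
                    && t.location() != null
                    && !t.location().equals(t.previousLocation())) {
                alerts.add("Possible fraud: " + t.customerId());
            }
            // "next" means strict contiguity: only the immediately preceding event can be a start
            start = t.amount() > 10_000 ? t : null;
        }
        return alerts;
    }
}
```

Writing the rule by hand shows what CEP saves you: the library manages partial matches, timeouts, and state per key, and it generalizes to longer sequences and looser contiguity than this two-step check.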

Pattern 3: Stream-to-Stream Joins

Join multiple streams within time windows:

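DataStream<Order> orders = env.fromSource(orderSource, orderWatermarks, "orders");
DataStream<Inventory> inventory = env.fromSource(inventorySource, inventoryWatermarks, "inventory");

// Pair orders and inventory records for the same product within each 5-minute
// window; a FlatJoinFunction emits only the pairs that can be confirmed
DataStream<OrderConfirmation> confirmed = orders
    .join(inventory)
    .where(Order::getProductId)
    .equalTo(Inventory::getProductId)
    .window(TumblingEventTimeWindows.of(Duration.ofMinutes(5)))
    .apply(new FlatJoinFunction<Order, Inventory, OrderConfirmation>() {
        @Override
        public void join(Order o, Inventory i, Collector<OrderConfirmation> out) {
            if (o.getQuantity() <= i.getAvailable()) {
                out.collect(new OrderConfirmation(o, i));
            }
        }
    });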
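
Semantically, a tumbling-window join pairs every left element with every right element that shares both the key and the window. The plain-Java sketch below reproduces that behaviour, including the quantity-versus-availability check; the Order and Stock record shapes and the output string format are hypothetical, and the nested loop is for clarity only (Flink buffers each window’s elements per key instead):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Plain-Java model of a keyed tumbling-window join (not Flink's implementation). */
class WindowJoinSketch {
    record Order(String productId, int quantity, long ts) {}
    record Stock(String productId, int available, long ts) {}

    static long windowStart(long ts, long size) { return ts - Math.floorMod(ts, size); }

    /** Emits "productId:quantity/available" for same-key, same-window pairs that can be fulfilled. */
    static List<String> join(List<Order> orders, List<Stock> stock, long windowSize) {
        List<String> out = new ArrayList<>();
        for (Order o : orders) {
            for (Stock s : stock) {
                boolean sameKey = Objects.equals(o.productId(), s.productId());
                boolean sameWindow = windowStart(o.ts(), windowSize) == windowStart(s.ts(), windowSize);
                if (sameKey && sameWindow && o.quantity() <= s.available()) {
                    out.add(o.productId() + ":" + o.quantity() + "/" + s.available());
                }
            }
        }
        return out;
    }
}
```

Note the consequence of window semantics: an order and a stock record for the same product that fall into adjacent windows never meet, even if they are only milliseconds apart. Interval joins exist for that case.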

Pattern 4: Real-Time ETL with CDC

Stream database changes from MySQL via Debezium into a data lake:

-- Kafka source with CDC events (Debezium format). Flink turns each Debezium
-- envelope into changelog rows (insert, update, delete), so no op column is needed
CREATE TABLE customer_cdc (
    customer_id INT,
    name STRING,
    email STRING,
    ts TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'cdc.customers',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'debezium-json'
);

-- Sink to Delta Lake
CREATE TABLE customer_warehouse (
    customer_id INT,
    name STRING,
    email STRING,
    last_updated TIMESTAMP(3),
    PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
    'connector' = 'delta',
    'table-path' = 's3://data-lake/customers'
);

-- Continuous sync pipeline: updates and deletes arrive as changelog rows, and the
-- sink's primary key lets an upsert-capable sink apply them
INSERT INTO customer_warehouse
SELECT
    customer_id,
    name,
    email,
    ts AS last_updated
FROM customer_cdc;
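
Because Flink reads the debezium-json envelope as a changelog, the sink’s primary key is what turns inserts, updates, and deletes into current table state. The sketch below applies Debezium-style change events (op codes c for create, r for snapshot read, u for update, d for delete) to an in-memory table keyed by customer_id. The ChangeEvent shape, which keeps only the key and a single "after" value, is a simplified assumption rather than the full Debezium envelope:

```java
import java.util.HashMap;
import java.util.Map;

/** Applies simplified Debezium-style change events to an upsert table keyed by id. */
class CdcUpsertSketch {
    /** op: "c" create, "r" snapshot read, "u" update, "d" delete; after is null for deletes. */
    record ChangeEvent(String op, int customerId, String after) {}

    static Map<Integer, String> apply(Iterable<ChangeEvent> events) {
        Map<Integer, String> table = new HashMap<>();
        for (ChangeEvent e : events) {
            switch (e.op()) {
                case "c", "r", "u" -> table.put(e.customerId(), e.after()); // upsert by key
                case "d" -> table.remove(e.customerId());                     // delete by key
                default -> throw new IllegalArgumentException("unknown op: " + e.op());
            }
        }
        return table;
    }
}
```

Replaying the same ordered events always yields the same table, which is why per-key ordering in the CDC topic (one partition per primary key) matters.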

Pattern 5: Streaming Lakehouse with Iceberg

Stream data into Apache Iceberg tables for analytics and BI:

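import static org.apache.flink.table.api.Expressions.$;

// The Iceberg sink commits data files when a checkpoint completes,
// so checkpointing must be enabled
env.enableCheckpointing(60000);
DataStream<Transaction> txns =
    env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "txns");

// Sink as Iceberg table with Flink's Iceberg connector
StreamTableEnvironment sEnv = StreamTableEnvironment.create(env);

sEnv.executeSql("CREATE TABLE iceberg_txns (" +
    "  txn_id BIGINT, user_id INT, amount DOUBLE, " +
    "  txn_ts TIMESTAMP(3), dt STRING" +
    ") PARTITIONED BY (dt) WITH (" +
    "  'connector'='iceberg'," +
    "  'catalog-type'='hadoop'," +
    "  'catalog-name'='txn_catalog'," +
    "  'warehouse'='s3://warehouse/txns'" +
    ")");

// Select columns in sink order; assumes Transaction has the fields
// txnId, userId, amount, txnTs (LocalDateTime), and dt
sEnv.fromDataStream(txns)
    .select($("txnId"), $("userId"), $("amount"), $("txnTs"), $("dt"))
    .executeInsert("iceberg_txns");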

Pattern 6: Queue Processing with Kafka Share Groups

Use Kafka share groups for work-queue workloads where tasks are processed independently:

// Scale workers independently of topic partition count.
// KafkaShareConsumer is not thread-safe, so each worker owns its own instance.
for (int i = 0; i < desiredWorkers; i++) {
    executor.submit(() -> {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "image-processing-pool");
        props.put("share.acquire.mode", "record_limit");
        props.put("share.acknowledgement.mode", "explicit");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", ImageTaskDeserializer.class.getName()); // your deserializer
        try (KafkaShareConsumer<String, ImageTask> consumer = new KafkaShareConsumer<>(props)) {
            consumer.subscribe(Collections.singleton("image-tasks"));
            while (!Thread.currentThread().isInterrupted()) {
                for (ConsumerRecord<String, ImageTask> record : consumer.poll(Duration.ofMillis(200))) {
                    try {
                        ImageResult result = resizeImage(record.value());
                        resultTopic.send(result);
                        consumer.acknowledge(record);                          // ACCEPT
                    } catch (RetryableException e) {
                        consumer.acknowledge(record, AcknowledgeType.RELEASE); // redeliver later
                    } catch (FatalException e) {
                        consumer.acknowledge(record, AcknowledgeType.REJECT);  // never redeliver
                    }
                }
            }
        }
    });
}

Streaming with AI/ML Integration

Feature Store Integration

Stream processing computes ML features in real-time:

DataStream<UserEvent> events =
    env.fromSource(userEventSource, WatermarkStrategy.noWatermarks(), "user-events");
events
    .keyBy(UserEvent::getUserId)
    .flatMap(new FeatureExtractor(featureStore))
    .sinkTo(featureStoreSink);

Real-Time Inference with ML_PREDICT

Use Flink 2.2’s ML_PREDICT for live LLM inference in a streaming pipeline:

CREATE MODEL sentiment_analyzer
  INPUT (input STRING)
  OUTPUT (output STRING)
  WITH (
    'provider' = 'openai',
    'endpoint' = 'https://api.openai.com/v1/chat/completions',
    'model' = 'gpt-4.1-mini',
    'system-prompt' = 'Analyze sentiment: positive, negative, or neutral',
    'api-key' = '${OPENAI_API_KEY}'
  );

-- Stream through reviews, classify sentiment in real-time, and keep the negative
-- ones (one model call per review; the exact match assumes a bare-label answer)
SELECT review_id, product_id, review_text, output AS sentiment
FROM ML_PREDICT(
  TABLE product_reviews,
  MODEL sentiment_analyzer,
  DESCRIPTOR(review_text)
)
WHERE output = 'negative';

Vector Search for Real-Time RAG

Build a real-time RAG pipeline that retrieves context from streaming documents:

-- Ingest document chunks with embeddings
CREATE TABLE doc_stream (
  chunk_id INT,
  content STRING,
  embedding ARRAY<FLOAT>,
  created_at TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'doc-chunks',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);

-- Query stream with embedded questions
CREATE TABLE query_stream (
  query_id INT,
  question STRING,
  embedding ARRAY<FLOAT>
) WITH (
  'connector' = 'kafka',
  'topic' = 'queries',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);

-- Retrieve top-3 relevant chunks for each query
SELECT
  q.query_id,
  q.question,
  d.content AS retrieved_context,
  VECTOR_DISTANCE(q.embedding, d.embedding, 'COSINE') AS score
FROM query_stream AS q,
LATERAL TABLE(VECTOR_SEARCH(
  TABLE doc_stream, q.embedding,
  DESCRIPTOR(embedding), 3
)) AS d
WHERE score > 0.75;

Flink Agents

The Flink Agents project (preview since v0.1.0, October 2025) enables running AI agents at scale on Flink:

  • Long-running, system-triggered AI agents integrated with streaming pipelines
  • LLM tool calling and integration with external APIs
  • Enterprise-grade Agentic AI deployment with checkpointing and fault tolerance
  • Version 0.2.1 (March 2026) adds bug fixes and vulnerability patches

Flink ML 2.2.0 (April 2023) expanded feature engineering algorithms from 6 to 33, covering the majority of Spark ML’s feature transformers. It also introduced ModelServable for online inference without a Flink runtime dependency.

Handling Stateful Streaming

State Backends

Flink manages state using pluggable backends:

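| Backend | Use Case | Characteristics |
|---|---|---|
| HashMap | Small state, maximum performance | In-memory, fast access, limited by heap |
| RocksDB (EmbeddedRocksDBStateBackend) | Large state, disk-backed | Spills to local disk, handles GB to TB state sizes |
| ForSt (Flink 2.0+, experimental) | Very large state in cloud and Kubernetes deployments | Disaggregated: keeps state on remote storage (e.g., S3) with a local cache |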

Checkpointing and Recovery

Enable fault tolerance with periodic checkpoints:

StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000); // Checkpoint every 60 seconds
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
env.getCheckpointConfig().setCheckpointTimeout(300000);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);

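// Start each checkpoint aligned and switch it to unaligned if barrier
// alignment takes longer than 30 s (requires unaligned checkpoints enabled)
env.getCheckpointConfig().enableUnalignedCheckpoints();
env.getCheckpointConfig()
    .setAlignedCheckpointTimeout(Duration.ofSeconds(30));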

Flink 2.2 adds a new configuration option for unaligned checkpoint recovery (FLINK-38541), giving operators finer control over recovery behavior under failure. Session-mode HA recovery introduces new Application stores (FLINK-38975) for running and terminated applications.
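
Why periodic checkpoints yield exactly-once state can be shown with a toy simulation: the job snapshots its state together with the source offset, and after a failure it restores both and replays the log from that offset, so records processed after the last checkpoint count exactly once in the recovered state. Everything here, including the single injected crash, is a hypothetical model rather than Flink code:

```java
import java.util.List;

/** Toy model: snapshot (offset, state) together, restore both after a failure, replay the log. */
class CheckpointReplaySketch {
    /**
     * Sums the log, checkpointing every {@code interval} records and crashing once
     * right after processing the record at index {@code crashAt} (-1 for no crash).
     */
    static long run(List<Integer> log, int interval, int crashAt) {
        long state = 0;                 // operator state: running sum
        int offset = 0;                 // next record to read from the replayable log
        long checkpointState = 0;       // last completed snapshot
        int checkpointOffset = 0;
        boolean crashed = false;
        while (offset < log.size()) {
            state += log.get(offset);
            offset++;
            if (offset % interval == 0) {            // checkpoint: snapshot state and offset together
                checkpointState = state;
                checkpointOffset = offset;
            }
            if (!crashed && offset - 1 == crashAt) { // failure: in-flight progress is lost
                crashed = true;
                state = checkpointState;             // restore state...
                offset = checkpointOffset;           // ...and rewind the source to match
            }
        }
        return state;
    }
}
```

Snapshotting the offset and the state together is the key design choice: restoring either one without the other would double-count or skip records.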

Savepoints

Create savepoints for deployment and migration:

# Manual savepoint for planned upgrades
flink savepoint <job-id> <target-directory>

# Stop job with savepoint
flink stop --savepointPath <target-directory> <job-id>

Scaling Considerations

Kafka Partition Strategy

Choose partition keys based on query patterns:

  • User-based: user_id for per-user processing and stateful joins
  • Geographic: region_id for regional aggregation and data locality
  • Composite: Combine keys for balanced distribution across partitions
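
Why the key choice matters follows from how keyed records are routed: the producer hashes the serialized key and takes the result modulo the partition count, so equal keys always land on the same partition (preserving per-key order), while a low-cardinality key concentrates load. Kafka’s default partitioner uses a murmur2 hash; this sketch substitutes Arrays.hashCode purely to stay dependency-free, and the helper names are hypothetical:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Sketch of key-based partition routing (stand-in hash, not Kafka's murmur2). */
class PartitionSketch {
    static int partitionFor(String key, int numPartitions) {
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        return (hash & 0x7fffffff) % numPartitions; // clear the sign bit, then wrap
    }

    /** Composite key such as region + user, to spread a hot region across partitions. */
    static String compositeKey(String regionId, String userId) {
        return regionId + ":" + userId;
    }

    /** How many distinct partitions a list of keys actually uses. */
    static int partitionsUsed(List<String> keys, int numPartitions) {
        Set<Integer> used = new HashSet<>();
        for (String key : keys) used.add(partitionFor(key, numPartitions));
        return used.size();
    }
}
```

Keying 100 events from one region by region_id alone sends all of them to a single partition, whereas the composite region:user key spreads them out while still keeping each user’s events in order.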

Flink Parallelism

Scale processing by adjusting parallelism:

env.setParallelism(4);      // Global parallelism
source.setParallelism(8);   // Source parallelism (I/O bound; a Kafka source gains nothing beyond the topic's partition count)
sink.setParallelism(4);     // Sink parallelism

Flink 2.2’s balanced tasks scheduling (FLINK-31757) distributes tasks evenly across TaskManagers, reducing bottlenecks without manual tuning.

Capacity Planning

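Estimate required resources. Treat these figures as rough starting points; actual needs depend on event size, per-event work, and state size:

| Throughput | Partitions | Flink TaskManagers | Memory | State Backend |
|---|---|---|---|---|
| 10K events/sec | 10-20 | 2-4 | 16GB | HashMap |
| 100K events/sec | 50-100 | 8-16 | 64GB | RocksDB |
| 1M events/sec | 200+ | 32+ | 256GB+ | RocksDB |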
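
One common rule of thumb for the partition column: divide the target throughput by what a single partition sustains on the slower side, producer or consumer, then add headroom. The per-partition rates in the example are placeholders you would measure in your own environment, and the 1.5 headroom factor is an arbitrary assumption:

```java
/** Rule-of-thumb partition sizing: max(target / perPartitionProduce, target / perPartitionConsume). */
class PartitionSizing {
    static int partitionsNeeded(double targetEventsPerSec,
                                double producePerPartition,
                                double consumePerPartition,
                                double headroom) {
        double forProducers = targetEventsPerSec / producePerPartition;
        double forConsumers = targetEventsPerSec / consumePerPartition;
        return (int) Math.ceil(Math.max(forProducers, forConsumers) * headroom);
    }
}
```

Assuming 10K events/s per partition on the produce side and 2K events/s per consumer (consumers doing heavier per-event work), 100K events/s works out to max(10, 50) × 1.5 = 75 partitions, inside the 50-100 range in the table.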

Rate Limiting for External Systems

Flink 2.2 introduces the RateLimiter interface for scan sources. This prevents Flink jobs from overwhelming external systems during backfill or burst:

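// MySource and TokenBucketRateLimiter stand in for your own source and
// limiter implementation; they are placeholders, not Flink classes
DataStreamSource<Record> source = env.fromSource(
    MySource.<Record>builder()
        .setRateLimiter(new TokenBucketRateLimiter(10000)) // 10K req/s
        .build(),
    WatermarkStrategy.noWatermarks(),
    "throttled-source"
);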
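
For intuition about what such a limiter does, here is a minimal token bucket: tokens refill continuously at the configured rate up to a burst capacity, and each request spends one token or is refused. The class is a stand-alone sketch with an injectable nanosecond clock for testability (and it is not thread-safe); it is not Flink’s RateLimiter interface:

```java
import java.util.function.LongSupplier;

/** Minimal token bucket with an injectable nanosecond clock (illustrative, not thread-safe). */
class TokenBucket {
    private final double ratePerSec;
    private final double capacity;
    private final LongSupplier nanoClock;
    private double tokens;
    private long lastRefill;

    TokenBucket(double ratePerSec, double capacity, LongSupplier nanoClock) {
        this.ratePerSec = ratePerSec;
        this.capacity = capacity;
        this.nanoClock = nanoClock;
        this.tokens = capacity;                 // start full, allowing an initial burst
        this.lastRefill = nanoClock.getAsLong();
    }

    /** Returns true and consumes a token if one is available. */
    boolean tryAcquire() {
        long now = nanoClock.getAsLong();
        tokens = Math.min(capacity, tokens + (now - lastRefill) / 1e9 * ratePerSec);
        lastRefill = now;
        if (tokens >= 1.0) {
            tokens -= 1.0;
            return true;
        }
        return false;
    }
}
```

In production you would pass System::nanoTime; the capacity bounds how large a burst the external system sees after an idle period.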

Best Practices

1. Design for Late Data

Use watermarks with allowed lateness to handle out-of-order events:

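final OutputTag<Event> lateOutputTag = new OutputTag<Event>("late-events") {};

SingleOutputStreamOperator<Long> counts = env
    .fromSource(source, WatermarkStrategy
        .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(30))
        .withTimestampAssigner((event, ts) -> event.getEventTime()),
        "source")
    .keyBy(Event::getKey)
    .window(TumblingEventTimeWindows.of(Duration.ofMinutes(5)))
    .allowedLateness(Duration.ofMinutes(2))
    .sideOutputLateData(lateOutputTag)
    .aggregate(new CountAggregate());   // CountAggregate: your AggregateFunction<Event, Long, Long>

// Events beyond the allowed lateness arrive here instead of being dropped
DataStream<Event> lateEvents = counts.getSideOutput(lateOutputTag);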
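
How an event-time window treats an incoming event reduces to two comparisons against the current watermark: the window fires once the watermark reaches its last timestamp (end minus 1 ms), keeps accepting late updates (each one re-fires the window with the default trigger) until the watermark passes that point plus the allowed lateness, and after that routes events to the side output. A hypothetical helper expressing this rule:

```java
/** Classifies an event for a window [start, end) given the watermark and allowed lateness. */
class LatenessSketch {
    enum Outcome { ON_TIME, LATE_UPDATE, SIDE_OUTPUT }

    static Outcome classify(long windowEnd, long allowedLatenessMs, long watermark) {
        long maxTimestamp = windowEnd - 1;                    // last timestamp inside the window
        if (watermark < maxTimestamp) return Outcome.ON_TIME; // window has not fired yet
        if (maxTimestamp + allowedLatenessMs > watermark) {
            return Outcome.LATE_UPDATE;                       // fired, but still accepting updates
        }
        return Outcome.SIDE_OUTPUT;                           // past the allowed lateness
    }
}
```

For a 5-minute window ending at 300,000 ms with 2 minutes of allowed lateness, an event is on time while the watermark is below 299,999 ms, triggers a late update until the watermark reaches 419,999 ms, and goes to the side output from then on.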

2. Monitor Processing Lag

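Track the gap between event time and processing time. Flink exposes watermark metrics such as currentInputWatermark and currentOutputWatermark, and sources that implement the standard connector metrics also report event-time lag (for example, currentEmitEventTimeLag). Kafka 4.2 adds improved lag metrics for share groups.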

3. Backpressure Handling

Design for burst traffic with buffers and rate limiters:

# Flink configuration (config.yaml)
taskmanager.memory.managed.size: 512mb
taskmanager.memory.network.max: 256mb
taskmanager.network.memory.buffers-per-channel: 16

4. State Size Management

Regularly clear unnecessary state with TTL:

StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Duration.ofHours(24))
    .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
    .setStateVisibility(
        StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
    .build();

ValueStateDescriptor<Session> desc =
    new ValueStateDescriptor<>("session", Session.class);
desc.enableTimeToLive(ttlConfig);
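
The timer rule configured above can be modelled in a few lines: with OnReadAndWrite, both reads and writes reset an entry’s TTL clock, and a cleanup pass physically removes expired entries. For simplicity, this sketch’s reads treat expired entries as absent (the stricter NeverReturnExpired visibility), unlike the ReturnExpiredIfNotCleanedUp setting shown above; the class and method names are hypothetical and timestamps are passed in explicitly:

```java
import java.util.HashMap;
import java.util.Map;

/** Toy keyed state with a TTL refreshed on both read and write (OnReadAndWrite). */
class TtlStateSketch<V> {
    private record Entry<T>(T value, long lastAccess) {}

    private final Map<String, Entry<V>> entries = new HashMap<>();
    private final long ttlMs;

    TtlStateSketch(long ttlMs) { this.ttlMs = ttlMs; }

    void put(String key, V value, long now) { entries.put(key, new Entry<>(value, now)); }

    /** Returns the value and refreshes its timestamp; null if absent or expired. */
    V get(String key, long now) {
        Entry<V> e = entries.get(key);
        if (e == null || now - e.lastAccess() >= ttlMs) return null; // expired reads as absent
        entries.put(key, new Entry<>(e.value(), now));               // reading resets the TTL
        return e.value();
    }

    /** Cleanup pass: physically drop expired entries and report how many were removed. */
    int cleanup(long now) {
        int before = entries.size();
        entries.values().removeIf(e -> now - e.lastAccess() >= ttlMs);
        return before - entries.size();
    }
}
```

The usage pattern matters for state size: a session that keeps being read never expires under OnReadAndWrite, whereas OnCreateAndWrite would expire it 24 hours after the last write.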

5. Schema Evolution

Plan for schema changes in events. Use Avro or Protobuf with a Schema Registry:

// Kafka consumer with schema registry
Properties props = new Properties();
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "http://schema-registry:8081");
props.put("specific.avro.reader", "true");

Choosing the Right API

| Use Case | Recommended API |
|---|---|
| Simple transformations, filters, maps | DataStream API |
| SQL queries, windowed aggregations | Table API / SQL |
| Complex event processing (CEP) | DataStream API + CEP library |
| ML inference, LLM calls | ML_PREDICT / Table API |
| Vector similarity search | VECTOR_SEARCH SQL |
| Stream processing with Python UDFs | PyFlink DataStream API |

Challenges and Solutions

Challenge: Out-of-Order Events

Solution: Use event-time processing with watermarks and allowed lateness:

DataStream<Long> counts = env
    .fromSource(
        kafkaSource,
        WatermarkStrategy
            .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
            .withTimestampAssigner(
                (event, timestamp) -> event.getTimestamp()),
        "kafka-source")
    .keyBy(Event::getKey)
    .window(TumblingEventTimeWindows.of(Duration.ofMinutes(5)))
    .allowedLateness(Duration.ofMinutes(1))
    .aggregate(new CountAggregate());  // CountAggregate: your AggregateFunction (hypothetical)

Challenge: Exactly-Once Sinks (Transactional)

Solution: Use transactional sinks with two-phase commit:

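KafkaSink<String> sink = KafkaSink.<String>builder()
    .setBootstrapServers("localhost:9092")
    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
        .setTopic("output-topic")                    // placeholder topic name
        .setValueSerializationSchema(new SimpleStringSchema())
        .build())
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .setTransactionalIdPrefix("flink-sink-")
    // Must not exceed the broker's transaction.max.timeout.ms
    .setProperty("transaction.timeout.ms", "300000")
    .build();
// Downstream consumers need isolation.level=read_committed to skip uncommitted data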

Challenge: State Explosion in Long-Running Jobs

Solution: Implement state TTL, use RocksDB for large state, and design compaction strategies:

StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Duration.ofHours(24))
    .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
    .setStateVisibility(
        StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
    .build();

Challenge: Zombie Transactions in Kafka

Solution: Kafka 4.2+ provides stricter epoch validation for Transaction Version 2 markers (KIP-890, KIP-1050, KIP-1229), reducing the chances of zombie transactions during producer failures:

# Broker-side transaction defense
transaction.state.log.min.isr=2
transaction.max.timeout.ms=300000
producer.id.expiration.ms=86400000
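
The fencing idea behind this defense can be sketched as follows: each transactional.id maps to a producer epoch, initializing a new producer with the same transactional.id bumps the epoch, and the coordinator rejects requests carrying an older epoch, so a zombie instance cannot commit. This is a simplified, hypothetical model of the coordinator’s check (no timeouts, markers, or partitions), not Kafka’s code:

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified model of producer-epoch fencing on a transaction coordinator. */
class EpochFencingSketch {
    private final Map<String, Integer> epochs = new HashMap<>();

    /** initTransactions(): a new instance with this transactional.id gets a higher epoch. */
    int initProducer(String transactionalId) {
        return epochs.merge(transactionalId, 0, (old, unused) -> old + 1);
    }

    /** A commit is accepted only from the current epoch; older epochs are fenced. */
    boolean tryCommit(String transactionalId, int producerEpoch) {
        Integer current = epochs.get(transactionalId);
        return current != null && producerEpoch == current;
    }
}
```

This is also why Flink’s transactional sink derives stable transactional.id values from a prefix: a restarted job re-initializes the same IDs and thereby fences any producers left over from the failed attempt.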

Challenge: Debugging Production Pipelines

Solution: Flink 2.3 introduces OTel gRPC exporter for metrics. Kafka 4.2 adds unified metric naming (kafka.COMPONENT pattern) and idle-ratio metrics. Both platforms now support structured logging and trace propagation:

# Flink OTel configuration
metrics.reporter.otel.factory.class:
  org.apache.flink.metrics.opentelemetry.OpenTelemetryMetricReporterFactory
metrics.reporter.otel.endpoint: http://otel-collector:4317

Future Trends

Shift-Left Architecture

Data processing is moving earlier in the data lifecycle. The streaming layer is becoming the first place where data is enriched, transformed, and analyzed — before it ever lands in a database or data lake. This Shift-Left Architecture reduces storage costs, minimizes data movement, and enables sub-second decision-making.

Unified Real-Time Lakehouse

Flink’s Materialized Tables and Iceberg/Paimon integration make the streaming lakehouse a production reality. Organizations run streaming pipelines that write directly to open table formats, enabling both real-time queries and batch analytics on the same dataset.

Agentic AI over Streams

Flink Agents and ML_PREDICT enable autonomous AI agents that react to streaming events in real-time. Use cases include automated incident response, dynamic pricing, and self-optimizing pipelines that detect and correct anomalies without human intervention.

Kafka as Unified Messaging Platform

With Share Groups production-ready in Kafka 4.2, organizations can consolidate streaming (pub/sub) and queuing (point-to-point) workloads onto a single platform, reducing the need to operate separate systems like RabbitMQ or SQS alongside Kafka for many workloads.
