Introduction
Real-time analytics enables instant decision-making. From fraud detection to operational dashboards, the ability to process data in milliseconds transforms business outcomes.
Key Statistics:
- Real-time analytics market: $50B by 2027
- 73% of enterprises are investing in real-time capabilities
- Average latency improvement: 1000x over batch
- ClickHouse processes 1TB+ per second in production
Streaming Architecture
┌─────────────────────────────────────────────────────────────────┐
│                  Real-time Analytics Pipeline                   │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────┐    ┌─────────┐    ┌─────────┐    ┌─────────┐       │
│  │ Sources │───▶│  Kafka  │───▶│ Process │───▶│  Sink   │       │
│  │ (Logs,  │    │ Topics  │    │ (Flink, │    │ (OLAP,  │       │
│  │  IoT)   │    │         │    │ Streams)│    │ Redis)  │       │
│  └─────────┘    └─────────┘    └─────────┘    └─────────┘       │
│                                     │                           │
│                                     ▼                           │
│                          ┌───────────────────┐                  │
│                          │   Stream Joins    │                  │
│                          │   Windowed Aggs   │                  │
│                          └───────────────────┘                  │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
Apache Flink
Streaming Joins
// Flink Streaming Join
DataStream<Order> orders = env.addSource(new OrderSource());
DataStream<User> users = env.addSource(new UserSource());

// Tumbling window join: pairs orders with users whose join keys match
// within the same 5-minute event-time window.
// Fix: JoinedStreams#with(...) is deprecated; apply(...) has identical
// semantics and is the supported method.
DataStream<OrderWithUser> joined = orders
    .join(users)
    .where(Order::getUserId)
    .equalTo(User::getId)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .apply((order, user) -> new OrderWithUser(order, user));

// Interval join: for conversion attribution, a purchase must happen
// AFTER the click. between(lower, upper) matches purchases whose
// timestamp lies in [click + lower, click + upper], so the bounds are
// (0, +30min) — the original (-30min, 0) matched purchases that
// happened BEFORE the click.
DataStream<ClickEvent> clicks = env.addSource(new ClickSource());
DataStream<PurchaseEvent> purchases = env.addSource(new PurchaseSource());
DataStream<Conversion> conversions = clicks
    .keyBy(ClickEvent::getUserId)
    .intervalJoin(purchases.keyBy(PurchaseEvent::getUserId))
    .between(Time.minutes(0), Time.minutes(30))
    .process(new ConversionProcessFunction());
Windowed Aggregations
// Sliding window with late data handling:
// per-product revenue over the last 10 minutes, refreshed every minute.
DataStream<RevenuePerMinute> revenue = orders
    .assignTimestampsAndWatermarks(
        // Watermarks lag the maximum observed event time by 5 minutes,
        // so events up to 5 minutes out of order still land in the
        // correct window before it fires.
        WatermarkStrategy
            .<Order>forBoundedOutOfOrderness(Duration.ofMinutes(5))
            .withTimestampAssigner((order, timestamp) -> order.getTimestamp())
    )
    .keyBy(Order::getProductId)
    // 10-minute windows sliding every 1 minute (each event belongs to
    // 10 overlapping windows).
    .window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1)))
    // Events arriving up to 5 minutes after the watermark passes the
    // window end still trigger an updated (corrected) emission.
    .allowedLateness(Time.minutes(5))
    .sum("amount")
    .name("Revenue Aggregation");
Kafka Streams
Aggregation
// Kafka Streams aggregation
KStream<String, Order> orders = builder.stream("orders");

// Running count of orders per user, backed by a named (queryable)
// state store.
KTable<String, Long> orderCounts = orders
    .groupBy((key, order) -> order.getUserId())
    .count(Materialized.as("order-counts-store"));

// Session window aggregation: a session closes after 10 minutes of
// inactivity; suppress(untilWindowCloses) emits only one final result
// per closed session instead of every intermediate update.
// Fixes: untilWindowCloses takes a BufferConfig, not a Duration, and a
// windowed count is keyed by Windowed<String>, not String.
KTable<Windowed<String>, Long> sessionCounts = orders
    .groupBy((key, order) -> order.getUserId())
    .windowedBy(SessionWindows.with(Duration.ofMinutes(10)))
    .count()
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));

// Tumbling window: order count per category per hour.
// Fixes: the windowing class is TimeWindows (there is no TumblingWindows
// in Kafka Streams), and KStream.groupBy's mapper must return the new
// KEY only — not a KeyValue pair.
KTable<Windowed<String>, Long> windowedCounts = orders
    .groupBy((key, order) -> order.getCategory())
    .windowedBy(TimeWindows.of(Duration.ofHours(1)))
    .count();
Stream Joining
// KStream-KStream join: enrich each order with its matching delivery.
KStream<String, Order> orders = builder.stream("orders");
KStream<String, Delivery> deliveries = builder.stream("deliveries");
// Windowed join: an order and a delivery pair up only when their record
// timestamps are within 1 hour of each other; both sides are buffered
// in windowed state stores for that duration.
KStream<String, OrderWithDelivery> enriched = orders
    .join(deliveries,
        (order, delivery) -> new OrderWithDelivery(order, delivery),
        JoinWindows.of(Duration.ofHours(1)),
        // Explicit serdes for the join's internal state stores
        // (key serde + one value serde per side).
        StreamJoined.with(Serdes.String(), orderSerde, deliverySerde));
ClickHouse
Real-time Tables
-- Create MergeTree table for real-time analytics.
-- Raw event stream: MergeTree gives fast columnar scans over parts
-- sorted by (event_type, user_id, timestamp).
CREATE TABLE events (
    event_id UUID,
    event_type String,
    user_id String,
    -- NOTE(review): the JSON column type is newer/experimental on older
    -- ClickHouse releases — confirm the target server version.
    properties JSON,
    timestamp DateTime,
    -- Presumably the millisecond-precision epoch of the same instant
    -- (DateTime is second precision) — confirm with the producer.
    timestamp_ms UInt64
)
ENGINE = MergeTree()
-- One partition per calendar month; keeps merges and TTL drops cheap.
PARTITION BY toYYYYMM(timestamp)
ORDER BY (event_type, user_id, timestamp)
-- Rows expire 30 days after their event timestamp.
TTL timestamp + INTERVAL 30 DAY
SETTINGS index_granularity = 8192;
-- SummingMergeTree for aggregations.
-- Pre-aggregated hourly rollup: during background merges, rows sharing
-- the same ORDER BY key (event_type, hour, user_id) are collapsed and
-- their numeric columns (event_count, revenue) summed. Because merges
-- are asynchronous, readers should still wrap these columns in sum()
-- at query time.
CREATE TABLE hourly_metrics (
    event_type String,
    hour DateTime,
    user_id String,
    event_count UInt64,
    revenue Float64
)
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(hour)
ORDER BY (event_type, hour, user_id);
-- Populate the hourly rollup from the raw events table.
-- Fix: the events table's time column is `timestamp` — the original
-- WHERE clause referenced a nonexistent `event_time` column.
INSERT INTO hourly_metrics
SELECT
    event_type,
    toStartOfHour(timestamp) AS hour,
    user_id,
    count() AS event_count,
    -- NOTE(review): assumes an `amount` column exists on events; the DDL
    -- above only shows `properties JSON` — confirm the source schema.
    sum(amount) AS revenue
FROM events
WHERE timestamp >= now() - INTERVAL 1 HOUR
GROUP BY event_type, hour, user_id;
Real-time Queries
-- Recent events with limit: latest 1000 events, newest first
-- (ORDER BY ... DESC over the sorted parts, bounded by LIMIT).
SELECT event_type, user_id, timestamp
FROM events
ORDER BY timestamp DESC
LIMIT 1000;
-- Time-series aggregation: per-minute event volume and exact unique
-- users for the trailing hour. uniqExact is precise but memory-heavy;
-- uniq()/uniqCombined() trade accuracy for speed at larger scale.
SELECT
    toStartOfMinute(timestamp) AS minute,
    event_type,
    count() AS events,
    uniqExact(user_id) AS unique_users
FROM events
WHERE timestamp >= now() - INTERVAL 1 HOUR
GROUP BY minute, event_type
ORDER BY minute;
-- Retention query: for each signup-date cohort, how many users were
-- active exactly 1 day and exactly 7 days after signing up.
-- Fixes vs. original:
--   * the inner query filtered to event_type = 'signup' only, so later
--     activity could never be observed and retention was always zero;
--   * GROUP BY (user_id, event_time) defeated MIN(timestamp) — every
--     event became its own "first seen";
--   * it referenced a nonexistent `event_time` column (table has `timestamp`).
SELECT
    cohort_date,
    uniqExact(user_id) AS cohort_size,
    uniqExactIf(user_id, event_date = cohort_date + 1) AS day_1_retention,
    uniqExactIf(user_id, event_date = cohort_date + 7) AS day_7_retention
FROM
(
    -- All activity joined against each user's signup date.
    SELECT
        e.user_id AS user_id,
        toDate(e.timestamp) AS event_date,
        c.cohort_date AS cohort_date
    FROM events AS e
    INNER JOIN
    (
        -- One row per user: the day of their first signup event.
        SELECT user_id, toDate(min(timestamp)) AS cohort_date
        FROM events
        WHERE event_type = 'signup'
        GROUP BY user_id
    ) AS c USING (user_id)
)
GROUP BY cohort_date
ORDER BY cohort_date;
Streaming Patterns
Exactly-Once Processing
// Flink exactly-once with Checkpointing.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpoint every minute; EXACTLY_ONCE is the default checkpointing mode.
env.enableCheckpointing(60000); // Checkpoint every minute
// Guarantee at least 30 s of processing between checkpoints so the job
// is not permanently busy snapshotting.
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
// Abort a checkpoint that has not completed within 2 minutes.
env.getCheckpointConfig().setCheckpointTimeout(120000);
// Never overlap checkpoints (combined with the min-pause above).
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// Keep the last checkpoint on job cancellation so the job can be
// restarted from it manually (must be cleaned up by the operator).
env.getCheckpointConfig().setExternalizedCheckpointCleanup(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
// Fail the job only after 3 consecutive checkpoint failures.
checkpointConfig.setTolerableCheckpointFailureNumber(3);
// Enable exactly-once for Kafka: read_committed makes the consumer skip
// records from aborted transactions written by transactional producers.
KafkaSource<Event> source = KafkaSource.<Event>builder()
    .setBootstrapServers("kafka:9092")
    .setGroupId("flink-consumer")
    .setTopics("events")
    // Resume from committed group offsets; fall back to LATEST when the
    // group has none.
    .setStartingOffsets(OffsetsInitializer.committedOffsets(
        OffsetResetStrategy.LATEST))
    // NOTE(review): KafkaRecordDeserializationSchema.of(...) expects a
    // KafkaDeserializationSchema; if EventDeserializer is a plain
    // DeserializationSchema, valueOnly(...) is the right factory — confirm.
    .setDeserializer(KafkaRecordDeserializationSchema.of(new EventDeserializer()))
    .setProperty("isolation.level", "read_committed")
    .build();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
Backpressure Handling
// Flink backpressure handling.
// Fix: CheckpointConfig has no setUnalignedEnabled method — the API is
// enableUnalignedCheckpoints(boolean). Unaligned checkpoints let
// barriers overtake in-flight buffers so checkpoints still complete
// under heavy backpressure.
env.getCheckpointConfig().enableUnalignedCheckpoints(true);
// Flush partially-filled network buffers at most every 100 ms
// (latency/throughput trade-off; default is 100 ms).
env.setBufferTimeout(100);
// ProcessFunction with buffering: collects events and flushes them in
// batches of 100, with a processing-time timer as a fallback so a
// partial batch is not held indefinitely.
//
// NOTE(review): the buffer lives on the Java heap, not in Flink managed
// state, so buffered events are lost on failure/restore — move it to
// ListState if exactly-once output is required.
public class BufferedProcessFunction extends ProcessFunction<Event, Result> {
    private static final int BATCH_SIZE = 100;
    private static final long FLUSH_INTERVAL_MS = 1000L;

    private final List<Event> buffer = new ArrayList<>();

    @Override
    public void processElement(Event event, Context ctx, Collector<Result> out) {
        buffer.add(event);
        // Fix: the original never registered a timer, so onTimer() could
        // never fire and partial batches were never drained. Schedule a
        // flush deadline relative to the current processing time.
        ctx.timerService().registerProcessingTimeTimer(
            ctx.timerService().currentProcessingTime() + FLUSH_INTERVAL_MS);
        if (buffer.size() >= BATCH_SIZE) {
            flushBuffer(out);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Result> out) {
        // Drain whatever is buffered when the flush deadline fires.
        flushBuffer(out);
    }
}
Comments