
Real-time Analytics: Streaming Aggregations, OLAP

Created: February 18, 2026

Introduction

Real-time analytics enables instant decision-making. From fraud detection to operational dashboards, the ability to process data in milliseconds transforms business outcomes.

Key Statistics:

  • Real-time analytics market: projected to reach $50B by 2027
  • 73% of enterprises report investing in real-time capabilities
  • Latency: milliseconds instead of the hours typical of batch pipelines (roughly 1000x)
  • Large ClickHouse deployments report scanning 1TB+ per second in production

Streaming Architecture

┌────────────────────────────────────────────────────────────┐
│                Real-time Analytics Pipeline                │
├────────────────────────────────────────────────────────────┤
│                                                            │
│  ┌─────────┐    ┌─────────┐    ┌─────────┐    ┌─────────┐  │
│  │ Sources │───▶│ Kafka   │───▶│ Process │───▶│  Sink   │  │
│  │ (Logs,  │    │ Topics  │    │ (Flink, │    │ (OLAP,  │  │
│  │  IoT)   │    │         │    │ Streams)│    │  Redis) │  │
│  └─────────┘    └─────────┘    └─────────┘    └─────────┘  │
│                                     │                      │
│                                     ▼                      │
│                            ┌─────────────────┐             │
│                            │  Stream Joins   │             │
│                            │  Windowed Aggs  │             │
│                            └─────────────────┘             │
└────────────────────────────────────────────────────────────┘
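
At the left edge of the pipeline, producers publish raw events to Kafka topics. A minimal producer sketch using the standard Kafka client; the broker address, topic name, key, and payload are illustrative:

// Publish a raw event to an "events" topic
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all"); // wait for all replicas at the ingestion edge

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    // Keying by user keeps each user's events ordered within one partition
    producer.send(new ProducerRecord<>("events", "user-42",
        "{\"event_type\":\"click\",\"page\":\"/home\"}"));
}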

Streaming Joins

// Flink Streaming Join
DataStream<Order> orders = env.addSource(new OrderSource());
DataStream<User> users = env.addSource(new UserSource());

// Tumbling window join
DataStream<OrderWithUser> joined = orders
    .join(users)
    .where(Order::getUserId)
    .equalTo(User::getId)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .with((order, user) -> new OrderWithUser(order, user));

// Interval join: match each click with a purchase by the same user
// within the following 30 minutes (assumes event-time timestamps are assigned)
DataStream<ClickEvent> clicks = env.addSource(new ClickSource());
DataStream<PurchaseEvent> purchases = env.addSource(new PurchaseSource());

DataStream<Conversion> conversions = clicks
    .keyBy(ClickEvent::getUserId)
    .intervalJoin(purchases.keyBy(PurchaseEvent::getUserId))
    .between(Time.minutes(0), Time.minutes(30)) // purchase at or after the click
    .process(new ConversionProcessFunction());

Windowed Aggregations

// Sliding window with late data handling: wait up to 5 minutes for
// out-of-order events, and keep windows open 5 more minutes for stragglers
DataStream<RevenuePerMinute> revenue = orders
    .assignTimestampsAndWatermarks(
        WatermarkStrategy
            .<Order>forBoundedOutOfOrderness(Duration.ofMinutes(5))
            .withTimestampAssigner((order, timestamp) -> order.getTimestamp())
    )
    .keyBy(Order::getProductId)
    .window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1)))
    .allowedLateness(Time.minutes(5))
    .aggregate(new RevenueAggregator()) // sum("amount") would emit Order, not RevenuePerMinute
    .name("Revenue Aggregation");
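
RevenueAggregator is not part of the Flink API; a minimal sketch, assuming RevenuePerMinute wraps a single total:

// Incremental aggregation: only a running sum is stored per window
public class RevenueAggregator
        implements AggregateFunction<Order, Double, RevenuePerMinute> {

    @Override
    public Double createAccumulator() {
        return 0.0;
    }

    @Override
    public Double add(Order order, Double accumulator) {
        return accumulator + order.getAmount();
    }

    @Override
    public RevenuePerMinute getResult(Double accumulator) {
        return new RevenuePerMinute(accumulator);
    }

    @Override
    public Double merge(Double a, Double b) {
        return a + b;
    }
}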

Kafka Streams

Aggregation

// Kafka Streams aggregation
KStream<String, Order> orders = builder.stream("orders");

// Running count
KTable<String, Long> orderCounts = orders
    .groupBy((key, order) -> order.getUserId())
    .count(Materialized.as("order-counts-store"));

// Session window aggregation: one final count per 10-minute session
KTable<Windowed<String>, Long> sessionCounts = orders
    .groupBy((key, order) -> order.getUserId())
    .windowedBy(SessionWindows.with(Duration.ofMinutes(10)))
    .count()
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));

// Tumbling window: hourly order counts per category
KTable<Windowed<String>, Long> windowedCounts = orders
    .groupBy((key, order) -> order.getCategory())
    .windowedBy(TimeWindows.of(Duration.ofHours(1)))
    .count();
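
The materialized store above can be queried from outside the topology once the application is running; a sketch, assuming a started KafkaStreams instance named streams:

// Point lookup against the materialized count store
ReadOnlyKeyValueStore<String, Long> store = streams.store(
    StoreQueryParameters.fromNameAndType(
        "order-counts-store", QueryableStoreTypes.keyValueStore()));

Long ordersForUser = store.get("user-42"); // null if the user has no orders yet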

Stream Joining

// KStream-KStream join
KStream<String, Order> orders = builder.stream("orders");
KStream<String, Delivery> deliveries = builder.stream("deliveries");

KStream<String, OrderWithDelivery> enriched = orders
    .join(deliveries,
        (order, delivery) -> new OrderWithDelivery(order, delivery),
        JoinWindows.of(Duration.ofHours(1)),
        StreamJoined.with(Serdes.String(), orderSerde, deliverySerde));
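
When one side is a changelog rather than an event stream, a KStream-KTable join avoids windowing entirely: each order is enriched with the latest user record. A sketch where the users topic, EnrichedOrder, and the value serdes are illustrative:

// KStream-KTable join: no window, always the current user profile
KTable<String, User> users = builder.table("users");

KStream<String, EnrichedOrder> enrichedOrders = orders
    .selectKey((key, order) -> order.getUserId()) // re-key to match the table's key
    .join(users,
        (order, user) -> new EnrichedOrder(order, user),
        Joined.with(Serdes.String(), orderSerde, userSerde));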

ClickHouse

Real-time Tables

-- Create MergeTree table for real-time analytics
CREATE TABLE events (
    event_id UUID,
    event_type String,
    user_id String,
    properties JSON,
    amount Float64,        -- monetary value, used by the revenue rollups below
    timestamp DateTime,
    timestamp_ms UInt64
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (event_type, user_id, timestamp)
TTL timestamp + INTERVAL 30 DAY
SETTINGS index_granularity = 8192;
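
Ingestion into this table works best in large batches, since ClickHouse turns each insert into a new data part. A sketch using plain JDBC with the ClickHouse driver; the URL and values are illustrative, and the JSON column is assumed to accept a serialized string:

// Batched insert through JDBC (com.clickhouse:clickhouse-jdbc assumed on the classpath)
try (Connection conn = DriverManager.getConnection("jdbc:clickhouse://clickhouse:8123/default");
     PreparedStatement stmt = conn.prepareStatement(
         "INSERT INTO events (event_id, event_type, user_id, properties, amount, timestamp, timestamp_ms) "
             + "VALUES (?, ?, ?, ?, ?, ?, ?)")) {
    for (int i = 0; i < 10_000; i++) { // one large batch, one new part
        Instant now = Instant.now();
        stmt.setString(1, UUID.randomUUID().toString());
        stmt.setString(2, "click");
        stmt.setString(3, "user-" + (i % 100));
        stmt.setString(4, "{\"page\":\"/home\"}"); // serialized into the JSON column
        stmt.setDouble(5, 0.0);
        stmt.setTimestamp(6, Timestamp.from(now));
        stmt.setLong(7, now.toEpochMilli());
        stmt.addBatch();
    }
    stmt.executeBatch(); // single round trip
}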

-- SummingMergeTree for aggregations
CREATE TABLE hourly_metrics (
    event_type String,
    hour DateTime,
    user_id String,
    event_count UInt64,
    revenue Float64
)
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(hour)
ORDER BY (event_type, hour, user_id);

-- Insert aggregated data
INSERT INTO hourly_metrics
SELECT 
    event_type,
    toStartOfHour(timestamp) AS hour,
    user_id,
    count() AS event_count,
    sum(amount) AS revenue
FROM events
WHERE timestamp >= now() - INTERVAL 1 HOUR
GROUP BY event_type, hour, user_id;
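
Instead of re-running that INSERT on a schedule, a materialized view keeps hourly_metrics current on every insert into events; a minimal sketch using the schemas above (view name is illustrative):

-- Incremental rollup: runs automatically for each block inserted into events
CREATE MATERIALIZED VIEW hourly_metrics_mv TO hourly_metrics AS
SELECT
    event_type,
    toStartOfHour(timestamp) AS hour,
    user_id,
    count() AS event_count,
    sum(amount) AS revenue
FROM events
GROUP BY event_type, hour, user_id;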

Real-time Queries

-- Recent events with limit
SELECT event_type, user_id, timestamp
FROM events
ORDER BY timestamp DESC
LIMIT 1000;

-- Time-series aggregation
SELECT 
    toStartOfMinute(timestamp) AS minute,
    event_type,
    count() AS events,
    uniqExact(user_id) AS unique_users
FROM events
WHERE timestamp >= now() - INTERVAL 1 HOUR
GROUP BY minute, event_type
ORDER BY minute;

-- Retention query: share of each signup cohort seen again 1 and 7 days later
SELECT
    cohort_date,
    uniqExact(user_id) AS cohort_size,
    uniqExactIf(user_id, event_date = cohort_date + 1) AS day_1_retention,
    uniqExactIf(user_id, event_date = cohort_date + 7) AS day_7_retention
FROM
(
    SELECT
        e.user_id AS user_id,
        toDate(e.timestamp) AS event_date,
        c.cohort_date AS cohort_date
    FROM events AS e
    INNER JOIN
    (
        SELECT user_id, toDate(min(timestamp)) AS cohort_date
        FROM events
        WHERE event_type = 'signup'
        GROUP BY user_id
    ) AS c USING (user_id)
)
GROUP BY cohort_date
ORDER BY cohort_date;

Streaming Patterns

Exactly-Once Processing

// Flink exactly-once with Checkpointing
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE); // checkpoint every minute
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
env.getCheckpointConfig().setCheckpointTimeout(120000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

env.getCheckpointConfig().setExternalizedCheckpointCleanup(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);

CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setTolerableCheckpointFailureNumber(3);

// Kafka source restricted to committed records (pairs with transactional upstream producers)
KafkaSource<Event> source = KafkaSource.<Event>builder()
    .setBootstrapServers("kafka:9092")
    .setGroupId("flink-consumer")
    .setTopics("events")
    .setStartingOffsets(OffsetsInitializer.committedOffsets(
        OffsetResetStrategy.LATEST))
    .setDeserializer(KafkaRecordDeserializationSchema.of(new EventDeserializer()))
    .setProperty("isolation.level", "read_committed")
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
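
Checkpointing makes operator state exactly-once; end-to-end exactly-once also needs a transactional sink. With Flink's KafkaSink that is one delivery-guarantee setting; the topic name and transactional ID prefix here are illustrative:

// Transactional sink: records become visible only when a checkpoint commits
KafkaSink<String> sink = KafkaSink.<String>builder()
    .setBootstrapServers("kafka:9092")
    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
        .setTopic("enriched-events")
        .setValueSerializationSchema(new SimpleStringSchema())
        .build())
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .setTransactionalIdPrefix("realtime-analytics")
    .build();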

Backpressure Handling

// Flink backpressure handling: unaligned checkpoints keep checkpointing
// fast even when network buffers are full under backpressure
env.getCheckpointConfig().enableUnalignedCheckpoints(true);

// Flush network buffers at least every 100 ms (latency vs. throughput trade-off)
env.setBufferTimeout(100);

// ProcessFunction that batches elements so downstream operators see
// fewer, larger records (apply on a keyed stream: timers require keying)
public class BufferedProcessFunction extends ProcessFunction<Event, Result> {
    private static final int MAX_BUFFER_SIZE = 100;
    private static final long FLUSH_INTERVAL_MS = 1000;

    private final List<Event> buffer = new ArrayList<>();

    @Override
    public void processElement(Event event, Context ctx, Collector<Result> out) {
        buffer.add(event);

        if (buffer.size() >= MAX_BUFFER_SIZE) {
            flushBuffer(out);
        } else if (buffer.size() == 1) {
            // First element of a new batch: schedule a flush for stragglers
            ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + FLUSH_INTERVAL_MS);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Result> out) {
        flushBuffer(out);
    }

    private void flushBuffer(Collector<Result> out) {
        if (!buffer.isEmpty()) {
            out.collect(new Result(new ArrayList<>(buffer))); // Result wraps a batch (assumed)
            buffer.clear();
        }
    }
}
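
Timers are only available on keyed streams, so the function is applied after a keyBy; the key choice is illustrative:

DataStream<Result> results = events
    .keyBy(Event::getUserId)
    .process(new BufferedProcessFunction());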
