⚡ Calmops

Real-time Analytics: Streaming Aggregations, OLAP

Introduction

Real-time analytics enables instant decision-making. From fraud detection to operational dashboards, the ability to process data in milliseconds transforms business outcomes.

Key Statistics:

  • Analyst estimates put the real-time analytics market at roughly $50B by 2027
  • Industry surveys report around 73% of enterprises investing in real-time capabilities
  • Latency drops from minutes or hours (batch) to sub-second (streaming), often a 1000x improvement
  • Large ClickHouse deployments report scan rates of billions of rows per second

Streaming Architecture

┌─────────────────────────────────────────────────────────────────┐
│                  Real-time Analytics Pipeline                   │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────┐    ┌─────────┐    ┌─────────┐    ┌─────────┐       │
│  │ Sources │───▶│ Kafka   │───▶│ Process │───▶│  Sink   │       │
│  │ (Logs,  │    │ Topics  │    │ (Flink, │    │ (OLAP,  │       │
│  │  IoT)   │    │         │    │ Streams)│    │  Redis) │       │
│  └─────────┘    └─────────┘    └─────────┘    └─────────┘       │
│                      │                                          │
│                      ▼                                          │
│             ┌─────────────────┐                                 │
│             │  Stream Joins   │                                 │
│             │  Windowed Aggs  │                                 │
│             └─────────────────┘                                 │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Streaming Joins

// Flink Streaming Join
DataStream<Order> orders = env.addSource(new OrderSource());
DataStream<User> users = env.addSource(new UserSource());

// Tumbling window join
DataStream<OrderWithUser> joined = orders
    .join(users)
    .where(Order::getUserId)
    .equalTo(User::getId)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .with((order, user) -> new OrderWithUser(order, user));

// Interval join (join within time bounds)
DataStream<ClickEvent> clicks = env.addSource(new ClickSource());
DataStream<PurchaseEvent> purchases = env.addSource(new PurchaseSource());

DataStream<Conversion> conversions = clicks
    .keyBy(ClickEvent::getUserId)
    .intervalJoin(purchases.keyBy(PurchaseEvent::getUserId))
    // a purchase up to 30 minutes *after* the click counts as a conversion
    .between(Time.minutes(0), Time.minutes(30))
    .process(new ConversionProcessFunction());
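As a plain-Java illustration of the interval-join matching rule (not the Flink API): a left-stream element at time t matches right-stream elements whose timestamps fall in [t + lower, t + upper].

```java
// Sketch of interval-join semantics: does rightTs fall inside the
// interval anchored at leftTs? (Illustration only, not Flink internals.)
import java.time.Duration;

public class IntervalJoinSketch {
    public static boolean matches(long leftTs, long rightTs, Duration lower, Duration upper) {
        return rightTs >= leftTs + lower.toMillis()
            && rightTs <= leftTs + upper.toMillis();
    }

    public static void main(String[] args) {
        // A click at t=0 with bounds [0, +30 min] matches a purchase 10 minutes later...
        System.out.println(matches(0L, Duration.ofMinutes(10).toMillis(),
                Duration.ZERO, Duration.ofMinutes(30))); // true
        // ...but not one 31 minutes later.
        System.out.println(matches(0L, Duration.ofMinutes(31).toMillis(),
                Duration.ZERO, Duration.ofMinutes(30))); // false
    }
}
```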

Windowed Aggregations

// Sliding window with late data handling
DataStream<Order> revenue = orders  // sum() preserves the input type
    .assignTimestampsAndWatermarks(
        WatermarkStrategy
            .<Order>forBoundedOutOfOrderness(Duration.ofMinutes(5))
            .withTimestampAssigner((order, timestamp) -> order.getTimestamp())
    )
    .keyBy(Order::getProductId)
    .window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1)))
    .allowedLateness(Time.minutes(5))
    .sum("amount")
    .name("Revenue Aggregation");
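With a 10-minute window sliding every minute, each event lands in ten overlapping windows. A plain-Java sketch of the assignment rule (an illustration of the semantics, not Flink internals; offset is assumed zero):

```java
// Which sliding windows contain an event at timestamp `ts`?
// A window of `sizeMs` sliding every `slideMs` produces size/slide
// overlapping windows per event.
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowSketch {
    public static List<Long> windowStarts(long ts, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - (ts % slideMs); // most recent window start
        for (long start = lastStart; start > ts - sizeMs; start -= slideMs) {
            starts.add(start); // every earlier start whose span still covers ts
        }
        return starts;
    }

    public static void main(String[] args) {
        // 10-minute window, 1-minute slide: an event joins 10 windows.
        System.out.println(windowStarts(605_000L, 600_000L, 60_000L).size()); // 10
    }
}
```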

Kafka Streams

Aggregation

// Kafka Streams aggregation
KStream<String, Order> orders = builder.stream("orders");

// Running count
KTable<String, Long> orderCounts = orders
    .groupBy((key, order) -> order.getUserId())
    .count(Materialized.as("order-counts-store"));

// Session window aggregation (windowedBy yields a windowed key)
KTable<Windowed<String>, Long> sessionCounts = orders
    .groupBy((key, order) -> order.getUserId())
    .windowedBy(SessionWindows.with(Duration.ofMinutes(10)))
    .count()
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));

// Tumbling window (TimeWindows in Kafka Streams; groupBy takes the new key)
KTable<Windowed<String>, Long> windowedCounts = orders
    .groupBy((key, order) -> order.getCategory())
    .windowedBy(TimeWindows.of(Duration.ofHours(1)))
    .count();
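Session windows have no fixed size: a session keeps growing while events arrive within the inactivity gap, and a gap larger than that starts a new session. A plain-Java illustration of how sorted timestamps split into sessions (Kafka Streams performs the equivalent window merging internally):

```java
// Count sessions in a sorted list of event timestamps: a new session
// begins whenever the gap between neighbours exceeds the inactivity gap.
import java.util.List;

public class SessionWindowSketch {
    public static int countSessions(List<Long> sortedTs, long gapMs) {
        if (sortedTs.isEmpty()) return 0;
        int sessions = 1;
        for (int i = 1; i < sortedTs.size(); i++) {
            if (sortedTs.get(i) - sortedTs.get(i - 1) > gapMs) {
                sessions++; // inactivity gap exceeded: a new session starts
            }
        }
        return sessions;
    }

    public static void main(String[] args) {
        // Events at 0s, 5 min, 20 min with a 10-minute gap form two sessions.
        System.out.println(countSessions(
                List.of(0L, 300_000L, 1_200_000L), 600_000L)); // 2
    }
}
```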

Stream Joining

// KStream-KStream join
KStream<String, Order> orders = builder.stream("orders");
KStream<String, Delivery> deliveries = builder.stream("deliveries");

KStream<String, OrderWithDelivery> enriched = orders
    .join(deliveries,
        (order, delivery) -> new OrderWithDelivery(order, delivery),
        JoinWindows.of(Duration.ofHours(1)),
        StreamJoined.with(Serdes.String(), orderSerde, deliverySerde));
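Unlike the interval join, JoinWindows is symmetric: an order and a delivery pair up whenever their timestamps are within the window of each other, regardless of which record arrives first. A plain-Java sketch of that predicate:

```java
// Symmetric join-window check: two records join when their timestamps
// differ by no more than the window size, in either direction.
import java.time.Duration;

public class JoinWindowSketch {
    public static boolean withinWindow(long leftTs, long rightTs, Duration window) {
        return Math.abs(leftTs - rightTs) <= window.toMillis();
    }

    public static void main(String[] args) {
        // A delivery 45 minutes after the order joins within a 1-hour window...
        System.out.println(withinWindow(0L, Duration.ofMinutes(45).toMillis(),
                Duration.ofHours(1))); // true
        // ...a delivery 2 hours later does not.
        System.out.println(withinWindow(0L, Duration.ofHours(2).toMillis(),
                Duration.ofHours(1))); // false
    }
}
```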

ClickHouse

Real-time Tables

-- Create MergeTree table for real-time analytics
CREATE TABLE events (
    event_id UUID,
    event_type String,
    user_id String,
    properties JSON,
    amount Float64,
    timestamp DateTime,
    timestamp_ms UInt64
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (event_type, user_id, timestamp)
TTL timestamp + INTERVAL 30 DAY
SETTINGS index_granularity = 8192;

-- SummingMergeTree for aggregations
CREATE TABLE hourly_metrics (
    event_type String,
    hour DateTime,
    user_id String,
    event_count UInt64,
    revenue Float64
)
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(hour)
ORDER BY (event_type, hour, user_id);
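SummingMergeTree does not sum at insert time: background merges collapse rows that share the full ORDER BY key and sum the remaining numeric columns, so queries should still aggregate with sum(), since merges run asynchronously. A plain-Java simulation of the collapse:

```java
// Simulate a SummingMergeTree background merge: rows with the same
// ORDER BY key (here a composite string) collapse into one row whose
// metric column is the sum of the inputs.
import java.util.LinkedHashMap;
import java.util.Map;

public class SummingMergeSketch {
    public static Map<String, Long> merge(String[] keys, long[] counts) {
        Map<String, Long> merged = new LinkedHashMap<>();
        for (int i = 0; i < keys.length; i++) {
            merged.merge(keys[i], counts[i], Long::sum); // sum on key collision
        }
        return merged;
    }

    public static void main(String[] args) {
        // Two inserts for the same (event_type, hour, user_id) key become one row.
        Map<String, Long> m = merge(
                new String[]{"click|2024-01-01 10:00|u1", "click|2024-01-01 10:00|u1"},
                new long[]{3, 4});
        System.out.println(m.get("click|2024-01-01 10:00|u1")); // 7
    }
}
```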

-- Insert aggregated data
INSERT INTO hourly_metrics
SELECT 
    event_type,
    toStartOfHour(timestamp) AS hour,
    user_id,
    count() AS event_count,
    sum(amount) AS revenue
FROM events
WHERE timestamp >= now() - INTERVAL 1 HOUR
GROUP BY event_type, hour, user_id;

Real-time Queries

-- Recent events with limit
SELECT event_type, user_id, timestamp
FROM events
ORDER BY timestamp DESC
LIMIT 1000;

-- Time-series aggregation
SELECT 
    toStartOfMinute(timestamp) AS minute,
    event_type,
    count() AS events,
    uniqExact(user_id) AS unique_users
FROM events
WHERE timestamp >= now() - INTERVAL 1 HOUR
GROUP BY minute, event_type
ORDER BY minute;

-- Retention query (cohorts keyed by each user's signup date)
WITH cohorts AS (
    SELECT user_id, min(toDate(timestamp)) AS cohort_date
    FROM events
    WHERE event_type = 'signup'
    GROUP BY user_id
)
SELECT 
    c.cohort_date,
    uniqExact(c.user_id) AS cohort_size,
    uniqExactIf(e.user_id, toDate(e.timestamp) = c.cohort_date + 1) AS day_1_retention,
    uniqExactIf(e.user_id, toDate(e.timestamp) = c.cohort_date + 7) AS day_7_retention
FROM cohorts AS c
INNER JOIN events AS e ON e.user_id = c.user_id
GROUP BY c.cohort_date;
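The cohort logic reduces to: find each user's first-seen date, then check whether that user was active again exactly N days later. A plain-Java sketch of the day-1 computation (days represented as plain day numbers for brevity):

```java
// Day-1 retention: of the users whose first-seen day is `cohortDay`,
// what fraction were active again on the following day?
import java.util.Map;
import java.util.Set;

public class RetentionSketch {
    public static double day1Retention(Map<String, Long> firstSeen,
                                       Map<String, Set<Long>> activeDays,
                                       long cohortDay) {
        long cohortSize = 0, retained = 0;
        for (Map.Entry<String, Long> e : firstSeen.entrySet()) {
            if (e.getValue() != cohortDay) continue; // user not in this cohort
            cohortSize++;
            Set<Long> days = activeDays.getOrDefault(e.getKey(), Set.of());
            if (days.contains(cohortDay + 1)) retained++;
        }
        return cohortSize == 0 ? 0.0 : (double) retained / cohortSize;
    }

    public static void main(String[] args) {
        Map<String, Long> firstSeen = Map.of("u1", 0L, "u2", 0L);
        Map<String, Set<Long>> active = Map.of(
                "u1", Set.of(0L, 1L),  // returned on day 1
                "u2", Set.of(0L, 5L)); // did not
        System.out.println(day1Retention(firstSeen, active, 0L)); // 0.5
    }
}
```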

Streaming Patterns

Exactly-Once Processing

// Flink exactly-once with Checkpointing
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000); // Checkpoint every minute
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
env.getCheckpointConfig().setCheckpointTimeout(120000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

env.getCheckpointConfig().setExternalizedCheckpointCleanup(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);

CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setTolerableCheckpointFailureNumber(3);

// Read only committed Kafka records (the source side of exactly-once)
KafkaSource<Event> source = KafkaSource.<Event>builder()
    .setBootstrapServers("kafka:9092")
    .setGroupId("flink-consumer")
    .setTopics("events")
    .setStartingOffsets(OffsetsInitializer.committedOffsets(
        OffsetResetStrategy.LATEST))
    .setDeserializer(KafkaRecordDeserializationSchema.of(new EventDeserializer()))
    .setProperty("isolation.level", "read_committed")
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
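read_committed on the source covers the input side; end-to-end exactly-once also requires either a transactional sink (e.g. Flink's Kafka sink with an exactly-once delivery guarantee) or an idempotent one, so that replays after a checkpoint restore do not double-count. A minimal sketch of the idempotent-write fallback, with a hypothetical event id as the key and an in-memory map standing in for the store:

```java
// Idempotent sink sketch: keyed upserts make replayed events harmless,
// because a re-delivered event overwrites instead of accumulating.
import java.util.HashMap;
import java.util.Map;

public class IdempotentSink {
    private final Map<String, Double> store = new HashMap<>();

    public void upsert(String eventId, double amount) {
        store.put(eventId, amount); // overwrite on replay, never double-count
    }

    public double total() {
        return store.values().stream().mapToDouble(Double::doubleValue).sum();
    }

    public static void main(String[] args) {
        IdempotentSink sink = new IdempotentSink();
        sink.upsert("evt-1", 10.0);
        sink.upsert("evt-1", 10.0); // replay after a checkpoint restore
        sink.upsert("evt-2", 5.0);
        System.out.println(sink.total()); // 15.0
    }
}
```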

Backpressure Handling

// Flink backpressure handling: unaligned checkpoints let barriers overtake
// buffered records, so checkpoints keep progressing under backpressure
env.getCheckpointConfig().enableUnalignedCheckpoints(true);

// Lower buffer timeout trades throughput for latency
env.setBufferTimeout(100);

// ProcessFunction with buffering: flushes every 100 events or on a timer
// (production code would keep the buffer in ListState so it survives failures;
// timers require a keyed stream)
public class BufferedProcessFunction extends ProcessFunction<Event, Result> {
    private final List<Event> buffer = new ArrayList<>();
    
    @Override
    public void processElement(Event event, Context ctx, Collector<Result> out) {
        buffer.add(event);
        
        if (buffer.size() >= 100) {
            flushBuffer(out);
        } else if (buffer.size() == 1) {
            // schedule a flush so a slow stream cannot hold the buffer forever
            ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + 1000);
        }
    }
    
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Result> out) {
        flushBuffer(out);
    }
    
    private void flushBuffer(Collector<Result> out) {
        buffer.forEach(event -> out.collect(new Result(event)));
        buffer.clear();
    }
}
