Skip to main content
โšก Calmops

Building Real-Time APIs: WebSockets, SSE, and gRPC Streaming

Introduction

Modern applications require real-time communication. Whether it’s live notifications, collaborative editing, or streaming data, you need the right technology for the job.

This guide covers real-time API technologies: WebSockets, Server-Sent Events (SSE), and gRPC streaming - when to use each, how to implement them, and best practices.


Real-Time Communication Patterns

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚              REAL-TIME PATTERNS                                            โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚                                                                      โ”‚
โ”‚   Request-Response              Server Push                          โ”‚
โ”‚   โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€          โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€                          โ”‚
โ”‚                                                                      โ”‚
โ”‚   Client โ”€โ”€โ”€โ”€Requestโ”€โ”€โ”€โ”€โ–ถโ”‚  Clientโ—€โ”€โ”€โ”€โ”€โ”€Pushโ”€โ”€โ”€โ”€โ”‚ Server            โ”‚
โ”‚         โ—€โ”€โ”€โ”€Responseโ”€โ”€โ”€โ”€โ”€โ”€โ”‚        โ—€โ”€โ”€โ”€โ”€โ”€Dataโ”€โ”€โ”€โ”€โ”€โ”€โ”‚                  โ”‚
โ”‚                                                                      โ”‚
โ”‚   Use Cases:                  Use Cases:                              โ”‚
โ”‚   โ€ข API calls                โ€ข Chat/messaging                        โ”‚
โ”‚   โ€ข Data retrieval           โ€ข Live notifications                    โ”‚
โ”‚   โ€ข One-time actions         โ€ข Streaming data                       โ”‚
โ”‚   โ€ข CRUD operations          โ€ข Collaborative editing                 โ”‚
โ”‚                                                                      โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

WebSockets

When to Use

  • Bidirectional communication
  • Low-latency messaging
  • Chat applications
  • Gaming
  • Real-time collaboration

Implementation

# WebSocket Server with FastAPI
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List
import json

app = FastAPI()

class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []
    
    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)
    
    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)
    
    async def send_personal_message(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)
    
    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)

manager = ConnectionManager()

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            message = json.loads(data)
            
            # Handle different message types
            if message["type"] == "chat":
                await manager.broadcast(json.dumps({
                    "type": "chat",
                    "client_id": client_id,
                    "message": message["content"]
                }))
            elif message["type"] == "ping":
                await manager.send_personal_message("pong", websocket)
                
    except WebSocketDisconnect:
        manager.disconnect(websocket)
        await manager.broadcast(json.dumps({
            "type": "leave",
            "client_id": client_id
        }))

WebSocket with Authentication

# Authenticated WebSocket
@app.websocket("/ws/authenticated")
async def authenticated_websocket(websocket: WebSocket):
    # Authenticate via query param or initial message
    await websocket.accept()
    
    try:
        # Receive auth token
        auth_message = await websocket.receive_json()
        token = auth_message.get("token")
        
        # Validate token
        user = await validate_token(token)
        if not user:
            await websocket.send_json({"error": "Unauthorized"})
            await websocket.close()
            return
        
        # Authenticated - now handle messages
        while True:
            data = await websocket.receive_text()
            await handle_authenticated_message(user, data)
            
    except Exception:
        await websocket.close()

Server-Sent Events (SSE)

When to Use

  • Server-to-client streaming
  • Live feeds
  • Stock tickers
  • Notifications
  • Progress updates

Implementation

# SSE Endpoint with FastAPI
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import json

app = FastAPI()

async def event_stream():
    """Generate SSE events"""
    import time
    
    counter = 0
    while True:
        # Create SSE message
        data = json.dumps({
            "counter": counter,
            "timestamp": time.time()
        })
        
        yield f"data: {data}\n\n"
        
        counter += 1
        await asyncio.sleep(1)

@app.get("/stream")
async def sse_endpoint():
    """SSE endpoint"""
    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        }
    )

# Client-side JavaScript
"""
const eventSource = new EventSource('/stream');

eventSource.onmessage = (event) => {
    const data = JSON.parse(event.data);
    console.log('Received:', data);
};

eventSource.onerror = (error) => {
    console.error('SSE error:', error);
};
"""

SSE with Authentication

# Authenticated SSE
async def authenticated_event_stream(token: str):
    """Stream with authentication"""
    user = await validate_token(token)
    if not user:
        yield "data: {\"error\": \"Unauthorized\"}\n\n"
        return
    
    # Stream user-specific events
    async for event in get_user_events(user.id):
        yield f"data: {json.dumps(event)}\n\n"

@app.get("/stream/user")
async def user_sse_endpoint(token: str = None):
    if not token:
        return {"error": "Token required"}
    
    return StreamingResponse(
        authenticated_event_stream(token),
        media_type="text/event-stream"
    )

gRPC Streaming

When to Use

  • High-performance streaming
  • Inter-service communication
  • Bidirectional streaming
  • Language-agnostic APIs

Protocol Buffer Definition

// streaming.proto
syntax = "proto3";

package streaming;

service DataStreamService {
    // Server streaming - client requests, server streams
    rpc FetchData(DataRequest) returns (stream DataResponse);
    
    // Client streaming - client streams, server responds
    rpc UploadData(stream UploadRequest) returns (UploadResponse);
    
    // Bidirectional streaming - both stream
    rpc ProcessStream(stream ProcessRequest) returns (stream ProcessResponse);
}

message DataRequest {
    string query = 1;
    int32 limit = 2;
}

message DataResponse {
    string id = 1;
    string content = 2;
    int64 timestamp = 3;
}

message UploadRequest {
    bytes data = 1;
    string filename = 2;
}

message UploadResponse {
    bool success = 1;
    int32 records_processed = 2;
}

message ProcessRequest {
    string input = 1;
}

message ProcessResponse {
    string output = 1;
    bool complete = 2;
}

Python gRPC Server

# streaming_server.py
import grpc
from concurrent import futures
import streaming_pb2
import streaming_pb2_grpc

class DataStreamServicer(streaming_pb2_grpc.DataStreamServiceServicer):
    
    def FetchData(self, request, context):
        """Server streaming"""
        # Stream multiple responses
        for i in range(request.limit):
            yield streaming_pb2.DataResponse(
                id=f"item_{i}",
                content=f"Data item {i}",
                timestamp=time.time()
            )
    
    def UploadData(self, request_iterator, context):
        """Client streaming"""
        total_data = b""
        filename = ""
        
        for chunk in request_iterator:
            total_data += chunk.data
            if chunk.filename:
                filename = chunk.filename
        
        # Process and return
        records = process_data(total_data)
        
        return streaming_pb2.UploadResponse(
            success=True,
            records_processed=records
        )
    
    def ProcessStream(self, request_iterator, context):
        """Bidirectional streaming"""
        for request in request_iterator:
            result = process(request.input)
            
            yield streaming_pb2.ProcessResponse(
                output=result,
                complete=False
            )
        
        # Signal completion
        yield streaming_pb2.ProcessResponse(
            output="",
            complete=True
        )

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    streaming_pb2_grpc.add_DataStreamServiceServicer_to_server(
        DataStreamServicer(), server
    )
    server.add_insecure_port('[::]:50051')
    server.start()
    server.wait_for_termination()

Comparison

Feature WebSockets SSE gRPC Streaming
Direction Bidirectional Serverโ†’Client Bidirectional
Protocol WS/WSS HTTP/2 HTTP/2
Browser Support Excellent Excellent Limited
Complexity Medium Low High
Performance High Medium Very High
Auto-reconnect Manual Built-in Manual
Binary Data Yes No Yes
Use Case Chat, Games Feeds, Updates Microservices

Choosing the Right Technology

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚              DECISION GUIDE                                                โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚                                                                      โ”‚
โ”‚   Need bidirectional?                                                โ”‚
โ”‚   โ”‚                                                                  โ”‚
โ”‚   โ”œโ”€โ”€ No โ”€โ”€โ–ถ Need browser?                                          โ”‚
โ”‚   โ”‚           โ”‚                                                      โ”‚
โ”‚   โ”‚           โ”œโ”€โ”€ Yes โ”€โ”€โ–ถ SSE                                       โ”‚
โ”‚   โ”‚           โ”‚                                                      โ”‚
โ”‚   โ”‚           โ””โ”€โ”€ No โ”€โ”€โ–ถ gRPC streaming                             โ”‚
โ”‚   โ”‚                                                                  โ”‚
โ”‚   โ””โ”€โ”€ Yes โ”€โ”€โ–ถ Need binary data?                                     โ”‚
โ”‚               โ”‚                                                      โ”‚
โ”‚               โ”œโ”€โ”€ Yes โ”€โ”€โ–ถ WebSockets or gRPC                        โ”‚
โ”‚               โ”‚                                                      โ”‚
โ”‚               โ””โ”€โ”€ No โ”€โ”€โ–ถ Need high performance?                      โ”‚
โ”‚                           โ”‚                                           โ”‚
โ”‚                           โ”œโ”€โ”€ Yes โ”€โ”€โ–ถ gRPC streaming                  โ”‚
โ”‚                           โ”‚                                           โ”‚
โ”‚                           โ””โ”€โ”€ No โ”€โ”€โ–ถ WebSockets                       โ”‚
โ”‚                                                                      โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

Best Practices

WebSockets

# Good: Heartbeat for connection health
async def websocket_with_heartbeat(websocket: WebSocket):
    await websocket.accept()
    
    async def send_heartbeat():
        while True:
            await websocket.send_text(json.dumps({"type": "heartbeat"}))
            await asyncio.sleep(30)
    
    heartbeat_task = asyncio.create_task(send_heartbeat())
    
    try:
        while True:
            data = await websocket.receive_text()
            await handle_message(data)
    except:
        heartbeat_task.cancel()

SSE

# Good: Proper error handling and reconnection
"""
Client-side:
"""
const eventSource = new EventSource('/stream');

// Exponential backoff for reconnection
eventSource.onerror = (error) => {
    eventSource.close();
    
    // Wait and retry
    setTimeout(() => {
        eventSource = new EventSource('/stream');
    }, Math.min(1000 * Math.pow(2, retryCount), 30000));
};
"""

# Server-side: Graceful shutdown
async def event_stream():
    try:
        while True:
            yield f"data: {json.dumps(get_data())}\n\n"
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        yield f"data: {json.dumps({'status': 'closing'})}\n\n"
        raise

Conclusion

Choose the right real-time technology:

  • WebSockets: Bidirectional, browser-based apps
  • SSE: Simple server-to-client streaming
  • gRPC Streaming: High-performance microservices

Comments