Introduction
Many modern applications need real-time communication. Whether it’s live notifications, collaborative editing, or streaming data, you need the right technology for the job.
This guide covers three real-time API technologies: WebSockets, Server-Sent Events (SSE), and gRPC streaming. For each, it explains when to use it, how to implement it, and which best practices apply.
Real-Time Communication Patterns
REAL-TIME PATTERNS

Request-Response                     Server Push
────────────────                     ───────────
Client ───Request───▶ Server         Client ◀────Push──── Server
       ◀──Response───                       ◀────Data────

Use Cases:                           Use Cases:
• API calls                          • Chat/messaging
• Data retrieval                     • Live notifications
• One-time actions                   • Streaming data
• CRUD operations                    • Collaborative editing
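The contrast between the two patterns can be sketched with plain `asyncio`, without any network code. This is only an illustration under assumptions: `handle_request`, `server_push`, and `client` are hypothetical names, and an in-memory `asyncio.Queue` stands in for the open connection.

```python
import asyncio


async def handle_request(query: str) -> str:
    """Request-response: the client asks once and gets exactly one answer."""
    return f"result for {query}"


async def server_push(channel: asyncio.Queue, events: list) -> None:
    """Server push: the server decides when to send; the client only listens."""
    for event in events:
        await channel.put(event)
    await channel.put(None)  # sentinel: no more events


async def client(channel: asyncio.Queue) -> list:
    """Collect pushed events until the sentinel arrives."""
    received = []
    while (event := await channel.get()) is not None:
        received.append(event)
    return received


async def demo():
    # One request, one response
    response = await handle_request("users")
    # One subscription, many pushed events
    channel = asyncio.Queue()
    _, pushed = await asyncio.gather(
        server_push(channel, ["joined", "typing", "message"]),
        client(channel),
    )
    return response, pushed
```

In the push case the client never asks for the individual events; it holds the channel open and reacts to whatever arrives, which is the behaviour WebSockets, SSE, and gRPC streaming each provide over a real connection.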
WebSockets
When to Use
- Bidirectional communication
- Low-latency messaging
- Chat applications
- Gaming
- Real-time collaboration
Implementation
# WebSocket Server with FastAPI
import json
from typing import List

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()


class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        # Guard against removing the same connection twice
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        # Iterate over a copy: the list can change while we await a send
        for connection in list(self.active_connections):
            await connection.send_text(message)


manager = ConnectionManager()


@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            try:
                message = json.loads(data)
            except json.JSONDecodeError:
                message = None
            if not isinstance(message, dict):
                # Reject malformed input instead of crashing the handler
                await manager.send_personal_message(
                    json.dumps({"type": "error", "message": "Expected a JSON object"}),
                    websocket,
                )
                continue

            # Handle different message types
            if message.get("type") == "chat":
                await manager.broadcast(json.dumps({
                    "type": "chat",
                    "client_id": client_id,
                    "message": message.get("content", "")
                }))
            elif message.get("type") == "ping":
                await manager.send_personal_message("pong", websocket)
    except WebSocketDisconnect:
        manager.disconnect(websocket)
        await manager.broadcast(json.dumps({
            "type": "leave",
            "client_id": client_id
        }))
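A single dead connection can make a sequential `broadcast` raise part-way through the list, so later clients never receive the message. One possible hardening is sketched below: it sends concurrently with `asyncio.gather` and prunes connections whose send failed. `broadcast_safely` is an illustrative name, not part of FastAPI, and `FakeSocket` is a hypothetical stand-in so the sketch runs without a server.

```python
import asyncio


class FakeSocket:
    """Hypothetical stand-in for a WebSocket; only send_text is modelled."""

    def __init__(self, alive: bool = True):
        self.alive = alive
        self.sent = []

    async def send_text(self, message: str) -> None:
        if not self.alive:
            raise ConnectionError("peer is gone")
        self.sent.append(message)


async def broadcast_safely(connections: list, message: str) -> int:
    """Send to every connection and drop the ones that fail.

    Returns the number of connections the message was delivered to.
    """
    targets = list(connections)  # snapshot: the list may change while we await
    results = await asyncio.gather(
        *(conn.send_text(message) for conn in targets),
        return_exceptions=True,  # collect failures instead of raising the first one
    )
    delivered = 0
    for conn, result in zip(targets, results):
        if isinstance(result, Exception):
            if conn in connections:
                connections.remove(conn)
        else:
            delivered += 1
    return delivered
```

The same idea could be folded into `ConnectionManager.broadcast` by passing `self.active_connections` as the list; whether concurrent sends are worth it depends on how many clients are connected.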
WebSocket with Authentication
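# Authenticated WebSocket
# validate_token() and handle_authenticated_message() are application-defined
# helpers that are not shown here.
@app.websocket("/ws/authenticated")
async def authenticated_websocket(websocket: WebSocket):
    # Authenticate via an initial message; the handshake must be accepted
    # before that message can be read
    await websocket.accept()
    try:
        # Receive auth token
        auth_message = await websocket.receive_json()
        token = auth_message.get("token")

        # Validate token
        user = await validate_token(token)
        if not user:
            await websocket.send_json({"error": "Unauthorized"})
            await websocket.close(code=1008)  # 1008 = policy violation
            return

        # Authenticated - now handle messages
        while True:
            data = await websocket.receive_text()
            await handle_authenticated_message(user, data)
    except WebSocketDisconnect:
        # The client is gone and the socket is already closed
        pass
    except Exception:
        await websocket.close(code=1011)  # 1011 = internal error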
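The snippet above calls `validate_token` without defining it. As one possibility, here is a minimal sketch that assumes tokens of the form `<user_id>.<expiry>.<signature>` signed with HMAC-SHA256. `SECRET_KEY`, `User`, and `issue_token` are hypothetical names introduced for illustration; production systems commonly rely on an established format such as JWT through a maintained library instead.

```python
import asyncio
import hashlib
import hmac
import time
from dataclasses import dataclass
from typing import Optional

SECRET_KEY = b"change-me"  # assumption: loaded from configuration in a real service


@dataclass
class User:
    id: str


def _sign(payload: str) -> str:
    return hmac.new(SECRET_KEY, payload.encode(), hashlib.sha256).hexdigest()


def issue_token(user_id: str, ttl_seconds: int = 300, now: Optional[float] = None) -> str:
    """Create a token of the form '<user_id>.<expiry>.<hex signature>'."""
    issued_at = time.time() if now is None else now
    payload = f"{user_id}.{int(issued_at + ttl_seconds)}"
    return f"{payload}.{_sign(payload)}"


async def validate_token(token: Optional[str], now: Optional[float] = None) -> Optional[User]:
    """Return the User for a valid, unexpired token, otherwise None."""
    if not token:
        return None
    try:
        user_id, expiry, signature = token.rsplit(".", 2)
        expires_at = int(expiry)
    except ValueError:
        return None  # wrong number of parts or non-numeric expiry
    expected = _sign(f"{user_id}.{expiry}")
    # Constant-time comparison avoids leaking how many characters matched
    if not hmac.compare_digest(signature.encode(), expected.encode()):
        return None
    current = time.time() if now is None else now
    if expires_at < current:
        return None
    return User(id=user_id)
```

The function is `async` only to match the `await validate_token(token)` call in the endpoint; a real implementation might await a database or cache lookup at that point.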
Server-Sent Events (SSE)
When to Use
- Server-to-client streaming
- Live feeds
- Stock tickers
- Notifications
- Progress updates
Implementation
# SSE Endpoint with FastAPI
import asyncio
import json
import time

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()


async def event_stream():
    """Generate SSE events"""
    counter = 0
    while True:
        # Create SSE message
        data = json.dumps({
            "counter": counter,
            "timestamp": time.time()
        })
        # A blank line terminates each event
        yield f"data: {data}\n\n"
        counter += 1
        await asyncio.sleep(1)


@app.get("/stream")
async def sse_endpoint():
    """SSE endpoint"""
    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        }
    )


# Client-side JavaScript
"""
const eventSource = new EventSource('/stream');

eventSource.onmessage = (event) => {
  const data = JSON.parse(event.data);
  console.log('Received:', data);
};

eventSource.onerror = (error) => {
  console.error('SSE error:', error);
};
"""
SSE with Authentication
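# Authenticated SSE
# validate_token() and get_user_events() are application-defined helpers
# that are not shown here.
from typing import Optional

from fastapi import HTTPException


async def authenticated_event_stream(user):
    """Stream events for an already authenticated user"""
    # Stream user-specific events
    async for event in get_user_events(user.id):
        yield f"data: {json.dumps(event)}\n\n"


@app.get("/stream/user")
async def user_sse_endpoint(token: Optional[str] = None):
    # Reject before streaming starts so the client receives a real 401
    # rather than a 200 response that carries an error event
    if not token:
        raise HTTPException(status_code=401, detail="Token required")
    user = await validate_token(token)
    if not user:
        raise HTTPException(status_code=401, detail="Unauthorized")
    return StreamingResponse(
        authenticated_event_stream(user),
        media_type="text/event-stream"
    )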
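The SSE snippets above build each `data: ...` frame by hand. The `text/event-stream` format also allows optional `event`, `id`, and `retry` fields, and a payload that spans several lines needs one `data:` field per line. A small helper can keep that in one place; `format_sse` and its parameter names are illustrative, not part of FastAPI.

```python
import json
from typing import Any, Optional


def format_sse(data: Any, event: Optional[str] = None,
               event_id: Optional[str] = None,
               retry_ms: Optional[int] = None) -> str:
    """Serialize one event in the text/event-stream wire format."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    if event_id is not None:
        lines.append(f"id: {event_id}")
    if retry_ms is not None:
        lines.append(f"retry: {retry_ms}")
    # Strings are sent as-is; anything else is JSON-encoded
    payload = data if isinstance(data, str) else json.dumps(data)
    # Each line of the payload needs its own "data:" field
    for line in payload.splitlines() or [""]:
        lines.append(f"data: {line}")
    # A blank line terminates the event
    return "\n".join(lines) + "\n\n"
```

A generator could then `yield format_sse(event, event="update", event_id=str(n))`. On the client, named events are received with `eventSource.addEventListener('update', ...)` rather than `onmessage`, and the browser sends the last seen `id` back in the `Last-Event-ID` header when it reconnects.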
gRPC Streaming
When to Use
- High-performance streaming
- Inter-service communication
- Bidirectional streaming
- Language-agnostic APIs
Protocol Buffer Definition
// streaming.proto
syntax = "proto3";

package streaming;

service DataStreamService {
  // Server streaming - client requests, server streams
  rpc FetchData(DataRequest) returns (stream DataResponse);

  // Client streaming - client streams, server responds
  rpc UploadData(stream UploadRequest) returns (UploadResponse);

  // Bidirectional streaming - both stream
  rpc ProcessStream(stream ProcessRequest) returns (stream ProcessResponse);
}

message DataRequest {
  string query = 1;
  int32 limit = 2;
}

message DataResponse {
  string id = 1;
  string content = 2;
  int64 timestamp = 3;
}

message UploadRequest {
  bytes data = 1;
  string filename = 2;
}

message UploadResponse {
  bool success = 1;
  int32 records_processed = 2;
}

message ProcessRequest {
  string input = 1;
}

message ProcessResponse {
  string output = 1;
  bool complete = 2;
}
Python gRPC Server
# streaming_server.py
import time
from concurrent import futures

import grpc

import streaming_pb2
import streaming_pb2_grpc

# process_data() and process() are application-defined helpers
# that are not shown here.


class DataStreamServicer(streaming_pb2_grpc.DataStreamServiceServicer):
    def FetchData(self, request, context):
        """Server streaming"""
        # Stream multiple responses
        for i in range(request.limit):
            yield streaming_pb2.DataResponse(
                id=f"item_{i}",
                content=f"Data item {i}",
                timestamp=int(time.time())  # int64 field, so cast the float
            )

    def UploadData(self, request_iterator, context):
        """Client streaming"""
        total_data = bytearray()  # avoids re-copying bytes on every chunk
        filename = ""
        for chunk in request_iterator:
            total_data.extend(chunk.data)
            if chunk.filename:
                filename = chunk.filename

        # Process and return
        records = process_data(bytes(total_data))
        return streaming_pb2.UploadResponse(
            success=True,
            records_processed=records
        )

    def ProcessStream(self, request_iterator, context):
        """Bidirectional streaming"""
        for request in request_iterator:
            result = process(request.input)
            yield streaming_pb2.ProcessResponse(
                output=result,
                complete=False
            )
        # Signal completion
        yield streaming_pb2.ProcessResponse(
            output="",
            complete=True
        )


def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    streaming_pb2_grpc.add_DataStreamServiceServicer_to_server(
        DataStreamServicer(), server
    )
    server.add_insecure_port('[::]:50051')
    server.start()
    server.wait_for_termination()


if __name__ == "__main__":
    serve()
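For the client-streaming `UploadData` call, the client has to split its payload into a sequence of `UploadRequest` messages. A minimal sketch of that chunking step follows. `iter_upload_chunks` is a hypothetical helper, the 64 KiB default is an arbitrary assumption, and plain dicts are yielded so the example runs without the generated stubs.

```python
from typing import Iterator


def iter_upload_chunks(data: bytes, filename: str,
                       chunk_size: int = 64 * 1024) -> Iterator[dict]:
    """Yield keyword arguments for successive UploadRequest messages."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    for offset in range(0, len(data), chunk_size):
        yield {
            "data": data[offset:offset + chunk_size],
            # Only the first chunk carries the filename; the server keeps
            # the last non-empty value it sees
            "filename": filename if offset == 0 else "",
        }
```

With the generated modules available, each dict could be turned into a message with `streaming_pb2.UploadRequest(**kwargs)` and the resulting iterator passed to the stub's `UploadData` method. Note that an empty payload yields no chunks at all, so the filename would never be sent in that case.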
Comparison
| Feature | WebSockets | SSE | gRPC Streaming |
|---|---|---|---|
| Direction | Bidirectional | Server → Client | Bidirectional |
| Protocol | WS/WSS | HTTP/1.1 or HTTP/2 | HTTP/2 |
| Browser Support | Excellent | Excellent | Limited |
| Complexity | Medium | Low | High |
| Performance | High | Medium | Very High |
| Auto-reconnect | Manual | Built-in | Manual |
| Binary Data | Yes | No | Yes |
| Use Case | Chat, Games | Feeds, Updates | Microservices |
Choosing the Right Technology
DECISION GUIDE

Need bidirectional?
├── No ──▶ Need browser?
│          ├── Yes ──▶ SSE
│          └── No ──▶ gRPC streaming
└── Yes ──▶ Need binary data?
           ├── Yes ──▶ WebSockets or gRPC
           └── No ──▶ Need high performance?
                      ├── Yes ──▶ gRPC streaming
                      └── No ──▶ WebSockets
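The decision guide above can also be written as a small function, which makes the branches easy to check. This is a direct transcription of the guide under one assumption: the function and parameter names are invented for illustration.

```python
def choose_technology(bidirectional: bool, browser: bool = False,
                      binary: bool = False, high_performance: bool = False) -> str:
    """Walk the decision guide and return the suggested technology."""
    if not bidirectional:
        # One-way server-to-client: the only question is whether a browser is involved
        return "SSE" if browser else "gRPC streaming"
    if binary:
        return "WebSockets or gRPC"
    return "gRPC streaming" if high_performance else "WebSockets"
```

For example, `choose_technology(bidirectional=False, browser=True)` returns `"SSE"`, and `choose_technology(bidirectional=True)` returns `"WebSockets"`. The guide is a starting point; factors it leaves out, such as existing infrastructure or proxy support, can change the answer.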
Best Practices
WebSockets
# Good: Heartbeat for connection health
# handle_message() is an application-defined helper that is not shown here.
async def websocket_with_heartbeat(websocket: WebSocket):
    await websocket.accept()

    async def send_heartbeat():
        while True:
            await websocket.send_text(json.dumps({"type": "heartbeat"}))
            await asyncio.sleep(30)

    heartbeat_task = asyncio.create_task(send_heartbeat())
    try:
        while True:
            data = await websocket.receive_text()
            await handle_message(data)
    except WebSocketDisconnect:
        pass  # Client disconnected
    finally:
        # Always stop the heartbeat, however the loop exits
        heartbeat_task.cancel()
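The heartbeat pattern can be exercised without a server by passing the send and receive operations in as coroutines. The sketch below is a framework-free variant under assumptions: `run_with_heartbeat` is a hypothetical name, and `ConnectionError` stands in for the framework's disconnect exception. It cancels the background task in `finally` and waits for the cancellation to complete, so no task is left pending.

```python
import asyncio
import contextlib


async def run_with_heartbeat(send, receive, interval: float = 30.0) -> int:
    """Handle messages from receive() while a background task sends heartbeats.

    Returns the number of messages handled before receive() raised
    ConnectionError.
    """
    async def send_heartbeat():
        while True:
            await send("heartbeat")
            await asyncio.sleep(interval)

    heartbeat_task = asyncio.create_task(send_heartbeat())
    handled = 0
    try:
        while True:
            await receive()
            handled += 1
    except ConnectionError:
        return handled
    finally:
        # Runs on disconnect, on errors, and on cancellation alike
        heartbeat_task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await heartbeat_task
```

A server-sent heartbeat only keeps the connection warm; detecting a silent client additionally requires a timeout on the receive side, which this sketch leaves out.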
SSE
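# Good: Proper error handling and reconnection
# Client-side JavaScript
"""
let eventSource;
let retryCount = 0;

function connect() {
  eventSource = new EventSource('/stream');

  eventSource.onopen = () => {
    retryCount = 0;  // Reset the backoff after a successful connection
  };

  // Exponential backoff for reconnection
  eventSource.onerror = (error) => {
    eventSource.close();
    // Wait and retry, doubling the delay up to 30 seconds
    const delay = Math.min(1000 * Math.pow(2, retryCount), 30000);
    retryCount += 1;
    setTimeout(connect, delay);
  };
}

connect();
"""
# Server-side: Graceful shutdown
# get_data() is an application-defined helper that is not shown here.
async def event_stream():
    try:
        while True:
            yield f"data: {json.dumps(get_data())}\n\n"
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        # Best effort only: the client may already be gone at this point
        yield f"data: {json.dumps({'status': 'closing'})}\n\n"
        raise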
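The reconnection delay in the client-side snippet above, `Math.min(1000 * Math.pow(2, retryCount), 30000)`, is a capped exponential backoff. The same calculation in Python, in seconds rather than milliseconds, looks like this. `backoff_delay` is an illustrative name, and the optional jitter is an addition that the original snippet does not use.

```python
import random


def backoff_delay(retry_count: int, base: float = 1.0, cap: float = 30.0,
                  jitter: bool = False) -> float:
    """Seconds to wait before reconnect attempt number retry_count (0-based)."""
    delay = min(base * (2 ** retry_count), cap)
    if jitter:
        # Pick uniformly in [0, delay] so many clients do not reconnect in lockstep
        delay = random.uniform(0, delay)
    return delay
```

Without jitter, attempts 0 through 6 wait 1, 2, 4, 8, 16, 30, and 30 seconds. Jitter is worth considering when many clients lose their connection at the same moment, for example after a server restart.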
Conclusion
Choose the right real-time technology:
- WebSockets: Bidirectional, browser-based apps
- SSE: Simple server-to-client streaming
- gRPC Streaming: High-performance microservices