Introduction
Real-time functionality is expected in modern applications. From live notifications to collaborative editing, users expect instant updates. This guide covers WebSockets, Server-Sent Events, and patterns for building responsive real-time applications.
Real-time communication enables immediate data exchange between client and server, creating interactive experiences that keep users engaged.
Communication Patterns
Pattern Comparison
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ Real-Time Communication Patterns โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ โ
โ Polling: Client โโโโโโบ Server (wait) โ
โ Client โโโโโโ Server โ
โ (repeat) โ
โ โ
โ Long Polling: Client โโโโโโบ Server (wait for data) โ
โ Client โโโโโโ Server โ
โ Client โโโโโโบ Server (repeat) โ
โ โ
โ WebSockets: Client โโโโโบ Server (persistent) โ
โ Bi-directional, real-time โ
โ โ
โ SSE: Client โโโโโโบ Server โ
โ Client โโโโโโ Server (stream) โ
โ Server-initiated updates โ
โ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
| Pattern | Use Case | Pros | Cons |
|---|---|---|---|
| Polling | Simple updates | Easy | Inefficient |
| Long Polling | Chat | Better than polling | Complex |
| WebSockets | Bidirectional | Real-time, efficient | Stateful |
| SSE | ServerโClient | Simple, HTTP/2 | One-way |
WebSockets
Server Implementation
# Python websockets
import asyncio
import json
from websockets import serve
async def echo(websocket):
async for message in websocket:
data = json.loads(message)
# Process message
response = {"type": "response", "data": data}
await websocket.send(json.dumps(response))
async def main():
async with serve(echo, "localhost", 8765):
await asyncio.Future() # run forever
asyncio.run(main())
Chat Application
# WebSocket chat server
import asyncio
import json
from websockets import serve
from collections import defaultdict
# Connected clients
clients = defaultdict(set)
async def chat(websocket, path):
client_id = id(websocket)
# Parse initial message for room
try:
init_data = await websocket.recv()
init = json.loads(init_data)
room = init.get("room", "global")
except:
room = "global"
clients[room].add(websocket)
try:
async for message in websocket:
data = json.loads(message)
msg_type = data.get("type")
if msg_type == "chat":
# Broadcast to room
broadcast_message = json.dumps({
"type": "chat",
"user": data.get("user"),
"message": data.get("message"),
"timestamp": data.get("timestamp")
})
await broadcast(room, broadcast_message)
elif msg_type == "typing":
# Broadcast typing indicator
await broadcast(room, json.dumps({
"type": "typing",
"user": data.get("user")
}))
finally:
clients[room].discard(websocket)
async def broadcast(room, message):
if room in clients:
# Remove disconnected clients
active_clients = [c for c in clients[room] if c.open]
await asyncio.gather(
*[c.send(message) for c in active_clients],
return_exceptions=True
)
# Run server
asyncio.run(serve(chat, "localhost", 8765))
Client Implementation
// WebSocket client
class ChatClient {
constructor(url) {
this.ws = new WebSocket(url);
this.setupHandlers();
}
setupHandlers() {
this.ws.onopen = () => {
console.log('Connected');
this.send({ type: 'join', room: 'general' });
};
this.ws.onmessage = (event) => {
const message = JSON.parse(event.data);
this.handleMessage(message);
};
this.ws.onclose = () => {
console.log('Disconnected, reconnecting...');
setTimeout(() => this.reconnect(), 1000);
};
this.ws.onerror = (error) => {
console.error('Error:', error);
};
}
handleMessage(message) {
switch (message.type) {
case 'chat':
this.displayMessage(message);
break;
case 'typing':
this.showTypingIndicator(message.user);
break;
}
}
send(data) {
if (this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(data));
}
}
sendMessage(text) {
this.send({
type: 'chat',
user: this.userName,
message: text,
timestamp: Date.now()
});
}
}
Server-Sent Events (SSE)
SSE Server
# Flask SSE
from flask import Flask, Response, stream_with_context
import time
import json
app = Flask(__name__)
@app.route('/events')
def events():
def generate():
while True:
# Send event
data = json.dumps({
'message': 'Update',
'timestamp': time.time()
})
yield f"data: {data}\n\n"
# Wait before next event
time.sleep(1)
return Response(
stream_with_context(generate()),
mimetype='text/event-stream',
headers={
'Cache-Control': 'no-cache',
'X-Accel-Buffering': 'no'
}
)
# With authentication
@app.route('/events/authenticated')
def events_authenticated():
# Verify user session
if not current_user.is_authenticated:
return Response('Unauthorized', status=401)
def generate():
# Stream events for authenticated user
for event in get_user_events(current_user.id):
yield f"data: {json.dumps(event)}\n\n"
return Response(generate(), mimetype='text/event-stream')
SSE Client
// SSE client
class EventSource {
constructor(url) {
this.eventSource = new EventSource(url);
this.setupHandlers();
}
setupHandlers() {
this.eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
console.log('Message:', data);
};
this.eventSource.addEventListener('update', (event) => {
const data = JSON.parse(event.data);
this.handleUpdate(data);
});
this.eventSource.onerror = () => {
console.error('Connection lost, retrying...');
};
}
handleUpdate(data) {
// Handle specific update
}
close() {
this.eventSource.close();
}
}
// Using it
const events = new EventSource('/api/events');
WebSocket vs SSE
When to Use WebSockets
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ Use WebSockets When: โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ โ
โ โข Chat applications โ
โ โข Real-time gaming โ
โ โข Collaborative editing โ
โ โข Bidirectional data flow needed โ
โ โข High-frequency updates โ
โ โข Client needs to send frequent updates โ
โ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
When to Use SSE
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ Use SSE When: โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ โ
โ โข Notifications โ
โ โข Live feeds (Twitter, news) โ
โ โข Dashboard updates โ
โ โข Progress updates โ
โ โข Simple serverโclient updates โ
โ โข Need HTTP/2 multiplexing โ
โ โข Behind corporate firewalls โ
โ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
Scaling WebSockets
Connection Management
# Redis-backed WebSocket manager
import asyncio
import json
from redis.asyncio import Redis
class WebSocketManager:
def __init__(self, redis_url):
self.redis = Redis.from_url(redis_url)
self.pubsub = self.redis.pubsub()
async def publish(self, channel, message):
"""Publish message to channel."""
await self.redis.publish(channel, json.dumps(message))
async def subscribe(self, channel):
"""Subscribe to channel."""
await self.pubsub.subscribe(channel)
async def listen(self, channel):
"""Listen for messages."""
async for message in self.pubsub.listen():
if message['type'] == 'message':
yield json.loads(message['data'])
Horizontal Scaling
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ WebSocket Scaling Architecture โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ โ
โ โโโโโโโโโโโโโโโ โ
โ โ Load Balancerโ โ
โ โโโโโโโโฌโโโโโโโ โ
โ โ โ
โ โโโโโโโโโโโโโผโโโโโโโโโโโโ โ
โ โผ โผ โผ โ
โ โโโโโโโโ โโโโโโโโ โโโโโโโโ โ
โ โ WS โ โ WS โ โ WS โ โ
โ โServerโ โServerโ โServerโ โ
โ โโโโฌโโโโ โโโโฌโโโโ โโโโฌโโโโ โ
โ โ โ โ โ
โ โโโโโโโโโโโโผโโโโโโโโโโโ โ
โ โผ โ
โ โโโโโโโโโโโโ โ
โ โ Redis โ โ
โ โ Pub/Sub โ โ
โ โโโโโโโโโโโโ โ
โ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
Security
Authentication
# WebSocket authentication
async def authenticated_websocket(websocket, path):
# Get token from query string
query = urllib.parse.parse_qs(path.split('?')[1])
token = query.get('token', [None])[0]
if not token:
await websocket.close(4001, "Authentication required")
return
# Verify token
user = await verify_token(token)
if not user:
await websocket.close(4002, "Invalid token")
return
# Continue with authenticated connection
await handle_connection(websocket, user)
Rate Limiting
# Rate limiting WebSocket messages
class RateLimitedWebSocket:
def __init__(self, websocket, max_messages=10, window=1):
self.ws = websocket
self.max_messages = max_messages
self.window = window
self.messages = []
async def send(self, data):
now = time.time()
# Remove old messages
self.messages = [m for m in self.messages if now - m < self.window]
if len(self.messages) >= self.max_messages:
raise RateLimitError("Too many messages")
self.messages.append(now)
await self.ws.send(data)
Best Practices
- Use WebSockets for bidirectional: Chat, games, collaboration
- Use SSE for unidirectional: Notifications, feeds
- Implement reconnection logic: Handle network issues
- Add heartbeat/ping-pong: Detect dead connections
- Scale with Redis: Share state across instances
- Secure with TLS: Always use wss://
- Implement rate limiting: Prevent abuse
Conclusion
Real-time applications require careful choice of communication pattern. WebSockets excel at bidirectional, high-frequency communication, while SSE provides simpler server-to-client updates. Choose based on your specific requirements and scale accordingly.
Comments