Introduction
Real-time communication enables instant updates, live collaboration, and interactive experiences. This guide covers WebSocket implementation patterns, from native APIs to managed solutions.
Real-Time Options Comparison
┌─────────────────────────────────────────────────────────────┐
│ Real-Time Technologies │
├─────────────────────────────────────────────────────────────┤
│ │
│ WebSocket │
│ • Full-duplex communication │
│ • Bidirectional messages │
│ • Native browser support │
│ │
│ Socket.io │
│ • Abstraction over WebSocket │
│ • Fallbacks, rooms, auto-reconnect │
│ │
│ Server-Sent Events (SSE) │
│ • One-way (server to client) │
│ • Simple, HTTP-based │
│ • Great for notifications │
│ │
│ Managed Services │
│ • Pusher, Ably, Supabase Realtime │
│ • Don't manage infrastructure │
│ • Scale automatically │
│ │
└─────────────────────────────────────────────────────────────┘
WebSocket Protocol Internals
The WebSocket Handshake
The connection begins as an HTTP upgrade request:
GET /ws/chat HTTP/1.1
Host: api.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
Key Generation
import base64
import hashlib
client_key = "dGhlIHNhbXBsZSBub25jZQ=="
magic_string = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
combined = client_key + magic_string
accept = base64.b64encode(hashlib.sha1(combined.encode()).digest()).decode()
Frame Structure
WebSocket frames have a binary format with the following fields:
| Field | Bits | Description |
|---|---|---|
| FIN | 1 | Final fragment indicator |
| RSV1-3 | 3 | Reserved for extensions |
| Opcode | 4 | Frame type (text=1, binary=2, close=8, ping=9, pong=0xA) |
| MASK | 1 | Whether payload is masked (client frames must be) |
| Payload Length | 7 | Length of payload (7, 16, or 64 bits) |
| Masking Key | 0/4 | Key for unmasking (present if MASK=1) |
| Payload | Variable | The actual message data |
// Frame examples
const textFrame = Buffer.from([0x81, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f]); // "Hello"
const closeFrame = Buffer.from([0x88, 0x00]);
const pingFrame = Buffer.from([0x89, 0x00]);
HTTP vs WebSocket
| Feature | HTTP | WebSocket |
|---|---|---|
| Connection | Request-Response | Persistent |
| Direction | Half-duplex | Full-duplex |
| Overhead | Headers every request | Minimal after handshake |
| Server-initiated | Requires polling | Native support |
| Stateful | Stateless | Stateful |
Native WebSockets
Server Implementation
// Node.js WebSocket server
import { WebSocketServer, WebSocket } from 'ws';
const wss = new WebSocketServer({ port: 8080 });
interface Client {
ws: WebSocket;
userId?: string;
}
const clients = new Map<WebSocket, Client>();
wss.on('connection', (ws, req) => {
// Store client
clients.set(ws, { ws });
ws.on('message', (message) => {
const data = JSON.parse(message.toString());
switch (data.type) {
case 'auth':
// Authenticate user
const client = clients.get(ws);
if (client) client.userId = data.userId;
break;
case 'message':
// Broadcast to room
broadcastToRoom(data.room, {
type: 'message',
content: data.content,
from: clients.get(ws)?.userId,
});
break;
case 'join':
// Join room
ws.send(JSON.stringify({ type: 'joined', room: data.room }));
break;
}
});
ws.on('close', () => {
clients.delete(ws);
});
});
function broadcastToRoom(room: string, message: object) {
const msg = JSON.stringify(message);
clients.forEach((client) => {
client.ws.send(msg);
});
}
Client Implementation
// Client WebSocket
class RealTimeClient {
private ws: WebSocket;
private handlers = new Map<string, (data: any) => void>();
constructor(url: string) {
this.ws = new WebSocket(url);
this.ws.onopen = () => {
console.log('Connected');
};
this.ws.onmessage = (event) => {
const data = JSON.parse(event.data);
const handler = this.handlers.get(data.type);
if (handler) handler(data);
};
}
on(type: string, handler: (data: any) => void) {
this.handlers.set(type, handler);
}
send(type: string, data: any) {
this.ws.send(JSON.stringify({ type, ...data }));
}
}
// Usage
const client = new RealTimeClient('ws://localhost:8080');
client.on('message', (data) => {
console.log('New message:', data.content);
});
client.send('auth', { token: 'user-token' });
Socket.io
Server Setup
// Socket.io server
import { Server } from 'socket.io';
import { createServer } from 'http';
const httpServer = createServer(app);
const io = new Server(httpServer, {
cors: {
origin: '*',
},
});
io.on('connection', (socket) => {
console.log('Client connected:', socket.id);
// Join room
socket.on('join-room', (roomId: string) => {
socket.join(roomId);
socket.to(roomId).emit('user-joined', socket.id);
});
// Leave room
socket.on('leave-room', (roomId: string) => {
socket.leave(roomId);
});
// Send message to room
socket.on('chat-message', (data: { room: string; message: string }) => {
io.to(data.room).emit('chat-message', {
message: data.message,
sender: socket.id,
timestamp: Date.now(),
});
});
// Private message
socket.on('private-message', (data: { to: string; message: string }) => {
io.to(data.to).emit('private-message', {
message: data.message,
from: socket.id,
});
});
socket.on('disconnect', () => {
console.log('Client disconnected:', socket.id);
});
});
httpServer.listen(3000);
Client Usage
// Socket.io client
import { io, Socket } from 'socket.io-client';
const socket: Socket = io('http://localhost:3000', {
auth: { token: 'user-token' },
transports: ['websocket'], // Force WebSocket
});
socket.on('connect', () => {
console.log('Connected:', socket.id);
});
// Join room
socket.emit('join-room', 'room-123');
// Listen for messages
socket.on('chat-message', (data) => {
console.log('New message:', data.message);
});
// Disconnect
socket.on('disconnect', () => {
console.log('Disconnected');
});
Server-Sent Events (SSE)
Simple Implementation
// Express SSE endpoint
app.get('/api/events', (req, res) => {
// Set headers for SSE
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.flushHeaders();
// Send initial message
res.write('data: {"status": "connected"}\n\n');
// Send updates periodically
const interval = setInterval(() => {
const data = JSON.stringify({
time: new Date().toISOString(),
users: getOnlineUsers()
});
res.write(`data: ${data}\n\n`);
}, 5000);
// Clean up on close
req.on('close', () => {
clearInterval(interval);
res.end();
});
});
Client Implementation
// SSE client
const eventSource = new EventSource('/api/events');
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
console.log('Update:', data);
};
eventSource.onerror = (error) => {
console.error('SSE error:', error);
eventSource.close();
};
// Clean up
// eventSource.close();
Scaling WebSockets
Redis Adapter
// Scale with Redis (Socket.io)
import { createAdapter } from '@socket.io/redis-adapter';
import { createClient } from 'redis';
const pubClient = createClient({ url: process.env.REDIS_URL });
const subClient = pubClient.duplicate();
const io = new Server({
adapter: createAdapter(pubClient, subClient),
});
// Now handles multiple server instances
Connection Management
# Production considerations
production:
load_balancing:
- "Sticky sessions required"
- "Redis for state"
monitoring:
- "Track connected clients"
- "Alert on disconnections"
- "Measure message throughput"
security:
- "WSS (WebSocket Secure)"
- "Authentication on connect"
- "Rate limiting"
Managed Solutions
Supabase Realtime
// Supabase Realtime
import { createClient } from '@supabase/supabase-js';
const supabase = createClient(
process.env.SUPABASE_URL!,
process.env.SUPABASE_KEY!
);
// Subscribe to database changes
const channel = supabase
.channel('posts')
.on('postgres_changes', {
event: '*',
schema: 'public',
table: 'posts',
}, (payload) => {
console.log('Change received!', payload);
})
.subscribe();
// Unsubscribe
channel.unsubscribe();
Message Protocol Design
Message Format Standard
{
"type": "message_type",
"payload": { },
"metadata": {
"message_id": "uuid",
"timestamp": "ISO8601",
"correlation_id": "uuid",
"client_id": "string"
}
}
Common Message Types
MESSAGE_TYPES = {
# Connection
"auth": "Authentication request",
"auth_response": "Authentication response",
"connection_established": "Connection confirmed",
# Rooms
"join_room": "Join a room/channel",
"leave_room": "Leave a room/channel",
"room_joined": "Successfully joined",
# Messaging
"message": "Chat message",
"typing_start": "User started typing",
"typing_stop": "User stopped typing",
# Presence
"presence_update": "Online/offline status",
# System
"ping": "Keep-alive ping",
"pong": "Keep-alive pong",
"error": "Error notification"
}
Scalable Architecture
Horizontal Scaling with Redis
const { createAdapter } = require('@socket.io/redis-adapter');
const { createClient } = require('redis');
const pubClient = createClient({ url: process.env.REDIS_URL });
const subClient = pubClient.duplicate();
const io = new Server({
adapter: createAdapter(pubClient, subClient),
});
Nginx WebSocket Proxy
map $http_upgrade $connection_upgrade {
default upgrade;
'' close;
}
upstream websocket_backend {
server ws1.example.com:8080;
server ws2.example.com:8080;
}
server {
location /ws/ {
proxy_pass http://websocket_backend;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection $connection_upgrade;
proxy_read_timeout 3600s;
}
}
Security
Connection Security
wss.on('connection', (ws, req) => {
// Token-based authentication
const url = new URL(req.url, `http://${req.headers.host}`);
const token = url.searchParams.get('token');
if (!token) {
ws.close(4001, 'Authentication required');
return;
}
const user = validateToken(token);
if (!user) {
ws.close(4002, 'Invalid token');
return;
}
ws.user = user;
});
Origin Validation
const ALLOWED_ORIGINS = ['https://app.example.com', 'https://admin.example.com'];
wss.on('connection', (ws, req) => {
const origin = req.headers.origin;
if (!ALLOWED_ORIGINS.includes(origin)) {
ws.close(4003, 'Origin not allowed');
return;
}
});
Rate Limiting
const rateLimitMap = new Map();
function checkRateLimit(clientId, maxMessages = 100, windowMs = 60000) {
const now = Date.now();
const clientData = rateLimitMap.get(clientId);
if (!clientData || now > clientData.resetTime) {
rateLimitMap.set(clientId, { count: 1, resetTime: now + windowMs });
return true;
}
if (clientData.count >= maxMessages) return false;
clientData.count++;
return true;
}
Error Codes
| Code | Description |
|---|---|
| 4000 | Unknown error |
| 4001 | Authentication required |
| 4002 | Authentication failed |
| 4003 | Origin not allowed |
| 4004 | Rate limit exceeded |
| 4005 | Message too large |
Production Monitoring
import prometheus_client
MESSAGES_SENT = Counter('websocket_messages_sent_total', 'Total messages sent')
CONNECTIONS_ACTIVE = Gauge('websocket_connections_active', 'Active connections')
async def monitored_handler(websocket):
CONNECTIONS_ACTIVE.inc()
try:
async for message in websocket:
MESSAGES_RECEIVED.inc()
finally:
CONNECTIONS_ACTIVE.dec()
Monitor connection health with heartbeats. Set up alerts for abnormal disconnection rates and high message latency. Log all authentication failures for security auditing.
Key Takeaways
- Native WebSocket - Full control, no dependencies
- Socket.io - Best DX, great features, automatic fallbacks
- SSE - Simpler for server-to-client only
- Managed services - Don’t manage infrastructure
Comments