Skip to main content
โšก Calmops

Building Real-Time Applications: WebSockets and Server-Sent Events

Introduction

Real-time functionality is expected in modern applications. From live notifications to collaborative editing, users expect instant updates. This guide covers WebSockets, Server-Sent Events, and patterns for building responsive real-time applications.

Real-time communication enables immediate data exchange between client and server, creating interactive experiences that keep users engaged.

Communication Patterns

Pattern Comparison

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚            Real-Time Communication Patterns                   โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚                                                             โ”‚
โ”‚  Polling:         Client โ”€โ”€โ”€โ”€โ”€โ–บ Server (wait)              โ”‚
โ”‚                    Client โ—„โ”€โ”€โ”€โ”€โ”€ Server                    โ”‚
โ”‚                    (repeat)                                 โ”‚
โ”‚                                                             โ”‚
โ”‚  Long Polling:    Client โ”€โ”€โ”€โ”€โ”€โ–บ Server (wait for data)      โ”‚
โ”‚                    Client โ—„โ”€โ”€โ”€โ”€โ”€ Server                    โ”‚
โ”‚                    Client โ”€โ”€โ”€โ”€โ”€โ–บ Server (repeat)            โ”‚
โ”‚                                                             โ”‚
โ”‚  WebSockets:      Client โ—„โ”€โ”€โ”€โ–บ Server (persistent)          โ”‚
โ”‚                    Bi-directional, real-time                โ”‚
โ”‚                                                             โ”‚
โ”‚  SSE:             Client โ”€โ”€โ”€โ”€โ”€โ–บ Server                      โ”‚
โ”‚                    Client โ—„โ”€โ”€โ”€โ”€โ”€ Server (stream)           โ”‚
โ”‚                    Server-initiated updates                 โ”‚
โ”‚                                                             โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
Pattern Use Case Pros Cons
Polling Simple updates Easy Inefficient
Long Polling Chat Better than polling Complex
WebSockets Bidirectional Real-time, efficient Stateful
SSE Serverโ†’Client Simple, HTTP/2 One-way

WebSockets

Server Implementation

# Python websockets
import asyncio
import json
from websockets import serve

async def echo(websocket):
    async for message in websocket:
        data = json.loads(message)
        
        # Process message
        response = {"type": "response", "data": data}
        
        await websocket.send(json.dumps(response))

async def main():
    async with serve(echo, "localhost", 8765):
        await asyncio.Future()  # run forever

asyncio.run(main())

Chat Application

# WebSocket chat server
import asyncio
import json
from websockets import serve
from collections import defaultdict

# Connected clients
clients = defaultdict(set)

async def chat(websocket, path):
    client_id = id(websocket)
    
    # Parse initial message for room
    try:
        init_data = await websocket.recv()
        init = json.loads(init_data)
        room = init.get("room", "global")
    except:
        room = "global"
    
    clients[room].add(websocket)
    
    try:
        async for message in websocket:
            data = json.loads(message)
            msg_type = data.get("type")
            
            if msg_type == "chat":
                # Broadcast to room
                broadcast_message = json.dumps({
                    "type": "chat",
                    "user": data.get("user"),
                    "message": data.get("message"),
                    "timestamp": data.get("timestamp")
                })
                
                await broadcast(room, broadcast_message)
                
            elif msg_type == "typing":
                # Broadcast typing indicator
                await broadcast(room, json.dumps({
                    "type": "typing",
                    "user": data.get("user")
                }))
                
    finally:
        clients[room].discard(websocket)

async def broadcast(room, message):
    if room in clients:
        # Remove disconnected clients
        active_clients = [c for c in clients[room] if c.open]
        await asyncio.gather(
            *[c.send(message) for c in active_clients],
            return_exceptions=True
        )

# Run server
asyncio.run(serve(chat, "localhost", 8765))

Client Implementation

// WebSocket client
class ChatClient {
  constructor(url) {
    this.ws = new WebSocket(url);
    this.setupHandlers();
  }
  
  setupHandlers() {
    this.ws.onopen = () => {
      console.log('Connected');
      this.send({ type: 'join', room: 'general' });
    };
    
    this.ws.onmessage = (event) => {
      const message = JSON.parse(event.data);
      this.handleMessage(message);
    };
    
    this.ws.onclose = () => {
      console.log('Disconnected, reconnecting...');
      setTimeout(() => this.reconnect(), 1000);
    };
    
    this.ws.onerror = (error) => {
      console.error('Error:', error);
    };
  }
  
  handleMessage(message) {
    switch (message.type) {
      case 'chat':
        this.displayMessage(message);
        break;
      case 'typing':
        this.showTypingIndicator(message.user);
        break;
    }
  }
  
  send(data) {
    if (this.ws.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify(data));
    }
  }
  
  sendMessage(text) {
    this.send({
      type: 'chat',
      user: this.userName,
      message: text,
      timestamp: Date.now()
    });
  }
}

Server-Sent Events (SSE)

SSE Server

# Flask SSE
from flask import Flask, Response, stream_with_context
import time
import json

app = Flask(__name__)

@app.route('/events')
def events():
    def generate():
        while True:
            # Send event
            data = json.dumps({
                'message': 'Update',
                'timestamp': time.time()
            })
            yield f"data: {data}\n\n"
            
            # Wait before next event
            time.sleep(1)
    
    return Response(
        stream_with_context(generate()),
        mimetype='text/event-stream',
        headers={
            'Cache-Control': 'no-cache',
            'X-Accel-Buffering': 'no'
        }
    )

# With authentication
@app.route('/events/authenticated')
def events_authenticated():
    # Verify user session
    if not current_user.is_authenticated:
        return Response('Unauthorized', status=401)
    
    def generate():
        # Stream events for authenticated user
        for event in get_user_events(current_user.id):
            yield f"data: {json.dumps(event)}\n\n"
    
    return Response(generate(), mimetype='text/event-stream')

SSE Client

// SSE client
class EventSource {
  constructor(url) {
    this.eventSource = new EventSource(url);
    this.setupHandlers();
  }
  
  setupHandlers() {
    this.eventSource.onmessage = (event) => {
      const data = JSON.parse(event.data);
      console.log('Message:', data);
    };
    
    this.eventSource.addEventListener('update', (event) => {
      const data = JSON.parse(event.data);
      this.handleUpdate(data);
    });
    
    this.eventSource.onerror = () => {
      console.error('Connection lost, retrying...');
    };
  }
  
  handleUpdate(data) {
    // Handle specific update
  }
  
  close() {
    this.eventSource.close();
  }
}

// Using it
const events = new EventSource('/api/events');

WebSocket vs SSE

When to Use WebSockets

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚                 Use WebSockets When:                          โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚                                                             โ”‚
โ”‚  โ€ข Chat applications                                         โ”‚
โ”‚  โ€ข Real-time gaming                                         โ”‚
โ”‚  โ€ข Collaborative editing                                     โ”‚
โ”‚  โ€ข Bidirectional data flow needed                           โ”‚
โ”‚  โ€ข High-frequency updates                                   โ”‚
โ”‚  โ€ข Client needs to send frequent updates                    โ”‚
โ”‚                                                             โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

When to Use SSE

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚                    Use SSE When:                              โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚                                                             โ”‚
โ”‚  โ€ข Notifications                                             โ”‚
โ”‚  โ€ข Live feeds (Twitter, news)                               โ”‚
โ”‚  โ€ข Dashboard updates                                        โ”‚
โ”‚  โ€ข Progress updates                                          โ”‚
โ”‚  โ€ข Simple serverโ†’client updates                             โ”‚
โ”‚  โ€ข Need HTTP/2 multiplexing                                 โ”‚
โ”‚  โ€ข Behind corporate firewalls                                โ”‚
โ”‚                                                             โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

Scaling WebSockets

Connection Management

# Redis-backed WebSocket manager
import asyncio
import json
from redis.asyncio import Redis

class WebSocketManager:
    def __init__(self, redis_url):
        self.redis = Redis.from_url(redis_url)
        self.pubsub = self.redis.pubsub()
    
    async def publish(self, channel, message):
        """Publish message to channel."""
        await self.redis.publish(channel, json.dumps(message))
    
    async def subscribe(self, channel):
        """Subscribe to channel."""
        await self.pubsub.subscribe(channel)
    
    async def listen(self, channel):
        """Listen for messages."""
        async for message in self.pubsub.listen():
            if message['type'] == 'message':
                yield json.loads(message['data'])

Horizontal Scaling

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚              WebSocket Scaling Architecture                   โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚                                                             โ”‚
โ”‚         โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”                                     โ”‚
โ”‚         โ”‚  Load Balancerโ”‚                                    โ”‚
โ”‚         โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”˜                                      โ”‚
โ”‚                โ”‚                                             โ”‚
โ”‚    โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”                                โ”‚
โ”‚    โ–ผ           โ–ผ           โ–ผ                                 โ”‚
โ”‚ โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”   โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”   โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”                             โ”‚
โ”‚ โ”‚ WS   โ”‚   โ”‚ WS   โ”‚   โ”‚ WS   โ”‚                             โ”‚
โ”‚ โ”‚Serverโ”‚   โ”‚Serverโ”‚   โ”‚Serverโ”‚                             โ”‚
โ”‚ โ””โ”€โ”€โ”ฌโ”€โ”€โ”€โ”˜   โ””โ”€โ”€โ”ฌโ”€โ”€โ”€โ”˜   โ””โ”€โ”€โ”ฌโ”€โ”€โ”€โ”˜                             โ”‚
โ”‚    โ”‚          โ”‚          โ”‚                                   โ”‚
โ”‚    โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜                                   โ”‚
โ”‚               โ–ผ                                              โ”‚
โ”‚        โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”                                          โ”‚
โ”‚        โ”‚  Redis   โ”‚                                          โ”‚
โ”‚        โ”‚ Pub/Sub  โ”‚                                          โ”‚
โ”‚        โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜                                          โ”‚
โ”‚                                                             โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

Security

Authentication

# WebSocket authentication
async def authenticated_websocket(websocket, path):
    # Get token from query string
    query = urllib.parse.parse_qs(path.split('?')[1])
    token = query.get('token', [None])[0]
    
    if not token:
        await websocket.close(4001, "Authentication required")
        return
    
    # Verify token
    user = await verify_token(token)
    if not user:
        await websocket.close(4002, "Invalid token")
        return
    
    # Continue with authenticated connection
    await handle_connection(websocket, user)

Rate Limiting

# Rate limiting WebSocket messages
class RateLimitedWebSocket:
    def __init__(self, websocket, max_messages=10, window=1):
        self.ws = websocket
        self.max_messages = max_messages
        self.window = window
        self.messages = []
    
    async def send(self, data):
        now = time.time()
        
        # Remove old messages
        self.messages = [m for m in self.messages if now - m < self.window]
        
        if len(self.messages) >= self.max_messages:
            raise RateLimitError("Too many messages")
        
        self.messages.append(now)
        await self.ws.send(data)

Best Practices

  1. Use WebSockets for bidirectional: Chat, games, collaboration
  2. Use SSE for unidirectional: Notifications, feeds
  3. Implement reconnection logic: Handle network issues
  4. Add heartbeat/ping-pong: Detect dead connections
  5. Scale with Redis: Share state across instances
  6. Secure with TLS: Always use wss://
  7. Implement rate limiting: Prevent abuse

Conclusion

Real-time applications require careful choice of communication pattern. WebSockets excel at bidirectional, high-frequency communication, while SSE provides simpler server-to-client updates. Choose based on your specific requirements and scale accordingly.

Comments