Calmops

Webhooks: Event-Driven Integration Patterns

Introduction

Webhooks enable event-driven communication between applications. Instead of polling for updates, your application receives real-time notifications when events occur. This article covers webhook implementation patterns, security, reliability, and best practices.

Webhook Architecture

┌─────────────────────────────────────────────────────────────────┐
│                      Webhook Architecture                       │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Provider (e.g., Stripe, GitHub)                                │
│           │                                                     │
│           │ Event occurs                                        │
│           ▼                                                     │
│  ┌─────────────────┐                                            │
│  │  Event Queue    │                                            │
│  └────────┬────────┘                                            │
│           │                                                     │
│           │ Process                                             │
│           ▼                                                     │
│  ┌─────────────────┐                                            │
│  │ Webhook Sender  │───────────▶ HTTP POST                      │
│  └─────────────────┘             (Delivery)                     │
│                                      │                          │
│                                      ▼                          │
│  Consumer                        Subscriber                     │
│  ┌─────────────────┐    ┌────────────────────────────┐          │
│  │ HTTP Endpoint   │◀───│ Your Server                │          │
│  │ /webhooks       │    │                            │          │
│  └─────────────────┘    │ - Verify signature         │          │
│                         │ - Process event            │          │
│                         │ - Store securely           │          │
│                         │ - Respond 200 OK           │          │
│                         └────────────────────────────┘          │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Webhook Implementation

Python Webhook Server

from flask import Flask, request, jsonify
import hmac
import hashlib
import time
import logging
from dataclasses import dataclass
from typing import Callable, Dict, Any
import threading

app = Flask(__name__)

@dataclass
class WebhookEvent:
    """Parsed webhook event."""
    event_id: str
    event_type: str
    data: Dict[str, Any]
    timestamp: float
    raw_payload: dict

class WebhookProcessor:
    """Process and verify webhooks."""
    
    def __init__(self, secret: str):
        self.secret = secret
        self.event_handlers: Dict[str, Callable] = {}
        self.events = []
        self.lock = threading.Lock()
    
    def verify_signature(self, payload: bytes, signature: str) -> bool:
        """Verify webhook signature."""
        if not signature:
            return False
        
        expected = hmac.new(
            self.secret.encode(),
            payload,
            hashlib.sha256
        ).hexdigest()
        
        return hmac.compare_digest(f"sha256={expected}", signature)
    
    def parse_event(self, payload: dict, headers: dict) -> WebhookEvent:
        """Parse webhook payload."""
        return WebhookEvent(
            event_id=payload.get("id", ""),
            event_type=payload.get("type", ""),
            data=payload.get("data", {}),
            timestamp=time.time(),
            raw_payload=payload
        )
    
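    def register_handler(self, event_type: str, handler: Callable = None):
        """Register an event handler, directly or as a decorator."""
        def decorator(func: Callable) -> Callable:
            self.event_handlers[event_type] = func
            return func
        
        # With a handler, register it now; without one, act as a decorator
        return decorator(handler) if handler is not None else decorator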
    
    def process(self, event: WebhookEvent) -> bool:
        """Process webhook event."""
        with self.lock:
            self.events.append(event)
        
        handler = self.event_handlers.get(event.event_type)
        
        if handler:
            try:
                handler(event)
                return True
            except Exception as e:
                logging.error(f"Handler error: {e}")
                return False
        
        return False

# Initialize processor
processor = WebhookProcessor(secret="your_webhook_secret")

# Event handlers
@processor.register_handler("payment.succeeded")
def handle_payment_succeeded(event: WebhookEvent):
    logging.info(f"Payment succeeded: {event.data.get('payment_id')}")
    # Update database, send notification, etc.

@processor.register_handler("payment.failed")
def handle_payment_failed(event: WebhookEvent):
    logging.warning(f"Payment failed: {event.data.get('payment_id')}")
    # Handle failed payment

@processor.register_handler("order.created")
def handle_order_created(event: WebhookEvent):
    logging.info(f"Order created: {event.data.get('order_id')}")

# Flask routes
@app.route("/webhooks", methods=["POST"])
def receive_webhook():
    """Receive and process webhook."""
    signature = request.headers.get("X-Webhook-Signature")
    payload = request.get_data()
    
    # Verify signature
    if not processor.verify_signature(payload, signature):
        logging.warning("Invalid webhook signature")
        return jsonify({"error": "Invalid signature"}), 401
    
    # Parse event
    try:
        data = request.get_json()
        event = processor.parse_event(data, dict(request.headers))
    except Exception as e:
        logging.error(f"Failed to parse webhook: {e}")
        return jsonify({"error": "Invalid payload"}), 400
    
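    # Process event. process() returns False when no handler is registered
    # or the handler raised; the delivery is acknowledged with 200 either way.
    handled = processor.process(event)
    
    if handled:
        return jsonify({"status": "processed"}), 200
    else:
        return jsonify({"status": "received"}), 200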

@app.route("/webhooks/events", methods=["GET"])
def list_events():
    """List processed events."""
    with processor.lock:
        return jsonify([
            {
                "id": e.event_id,
                "type": e.event_type,
                "timestamp": e.timestamp
            }
            for e in processor.events[-100:]
        ])
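The signature check in verify_signature only works if both sides hash exactly the same bytes. The following is a minimal, standard-library sketch of that round trip, assuming the "sha256=<hex digest>" header format used above; the helper names sign and verify and the sample body are illustrative, not part of any provider's API.

```python
import hashlib
import hmac

def sign(secret: str, body: bytes) -> str:
    """Return the header value in the "sha256=<hex digest>" format."""
    digest = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
    return f"sha256={digest}"

def verify(secret: str, body: bytes, header: str) -> bool:
    """Compare in constant time against a freshly computed signature."""
    return hmac.compare_digest(sign(secret, body), header)

secret = "your_webhook_secret"  # placeholder value, as in the server above
body = b'{"id": "evt_1", "type": "payment.succeeded", "data": {}}'
header = sign(secret, body)

assert verify(secret, body, header)              # untouched body passes
assert not verify(secret, body + b" ", header)   # a single extra byte fails
assert not verify("wrong_secret", body, header)  # a different secret fails
```

Because one added space is enough to break the match, the receiver should verify the raw request bytes (request.get_data()) before parsing, rather than re-serializing the parsed JSON.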

Webhook Client/Sender

import requests
import hmac
import hashlib
import json
import time
from typing import Dict, Any, Optional
from dataclasses import dataclass
import logging

@dataclass
class WebhookDelivery:
    """Webhook delivery attempt."""
    event_id: str
    url: str
    payload: dict
    status_code: Optional[int] = None
    response_body: Optional[str] = None
    attempts: int = 0

class WebhookClient:
    """Client for sending webhooks."""
    
    def __init__(self, secret: str):
        self.secret = secret
        self.deliveries: Dict[str, WebhookDelivery] = {}
    
    def create_signature(self, payload: bytes) -> str:
        """Create webhook signature."""
        return "sha256=" + hmac.new(
            self.secret.encode(),
            payload,
            hashlib.sha256
        ).hexdigest()
    
    def send(self, url: str, event_type: str, data: Dict[str, Any],
             event_id: Optional[str] = None) -> WebhookDelivery:
        """Send webhook."""
        if event_id is None:
            event_id = f"evt_{int(time.time() * 1000)}"
        
        payload = {
            "id": event_id,
            "type": event_type,
            "timestamp": int(time.time()),
            "data": data
        }
        
        payload_bytes = json.dumps(payload).encode()
        signature = self.create_signature(payload_bytes)
        
        delivery = WebhookDelivery(
            event_id=event_id,
            url=url,
            payload=payload
        )
        
        max_retries = 3
        retry_delay = 1
        
        for attempt in range(max_retries):
            delivery.attempts = attempt + 1
            
            try:
                response = requests.post(
                    url,
                    data=payload_bytes,
                    headers={
                        "Content-Type": "application/json",
                        "X-Webhook-Signature": signature,
                        "X-Webhook-Event-Id": event_id,
                    },
                    timeout=30
                )
                
                delivery.status_code = response.status_code
                delivery.response_body = response.text[:500]
                
                if response.status_code == 200:
                    logging.info(f"Webhook delivered: {event_id}")
                    break
                else:
                    logging.warning(
                        f"Webhook failed: {response.status_code} - {response.text[:200]}"
                    )
                    
            except requests.RequestException as e:
                logging.error(f"Webhook request error: {e}")
                delivery.response_body = str(e)
            
            if attempt < max_retries - 1:
                time.sleep(retry_delay * (2 ** attempt))
        
        self.deliveries[event_id] = delivery
        return delivery

# Usage
client = WebhookClient(secret="recipient_secret")

# Send webhook
delivery = client.send(
    url="https://example.com/webhooks",
    event_type="payment.succeeded",
    data={
        "payment_id": "pay_123",
        "amount": 1000,
        "currency": "USD"
    }
)

print(f"Delivered: {delivery.status_code == 200}")
print(f"Attempts: {delivery.attempts}")
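The sender above puts a Unix "timestamp" field in every payload. One way a receiver might use it is to reject deliveries that are too old, which limits how long a captured request can be replayed. This is a sketch under assumptions: the helper name is_fresh and the 300-second tolerance are illustrative choices, not something the article's server implements.

```python
import time
from typing import Optional

def is_fresh(event_timestamp: int, tolerance_seconds: int = 300,
             now: Optional[float] = None) -> bool:
    """True when the event timestamp is within the tolerance window.

    `now` can be injected for testing; it defaults to the current time.
    """
    current = time.time() if now is None else now
    return abs(current - event_timestamp) <= tolerance_seconds

now = 1_700_000_000
print(is_fresh(now - 60, now=now))    # True: one minute old
print(is_fresh(now - 3600, now=now))  # False: one hour old
```

The check only adds protection when the timestamp is covered by the signature, which holds here because it is part of the signed JSON body.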

Retry Logic

import asyncio
from typing import Callable
import logging

class WebhookRetry:
    """Handle webhook retries with exponential backoff."""
    
    def __init__(self, max_retries: int = 5, base_delay: float = 1.0):
        self.max_retries = max_retries
        self.base_delay = base_delay
    
    async def deliver(self, url: str, payload: dict, 
                    signature: str) -> bool:
        """Deliver webhook with retry."""
        
        for attempt in range(self.max_retries):
            try:
                response = await self._send(url, payload, signature)
                
                if response.status == 200:
                    logging.info(f"Webhook delivered on attempt {attempt + 1}")
                    return True
                
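                # 4xx means the request itself was rejected, so retrying will
                # not help, except for 408 (timeout) and 429 (rate limited)
                if 400 <= response.status < 500 and response.status not in (408, 429):
                    logging.error(f"Non-retryable: {response.status}")
                    return False
                    
            except Exception as e:
                logging.warning(f"Attempt {attempt + 1} failed: {e}")
            
            # Exponential backoff; no need to wait after the final attempt
            if attempt < self.max_retries - 1:
                delay = self.base_delay * (2 ** attempt)
                await asyncio.sleep(delay)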
        
        logging.error(f"All {self.max_retries} attempts failed")
        return False
    
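    async def _send(self, url: str, payload: dict, signature: str):
        """Send webhook (placeholder).

        Must return an object with an integer `status` attribute,
        such as the response of an async HTTP client.
        """
        raise NotImplementedError("plug in an async HTTP client here")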

# Dead letter queue for failed webhooks
from datetime import datetime, timezone

class WebhookDeadLetterQueue:
    """Store failed webhooks for manual review."""
    
    def __init__(self, storage_backend):
        # storage_backend must expose an async insert(table, row) method
        self.storage = storage_backend
    
    async def add(self, event_id: str, payload: dict, 
                 attempts: int, last_error: str):
        """Add failed webhook to DLQ."""
        await self.storage.insert("webhook_dlq", {
            "event_id": event_id,
            "payload": payload,
            "attempts": attempts,
            "last_error": last_error,
            # Timestamp computed in Python (UTC, ISO 8601)
            "created_at": datetime.now(timezone.utc).isoformat()
        })
    
    async def retry(self, event_id: str):
        """Retry webhook from DLQ."""
        # Implementation here
        pass
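To make the backoff arithmetic concrete, the sketch below computes the waits between attempts for base_delay * 2 ** attempt. The max_delay cap and the optional random jitter are assumptions added for illustration (they are common additions, not part of WebhookRetry above), and backoff_delays is a hypothetical helper name.

```python
import random
from typing import List

def backoff_delays(max_retries: int = 5, base_delay: float = 1.0,
                   max_delay: float = 60.0, jitter: bool = False) -> List[float]:
    """Return the waits between consecutive attempts, in seconds."""
    delays = []
    for attempt in range(max_retries - 1):  # no wait after the final attempt
        delay = min(base_delay * (2 ** attempt), max_delay)
        if jitter:
            # Pick a random wait up to the computed delay so that many
            # senders retrying at once do not hit the receiver in lockstep
            delay = random.uniform(0, delay)
        delays.append(delay)
    return delays

print(backoff_delays())  # [1.0, 2.0, 4.0, 8.0]
```

With the defaults, five attempts are spread over 15 seconds of waiting; raising max_retries grows the total quickly, which is why a cap such as max_delay is usually worth having.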

Security Best Practices

import hashlib
import hmac
import time

from flask import request, jsonify

class SecureWebhookConfig:
    """Security configuration for webhooks."""
    
    # 1. Use HMAC signatures
    @staticmethod
    def verify_hmac_signature(payload: bytes, signature: str, 
                             secret: str) -> bool:
        expected = "sha256=" + hmac.new(
            secret.encode(), payload, hashlib.sha256
        ).hexdigest()
        return hmac.compare_digest(expected, signature)
    
    # 2. Verify source IP (if provider supports it)
    @staticmethod
    def verify_source_ip(allowed_ips: list, request_ip: str) -> bool:
        return request_ip in allowed_ips
    
    # 3. Use request IDs for deduplication
    @staticmethod
    def deduplicate(request_id: str, storage) -> bool:
        """Return True the first time a request ID is seen.

        Assumes `storage` is a redis-py style client.
        """
        key = f"webhook:seen:{request_id}"
        
        # SET with NX and EX is a single atomic step, so two concurrent
        # deliveries of the same event cannot both pass. 24 hour TTL.
        return bool(storage.set(key, "1", nx=True, ex=86400))
    
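    # 4. Validate payload schema
    @staticmethod
    def validate_schema(payload: dict, schema: dict) -> bool:
        """Validate webhook payload against a JSON Schema."""
        # Requires the third-party `jsonschema` package
        import jsonschema
        
        try:
            jsonschema.validate(instance=payload, schema=schema)
            return True
        except jsonschema.ValidationError:
            return False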

# Rate limiting for webhook endpoints
from functools import wraps

def rate_limit_webhooks(max_requests: int = 100, window: int = 60):
    """Rate limit webhook endpoint."""
    requests_cache = {}
    
    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            ip = request.remote_addr
            now = time.time()
            
            # Clean old entries
            requests_cache[ip] = [
                t for t in requests_cache.get(ip, [])
                if now - t < window
            ]
            
            # Check limit
            if len(requests_cache.get(ip, [])) >= max_requests:
                return jsonify({"error": "Rate limited"}), 429
            
            requests_cache.setdefault(ip, []).append(now)
            return f(*args, **kwargs)
        
        return wrapper
    
    return decorator
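Since senders retry, the same event can arrive more than once, and the deduplication step is what keeps a handler from running twice. The sketch below shows the idea with an in-memory stand-in for the shared store; the class name SeenEvents and its first_time method are hypothetical, and a single-process dictionary like this is only suitable for illustration or tests.

```python
import time
from typing import Dict, Optional

class SeenEvents:
    """In-memory record of processed event IDs with a time-to-live."""

    def __init__(self, ttl_seconds: int = 86400):
        self.ttl = ttl_seconds
        self._expires: Dict[str, float] = {}  # event_id -> expiry time

    def first_time(self, event_id: str, now: Optional[float] = None) -> bool:
        """Return True and record the ID if it has not been seen recently."""
        now = time.time() if now is None else now
        expiry = self._expires.get(event_id)
        if expiry is not None and expiry > now:
            return False  # duplicate inside the TTL window
        self._expires[event_id] = now + self.ttl
        return True

seen = SeenEvents(ttl_seconds=86400)
print(seen.first_time("evt_1", now=0))      # True: first delivery
print(seen.first_time("evt_1", now=10))     # False: retry of the same event
print(seen.first_time("evt_1", now=90000))  # True: the 24 hour TTL has expired
```

In a route handler, a False result would typically lead to an immediate 200 response without calling the event handler again.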

Conclusion

Webhooks enable powerful event-driven integrations. Key practices: always verify signatures, implement retry logic with exponential backoff, handle deduplication, and use HTTPS exclusively.
