Introduction
Webhooks enable event-driven communication between applications. Instead of polling for updates, your application receives real-time notifications when events occur. This article covers webhook implementation patterns, security, reliability, and best practices.
Webhook Architecture
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ Webhook Architecture โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ โ
โ Provider (e.g., Stripe, GitHub) โ
โ โ โ
โ โ Event occurs โ
โ โผ โ
โ โโโโโโโโโโโโโโโโโโโ โ
โ โ Event Queue โ โ
โ โโโโโโโโโโฌโโโโโโโโโ โ
โ โ โ
โ โ Process โ
โ โผ โ
โ โโโโโโโโโโโโโโโโโโโ โ
โ โ Webhook Sender โโโโโโโโโโโโโถ HTTP POST โ
โ โโโโโโโโโโโโโโโโโโโ (Delivery) โ
โ โ โ
โ โผ โ
โ Consumer Subscriber โ
โ โโโโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ HTTP Endpoint โโโโโโ Your Server โ โ
โ โ /webhooks โ โ โ โ
โ โโโโโโโโโโโโโโโโโโโ โ - Verify signature โ โ
โ โ - Process event โ โ
โ โ - Store securely โ โ
โ โ - Respond 200 OK โ โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
Webhook Implementation
Python Webhook Server
from flask import Flask, request, jsonify
import hmac
import hashlib
import time
import logging
from dataclasses import dataclass
from typing import Callable, Dict, Any
import threading
app = Flask(__name__)
@dataclass
class WebhookEvent:
"""Parsed webhook event."""
event_id: str
event_type: str
data: Dict[str, Any]
timestamp: float
raw_payload: dict
class WebhookProcessor:
"""Process and verify webhooks."""
def __init__(self, secret: str):
self.secret = secret
self.event_handlers: Dict[str, Callable] = {}
self.events = []
self.lock = threading.Lock()
def verify_signature(self, payload: bytes, signature: str) -> bool:
"""Verify webhook signature."""
if not signature:
return False
expected = hmac.new(
self.secret.encode(),
payload,
hashlib.sha256
).hexdigest()
return hmac.compare_digest(f"sha256={expected}", signature)
def parse_event(self, payload: dict, headers: dict) -> WebhookEvent:
"""Parse webhook payload."""
return WebhookEvent(
event_id=payload.get("id", ""),
event_type=payload.get("type", ""),
data=payload.get("data", {}),
timestamp=time.time(),
raw_payload=payload
)
def register_handler(self, event_type: str, handler: Callable):
"""Register event handler."""
self.event_handlers[event_type] = handler
def process(self, event: WebhookEvent) -> bool:
"""Process webhook event."""
with self.lock:
self.events.append(event)
handler = self.event_handlers.get(event.event_type)
if handler:
try:
handler(event)
return True
except Exception as e:
logging.error(f"Handler error: {e}")
return False
return False
# Initialize processor
processor = WebhookProcessor(secret="your_webhook_secret")
# Event handlers
@processor.register_handler("payment.succeeded")
def handle_payment_succeeded(event: WebhookEvent):
logging.info(f"Payment succeeded: {event.data.get('payment_id')}")
# Update database, send notification, etc.
@processor.register_handler("payment.failed")
def handle_payment_failed(event: WebhookEvent):
logging.warning(f"Payment failed: {event.data.get('payment_id')}")
# Handle failed payment
@processor.register_handler("order.created")
def handle_order_created(event: WebhookEvent):
logging.info(f"Order created: {event.data.get('order_id')}")
# Flask routes
@app.route("/webhooks", methods=["POST"])
def receive_webhook():
"""Receive and process webhook."""
signature = request.headers.get("X-Webhook-Signature")
payload = request.get_data()
# Verify signature
if not processor.verify_signature(payload, signature):
logging.warning("Invalid webhook signature")
return jsonify({"error": "Invalid signature"}), 401
# Parse event
try:
data = request.get_json()
event = processor.parse_event(data, dict(request.headers))
except Exception as e:
logging.error(f"Failed to parse webhook: {e}")
return jsonify({"error": "Invalid payload"}), 400
# Process event
success = processor.process(event)
if success:
return jsonify({"status": "received"}), 200
else:
return jsonify({"status": "processed"}), 200
@app.route("/webhooks/events", methods=["GET"])
def list_events():
"""List processed events."""
with processor.lock:
return jsonify([
{
"id": e.event_id,
"type": e.event_type,
"timestamp": e.timestamp
}
for e in processor.events[-100:]
])
Webhook Client/Sender
import requests
import hmac
import hashlib
import json
import time
from typing import Dict, Any, Optional
from dataclasses import dataclass
import logging
@dataclass
class WebhookDelivery:
"""Webhook delivery attempt."""
event_id: str
url: str
payload: dict
status_code: Optional[int] = None
response_body: Optional[str] = None
attempts: int = 0
class WebhookClient:
"""Client for sending webhooks."""
def __init__(self, secret: str):
self.secret = secret
self.deliveries: Dict[str, WebhookDelivery] = {}
def create_signature(self, payload: bytes) -> str:
"""Create webhook signature."""
return "sha256=" + hmac.new(
self.secret.encode(),
payload,
hashlib.sha256
).hexdigest()
def send(self, url: str, event_type: str, data: Dict[str, Any],
event_id: str = None) -> WebhookDelivery:
"""Send webhook."""
if event_id is None:
event_id = f"evt_{int(time.time() * 1000)}"
payload = {
"id": event_id,
"type": event_type,
"timestamp": int(time.time()),
"data": data
}
payload_bytes = json.dumps(payload).encode()
signature = self.create_signature(payload_bytes)
delivery = WebhookDelivery(
event_id=event_id,
url=url,
payload=payload
)
max_retries = 3
retry_delay = 1
for attempt in range(max_retries):
delivery.attempts = attempt + 1
try:
response = requests.post(
url,
data=payload_bytes,
headers={
"Content-Type": "application/json",
"X-Webhook-Signature": signature,
"X-Webhook-Event-Id": event_id,
},
timeout=30
)
delivery.status_code = response.status_code
delivery.response_body = response.text[:500]
if response.status_code == 200:
logging.info(f"Webhook delivered: {event_id}")
break
else:
logging.warning(
f"Webhook failed: {response.status_code} - {response.text[:200]}"
)
except requests.RequestException as e:
logging.error(f"Webhook request error: {e}")
delivery.response_body = str(e)
if attempt < max_retries - 1:
time.sleep(retry_delay * (2 ** attempt))
self.deliveries[event_id] = delivery
return delivery
# Usage
client = WebhookClient(secret="recipient_secret")
# Send webhook
delivery = client.send(
url="https://example.com/webhooks",
event_type="payment.succeeded",
data={
"payment_id": "pay_123",
"amount": 1000,
"currency": "USD"
}
)
print(f"Delivered: {delivery.status_code == 200}")
print(f"Attempts: {delivery.attempts}")
Retry Logic
import asyncio
from typing import Callable
import logging
class WebhookRetry:
"""Handle webhook retries with exponential backoff."""
def __init__(self, max_retries: int = 5, base_delay: float = 1.0):
self.max_retries = max_retries
self.base_delay = base_delay
async def deliver(self, url: str, payload: dict,
signature: str) -> bool:
"""Deliver webhook with retry."""
for attempt in range(self.max_retries):
try:
response = await self._send(url, payload, signature)
if response.status == 200:
logging.info(f"Webhook delivered on attempt {attempt + 1}")
return True
# Non-retryable error
if response.status >= 400 and response.status < 500:
logging.error(f"Non-retryable: {response.status}")
return False
except Exception as e:
logging.warning(f"Attempt {attempt + 1} failed: {e}")
# Exponential backoff
delay = self.base_delay * (2 ** attempt)
await asyncio.sleep(delay)
logging.error(f"All {self.max_retries} attempts failed")
return False
async def _send(self, url: str, payload: dict, signature: str):
"""Send webhook (placeholder)."""
# Implementation here
pass
# Dead letter queue for failed webhooks
class WebhookDeadLetterQueue:
"""Store failed webhooks for manual review."""
def __init__(self, storage_backend):
self.storage = storage_backend
async def add(self, event_id: str, payload: dict,
attempts: int, last_error: str):
"""Add failed webhook to DLQ."""
await self.storage.insert("webhook_dlq", {
"event_id": event_id,
"payload": payload,
"attempts": attempts,
"last_error": last_error,
"created_at": "NOW()"
})
async def retry(self, event_id: str):
"""Retry webhook from DLQ."""
# Implementation here
pass
Security Best Practices
import secrets
class SecureWebhookConfig:
"""Security configuration for webhooks."""
# 1. Use HMAC signatures
@staticmethod
def verify_hmac_signature(payload: bytes, signature: str,
secret: str) -> bool:
expected = "sha256=" + hmac.new(
secret.encode(), payload, hashlib.sha256
).hexdigest()
return hmac.compare_digest(expected, signature)
# 2. Verify source IP (if provider supports it)
@staticmethod
def verify_source_ip(allowed_ips: list, request_ip: str) -> bool:
return request_ip in allowed_ips
# 3. Use request IDs for deduplication
@staticmethod
def deduplicate(request_id: str, storage) -> bool:
"""Check and store request ID."""
key = f"webhook:seen:{request_id}"
if storage.exists(key):
return False # Already processed
storage.setex(key, 86400, "1") # 24 hour TTL
return True
# 4. Validate payload schema
@staticmethod
def validate_schema(payload: dict, schema: dict) -> bool:
"""Validate webhook payload against schema."""
# Use jsonschema or similar
pass
# Rate limiting for webhook endpoints
from functools import wraps
def rate_limit_webhooks(max_requests: int = 100, window: int = 60):
"""Rate limit webhook endpoint."""
requests_cache = {}
def decorator(f):
@wraps(f)
def wrapper(*args, **kwargs):
ip = request.remote_addr
now = time.time()
# Clean old entries
requests_cache[ip] = [
t for t in requests_cache.get(ip, [])
if now - t < window
]
# Check limit
if len(requests_cache.get(ip, [])) >= max_requests:
return jsonify({"error": "Rate limited"}), 429
requests_cache.setdefault(ip, []).append(now)
return f(*args, **kwargs)
return wrapper
return decorator
Conclusion
Webhooks enable powerful event-driven integrations. Key practices: always verify signatures, implement retry logic with exponential backoff, handle deduplication, and use HTTPS exclusively.
Comments