Introduction
Network failures, timeouts, and client retries can cause the same operation to be executed multiple times. Without proper idempotency handling, this can lead to duplicate charges, duplicated records, or inconsistent state. Idempotency ensures that executing an operation multiple times produces the same result as executing it once.
Understanding Idempotency
┌───────────────────────────────────────────────────────────────┐
│                     Idempotency Concepts                      │
├───────────────────────────────────────────────────────────────┤
│                                                               │
│  Idempotent Operations:                                       │
│                                                               │
│  GET /users/123    → Always returns user 123                  │
│  DELETE /users/123 → Deleting user 123 twice = same as once   │
│  PUT /users/123    → Setting name to "John" (same value)      │
│                                                               │
│  Non-Idempotent Operations:                                   │
│                                                               │
│  POST /orders      → Creating order (each call = new order)   │
│  POST /payments    → Each call = new payment                  │
│  DELETE /items/1   → First call deletes, second returns 404   │
│                                                               │
└───────────────────────────────────────────────────────────────┘
Note: HTTP (RFC 9110) defines idempotency by the effect on server state, not by the response, so DELETE is formally idempotent even though a repeated call returns 404 instead of 200. The last example is a reminder that clients must be ready for the response to differ on a retry.
┌───────────────────────────────────────────────────────────────┐
│                The Problem: Duplicate Payments                │
├───────────────────────────────────────────────────────────────┤
│                                                               │
│  Client              Server              Database             │
│    │                   │                    │                 │
│    │  POST /pay        │                    │                 │
│    │──────────────────▶│                    │                 │
│    │                   │ (processing)       │                 │
│    │  (timeout)        │                    │                 │
│    │◀ ─ ─ ─ ─ ─ ─ ─ ─ ─│                    │                 │
│    │                   │                    │                 │
│    │  (retry)          │                    │                 │
│    │  POST /pay        │                    │                 │
│    │──────────────────▶│                    │                 │
│    │                   │ Process payment    │                 │
│    │                   │───────────────────▶│                 │
│    │                   │                    │                 │
│    │                   │ Process payment    │                 │
│    │                   │───────────────────▶│ (DUPLICATE!)    │
│    │                   │                    │                 │
│    │◀──────────────────│                    │                 │
│    │  200 OK           │                    │                 │
│                                                               │
│  Solution: Idempotency Key                                    │
│                                                               │
└───────────────────────────────────────────────────────────────┘
Idempotency Keys
import functools
import hashlib
import threading
import time
import uuid
from dataclasses import dataclass
from typing import Optional
@dataclass
class IdempotencyRecord:
    """Saved response for one idempotency key, replayed on duplicate requests.

    ``IdempotencyStore.set`` fills both timestamps from ``time.time()``.
    """

    key: str  # client-supplied Idempotency-Key value
    response_status: int  # status code produced by the first request
    response_body: dict  # body produced by the first request
    created_at: float  # when the record was written
    expires_at: float  # IdempotencyStore.get ignores the record at/after this time
class IdempotencyStore:
    """In-memory map from idempotency key to cached response.

    Every public method runs under one re-entrant lock. Records live for
    ``ttl_seconds`` after being written; expired ones are evicted lazily by
    ``get`` or in bulk by ``cleanup_expired``.
    """

    def __init__(self, ttl_seconds: int = 86400):
        self.records = {}
        self.lock = threading.RLock()
        self.ttl = ttl_seconds

    def get(self, key: str) -> Optional[IdempotencyRecord]:
        """Return the unexpired record stored under *key*, else ``None``."""
        with self.lock:
            found = self.records.get(key)
            if not found:
                return None
            if time.time() < found.expires_at:
                return found
            # Expired: evict so the key behaves as if it was never stored.
            del self.records[key]
            return None

    def set(self, key: str, status: int, body: dict):
        """Store (or overwrite) the response for *key*, valid for ``self.ttl`` s."""
        with self.lock:
            stamp = time.time()
            fresh = IdempotencyRecord(
                key=key,
                response_status=status,
                response_body=body,
                created_at=stamp,
                expires_at=stamp + self.ttl,
            )
            self.records[key] = fresh

    def delete(self, key: str):
        """Forget *key*; a missing key is silently ignored."""
        with self.lock:
            if key in self.records:
                del self.records[key]

    def cleanup_expired(self):
        """Evict every record whose expiry time has been reached."""
        with self.lock:
            cutoff = time.time()
            stale_keys = [
                stored_key
                for stored_key, record in self.records.items()
                if record.expires_at <= cutoff
            ]
            for stored_key in stale_keys:
                self.records.pop(stored_key)
class IdempotentEndpoint:
    """Decorator that makes a request handler idempotent.

    Clients send an ``Idempotency-Key`` header. The first request with a
    given key runs the handler and its (body, status) is saved in ``store``.
    Later requests with the same key get that saved response replayed, with
    an ``X-Idempotent-Replayed: true`` header, without re-running the
    handler. A request whose key is still being handled by this decorator
    instance receives ``409`` so the client can back off and retry.

    NOTE(review): the in-flight guard is an in-memory set, so it only
    coordinates requests within one process; multiple server processes
    sharing a store are not protected by it.
    """

    def __init__(self, store: "IdempotencyStore", min_key_length: int = 16,
                 max_key_length: int = 256):
        """
        Args:
            store: object with ``get(key)`` returning a record (with
                ``response_body``/``response_status``) or ``None``, and
                ``set(key, status, body)``.
            min_key_length: shortest accepted Idempotency-Key.
            max_key_length: longest accepted Idempotency-Key; bounds the
                memory an arbitrary client key can consume in the store.
        """
        self.store = store
        self.min_key_length = min_key_length
        self.max_key_length = max_key_length
        # Keys whose handler is currently running; guarded by _in_flight_lock.
        self._in_flight = set()
        self._in_flight_lock = threading.Lock()

    @staticmethod
    def _split_result(result):
        """Normalize a handler result into ``(body, status, headers_or_None)``.

        A bare value means ``(value, 200)``; 2- and 3-tuples are accepted.
        """
        if not isinstance(result, tuple):
            return result, 200, None
        if len(result) == 3:
            return result
        body, status = result
        return body, status, None

    def __call__(self, func):
        """Wrap *func* so repeated requests carrying one key run it once."""

        @functools.wraps(func)
        def wrapper(request, *args, **kwargs):
            idempotency_key = request.headers.get('Idempotency-Key')
            if not idempotency_key:
                return {"error": "Idempotency-Key required"}, 400
            if not (self.min_key_length <= len(idempotency_key)
                    <= self.max_key_length):
                return {"error": "Invalid idempotency key"}, 400

            # Look up a saved response and claim the key in one critical
            # section. The owner saves its response before releasing the
            # claim, so a concurrent duplicate sees either the claim (409)
            # or the saved response (replay) - never neither.
            with self._in_flight_lock:
                existing = self.store.get(idempotency_key)
                if existing is not None:
                    return (
                        existing.response_body,
                        existing.response_status,
                        {'X-Idempotent-Replayed': 'true'},
                    )
                if idempotency_key in self._in_flight:
                    return {"error": "Request already in progress"}, 409
                self._in_flight.add(idempotency_key)

            try:
                body, status, headers = self._split_result(
                    func(request, *args, **kwargs))
                # Server errors are not cached, so a retry after a transient
                # 5xx gets a fresh attempt. Exceptions are never cached
                # either: they propagate before reaching this line.
                if status < 500:
                    self.store.set(idempotency_key, status, body)
            finally:
                with self._in_flight_lock:
                    self._in_flight.discard(idempotency_key)

            if headers is None:
                return body, status
            return body, status, headers

        return wrapper
Database-Level Idempotency
import sqlite3
from typing import Optional
import json
import uuid
class IdempotentDatabase:
    """SQLite helper that records operation results under idempotency keys."""

    def __init__(self, db_path: str):
        self.conn = sqlite3.connect(db_path)
        self._init_tables()

    def _init_tables(self):
        """Create the ``idempotency_keys`` table if it does not exist yet."""
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS idempotency_keys (
                key TEXT PRIMARY KEY,
                result TEXT,
                status_code INTEGER,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        self.conn.commit()

    def _lookup(self, key: str) -> Optional[tuple[dict, int]]:
        """Return the stored ``(body, status)`` for *key*, or ``None``."""
        row = self.conn.execute(
            "SELECT result, status_code FROM idempotency_keys WHERE key = ?",
            (key,),
        ).fetchone()
        if row is None:
            return None
        return json.loads(row[0]), row[1]

    def execute_idempotent(self, key: str,
                           operation: callable) -> tuple[dict, int]:
        """Run *operation* at most once per *key* and return its result.

        Args:
            key: non-empty idempotency key.
            operation: zero-argument callable returning ``(body, status)``;
                ``body`` must be JSON-serializable.

        Returns:
            ``operation()``'s result the first time *key* is seen; the stored
            ``(body, status)`` on every later call.

        Raises:
            ValueError: if *key* is empty or ``None``.
            Exception: whatever *operation* (or serializing its body) raises;
                the transaction is rolled back and no result is recorded.
        """
        if not key:
            # "key = NULL" never matches in the lookup, so a missing key would
            # re-run the operation on every call and insert another row.
            raise ValueError("idempotency key must be a non-empty string")

        stored = self._lookup(key)
        if stored is not None:
            return stored

        try:
            result = operation()
            try:
                self.conn.execute(
                    "INSERT INTO idempotency_keys (key, result, status_code) "
                    "VALUES (?, ?, ?)",
                    (key, json.dumps(result[0]), result[1]),
                )
            except sqlite3.IntegrityError:
                # Another connection recorded this key after our lookup:
                # discard our uncommitted work and return its result instead.
                self.conn.rollback()
                stored = self._lookup(key)
                if stored is None:
                    raise
                return stored
            # Commit the operation's writes (if it used this connection) and
            # the key record together, so one cannot persist without the
            # other. Side effects outside this connection cannot be undone.
            self.conn.commit()
            return result
        except Exception:
            self.conn.rollback()
            raise

    @staticmethod
    def _checked_identifier(name: str) -> str:
        """Return *name* if it is safe to splice into SQL as an identifier.

        Table and column names cannot be bound as ``?`` parameters, so they
        are interpolated into the statement text; restricting them to plain
        identifiers prevents SQL injection through ``table`` or ``data`` keys.

        Raises:
            ValueError: if *name* is not a plain identifier.
        """
        if not isinstance(name, str) or not name.isidentifier():
            raise ValueError(f"Unsafe SQL identifier: {name!r}")
        return name

    def execute_with_unique_constraint(self, table: str,
                                       unique_fields: list[str],
                                       data: dict) -> Optional[int]:
        """Insert *data* into *table*, or return the id of the existing row.

        Args:
            table: target table; assumed to have an ``id`` column.
            unique_fields: columns covered by the table's UNIQUE constraint.
            data: column -> value mapping to insert.

        Returns:
            ``lastrowid`` of the inserted row; or, if the insert violated a
            constraint, the ``id`` of the row whose *unique_fields* equal the
            values in *data* (``None`` if no such row exists).

        Raises:
            ValueError: if *table* or any column name is not a plain
                identifier.
        """
        ident = self._checked_identifier
        table_sql = ident(table)
        cols = ', '.join(ident(col) for col in data)
        placeholders = ', '.join(['?'] * len(data))
        # Match every unique field: "a = ? AND b = ?".
        where = ' AND '.join(f"{ident(field)} = ?" for field in unique_fields)

        cursor = self.conn.cursor()
        try:
            cursor.execute(
                f"INSERT INTO {table_sql} ({cols}) VALUES ({placeholders})",
                tuple(data.values()),
            )
            self.conn.commit()
            return cursor.lastrowid
        except sqlite3.IntegrityError:
            if not unique_fields:
                # Nothing to look the duplicate up by.
                raise

        # Duplicate: fetch the row that already holds these unique values.
        cursor.execute(
            f"SELECT id FROM {table_sql} WHERE {where}",
            tuple(data.get(field) for field in unique_fields),
        )
        row = cursor.fetchone()
        return row[0] if row else None
class PaymentRepository:
    """Creates payments through ``IdempotentDatabase.execute_idempotent``."""

    def __init__(self, db: "IdempotentDatabase"):
        self.db = db

    def create_payment(self, payment_data: dict) -> tuple[dict, int]:
        """Create a payment at most once per ``payment_data["idempotency_key"]``.

        Returns:
            ``(body, status)`` from the first execution for this key.

        Raises:
            ValueError: if ``idempotency_key`` is missing or empty - without
                it a retried request would charge the customer again.
        """
        key = payment_data.get("idempotency_key")
        if not key:
            raise ValueError(
                "payment_data must include a non-empty 'idempotency_key'")

        def do_payment():
            return self._process_payment(payment_data)

        return self.db.execute_idempotent(key, do_payment)

    def _process_payment(self, data: dict) -> tuple[dict, int]:
        """Placeholder payment logic: returns a new payment record and 201."""
        # Payment processing logic here
        return {
            "payment_id": str(uuid.uuid4()),
            "status": "completed",
            "amount": data["amount"],
        }, 201
Optimistic Locking
import threading
from dataclasses import dataclass
from typing import Optional
@dataclass
class VersionedEntity:
    """An entity's payload paired with its optimistic-locking version."""

    id: str  # entity identifier; also embedded in the entity's ETag
    version: int  # OptimisticLockStore starts it at 1 and bumps it per update
    data: dict  # the entity's payload
class OptimisticLockStore:
    """In-memory entity store that uses version numbers for optimistic locking.

    Published ``VersionedEntity`` objects are never mutated: every update
    installs a new object. A caller holding an entity returned by ``get`` or
    ``update`` therefore always reads a consistent (id, version, data)
    snapshot, even while other threads update the same id. Mutating in place
    would let a reader observe the new version paired with the old data and
    then pass an If-Match check it should have failed.
    """

    def __init__(self):
        self.entities = {}
        self.lock = threading.RLock()

    def get(self, entity_id: str) -> Optional[VersionedEntity]:
        """Return the current snapshot for *entity_id*, or ``None``."""
        with self.lock:
            return self.entities.get(entity_id)

    def update(self, entity_id: str, expected_version: int,
               new_data: dict) -> VersionedEntity:
        """Replace *entity_id*'s data if its version equals *expected_version*.

        An unknown *entity_id* is created at version 1; *expected_version*
        is not checked in that case.

        Returns:
            The newly stored snapshot.

        Raises:
            OptimisticLockError: if the stored version differs from
                *expected_version*.
        """
        with self.lock:
            current = self.entities.get(entity_id)
            if current is None:
                created = VersionedEntity(id=entity_id, version=1,
                                          data=new_data)
                self.entities[entity_id] = created
                return created

            if current.version != expected_version:
                raise OptimisticLockError(
                    f"Version mismatch: expected {expected_version}, "
                    f"found {current.version}"
                )

            # Publish a new snapshot instead of mutating the shared one.
            updated = VersionedEntity(id=current.id,
                                      version=current.version + 1,
                                      data=new_data)
            self.entities[entity_id] = updated
            return updated
class OptimisticLockError(Exception):
    """Raised by OptimisticLockStore.update when the expected version is stale."""
# REST API Implementation
class VersionedAPI:
    """API with optimistic locking exposed through ETags.

    ETags have the form ``"<entity id>-<version>"`` (quotes included).
    """

    def __init__(self, store: "OptimisticLockStore"):
        self.store = store

    @staticmethod
    def _make_etag(entity) -> str:
        """Build the quoted ETag for *entity*'s current version."""
        return f'"{entity.id}-{entity.version}"'

    @staticmethod
    def _parse_etag_version(if_match: str) -> int:
        """Extract the version number from an ETag built by ``_make_etag``.

        Raises:
            ValueError, IndexError: if the ETag is malformed.
        """
        # Entity ids may themselves contain '-' (e.g. UUIDs), so only the
        # text after the *last* dash is the version.
        return int(if_match.strip('"').rsplit('-', 1)[1])

    def get_with_etag(self, entity_id: str) -> tuple[dict, int, Optional[str]]:
        """Return ``(data, 200, etag)``, or ``(error, 404, None)`` if missing."""
        entity = self.store.get(entity_id)
        if not entity:
            return {"error": "Not found"}, 404, None
        return entity.data, 200, self._make_etag(entity)

    def update(self, entity_id: str, data: dict,
               if_match: str) -> tuple[dict, int]:
        """Update *entity_id* if *if_match* names its current version.

        Returns:
            ``({"data", "version"}, 200)`` on success; ``400`` for a missing
            or malformed If-Match value; ``409`` on a version mismatch.
        """
        if not if_match:
            return {"error": "If-Match header required"}, 400

        try:
            expected_version = self._parse_etag_version(if_match)
        except (ValueError, IndexError):
            return {"error": "Invalid ETag"}, 400

        try:
            entity = self.store.update(entity_id, expected_version, data)
        except OptimisticLockError as e:
            return {"error": str(e)}, 409

        return {
            "data": entity.data,
            "version": entity.version,
        }, 200
# Example: ETag in response headers
def handle_get(request, entity_id: str):
    """Return an entity's data, exposing its ETag as a response header.

    NOTE(review): relies on a module-level ``store`` that is not defined in
    this section.
    """
    data, status, etag = VersionedAPI(store).get_with_etag(entity_id)
    if status != 200:
        return data, status
    return data, 200, {'ETag': etag}
def handle_update(request, entity_id: str, data: dict):
    """Apply an update guarded by the request's ``If-Match`` header.

    On success returns ``(body, 200, {'ETag': ...})``, the same
    (body, status, headers) shape ``handle_get`` uses, so clients find the
    new ETag in the response headers. The ETag is also kept in the body for
    callers that already read it from there. Errors are returned as
    ``(body, status)``.

    NOTE(review): relies on a module-level ``store`` that is not defined in
    this section.
    """
    api = VersionedAPI(store)
    if_match = request.headers.get('If-Match')
    result, status = api.update(entity_id, data, if_match)
    if status != 200:
        return result, status

    etag = f'"{entity_id}-{result["version"]}"'
    result['ETag'] = etag  # kept in the body for backward compatibility
    return result, status, {'ETag': etag}
Client-Side Idempotency
import requests
import uuid
import time
from typing import Callable
import threading
class IdempotentClient:
    """HTTP client that sends an ``Idempotency-Key`` header and retries safely.

    All attempts of one logical request reuse the same key, so a server that
    honours the header can recognise retries and avoid repeating side effects.
    """

    def __init__(self, base_url: str = "", retry_on_timeout: bool = True,
                 max_retries: int = 3):
        """
        Args:
            base_url: prefix prepended to every endpoint.
            retry_on_timeout: retry requests that raise ``requests.Timeout``.
            max_retries: total number of attempts per request (at least 1).
        """
        self.base_url = base_url
        self.retry_on_timeout = retry_on_timeout
        self.max_retries = max_retries
        # NOTE(review): the two attributes below are not used by this class.
        self.pending_requests = {}
        self.lock = threading.Lock()

    def request(self, method: str, endpoint: str,
                idempotency_key: str = None,
                **kwargs) -> requests.Response:
        """Send a request, retrying timeouts and 409 responses under one key.

        Args:
            method: HTTP method passed to ``requests.request``.
            endpoint: path appended to ``base_url``.
            idempotency_key: key to send; a random UUID4 is used if empty.
            **kwargs: forwarded to ``requests.request``. A ``headers`` dict is
                copied rather than modified.

        Returns:
            The first non-409 response, or the last response if every
            attempt returned 409.

        Raises:
            requests.Timeout: if timeout retries are disabled, or the final
                attempt also times out.
        """
        url = f"{self.base_url}{endpoint}"

        if not idempotency_key:
            idempotency_key = str(uuid.uuid4())

        # Copy so the caller's headers dict is left untouched.
        headers = dict(kwargs.pop('headers', None) or {})
        headers['Idempotency-Key'] = idempotency_key

        attempts = max(1, self.max_retries)
        for attempt in range(attempts):
            is_last = attempt == attempts - 1
            try:
                response = requests.request(
                    method,
                    url,
                    headers=headers,
                    **kwargs
                )
            except requests.Timeout:
                if not self.retry_on_timeout or is_last:
                    raise
                # The timed-out request may have succeeded server-side;
                # resending the same key lets the server deduplicate it.
                print(f"Request timeout, retrying ({attempt + 1}/{attempts})")
                time.sleep(2 ** attempt)  # Exponential backoff
                continue

            # 409: another request with this key is still in progress. Wait
            # and retry, except on the final attempt, where the 409 response
            # itself is returned.
            if response.status_code == 409 and not is_last:
                time.sleep(0.5 * (attempt + 1))
                continue

            return response

    def post(self, endpoint: str, data: dict, **kwargs) -> requests.Response:
        return self.request('POST', endpoint, json=data, **kwargs)

    def put(self, endpoint: str, data: dict, **kwargs) -> requests.Response:
        return self.request('PUT', endpoint, json=data, **kwargs)

    def delete(self, endpoint: str, **kwargs) -> requests.Response:
        return self.request('DELETE', endpoint, **kwargs)
# Usage
client = IdempotentClient("https://api.example.com")

# A payment request. Every retry of this one payment must reuse the same key
# so the server can recognise it as a duplicate.
PAYMENT_IDEMPOTENCY_KEY = "payment_cust123_12345"
payment_data = {
    "amount": 100.00,
    "currency": "USD",
    "customer_id": "cust_123",
}

try:
    # First attempt. IdempotentClient already retries timeouts internally
    # (retry_on_timeout defaults to True) before letting Timeout escape.
    response = client.post("/payments", payment_data,
                           idempotency_key=PAYMENT_IDEMPOTENCY_KEY)
except requests.Timeout:
    # Application-level retry with the SAME key - won't create a duplicate.
    response = client.post("/payments", payment_data,
                           idempotency_key=PAYMENT_IDEMPOTENCY_KEY)
Idempotency in Message Queues
import hashlib
import json
from typing import Optional
class MessageIdempotency:
    """Skip queue messages whose processing has already been recorded in Redis."""

    def __init__(self, redis_client, ttl_seconds: int = 86400):
        """
        Args:
            redis_client: object exposing ``exists(key)`` and
                ``setex(key, seconds, value)`` - presumably a redis-py
                client; confirm against the caller.
            ttl_seconds: how long a processed-message marker is kept.
        """
        self.redis = redis_client
        self.ttl_seconds = ttl_seconds

    def process_message(self, message: dict, processor: callable) -> object:
        """Run *processor* on *message* unless it was already processed.

        Returns:
            ``True`` if the message was already processed; otherwise whatever
            *processor* returned.

        Raises:
            Exception: anything raised by *processor* or by the Redis write.
                *processor* failures store no marker, so the message can be
                retried.
        """
        message_key = self._create_message_key(message)

        # NOTE(review): exists() and the later setex() are separate calls, so
        # two consumers handling the same message concurrently can both run
        # *processor*. If that matters, claim the key atomically up front
        # (e.g. Redis SET with NX and EX) instead of checking exists().
        if self.redis.exists(message_key):
            print(f"Message already processed: {message_key}")
            return True

        try:
            result = processor(message)
            # Mark as processed; the marker expires after ttl_seconds.
            self.redis.setex(message_key, self.ttl_seconds,
                             self._encode_result(result))
            return result
        except Exception as e:
            print(f"Error processing message: {e}")
            raise

    @staticmethod
    def _encode_result(result) -> str:
        """JSON-encode *result* for storage, falling back to a plain marker.

        The message has already been processed when this runs; raising on an
        unserializable result would make the caller retry and process it a
        second time.
        """
        try:
            return json.dumps(result)
        except (TypeError, ValueError):
            return json.dumps(True)

    def _create_message_key(self, message: dict) -> str:
        """Build the Redis key for *message* from its identifying fields.

        Only ``type``, ``id``, ``timestamp`` and ``source`` are hashed, so
        messages that agree on those four fields share a key.
        NOTE(review): a message lacking all four fields gets the same key as
        every other such message, so only the first of them is processed.
        """
        key_data = json.dumps({
            "type": message.get("type"),
            "id": message.get("id"),
            "timestamp": message.get("timestamp"),
            "source": message.get("source")
        }, sort_keys=True)
        # hashlib hashes bytes, not str.
        digest = hashlib.sha256(key_data.encode("utf-8")).hexdigest()
        return f"message:processed:{digest}"

    def cleanup_old_keys(self):
        """Clean up old processed message keys."""
        # Keys are written with setex, so Redis expires them automatically.
        pass
Best Practices
class IdempotencyBestPractices:
    """Guidelines for idempotent API design."""

    # Methods treated as idempotent without needing an Idempotency-Key.
    IDEMPOTENT_METHODS = frozenset({'GET', 'PUT', 'DELETE', 'HEAD', 'OPTIONS'})
    # Resource-creating POST endpoints that should require an Idempotency-Key.
    KEYED_POST_RESOURCES = frozenset({
        '/payments', '/orders', '/subscriptions', '/transfers', '/bookings',
    })

    @staticmethod
    def should_be_idempotent(method: str, resource: str) -> bool:
        """Return True if *method* on *resource* should behave idempotently.

        *method* is compared case-insensitively, and a trailing ``/`` on
        *resource* is ignored.
        """
        method = method.upper()
        if method in IdempotencyBestPractices.IDEMPOTENT_METHODS:
            return True

        # POST operations that create resources should use idempotency keys.
        if method == 'POST':
            return (resource.rstrip('/')
                    in IdempotencyBestPractices.KEYED_POST_RESOURCES)

        return False

    @staticmethod
    def generate_idempotency_key(parts: list[str]) -> str:
        """Derive a deterministic 32-hex-character key from *parts*.

        Each part is converted with ``str()``. Different part lists always
        yield different inputs to the hash.
        NOTE: changing this encoding changes every generated key; avoid doing
        so while previously issued keys are still within their TTL.
        """
        # JSON-encode the list so part boundaries stay unambiguous: joining
        # with '-' would give ["a-b", "c"] and ["a", "b-c"] the same key.
        combined = json.dumps([str(p) for p in parts], separators=(',', ':'))
        return hashlib.sha256(combined.encode('utf-8')).hexdigest()[:32]

    @staticmethod
    def idempotency_key_requirements() -> dict:
        """Requirements for idempotency keys."""
        return {
            "min_length": 16,
            "max_length": 256,
            "format": "alphanumeric with dashes/underscores",
            "uniqueness": "one per user per operation",
            "ttl": "24 hours minimum"
        }
Conclusion
Idempotency is crucial for building reliable distributed systems. By implementing proper idempotency handling, you can safely handle retries without worrying about duplicate operations.
Key takeaways:
- Use idempotency keys for all state-changing POST operations
- Implement optimistic locking for updates with ETags
- Store idempotency records with appropriate TTL
- Generate descriptive idempotency keys for debugging
- Consider idempotency at all layers: API, database, and message queues
Comments