Idempotency in Distributed Systems: Building Safe APIs

Introduction

Network failures, timeouts, and client retries can cause the same operation to execute more than once. Without idempotency handling, this leads to duplicate charges, duplicated records, or inconsistent state. An operation is idempotent when executing it multiple times leaves the system in the same state as executing it once, even if the individual responses differ.
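
As a minimal illustration of the definition above, the sketch below contrasts an assignment (idempotent) with an increment (not idempotent). The function names and the account dictionary are hypothetical and exist only for this example.

```python
def set_balance(account: dict, amount: int) -> dict:
    """Idempotent: repeating the call leaves the same state."""
    account["balance"] = amount
    return account


def add_to_balance(account: dict, amount: int) -> dict:
    """Not idempotent: every repeat changes the state again."""
    account["balance"] += amount
    return account


# Simulate a client that retries each call once
a = {"balance": 0}
set_balance(a, 100)
set_balance(a, 100)

b = {"balance": 0}
add_to_balance(b, 100)
add_to_balance(b, 100)

print(a["balance"], b["balance"])
```

The retried assignment is harmless, while the retried increment doubles the amount. That second case is what idempotency keys are meant to protect against.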

Understanding Idempotency

┌─────────────────────────────────────────────────────────────────┐
│                      Idempotency Concepts                       │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Idempotent Operations:                                         │
│                                                                 │
│  GET /users/123     → Always returns user 123                   │
│  DELETE /users/123  → Deleting user 123 twice = same as once    │
│  PUT /users/123     → Setting name to "John" (same value)       │
│                                                                 │
│  Non-Idempotent Operations:                                     │
│                                                                 │
│  POST /orders       → Creating order (each call = new order)    │
│  POST /payments     → Each call = new payment                   │
│                                                                 │
│  Note: a repeated DELETE may return 404, but the resource stays │
│  deleted, so DELETE still counts as idempotent.                 │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────┐
│                 The Problem: Duplicate Payments                 │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Client             Server             Database                 │
│     │                  │                   │                    │
│     │ POST /pay        │                   │                    │
│     │─────────────────▶│                   │                    │
│     │                  │ (processing)      │                    │
│     │   (timeout)      │                   │                    │
│     │✗ ───────────────▶│                   │                    │
│     │                  │                   │                    │
│     │ (retry)          │                   │                    │
│     │ POST /pay        │                   │                    │
│     │─────────────────▶│                   │                    │
│     │                  │ Process payment   │                    │
│     │                  │──────────────────▶│                    │
│     │                  │                   │                    │
│     │                  │ Process payment   │                    │
│     │                  │──────────────────▶│ (DUPLICATE!)       │
│     │                  │                   │                    │
│     │◀─────────────────│                   │                    │
│     │   200 OK         │                   │                    │
│                                                                 │
│  Solution: Idempotency Key                                      │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Idempotency Keys

import uuid
import hashlib
import time
from dataclasses import dataclass
from typing import Optional
import threading

@dataclass
class IdempotencyRecord:
    """Record of an idempotent request."""
    key: str
    response_status: int
    response_body: dict
    created_at: float
    expires_at: float

class IdempotencyStore:
    """Store for idempotency records."""
    
    def __init__(self, ttl_seconds: int = 86400):
        self.records = {}
        self.lock = threading.RLock()
        self.ttl = ttl_seconds
    
    def get(self, key: str) -> Optional[IdempotencyRecord]:
        """Get idempotency record if exists and valid."""
        with self.lock:
            record = self.records.get(key)
            
            if record and time.time() < record.expires_at:
                return record
            
            # Clean up expired
            if record:
                del self.records[key]
            
            return None
    
    def set(self, key: str, status: int, body: dict):
        """Store idempotency record."""
        with self.lock:
            now = time.time()
            self.records[key] = IdempotencyRecord(
                key=key,
                response_status=status,
                response_body=body,
                created_at=now,
                expires_at=now + self.ttl
            )
    
    def delete(self, key: str):
        """Delete idempotency record."""
        with self.lock:
            self.records.pop(key, None)
    
    def cleanup_expired(self):
        """Clean up expired records."""
        with self.lock:
            now = time.time()
            expired = [
                k for k, v in self.records.items()
                if now >= v.expires_at
            ]
            for k in expired:
                del self.records[k]


class IdempotentEndpoint:
    """Decorator for idempotent endpoints."""
    
    def __init__(self, store: IdempotencyStore):
        self.store = store
    
    def __call__(self, func):
        """Decorator to make endpoint idempotent."""
        
        def wrapper(request, *args, **kwargs):
            # Get idempotency key from header
            idempotency_key = request.headers.get('Idempotency-Key')
            
            if not idempotency_key:
                return {"error": "Idempotency-Key required"}, 400
            
            # Validate key format
            if len(idempotency_key) < 16:
                return {"error": "Invalid idempotency key"}, 400
            
            # Check for existing request
            existing = self.store.get(idempotency_key)
            
            if existing:
                return (
                    existing.response_body,
                    existing.response_status,
                    {'X-Idempotent-Replayed': 'true'}
                )
            
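            # Execute the request. If func raises, nothing is stored,
            # so the client can retry with the same key.
            result = func(request, *args, **kwargs)

            # Normalize the handler's return value to (body, status);
            # any extra tuple items (such as headers) are ignored
            if isinstance(result, tuple):
                body, status = result[0], result[1]
            else:
                body, status = result, 200

            # Cache the response so a retry with this key replays it.
            # Server errors (5xx) are not cached, so they can be retried.
            if status < 500:
                self.store.set(idempotency_key, status, body)

            return body, status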
        
        return wrapper
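
The decorator above checks the store and then runs the handler, so two concurrent requests with the same key can both pass the check before either one stores a result. One way to close that window is to claim the key atomically before doing any work and to answer 409 while the first request is still running. The sketch below is an assumption-laden illustration of that idea: `ReservingStore`, `IN_PROGRESS`, and `handle` are hypothetical names that do not appear in the code above.

```python
import threading
from typing import Callable, Optional

# Sentinel marking a key whose first request has not finished yet
IN_PROGRESS = object()


class ReservingStore:
    """In-memory store that claims a key before the work starts."""

    def __init__(self) -> None:
        self._records: dict = {}
        self._lock = threading.Lock()

    def reserve(self, key: str) -> Optional[object]:
        """Claim key atomically.

        Returns None if the caller now owns the key, otherwise the
        existing entry (IN_PROGRESS or a stored response).
        """
        with self._lock:
            if key in self._records:
                return self._records[key]
            self._records[key] = IN_PROGRESS
            return None

    def complete(self, key: str, response: tuple) -> None:
        with self._lock:
            self._records[key] = response

    def release(self, key: str) -> None:
        with self._lock:
            self._records.pop(key, None)


def handle(store: ReservingStore, key: str, operation: Callable[[], tuple]) -> tuple:
    existing = store.reserve(key)
    if existing is IN_PROGRESS:
        return {"error": "Request in progress"}, 409
    if existing is not None:
        return existing  # replay the stored (body, status)
    try:
        response = operation()
    except Exception:
        store.release(key)  # let the client retry after a failure
        raise
    store.complete(key, response)
    return response


# Usage: the operation runs once even though the request arrives twice
calls = []

def create_order() -> tuple:
    calls.append(1)
    return {"ok": True}, 201

store = ReservingStore()
first = handle(store, "order-key-0001", create_order)
second = handle(store, "order-key-0001", create_order)
```

A process-local lock only protects a single server process. With several instances the same claim step would need a shared store that offers an atomic insert.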

Database-Level Idempotency

import sqlite3
from typing import Optional
import json
import uuid

class IdempotentDatabase:
    """Database operations with idempotency support."""
    
    def __init__(self, db_path: str):
        self.conn = sqlite3.connect(db_path)
        self._init_tables()
    
    def _init_tables(self):
        """Initialize idempotency table."""
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS idempotency_keys (
                key TEXT PRIMARY KEY,
                result TEXT,
                status_code INTEGER,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        self.conn.commit()
    
    def execute_idempotent(self, key: str, operation: callable) -> tuple[dict, int]:
        """Run operation once per key; replay the stored (body, status) on repeats."""
        cursor = self.conn.cursor()
        
        # Check for existing result
        cursor.execute(
            "SELECT result, status_code FROM idempotency_keys WHERE key = ?",
            (key,)
        )
        row = cursor.fetchone()
        
        if row:
            return json.loads(row[0]), row[1]
        
        try:
            # operation() must return a (body, status) tuple
            body, status = operation()
            
            # Store the result, then commit once so that any writes made by
            # operation() on self.conn and the key record land together
            self.conn.execute(
                "INSERT INTO idempotency_keys (key, result, status_code) VALUES (?, ?, ?)",
                (key, json.dumps(body), status)
            )
            self.conn.commit()
            
            return body, status
        
        except Exception:
            self.conn.rollback()
            raise
    
    def execute_with_unique_constraint(self, table: str, 
                                      unique_fields: list[str],
                                      data: dict) -> Optional[int]:
        """Insert a row; on a unique-constraint conflict return the existing row's id.
        
        The table and column names are interpolated into the SQL, so they
        must come from trusted code, never from user input.
        """
        cols = ', '.join(data.keys())
        val_placeholders = ', '.join(['?'] * len(data))
        cursor = self.conn.cursor()
        
        try:
            # Try to insert
            cursor.execute(
                f"INSERT INTO {table} ({cols}) VALUES ({val_placeholders})",
                tuple(data.values())
            )
            self.conn.commit()
            
            return cursor.lastrowid
        
        except sqlite3.IntegrityError:
            # Duplicate: end the failed transaction, then fetch the existing
            # record by matching every unique field
            self.conn.rollback()
            where = ' AND '.join(f"{f} = ?" for f in unique_fields)
            values = tuple(data.get(f) for f in unique_fields)
            cursor.execute(f"SELECT id FROM {table} WHERE {where}", values)
            row = cursor.fetchone()
            return row[0] if row else None


class PaymentRepository:
    """Payment repository with idempotency."""
    
    def __init__(self, db: IdempotentDatabase):
        self.db = db
    
    def create_payment(self, payment_data: dict) -> tuple[dict, int]:
        """Create payment with idempotency."""
        key = payment_data.get("idempotency_key")
        
        def do_payment():
            return self._process_payment(payment_data)
        
        return self.db.execute_idempotent(key, do_payment)
    
    def _process_payment(self, data: dict) -> tuple[dict, int]:
        """Actual payment processing logic."""
        # Payment processing logic here
        return {
            "payment_id": str(uuid.uuid4()),
            "status": "completed",
            "amount": data["amount"]
        }, 201
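
A select-then-insert check can still let two concurrent requests through. A common alternative is to let the primary key on the idempotency table decide: insert the key first, inside the same transaction as the business write, and treat an integrity error as "already processed". The following is a minimal sketch under stated assumptions: the `payments` table, the simplified `idempotency_keys` schema, and the `pay_once` helper are hypothetical, and replays return status 200 here, although replaying the original status is an equally valid choice.

```python
import json
import sqlite3


def pay_once(conn: sqlite3.Connection, key: str, amount: int) -> tuple:
    """Record a payment at most once per idempotency key."""
    try:
        # "with conn" wraps the statements in one transaction:
        # commit on success, rollback if anything raises
        with conn:
            # Claim the key first; a duplicate key raises IntegrityError
            conn.execute(
                "INSERT INTO idempotency_keys (key) VALUES (?)", (key,)
            )
            cur = conn.execute(
                "INSERT INTO payments (amount) VALUES (?)", (amount,)
            )
            body = {"payment_id": cur.lastrowid, "amount": amount}
            conn.execute(
                "UPDATE idempotency_keys SET result = ? WHERE key = ?",
                (json.dumps(body), key),
            )
        return body, 201
    except sqlite3.IntegrityError:
        # Key already claimed: replay the stored result
        row = conn.execute(
            "SELECT result FROM idempotency_keys WHERE key = ?", (key,)
        ).fetchone()
        return json.loads(row[0]), 200


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE idempotency_keys (key TEXT PRIMARY KEY, result TEXT)")
conn.execute("CREATE TABLE payments (id INTEGER PRIMARY KEY, amount INTEGER)")

first = pay_once(conn, "payment-key-0001", 100)
second = pay_once(conn, "payment-key-0001", 100)
payment_count = conn.execute("SELECT COUNT(*) FROM payments").fetchone()[0]
```

Because the key row and the payment row commit together, a crash between the two writes cannot leave a payment without its key, or a key without its payment.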

Optimistic Locking

import threading
from dataclasses import dataclass
from typing import Optional

@dataclass
class VersionedEntity:
    id: str
    version: int
    data: dict

class OptimisticLockStore:
    """Store with optimistic locking."""
    
    def __init__(self):
        self.entities = {}
        self.lock = threading.RLock()
    
    def get(self, entity_id: str) -> Optional[VersionedEntity]:
        with self.lock:
            return self.entities.get(entity_id)
    
    def update(self, entity_id: str, expected_version: int, 
               new_data: dict) -> VersionedEntity:
        """Update with optimistic locking."""
        with self.lock:
            entity = self.entities.get(entity_id)
            
            if not entity:
                # Create new entity
                entity = VersionedEntity(
                    id=entity_id,
                    version=1,
                    data=new_data
                )
                self.entities[entity_id] = entity
                return entity
            
            # Check version
            if entity.version != expected_version:
                raise OptimisticLockError(
                    f"Version mismatch: expected {expected_version}, "
                    f"found {entity.version}"
                )
            
            # Update
            entity.version += 1
            entity.data = new_data
            
            return entity

class OptimisticLockError(Exception):
    """Exception for optimistic lock failures."""
    pass

# REST API Implementation
class VersionedAPI:
    """API with optimistic locking via ETag."""
    
    def __init__(self, store: OptimisticLockStore):
        self.store = store
    
    def get_with_etag(self, entity_id: str) -> tuple[dict, int, Optional[str]]:
        """Get entity with ETag."""
        entity = self.store.get(entity_id)
        
        if not entity:
            return {"error": "Not found"}, 404, None
        
        # Generate ETag
        etag = f'"{entity.id}-{entity.version}"'
        
        return entity.data, 200, etag
    
    def update(self, entity_id: str, data: dict, 
              if_match: str) -> tuple[dict, int]:
        """Update with optimistic locking."""
        if not if_match:
            return {"error": "If-Match header required"}, 400
        
        # Parse ETag; take the text after the last '-' so that ids
        # containing dashes (for example UUIDs) still parse
        try:
            expected_version = int(if_match.strip('"').rsplit('-', 1)[1])
        except (ValueError, IndexError):
            return {"error": "Invalid ETag"}, 400
        
        try:
            entity = self.store.update(entity_id, expected_version, data)
            
            return {
                "data": entity.data,
                "version": entity.version
            }, 200
        
        except OptimisticLockError as e:
            # A failed If-Match precondition maps to 412 Precondition Failed
            return {"error": str(e)}, 412


# Example: ETag in response headers
store = OptimisticLockStore()  # shared by both handlers

def handle_get(request, entity_id: str):
    api = VersionedAPI(store)
    data, status, etag = api.get_with_etag(entity_id)
    
    if status == 200:
        return data, 200, {'ETag': etag}
    return data, status


def handle_update(request, entity_id: str, data: dict):
    api = VersionedAPI(store)
    if_match = request.headers.get('If-Match')
    
    result, status = api.update(entity_id, data, if_match)
    
    if status == 200:
        # Return the new ETag as a header so the client can chain updates
        etag = f'"{entity_id}-{result["version"]}"'
        return result, status, {'ETag': etag}
    
    return result, status

Client-Side Idempotency

import requests
import uuid
import time
from typing import Callable
import threading

class IdempotentClient:
    """HTTP client with automatic idempotency."""
    
    def __init__(self, base_url: str = "", retry_on_timeout: bool = True):
        self.base_url = base_url
        self.retry_on_timeout = retry_on_timeout
        self.pending_requests = {}
        self.lock = threading.Lock()
    
    def request(self, method: str, endpoint: str, 
               idempotency_key: str = None,
               **kwargs) -> requests.Response:
        """Make request with idempotency support."""
        url = f"{self.base_url}{endpoint}"
        
        # Generate idempotency key if not provided
        if not idempotency_key:
            idempotency_key = str(uuid.uuid4())
        
        headers = kwargs.pop('headers', {})
        headers['Idempotency-Key'] = idempotency_key
        
        # requests never times out unless a timeout is set;
        # 10 seconds is an arbitrary default that callers can override
        kwargs.setdefault('timeout', 10)
        
        max_retries = 3
        
        for attempt in range(max_retries):
            try:
                response = requests.request(
                    method,
                    url,
                    headers=headers,
                    **kwargs
                )
                
                # 409 is treated here as "an earlier request with this key
                # is still in progress" (an API-specific convention):
                # wait and retry, but hand back the last response
                if response.status_code == 409 and attempt < max_retries - 1:
                    time.sleep(0.5 * (attempt + 1))
                    continue
                
                return response
            
            except requests.Timeout:
                if not self.retry_on_timeout or attempt == max_retries - 1:
                    raise
                
                # The request may have succeeded server-side; retrying is
                # safe because the same idempotency key is reused
                print(f"Request timeout, retrying ({attempt + 1}/{max_retries})")
                time.sleep(2 ** attempt)  # Exponential backoff
    
    def post(self, endpoint: str, data: dict, **kwargs) -> requests.Response:
        return self.request('POST', endpoint, json=data, **kwargs)
    
    def put(self, endpoint: str, data: dict, **kwargs) -> requests.Response:
        return self.request('PUT', endpoint, json=data, **kwargs)
    
    def delete(self, endpoint: str, **kwargs) -> requests.Response:
        return self.request('DELETE', endpoint, **kwargs)


# Usage
client = IdempotentClient("https://api.example.com")

# Payment request - use same key for retries
payment_data = {
    "amount": 100.00,
    "currency": "USD",
    "customer_id": "cust_123"
}

# First attempt
try:
    response = client.post(
        "/payments",
        payment_data,
        idempotency_key="payment_cust123_12345"
    )
except requests.Timeout:
    # Retry with same key - won't create duplicate
    response = client.post(
        "/payments",
        payment_data,
        idempotency_key="payment_cust123_12345"
    )

Idempotency in Message Queues

import hashlib
import json
from typing import Optional

class MessageIdempotency:
    """Idempotency for message processing."""
    
    def __init__(self, redis_client):
        self.redis = redis_client
    
    def process_message(self, message: dict, processor: callable):
        """Process a message once and return the processor's result.
        
        For a message that was already processed, return the stored result.
        """
        # Create unique key from message
        message_key = self._create_message_key(message)
        
        # Check if already processed
        cached = self.redis.get(message_key)
        if cached is not None:
            print(f"Message already processed: {message_key}")
            return json.loads(cached)
        
        try:
            # Process message
            result = processor(message)
            
            # Mark as processed (with TTL); result must be JSON-serializable
            self.redis.setex(message_key, 86400, json.dumps(result))
            
            return result
        
        except Exception as e:
            print(f"Error processing message: {e}")
            raise
    
    def _create_message_key(self, message: dict) -> str:
        """Create unique key from message."""
        # Use deterministic fields
        key_data = json.dumps({
            "type": message.get("type"),
            "id": message.get("id"),
            "timestamp": message.get("timestamp"),
            "source": message.get("source")
        }, sort_keys=True)
        
        # sha256 needs bytes, so encode the JSON string first
        return f"message:processed:{hashlib.sha256(key_data.encode()).hexdigest()}"
    
    def cleanup_old_keys(self):
        """Clean up old processed message keys."""
        # Redis handles TTL automatically
        pass
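
Checking for the key and writing it afterwards are two separate steps, so two consumers that receive the same message at the same time can both process it. Redis can do the claim in one step with `SET ... NX EX`, which redis-py exposes as `set(name, value, nx=True, ex=...)`. The sketch below assumes that approach; `process_once` and `FakeRedis` are hypothetical names, and `FakeRedis` is only an in-memory stand-in so the example runs without a server.

```python
from typing import Callable


def process_once(redis_client, message_key: str, message: dict,
                 processor: Callable[[dict], object],
                 ttl_seconds: int = 86400) -> bool:
    """Return True if this call processed the message, False if it was a duplicate."""
    # SET with NX succeeds only for the first caller; EX sets the TTL
    claimed = redis_client.set(message_key, "processing", nx=True, ex=ttl_seconds)
    if not claimed:
        return False
    try:
        processor(message)
    except Exception:
        # Release the claim so a redelivery can try again
        redis_client.delete(message_key)
        raise
    return True


class FakeRedis:
    """In-memory stand-in for a Redis client (ignores expiry)."""

    def __init__(self) -> None:
        self.data: dict = {}

    def set(self, name, value, nx=False, ex=None):
        if nx and name in self.data:
            return None
        self.data[name] = value
        return True

    def delete(self, name):
        self.data.pop(name, None)


seen = []
client = FakeRedis()
message = {"id": "msg-1", "type": "order.created"}

first = process_once(client, "message:processed:msg-1", message, seen.append)
second = process_once(client, "message:processed:msg-1", message, seen.append)
```

Claiming before processing trades one risk for another: if the consumer crashes after the claim but before it finishes, the message is skipped until the key expires, so the TTL should reflect how long a redelivery can wait.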

Best Practices

import hashlib

class IdempotencyBestPractices:
    """Guidelines for idempotent API design."""
    
    @staticmethod
    def should_be_idempotent(method: str, resource: str) -> bool:
        """Determine if operation should be idempotent."""
        idempotent_methods = ['GET', 'PUT', 'DELETE', 'HEAD', 'OPTIONS']
        
        # These methods are defined as idempotent by HTTP semantics
        if method in idempotent_methods:
            return True
        
        # POST operations that create resources should use idempotency
        if method == 'POST':
            return resource in [
                '/payments', '/orders', '/subscriptions',
                '/transfers', '/bookings'
            ]
        
        return False
    
    @staticmethod
    def generate_idempotency_key(parts: list[str]) -> str:
        """Generate idempotency key from parts."""
        combined = '-'.join(str(p) for p in parts)
        return hashlib.sha256(combined.encode()).hexdigest()[:32]
    
    @staticmethod
    def idempotency_key_requirements() -> dict:
        """Requirements for idempotency keys."""
        return {
            "min_length": 16,
            "max_length": 256,
            "format": "alphanumeric with dashes/underscores",
            "uniqueness": "one per user per operation",
            "ttl": "24 hours minimum"
        }

Conclusion

Idempotency is crucial for building reliable distributed systems. By implementing proper idempotency handling, you can safely handle retries without worrying about duplicate operations.

Key takeaways:

  • Use idempotency keys for all state-changing POST operations
  • Implement optimistic locking for updates with ETags
  • Store idempotency records with appropriate TTL
  • Generate descriptive idempotency keys for debugging
  • Consider idempotency at all layers: API, database, and message queues
