Skip to main content
⚡ Calmops

Distributed Locking Patterns: Redis, etcd, and Coordination Services

Introduction

In distributed systems, coordinating access to shared resources across multiple processes and machines is challenging. A distributed lock ensures that only one client can access a critical resource at a time, preventing race conditions, data corruption, and inconsistent states.

This guide covers distributed locking patterns, implementation strategies using Redis, etcd, and ZooKeeper, and practical patterns for protecting critical sections in production systems.

Why Distributed Locks Matter

Distributed locks solve critical problems:

  • Prevent Race Conditions: Ensure sequential access to shared resources
  • Maintain Data Consistency: Protect against concurrent modifications
  • Enable Coordination: Coordinate across multiple services
  • Support Failover: Handle leader election and failovers

Redis-Based Distributed Lock

Basic Redlock Implementation

import redis
import time
import uuid

class LockAcquisitionError(Exception):
    """Raised by the context-manager protocol when a lock cannot be taken."""


class RedisLock:
    """Single-instance Redis lock: SET NX EX plus a unique owner token.

    The token guarantees that only the client that acquired the lock can
    release or extend it; the expiration guarantees the lock cannot be
    held forever by a crashed client.
    """

    def __init__(self, redis_client, key, timeout=30, blocking=True, blocking_timeout=5):
        self.redis = redis_client
        self.key = f"lock:{key}"
        self.timeout = timeout                    # lock TTL in seconds
        self.blocking = blocking                  # wait for the lock vs fail fast
        self.blocking_timeout = blocking_timeout  # max seconds to wait when blocking
        self.token = str(uuid.uuid4())            # unique owner token
        self.acquired = False

    def acquire(self):
        """Attempt to acquire the lock; returns True on success."""
        # Monotonic clock: elapsed-time measurement immune to wall-clock jumps.
        started = time.monotonic()

        while True:
            # SET NX with expiration - a single atomic operation.
            acquired = self.redis.set(
                self.key,
                self.token,
                nx=True,         # only set if the key does not exist
                ex=self.timeout  # auto-expire to prevent deadlocks
            )

            if acquired:
                self.acquired = True
                return True

            if not self.blocking:
                return False

            # Give up once we have waited longer than blocking_timeout.
            if time.monotonic() - started >= self.blocking_timeout:
                return False

            time.sleep(0.01)  # back off 10ms before retrying

    def release(self):
        """Release the lock if we own it; returns True if it was deleted."""
        if not self.acquired:
            return False

        # Lua script: atomic check-and-delete, so we never delete a lock
        # that expired and was re-acquired by another client.
        script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """

        result = self.redis.eval(script, 1, self.key, self.token)
        self.acquired = False
        return result == 1

    def extend(self, additional_time):
        """Extend the lock's TTL by additional_time seconds if we own it."""
        if not self.acquired:
            return False

        # Atomic check-and-expire: only the owner token may extend.
        script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("expire", KEYS[1], ARGV[2])
        else
            return 0
        end
        """

        new_timeout = self.timeout + additional_time
        result = self.redis.eval(script, 1, self.key, self.token, new_timeout)
        if result == 1:
            # Track the new TTL so repeated extends compound correctly.
            self.timeout = new_timeout
            return True
        return False

    def __enter__(self):
        if not self.acquire():
            raise LockAcquisitionError(f"Could not acquire lock: {self.key}")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()
        return False

Redlock: Multiple Redis Instances

For better reliability, use Redlock with multiple Redis instances:

import redis
import time
import uuid

class DistributedLock:
    """Handle for a lock acquired via Redlock.

    Attributes:
        token: unique owner token stored on each Redis instance.
        key: logical lock name (without the "lock:" prefix).
        manager: the Redlock instance that created this handle.
        validity_time: seconds the lock is guaranteed to remain valid.
    """

    def __init__(self, token, key, manager, validity_time):
        self.token = token
        self.key = key
        self.manager = manager
        self.validity_time = validity_time

    def release(self):
        """Release this lock on all Redis instances."""
        self.manager.release(self.key, self.token)


class Redlock:
    """Redlock: acquire the same lock on a quorum of independent Redis nodes."""

    # Atomic check-and-delete: only the owner token may remove the key.
    _RELEASE_SCRIPT = """
    if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
    else
        return 0
    end
    """

    def __init__(self, redis_clients, retry_count=3, retry_delay=200):
        self.clients = redis_clients
        self.retry_count = retry_count
        self.retry_delay = retry_delay / 1000  # milliseconds -> seconds
        # A majority of instances must grant the lock.
        self.quorum = (len(redis_clients) // 2) + 1

    def acquire(self, key, timeout=30):
        """Acquire the lock on a majority of Redis instances.

        Returns a DistributedLock handle on success, None on failure.
        """
        token = str(uuid.uuid4())

        for attempt in range(self.retry_count):
            acquired_count = 0
            start_time = time.monotonic()  # monotonic for elapsed-time math

            for client in self.clients:
                try:
                    if client.set(f"lock:{key}", token, nx=True, ex=timeout):
                        acquired_count += 1
                except redis.RedisError:
                    # A down instance simply doesn't count toward the quorum.
                    pass

            if acquired_count >= self.quorum:
                # Clock-drift allowance from the Redlock algorithm:
                # 1% of the TTL plus 2 milliseconds (timeout is in seconds,
                # so 2ms is 0.002, not 2).
                drift = (timeout * 0.01) + 0.002
                validity_time = timeout - (time.monotonic() - start_time) - drift

                if validity_time > 0:
                    return DistributedLock(token, key, self, validity_time)

            # Quorum not reached (or validity window exhausted): undo partials.
            self._release_all(key, token)

            # Wait before retrying, except after the final attempt.
            if attempt < self.retry_count - 1:
                time.sleep(self.retry_delay)

        return None

    def _release_all(self, key, token):
        """Best-effort release of the lock on every instance."""
        for client in self.clients:
            try:
                client.eval(self._RELEASE_SCRIPT, 1, f"lock:{key}", token)
            except redis.RedisError:
                pass

    def release(self, key, token):
        """Release the lock (by name and owner token) on all instances."""
        self._release_all(key, token)

Using the Lock

# Basic usage
redis_client = redis.Redis(host='localhost', port=6379)

# Context manager: acquires on entry, releases on exit (raises
# LockAcquisitionError if the lock cannot be acquired).
with RedisLock(redis_client, 'process-order-123') as lock:
    # Critical section - only one process can enter
    process_order('123')
    update_order_status('123', 'processed')

# With Redlock for high availability
clients = [
    redis.Redis(host='redis1', port=6379),
    redis.Redis(host='redis2', port=6379),
    redis.Redis(host='redis3', port=6379)
]

lock_manager = Redlock(clients)
# acquire() returns a lock handle on success, None on failure.
lock = lock_manager.acquire('critical-task', timeout=30)

if lock:
    try:
        perform_critical_task()
    finally:
        # Release in finally so a failed task never keeps holding the lock.
        lock_manager.release('critical-task', lock.token)

etcd Distributed Lock

etcd v3 Lock Implementation

import etcd3
import uuid
import time

class EtcdLock:
    """Mutual-exclusion lock backed by a single etcd key and a lease.

    The lock key is created only if absent (an atomic create-if-not-exists
    transaction) and is attached to a lease, so it disappears automatically
    if the holder dies and stops refreshing the lease.

    NOTE(review): the previous version wrote a per-client key
    (``{key}/{token}``) with a plain ``put``, which never raises and never
    conflicts — so it provided no mutual exclusion at all.
    """

    def __init__(self, etcd_client, key, ttl=30):
        self.etcd = etcd_client
        self.key = f"/locks/{key}"
        self.ttl = ttl
        # Unique owner token stored as the key's value.
        self.token = str(uuid.uuid4())
        self.lease_id = None

    def acquire(self, blocking=True, timeout=30):
        """Acquire the lock.

        Args:
            blocking: if False, return immediately when the lock is held.
            timeout: max seconds to wait in blocking mode.

        Returns:
            True if the lock was acquired, False otherwise.
        """
        # Lease with TTL: revoking it (or letting it expire) deletes the key.
        self.lease_id = self.etcd.lease(self.ttl)
        deadline = time.monotonic() + timeout

        while True:
            # Atomic create-if-absent: the underlying etcd transaction
            # compares create_revision == 0 before putting the key.
            if self.etcd.put_if_not_exists(self.key, self.token, lease=self.lease_id):
                return True

            if not blocking or time.monotonic() >= deadline:
                self._revoke_lease()
                return False

            # Poll until the current holder releases or its lease expires.
            time.sleep(0.05)

    def release(self):
        """Release the lock by revoking the lease (which deletes the key)."""
        self._revoke_lease()

    def _revoke_lease(self):
        # Best-effort: the lease TTL is the backstop if revocation fails.
        if self.lease_id:
            try:
                self.etcd.lease_revoke(self.lease_id)
            except Exception:
                pass
            self.lease_id = None

    def __enter__(self):
        if not self.acquire():
            raise LockAcquisitionError(f"Could not acquire lock: {self.key}")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()
        return False

etcd Distributed Lock with Sequences

class EtcdSequentialLock:
    """Lock with fairness - ensures FIFO ordering.

    NOTE(review): several issues to confirm/fix before production use:
    the read-then-increment sequence generation below is not atomic (two
    clients can compute the same number); ``get_all(prefix=...)`` does not
    match the python-etcd3 API (``get_prefix`` does — verify against the
    client in use); keys sort lexically, so seq 10 orders before seq 2;
    and the "previous key" watched in acquire() is actually the current
    head of the queue, with the watch result never consumed, so the loop
    spins instead of blocking.
    """

    def __init__(self, etcd_client, lock_name, ttl=60):
        self.etcd = etcd_client
        self.lock_path = f"/locks/{lock_name}"
        self.my_key = None  # set in acquire(): this client's queue entry
        # Lease so our queue entry disappears automatically if we crash.
        self.lease_id = etcd_client.lease(ttl)

    def acquire(self):
        """Create sequential key."""
        # Derive the next sequence number from the base key.
        # NOTE(review): read-then-increment is racy without a transaction.
        try:
            result = self.etcd.get(self.lock_path)
            if result[0]:
                seq = int(result[0].decode()) + 1
            else:
                seq = 1
        except:
            seq = 1

        # Create my sequential key
        self.my_key = f"{self.lock_path}/{seq}"
        self.etcd.put(self.my_key, str(seq), lease=self.lease_id)

        # Wait for my turn
        while True:
            # Get all keys in lock directory
            keys = self.etcd.get_all(prefix=self.lock_path)
            # NOTE(review): lexical sort — "10" sorts before "2".
            sorted_keys = sorted(keys, key=lambda x: x[0].decode())

            # Check if I'm first
            if sorted_keys[0][0].decode() == self.my_key:
                return True

            # Watch the previous key
            # NOTE(review): this is the FIRST key, not the one just before
            # ours, and the returned watcher is never iterated, so this
            # does not actually block.
            prev_key = sorted_keys[0][0].decode()
            self.etcd.watch(prev_key)

    def release(self):
        """Delete my key."""
        if self.my_key:
            try:
                self.etcd.delete(self.my_key)
            except:
                pass
            # Revoke the lease as well so nothing lingers until TTL expiry.
            self.etcd.lease_revoke(self.lease_id)

Leader Election

Redis-Based Leader Election

import redis
import time
import uuid

class LeaderElection:
    """First-come-first-served leader election on a Redis sorted set.

    Candidates register in a sorted set scored by enrollment time; the
    member with the lowest score (earliest enrollment) is the leader.
    """

    def __init__(self, redis_client, election_name, candidate_id):
        self.redis = redis_client
        self.election_key = f"election:{election_name}"
        self.candidate_id = candidate_id
        self.is_leader = False

    def become_leader(self):
        """Attempt to become leader; returns True if we are the leader."""
        # nx=True: add only if absent, preserving our ORIGINAL enrollment
        # time. Without it a renewing leader would bump its own score to
        # "now" and immediately demote itself behind every other candidate.
        self.redis.zadd(self.election_key, {self.candidate_id: time.time()}, nx=True)

        # Lowest score == earliest enrollment == leader.
        candidates = self.redis.zrange(self.election_key, 0, 0, withscores=True)

        if candidates and candidates[0][0].decode() == self.candidate_id:
            self.is_leader = True
            # TTL backstop: the election state expires if all candidates die.
            self.redis.expire(self.election_key, 60)
            return True

        # Clear any stale leadership flag from a previous term.
        self.is_leader = False
        return False

    def get_leader(self):
        """Return {'id', 'elected_at'} for the current leader, or None."""
        candidates = self.redis.zrange(self.election_key, 0, 0, withscores=True)
        if candidates:
            return {
                'id': candidates[0][0].decode(),
                'elected_at': candidates[0][1]
            }
        return None

    def renew_leadership(self):
        """Re-assert leadership (refreshes the TTL); False if not leader."""
        if self.is_leader:
            return self.become_leader()
        return False

    def resign(self):
        """Withdraw from the election."""
        if self.is_leader:
            self.redis.zrem(self.election_key, self.candidate_id)
            self.is_leader = False

etcd-Based Leader Election

class EtcdLeaderElection:
    """Leader election via an atomic create-if-absent on one etcd key.

    NOTE(review): the previous version had a syntax error (``for event:``)
    and relied on ``put(prev_kv=False)`` raising on conflict, which a plain
    etcd put never does. In production, attach a lease to the leader key so
    it expires automatically when the leader dies.
    """

    def __init__(self, etcd_client, election_name, candidate_id):
        self.etcd = etcd_client
        self.election_path = f"/election/{election_name}"
        self.candidate_id = candidate_id
        self.leader_key = f"{self.election_path}/leader"
        self.is_leader = False

    def campaign(self):
        """Campaign for leadership.

        Returns True if this candidate is the leader, False if another
        candidate holds the leader key.
        """
        # Atomic create-if-absent: only the first candidate's put succeeds.
        if self.etcd.put_if_not_exists(self.leader_key, self.candidate_id):
            self.is_leader = True
            return True
        return self.become_follower()

    def become_follower(self):
        """Wait until a leader is observed; returns True only if it is us."""
        self.is_leader = False

        while True:
            leader = self.get_leader()
            if leader == self.candidate_id:
                # We already hold the key (e.g. re-campaigning after restart).
                self.is_leader = True
                return True
            if leader is not None:
                # Someone else leads.
                return False
            # Leadership vacant: poll until the next election settles.
            time.sleep(0.1)

    def get_leader(self):
        """Return the current leader's candidate id, or None."""
        try:
            result = self.etcd.get(self.leader_key)
            if result[0]:
                return result[0].decode()
        except Exception:
            pass
        return None

    def resign(self):
        """Give up leadership by deleting the leader key."""
        if self.is_leader:
            try:
                self.etcd.delete(self.leader_key)
            except Exception:
                pass
            self.is_leader = False

Distributed Semaphore

Redis Semaphore

import redis
import time
import uuid

class RedisSemaphore:
    """Counting semaphore on a Redis sorted set (token -> acquisition time).

    A permit is held by adding this client's token to the set; the permit
    count is the set's cardinality. Entries older than ``stale_timeout``
    seconds are purged on every acquire, so permits held by crashed
    clients are eventually reclaimed (the original never reclaimed them).
    """

    def __init__(self, redis_client, key, max_permits, stale_timeout=300):
        self.redis = redis_client
        self.key = f"semaphore:{key}"
        self.max_permits = max_permits
        self.stale_timeout = stale_timeout  # seconds before a permit is stale
        self.token = str(uuid.uuid4())      # unique per-client permit token

    def acquire(self, blocking=True, timeout=30):
        """Acquire a permit; returns True on success, False otherwise."""
        start_time = time.time()

        while True:
            # Reclaim permits from clients that died without releasing.
            self.redis.zremrangebyscore(self.key, '-inf', time.time() - self.stale_timeout)

            # Register our token with the current time as score.
            self.redis.zadd(self.key, {self.token: time.time()})

            if self.redis.zcard(self.key) <= self.max_permits:
                # Within capacity: permit acquired.
                return True

            # Over capacity: withdraw our claim before deciding what to do.
            self.redis.zrem(self.key, self.token)

            if not blocking:
                return False

            if time.time() - start_time >= timeout:
                return False

            time.sleep(0.01)  # brief back-off before retrying

    def release(self):
        """Release our permit; returns True if we actually held one."""
        return self.redis.zrem(self.key, self.token) > 0

    def get_permits(self):
        """Number of permits still available (never negative)."""
        return max(0, self.max_permits - self.redis.zcard(self.key))

Practical Patterns

Lock with Automatic Cleanup

import signal
import atexit

class AutoCleanupLock:
    """Redis lock that releases itself on process exit or SIGTERM/SIGINT.

    NOTE(review): registering with atexit keeps each instance alive for
    the life of the process, and each new instance overwrites the
    previously installed signal handlers — acceptable only for a small
    number of long-lived locks.
    """

    def __init__(self, redis_client, key, timeout=30):
        self.redis = redis_client
        self.key = f"lock:{key}"
        self.timeout = timeout
        # Unique owner token: only the holder may delete the key.
        self.token = str(uuid.uuid4())
        self.acquired = False

        # Best-effort cleanup on normal interpreter exit and on signals.
        atexit.register(self._cleanup)
        signal.signal(signal.SIGTERM, self._signal_handler)
        signal.signal(signal.SIGINT, self._signal_handler)

    def _signal_handler(self, signum, frame):
        self.release()
        # SystemExit rather than the site-provided exit() builtin, which
        # is not guaranteed to exist (e.g. under `python -S`).
        raise SystemExit(1)

    def _cleanup(self):
        if self.acquired:
            self.release()

    def acquire(self):
        """Try once to take the lock; returns True on success."""
        result = self.redis.set(
            self.key,
            self.token,
            nx=True,          # only if not already held
            ex=self.timeout   # auto-expire as a deadlock backstop
        )
        self.acquired = bool(result)
        return self.acquired

    def release(self):
        """Release the lock if we own it (atomic check-and-delete)."""
        if not self.acquired:
            return False

        # Lua script: delete only when the stored token is ours.
        script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """

        result = self.redis.eval(script, 1, self.key, self.token)
        self.acquired = False
        return result == 1

Distributed Lock for Job Processing

import redis
import time

class JobProcessor:
    """Processes jobs exactly once across workers using Redis locks.

    try_process() serializes work on a given job id; claim_job() lets a
    pool of workers split a batch of job ids without double-claiming.
    """

    def __init__(self, redis_client, job_name):
        self.redis = redis_client
        self.job_name = job_name
        self.lock_timeout = 300  # 5 minutes: upper bound on one job's runtime

    def try_process(self, job_id, processor_func):
        """Run processor_func under a per-job lock; skip if already done.

        Returns {'status': 'already_processed'} or
        {'status': 'processed', 'result': ...}.
        """
        lock_key = f"job:{self.job_name}:{job_id}"

        with RedisLock(self.redis, lock_key, timeout=self.lock_timeout):
            # Idempotency guard: a completed marker means another worker
            # finished this job within the last 24 hours.
            if self.redis.get(f"completed:{job_id}"):
                return {'status': 'already_processed'}

            # Process the job.
            result = processor_func()

            # Remember completion for 24h so retries become no-ops.
            self.redis.setex(f"completed:{job_id}", 86400, 'done')

            return {'status': 'processed', 'result': result}

    def claim_job(self, job_ids):
        """Atomically claim the first unclaimed job id; None if all taken."""
        import socket  # was never imported at module level; hostname tags the claimer

        for job_id in job_ids:
            # SET NX: only one worker's claim can succeed per job.
            claimed = self.redis.set(
                f"claimed:{job_id}",
                socket.gethostname(),
                nx=True,
                ex=300  # claim expires so a crashed worker's job is retried
            )

            if claimed:
                return job_id

        return None

Monitoring and Debugging

import logging
from prometheus_client import Counter, Gauge, Histogram  # Histogram was missing

# Lock acquisition attempts, labelled by outcome ('success' / 'failed').
lock_acquisitions = Counter(
    'dist_lock_acquisitions_total',
    'Total lock acquisitions',
    ['lock_name', 'result']
)

# Time spent waiting to acquire each lock, in seconds.
lock_wait_time = Histogram(
    'dist_lock_wait_seconds',
    'Lock wait time',
    ['lock_name']
)

# Number of locks currently held, per lock name.
active_locks = Gauge(
    'dist_locks_active',
    'Active locks',
    ['lock_name']
)

class InstrumentedRedisLock(RedisLock):
    """RedisLock variant that records Prometheus metrics for every
    acquire/release: wait time, acquisition outcome, and active count."""

    def acquire(self):
        """Acquire the lock, recording wait time and outcome metrics."""
        t0 = time.time()
        acquired = super().acquire()
        elapsed = time.time() - t0

        lock_wait_time.labels(lock_name=self.key).observe(elapsed)
        outcome = 'success' if acquired else 'failed'
        lock_acquisitions.labels(lock_name=self.key, result=outcome).inc()

        if acquired:
            active_locks.labels(lock_name=self.key).inc(1)

        return acquired

    def release(self):
        """Release the lock, decrementing the active-lock gauge on success."""
        released = super().release()

        if released:
            active_locks.labels(lock_name=self.key).dec(1)

        return released

Best Practices

  1. Always Use Timeouts: Prevent deadlocks with automatic expiration
  2. Use Unique Tokens: Ensure only lock owner can release
  3. Implement Retry Logic: Handle transient failures gracefully
  4. Monitor Lock Metrics: Track acquisition times and failures
  5. Use Redlock for Critical Systems: Multiple Redis instances improve availability, though Redlock's safety guarantees under clock skew and long process pauses are debated — add fencing tokens where correctness is critical
  6. Keep Critical Sections Short: Minimize lock hold time
  7. Handle Failures Gracefully: Implement circuit breakers for lock services

Conclusion

Distributed locks are essential for coordinating access to shared resources in distributed systems. Start with simple Redis-based locks for most use cases, and graduate to Redlock or etcd-based solutions for critical systems requiring higher availability. Always implement proper timeouts, monitoring, and cleanup handlers to ensure system reliability.

Comments