Introduction
In distributed systems, coordinating access to shared resources across multiple processes and machines is challenging. A distributed lock ensures that only one client can access a critical resource at a time, preventing race conditions, data corruption, and inconsistent states.
This guide covers distributed locking patterns, implementation strategies using Redis, etcd, and ZooKeeper, and practical patterns for protecting critical sections in production systems.
Why Distributed Locks Matter
Distributed locks solve critical problems:
- Prevent Race Conditions: Ensure sequential access to shared resources
- Maintain Data Consistency: Protect against concurrent modifications
- Enable Coordination: Coordinate across multiple services
- Support Failover: Elect a new leader automatically when the current one fails
Redis-Based Distributed Lock
Basic Redlock Implementation
import redis
import time
import uuid
class LockAcquisitionError(Exception):
    """Raised when a distributed lock cannot be acquired in time.

    (Defined here because the context-manager protocol below raises it;
    the original article used it without defining it anywhere.)
    """


class RedisLock:
    """Single-instance Redis lock built on SET NX EX plus an owner token.

    The key auto-expires after ``timeout`` seconds so a crashed holder
    cannot deadlock other clients, and release/extend are guarded by a
    Lua check-and-act script so only the current owner can mutate the key.
    """

    def __init__(self, redis_client, key, timeout=30, blocking=True, blocking_timeout=5):
        self.redis = redis_client
        self.key = f"lock:{key}"                   # namespaced lock key
        self.timeout = timeout                     # lock TTL, seconds
        self.blocking = blocking                   # wait vs. fail fast
        self.blocking_timeout = blocking_timeout   # max seconds to wait
        self.token = str(uuid.uuid4())             # unique owner token
        self.acquired = False

    def acquire(self):
        """Attempt to acquire the lock.

        Returns True on success. If ``blocking`` is False, fails
        immediately; otherwise retries until ``blocking_timeout`` elapses.
        """
        started = time.time()
        while True:
            # SET NX with expiration - a single atomic operation.
            acquired = self.redis.set(
                self.key,
                self.token,
                nx=True,          # only set if the key does not exist
                ex=self.timeout,  # auto-expire to prevent deadlocks
            )
            if acquired:
                self.acquired = True
                return True
            if not self.blocking:
                return False
            if time.time() - started >= self.blocking_timeout:
                return False
            time.sleep(0.01)  # back off 10ms before retrying

    def release(self):
        """Release the lock; True only if we still owned it."""
        if not self.acquired:
            return False
        # Atomic check-and-delete: a plain GET + DEL could delete a lock
        # that expired and was re-acquired by another client in between.
        script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        result = self.redis.eval(script, 1, self.key, self.token)
        self.acquired = False
        return result == 1

    def extend(self, additional_time):
        """Extend the lock TTL if we still own it; True on success."""
        if not self.acquired:
            return False
        script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("expire", KEYS[1], ARGV[2])
        else
            return 0
        end
        """
        new_timeout = self.timeout + additional_time
        result = self.redis.eval(script, 1, self.key, self.token, new_timeout)
        return result == 1

    def __enter__(self):
        if not self.acquire():
            raise LockAcquisitionError(f"Could not acquire lock: {self.key}")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()
        return False  # never suppress exceptions from the critical section
Redlock: Multiple Redis Instances
For better reliability, use Redlock with multiple Redis instances:
import redis
import time
import uuid
class DistributedLock:
    """Handle returned by Redlock.acquire().

    Carries the owner token callers need to release the lock, plus the
    remaining validity window computed at acquisition time. (The original
    article returned this type without ever defining it.)
    """

    def __init__(self, token, key, manager, validity_time):
        self.token = token
        self.key = key
        self.manager = manager
        self.validity_time = validity_time  # seconds the lock is guaranteed valid

    def release(self):
        """Release this lock on every Redis instance."""
        self.manager.release(self.key, self.token)


class Redlock:
    """Redlock over several independent Redis instances.

    A lock counts as acquired only when held on a strict majority
    (quorum) of instances, which tolerates minority instance failures.
    """

    def __init__(self, redis_clients, retry_count=3, retry_delay=200):
        self.clients = redis_clients
        self.retry_count = retry_count
        self.retry_delay = retry_delay / 1000  # milliseconds -> seconds
        self.quorum = (len(redis_clients) // 2) + 1

    def acquire(self, key, timeout=30):
        """Acquire the lock on a majority of instances.

        Returns a DistributedLock handle on success, or None after
        ``retry_count`` failed attempts.
        """
        token = str(uuid.uuid4())
        for attempt in range(self.retry_count):
            acquired_count = 0
            start_time = time.time()
            for client in self.clients:
                try:
                    if client.set(f"lock:{key}", token, nx=True, ex=timeout):
                        acquired_count += 1
                except redis.RedisError:
                    # A down instance simply doesn't count toward quorum.
                    pass
            if acquired_count >= self.quorum:
                # Clock-drift allowance per the Redlock algorithm: 1% of
                # the TTL plus 2ms. timeout is in seconds, so 2ms is
                # 0.002 (the original added 2 whole seconds here).
                drift = (timeout * 0.01) + 0.002
                validity_time = timeout - (time.time() - start_time) - drift
                if validity_time > 0:
                    return DistributedLock(token, key, self, validity_time)
            # Quorum missed (or validity already exhausted): undo any
            # partial acquisitions so we don't block other clients.
            self._release_all(key, token)
            if attempt < self.retry_count - 1:
                time.sleep(self.retry_delay)
        return None

    def _release_all(self, key, token):
        """Best-effort release of the lock on every instance."""
        # Atomic check-and-delete so we never remove someone else's lock.
        script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        for client in self.clients:
            try:
                client.eval(script, 1, f"lock:{key}", token)
            except redis.RedisError:
                pass

    def release(self, key, token):
        """Release the lock on all instances."""
        self._release_all(key, token)
Using the Lock
# --- Basic usage: single-instance lock as a context manager ---
redis_client = redis.Redis(host='localhost', port=6379)

with RedisLock(redis_client, 'process-order-123') as lock:
    # Critical section - only one process can enter at a time.
    process_order('123')
    update_order_status('123', 'processed')

# --- Redlock across several instances for high availability ---
clients = [
    redis.Redis(host='redis1', port=6379),
    redis.Redis(host='redis2', port=6379),
    redis.Redis(host='redis3', port=6379),
]
lock_manager = Redlock(clients)

lock = lock_manager.acquire('critical-task', timeout=30)
if lock:
    try:
        perform_critical_task()
    finally:
        # Always release, even if the critical task raised.
        lock_manager.release('critical-task', lock.token)
etcd Distributed Lock
etcd v3 Lock Implementation
import etcd3
import uuid
import time
class EtcdLock:
    """Distributed lock on etcd v3, stored under /locks/<name>.

    The lock key is attached to a TTL lease, so it disappears
    automatically if the holder dies without releasing.
    """

    def __init__(self, etcd_client, key, ttl=30):
        self.etcd = etcd_client
        self.key = f"/locks/{key}"
        self.ttl = ttl                    # lease TTL in seconds
        self.token = str(uuid.uuid4())    # identifies this holder
        self.lease_id = None              # etcd3 Lease object once acquired

    def acquire(self, blocking=True, timeout=30):
        """Acquire the lock; returns True on success.

        Uses an etcd transaction (create_revision == 0) so the key is
        written only if it does not already exist. The original used a
        plain put() guarded by a nonexistent AlreadyExists exception —
        put() overwrites unconditionally, so it provided no mutual
        exclusion at all.
        """
        self.lease_id = self.etcd.lease(self.ttl)
        deadline = time.time() + timeout
        while True:
            succeeded, _ = self.etcd.transaction(
                # create_revision == 0 means "key does not exist yet".
                compare=[self.etcd.transactions.create(self.key) == 0],
                success=[self.etcd.transactions.put(self.key, self.token,
                                                    lease=self.lease_id)],
                failure=[],
            )
            if succeeded:
                return True
            if not blocking:
                return False
            if time.time() >= deadline:
                return False
            # Brief poll before retrying. A watch on the key would be
            # more efficient; polling keeps the failure modes simple.
            time.sleep(0.1)

    def release(self):
        """Release the lock by revoking the lease (deletes the key)."""
        if self.lease_id:
            try:
                self.lease_id.revoke()
            except Exception:
                # Best-effort: the lease TTL cleans up regardless.
                pass

    def __enter__(self):
        if not self.acquire():
            raise LockAcquisitionError(f"Could not acquire lock: {self.key}")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()
        return False  # never suppress exceptions from the critical section
etcd Distributed Lock with Sequences
class EtcdSequentialLock:
    """Fair (FIFO) lock: waiters queue on sequential keys under a prefix.

    NOTE(review): the sequence number is read-then-incremented without a
    transaction, so two clients can race to the same number. A production
    implementation should derive ordering from each key's create_revision
    instead; this version keeps the counter for clarity of the pattern.
    """

    def __init__(self, etcd_client, lock_name, ttl=60):
        self.etcd = etcd_client
        self.lock_path = f"/locks/{lock_name}"
        self.my_key = None
        # Queue entries live on a TTL lease so a dead client's entry
        # expires instead of blocking the queue forever.
        self.lease_id = etcd_client.lease(ttl)

    def acquire(self):
        """Join the queue with a sequential key, then wait for our turn."""
        seq = self._next_sequence()
        # Zero-pad so lexicographic key order matches numeric order
        # (without padding, "10" sorts before "2").
        self.my_key = f"{self.lock_path}/{seq:010d}"
        self.etcd.put(self.my_key, str(seq), lease=self.lease_id)
        while True:
            # List the whole queue, ordered by key.
            queue = sorted(
                meta.key.decode()
                for _, meta in self.etcd.get_prefix(self.lock_path)
            )
            if queue and queue[0] == self.my_key:
                return True  # we are at the head of the queue
            # Wait on the key immediately ahead of us, not the head:
            # watching only the predecessor avoids a thundering herd.
            # (The original watched queue[0], which may be our own key.)
            my_pos = queue.index(self.my_key)
            self._wait_for_change(queue[my_pos - 1])

    def _next_sequence(self):
        """Read the counter key and return the next value (racy; see class note)."""
        try:
            value, _ = self.etcd.get(self.lock_path)
            return int(value.decode()) + 1 if value else 1
        except Exception:
            return 1

    def _wait_for_change(self, key):
        """Block until *key* changes; the caller re-checks the queue after."""
        events_iterator, cancel = self.etcd.watch(key)
        try:
            for _ in events_iterator:
                return
        finally:
            cancel()

    def release(self):
        """Remove our queue entry and revoke the shared lease."""
        if self.my_key:
            try:
                self.etcd.delete(self.my_key)
            except Exception:
                pass
        self.lease_id.revoke()
Leader Election
Redis-Based Leader Election
import redis
import time
import uuid
class LeaderElection:
    """Leader election on a Redis sorted set: the earliest joiner wins.

    Each candidate registers with its join timestamp as the score; the
    member with the lowest score is the leader. The set carries a TTL
    that only the leader refreshes, so a dead leader's election state
    expires and the next election starts fresh.
    """

    def __init__(self, redis_client, election_name, candidate_id):
        self.redis = redis_client
        self.election_key = f"election:{election_name}"
        self.candidate_id = candidate_id
        self.is_leader = False

    def become_leader(self):
        """Register as a candidate; returns True if we are the leader.

        nx=True adds the member only if absent, preserving our original
        timestamp on repeat calls. Without it, a leader renewing via
        renew_leadership() would bump its own score to "now" and demote
        itself behind every other candidate — the bug in the original.
        """
        self.redis.zadd(self.election_key, {self.candidate_id: time.time()}, nx=True)
        # Lowest score = earliest registration = leader.
        candidates = self.redis.zrange(self.election_key, 0, 0, withscores=True)
        if candidates and candidates[0][0].decode() == self.candidate_id:
            self.is_leader = True
            # Keep the election alive while we lead; if we die, the key
            # expires and a fresh election begins.
            self.redis.expire(self.election_key, 60)
            return True
        self.is_leader = False
        return False

    def get_leader(self):
        """Return {'id', 'elected_at'} for the current leader, or None."""
        candidates = self.redis.zrange(self.election_key, 0, 0, withscores=True)
        if candidates:
            return {
                'id': candidates[0][0].decode(),
                'elected_at': candidates[0][1],
            }
        return None

    def renew_leadership(self):
        """Refresh the election TTL; True while we remain leader."""
        if self.is_leader:
            return self.become_leader()
        return False

    def resign(self):
        """Withdraw from the election."""
        if self.is_leader:
            self.redis.zrem(self.election_key, self.candidate_id)
            self.is_leader = False
etcd-Based Leader Election
class EtcdLeaderElection:
    """Leader election using an atomic create of a single etcd key."""

    def __init__(self, etcd_client, election_name, candidate_id):
        self.etcd = etcd_client
        self.election_path = f"/election/{election_name}"
        self.candidate_id = candidate_id
        self.leader_key = f"{self.election_path}/leader"
        self.is_leader = False

    def campaign(self):
        """Campaign for leadership; True if we won.

        Writes the leader key only if it does not exist yet, via a
        transaction comparing create_revision == 0. The original used an
        unconditional put() with prev_kv=False, which always overwrites
        a sitting leader and never raises PreconditionFailed.
        """
        succeeded, _ = self.etcd.transaction(
            compare=[self.etcd.transactions.create(self.leader_key) == 0],
            success=[self.etcd.transactions.put(self.leader_key, self.candidate_id)],
            failure=[],
        )
        if succeeded:
            self.is_leader = True
            return True
        return self.become_follower()

    def become_follower(self):
        """Watch the leader key; returns False once another leader is seen.

        (The original had a syntax error here: ``for event:``.)
        """
        self.is_leader = False
        events_iterator, cancel = self.etcd.watch(self.leader_key)
        try:
            for event in events_iterator:
                value = getattr(event, "value", None)
                if value is not None and value.decode() != self.candidate_id:
                    # Someone else holds leadership.
                    return False
        finally:
            cancel()
        return False

    def get_leader(self):
        """Return the current leader's candidate id, or None."""
        try:
            value, _ = self.etcd.get(self.leader_key)
            if value:
                return value.decode()
        except Exception:
            pass
        return None

    def resign(self):
        """Delete the leader key if we hold leadership."""
        if self.is_leader:
            try:
                self.etcd.delete(self.leader_key)
            except Exception:
                pass
            self.is_leader = False
Distributed Semaphore
Redis Semaphore
import redis
import time
import uuid
class RedisSemaphore:
    """Counting semaphore on a Redis sorted set (token -> acquire time).

    NOTE(review): the zadd + zcard sequence is not atomic, so under heavy
    contention slightly more than ``max_permits`` holders can slip
    through; wrap both in a Lua script if strict enforcement is needed.
    """

    def __init__(self, redis_client, key, max_permits, permit_ttl=30):
        self.redis = redis_client
        self.key = f"semaphore:{key}"
        self.max_permits = max_permits
        # Permits held longer than this are treated as leaked by a dead
        # holder and purged — the original never removed them, so a
        # crashed client permanently exhausted the semaphore.
        self.permit_ttl = permit_ttl
        self.token = str(uuid.uuid4())

    def acquire(self, blocking=True, timeout=30):
        """Acquire a permit; returns True on success."""
        start_time = time.time()
        while True:
            now = time.time()
            # Purge stale permits from crashed holders.
            self.redis.zremrangebyscore(self.key, '-inf', now - self.permit_ttl)
            # Optimistically register ourselves, then check occupancy.
            self.redis.zadd(self.key, {self.token: now})
            if self.redis.zcard(self.key) <= self.max_permits:
                return True
            # Over capacity: withdraw our claim before waiting/failing.
            self.redis.zrem(self.key, self.token)
            if not blocking:
                return False
            if time.time() - start_time >= timeout:
                return False
            time.sleep(0.01)

    def release(self):
        """Release our permit; True if we actually held one."""
        return self.redis.zrem(self.key, self.token) > 0

    def get_permits(self):
        """Number of permits currently available."""
        return max(0, self.max_permits - self.redis.zcard(self.key))
Practical Patterns
Lock with Automatic Cleanup
import signal
import atexit
class AutoCleanupLock:
    """Redis lock that releases itself on normal exit, SIGTERM and SIGINT.

    NOTE(review): each instance overwrites the process-wide SIGTERM and
    SIGINT handlers, so only the most recently created lock is released
    on a signal; chain the previous handler if multiple locks coexist.
    """

    def __init__(self, redis_client, key, timeout=30):
        self.redis = redis_client
        self.key = f"lock:{key}"
        self.timeout = timeout              # lock TTL in seconds
        self.token = str(uuid.uuid4())      # unique owner token
        self.acquired = False
        # Release on interpreter exit and on termination signals.
        atexit.register(self._cleanup)
        signal.signal(signal.SIGTERM, self._signal_handler)
        signal.signal(signal.SIGINT, self._signal_handler)

    def _signal_handler(self, signum, frame):
        self.release()
        # raise SystemExit rather than calling exit(): the exit() builtin
        # is injected by the site module and may be absent (python -S).
        raise SystemExit(1)

    def _cleanup(self):
        # atexit hook: best-effort release if we still hold the lock.
        if self.acquired:
            self.release()

    def acquire(self):
        """Try once to take the lock; returns True on success."""
        result = self.redis.set(
            self.key,
            self.token,
            nx=True,          # only set if the key does not exist
            ex=self.timeout,  # auto-expire to prevent deadlocks
        )
        self.acquired = bool(result)
        return self.acquired

    def release(self):
        """Release only if we still own the key (atomic Lua check-and-delete)."""
        if not self.acquired:
            return False
        script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        result = self.redis.eval(script, 1, self.key, self.token)
        self.acquired = False
        return result == 1
Distributed Lock for Job Processing
import redis
import time
class JobProcessor:
    """Process jobs exactly once using per-job locks and claim keys."""

    def __init__(self, redis_client, job_name):
        self.redis = redis_client
        self.job_name = job_name
        self.lock_timeout = 300  # seconds a job may hold its lock (5 min)

    def try_process(self, job_id, processor_func):
        """Run *processor_func* for *job_id* under a per-job lock.

        Skips jobs already completed (idempotency marker kept for 24h),
        so retries and duplicate deliveries are safe.
        """
        lock_key = f"job:{self.job_name}:{job_id}"
        with RedisLock(self.redis, lock_key, timeout=self.lock_timeout):
            if self.redis.get(f"completed:{job_id}"):
                return {'status': 'already_processed'}
            result = processor_func()
            # Remember completion for 24 hours.
            self.redis.setex(f"completed:{job_id}", 86400, 'done')
            return {'status': 'processed', 'result': result}

    def claim_job(self, job_ids):
        """Atomically claim the first unclaimed job id; None if all taken."""
        # Local import: the original called socket.gethostname() but the
        # file never imports socket anywhere.
        import socket
        for job_id in job_ids:
            # SET NX marks the claim; the 5-minute TTL frees claims
            # abandoned by crashed workers.
            claimed = self.redis.set(
                f"claimed:{job_id}",
                socket.gethostname(),
                nx=True,
                ex=300,
            )
            if claimed:
                return job_id
        return None
Monitoring and Debugging
import logging
# Histogram was used below but missing from the original import line.
from prometheus_client import Counter, Gauge, Histogram

# Lock acquisition attempts, labelled by outcome ('success'/'failed').
lock_acquisitions = Counter(
    'dist_lock_acquisitions_total',
    'Total lock acquisitions',
    ['lock_name', 'result']
)

# Time spent waiting to acquire each lock, in seconds.
lock_wait_time = Histogram(
    'dist_lock_wait_seconds',
    'Lock wait time',
    ['lock_name']
)

# Number of currently-held locks per lock name.
active_locks = Gauge(
    'dist_locks_active',
    'Active locks',
    ['lock_name']
)
class InstrumentedRedisLock(RedisLock):
    """RedisLock that reports wait time, outcomes, and a held-lock gauge."""

    def acquire(self):
        """Acquire like RedisLock, recording Prometheus metrics."""
        began = time.time()
        ok = super().acquire()
        elapsed = time.time() - began
        lock_wait_time.labels(lock_name=self.key).observe(elapsed)
        outcome = 'success' if ok else 'failed'
        lock_acquisitions.labels(lock_name=self.key, result=outcome).inc()
        if ok:
            active_locks.labels(lock_name=self.key).inc(1)
        return ok

    def release(self):
        """Release like RedisLock, decrementing the active-lock gauge."""
        ok = super().release()
        if ok:
            active_locks.labels(lock_name=self.key).dec(1)
        return ok
Best Practices
- Always Use Timeouts: Prevent deadlocks with automatic expiration
- Use Unique Tokens: Ensure only lock owner can release
- Implement Retry Logic: Handle transient failures gracefully
- Monitor Lock Metrics: Track acquisition times and failures
- Use Redlock for Critical Systems: Multiple Redis instances provide better guarantees
- Keep Critical Sections Short: Minimize lock hold time
- Handle Failures Gracefully: Implement circuit breakers for lock services
Conclusion
Distributed locks are essential for coordinating access to shared resources in distributed systems. Start with simple Redis-based locks for most use cases, and graduate to Redlock or etcd-based solutions for critical systems requiring higher availability. Always implement proper timeouts, monitoring, and cleanup handlers to ensure system reliability.
Comments