Skip to main content
โšก Calmops

Threading Basics and Thread Safety in Python: Building Safe Concurrent Applications

Threading Basics and Thread Safety in Python: Building Safe Concurrent Applications

Threading allows Python programs to do multiple things concurrently. A web server can handle multiple client connections simultaneously. A GUI application can remain responsive while performing background tasks. A data processing pipeline can read, process, and write data in parallel.

Yet threading is notoriously tricky. Race conditions, deadlocks, and data corruption lurk in concurrent code. Many developers avoid threading altogether, missing opportunities for responsive, efficient applications. Others implement threading incorrectly, introducing subtle bugs that only appear under specific conditions.

This guide explores threading comprehensively, showing you how to implement concurrent applications safely and effectively.

Understanding Threading Fundamentals

What is a Thread?

A thread is a lightweight unit of execution within a process. Unlike processes, threads share the same memory space, making communication between threads easy but requiring careful synchronization.

Process
โ”œโ”€โ”€ Thread 1 (Main thread)
โ”œโ”€โ”€ Thread 2
โ”œโ”€โ”€ Thread 3
โ””โ”€โ”€ Shared Memory (heap, global variables)

Threads vs Processes

Aspect Threads Processes
Memory Shared Separate
Creation Fast Slow
Communication Easy (shared memory) Complex (IPC)
Isolation Low High
Overhead Low High
GIL Impact Significant None

When to Use Threading

Threading is ideal for I/O-bound tasks:

  • Network requests (waiting for server response)
  • File I/O (waiting for disk)
  • Database queries (waiting for database)
  • User input (waiting for user)

Threading is not ideal for CPU-bound tasks:

  • Mathematical computations
  • Data processing
  • Image manipulation
  • Cryptography

For CPU-bound tasks, use multiprocessing instead.

The Global Interpreter Lock (GIL)

Python’s most important threading limitation is the Global Interpreter Lock (GIL).

What is the GIL?

The GIL is a mutex that protects access to Python objects in CPython. Only one thread can execute Python bytecode at a time, even on multi-core systems.

import threading
import time

def cpu_bound_task():
    """A CPU-intensive task"""
    total = 0
    for i in range(100_000_000):
        total += i
    return total

# Single-threaded
start = time.time()
cpu_bound_task()
cpu_bound_task()
single_time = time.time() - start
print(f"Single-threaded: {single_time:.2f}s")

# Multi-threaded (slower due to GIL!)
start = time.time()
t1 = threading.Thread(target=cpu_bound_task)
t2 = threading.Thread(target=cpu_bound_task)
t1.start()
t2.start()
t1.join()
t2.join()
multi_time = time.time() - start
print(f"Multi-threaded: {multi_time:.2f}s")

# Output (approximate):
# Single-threaded: 8.5s
# Multi-threaded: 10.2s (slower due to GIL overhead!)

Why the GIL Exists

The GIL simplifies CPython’s memory management. Without it, every object would need its own lock, causing massive overhead and complexity.

GIL Impact on I/O-Bound Tasks

The GIL is released during I/O operations, allowing other threads to run:

import threading
import time
import urllib.request

def fetch_url(url):
    """Fetch a URL (I/O-bound)"""
    try:
        urllib.request.urlopen(url, timeout=5)
        print(f"Fetched {url}")
    except:
        print(f"Failed to fetch {url}")

urls = [
    'https://example.com',
    'https://python.org',
    'https://github.com',
]

# Single-threaded (slow)
start = time.time()
for url in urls:
    fetch_url(url)
single_time = time.time() - start
print(f"Single-threaded: {single_time:.2f}s")

# Multi-threaded (fast - GIL released during I/O)
start = time.time()
threads = [threading.Thread(target=fetch_url, args=(url,)) for url in urls]
for t in threads:
    t.start()
for t in threads:
    t.join()
multi_time = time.time() - start
print(f"Multi-threaded: {multi_time:.2f}s")

# Output (approximate):
# Single-threaded: 9.5s (sequential)
# Multi-threaded: 3.2s (parallel)

Creating and Managing Threads

Basic Thread Creation

import threading

def worker(name, count):
    """A simple worker function"""
    for i in range(count):
        print(f"{name}: {i}")

# Create a thread
thread = threading.Thread(target=worker, args=("Worker-1", 3))

# Start the thread
thread.start()

# Wait for the thread to finish
thread.join()

print("Done")

Creating Multiple Threads

import threading

def worker(worker_id):
    print(f"Worker {worker_id} starting")
    # Do work...
    print(f"Worker {worker_id} done")

# Create multiple threads
threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

# Wait for all threads to finish
for t in threads:
    t.join()

print("All workers done")

Thread Subclassing

import threading

class Worker(threading.Thread):
    """A custom thread class"""
    
    def __init__(self, worker_id):
        super().__init__()
        self.worker_id = worker_id
    
    def run(self):
        """Override run() to define thread behavior"""
        print(f"Worker {self.worker_id} starting")
        # Do work...
        print(f"Worker {self.worker_id} done")

# Create and start threads
threads = [Worker(i) for i in range(3)]
for t in threads:
    t.start()

# Wait for all threads
for t in threads:
    t.join()

Daemon Threads

Daemon threads run in the background and don’t prevent the program from exiting:

import threading
import time

def background_task():
    """A background task"""
    for i in range(10):
        print(f"Background: {i}")
        time.sleep(0.5)

# Create a daemon thread
thread = threading.Thread(target=background_task, daemon=True)
thread.start()

# Main thread exits after 2 seconds
time.sleep(2)
print("Main thread exiting")
# Background thread is killed when main thread exits

Race Conditions and Shared State

The Problem: Race Conditions

A race condition occurs when multiple threads access shared data concurrently, and at least one modifies it:

import threading

# Shared counter
counter = 0

def increment():
    """Increment the counter 1 million times"""
    global counter
    for _ in range(1_000_000):
        counter += 1

# Create two threads
t1 = threading.Thread(target=increment)
t2 = threading.Thread(target=increment)

t1.start()
t2.start()

t1.join()
t2.join()

print(f"Counter: {counter}")
# Expected: 2,000,000
# Actual: ~1,500,000 (varies each run!)

Why This Happens

The operation counter += 1 is not atomic. It consists of three steps:

  1. Read the current value of counter
  2. Add 1 to it
  3. Write the new value back

With two threads, these steps can interleave:

Thread 1: Read counter (0)
Thread 2: Read counter (0)
Thread 1: Add 1 (1)
Thread 2: Add 1 (1)
Thread 1: Write counter (1)
Thread 2: Write counter (1)
Result: counter = 1 (should be 2!)

Thread Synchronization Primitives

Lock (Mutex)

A lock ensures only one thread can execute a critical section at a time:

import threading

counter = 0
lock = threading.Lock()

def increment():
    """Increment the counter safely"""
    global counter
    for _ in range(1_000_000):
        with lock:  # Acquire lock
            counter += 1
        # Lock is released here

# Create two threads
t1 = threading.Thread(target=increment)
t2 = threading.Thread(target=increment)

t1.start()
t2.start()

t1.join()
t2.join()

print(f"Counter: {counter}")
# Output: 2,000,000 (correct!)

Lock Patterns

import threading

lock = threading.Lock()

# Pattern 1: with statement (recommended)
with lock:
    # Critical section
    pass

# Pattern 2: Manual acquire/release
lock.acquire()
try:
    # Critical section
    pass
finally:
    lock.release()

RLock (Reentrant Lock)

An RLock can be acquired multiple times by the same thread:

import threading

rlock = threading.RLock()

def recursive_function(n):
    with rlock:
        if n <= 0:
            return
        print(f"Level {n}")
        recursive_function(n - 1)

# This works with RLock
recursive_function(3)

# With regular Lock, this would deadlock
# lock = threading.Lock()
# lock.acquire()
# lock.acquire()  # Deadlock! Same thread can't acquire twice

Semaphore

A semaphore maintains a counter, allowing a limited number of threads:

import threading
import time

# Allow 2 threads at a time
semaphore = threading.Semaphore(2)

def worker(worker_id):
    with semaphore:
        print(f"Worker {worker_id} acquired semaphore")
        time.sleep(1)
        print(f"Worker {worker_id} released semaphore")

# Create 5 threads
threads = [threading.Thread(target=worker, args=(i,)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Output shows only 2 workers running at a time

Event

An event allows threads to signal each other:

import threading
import time

event = threading.Event()

def waiter():
    print("Waiter: waiting for event...")
    event.wait()  # Block until event is set
    print("Waiter: event received!")

def signaler():
    time.sleep(2)
    print("Signaler: setting event...")
    event.set()

t1 = threading.Thread(target=waiter)
t2 = threading.Thread(target=signaler)

t1.start()
t2.start()

t1.join()
t2.join()

# Output:
# Waiter: waiting for event...
# Signaler: setting event...
# Waiter: event received!

Condition Variable

A condition variable allows threads to wait for a specific condition:

import threading
import time

condition = threading.Condition()
data = []

def producer():
    global data
    for i in range(5):
        time.sleep(0.5)
        with condition:
            data.append(i)
            print(f"Produced: {i}")
            condition.notify_all()  # Wake up consumers

def consumer(consumer_id):
    global data
    while True:
        with condition:
            condition.wait_for(lambda: len(data) > 0)  # Wait for data
            if data:
                item = data.pop(0)
                print(f"Consumer {consumer_id} consumed: {item}")

# Create producer and consumers
producer_thread = threading.Thread(target=producer)
consumer_threads = [threading.Thread(target=consumer, args=(i,)) for i in range(2)]

producer_thread.start()
for t in consumer_threads:
    t.start()

producer_thread.join()

Thread-Safe Data Structures

Queue (Thread-Safe)

import threading
import queue
import time

# Thread-safe queue
q = queue.Queue(maxsize=10)

def producer():
    for i in range(5):
        q.put(i)
        print(f"Produced: {i}")
        time.sleep(0.5)

def consumer():
    while True:
        item = q.get()
        if item is None:  # Sentinel value
            break
        print(f"Consumed: {item}")
        q.task_done()

# Create threads
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)

producer_thread.start()
consumer_thread.start()

producer_thread.join()
q.put(None)  # Signal consumer to stop
consumer_thread.join()

Thread-Local Storage

import threading

# Thread-local storage
thread_local = threading.local()

def worker(worker_id):
    # Each thread has its own value
    thread_local.value = worker_id
    print(f"Worker {worker_id}: {thread_local.value}")

threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Output:
# Worker 0: 0
# Worker 1: 1
# Worker 2: 2

Practical Example: Thread-Safe Counter

import threading

class ThreadSafeCounter:
    """A thread-safe counter"""
    
    def __init__(self):
        self._value = 0
        self._lock = threading.Lock()
    
    def increment(self):
        """Increment the counter"""
        with self._lock:
            self._value += 1
    
    def decrement(self):
        """Decrement the counter"""
        with self._lock:
            self._value -= 1
    
    def get(self):
        """Get the current value"""
        with self._lock:
            return self._value

# Usage
counter = ThreadSafeCounter()

def worker():
    for _ in range(100_000):
        counter.increment()

threads = [threading.Thread(target=worker) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Final count: {counter.get()}")  # 500,000 (correct!)

Common Pitfalls and How to Avoid Them

Pitfall 1: Forgetting to Lock

# Bad: No lock
class UnsafeCounter:
    def __init__(self):
        self.value = 0
    
    def increment(self):
        self.value += 1

# Good: With lock
class SafeCounter:
    def __init__(self):
        self.value = 0
        self.lock = threading.Lock()
    
    def increment(self):
        with self.lock:
            self.value += 1

Pitfall 2: Holding Locks Too Long

# Bad: Lock held during I/O
with lock:
    data = fetch_from_network()  # Slow!
    process_data(data)

# Good: Lock only for shared state
data = fetch_from_network()  # No lock needed
with lock:
    process_data(data)  # Lock only for critical section

Pitfall 3: Deadlock

# Bad: Potential deadlock
lock1 = threading.Lock()
lock2 = threading.Lock()

def thread1_func():
    with lock1:
        time.sleep(0.1)
        with lock2:  # Might wait forever
            pass

def thread2_func():
    with lock2:
        time.sleep(0.1)
        with lock1:  # Might wait forever
            pass

# Good: Always acquire locks in the same order
def thread1_func():
    with lock1:
        with lock2:
            pass

def thread2_func():
    with lock1:
        with lock2:
            pass

Pitfall 4: Modifying Shared Data Without Locking

# Bad: Modifying list without lock
shared_list = []

def add_item(item):
    shared_list.append(item)  # Not atomic!

# Good: Protect with lock
shared_list = []
lock = threading.Lock()

def add_item(item):
    with lock:
        shared_list.append(item)

Best Practices

1. Use High-Level Abstractions

# Instead of manual threading, use ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor

def process_item(item):
    return item * 2

with ThreadPoolExecutor(max_workers=4) as executor:
    results = executor.map(process_item, range(10))
    for result in results:
        print(result)

2. Minimize Lock Scope

# Bad: Large critical section
with lock:
    data = fetch_data()
    processed = process_data(data)
    save_data(processed)

# Good: Small critical section
data = fetch_data()
processed = process_data(data)
with lock:
    save_data(processed)

3. Use Thread-Safe Data Structures

# Bad: Manual synchronization
data = []
lock = threading.Lock()

# Good: Use queue
from queue import Queue
data = Queue()

4. Document Thread Safety

class DataStore:
    """
    Thread-safe data store.
    
    All public methods are thread-safe and can be called
    from multiple threads concurrently.
    """
    
    def __init__(self):
        self._data = {}
        self._lock = threading.Lock()
    
    def get(self, key):
        """Get a value (thread-safe)"""
        with self._lock:
            return self._data.get(key)

5. Test for Race Conditions

import threading
import time

def test_counter_thread_safety():
    """Test that counter is thread-safe"""
    counter = ThreadSafeCounter()
    
    def worker():
        for _ in range(10_000):
            counter.increment()
    
    # Run multiple times to catch race conditions
    for _ in range(10):
        counter = ThreadSafeCounter()
        threads = [threading.Thread(target=worker) for _ in range(5)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        
        assert counter.get() == 50_000, "Race condition detected!"
    
    print("All tests passed!")

test_counter_thread_safety()

Threading vs Multiprocessing

When to Use Threading

  • I/O-bound tasks (network, file, database)
  • Responsive user interfaces
  • Server handling multiple clients
  • Tasks that benefit from shared memory

When to Use Multiprocessing

  • CPU-bound tasks (computation, data processing)
  • Tasks that need true parallelism
  • Tasks that need isolation
  • When GIL is a bottleneck
import threading
import multiprocessing
import time

def cpu_task(n):
    """CPU-intensive task"""
    total = 0
    for i in range(n):
        total += i
    return total

# Threading (slow for CPU-bound)
start = time.time()
threads = [threading.Thread(target=cpu_task, args=(50_000_000,)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
threading_time = time.time() - start

# Multiprocessing (fast for CPU-bound)
start = time.time()
processes = [multiprocessing.Process(target=cpu_task, args=(50_000_000,)) for _ in range(2)]
for p in processes:
    p.start()
for p in processes:
    p.join()
multiprocessing_time = time.time() - start

print(f"Threading: {threading_time:.2f}s")
print(f"Multiprocessing: {multiprocessing_time:.2f}s")
# Multiprocessing is typically 2-3x faster for CPU-bound tasks

Conclusion

Threading is a powerful tool for building responsive, efficient Python applications. However, it requires careful attention to thread safety and synchronization.

Key takeaways:

  • Threading is ideal for I/O-bound tasks, not CPU-bound tasks
  • The GIL limits Python threading for CPU-bound work
  • Race conditions occur when multiple threads access shared data unsafely
  • Locks protect critical sections and prevent race conditions
  • Synchronization primitives (locks, semaphores, events) coordinate threads
  • Thread-safe data structures like queues simplify concurrent programming
  • Best practices include minimizing lock scope, using high-level abstractions, and testing thoroughly
  • Multiprocessing is better for CPU-bound tasks

Start with high-level abstractions like ThreadPoolExecutor and queue.Queue. Only use low-level threading primitives when necessary. Always test concurrent code thoroughly, as race conditions can be subtle and intermittent. With these principles in mind, you can build safe, efficient multi-threaded Python applications.

Comments