Multiprocessing and Process Management in Python: Parallel Processing for Performance
Python’s Global Interpreter Lock (GIL) prevents threads from executing Python bytecode in parallel. For CPU-bound tasks, this means threading provides no performance benefit. Multiprocessing solves this problem by using separate processes, each with its own Python interpreter and GIL.
Multiprocessing enables true parallelism on multi-core systems, but it comes with complexity. Processes don’t share memory, requiring explicit communication. Process creation has overhead. Debugging is harder. Yet for the right problems, multiprocessing can dramatically improve performance.
This guide explores multiprocessing comprehensively, showing you how to use it effectively in production applications.
Understanding Multiprocessing vs Multithreading
The GIL Problem
Python’s Global Interpreter Lock prevents multiple threads from executing Python bytecode simultaneously:
import threading
import time

def cpu_bound_task():
    """CPU-intensive task"""
    total = 0
    for i in range(100_000_000):
        total += i
    return total

# Single-threaded
start = time.time()
cpu_bound_task()
cpu_bound_task()
single_time = time.time() - start
print(f"Single-threaded: {single_time:.2f}s")

# Multi-threaded (slower due to GIL!)
start = time.time()
t1 = threading.Thread(target=cpu_bound_task)
t2 = threading.Thread(target=cpu_bound_task)
t1.start()
t2.start()
t1.join()
t2.join()
multi_time = time.time() - start
print(f"Multi-threaded: {multi_time:.2f}s")
# Output (approximate):
# Single-threaded: 8.5s
# Multi-threaded: 10.2s (slower!)
Multiprocessing Solution
Multiprocessing creates separate processes, each with its own GIL:
import multiprocessing
import time

def cpu_bound_task():
    """CPU-intensive task"""
    total = 0
    for i in range(100_000_000):
        total += i
    return total

# Multi-process (true parallelism!)
start = time.time()
p1 = multiprocessing.Process(target=cpu_bound_task)
p2 = multiprocessing.Process(target=cpu_bound_task)
p1.start()
p2.start()
p1.join()
p2.join()
multi_time = time.time() - start
print(f"Multi-process: {multi_time:.2f}s")
# Output (approximate):
# Multi-process: 4.5s (2x faster on dual-core!)
Comparison
| Aspect | Threading | Multiprocessing |
|---|---|---|
| GIL Impact | Significant (no parallelism for CPU-bound) | None (true parallelism) |
| Memory | Shared (easy communication) | Separate (explicit communication) |
| Overhead | Low | High (process creation) |
| Best For | I/O-bound tasks | CPU-bound tasks |
| Debugging | Easier | Harder |
| Synchronization | Locks, events | Queues, pipes, managers |
Core Multiprocessing Components
Process
The Process class represents a single process:
import multiprocessing
import time

def worker(name, duration):
    """A worker function"""
    print(f"{name} starting")
    time.sleep(duration)
    print(f"{name} done")

# Create a process
process = multiprocessing.Process(target=worker, args=("Worker-1", 2))

# Start the process
process.start()

# Wait for the process to finish
process.join()
print("Main process done")
Creating Multiple Processes
import multiprocessing

def worker(worker_id):
    print(f"Worker {worker_id} starting")
    # Do work...
    print(f"Worker {worker_id} done")

# Create multiple processes
processes = []
for i in range(4):
    p = multiprocessing.Process(target=worker, args=(i,))
    processes.append(p)
    p.start()

# Wait for all processes
for p in processes:
    p.join()
print("All workers done")
Process Subclassing
import multiprocessing

class Worker(multiprocessing.Process):
    """Custom process class"""

    def __init__(self, worker_id):
        super().__init__()
        self.worker_id = worker_id

    def run(self):
        """Override run() to define process behavior"""
        print(f"Worker {self.worker_id} starting")
        # Do work...
        print(f"Worker {self.worker_id} done")

# Create and start processes
processes = [Worker(i) for i in range(3)]
for p in processes:
    p.start()

# Wait for all processes
for p in processes:
    p.join()
Pool
A Pool manages a group of worker processes:
import multiprocessing

def square(x):
    """Square a number"""
    return x ** 2

# Create a pool with 4 workers
with multiprocessing.Pool(processes=4) as pool:
    # Map function across data
    results = pool.map(square, range(10))
    print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
# Pool is automatically closed and joined
Pool Methods
import multiprocessing
import time

def slow_function(x):
    """A slow function"""
    time.sleep(1)
    return x ** 2

with multiprocessing.Pool(processes=4) as pool:
    # map: Apply function to each item, blocking until all results are in
    results = pool.map(slow_function, range(4))
    print(f"map: {results}")

    # imap: Lazy iterator over results
    results = pool.imap(slow_function, range(4))
    for result in results:
        print(f"imap: {result}")

    # apply_async: Asynchronous execution of a single call
    result = pool.apply_async(slow_function, (5,))
    print(f"apply_async: {result.get()}")

    # map_async: Asynchronous map
    results = pool.map_async(slow_function, range(4))
    print(f"map_async: {results.get()}")
Inter-Process Communication
Queue
A Queue allows processes to communicate safely:
import multiprocessing
import time

def producer(queue):
    """Produce items"""
    for i in range(5):
        print(f"Producing {i}")
        queue.put(i)
        time.sleep(0.5)
    queue.put(None)  # Sentinel value to signal end

def consumer(queue):
    """Consume items"""
    while True:
        item = queue.get()
        if item is None:
            break
        print(f"Consuming {item}")

# Create a queue
queue = multiprocessing.Queue()

# Create producer and consumer processes
producer_process = multiprocessing.Process(target=producer, args=(queue,))
consumer_process = multiprocessing.Process(target=consumer, args=(queue,))

producer_process.start()
consumer_process.start()

producer_process.join()
consumer_process.join()
print("Done")
Pipe
A Pipe provides two-way communication between two processes:
import multiprocessing

def process1(conn):
    """First process"""
    conn.send("Hello from process 1")
    message = conn.recv()
    print(f"Process 1 received: {message}")
    conn.close()

def process2(conn):
    """Second process"""
    message = conn.recv()
    print(f"Process 2 received: {message}")
    conn.send("Hello from process 2")
    conn.close()

# Create a pipe (two connection objects, one for each end)
parent_conn, child_conn = multiprocessing.Pipe()

# Create processes
p1 = multiprocessing.Process(target=process1, args=(parent_conn,))
p2 = multiprocessing.Process(target=process2, args=(child_conn,))

p1.start()
p2.start()
p1.join()
p2.join()
Manager
A Manager provides shared data structures across processes:
import multiprocessing

def worker(shared_list, shared_dict, worker_id):
    """Worker that modifies shared data"""
    shared_list.append(worker_id)
    shared_dict[f"worker_{worker_id}"] = f"Data from {worker_id}"

# Create a manager
with multiprocessing.Manager() as manager:
    # Create shared data structures
    shared_list = manager.list()
    shared_dict = manager.dict()

    # Create processes
    processes = []
    for i in range(4):
        p = multiprocessing.Process(
            target=worker,
            args=(shared_list, shared_dict, i)
        )
        processes.append(p)
        p.start()

    # Wait for all processes
    for p in processes:
        p.join()

    # Access shared data
    print(f"Shared list: {list(shared_list)}")
    print(f"Shared dict: {dict(shared_dict)}")
Practical Examples
Example 1: Parallel Data Processing
import multiprocessing
import time

def process_chunk(chunk):
    """Process a chunk of data"""
    time.sleep(1)  # Simulate work
    return sum(chunk)

def parallel_sum(data, num_processes=4):
    """Sum data in parallel"""
    # Split data into chunks
    chunk_size = len(data) // num_processes
    chunks = [
        data[i:i + chunk_size]
        for i in range(0, len(data), chunk_size)
    ]

    # Process chunks in parallel
    with multiprocessing.Pool(processes=num_processes) as pool:
        results = pool.map(process_chunk, chunks)
    return sum(results)

# Test
data = list(range(1000))
start = time.time()
result = parallel_sum(data, num_processes=4)
elapsed = time.time() - start
print(f"Result: {result}")
print(f"Time: {elapsed:.2f}s")
Example 2: Producer-Consumer Pipeline
import multiprocessing
import time
import random

def producer(queue, num_items):
    """Produce items"""
    for i in range(num_items):
        item = random.randint(1, 100)
        queue.put(item)
        time.sleep(0.1)
    # Signal end (one sentinel per worker)
    for _ in range(4):
        queue.put(None)

def worker(input_queue, output_queue, worker_id):
    """Process items"""
    while True:
        item = input_queue.get()
        if item is None:
            break
        # Process item
        result = item ** 2
        output_queue.put((worker_id, result))

def consumer(queue, num_items):
    """Consume results"""
    for _ in range(num_items):
        worker_id, result = queue.get()
        print(f"Worker {worker_id}: {result}")

# Create queues
input_queue = multiprocessing.Queue()
output_queue = multiprocessing.Queue()

# Create processes
producer_process = multiprocessing.Process(
    target=producer,
    args=(input_queue, 20)
)
worker_processes = [
    multiprocessing.Process(
        target=worker,
        args=(input_queue, output_queue, i)
    )
    for i in range(4)
]
consumer_process = multiprocessing.Process(
    target=consumer,
    args=(output_queue, 20)
)

# Start all processes
producer_process.start()
for p in worker_processes:
    p.start()
consumer_process.start()

# Wait for completion
producer_process.join()
for p in worker_processes:
    p.join()
consumer_process.join()
Example 3: Parallel Web Scraping
import multiprocessing
import time
import urllib.request

def fetch_url(url):
    """Fetch a URL, returning (url, status, elapsed) or (url, None, error)"""
    try:
        start = time.time()
        response = urllib.request.urlopen(url, timeout=5)
        elapsed = time.time() - start
        return (url, response.status, elapsed)
    except Exception as e:
        return (url, None, str(e))

def parallel_fetch(urls, num_processes=4):
    """Fetch multiple URLs in parallel"""
    with multiprocessing.Pool(processes=num_processes) as pool:
        results = pool.map(fetch_url, urls)
    return results

# Test
urls = [
    'https://example.com',
    'https://python.org',
    'https://github.com',
    'https://stackoverflow.com',
]
start = time.time()
results = parallel_fetch(urls, num_processes=4)
elapsed = time.time() - start
for url, status, info in results:
    if status is None:
        print(f"{url}: failed ({info})")
    else:
        print(f"{url}: {status} ({info:.2f}s)")
print(f"Total time: {elapsed:.2f}s")
Process Management
Starting and Stopping Processes
import multiprocessing
import time

def long_running_task():
    """A long-running task"""
    try:
        for i in range(10):
            print(f"Working... {i}")
            time.sleep(1)
    except KeyboardInterrupt:
        print("Interrupted")

# Create and start process
process = multiprocessing.Process(target=long_running_task)
process.start()

# Let it run for a bit
time.sleep(3)

# Terminate the process (SIGTERM on Unix)
if process.is_alive():
    process.terminate()
    process.join(timeout=5)

# Force kill if still alive (SIGKILL on Unix)
if process.is_alive():
    process.kill()
    process.join()
print("Process stopped")
Process Status
import multiprocessing
import time

def worker():
    time.sleep(2)

process = multiprocessing.Process(target=worker)
print(f"Before start: is_alive={process.is_alive()}")

process.start()
print(f"After start: is_alive={process.is_alive()}")

process.join()
print(f"After join: is_alive={process.is_alive()}")
print(f"Exit code: {process.exitcode}")
Daemon Processes
import multiprocessing
import time

def background_task():
    """A background task"""
    for i in range(10):
        print(f"Background: {i}")
        time.sleep(0.5)

# Create a daemon process
process = multiprocessing.Process(target=background_task, daemon=True)
process.start()

# Main process exits after 2 seconds
time.sleep(2)
print("Main process exiting")
# The daemon process is terminated when the main process exits
Common Challenges and Solutions
Challenge 1: Pickling Issues
Processes communicate by pickling objects, and under the spawn and forkserver start methods the arguments passed to a Process are pickled as well. Not all objects are picklable:
import multiprocessing
import threading

def worker(lock):
    with lock:
        print("Working")

# This fails under spawn - threading.Lock is not picklable
lock = threading.Lock()
process = multiprocessing.Process(target=worker, args=(lock,))
process.start()  # TypeError: cannot pickle '_thread.lock' object
Solution: Use multiprocessing synchronization primitives:
import multiprocessing

def worker(lock):
    with lock:
        print("Working")

# Use multiprocessing.Lock instead
lock = multiprocessing.Lock()
process = multiprocessing.Process(target=worker, args=(lock,))
process.start()
process.join()
Challenge 2: Shared State
Processes don’t share memory. Modifying data in one process doesn’t affect others:
import multiprocessing

data = [1, 2, 3]

def worker(data):
    data.append(4)  # Modifies the child's copy only
    print(f"Worker: {data}")

process = multiprocessing.Process(target=worker, args=(data,))
process.start()
process.join()
print(f"Main: {data}")  # Still [1, 2, 3]
Solution: Use shared data structures:
import multiprocessing

def worker(shared_list):
    shared_list.append(4)  # Modifies shared data
    print(f"Worker: {list(shared_list)}")

with multiprocessing.Manager() as manager:
    shared_list = manager.list([1, 2, 3])
    process = multiprocessing.Process(target=worker, args=(shared_list,))
    process.start()
    process.join()
    print(f"Main: {list(shared_list)}")  # [1, 2, 3, 4]
Challenge 3: Process Overhead
Creating processes has significant overhead:
import multiprocessing
import time

def quick_task(x):
    return x ** 2

# Single-threaded (fast for quick tasks)
start = time.time()
results = [quick_task(i) for i in range(1000)]
single_time = time.time() - start

# Multi-process (slow for quick tasks due to overhead)
start = time.time()
with multiprocessing.Pool(processes=4) as pool:
    results = pool.map(quick_task, range(1000))
multi_time = time.time() - start

print(f"Single-threaded: {single_time:.4f}s")
print(f"Multi-process: {multi_time:.4f}s")
# Multi-process is slower due to process creation and IPC overhead
Solution: Use processes only for expensive tasks:
import multiprocessing
import time

def expensive_task(x):
    """Task that takes significant time"""
    total = 0
    for i in range(10_000_000):
        total += i
    return total + x

# Multi-process is faster for expensive tasks
start = time.time()
with multiprocessing.Pool(processes=4) as pool:
    results = pool.map(expensive_task, range(4))
multi_time = time.time() - start

print(f"Multi-process: {multi_time:.2f}s")
# Multi-process wins because task time dominates the overhead
Debugging Multiprocessing Applications
Logging
import multiprocessing
import logging

# Configure logging
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(processName)s - %(levelname)s - %(message)s'
)

def worker(worker_id):
    logging.info(f"Worker {worker_id} starting")
    # Do work...
    logging.info(f"Worker {worker_id} done")

# Create processes
processes = []
for i in range(3):
    p = multiprocessing.Process(target=worker, args=(i,), name=f"Worker-{i}")
    processes.append(p)
    p.start()

# Wait for all processes
for p in processes:
    p.join()
Exception Handling
import multiprocessing
import traceback

def worker_with_error(worker_id):
    try:
        if worker_id == 1:
            raise ValueError("Intentional error")
        print(f"Worker {worker_id} completed")
    except Exception as e:
        print(f"Worker {worker_id} error: {e}")
        traceback.print_exc()

# Create processes
processes = []
for i in range(3):
    p = multiprocessing.Process(target=worker_with_error, args=(i,))
    processes.append(p)
    p.start()

# Wait for all processes
for p in processes:
    p.join()
Best Practices
1. Use if __name__ == '__main__'
import multiprocessing

def worker():
    print("Worker running")

# Always protect process creation
if __name__ == '__main__':
    process = multiprocessing.Process(target=worker)
    process.start()
    process.join()
2. Use Context Managers
import multiprocessing

def worker(x):
    return x ** 2

# Good: Automatic cleanup
if __name__ == '__main__':
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(worker, range(10))
3. Choose the Right Start Method
import multiprocessing

def greet():
    print("Hello")

# Different start methods have different characteristics:
# 'spawn': safe but slower (default on Windows and macOS)
# 'fork': fast but can cause issues with inherited locks and threads (default on Linux)
# 'forkserver': a compromise
if __name__ == '__main__':
    ctx = multiprocessing.get_context('spawn')
    # spawn pickles the target, so it must be a module-level function, not a lambda
    process = ctx.Process(target=greet)
    process.start()
    process.join()
4. Limit Pool Size
import multiprocessing
import os

def square(x):
    return x ** 2

if __name__ == '__main__':
    # Use the number of CPU cores; Pool targets must be picklable,
    # so use a module-level function rather than a lambda
    num_processes = os.cpu_count()
    with multiprocessing.Pool(processes=num_processes) as pool:
        results = pool.map(square, range(100))
5. Handle Timeouts
import multiprocessing
import time

def slow_task(x):
    time.sleep(5)
    return x ** 2

if __name__ == '__main__':
    with multiprocessing.Pool(processes=4) as pool:
        # Set a timeout when collecting results from map_async
        result = pool.map_async(slow_task, range(4))
        try:
            results = result.get(timeout=3)
        except multiprocessing.TimeoutError:
            print("Task timed out")
            pool.terminate()
When to Use Multiprocessing
Good Use Cases
- CPU-bound tasks: Computations, data processing, image manipulation
- Parallel I/O: Multiple network requests, file operations
- Independent tasks: Tasks that don’t need frequent communication
- Performance critical: When speed is essential
Bad Use Cases
- Quick tasks: Overhead exceeds benefit
- Frequent communication: Overhead of inter-process communication
- Shared state: Complex synchronization needed
- I/O-bound with few tasks: Threading is simpler
Conclusion
Multiprocessing enables true parallelism in Python, overcoming the GIL limitation for CPU-bound tasks. However, it comes with complexity: process overhead, communication challenges, and debugging difficulty.
Key takeaways:
- Multiprocessing provides true parallelism for CPU-bound tasks
- Process class creates individual processes
- Pool manages multiple worker processes efficiently
- Queue and Pipe enable inter-process communication
- Manager provides shared data structures
- Process overhead is significant; use it only for expensive tasks
- Pickling limits what can be passed between processes
- Debugging requires careful logging and error handling
- Best practices include using context managers, protecting process creation with if __name__ == '__main__', and choosing appropriate start methods
Start with threading for I/O-bound tasks and multiprocessing for CPU-bound tasks. Use high-level abstractions like Pool before implementing low-level process management. Test thoroughly, as concurrency bugs can be subtle and intermittent. With these principles, you can build high-performance Python applications that effectively utilize multi-core systems.