Process Management & Subprocess (Advanced): Mastering System Process Control
Advanced process management is crucial for building robust automation scripts, system administration tools, and distributed applications. This guide covers practical patterns for controlling and monitoring system processes.
Subprocess Fundamentals
Basic Process Execution
import subprocess
import sys
def run_command(command, timeout=30):
    """Run *command* in a shell and return a summary dict.

    The result always carries the keys 'returncode', 'stdout', 'stderr'
    and 'success'. Failures (timeout or any other error) are reported
    through the dict rather than raised, with returncode -1.
    """
    outcome = {'returncode': -1, 'stdout': '', 'stderr': '', 'success': False}
    try:
        completed = subprocess.run(
            command,
            shell=True,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        outcome['stderr'] = f'Command timed out after {timeout}s'
    except Exception as exc:  # deliberate: every failure is reported via the dict
        outcome['stderr'] = str(exc)
    else:
        outcome.update(
            returncode=completed.returncode,
            stdout=completed.stdout,
            stderr=completed.stderr,
            success=completed.returncode == 0,
        )
    return outcome
# Usage
result = run_command('echo "Hello, World!"')
print(f"Output: {result['stdout']}")  # stdout keeps the trailing newline from echo
print(f"Success: {result['success']}")  # True only when the command exited 0
Process with Pipes
def pipe_commands(commands):
    """Execute a list of shell commands as a pipeline (cmd1 | cmd2 | ...).

    Returns a (stdout, stderr) tuple captured from the *last* command.

    Fixes over the original version:
    - An empty command list returns ('', '') instead of raising IndexError.
    - Intermediate commands no longer get stderr=PIPE: those pipes were
      never read, so a chatty intermediate command could fill the pipe
      buffer and deadlock the whole pipeline. Their stderr is now
      inherited (goes to the caller's stderr), matching shell behaviour.
    - Intermediate processes are reaped with wait() so they do not
      linger as zombies.
    """
    if not commands:
        return '', ''

    processes = []
    for i, cmd in enumerate(commands):
        # First command reads the inherited stdin; later commands read
        # from the previous command's stdout.
        stdin = processes[-1].stdout if processes else None
        is_last = i == len(commands) - 1
        p = subprocess.Popen(
            cmd,
            shell=True,
            stdin=stdin,
            stdout=subprocess.PIPE,
            # Only the final command's stderr is captured; see docstring.
            stderr=subprocess.PIPE if is_last else None,
            text=True,
        )
        if processes:
            # Drop our handle so the upstream process receives SIGPIPE/EOF
            # correctly if the downstream one exits early.
            processes[-1].stdout.close()
        processes.append(p)

    # Get final output
    stdout, stderr = processes[-1].communicate()
    # Reap the intermediate processes to avoid zombies.
    for p in processes[:-1]:
        p.wait()
    return stdout, stderr
# Usage
commands = [
    # NOTE(review): whether plain `echo` interprets "\n" depends on the
    # shell (/bin/sh builtin often does, /bin/echo may not) — printf is
    # the portable choice; confirm the target shell.
    'echo "line1\nline2\nline3"',
    'grep line',
    'wc -l'
]
stdout, stderr = pipe_commands(commands)
print(f"Output: {stdout}")
Real-time Output Streaming
def stream_process_output(command):
    """Run *command*, echoing its combined stdout/stderr line by line.

    Returns the process's exit code once it finishes.
    """
    proc = subprocess.Popen(
        command,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # merge stderr into the stdout stream
        text=True,
        bufsize=1,  # line-buffered, so lines appear as they are produced
    )
    # Iterating the pipe streams output instead of buffering it all.
    for raw_line in proc.stdout:
        print(raw_line.rstrip())
    proc.wait()
    return proc.returncode
# Usage
returncode = stream_process_output('ls -la')  # prints the listing as it arrives
Process Pools and Parallelization
Using ProcessPoolExecutor
from concurrent.futures import ProcessPoolExecutor
import time
def cpu_intensive_task(n):
    """Burn CPU by summing the squares of 0..n-1; returns that sum."""
    return sum(i * i for i in range(n))
def process_with_pool(tasks, max_workers=4):
    """Run cpu_intensive_task over *tasks* in a process pool.

    Results are collected in submission order; a task that fails or
    exceeds the 30s result wait is reported and skipped.
    """
    collected = []
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        pending = [executor.submit(cpu_intensive_task, t) for t in tasks]
        for idx, fut in enumerate(pending):
            try:
                value = fut.result(timeout=30)
            except Exception as e:
                print(f"Task {idx} failed: {e}")
            else:
                collected.append(value)
                print(f"Task {idx} completed: {value}")
    return collected
# Usage
tasks = [1000000, 2000000, 3000000, 4000000]  # workload sizes for cpu_intensive_task
results = process_with_pool(tasks, max_workers=2)
Custom Process Pool
from multiprocessing import Pool, cpu_count
def worker_function(item):
    """Pool worker: return the square of *item*."""
    return item * item
def custom_process_pool(items, num_processes=None):
    """Map worker_function over *items* with a multiprocessing Pool.

    Defaults to one process per CPU. If the pool raises, the error is
    printed and an empty list is returned.
    """
    workers = cpu_count() if num_processes is None else num_processes
    with Pool(processes=workers) as pool:
        try:
            return pool.map(worker_function, items)
        except Exception as e:
            print(f"Pool error: {e}")
            return []
# Usage
items = list(range(10))
results = custom_process_pool(items)
print(f"Results: {results}")  # squares of 0..9
Inter-Process Communication (IPC)
Using Queues
from multiprocessing import Process, Queue
import time
def producer(queue, num_items):
    """Put *num_items* labelled strings on *queue*, then a None sentinel."""
    for idx in range(num_items):
        payload = f"Item {idx}"
        queue.put(payload)
        print(f"Produced: {payload}")
        time.sleep(0.1)  # simulate work between items
    queue.put(None)  # Signal end
def consumer(queue):
    """Drain *queue*, printing items, until the None sentinel arrives."""
    while True:
        received = queue.get()
        if received is None:
            break  # producer signalled end-of-stream
        print(f"Consumed: {received}")
def producer_consumer_pattern():
    """Wire a producer process and a consumer process together via a Queue."""
    channel = Queue()
    prod = Process(target=producer, args=(channel, 5))
    cons = Process(target=consumer, args=(channel,))
    for proc in (prod, cons):
        proc.start()
    for proc in (prod, cons):
        proc.join()
# Usage
# producer_consumer_pattern()
Using Pipes
from multiprocessing import Process, Pipe
def sender(conn, messages):
    """Send each message down *conn*, then close our end of the pipe."""
    for payload in messages:
        conn.send(payload)
        print(f"Sent: {payload}")
    conn.close()  # closing lets the peer's recv() raise EOFError
def receiver(conn):
    """Print messages from *conn* until the peer closes its end (EOFError)."""
    while True:
        try:
            payload = conn.recv()
        except EOFError:
            break  # sender closed the pipe: nothing more will arrive
        print(f"Received: {payload}")
def pipe_communication():
    """Demonstrate pipe communication: a child sends, this process receives.

    Bug fix: the parent must close its own copy of child_conn after
    starting the child. Otherwise the pipe's write end stays open in the
    parent, conn.recv() in receiver() never raises EOFError when the
    child finishes, and this function hangs forever.
    """
    parent_conn, child_conn = Pipe()
    messages = ["Hello", "World", "From", "Pipe"]
    p = Process(target=sender, args=(child_conn, messages))
    p.start()
    child_conn.close()  # parent's handle only; the child holds its own copy
    receiver(parent_conn)
    p.join()
# Usage
# pipe_communication()
Signal Handling
Graceful Shutdown
import signal
import time
class GracefulShutdown:
    """Run a worker loop that exits cleanly on SIGINT/SIGTERM.

    Instantiating installs the signal handlers; run() loops until a
    handled signal flips the shutdown flag, then performs cleanup.
    """

    def __init__(self):
        # Handlers only set a flag; the loop notices it on its next
        # iteration, so shutdown is cooperative rather than abrupt.
        self.shutdown_requested = False
        for sig in (signal.SIGINT, signal.SIGTERM):
            signal.signal(sig, self._signal_handler)

    def _signal_handler(self, signum, frame):
        """Record that a shutdown signal arrived."""
        print(f"\nReceived signal {signum}")
        self.shutdown_requested = True

    def run(self):
        """Work until a shutdown signal is received, then clean up."""
        print("Starting process (Ctrl+C to stop)...")
        while not self.shutdown_requested:
            print("Working...")
            time.sleep(1)
        print("Shutting down gracefully...")
        self.cleanup()

    def cleanup(self):
        """Release resources before exit."""
        print("Cleanup complete")
Process Timeout
import signal
class TimeoutError(Exception):
    """Raised by timeout_handler when the SIGALRM alarm fires.

    NOTE(review): this shadows the built-in TimeoutError (Python 3.3+);
    consider renaming if callers might intend to catch the built-in.
    """
    pass
def timeout_handler(signum, frame):
    """SIGALRM handler: convert the alarm into a TimeoutError."""
    raise TimeoutError("Process timed out")
def run_with_timeout(func, timeout_seconds):
    """Run func() with a SIGALRM-based timeout (Unix, main thread only).

    Returns func()'s result, or None if it did not finish within
    timeout_seconds (whole seconds — signal.alarm truncates to int).

    Bug fix: the alarm is now cancelled in a finally block. Previously,
    if func() raised any exception other than TimeoutError, the pending
    alarm survived the call and could interrupt unrelated code up to
    timeout_seconds later.
    """
    signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(timeout_seconds)
    try:
        return func()
    except TimeoutError:
        print(f"Function timed out after {timeout_seconds}s")
        return None
    finally:
        signal.alarm(0)  # always cancel any pending alarm
# Usage
def long_running_task():
    """Sleep 5 seconds, then return "Done" — a stand-in for slow work."""
    time.sleep(5)
    return "Done"
# result = run_with_timeout(long_running_task, timeout_seconds=2)
System Resource Management
Monitor Process Resources
import psutil
def monitor_process(pid=None):
    """Return a resource-usage snapshot for *pid* (default: this process).

    Keys: pid, name, status, cpu_percent, memory_info, memory_percent,
    num_threads, open_files, connections. Returns None if the process
    does not exist or cannot be inspected.

    Improvements: also catches psutil.AccessDenied (common when probing
    other users' processes) instead of letting it propagate, and samples
    everything inside process.oneshot() so psutil can batch the reads.
    """
    if pid is None:
        pid = os.getpid()
    try:
        process = psutil.Process(pid)
        with process.oneshot():  # cache process info across the reads below
            return {
                'pid': process.pid,
                'name': process.name(),
                'status': process.status(),
                'cpu_percent': process.cpu_percent(interval=1),  # blocks ~1s to sample
                'memory_info': process.memory_info(),
                'memory_percent': process.memory_percent(),
                'num_threads': process.num_threads(),
                'open_files': len(process.open_files()),
                # NOTE(review): psutil >= 6 prefers net_connections();
                # connections() kept for compatibility with older psutil.
                'connections': len(process.connections()),
            }
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        return None
# Usage
import os
info = monitor_process()
# Bug fix: monitor_process returns None when the process is gone or
# inaccessible; subscripting None raised TypeError here.
if info is not None:
    print(f"CPU: {info['cpu_percent']}%")
    print(f"Memory: {info['memory_percent']:.2f}%")
Resource Limits
import resource
def set_resource_limits():
    """Apply caps to this process: 10s CPU, 100 MB address space, 1024 FDs."""
    megabyte = 1024 * 1024
    caps = [
        (resource.RLIMIT_CPU, (10, 10)),                # CPU seconds
        (resource.RLIMIT_AS, (100 * megabyte,) * 2),    # virtual memory
        (resource.RLIMIT_NOFILE, (1024, 1024)),         # open file descriptors
    ]
    for limit, bounds in caps:
        resource.setrlimit(limit, bounds)
    print("Resource limits set")
def get_resource_limits():
    """Print and return the current (soft, hard) limits for CPU, memory, files."""
    limits = {
        'CPU': resource.getrlimit(resource.RLIMIT_CPU),
        'Memory': resource.getrlimit(resource.RLIMIT_AS),
        'Open Files': resource.getrlimit(resource.RLIMIT_NOFILE),
    }
    for label, bounds in limits.items():
        soft, hard = bounds
        print(f"{label}: soft={soft}, hard={hard}")
    return limits
# Usage
get_resource_limits()  # prints the current soft/hard limits
Advanced Process Management
Process Pool with Timeout
from concurrent.futures import ProcessPoolExecutor, TimeoutError as FutureTimeoutError
def process_with_timeout(tasks, timeout=30, max_workers=4):
    """Run cpu_intensive_task over *tasks*, bounding each result wait by *timeout*.

    Timed-out or failed tasks are reported and skipped; successful
    results are returned in submission order.
    """
    gathered = []
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        future_to_task = {
            executor.submit(cpu_intensive_task, task): task
            for task in tasks
        }
        for fut in future_to_task:
            try:
                gathered.append(fut.result(timeout=timeout))
            except FutureTimeoutError:
                print(f"Task timed out: {future_to_task[fut]}")
            except Exception as e:
                print(f"Task failed: {e}")
    return gathered
# Usage
tasks = [1000000, 2000000, 3000000]
results = process_with_timeout(tasks, timeout=5)  # each result wait capped at 5s
Process Monitoring
class ProcessMonitor:
    """Keep a registry of named multiprocessing.Process workers.

    Supports starting, stopping (terminate first, kill as a fallback)
    and reporting liveness/exit codes by name.
    """

    def __init__(self):
        # name -> Process; entries stay registered after a process stops
        # so get_status() can still report its exit code.
        self.processes = {}

    def start_process(self, name, target, args=()):
        """Spawn target(*args) in a new process registered under *name*."""
        worker = Process(target=target, args=args)
        worker.start()
        self.processes[name] = worker
        print(f"Started process: {name} (PID: {worker.pid})")

    def stop_process(self, name, timeout=5):
        """Terminate the named process, escalating to kill() after *timeout*s.

        Returns True on success, False for unknown names or join errors.
        """
        worker = self.processes.get(name)
        if worker is None:
            return False
        worker.terminate()  # polite SIGTERM first
        try:
            worker.join(timeout=timeout)
            if worker.is_alive():
                # Still running after the grace period: force it down.
                worker.kill()
                worker.join()
            print(f"Stopped process: {name}")
            return True
        except Exception as e:
            print(f"Error stopping process {name}: {e}")
            return False

    def get_status(self):
        """Return {name: {'pid', 'alive', 'exitcode'}} for every registered process."""
        return {
            name: {
                'pid': worker.pid,
                'alive': worker.is_alive(),
                'exitcode': worker.exitcode,
            }
            for name, worker in self.processes.items()
        }
# Usage
# monitor = ProcessMonitor()
# monitor.start_process('worker', cpu_intensive_task, args=(1000000,))
# print(monitor.get_status())
Common Pitfalls and Best Practices
❌ Bad: No Timeout
# DON'T: Run subprocess without timeout
result = subprocess.run(command, shell=True)
✅ Good: Always Use Timeout
# DO: Always specify timeout
result = subprocess.run(command, shell=True, timeout=30)
❌ Bad: Shell Injection
# DON'T: Use shell=True with user input
user_input = "file.txt; rm -rf /"
subprocess.run(f"cat {user_input}", shell=True)
✅ Good: Avoid Shell Injection
# DO: Use list form without shell=True
subprocess.run(['cat', user_input])
❌ Bad: Ignoring Process Cleanup
# DON'T: Leave processes running
p = subprocess.Popen(command, shell=True)
✅ Good: Ensure Cleanup
# DO: Use context managers or ensure cleanup
with subprocess.Popen(command, shell=True) as p:
p.wait()
Summary
Advanced process management requires:
- Proper subprocess execution with error handling
- Process pools for parallelization
- Inter-process communication for coordination
- Signal handling for graceful shutdown
- Resource monitoring and limits
- Timeout management to prevent hangs
- Security awareness to prevent injection attacks
- Proper cleanup to avoid resource leaks
These patterns ensure robust, efficient, and secure process management in production systems.
Comments