Skip to main content
⚡ Calmops

Process Management & Subprocess (Advanced): Mastering System Process Control

Process Management & Subprocess (Advanced): Mastering System Process Control

Advanced process management is crucial for building robust automation scripts, system administration tools, and distributed applications. This guide covers practical patterns for controlling and monitoring system processes.

Subprocess Fundamentals

Basic Process Execution

import subprocess
import sys

def run_command(command, timeout=30):
    """Run *command* through the shell and capture its output.

    Args:
        command: Shell command string to execute.
        timeout: Seconds to wait before killing the process.

    Returns:
        dict with 'returncode', 'stdout', 'stderr', and 'success'
        (True only when the process exited with code 0).
    """
    try:
        result = subprocess.run(
            command,
            shell=True,  # command is a single shell string; never pass untrusted input
            capture_output=True,
            text=True,
            timeout=timeout
        )

        return {
            'returncode': result.returncode,
            'stdout': result.stdout,
            'stderr': result.stderr,
            'success': result.returncode == 0
        }

    except subprocess.TimeoutExpired as e:
        # Preserve any partial output captured before the process was killed
        # (run() attaches it to the exception; str because text=True above).
        return {
            'returncode': -1,
            'stdout': e.stdout or '',
            'stderr': f'Command timed out after {timeout}s',
            'success': False
        }

    except Exception as e:
        # Broad catch is deliberate: it keeps the "always return a dict"
        # contract for spawn failures such as OSError.
        return {
            'returncode': -1,
            'stdout': '',
            'stderr': str(e),
            'success': False
        }

# Usage
result = run_command('echo "Hello, World!"')
print(f"Output: {result['stdout']}")
print(f"Success: {result['success']}")

Process with Pipes

def pipe_commands(commands):
    """Run shell commands as a pipeline (cmd1 | cmd2 | ...).

    Args:
        commands: List of shell command strings; each one's stdout feeds
            the next one's stdin.

    Returns:
        (stdout, stderr) of the final command in the pipeline.
    """
    processes = []

    for i, cmd in enumerate(commands):
        # First command reads normal stdin; later ones read the previous stdout.
        stdin = processes[-1].stdout if i > 0 else None
        p = subprocess.Popen(
            cmd,
            shell=True,
            stdin=stdin,
            stdout=subprocess.PIPE,
            # Only the final command's stderr is collected. Capturing (and
            # never draining) intermediate stderr pipes could deadlock once
            # a pipe buffer fills up.
            stderr=subprocess.PIPE if i == len(commands) - 1 else subprocess.DEVNULL,
            text=True
        )
        if stdin is not None:
            # Close our copy so the upstream process sees EOF/SIGPIPE correctly.
            stdin.close()
        processes.append(p)

    # Get final output.
    stdout, stderr = processes[-1].communicate()

    # Reap every earlier process in the pipeline to avoid zombies.
    for p in processes[:-1]:
        p.wait()

    return stdout, stderr

# Usage
# Usage: emulate a three-stage shell pipeline
commands = [
    'echo "line1\nline2\nline3"',
    'grep line',
    'wc -l'
]
stdout, stderr = pipe_commands(commands)
print(f"Output: {stdout}")

Real-time Output Streaming

def stream_process_output(command):
    """Run *command*, echoing each output line as it is produced.

    stderr is merged into stdout so ordering between the two is preserved.

    Returns:
        The process exit code.
    """
    # The context manager guarantees the stdout pipe is closed and the
    # process is waited on, even if printing raises.
    with subprocess.Popen(
        command,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        bufsize=1  # line-buffered; only valid because text=True
    ) as process:
        for line in process.stdout:
            print(line.rstrip())

    return process.returncode

# Usage: print a directory listing line by line as it is produced
returncode = stream_process_output('ls -la')

Process Pools and Parallelization

Using ProcessPoolExecutor

from concurrent.futures import ProcessPoolExecutor
import time

def cpu_intensive_task(n):
    """Burn CPU by summing the squares of 0..n-1."""
    return sum(i ** 2 for i in range(n))

def process_with_pool(tasks, max_workers=4):
    """Run cpu_intensive_task over *tasks* in a pool of worker processes.

    Failed tasks are reported and skipped; successful results are
    returned in submission order.
    """
    results = []

    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        pending = [executor.submit(cpu_intensive_task, t) for t in tasks]

        # Collect in submission order; each result gets up to 30 seconds.
        for idx, fut in enumerate(pending):
            try:
                value = fut.result(timeout=30)
            except Exception as exc:
                print(f"Task {idx} failed: {exc}")
            else:
                results.append(value)
                print(f"Task {idx} completed: {value}")

    return results

# Usage: four sum-of-squares workloads spread over two worker processes
tasks = [1000000, 2000000, 3000000, 4000000]
results = process_with_pool(tasks, max_workers=2)

Custom Process Pool

from multiprocessing import Pool, cpu_count

def worker_function(item):
    """Square a single work item."""
    return item * item

def custom_process_pool(items, num_processes=None):
    """Map worker_function over *items* using a multiprocessing Pool.

    Defaults to one worker per CPU core. Returns [] if the pool raises.
    """
    workers = cpu_count() if num_processes is None else num_processes

    with Pool(processes=workers) as pool:
        try:
            return pool.map(worker_function, items)
        except Exception as e:
            print(f"Pool error: {e}")
            return []

# Usage: square the integers 0..9 with the default worker count
items = list(range(10))
results = custom_process_pool(items)
print(f"Results: {results}")

Inter-Process Communication (IPC)

Using Queues

from multiprocessing import Process, Queue
import time

def producer(queue, num_items):
    """Put num_items labelled items on the queue, then a None sentinel."""
    for index in range(num_items):
        payload = f"Item {index}"
        queue.put(payload)
        print(f"Produced: {payload}")
        time.sleep(0.1)

    # None tells the consumer there is nothing more to read.
    queue.put(None)

def consumer(queue):
    """Print items from the queue until the None sentinel arrives."""
    # iter(callable, sentinel) stops as soon as queue.get() returns None.
    for item in iter(queue.get, None):
        print(f"Consumed: {item}")

def producer_consumer_pattern():
    """Wire a producer and a consumer together through a shared queue."""
    channel = Queue()

    workers = [
        Process(target=producer, args=(channel, 5)),
        Process(target=consumer, args=(channel,)),
    ]

    for w in workers:
        w.start()
    for w in workers:
        w.join()

# Usage
# producer_consumer_pattern()

Using Pipes

from multiprocessing import Process, Pipe

def sender(conn, messages):
    """Push each message through the pipe connection, then close our end."""
    for payload in messages:
        conn.send(payload)
        print(f"Sent: {payload}")
    conn.close()

def receiver(conn):
    """Print messages from the pipe until the sender closes it (EOFError)."""
    while True:
        try:
            print(f"Received: {conn.recv()}")
        except EOFError:
            break

def pipe_communication():
    """Send a few messages from a child process and receive them here."""
    parent_conn, child_conn = Pipe()

    words = ["Hello", "World", "From", "Pipe"]

    child = Process(target=sender, args=(child_conn, words))
    child.start()

    # Receive in the parent until the child closes its end of the pipe.
    receiver(parent_conn)
    child.join()

# Usage
# pipe_communication()

Signal Handling

Graceful Shutdown

import signal
import time

class GracefulShutdown:
    """Run a work loop that exits cleanly on SIGINT/SIGTERM."""

    def __init__(self):
        # Flag flipped by the signal handler and polled by run().
        self.shutdown_requested = False
        for sig in (signal.SIGINT, signal.SIGTERM):
            signal.signal(sig, self._signal_handler)

    def _signal_handler(self, signum, frame):
        """Record that a shutdown signal arrived."""
        print(f"\nReceived signal {signum}")
        self.shutdown_requested = True

    def run(self):
        """Loop until a shutdown signal is seen, then clean up."""
        print("Starting process (Ctrl+C to stop)...")

        while not self.shutdown_requested:
            print("Working...")
            time.sleep(1)

        print("Shutting down gracefully...")
        self.cleanup()

    def cleanup(self):
        """Release resources before exit (placeholder)."""
        print("Cleanup complete")

# Usage
# shutdown = GracefulShutdown()
# shutdown.run()

Process Timeout

import signal

class TimeoutError(Exception):
    """Raised by timeout_handler when the SIGALRM timer fires.

    NOTE(review): this intentionally shadows the builtin TimeoutError
    within this module; consider renaming if the module grows.
    """
    pass

def timeout_handler(signum, frame):
    """SIGALRM handler: abort the in-progress call by raising TimeoutError."""
    raise TimeoutError("Process timed out")

def run_with_timeout(func, timeout_seconds):
    """Run func() under a SIGALRM-based timeout (Unix, main thread only).

    Args:
        func: Zero-argument callable to execute.
        timeout_seconds: Whole seconds to allow before aborting.

    Returns:
        func()'s result, or None if it timed out.
    """
    previous = signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(timeout_seconds)

    try:
        return func()
    except TimeoutError:
        print(f"Function timed out after {timeout_seconds}s")
        return None
    finally:
        # Always disarm the alarm and restore the prior handler — even when
        # func() raises some other exception. Otherwise a stale SIGALRM
        # could fire later inside unrelated code.
        signal.alarm(0)
        signal.signal(signal.SIGALRM, previous)

# Usage
def long_running_task():
    """Example task: sleeps 5s, so it exceeds a 2s timeout in the demo below."""
    time.sleep(5)
    return "Done"

# result = run_with_timeout(long_running_task, timeout_seconds=2)

System Resource Management

Monitor Process Resources

import psutil

def monitor_process(pid=None):
    """Collect a resource-usage snapshot for *pid* (default: this process).

    Returns a dict of process metrics, or None if the PID does not exist.
    """
    target = os.getpid() if pid is None else pid

    try:
        proc = psutil.Process(target)

        return {
            'pid': proc.pid,
            'name': proc.name(),
            'status': proc.status(),
            # interval=1 samples CPU usage over a full second (blocking call)
            'cpu_percent': proc.cpu_percent(interval=1),
            'memory_info': proc.memory_info(),
            'memory_percent': proc.memory_percent(),
            'num_threads': proc.num_threads(),
            'open_files': len(proc.open_files()),
            'connections': len(proc.connections()),
        }

    except psutil.NoSuchProcess:
        return None

# Usage: report on the current process (os.getpid() is the default)
import os  # NOTE(review): monitor_process() above needs this; move to the top of the file
info = monitor_process()
print(f"CPU: {info['cpu_percent']}%")
print(f"Memory: {info['memory_percent']:.2f}%")

Resource Limits

import resource

def set_resource_limits():
    """Apply hard caps on CPU time, address space, and open files.

    The limits apply to the current process (and are inherited by
    child processes it spawns).
    """
    mem_cap = 100 * 1024 * 1024  # 100 MB address-space ceiling

    caps = [
        (resource.RLIMIT_CPU, (10, 10)),           # CPU seconds
        (resource.RLIMIT_AS, (mem_cap, mem_cap)),  # virtual memory
        (resource.RLIMIT_NOFILE, (1024, 1024)),    # file descriptors
    ]
    for which, bounds in caps:
        resource.setrlimit(which, bounds)

    print("Resource limits set")

def get_resource_limits():
    """Print and return the (soft, hard) limits for CPU, memory, and files."""
    names = [
        ('CPU', resource.RLIMIT_CPU),
        ('Memory', resource.RLIMIT_AS),
        ('Open Files', resource.RLIMIT_NOFILE),
    ]
    limits = {label: resource.getrlimit(which) for label, which in names}

    for label, (soft, hard) in limits.items():
        print(f"{label}: soft={soft}, hard={hard}")

    return limits

# Usage: print the current soft/hard limits without changing them
get_resource_limits()

Advanced Process Management

Process Pool with Timeout

from concurrent.futures import ProcessPoolExecutor, TimeoutError as FutureTimeoutError

def process_with_timeout(tasks, timeout=30, max_workers=4):
    """Run cpu_intensive_task over *tasks*, skipping slow or failed ones.

    Each future is allowed up to *timeout* seconds when its result is
    collected; timed-out and failed tasks are reported and omitted.
    """
    collected = []

    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        # Remember which input produced each future so failures can be named.
        future_to_task = {}
        for task in tasks:
            future_to_task[executor.submit(cpu_intensive_task, task)] = task

        for future, task in future_to_task.items():
            try:
                collected.append(future.result(timeout=timeout))
            except FutureTimeoutError:
                print(f"Task timed out: {task}")
            except Exception as e:
                print(f"Task failed: {e}")

    return collected

# Usage: allow each task result at most 5 seconds
tasks = [1000000, 2000000, 3000000]
results = process_with_timeout(tasks, timeout=5)

Process Monitoring

class ProcessMonitor:
    """Start, stop, and report on a set of named worker processes."""

    def __init__(self):
        # name -> multiprocessing.Process
        self.processes = {}

    def start_process(self, name, target, args=()):
        """Spawn *target* in a new process and register it under *name*."""
        proc = Process(target=target, args=args)
        proc.start()
        self.processes[name] = proc
        print(f"Started process: {name} (PID: {proc.pid})")

    def stop_process(self, name, timeout=5):
        """Terminate the named process, escalating to kill after *timeout*s.

        Returns True on success, False if the name is unknown or the
        stop attempt raised.
        """
        proc = self.processes.get(name)
        if proc is None:
            return False

        proc.terminate()

        try:
            proc.join(timeout=timeout)
            if proc.is_alive():
                # terminate() was ignored within the timeout; force it.
                proc.kill()
                proc.join()
            print(f"Stopped process: {name}")
            return True
        except Exception as e:
            print(f"Error stopping process {name}: {e}")
            return False

    def get_status(self):
        """Return {name: {'pid', 'alive', 'exitcode'}} for every process."""
        return {
            name: {
                'pid': proc.pid,
                'alive': proc.is_alive(),
                'exitcode': proc.exitcode,
            }
            for name, proc in self.processes.items()
        }

# Usage
# monitor = ProcessMonitor()
# monitor.start_process('worker', cpu_intensive_task, args=(1000000,))
# print(monitor.get_status())

Common Pitfalls and Best Practices

โŒ Bad: No Timeout

# DON'T: Run subprocess without timeout
result = subprocess.run(command, shell=True)

✅ Good: Always Use Timeout

# DO: Always specify timeout
result = subprocess.run(command, shell=True, timeout=30)

โŒ Bad: Shell Injection

# DON'T: Use shell=True with user input
user_input = "file.txt; rm -rf /"
subprocess.run(f"cat {user_input}", shell=True)

✅ Good: Avoid Shell Injection

# DO: Use list form without shell=True
subprocess.run(['cat', user_input])

โŒ Bad: Ignoring Process Cleanup

# DON'T: Leave processes running
p = subprocess.Popen(command, shell=True)

✅ Good: Ensure Cleanup

# DO: Use context managers or ensure cleanup
with subprocess.Popen(command, shell=True) as p:
    p.wait()

Summary

Advanced process management requires:

  1. Proper subprocess execution with error handling
  2. Process pools for parallelization
  3. Inter-process communication for coordination
  4. Signal handling for graceful shutdown
  5. Resource monitoring and limits
  6. Timeout management to prevent hangs
  7. Security awareness to prevent injection attacks
  8. Proper cleanup to avoid resource leaks

These patterns ensure robust, efficient, and secure process management in production systems.

Comments