System Administration with Python: Automation and Management
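Python is well suited to system administration: it lets you automate repetitive operations, monitor systems, and manage infrastructure. This guide covers practical system administration scripting. Most examples use only the standard library; the process-monitoring and scheduling examples also need the third-party packages psutil, schedule, and APScheduler, which must be installed separately.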
File and Directory Operations
Working with Paths
import os
from pathlib import Path
# Current working directory, obtained through the classic os module
current_dir = os.getcwd()
print(f"Current directory: {current_dir}")

# The same kind of query through pathlib's object-oriented API
path = Path('/home/user/documents')
print(f"Path: {path}")
for label, probe in (
    ("Exists", path.exists),
    ("Is directory", path.is_dir),
    ("Is file", path.is_file),
):
    # Each probe is called lazily, in the listed order
    print(f"{label}: {probe()}")

# Build a path below the user's home directory
home = Path.home()
documents = home.joinpath('documents')
print(f"Documents path: {documents}")

# Break the path into its components
for label, component in (
    ("Parent", documents.parent),
    ("Name", documents.name),
    ("Stem", documents.stem),
    ("Suffix", documents.suffix),
):
    print(f"{label}: {component}")
File Operations
from pathlib import Path
import shutil
# Create a working directory and an empty file inside it
# (both calls are no-ops for content if the directory/file already exist)
Path('test_dir').mkdir(exist_ok=True)
Path('test_dir/file.txt').touch()

# Read file; the encoding is explicit so the result does not depend on the
# locale of the machine running the script
file_path = Path('test_dir/file.txt')
content = file_path.read_text(encoding='utf-8')

# Write file (replaces any existing content)
file_path.write_text('Hello, World!', encoding='utf-8')

# Append to file; the context manager closes the handle even on error
with file_path.open('a', encoding='utf-8') as f:
    f.write('\nNew line')

# Copy file
shutil.copy(file_path, 'test_dir/file_copy.txt')

# Copy directory (raises FileExistsError if test_dir_backup already exists)
shutil.copytree('test_dir', 'test_dir_backup')

# Remove file
Path('test_dir/file_copy.txt').unlink()

# Remove directory and everything below it
shutil.rmtree('test_dir_backup')

# List the direct children of the directory
for file in Path('test_dir').iterdir():
    print(f"File: {file.name}")

# Recursive file search
for file in Path('test_dir').rglob('*.txt'):
    print(f"Found: {file}")
Batch File Operations
from pathlib import Path
import os
def rename_files(directory, old_pattern, new_pattern):
    """Rename files matching a glob pattern by swapping part of their name.

    The literal part of ``old_pattern`` (the pattern with every ``*``
    removed) is replaced by ``new_pattern`` in each matching name. For the
    common ``*suffix`` form (e.g. ``'*.log'``) only the trailing suffix is
    replaced, so ``error.log.log`` becomes ``error.log.bak`` rather than
    ``error.bak.bak``.

    Args:
        directory: Directory whose direct children are matched.
        old_pattern: Glob pattern selecting the entries, e.g. ``'*.log'``.
        new_pattern: Text that replaces the literal part, e.g. ``'.bak'``.

    Raises:
        ValueError: If ``old_pattern`` consists only of ``*`` wildcards, so
            there is no literal text that could be replaced.
    """
    fragment = old_pattern.replace('*', '')
    if not fragment:
        # str.replace('', x) would insert x between every character and
        # garble every matching file name.
        raise ValueError(
            f"Pattern {old_pattern!r} has no literal text to replace"
        )
    suffix_only = old_pattern == '*' + fragment

    path = Path(directory)
    # Snapshot the matches first: renaming entries while the glob is still
    # being consumed could make it revisit or skip files.
    for file in list(path.glob(old_pattern)):
        if suffix_only and file.name.endswith(fragment):
            new_name = file.name[:-len(fragment)] + new_pattern
        else:
            new_name = file.name.replace(fragment, new_pattern)
        if new_name == file.name:
            # Nothing to change for this entry.
            continue
        new_path = file.parent / new_name
        file.rename(new_path)
        print(f"Renamed: {file.name} -> {new_name}")
def organize_by_extension(directory):
    """Sort the regular files of a directory into per-extension subfolders.

    Every file directly inside ``directory`` is moved into a subdirectory
    named after its extension (without the leading dot); files that have no
    extension go to ``no_extension/``. Subdirectories are created on demand
    and existing ones are reused.
    """
    root = Path(directory)
    for entry in root.iterdir():
        # Only regular files are moved; directories are left in place.
        if not entry.is_file():
            continue

        suffix = entry.suffix
        if suffix:
            bucket = suffix[1:]  # drop the leading dot
        else:
            bucket = 'no_extension'

        target_dir = root / bucket
        target_dir.mkdir(exist_ok=True)
        entry.rename(target_dir / entry.name)
        print(f"Moved: {entry.name} to {bucket}/")
def find_large_files(directory, size_mb=100):
    """Find files larger than a size threshold anywhere below a directory.

    Args:
        directory: Root directory; it is searched recursively.
        size_mb: Threshold in mebibytes (1 MB = 1024 * 1024 bytes). Only
            files strictly larger than this are reported.

    Returns:
        A list of ``(path, size_in_mb)`` tuples, largest first.
    """
    size_bytes = size_mb * 1024 * 1024
    large_files = []
    for file in Path(directory).rglob('*'):
        try:
            if not file.is_file():
                continue
            # Stat once and reuse the result for both the comparison and
            # the reported size.
            size = file.stat().st_size
        except OSError:
            # The file vanished or is not accessible; skip it instead of
            # aborting the whole scan.
            continue
        if size > size_bytes:
            large_files.append((file, size / (1024 * 1024)))
    return sorted(large_files, key=lambda item: item[1], reverse=True)
# Usage
# rename_files('.', '*.log', '.bak')
# organize_by_extension('.')
# large = find_large_files('.', size_mb=50)
# for file, size in large:
# print(f"{file}: {size:.2f} MB")
Process Management
Running External Commands
import subprocess
import os
# Simple command execution: the argument list is passed to the program
# directly, without going through a shell
result = subprocess.run(['ls', '-la'], capture_output=True, text=True)
print(f"Return code: {result.returncode}")
print(f"Output:\n{result.stdout}")
print(f"Error:\n{result.stderr}")

# Run command with shell (only safe here because the string is a constant)
result = subprocess.run('echo "Hello from shell"', shell=True, capture_output=True, text=True)
print(result.stdout)

# Run command with timeout: run() stops the child and raises TimeoutExpired
try:
    result = subprocess.run(['sleep', '10'], timeout=2)
except subprocess.TimeoutExpired:
    print("Command timed out!")

# Run command with environment variables; copy os.environ so the current
# process's own environment is left untouched
env = os.environ.copy()
env['MY_VAR'] = 'my_value'
result = subprocess.run(['env'], env=env, capture_output=True, text=True)

# Pipe between commands (equivalent to: echo "hello world" | wc -w)
process1 = subprocess.Popen(['echo', 'hello world'], stdout=subprocess.PIPE)
process2 = subprocess.Popen(['wc', '-w'], stdin=process1.stdout, stdout=subprocess.PIPE, text=True)
# Close the parent's copy of the pipe so process1 can receive SIGPIPE if
# process2 exits before reading everything
process1.stdout.close()
output, _ = process2.communicate()
# Reap process1 so it does not linger as a zombie process
process1.wait()
print(f"Word count: {output.strip()}")
Process Monitoring
import psutil
import time
# Snapshot of overall system resources
cpu_total = psutil.cpu_count()
print(f"CPU count: {cpu_total}")
cpu_load = psutil.cpu_percent(interval=1)  # samples over one second
print(f"CPU percent: {cpu_load}%")
memory_stats = psutil.virtual_memory()
print(f"Memory: {memory_stats}")
root_disk = psutil.disk_usage('/')
print(f"Disk: {root_disk}")

# Walk over running processes, requesting only the attributes printed below
for proc in psutil.process_iter(['pid', 'name', 'status']):
    try:
        details = proc.info
        print(f"PID: {details['pid']}, Name: {details['name']}, Status: {details['status']}")
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        # Skip processes that vanished or that we may not inspect
        continue
# Monitor specific process
def monitor_process(pid, duration=10):
    """Print the CPU and memory usage of a process about once per second.

    Args:
        pid: ID of the process to watch.
        duration: Approximate monitoring time in seconds.

    Prints a "not found" message instead of raising when the process does
    not exist or exits while being monitored.
    """
    try:
        p = psutil.Process(pid)
        # A monotonic clock keeps the duration immune to system clock
        # adjustments.
        deadline = time.monotonic() + duration
        while time.monotonic() < deadline:
            # interval=1 blocks for one second and measures usage over that
            # window. A non-blocking first call has no earlier sample to
            # compare against and reports a meaningless 0.0.
            cpu = p.cpu_percent(interval=1)
            rss_mb = p.memory_info().rss / (1024 * 1024)
            print(f"CPU: {cpu}%, Memory: {rss_mb:.2f} MB")
    except psutil.NoSuchProcess:
        print(f"Process {pid} not found")
# Find process by name
def find_process_by_name(name):
    """Return the PID of the first process named exactly ``name``.

    Returns ``None`` when no running process has that name.
    """
    matching_pids = (
        candidate.info['pid']
        for candidate in psutil.process_iter(['pid', 'name'])
        if candidate.info['name'] == name
    )
    # next() stops the scan at the first hit, or falls back to None.
    return next(matching_pids, None)
# Kill process
def kill_process(pid):
    """Stop a process by PID, escalating from terminate to kill.

    The process is first asked to terminate and given three seconds to
    exit; if it is still alive after that it is killed. A message
    describing the outcome is printed.

    Args:
        pid: ID of the process to stop.
    """
    try:
        p = psutil.Process(pid)
        p.terminate()
        p.wait(timeout=3)
        print(f"Process {pid} terminated")
    except psutil.NoSuchProcess:
        print(f"Process {pid} not found")
    except psutil.TimeoutExpired:
        try:
            p.kill()
        except psutil.NoSuchProcess:
            # The process exited on its own between the timeout and the
            # kill attempt; previously this escaped as an unhandled error.
            print(f"Process {pid} terminated")
        else:
            print(f"Process {pid} killed")
System Information and Monitoring
import psutil
import platform
import socket
def get_system_info():
    """Collect host, platform, CPU, memory, disk and boot-time facts.

    Returns:
        A dict with the keys ``hostname``, ``platform``,
        ``platform_release``, ``platform_version``, ``architecture``,
        ``processor``, ``cpu_count``, ``cpu_percent``, ``memory``, ``disk``
        and ``boot_time``. The values are whatever the corresponding
        ``socket``, ``platform`` and ``psutil`` calls return.
    """
    info = {}

    # Host and operating-system identification
    info['hostname'] = socket.gethostname()
    info['platform'] = platform.system()
    info['platform_release'] = platform.release()
    info['platform_version'] = platform.version()
    info['architecture'] = platform.machine()
    info['processor'] = platform.processor()

    # Live resource figures; cpu_percent samples over a one-second interval
    info['cpu_count'] = psutil.cpu_count()
    info['cpu_percent'] = psutil.cpu_percent(interval=1)
    info['memory'] = psutil.virtual_memory()
    info['disk'] = psutil.disk_usage('/')
    info['boot_time'] = psutil.boot_time()

    return info
def print_system_info():
    """Print a human-readable summary of ``get_system_info()``.

    Memory and disk figures are shown in GB (1 GB = 1024**3 bytes) with two
    decimals, followed by the usage percentage.
    """
    gib = 1024**3
    info = get_system_info()

    report = [
        "=== System Information ===",
        f"Hostname: {info['hostname']}",
        f"Platform: {info['platform']} {info['platform_release']}",
        f"Architecture: {info['architecture']}",
        f"Processor: {info['processor']}",
        f"CPU Cores: {info['cpu_count']}",
        f"CPU Usage: {info['cpu_percent']}%",
    ]

    memory = info['memory']
    used_gb = memory.used / gib
    total_gb = memory.total / gib
    report.append(f"Memory: {used_gb:.2f} GB / {total_gb:.2f} GB ({memory.percent}%)")

    storage = info['disk']
    used_gb = storage.used / gib
    total_gb = storage.total / gib
    report.append(f"Disk: {used_gb:.2f} GB / {total_gb:.2f} GB ({storage.percent}%)")

    for line in report:
        print(line)
# Usage: executed immediately when this snippet runs. The call takes about
# a second because get_system_info() samples CPU usage with interval=1.
print_system_info()
User and Permission Management
import os
import stat
from pathlib import Path
# Get file metadata (test.txt must already exist, otherwise stat() raises
# FileNotFoundError)
file_path = Path('test.txt')
file_stat = file_path.stat()

# Check ownership and permissions
print(f"Owner UID: {file_stat.st_uid}")
print(f"Owner GID: {file_stat.st_gid}")
# st_mode also carries the file-type bits; S_IMODE keeps only the permission
# bits, so a 644 file prints 0o644 instead of 0o100644
print(f"Permissions: {oct(stat.S_IMODE(file_stat.st_mode))}")

# Change permissions: owner read/write, group read, others read
os.chmod(file_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
# Equivalent to: chmod 644 test.txt

# Change owner (requires root)
# os.chown(file_path, uid, gid)

# Check if file is readable/writable/executable for the current process
print(f"Readable: {os.access(file_path, os.R_OK)}")
print(f"Writable: {os.access(file_path, os.W_OK)}")
print(f"Executable: {os.access(file_path, os.X_OK)}")

# Get current user
import getpass
print(f"Current user: {getpass.getuser()}")
Scheduling Tasks
import schedule
import time
from datetime import datetime
# Define scheduled tasks
def backup_task():
    """Announce the start of a backup run, stamped with the current time.

    The actual backup steps are a placeholder to be filled in.
    """
    started_at = datetime.now()
    print(f"Backup started at {started_at}")
    # Perform backup operations
def cleanup_task():
    """Announce the start of a cleanup run, stamped with the current time.

    The actual cleanup steps are a placeholder to be filled in.
    """
    started_at = datetime.now()
    print(f"Cleanup started at {started_at}")
    # Perform cleanup operations
# Schedule tasks
# NOTE(review): cleanup_task is registered twice (hourly and every 30
# minutes) and backup_task twice (daily and on Mondays), presumably to show
# the different interval styles. In a real script keep one schedule per task,
# otherwise the jobs overlap.
schedule.every().day.at("02:00").do(backup_task)  # daily at 02:00 (presumably local time; confirm)
schedule.every().hour.do(cleanup_task)  # once per hour
schedule.every(30).minutes.do(cleanup_task)  # every 30 minutes
schedule.every().monday.at("09:00").do(backup_task)  # Mondays at 09:00
# Run scheduler
def run_scheduler(poll_interval=60):
    """Run due scheduled jobs forever, blocking the calling thread.

    Args:
        poll_interval: Seconds to sleep between checks for due jobs.
            Defaults to 60, the previously hard-coded value. Jobs that need
            finer timing than a minute need a smaller interval.

    This function never returns; stop it by interrupting the process.
    """
    while True:
        schedule.run_pending()
        time.sleep(poll_interval)
# Usage (run in background)
# run_scheduler()
# Alternative: Using APScheduler for more complex scheduling
# (third-party package). NOTE(review): BackgroundScheduler presumably runs
# jobs on a background thread so that start() returns immediately; confirm
# against the APScheduler documentation.
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
scheduler.add_job(backup_task, 'cron', hour=2, minute=0)  # cron-style trigger with hour=2, minute=0
scheduler.add_job(cleanup_task, 'interval', minutes=30)  # interval trigger, every 30 minutes
scheduler.start()
# Keep scheduler running: the main program has to stay alive for the
# scheduled jobs to fire, and should shut the scheduler down on Ctrl+C.
# try:
# while True:
# time.sleep(1)
# except KeyboardInterrupt:
# scheduler.shutdown()
Best Practices
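- Error handling: Always handle subprocess errors and exceptions: check the return code (or pass check=True) and catch subprocess.CalledProcessError and subprocess.TimeoutExpired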
- Resource cleanup: Use context managers for file operations
- Logging: Log all administrative actions for audit trails
- Permissions: Run with minimal required privileges
- Validation: Validate user input before executing commands
- Backups: Always backup before destructive operations
Common Pitfalls
Bad Practice:
# Don't: Use shell=True without validation
user_input = input("Enter command: ")
subprocess.run(user_input, shell=True) # Security risk! The raw input is handed to the shell, so the user can run any command
# Don't: Ignore errors
os.remove(file_path) # Raises (e.g. FileNotFoundError) and aborts the script when left unhandled
# Don't: Hardcode paths
path = '/home/user/documents' # Not portable
Good Practice:
# Do: Validate input and avoid shell=True
# "--" ends option parsing, so a directory name that starts with "-" cannot
# be interpreted as an ls option
command = ['ls', '-la', '--', user_directory]
try:
    # check=True turns a non-zero exit status into CalledProcessError
    subprocess.run(command, capture_output=True, check=True)
except subprocess.CalledProcessError as exc:
    print(f"Command failed with exit code {exc.returncode}")

# Do: Handle errors
try:
    os.remove(file_path)
except FileNotFoundError:
    print(f"File {file_path} not found")

# Do: Use portable paths
path = Path.home() / 'documents'
Conclusion
Python’s rich ecosystem makes it ideal for system administration. Combine file operations, process management, and scheduling to automate routine tasks. Always prioritize security, error handling, and logging in production scripts.
Comments