File & Directory Operations (Advanced): Mastering File System Manipulation
Advanced file and directory operations are essential for building robust automation scripts, data processing pipelines, and system administration tools. This guide covers practical patterns for efficient file system manipulation.
Modern Path Handling with pathlib
Path Operations
from pathlib import Path
import os
# Create a Path object from a relative string path
p = Path('data/files/document.txt')
print(f"Name: {p.name}") # document.txt (final path component)
print(f"Stem: {p.stem}") # document (name without its suffix)
print(f"Suffix: {p.suffix}") # .txt (extension, dot included)
print(f"Parent: {p.parent}") # data/files (enclosing directory)
print(f"Parts: {p.parts}") # ('data', 'files', 'document.txt')
# Resolve absolute path (relative to the current working directory;
# resolve() does not require the path to exist)
absolute = p.resolve()
print(f"Absolute: {absolute}")
# Check path type — all three query the real filesystem,
# so they print False unless the path actually exists
print(f"Is file: {p.is_file()}")
print(f"Is dir: {p.is_dir()}")
print(f"Exists: {p.exists()}")
# Path joining with the / operator (portable across OSes)
base = Path('data')
file_path = base / 'files' / 'document.txt'
print(f"Joined: {file_path}")
Safe Path Operations
def safe_path_join(*parts):
    """Safely join path parts, preventing directory traversal.

    Args:
        parts: the base directory followed by path components to join.

    Returns:
        Path: the joined path, guaranteed to stay under the base.

    Raises:
        ValueError: if no parts are given, a component is absolute,
            a component contains a real ``..`` segment, or the joined
            path escapes the base directory.
    """
    if not parts:
        raise ValueError("At least a base path is required")
    base = Path(parts[0])
    for part in parts[1:]:
        component = Path(part)
        # Prevent directory traversal. Checking whole path segments
        # (component.parts) avoids false positives on legitimate names
        # such as 'a..b'; is_absolute() covers all absolute forms,
        # not just a leading '/'.
        if component.is_absolute() or '..' in component.parts:
            raise ValueError(f"Invalid path component: {part}")
        base = base / component
    # Defense in depth: ensure the resolved result is within the base.
    try:
        base.resolve().relative_to(Path(parts[0]).resolve())
    except ValueError:
        raise ValueError(f"Path escapes base directory: {base}")
    return base
# Usage
try:
    path = safe_path_join('/data', 'files', 'document.txt')
    print(f"Safe path: {path}")
except ValueError as e:
    print(f"Error: {e}")
Directory Operations
Recursive Directory Traversal
def walk_directory(root_path, pattern='*', max_depth=None):
    """Walk directory tree with optional depth limit.

    Yields (path, depth) pairs for entries whose name matches *pattern*.
    Directories are always descended into (up to *max_depth*), even when
    their own names do not match the pattern — previously a pattern such
    as '*.py' prevented recursion entirely, because non-matching
    subdirectories were never visited.

    Args:
        root_path: directory to start from.
        pattern: glob pattern applied to entry names (default '*').
        max_depth: maximum recursion depth, or None for unlimited.

    Yields:
        tuple[Path, int]: matching entry and its depth below the root.
    """
    root = Path(root_path)
    def _walk(path, depth=0):
        if max_depth is not None and depth > max_depth:
            return
        try:
            entries = sorted(path.iterdir())
        except PermissionError:
            print(f"Permission denied: {path}")
            return
        for item in entries:
            if item.match(pattern):
                yield item, depth
            # Recurse into every directory regardless of the pattern.
            if item.is_dir():
                yield from _walk(item, depth + 1)
    return _walk(root)
# Usage
for path, depth in walk_directory('.', pattern='*.py', max_depth=2):
    indent = ' ' * depth
    print(f"{indent}{path.name}")
Directory Statistics
def get_directory_stats(root_path):
    """Calculate directory statistics recursively.

    Args:
        root_path: directory to scan.

    Returns:
        dict with keys:
            total_files (int), total_dirs (int),
            total_size (int, bytes),
            file_types (dict: suffix -> count; 'no_extension' for none),
            largest_files (list[(Path, int)], top 10 by size, descending).
    """
    root = Path(root_path)
    stats = {
        'total_files': 0,
        'total_dirs': 0,
        'total_size': 0,
        'file_types': {},
        'largest_files': []
    }
    for item in root.rglob('*'):
        try:
            if item.is_file():
                # Single stat() call per file (was called twice before).
                size = item.stat().st_size
                stats['total_files'] += 1
                stats['total_size'] += size
                # Track file types
                suffix = item.suffix or 'no_extension'
                stats['file_types'][suffix] = stats['file_types'].get(suffix, 0) + 1
                # Track largest files
                stats['largest_files'].append((item, size))
            elif item.is_dir():
                stats['total_dirs'] += 1
        except OSError:
            # Entry vanished or became unreadable mid-scan; skip it.
            continue
    # Sort largest files, keep the top 10
    stats['largest_files'].sort(key=lambda x: x[1], reverse=True)
    stats['largest_files'] = stats['largest_files'][:10]
    return stats
# Usage
stats = get_directory_stats('.')
print(f"Total files: {stats['total_files']}")
print(f"Total size: {stats['total_size'] / 1024 / 1024:.2f} MB")
print(f"File types: {stats['file_types']}")
Safe Directory Cleanup
import shutil
from datetime import datetime, timedelta
def cleanup_old_files(directory, days_old=30, dry_run=True):
    """Remove files older than the specified number of days.

    Args:
        directory: root directory, scanned recursively.
        days_old: age threshold in days, judged by modification time.
        dry_run: when True, only print what would be removed.

    Returns:
        list[Path]: files actually removed (always empty in dry-run mode).
    """
    root = Path(directory)
    cutoff_time = datetime.now() - timedelta(days=days_old)
    removed = []
    for file_path in root.rglob('*'):
        if not file_path.is_file():
            continue
        # Compare modification time against the cutoff (both naive local time).
        mtime = datetime.fromtimestamp(file_path.stat().st_mtime)
        if mtime >= cutoff_time:
            continue
        if dry_run:
            print(f"Would remove: {file_path}")
            continue
        try:
            file_path.unlink()
        except OSError as e:
            # Narrow catch: only filesystem errors are expected here;
            # anything else would indicate a real bug and should propagate.
            print(f"Error removing {file_path}: {e}")
        else:
            removed.append(file_path)
            print(f"Removed: {file_path}")
    return removed
# Usage
cleanup_old_files('.', days_old=30, dry_run=True)
File Permissions and Ownership
Managing Permissions
import stat
import os
def set_file_permissions(file_path, mode):
    """Set file permissions, reporting (not raising) permission errors.

    Args:
        file_path: path of the file to modify.
        mode: permission bits, e.g. 0o755.

    Raises:
        FileNotFoundError: if *file_path* does not exist.
    """
    target = Path(file_path)
    if not target.exists():
        raise FileNotFoundError(f"File not found: {file_path}")
    try:
        target.chmod(mode)
    except PermissionError:
        print(f"Permission denied: cannot change permissions for {file_path}")
    else:
        print(f"Permissions set to {oct(mode)} for {file_path}")
def get_file_permissions(file_path):
    """Return the file's permissions in both octal and decomposed form.

    Returns:
        dict: 'mode' (octal string), 'readable' (ls-style string such as
        '-rw-r--r--'), plus one boolean per owner/group/others
        read/write/execute bit.
    """
    mode = Path(file_path).stat().st_mode
    # Table of (key, permission bit) pairs — one per rwx bit and class.
    bit_table = {
        'owner_read': stat.S_IRUSR,
        'owner_write': stat.S_IWUSR,
        'owner_execute': stat.S_IXUSR,
        'group_read': stat.S_IRGRP,
        'group_write': stat.S_IWGRP,
        'group_execute': stat.S_IXGRP,
        'others_read': stat.S_IROTH,
        'others_write': stat.S_IWOTH,
        'others_execute': stat.S_IXOTH,
    }
    info = {
        'mode': oct(stat.S_IMODE(mode)),
        'readable': stat.filemode(mode),
    }
    info.update({key: bool(mode & bit) for key, bit in bit_table.items()})
    return info
# Usage
# NOTE: requires 'test.txt' to exist in the current directory —
# stat() inside get_file_permissions raises FileNotFoundError otherwise.
perms = get_file_permissions('test.txt')
print(f"Permissions: {perms['readable']}")
# Set permissions (755 = rwxr-xr-x)
set_file_permissions('test.txt', 0o755)
Atomic File Operations
Safe File Writing
import tempfile
def atomic_write(file_path, content, mode='w'):
    """Write a file atomically via a temporary file plus rename.

    The content is written to a temporary file in the same directory as
    the target (so the final rename never crosses filesystems), flushed
    and fsync'd, then moved over the target with Path.replace(), which
    is atomic on POSIX. Readers see either the old content or the new
    content, never a partial write.

    Args:
        file_path: destination path.
        content: data to write (str for mode 'w', bytes for 'wb').
        mode: open mode for the temporary file.

    Raises:
        OSError: on any filesystem failure; the temp file is cleaned up
            and the original exception propagates with its traceback.
    """
    path = Path(file_path)
    tmp_path = None
    try:
        # Create temporary file in the same directory as the target.
        with tempfile.NamedTemporaryFile(
            mode=mode,
            dir=path.parent,
            delete=False,
            suffix='.tmp'
        ) as tmp_file:
            tmp_path = Path(tmp_file.name)
            tmp_file.write(content)
            tmp_file.flush()
            os.fsync(tmp_file.fileno())  # Ensure written to disk
        # Rename only after the handle is closed (required on Windows,
        # and replace() is atomic on POSIX either way).
        tmp_path.replace(path)
        print(f"File written atomically: {file_path}")
    except Exception:
        # Clean up the temp file, then re-raise with the original traceback
        # (bare `raise`, not `raise e`).
        if tmp_path is not None:
            try:
                tmp_path.unlink()
            except OSError:
                pass
        raise
# Usage
atomic_write('data.txt', 'Important data\n')
Safe File Locking
import fcntl
import time
class FileLock:
    """Context manager for an exclusive, advisory file lock.

    Uses fcntl.flock (POSIX only). Acquisition is retried every 100 ms
    until *timeout* seconds have elapsed, then TimeoutError is raised.
    """
    def __init__(self, file_path, timeout=10):
        self.file_path = Path(file_path)
        self.timeout = timeout      # seconds to keep retrying
        self.lock_file = None       # open handle while the lock is held
    def __enter__(self):
        start_time = time.time()
        while True:
            # Open the lock file fresh for each attempt.
            handle = open(self.file_path, 'w')
            try:
                # Try to acquire exclusive lock without blocking.
                fcntl.flock(handle.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
            except OSError:
                # Close the handle before retrying — the previous version
                # leaked one open file descriptor per 100 ms retry.
                handle.close()
                if time.time() - start_time > self.timeout:
                    raise TimeoutError(f"Could not acquire lock on {self.file_path}")
                time.sleep(0.1)
            else:
                self.lock_file = handle
                return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.lock_file:
            fcntl.flock(self.lock_file.fileno(), fcntl.LOCK_UN)
            self.lock_file.close()
            # Reset so the instance can be safely reused.
            self.lock_file = None
# Usage
# Demo: hold the lock for one second; a second process running this
# concurrently would wait up to `timeout` seconds, then get TimeoutError.
try:
    with FileLock('data.lock', timeout=5):
        print("Lock acquired")
        # Do critical work
        time.sleep(1)
except TimeoutError as e:
    print(f"Error: {e}")
File Watching
Monitor File Changes
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import time
class FileChangeHandler(FileSystemEventHandler):
    """Log file-system events, ignoring directory-level notifications."""
    def on_created(self, event):
        if event.is_directory:
            return
        print(f"File created: {event.src_path}")
    def on_modified(self, event):
        if event.is_directory:
            return
        print(f"File modified: {event.src_path}")
    def on_deleted(self, event):
        if event.is_directory:
            return
        print(f"File deleted: {event.src_path}")
    def on_moved(self, event):
        if event.is_directory:
            return
        print(f"File moved: {event.src_path} -> {event.dest_path}")
def watch_directory(directory, recursive=True):
    """Watch *directory* for file changes until interrupted with Ctrl-C.

    Args:
        directory: path to monitor.
        recursive: also watch subdirectories when True.
    """
    observer = Observer()
    observer.schedule(FileChangeHandler(), directory, recursive=recursive)
    observer.start()
    try:
        # Block forever; the observer thread delivers events.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
        observer.join()
# Usage (in separate thread or process)
# watch_directory('.')
Efficient File Processing
Streaming Large Files
def process_large_file(file_path, chunk_size=8192):
    """Yield the file's contents in *chunk_size*-byte chunks.

    Args:
        file_path: path of the file to stream.
        chunk_size: bytes to read per chunk (default 8 KiB).

    Yields:
        bytes: successive chunks; the final chunk may be shorter.
    """
    with open(Path(file_path), 'rb') as stream:
        # iter() with a b'' sentinel stops cleanly at EOF — same idiom
        # used elsewhere in this file for chunked reads.
        for chunk in iter(lambda: stream.read(chunk_size), b''):
            yield chunk
# Usage
total_size = 0
# NOTE: 'large_file.bin' must exist — the first iteration opens the file
# and raises FileNotFoundError otherwise.
for chunk in process_large_file('large_file.bin'):
    total_size += len(chunk)
print(f"Processed {total_size} bytes")
Parallel File Processing
from concurrent.futures import ThreadPoolExecutor
import hashlib
def compute_file_hash(file_path):
    """Return (file_path, sha256 hex digest) for the given file.

    Reads in 8 KiB chunks so large files are never fully loaded into memory.
    """
    hash_obj = hashlib.sha256()
    with open(file_path, 'rb') as f:
        for chunk in iter(lambda: f.read(8192), b''):
            hash_obj.update(chunk)
    return file_path, hash_obj.hexdigest()
def process_files_parallel(directory, pattern='*', max_workers=4):
    """Hash files matching *pattern* in *directory* using a thread pool.

    Args:
        directory: directory to scan (non-recursive glob).
        pattern: glob pattern for selecting entries (default '*').
        max_workers: thread pool size.

    Returns:
        dict: str(path) -> sha256 hex digest.
    """
    root = Path(directory)
    # Filter to regular files: glob('*') can also yield directories,
    # and open() on a directory raises IsADirectoryError.
    files = [f for f in root.glob(pattern) if f.is_file()]
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # executor.map keeps the parallel fan-out without hand-tracking
        # a futures dict whose values were never used.
        for file_path, file_hash in executor.map(compute_file_hash, files):
            results[str(file_path)] = file_hash
    return results
# Usage
hashes = process_files_parallel('.', pattern='*.py')
for file_path, hash_value in hashes.items():
    print(f"{file_path}: {hash_value}")
Common Pitfalls and Best Practices
❌ Bad: String Path Concatenation
# DON'T: Use string concatenation for paths
path = 'data' + '/' + 'files' + '/' + 'document.txt'
✅ Good: Use pathlib
# DO: Use pathlib for cross-platform paths
path = Path('data') / 'files' / 'document.txt'
❌ Bad: No Error Handling
# DON'T: Assume operations succeed
file_path.unlink()
shutil.rmtree(directory)
✅ Good: Handle Errors
# DO: Handle errors gracefully
try:
file_path.unlink()
except FileNotFoundError:
print("File not found")
except PermissionError:
print("Permission denied")
❌ Bad: Non-atomic Writes
# DON'T: Write directly to file
with open('data.txt', 'w') as f:
f.write(content)
✅ Good: Atomic Writes
# DO: Use atomic writes
atomic_write('data.txt', content)
Summary
Advanced file operations require:
- Modern path handling with pathlib
- Safe directory operations with error handling
- Atomic writes to prevent corruption
- File locking for concurrent access
- Permission management for security
- Efficient processing for large files
- File watching for real-time monitoring
- Parallel processing for performance
These patterns ensure robust, efficient, and secure file system operations in production environments.
Comments