Skip to main content
โšก Calmops

File & Directory Operations (Advanced): Mastering File System Manipulation

File & Directory Operations (Advanced): Mastering File System Manipulation

Advanced file and directory operations are essential for building robust automation scripts, data processing pipelines, and system administration tools. This guide covers practical patterns for efficient file system manipulation.

Modern Path Handling with pathlib

Path Operations

from pathlib import Path
import os

# Create Path objects
p = Path('data/files/document.txt')
print(f"Name: {p.name}")  # document.txt
print(f"Stem: {p.stem}")  # document
print(f"Suffix: {p.suffix}")  # .txt
print(f"Parent: {p.parent}")  # data/files
print(f"Parts: {p.parts}")  # ('data', 'files', 'document.txt')

# Resolve absolute path
absolute = p.resolve()
print(f"Absolute: {absolute}")

# Check path type
print(f"Is file: {p.is_file()}")
print(f"Is dir: {p.is_dir()}")
print(f"Exists: {p.exists()}")

# Path joining
base = Path('data')
file_path = base / 'files' / 'document.txt'
print(f"Joined: {file_path}")

Safe Path Operations

def safe_path_join(*parts):
    """Safely join path parts, preventing directory traversal."""
    base = Path(parts[0])
    
    for part in parts[1:]:
        # Prevent directory traversal
        if '..' in part or part.startswith('/'):
            raise ValueError(f"Invalid path component: {part}")
        base = base / part
    
    # Ensure result is within base
    try:
        base.resolve().relative_to(Path(parts[0]).resolve())
    except ValueError:
        raise ValueError(f"Path escapes base directory: {base}")
    
    return base

# Usage
try:
    path = safe_path_join('/data', 'files', 'document.txt')
    print(f"Safe path: {path}")
except ValueError as e:
    print(f"Error: {e}")

Directory Operations

Recursive Directory Traversal

def walk_directory(root_path, pattern='*', max_depth=None):
    """Walk directory tree with optional depth limit."""
    root = Path(root_path)
    
    def _walk(path, depth=0):
        if max_depth is not None and depth > max_depth:
            return
        
        try:
            for item in sorted(path.glob(pattern)):
                if item.is_file():
                    yield item, depth
                elif item.is_dir():
                    yield item, depth
                    yield from _walk(item, depth + 1)
        except PermissionError:
            print(f"Permission denied: {path}")
    
    return _walk(root)

# Usage
for path, depth in walk_directory('.', pattern='*.py', max_depth=2):
    indent = '  ' * depth
    print(f"{indent}{path.name}")

Directory Statistics

def get_directory_stats(root_path):
    """Calculate directory statistics."""
    root = Path(root_path)
    
    stats = {
        'total_files': 0,
        'total_dirs': 0,
        'total_size': 0,
        'file_types': {},
        'largest_files': []
    }
    
    for item in root.rglob('*'):
        if item.is_file():
            stats['total_files'] += 1
            stats['total_size'] += item.stat().st_size
            
            # Track file types
            suffix = item.suffix or 'no_extension'
            stats['file_types'][suffix] = stats['file_types'].get(suffix, 0) + 1
            
            # Track largest files
            stats['largest_files'].append((item, item.stat().st_size))
        
        elif item.is_dir():
            stats['total_dirs'] += 1
    
    # Sort largest files
    stats['largest_files'].sort(key=lambda x: x[1], reverse=True)
    stats['largest_files'] = stats['largest_files'][:10]
    
    return stats

# Usage
stats = get_directory_stats('.')
print(f"Total files: {stats['total_files']}")
print(f"Total size: {stats['total_size'] / 1024 / 1024:.2f} MB")
print(f"File types: {stats['file_types']}")

Safe Directory Cleanup

import shutil
from datetime import datetime, timedelta

def cleanup_old_files(directory, days_old=30, dry_run=True):
    """Remove files older than specified days."""
    root = Path(directory)
    cutoff_time = datetime.now() - timedelta(days=days_old)
    
    removed = []
    
    for file_path in root.rglob('*'):
        if not file_path.is_file():
            continue
        
        # Get modification time
        mtime = datetime.fromtimestamp(file_path.stat().st_mtime)
        
        if mtime < cutoff_time:
            if dry_run:
                print(f"Would remove: {file_path}")
            else:
                try:
                    file_path.unlink()
                    removed.append(file_path)
                    print(f"Removed: {file_path}")
                except Exception as e:
                    print(f"Error removing {file_path}: {e}")
    
    return removed

# Usage
cleanup_old_files('.', days_old=30, dry_run=True)

File Permissions and Ownership

Managing Permissions

import stat
import os

def set_file_permissions(file_path, mode):
    """Set file permissions safely."""
    path = Path(file_path)
    
    if not path.exists():
        raise FileNotFoundError(f"File not found: {file_path}")
    
    try:
        path.chmod(mode)
        print(f"Permissions set to {oct(mode)} for {file_path}")
    except PermissionError:
        print(f"Permission denied: cannot change permissions for {file_path}")

def get_file_permissions(file_path):
    """Get file permissions in readable format."""
    path = Path(file_path)
    mode = path.stat().st_mode
    
    # Extract permission bits
    permissions = stat.filemode(mode)
    
    return {
        'mode': oct(stat.S_IMODE(mode)),
        'readable': permissions,
        'owner_read': bool(mode & stat.S_IRUSR),
        'owner_write': bool(mode & stat.S_IWUSR),
        'owner_execute': bool(mode & stat.S_IXUSR),
        'group_read': bool(mode & stat.S_IRGRP),
        'group_write': bool(mode & stat.S_IWGRP),
        'group_execute': bool(mode & stat.S_IXGRP),
        'others_read': bool(mode & stat.S_IROTH),
        'others_write': bool(mode & stat.S_IWOTH),
        'others_execute': bool(mode & stat.S_IXOTH)
    }

# Usage
perms = get_file_permissions('test.txt')
print(f"Permissions: {perms['readable']}")

# Set permissions (755 = rwxr-xr-x)
set_file_permissions('test.txt', 0o755)

Atomic File Operations

Safe File Writing

import tempfile

def atomic_write(file_path, content, mode='w'):
    """Write file atomically using temporary file."""
    path = Path(file_path)
    
    # Create temporary file in same directory
    temp_dir = path.parent
    
    with tempfile.NamedTemporaryFile(
        mode=mode,
        dir=temp_dir,
        delete=False,
        suffix='.tmp'
    ) as tmp_file:
        tmp_path = Path(tmp_file.name)
        
        try:
            tmp_file.write(content)
            tmp_file.flush()
            os.fsync(tmp_file.fileno())  # Ensure written to disk
            
            # Atomic rename
            tmp_path.replace(path)
            print(f"File written atomically: {file_path}")
        
        except Exception as e:
            tmp_path.unlink()  # Clean up temp file
            raise e

# Usage
atomic_write('data.txt', 'Important data\n')

Safe File Locking

import fcntl
import time

class FileLock:
    """Context manager for file locking."""
    
    def __init__(self, file_path, timeout=10):
        self.file_path = Path(file_path)
        self.timeout = timeout
        self.lock_file = None
    
    def __enter__(self):
        start_time = time.time()
        
        while True:
            try:
                # Open lock file
                self.lock_file = open(self.file_path, 'w')
                
                # Try to acquire exclusive lock
                fcntl.flock(self.lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
                return self
            
            except IOError:
                if time.time() - start_time > self.timeout:
                    raise TimeoutError(f"Could not acquire lock on {self.file_path}")
                time.sleep(0.1)
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.lock_file:
            fcntl.flock(self.lock_file.fileno(), fcntl.LOCK_UN)
            self.lock_file.close()

# Usage
try:
    with FileLock('data.lock', timeout=5):
        print("Lock acquired")
        # Do critical work
        time.sleep(1)
except TimeoutError as e:
    print(f"Error: {e}")

File Watching

Monitor File Changes

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import time

class FileChangeHandler(FileSystemEventHandler):
    """Handle file system events."""
    
    def on_created(self, event):
        if not event.is_directory:
            print(f"File created: {event.src_path}")
    
    def on_modified(self, event):
        if not event.is_directory:
            print(f"File modified: {event.src_path}")
    
    def on_deleted(self, event):
        if not event.is_directory:
            print(f"File deleted: {event.src_path}")
    
    def on_moved(self, event):
        if not event.is_directory:
            print(f"File moved: {event.src_path} -> {event.dest_path}")

def watch_directory(directory, recursive=True):
    """Watch directory for changes."""
    event_handler = FileChangeHandler()
    observer = Observer()
    
    observer.schedule(event_handler, directory, recursive=recursive)
    observer.start()
    
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    
    observer.join()

# Usage (in separate thread or process)
# watch_directory('.')

Efficient File Processing

Streaming Large Files

def process_large_file(file_path, chunk_size=8192):
    """Process large file in chunks."""
    path = Path(file_path)
    
    with open(path, 'rb') as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            
            yield chunk

# Usage
total_size = 0
for chunk in process_large_file('large_file.bin'):
    total_size += len(chunk)
    print(f"Processed {total_size} bytes")

Parallel File Processing

from concurrent.futures import ThreadPoolExecutor
import hashlib

def compute_file_hash(file_path):
    """Compute file hash."""
    hash_obj = hashlib.sha256()
    
    with open(file_path, 'rb') as f:
        for chunk in iter(lambda: f.read(8192), b''):
            hash_obj.update(chunk)
    
    return file_path, hash_obj.hexdigest()

def process_files_parallel(directory, pattern='*', max_workers=4):
    """Process multiple files in parallel."""
    root = Path(directory)
    files = list(root.glob(pattern))
    
    results = {}
    
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(compute_file_hash, f): f 
            for f in files
        }
        
        for future in futures:
            file_path, file_hash = future.result()
            results[str(file_path)] = file_hash
    
    return results

# Usage
hashes = process_files_parallel('.', pattern='*.py')
for file_path, hash_value in hashes.items():
    print(f"{file_path}: {hash_value}")

Common Pitfalls and Best Practices

โŒ Bad: String Path Concatenation

# DON'T: Use string concatenation for paths
path = 'data' + '/' + 'files' + '/' + 'document.txt'

โœ… Good: Use pathlib

# DO: Use pathlib for cross-platform paths
path = Path('data') / 'files' / 'document.txt'

โŒ Bad: No Error Handling

# DON'T: Assume operations succeed
file_path.unlink()
shutil.rmtree(directory)

โœ… Good: Handle Errors

# DO: Handle errors gracefully
try:
    file_path.unlink()
except FileNotFoundError:
    print("File not found")
except PermissionError:
    print("Permission denied")

โŒ Bad: Non-atomic Writes

# DON'T: Write directly to file
with open('data.txt', 'w') as f:
    f.write(content)

โœ… Good: Atomic Writes

# DO: Use atomic writes
atomic_write('data.txt', content)

Summary

Advanced file operations require:

  1. Modern path handling with pathlib
  2. Safe directory operations with error handling
  3. Atomic writes to prevent corruption
  4. File locking for concurrent access
  5. Permission management for security
  6. Efficient processing for large files
  7. File watching for real-time monitoring
  8. Parallel processing for performance

These patterns ensure robust, efficient, and secure file system operations in production environments.

Comments