Skip to main content
⚡ Calmops

Performance Optimization: Profiling, Caching, and Scaling Strategies

Introduction

Performance optimization requires understanding bottlenecks, measuring impact, and applying targeted fixes. This guide covers profiling, caching, database optimization, and scaling strategies.

Profiling

import cProfile
import pstats
import time
from functools import wraps

def profile(func):
    """Decorator: run *func* under cProfile and print the top 20 entries.

    Stats are sorted by cumulative time and written to stdout after each
    call; the wrapped function's return value is passed through unchanged.
    """
    @wraps(func)
    def inner(*args, **kwargs):
        prof = cProfile.Profile()
        outcome = prof.runcall(func, *args, **kwargs)
        # sort_stats() returns the Stats object, so the report chains in one line.
        pstats.Stats(prof).sort_stats("cumulative").print_stats(20)
        return outcome
    return inner

# Line-by-line profiling
from line_profiler import LineProfiler

def profile_lines(func):
    """Decorator: print per-line timings for *func* using line_profiler.

    One LineProfiler instance is created per decorated function and shared
    across calls, so timings accumulate over the function's lifetime.
    """
    profiler = LineProfiler()
    profiler.add_function(func)

    @wraps(func)
    def wrapper(*args, **kwargs):
        profiler.enable()
        try:
            return func(*args, **kwargs)
        finally:
            # Disable and report even when func raises; the original left
            # the profiler enabled (global tracing state) on exceptions.
            profiler.disable()
            profiler.print_stats()
    return wrapper

# Memory profiling
import tracemalloc

def profile_memory(func):
    """Decorator: report current and peak traced memory for each call.

    Prints the figures in KB to stdout and returns the wrapped function's
    result unchanged.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        tracemalloc.start()
        try:
            result = func(*args, **kwargs)
        finally:
            # Read and stop tracing even if func raises; tracemalloc is
            # process-global state, and leaving it running skews every
            # later measurement and adds overhead.
            current, peak = tracemalloc.get_traced_memory()
            tracemalloc.stop()
            print(f"Current: {current / 1024:.1f} KB, Peak: {peak / 1024:.1f} KB")
        return result
    return wrapper

Database Optimization

# Query optimization
# Query optimization
class UserRepository:
    # NOTE(review): illustrative snippet — `db` is not defined in this file;
    # presumably a DB-API-like handle with .query(sql, params). Confirm
    # against the real project before reusing.
    def get_users_with_posts(self):
        """Demonstrate three ways to fetch users with their posts.

        Shows the N+1 anti-pattern first, then two fixes (JOIN and eager
        loading). All three run in sequence here for illustration; real
        code would pick one.
        """
        # Bad: N+1 queries
        # One query for users plus one query PER user — n+1 round trips.
        users = db.query("SELECT * FROM users")
        for user in users:
            posts = db.query(
                "SELECT * FROM posts WHERE user_id = ?", user.id
            )
        
        # Good: JOIN query
        # Single round trip; note users are duplicated per matching post row.
        users = db.query("""
            SELECT u.*, p.*
            FROM users u
            LEFT JOIN posts p ON u.id = p.user_id
        """)
        
        # Good: Eager loading
        # Exactly two queries regardless of user count.
        users = db.query("SELECT * FROM users")
        user_ids = [u.id for u in users]
        posts = db.query(
            "SELECT * FROM posts WHERE user_id IN ?", user_ids
        )
        
        # Only users from the last strategy are returned; posts are discarded
        # in this demo.
        return users

# Index optimization
# Create indexes for frequently queried columns
# Composite indexes for multi-column queries
# Use EXPLAIN ANALYZE to verify query plans

# Connection pooling
from sqlalchemy import create_engine

# Pooled engine: reuses TCP connections instead of reconnecting per request.
engine = create_engine(
    "postgresql://user:pass@localhost/db",
    pool_size=20,        # steady-state connections kept open
    max_overflow=10,     # extra connections allowed under burst load
    pool_pre_ping=True   # test each connection before use; evicts stale ones
)

Caching Strategies

from functools import lru_cache
from typing import Callable

# In-memory caching
@lru_cache(maxsize=128)
def expensive_computation(n: int) -> int:
    """Return ``n`` doubled, memoizing results of this slow computation.

    The first call for a given ``n`` pays the simulated 1-second cost;
    repeat calls are served from the LRU cache (up to 128 entries).
    """
    time.sleep(1)  # Simulate work
    return 2 * n

# Redis caching
import redis

class CacheService:
    """Read-through cache backed by Redis."""

    def __init__(self):
        self.redis = redis.Redis()

    def get_or_set(self, key: str, fetch: Callable, ttl: int = 300):
        """Return the cached value for *key*, computing and storing it on a miss.

        :param key: cache key
        :param fetch: zero-argument callable producing the value on a miss
        :param ttl: expiry in seconds for newly stored values
        :returns: the cached value on a hit, otherwise the fresh ``fetch()``
            result. NOTE(review): with a default redis-py client a hit comes
            back as ``bytes`` while a miss returns the raw fetch() result —
            callers must tolerate both; confirm client decode settings.
        """
        value = self.redis.get(key)
        # Compare with `is not None`, not truthiness: a cached falsy value
        # (0, b"", empty string) is a valid hit and must not trigger a
        # re-fetch on every call, which would defeat the cache.
        if value is not None:
            return value

        value = fetch()
        self.redis.setex(key, ttl, value)
        return value

    def invalidate_pattern(self, pattern: str):
        """Delete every key matching *pattern* (glob syntax, e.g. ``user:*``).

        KEYS scans the entire keyspace (O(N)); acceptable for small caches,
        but prefer SCAN-based iteration on large production instances.
        """
        keys = self.redis.keys(pattern)
        if keys:
            self.redis.delete(*keys)

Scaling Strategies

# Horizontal scaling with load balancing
class LoadBalancer:
    """Distribute requests across a fixed list of servers, round-robin."""

    def __init__(self, servers: list):
        self.servers = servers
        self.current = 0  # index of the next server to hand out

    def get_server(self):
        """Return the next server in round-robin order."""
        idx = self.current
        # Wrap the cursor so the rotation cycles forever.
        self.current = (idx + 1) % len(self.servers)
        return self.servers[idx]

    def get_healthy_servers(self):
        """Return only the servers whose health check currently passes."""
        return list(filter(lambda srv: srv.is_healthy(), self.servers))

# Async processing for heavy tasks
import asyncio
from concurrent.futures import ThreadPoolExecutor

class AsyncProcessor:
    """Offload blocking callables to a thread pool from async code."""

    def __init__(self, max_workers: int = 4):
        # Dedicated pool so heavy tasks don't compete with the loop's
        # default executor.
        self.executor = ThreadPoolExecutor(max_workers)

    async def process_background(self, task: Callable, *args):
        """Run ``task(*args)`` in the thread pool and await its result.

        :param task: blocking callable to execute off the event loop
        :param args: positional arguments forwarded to *task*
        :returns: whatever *task* returns

        Uses ``get_running_loop()`` (3.7+): inside a coroutine it always
        yields the loop actually driving us, whereas ``get_event_loop()``
        is deprecated in that context and can bind a stale/wrong loop.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self.executor, task, *args)

Conclusion

Performance optimization is iterative: profile to find bottlenecks, optimize the critical path, cache aggressively, and scale horizontally. Measure before and after changes. Focus on user-perceived performance, not just metrics.

Resources

  • “High Performance MySQL”
  • Google Web Fundamentals - Performance

Comments