Introduction
Performance optimization requires understanding bottlenecks, measuring impact, and applying targeted fixes. This guide covers profiling, caching, database optimization, and scaling strategies.
Profiling
import cProfile
import pstats
import time
from functools import wraps
def profile(func):
    """Decorator: print a cProfile report (top 20 entries, sorted by
    cumulative time) for every call, then return the call's result."""
    @wraps(func)
    def inner(*args, **kwargs):
        prof = cProfile.Profile()
        # runcall both profiles and returns the wrapped function's result.
        outcome = prof.runcall(func, *args, **kwargs)
        report = pstats.Stats(prof)
        report.sort_stats("cumulative")
        report.print_stats(20)
        return outcome
    return inner
# Line-by-line profiling
from line_profiler import LineProfiler
def profile_lines(func):
    """Decorator: report per-line timings for *func* after every call.

    One shared LineProfiler instance (created at decoration time)
    accumulates stats across all calls.

    Fix: the original enabled the profiler and only disabled it after a
    successful call — an exception in *func* left the profiler enabled
    (and printed nothing). disable/print now run in a ``finally`` so the
    profiler state is always restored and partial stats are still shown.
    """
    profiler = LineProfiler()
    profiler.add_function(func)

    @wraps(func)
    def wrapper(*args, **kwargs):
        profiler.enable()
        try:
            return func(*args, **kwargs)
        finally:
            profiler.disable()
            profiler.print_stats()
    return wrapper
# Memory profiling
import tracemalloc
def profile_memory(func):
    """Decorator: print current/peak traced memory (KB) for every call.

    Fix: ``tracemalloc.stop()`` now runs in a ``finally`` block, so a
    raising *func* cannot leave tracing enabled — which would slow the
    process and skew every later measurement.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        tracemalloc.start()
        try:
            result = func(*args, **kwargs)
            current, peak = tracemalloc.get_traced_memory()
            print(f"Current: {current / 1024:.1f} KB, Peak: {peak / 1024:.1f} KB")
            return result
        finally:
            tracemalloc.stop()
    return wrapper
Database Optimization
# Query optimization
# NOTE(review): demonstration repository contrasting query strategies; it
# relies on a module-level `db` handle that is not visible in this file.
class UserRepository:
    def get_users_with_posts(self):
        """Illustrate three ways to load users with their posts:
        the N+1 anti-pattern, a JOIN, and two-query eager loading."""
        # Bad: N+1 queries
        users = db.query("SELECT * FROM users")
        for user in users:
            # One extra round-trip per user — cost grows linearly with rows.
            posts = db.query(
                "SELECT * FROM posts WHERE user_id = ?", user.id
            )
        # Good: JOIN query
        users = db.query("""
            SELECT u.*, p.*
            FROM users u
            LEFT JOIN posts p ON u.id = p.user_id
        """)
        # Good: Eager loading
        users = db.query("SELECT * FROM users")
        user_ids = [u.id for u in users]
        # NOTE(review): "IN ?" assumes the driver expands a sequence into
        # placeholders; standard DB-API drivers need one "?" per value — confirm.
        posts = db.query(
            "SELECT * FROM posts WHERE user_id IN ?", user_ids
        )
        return users
# Index optimization
# Create indexes for frequently queried columns
# Composite indexes for multi-column queries
# Use EXPLAIN ANALYZE to verify query plans
# Connection pooling
from sqlalchemy import create_engine
# Pool of 20 steady connections with up to 10 temporary overflow connections.
engine = create_engine(
    "postgresql://user:pass@localhost/db",  # NOTE(review): inline credentials — load from env/config in real code
    pool_size=20,        # persistent connections kept open in the pool
    max_overflow=10,     # extra connections allowed under burst load
    pool_pre_ping=True   # test each connection before use; recycles stale ones
)
Caching Strategies
from functools import lru_cache
from typing import Callable
# In-memory caching
@lru_cache(maxsize=128)
def expensive_computation(n: int) -> int:
    """Double *n*; results are memoized so repeated calls skip the delay."""
    time.sleep(1)  # stand-in for genuinely expensive work
    return 2 * n
# Redis caching
import redis
class CacheService:
    """Redis-backed read-through cache."""

    def __init__(self):
        self.redis = redis.Redis()

    def get_or_set(self, key: str, fetch: Callable, ttl: int = 300):
        """Return the cached value for *key*, computing and storing it on a miss.

        Fix: tests ``is not None`` instead of truthiness, so falsy-but-valid
        cached values (b"", b"0") are served from cache rather than
        refetched on every call.

        NOTE(review): on a miss the raw ``fetch()`` value is returned, while
        later hits return whatever Redis stored (typically bytes) — callers
        must tolerate both; confirm against existing usage.
        """
        value = self.redis.get(key)
        if value is not None:
            return value
        value = fetch()
        self.redis.setex(key, ttl, value)  # store with TTL in seconds
        return value

    def invalidate_pattern(self, pattern: str):
        """Delete every key matching *pattern* (e.g. "user:*").

        Fix: iterates with SCAN (``scan_iter``) instead of ``KEYS`` —
        KEYS walks the entire keyspace in one blocking call and is
        discouraged on production servers.
        """
        keys = list(self.redis.scan_iter(match=pattern))
        if keys:
            self.redis.delete(*keys)
Scaling Strategies
# Horizontal scaling with load balancing
class LoadBalancer:
    """Distributes requests over a fixed server list, round-robin."""

    def __init__(self, servers: list):
        self.servers = servers
        self.current = 0  # index of the next server to hand out

    def get_server(self):
        """Return the next server in rotation."""
        idx = self.current
        self.current = (idx + 1) % len(self.servers)
        return self.servers[idx]

    def get_healthy_servers(self):
        """Return only the servers whose is_healthy() check passes."""
        healthy = []
        for server in self.servers:
            if server.is_healthy():
                healthy.append(server)
        return healthy
# Async processing for heavy tasks
import asyncio
from concurrent.futures import ThreadPoolExecutor
class AsyncProcessor:
    """Runs blocking callables on a thread pool from async code."""

    def __init__(self, max_workers: int = 4):
        # Shared pool; threads suit I/O-bound or GIL-releasing work.
        self.executor = ThreadPoolExecutor(max_workers)

    async def process_background(self, task: Callable, *args):
        """Await ``task(*args)`` executed on the pool, keeping the event
        loop free.

        Fix: uses ``asyncio.get_running_loop()`` — ``get_event_loop()`` is
        deprecated inside coroutines and may create a second loop when none
        is running; inside a coroutine a loop is guaranteed to be running.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self.executor, task, *args)
Conclusion
Performance optimization is iterative: profile to find bottlenecks, optimize the critical path, cache aggressively, and scale horizontally. Measure before and after changes. Focus on user-perceived performance, not just metrics.
Resources
- “High Performance MySQL”
- Google Web Fundamentals - Performance
Comments