Introduction
Performance optimization requires understanding bottlenecks, measuring impact, and applying targeted fixes. This guide covers profiling, caching, database optimization, and scaling strategies with practical code examples and current 2026 best practices.
The key principle: measure first, optimize second. Without data, assumptions about performance bottlenecks are often wrong. Every optimization should be preceded by a measurement and followed by a verification measurement.
Profiling
Python Profiling
import cProfile
import pstats
import time
from functools import wraps
from typing import Callable
def profile(func: Callable) -> Callable:
    """Profile a function with cProfile and print the top 20 entries.

    Statistics are sorted by cumulative time and printed to stdout after
    every call, including calls that raise, so a failing code path can still
    be inspected. The wrapped function's return value (or exception) passes
    through unchanged.
    """

    @wraps(func)
    def wrapper(*args, **kwargs):
        profiler = cProfile.Profile()
        # enable() stays outside the try block: if profiling cannot start
        # there is nothing to report and that error should surface as-is.
        profiler.enable()
        try:
            return func(*args, **kwargs)
        finally:
            profiler.disable()
            stats = pstats.Stats(profiler)
            stats.sort_stats("cumulative")
            stats.print_stats(20)

    return wrapper
# Line-by-line profiling
from line_profiler import LineProfiler
def profile_lines(func: Callable) -> Callable:
    """Profile individual lines of a function with line_profiler.

    A fresh LineProfiler is created for each call and its per-line report is
    printed when the call finishes, whether it returns or raises.
    """

    @wraps(func)
    def wrapper(*args, **kwargs):
        lp = LineProfiler()
        lp_wrapper = lp(func)
        try:
            return lp_wrapper(*args, **kwargs)
        finally:
            lp.print_stats()

    return wrapper
# Memory profiling
import tracemalloc
def profile_memory(func: Callable) -> Callable:
    """Profile memory usage of a function with tracemalloc.

    Prints the current and peak traced memory (in KB) after every call,
    including calls that raise. Tracing is only started and stopped here when
    it was not already active, so nested decorated calls and an outer
    tracemalloc session keep working.
    """

    @wraps(func)
    def wrapper(*args, **kwargs):
        already_tracing = tracemalloc.is_tracing()
        if not already_tracing:
            tracemalloc.start()
        try:
            return func(*args, **kwargs)
        finally:
            current, peak = tracemalloc.get_traced_memory()
            print(f"Current: {current / 1024:.1f} KB, Peak: {peak / 1024:.1f} KB")
            # Leave tracing on if somebody else turned it on.
            if not already_tracing:
                tracemalloc.stop()

    return wrapper
Async Profiling
import asyncio
import time
from contextlib import asynccontextmanager
@asynccontextmanager
async def async_profile(label: str):
    """Time an ``async with`` block and print the elapsed seconds.

    The timing line is printed even when the block raises; the exception
    then propagates unchanged.

    Args:
        label: Text shown in square brackets at the start of the timing line.
    """
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        print(f"[{label}] took {elapsed:.3f}s")
async def fetch_data():
    """Sleep for half a second inside an ``async_profile`` block, then return a sample payload."""
    payload = {"data": "result"}
    async with async_profile("fetch_data"):
        await asyncio.sleep(0.5)
    return payload
async def main():
    """Await ``fetch_data`` and print whatever it returns."""
    print(await fetch_data())
Flamegraph Generation
# Python flamegraphs with py-spy
pip install py-spy
# Sample a running process for 30 seconds and write an SVG flamegraph
py-spy record -o flamegraph.svg --pid 12345 --duration 30
# Launch a script under the profiler
py-spy record -o flamegraph.svg -- python myapp.py
# Write collapsed (folded) stacks for further analysis.
# py-spy calls this output format "raw"; there is no "collapsed" format.
py-spy record -o stacks.txt --pid 12345 --duration 30 --format raw
# Node.js flamegraphs
# Run the app with V8's sampling profiler; this writes isolate-*.log
node --prof app.js
# Human-readable text summary of the profile
node --prof-process isolate-*.log > processed.txt
npm install -g flamebearer
# flamebearer reads the preprocessed JSON form of the V8 log on stdin
# (not the text summary) and writes flamegraph.html
node --prof-process --preprocess -j isolate-*.log | flamebearer
Continuous Profiling
Modern observability platforms include continuous profiling—always-on profilers that sample production processes with minimal overhead.
| Tool | Language Support | Overhead | Storage | Integration |
|---|---|---|---|---|
| Pyroscope | Python, Go, Ruby, Rust, Java | < 5% | Local/S3/GCS | Grafana, Prometheus |
| Google Cloud Profiler | Python, Go, Java, Node.js | < 1% | GCP | GCP Console |
| Datadog Continuous Profiler | Python, Java, Go, Ruby | < 2% | Datadog | Datadog APM |
| Polar Signals | All (eBPF) | < 1% | Parquet/S3 | Prometheus |
Database Optimization
Query Optimization
class UserRepository:
    """Three ways to load users together with their posts.

    ``db`` is a database handle defined outside this snippet.
    NOTE(review): it is assumed to accept ``query(sql, *params)`` with ``?``
    positional placeholders and to return re-iterable rows that allow
    attribute assignment — confirm against the real client.
    """

    def get_users_with_posts(self):
        """Load posts with one extra query per user (anti-pattern, kept for contrast)."""
        # Bad: N+1 queries
        users = db.query("SELECT * FROM users")
        for user in users:
            user.posts = db.query(
                "SELECT * FROM posts WHERE user_id = ?", user.id
            )
        return users

    def get_users_with_posts_optimized(self):
        """Return joined user/post rows from a single query.

        Each row combines one user with one of its posts; users without posts
        appear once with NULL post columns because of the LEFT JOIN.
        """
        # Good: JOIN query
        users = db.query("""
            SELECT u.*, p.*
            FROM users u
            LEFT JOIN posts p ON u.id = p.user_id
        """)
        return users

    def get_users_with_posts_eager(self):
        """Load all users, then all of their posts in one batched query."""
        # Good: Batch loading
        users = db.query("SELECT * FROM users")
        if not users:
            # "IN ()" is not valid SQL, and there is nothing to load anyway.
            return users
        user_ids = [u.id for u in users]
        # One "?" per id; only placeholders are interpolated, never values.
        placeholders = ", ".join("?" for _ in user_ids)
        posts = db.query(
            f"SELECT * FROM posts WHERE user_id IN ({placeholders})",
            *user_ids,
        )
        # Group the batch by owner so each user gets its own list.
        posts_by_user = {}
        for post in posts:
            posts_by_user.setdefault(post.user_id, []).append(post)
        for user in users:
            user.posts = posts_by_user.get(user.id, [])
        return users
Indexing Strategies
-- Single column index
CREATE INDEX idx_users_email ON users(email);

-- Composite index (column order matters)
CREATE INDEX idx_orders_user_date ON orders(user_id, created_at DESC);

-- Partial index (only rows matching the WHERE clause are indexed)
CREATE INDEX idx_active_orders ON orders(user_id)
    WHERE status = 'active';

-- Covering index (INCLUDE stores extra columns in the index)
CREATE INDEX idx_product_lookup ON products(category, price, name)
    INCLUDE (stock_count, rating);

-- Concurrent index creation (non-blocking)
CREATE INDEX CONCURRENTLY idx_large_table ON large_table(column_name);

-- Verify index usage with EXPLAIN
EXPLAIN ANALYZE
SELECT * FROM users WHERE email = 'user@example.com';
Index Selection Guide
| Query Pattern | Index Type | Columns | Example |
|---|---|---|---|
| Exact lookup | B-tree single | WHERE email = ? | users(email) |
| Range query | B-tree sortable | WHERE price > ? AND price < ? | products(price) |
| Sorting | B-tree sorted | ORDER BY created_at DESC | orders(created_at) |
| Partial filter | B-tree partial | WHERE status = 'active' | orders(user_id) WHERE status = 'active' |
| Text search | GIN/trigram | WHERE name ILIKE '%term%' | users USING gin(name gin_trgm_ops) |
| JSON query | GIN JSON path | WHERE metadata @> '{"key":"val"}' | events USING gin(metadata jsonb_path_ops) |
| Geospatial | GiST/SP-GiST | WHERE ST_DWithin(loc, point, 100) | locations USING gist(coord) |
Connection Pooling
from sqlalchemy import create_engine
# Production connection pool configuration
engine = create_engine(
    "postgresql://user:pass@localhost/db",
    # Pool sizing
    max_overflow=10,
    pool_size=20,
    # Pool hygiene
    pool_use_lifo=True,  # LIFO reduces connection churn
    pool_recycle=3600,  # recycle connections after 1 hour
    pool_pre_ping=True,  # verify connections before use
    # Options handed to the database driver when it connects
    connect_args=dict(
        connect_timeout=5,  # fail fast on db issues
        keepalives=1,
        keepalives_idle=30,
        keepalives_interval=10,
        keepalives_count=5,
    ),
)
Read Replicas
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
class DatabaseRouter:
    """Route read/write queries to appropriate nodes.

    Writes always use the master engine. Reads rotate round-robin across the
    replica engines and fall back to the master when no replicas are
    configured.
    """

    def __init__(self, master_url: str, replica_urls: list[str]):
        """Create one engine for the master and one per replica URL."""
        self.master = create_engine(master_url)
        self.replicas = [create_engine(url) for url in replica_urls]
        self._current_replica = 0

    def get_writer(self) -> Session:
        """Return a new session bound to the master."""
        return Session(self.master)

    def get_reader(self) -> Session:
        """Return a new session bound to the next replica (round-robin)."""
        if not self.replicas:
            # No replicas configured: serve reads from the master instead of
            # failing with an index or modulo-by-zero error.
            return Session(self.master)
        # Re-apply the modulo on read so the index stays valid even if the
        # replica list was shortened after the last call.
        index = self._current_replica % len(self.replicas)
        self._current_replica = (index + 1) % len(self.replicas)
        return Session(self.replicas[index])
# Usage
router = DatabaseRouter(
    replica_urls=[
        "postgresql://user:pass@replica-1/db",
        "postgresql://user:pass@replica-2/db",
    ],
    master_url="postgresql://user:pass@master/db",
)

# Reads are served by a session from get_reader()
with router.get_reader() as session:
    users = session.query(User).all()

# Writes use a session from get_writer()
with router.get_writer() as session:
    session.add(new_user)
    session.commit()
Query Analysis
-- Find slow queries
SELECT
    queryid,
    calls,
    mean_exec_time,
    total_exec_time / 1000 AS total_seconds,
    -- rows and calls are integers: cast so the average is not truncated,
    -- and guard against entries with zero calls
    rows::numeric / NULLIF(calls, 0) AS avg_rows,
    -- Multiply by 100.0 before dividing to avoid integer division, and use
    -- NULLIF so statements that touched no shared blocks yield NULL instead
    -- of a division-by-zero error
    100.0 * shared_blks_hit
        / NULLIF(shared_blks_hit + shared_blks_read, 0) AS cache_hit_ratio
FROM pg_stat_statements
WHERE query NOT LIKE '%pg_stat%'
ORDER BY mean_exec_time DESC
LIMIT 20;
-- Identify missing indexes: tables with more than 1000 sequential scans,
-- most-scanned first
SELECT
    t.relname AS table_name,
    t.seq_scan,
    t.seq_tup_read,
    t.idx_scan,
    t.seq_tup_read / t.seq_scan AS avg_rows_per_seq_scan
FROM pg_stat_user_tables AS t
WHERE t.seq_scan > 1000
ORDER BY t.seq_scan DESC;
Caching Strategies
Multi-Level Caching
from functools import lru_cache
from typing import Callable, Optional, Any
import json
import hashlib
import time
import redis
class MultiLevelCache:
    """L1 (memory) + L2 (Redis) cache with TTL.

    L1 is a per-instance, size-bounded dict holding JSON payloads with an
    expiry time; L2 is Redis. Values must be JSON-serializable.
    """

    def __init__(self, redis_client: "redis.Redis", l1_size: int = 1024):
        """Store the Redis client and set up an empty L1 of at most ``l1_size`` entries."""
        self.redis = redis_client
        self.l1_size = l1_size
        self.l1_ttl = 60  # 1 minute in L1
        self.l2_ttl = 300  # 5 minutes in L2
        # cache_key -> (expiry on the time.monotonic() clock, JSON payload).
        # dicts keep insertion order, so the first key is the least recently
        # used one.
        self._l1: dict = {}

    def _l1_get(self, cache_key: str) -> Optional[str]:
        """Return the L1 payload for ``cache_key``, or None on miss/expiry."""
        entry = self._l1.pop(cache_key, None)
        if entry is None:
            return None
        expires_at, serialized = entry
        if expires_at <= time.monotonic():
            return None  # expired; pop() above already removed it
        # Re-insert at the end to mark the entry as most recently used.
        self._l1[cache_key] = entry
        return serialized

    def _l1_set(self, cache_key: str, serialized, ttl: int) -> None:
        """Store a payload in L1, evicting least recently used entries if full."""
        if self.l1_size <= 0:
            return
        self._l1.pop(cache_key, None)
        while len(self._l1) >= self.l1_size:
            del self._l1[next(iter(self._l1))]
        # Never keep an entry in L1 longer than the caller's requested TTL.
        expires_at = time.monotonic() + min(self.l1_ttl, ttl)
        self._l1[cache_key] = (expires_at, serialized)

    def get(self, key: str, fetch_fn: Callable[[], Any], ttl: int = 300) -> Any:
        """Get with L1 → L2 → origin fallback.

        Args:
            key: Logical cache key; it is hashed before use.
            fetch_fn: Zero-argument callable producing the value on a miss.
            ttl: Seconds the value is kept in Redis.

        Returns:
            The cached value decoded from JSON, or ``fetch_fn()``'s result on
            a miss in both levels.
        """
        cache_key = self._make_key(key)
        # L1 check (memory)
        l1_value = self._l1_get(cache_key)
        if l1_value is not None:
            return json.loads(l1_value)
        # L2 check (Redis)
        l2_value = self.redis.get(cache_key)
        if l2_value is not None:
            # Populate L1 so the next lookup skips the network round trip.
            self._l1_set(cache_key, l2_value, ttl)
            return json.loads(l2_value)
        # Miss — fetch from origin
        value = fetch_fn()
        serialized = json.dumps(value)
        self.redis.setex(cache_key, ttl, serialized)
        self._l1_set(cache_key, serialized, ttl)
        return value

    def invalidate(self, pattern: str):
        """Invalidate by pattern across both caches.

        ``pattern`` is matched against the stored Redis keys, which have the
        form ``cache:<md5 hex>``. All of L1 is cleared because it cannot be
        matched against the pattern cheaply.
        """
        # SCAN iterates incrementally; KEYS would block Redis while it walks
        # the whole keyspace.
        keys = list(self.redis.scan_iter(match=pattern))
        if keys:
            self.redis.delete(*keys)
        self._l1.clear()

    @staticmethod
    def _make_key(key: str) -> str:
        """Return the namespaced, fixed-length Redis key for ``key``."""
        # md5 is used only to normalize key length, not for security.
        return f"cache:{hashlib.md5(key.encode()).hexdigest()}"
CDN Caching Configuration
# CDN cache configuration (Vercel/Cloudflare)
cache_rules:
  static_assets:
    - pattern: "/static/*"
      # 1 year
      ttl: 31536000
      # Serve stale while fetching fresh
      stale_while_revalidate: 86400
  api_responses:
    - pattern: "/api/public/*"
      # 1 minute
      ttl: 60
      # Serve stale if origin fails
      stale_if_error: 3600
  html_pages:
    - pattern: "/*.html"
      # Never cache
      ttl: 0
      bypass: true
  images:
    - pattern: "/images/*"
      # 1 week
      ttl: 604800
      transform:
        resize: "fit"
        width: 1200
        quality: 80
Cache Invalidation Patterns
| Pattern | Strategy | Use Case | Complexity |
|---|---|---|---|
| TTL-based | Expire after fixed time | Stale data acceptable | Low |
| Write-through | Update cache on write | Strong consistency | Medium |
| Write-behind | Async cache update | High write throughput | High |
| Cache-aside | Application manages cache | General purpose | Medium |
| Read-through | Cache fetches from DB | Read-heavy workloads | Medium |
| Refresh-ahead | Pre-fetch before expiry | Predictable access patterns | High |
HTTP Caching Headers
# FastAPI response caching
from fastapi import FastAPI, Response
from fastapi.responses import JSONResponse
app = FastAPI()


@app.get("/api/products/{product_id}")
async def get_product(product_id: str):
    """Return a product as JSON with public caching headers.

    The response carries a 60-second max-age with stale-while-revalidate, a
    weak ETag built from the product's ``updated_at`` value, and
    ``Vary: Accept-Encoding``.
    """
    product = await fetch_product(product_id)
    cache_headers = {
        "Cache-Control": "public, max-age=60, stale-while-revalidate=300",
        "ETag": f"W/\"{product['updated_at']}\"",
        "Vary": "Accept-Encoding",
    }
    return JSONResponse(content=product, headers=cache_headers)
@app.get("/api/users/me")
async def get_current_user(response: Response):
    """Return the current user's profile with caching disabled."""
    # Private data — must not be stored by shared caches.
    no_cache_policy = "private, no-cache, no-store, must-revalidate"
    response.headers["Cache-Control"] = no_cache_policy
    return await get_user_profile()
Scaling Strategies
Horizontal Scaling
# Load balancer with health checks
class LoadBalancer:
    """Round-robin load balancer that skips unhealthy servers.

    Server objects must provide ``is_healthy()`` and a ``url`` attribute.
    """

    def __init__(self, servers: list):
        """Start with a private copy of ``servers``."""
        # Copy the input: add_server() appends in place while remove_server()
        # rebinds, so sharing the caller's list would mutate it on add but
        # silently detach from it on remove.
        self.servers = list(servers)
        self.current = 0

    def get_server(self):
        """Round-robin with health check.

        Returns:
            The next healthy server.

        Raises:
            RuntimeError: If no server reports healthy.
        """
        healthy = self.get_healthy_servers()
        if not healthy:
            raise RuntimeError("No healthy servers available")
        server = healthy[self.current % len(healthy)]
        self.current += 1
        return server

    def get_healthy_servers(self):
        """Return the servers whose ``is_healthy()`` check passes right now."""
        return [s for s in self.servers if s.is_healthy()]

    def add_server(self, server):
        """Add a server to the rotation."""
        self.servers.append(server)

    def remove_server(self, server_url: str):
        """Remove every server whose ``url`` equals ``server_url``."""
        self.servers = [s for s in self.servers if s.url != server_url]
Database Sharding
import hashlib
from typing import Any
class ShardedDatabase:
    """Hash-based database sharding.

    A key's shard is ``md5(key) % len(shards)``. This is plain modulo
    hashing, not a consistent-hash ring: changing the number of shards remaps
    most keys.
    """

    def __init__(self, shards: list[str]):
        """Store the shard URLs and start with no engines created."""
        self.shards = shards
        # shard URL -> engine, so each shard gets a single engine (and pool)
        # instead of a new one on every get_connection() call.
        self._engines: dict = {}

    def _get_shard(self, key: str) -> str:
        """Return the shard URL for ``key``.

        Raises:
            ValueError: If no shards are configured.
        """
        if not self.shards:
            raise ValueError("No shards configured")
        hash_val = int(hashlib.md5(key.encode()).hexdigest(), 16)
        return self.shards[hash_val % len(self.shards)]

    def get_connection(self, user_id: str):
        """Return the engine for the shard that owns ``user_id``."""
        shard_url = self._get_shard(user_id)
        engine = self._engines.get(shard_url)
        if engine is None:
            engine = self._engines[shard_url] = create_engine(shard_url)
        return engine

    def migrate_shard(self, from_shard: str, to_shard: str, keys: list[str]):
        """Migrate keys between shards (not implemented).

        Raises:
            NotImplementedError: For any non-empty ``keys``; reading from the
                old shard, writing to the new one and updating routing are
                still to be written.
        """
        if not keys:
            return
        raise NotImplementedError("Shard migration is not implemented")
Queue-Based Scaling
import asyncio
from typing import Callable, Any
from dataclasses import dataclass, field
from datetime import datetime
import redis.asyncio as aioredis
@dataclass
class Task:
    """A unit of background work stored in the queue."""

    id: str
    name: str
    payload: dict
    priority: int = 0
    # Naive UTC timestamp taken when the task object is created.
    created_at: datetime = field(default_factory=datetime.utcnow)
    # Number of times the task has been re-queued after a failure.
    retries: int = 0


class AsyncTaskQueue:
    """Redis-backed async task queue for background processing.

    Tasks are stored as JSON members of the ``task_queue`` sorted set, scored
    by priority.
    """

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        """Create the Redis client from ``redis_url``."""
        self.redis = aioredis.from_url(redis_url)

    async def enqueue(self, task: Task) -> str:
        """Add task to queue and return its id."""
        serialized = json.dumps({
            "id": task.id,
            "name": task.name,
            "payload": task.payload,
            "priority": task.priority,
            "created_at": task.created_at.isoformat(),
            # Persist the retry counter; without it every re-queued task
            # comes back with retries == 0 and the retry cap never triggers.
            "retries": task.retries,
        })
        await self.redis.zadd(
            "task_queue",
            {serialized: task.priority},
        )
        return task.id

    async def dequeue(self) -> Optional[Task]:
        """Pop and return the highest-priority task, or None if the queue is empty."""
        tasks = await self.redis.zpopmax("task_queue")
        if not tasks:
            return None
        data = json.loads(tasks[0][0])
        # Restore the timestamp to the datetime type declared on Task.
        if "created_at" in data:
            data["created_at"] = datetime.fromisoformat(data["created_at"])
        return Task(**data)

    async def process_loop(self, handler: Callable[[Task], Any], workers: int = 4):
        """Continuously process tasks with N workers.

        Each worker polls the queue, sleeping 0.1 s when it is empty. A task
        whose handler raises is re-queued up to three times and then dropped.
        """

        async def worker(worker_id: int):
            while True:
                task = await self.dequeue()
                if task is None:
                    await asyncio.sleep(0.1)
                    continue
                try:
                    await handler(task)
                except Exception as e:
                    print(f"[Worker {worker_id}] Error processing {task.id}: {e}")
                    if task.retries < 3:
                        task.retries += 1
                        await self.enqueue(task)

        await asyncio.gather(*[worker(i) for i in range(workers)])
Auto-Scaling Configuration
# Auto-scaling policy (Kubernetes HPA)
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: api-server
spec:
  # Workload whose replica count is adjusted
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: api-server
  # Replica bounds
  minReplicas: 3
  maxReplicas: 50
  metrics:
    # Average CPU utilization target (percent)
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    # Average memory utilization target (percent)
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
    # Per-pod average of the requests_per_second metric
    - type: Pods
      pods:
        metric:
          name: requests_per_second
        target:
          type: AverageValue
          averageValue: 1000
Frontend Optimization
Bundle Size Optimization
// Dynamic imports for code splitting
const Dashboard = React.lazy(() => import('./Dashboard'));
const Analytics = React.lazy(() => import('./Analytics'));
const Settings = React.lazy(() => import('./Settings'));
function App() {
return (
<Suspense fallback={<Loading />}>
<Routes>
<Route path="/dashboard" element={<Dashboard />} />
<Route path="/analytics" element={<Analytics />} />
<Route path="/settings" element={<Settings />} />
</Routes>
</Suspense>
);
}
// Tree-shakable imports — import only what you use.
// The three snippets below are alternatives: each declares `const result`,
// so only one of them can be used in a given module.
// ❌ Bad: imports entire library
import _ from 'lodash';
const result = _.chunk(array, 2);
// ✅ Good: imports only the function
import chunk from 'lodash/chunk';
const result = chunk(array, 2);
// ✅ Best: ES module tree-shakable
import { chunk } from 'es-toolkit';
const result = chunk(array, 2);
Image Optimization
<!-- Modern image formats with responsive sizes -->
<img
  alt="Description"
  src="photo.avif"
  srcset="photo-320.avif 320w, photo-768.avif 768w, photo-1200.avif 1200w"
  sizes="(max-width: 320px) 100vw, (max-width: 768px) 100vw, 1200px"
  width="1200"
  height="800"
  loading="lazy"
  decoding="async"
  fetchpriority="low"
/>
Critical CSS
<!-- Inline critical CSS, defer the rest -->
<head>
  <style>
    /* Critical CSS — above-the-fold styles */
    body { font-family: system-ui, sans-serif; margin: 0; }
    header { height: 60px; display: flex; align-items: center; }
    .hero { min-height: 400px; background: #f5f5f5; }
  </style>
  <!-- Loaded as a print stylesheet so it does not block rendering, then
       switched to all media once it has loaded. The handler clears itself
       so it cannot fire a second time. -->
  <link
    rel="stylesheet"
    href="/styles/full.css"
    media="print"
    onload="this.onload=null;this.media='all'"
  />
  <!-- Without JavaScript the onload switch never runs; load normally. -->
  <noscript>
    <link rel="stylesheet" href="/styles/full.css" />
  </noscript>
</head>
Performance Budgets
| Metric | Good | Needs Improvement | Poor |
|---|---|---|---|
| Largest Contentful Paint (LCP) | < 2.5s | 2.5s - 4.0s | > 4.0s |
| First Input Delay (FID) — deprecated; replaced by INP as a Core Web Vital in March 2024 | < 100ms | 100ms - 300ms | > 300ms |
| Cumulative Layout Shift (CLS) | < 0.1 | 0.1 - 0.25 | > 0.25 |
| Interaction to Next Paint (INP) | < 200ms | 200ms - 500ms | > 500ms |
| Time to First Byte (TTFB) | < 800ms | 800ms - 1800ms | > 1800ms |
| First Contentful Paint (FCP) | < 1.8s | 1.8s - 3.0s | > 3.0s |
Network and I/O Optimization
Connection Pooling and Reuse
import aiohttp
import asyncio
class HttpClientPool:
    """Reusable HTTP connection pool.

    The underlying aiohttp session is created on first use rather than in
    ``__init__``, so the pool can be constructed from synchronous code and
    the session is created inside the running event loop. Supports
    ``async with`` so the session is always closed.
    """

    def __init__(self, max_connections: int = 100):
        """Remember the connection limit; no network objects are created yet."""
        self.max_connections = max_connections
        self._session = None

    @property
    def session(self) -> "aiohttp.ClientSession":
        """The shared client session, created on first access."""
        if self._session is None:
            self._session = aiohttp.ClientSession(
                connector=aiohttp.TCPConnector(
                    limit=self.max_connections,
                    ttl_dns_cache=300,
                    keepalive_timeout=30,
                    force_close=False,
                ),
                timeout=aiohttp.ClientTimeout(total=30),
            )
        return self._session

    @session.setter
    def session(self, value) -> None:
        # Kept so code that assigns ``pool.session`` continues to work.
        self._session = value

    async def get(self, url: str) -> dict:
        """GET ``url`` and return the decoded JSON body."""
        async with self.session.get(url) as response:
            return await response.json()

    async def close(self):
        """Close the session if one was created; safe to call repeatedly."""
        if self._session is not None and not self._session.closed:
            await self._session.close()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.close()
Compression
# Nginx compression configuration
gzip on;
gzip_comp_level 6;
gzip_min_length 256;
# Send "Vary: Accept-Encoding" so shared caches keep compressed and
# uncompressed variants apart
gzip_vary on;
gzip_types
    text/plain
    text/css
    text/javascript
    application/json
    application/javascript
    application/xml
    image/svg+xml;

# Brotli (preferred over gzip when available)
brotli on;
brotli_comp_level 6;
# Same MIME types as gzip_types so both encoders cover the same responses
brotli_types
    text/plain
    text/css
    text/javascript
    application/json
    application/javascript
    application/xml
    image/svg+xml;
Conclusion
Performance optimization is iterative: profile to find bottlenecks, optimize the critical path, cache aggressively, and scale horizontally. Measure before and after every change. Focus on user-perceived performance, not just metrics.
Key principles to remember:
- Always measure before optimizing
- Profile in production-like environments
- Cache at every level (L1 → L2 → CDN)
- Scale horizontally before vertically
- Frontend performance matters as much as backend
- Set performance budgets and enforce them in CI
Resources
- Python Profiling Documentation — cProfile and pstats
- py-spy — Sampling profiler for Python
- Pyroscope — Continuous profiling platform
- Redis Documentation — Caching and data structures
- PostgreSQL Performance Tuning — Official guide
- Google Web Fundamentals - Performance — Web performance best practices
- Web Vitals — Core Web Vitals metrics
- High Performance MySQL — Database optimization book
- Nginx Caching Guide — HTTP caching configuration
Comments