Performance Optimization: Profiling, Caching, and Scaling Strategies

Published: March 19, 2026 Updated: May 24, 2026 Larry Qu 11 min read

Introduction

Performance optimization requires understanding bottlenecks, measuring impact, and applying targeted fixes. This guide covers profiling, caching, database optimization, and scaling strategies with practical code examples and current 2026 best practices.

The key principle: measure first, optimize second. Without data, assumptions about performance bottlenecks are often wrong. Every optimization should be preceded by a measurement and followed by a verification measurement.
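As a minimal sketch of the measure, optimize, verify loop, the snippet below times a baseline and a candidate with the standard-library `timeit` module and checks that both return the same answer. The helper `measure` and the two example functions are hypothetical names chosen for illustration; a real workload would replace them.

```python
import statistics
import timeit


def measure(func, *args, repeat: int = 3, number: int = 10) -> float:
    """Return the median per-call time in seconds over several repeats."""
    timings = timeit.repeat(lambda: func(*args), repeat=repeat, number=number)
    return statistics.median(timings) / number


def slow_contains(items: list[int], targets: list[int]) -> int:
    # Baseline: list membership scans the list for every lookup
    return sum(1 for t in targets if t in items)


def fast_contains(items: list[int], targets: list[int]) -> int:
    # Candidate: build a set once, then use hash lookups
    lookup = set(items)
    return sum(1 for t in targets if t in lookup)


items = list(range(500))
targets = list(range(0, 1000, 2))

# Measure first, then verify the optimized version gives the same result
before = measure(slow_contains, items, targets)
after = measure(fast_contains, items, targets)
same_result = slow_contains(items, targets) == fast_contains(items, targets)

print(f"before: {before * 1e6:.1f} µs  after: {after * 1e6:.1f} µs  same result: {same_result}")
```

Using the median of several repeats reduces the influence of one-off noise such as a garbage collection pause.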

Profiling

Python Profiling

import cProfile
import pstats
import time
from functools import wraps
from typing import Callable

def profile(func: Callable) -> Callable:
    """Profile function execution with cProfile."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        profiler = cProfile.Profile()
        result = profiler.runcall(func, *args, **kwargs)

        stats = pstats.Stats(profiler)
        stats.sort_stats("cumulative")
        stats.print_stats(20)

        return result
    return wrapper

# Line-by-line profiling
from line_profiler import LineProfiler

def profile_lines(func: Callable) -> Callable:
    """Profile individual lines with line_profiler."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        lp = LineProfiler()
        lp_wrapper = lp(func)
        result = lp_wrapper(*args, **kwargs)
        lp.print_stats()
        return result
    return wrapper

# Memory profiling
import tracemalloc

def profile_memory(func: Callable) -> Callable:
    """Profile memory usage."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        tracemalloc.start()
        try:
            return func(*args, **kwargs)
        finally:
            # Report and stop tracing even if func raises
            current, peak = tracemalloc.get_traced_memory()
            print(f"Current: {current / 1024:.1f} KB, Peak: {peak / 1024:.1f} KB")
            tracemalloc.stop()
    return wrapper

Async Profiling

import asyncio
import time
from contextlib import asynccontextmanager


@asynccontextmanager
async def async_profile(label: str):
    """Profile async function execution time."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Report elapsed time even if the wrapped block raises
        elapsed = time.perf_counter() - start
        print(f"[{label}] took {elapsed:.3f}s")


async def fetch_data():
    async with async_profile("fetch_data"):
        await asyncio.sleep(0.5)
        return {"data": "result"}


async def main():
    result = await fetch_data()
    print(result)


if __name__ == "__main__":
    asyncio.run(main())

Flamegraph Generation

# Python flamegraphs with py-spy
pip install py-spy

# Sample running process
py-spy record -o flamegraph.svg --pid 12345 --duration 30

# Profile a script
py-spy record -o flamegraph.svg -- python myapp.py

# Generate collapsed stack for further analysis
py-spy record -o stacks.txt --pid 12345 --duration 30 --format collapsed

# Node.js flamegraphs
npm install -g flamebearer
node --prof app.js
# flamebearer reads the preprocessed JSON profile from stdin
node --prof-process --preprocess -j isolate*.log | flamebearer

Continuous Profiling

Modern observability platforms include continuous profiling: always-on profilers that sample production processes with minimal overhead. The overhead figures below are approximate and depend on sampling rate and workload.

| Tool | Language Support | Overhead | Storage | Integration |
|---|---|---|---|---|
| Pyroscope | Python, Go, Ruby, Rust, Java | < 5% | Local/S3/GCS | Grafana, Prometheus |
| Google Cloud Profiler | Python, Go, Java, Node.js | < 1% | GCP | GCP Console |
| Datadog Continuous Profiler | Python, Java, Go, Ruby | < 2% | Datadog | Datadog APM |
| Polar Signals | All (eBPF) | < 1% | Parquet/S3 | Prometheus |

Database Optimization

Query Optimization

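from collections import defaultdict


class UserRepository:
    # `db` is assumed to be a query helper that returns row objects

    def get_users_with_posts(self):
        # Bad: N+1 queries (one for users, then one more per user)
        users = db.query("SELECT * FROM users")
        for user in users:
            user.posts = db.query(
                "SELECT * FROM posts WHERE user_id = ?", user.id
            )
        return users

    def get_users_with_posts_optimized(self):
        # Good: single JOIN query (returns one row per user/post pair)
        rows = db.query("""
            SELECT u.*, p.*
            FROM users u
            LEFT JOIN posts p ON u.id = p.user_id
        """)
        return rows

    def get_users_with_posts_eager(self):
        # Good: batch loading (two queries total)
        users = db.query("SELECT * FROM users")
        user_ids = [u.id for u in users]
        # How a list binds to IN depends on the database driver
        posts = db.query(
            "SELECT * FROM posts WHERE user_id IN ?", user_ids
        )
        # Group posts by owner, then attach them to each user
        posts_by_user = defaultdict(list)
        for post in posts:
            posts_by_user[post.user_id].append(post)
        for user in users:
            user.posts = posts_by_user[user.id]
        return users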

Indexing Strategies

-- Single column index
CREATE INDEX idx_users_email ON users(email);

-- Composite index (column order matters)
CREATE INDEX idx_orders_user_date ON orders(user_id, created_at DESC);

-- Partial index (for specific queries only)
CREATE INDEX idx_active_orders ON orders(user_id)
WHERE status = 'active';

-- Covering index (includes all needed columns)
CREATE INDEX idx_product_lookup ON products(category, price, name)
INCLUDE (stock_count, rating);

-- Concurrent index creation (non-blocking)
CREATE INDEX CONCURRENTLY idx_large_table ON large_table(column_name);

-- Verify index usage with EXPLAIN
EXPLAIN ANALYZE
SELECT * FROM users WHERE email = 'user@example.com';

Index Selection Guide

| Query Pattern | Index Type | Example Query | Example Index |
|---|---|---|---|
| Exact lookup | B-tree (single column) | WHERE email = ? | users(email) |
| Range query | B-tree (sortable column) | WHERE price > ? AND price < ? | products(price) |
| Sorting | B-tree (sorted) | ORDER BY created_at DESC | orders(created_at) |
| Partial filter | B-tree (partial) | WHERE status = 'active' | orders(user_id) WHERE status = 'active' |
| Text search | GIN (trigram) | WHERE name ILIKE '%term%' | users USING gin(name gin_trgm_ops) |
| JSON query | GIN (JSON path) | WHERE metadata @> '{"key":"val"}' | events USING gin(metadata jsonb_path_ops) |
| Geospatial | GiST/SP-GiST | WHERE ST_DWithin(loc, point, 100) | locations USING gist(coord) |

Connection Pooling

from sqlalchemy import create_engine

# Production connection pool configuration
engine = create_engine(
    "postgresql://user:pass@localhost/db",
    pool_size=20,
    max_overflow=10,
    pool_pre_ping=True,        # Verify connections before use
    pool_recycle=3600,         # Recycle connections after 1 hour
    pool_use_lifo=True,        # LIFO reduces connection churn
    connect_args={
        "connect_timeout": 5,  # Fail fast on db issues
        "keepalives": 1,
        "keepalives_idle": 30,
        "keepalives_interval": 10,
        "keepalives_count": 5,
    },
)

Read Replicas

from sqlalchemy import create_engine
from sqlalchemy.orm import Session

class DatabaseRouter:
    """Route read/write queries to appropriate nodes."""

    def __init__(self, master_url: str, replica_urls: list[str]):
        self.master = create_engine(master_url)
        self.replicas = [create_engine(url) for url in replica_urls]
        self._current_replica = 0

    def get_writer(self) -> Session:
        return Session(self.master)

    def get_reader(self) -> Session:
        """Round-robin across replicas."""
        replica = self.replicas[self._current_replica]
        self._current_replica = (self._current_replica + 1) % len(self.replicas)
        return Session(replica)


# Usage
router = DatabaseRouter(
    master_url="postgresql://user:pass@master/db",
    replica_urls=[
        "postgresql://user:pass@replica-1/db",
        "postgresql://user:pass@replica-2/db",
    ],
)

# Read queries go to replicas
with router.get_reader() as session:
    users = session.query(User).all()

# Write queries go to master
with router.get_writer() as session:
    session.add(new_user)
    session.commit()

Query Analysis

-- Find slow queries (requires the pg_stat_statements extension)
SELECT
  queryid,
  calls,
  mean_exec_time,
  total_exec_time / 1000 AS total_seconds,
  rows::numeric / calls AS avg_rows,
  -- 100.0 forces non-integer division; NULLIF avoids division by zero
  100.0 * shared_blks_hit
    / NULLIF(shared_blks_hit + shared_blks_read, 0) AS cache_hit_ratio
FROM pg_stat_statements
WHERE query NOT LIKE '%pg_stat%'
ORDER BY mean_exec_time DESC
LIMIT 20;

-- Identify missing indexes
SELECT
  relname AS table_name,
  seq_scan,
  seq_tup_read,
  idx_scan,
  seq_tup_read / seq_scan AS avg_rows_per_seq_scan
FROM pg_stat_user_tables
WHERE seq_scan > 1000
ORDER BY seq_scan DESC;

Caching Strategies

Multi-Level Caching

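import json
import time
from collections import OrderedDict
from typing import Any, Callable, Optional

import redis


class MultiLevelCache:
    """L1 (memory) + L2 (Redis) cache with TTL."""

    def __init__(self, redis_client: redis.Redis, l1_size: int = 1024):
        self.redis = redis_client
        self.l1_size = l1_size
        self.l1_ttl = 60      # 1 minute in L1
        self.l2_ttl = 300     # 5 minutes in L2 (default)
        # cache_key -> (expires_at, serialized value), oldest entry first
        self._l1: OrderedDict = OrderedDict()

    def _l1_get(self, cache_key: str) -> Optional[str]:
        """L1 lookup (process-local, fast). Returns None on miss or expiry."""
        entry = self._l1.get(cache_key)
        if entry is None:
            return None
        expires_at, serialized = entry
        if expires_at < time.monotonic():
            del self._l1[cache_key]
            return None
        self._l1.move_to_end(cache_key)  # Mark as recently used
        return serialized

    def _l1_set(self, cache_key: str, serialized: str) -> None:
        """Store in L1 and evict least recently used entries over the limit."""
        self._l1[cache_key] = (time.monotonic() + self.l1_ttl, serialized)
        self._l1.move_to_end(cache_key)
        while len(self._l1) > self.l1_size:
            self._l1.popitem(last=False)

    def get(self, key: str, fetch_fn: Callable[[], Any], ttl: Optional[int] = None) -> Any:
        """Get with L1 → L2 → origin fallback."""
        cache_key = self._make_key(key)

        # L1 check (memory)
        l1_value = self._l1_get(cache_key)
        if l1_value is not None:
            return json.loads(l1_value)

        # L2 check (Redis)
        l2_value = self.redis.get(cache_key)
        if l2_value is not None:
            self._l1_set(cache_key, l2_value)  # Populate L1
            return json.loads(l2_value)

        # Miss: fetch from origin
        value = fetch_fn()
        serialized = json.dumps(value)

        # Populate L2, then L1
        self.redis.setex(cache_key, ttl or self.l2_ttl, serialized)
        self._l1_set(cache_key, serialized)

        return value

    def invalidate(self, pattern: str):
        """Invalidate by glob pattern across both caches."""
        # SCAN iterates incrementally; KEYS would block Redis on large keyspaces
        keys = list(self.redis.scan_iter(match=self._make_key(pattern)))
        if keys:
            self.redis.delete(*keys)
        self._l1.clear()

    @staticmethod
    def _make_key(key: str) -> str:
        # Keys stay readable (not hashed) so glob patterns can match them
        return f"cache:{key}"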

CDN Caching Configuration

# Illustrative CDN cache rules (generic schema, not literal Vercel or Cloudflare syntax)
cache_rules:
  static_assets:
    - pattern: "/static/*"
      ttl: 31536000          # 1 year
      stale_while_revalidate: 86400  # Serve stale while fetching fresh

  api_responses:
    - pattern: "/api/public/*"
      ttl: 60                 # 1 minute
      stale_if_error: 3600    # Serve stale if origin fails

  html_pages:
    - pattern: "/*.html"
      ttl: 0                  # Never cache
      bypass: true

  images:
    - pattern: "/images/*"
      ttl: 604800             # 1 week
      transform:
        resize: "fit"
        width: 1200
        quality: 80

Cache Invalidation Patterns

| Pattern | Strategy | Use Case | Complexity |
|---|---|---|---|
| TTL-based | Expire after fixed time | Stale data acceptable | Low |
| Write-through | Update cache on write | Strong consistency | Medium |
| Write-behind | Async cache update | High write throughput | High |
| Cache-aside | Application manages cache | General purpose | Medium |
| Read-through | Cache fetches from DB | Read-heavy workloads | Medium |
| Refresh-ahead | Pre-fetch before expiry | Predictable access patterns | High |
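To make two of these patterns concrete, here is a minimal sketch contrasting cache-aside with write-through. Plain dictionaries stand in for the database and the cache, and the class names and the `store_reads` counter are hypothetical, added only to show when the backing store is hit.

```python
from typing import Any, Optional


class CacheAside:
    """The application checks the cache, loads on a miss, and invalidates on write."""

    def __init__(self, store: dict):
        self.store = store          # Stand-in for the database
        self.cache: dict = {}       # Stand-in for Redis or memcached
        self.store_reads = 0        # Counts trips to the backing store

    def read(self, key: str) -> Optional[Any]:
        if key in self.cache:
            return self.cache[key]
        self.store_reads += 1
        value = self.store.get(key)
        if value is not None:
            self.cache[key] = value
        return value

    def write(self, key: str, value: Any) -> None:
        self.store[key] = value
        self.cache.pop(key, None)   # Invalidate; the next read repopulates


class WriteThrough(CacheAside):
    """Writes update the store and the cache together, so reads stay warm."""

    def write(self, key: str, value: Any) -> None:
        self.store[key] = value
        self.cache[key] = value


aside = CacheAside({"a": 1})
aside.read("a")             # Miss: loads from the store
aside.read("a")             # Hit: served from the cache
aside.write("a", 2)         # Invalidates the cached entry
latest = aside.read("a")    # Miss again: reloads the new value

through = WriteThrough({})
through.write("k", "v")
warm = through.read("k")    # Hit: no store read needed
```

The trade-off shown here is that cache-aside pays one extra store read after each write, while write-through pays for a cache update on every write whether or not the key is read again.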

HTTP Caching Headers

# FastAPI response caching
from fastapi import FastAPI, Response
from fastapi.responses import JSONResponse

app = FastAPI()


@app.get("/api/products/{product_id}")
async def get_product(product_id: str):
    product = await fetch_product(product_id)

    return JSONResponse(
        content=product,
        headers={
            "Cache-Control": "public, max-age=60, stale-while-revalidate=300",
            "ETag": f"W/\"{product['updated_at']}\"",
            "Vary": "Accept-Encoding",
        },
    )


@app.get("/api/users/me")
async def get_current_user(response: Response):
    # Private data — never cache in shared caches
    response.headers["Cache-Control"] = "private, no-cache, no-store, must-revalidate"
    return await get_user_profile()
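An `ETag` only saves bandwidth if the server answers a matching `If-None-Match` request header with `304 Not Modified`. The sketch below shows that comparison as a framework-independent function. The helper names are hypothetical, and splitting the header on commas is a simplification that assumes tag values contain no commas.

```python
from typing import Optional


def make_weak_etag(version: str) -> str:
    """Build a weak ETag from a version string such as an updated_at timestamp."""
    return f'W/"{version}"'


def _opaque(tag: str) -> str:
    # Weak comparison ignores the W/ prefix and compares the quoted value only
    tag = tag.strip()
    return tag[2:] if tag.startswith("W/") else tag


def conditional_status(current_etag: str, if_none_match: Optional[str]) -> int:
    """Return 304 when the client's cached copy is still current, else 200."""
    if not if_none_match:
        return 200
    if if_none_match.strip() == "*":
        return 304
    candidates = [_opaque(t) for t in if_none_match.split(",")]
    return 304 if _opaque(current_etag) in candidates else 200


etag = make_weak_etag("2026-05-24T10:00:00Z")
print(conditional_status(etag, None))        # First request, no cached copy
print(conditional_status(etag, etag))        # Revalidation with the current tag
print(conditional_status(etag, 'W/"old"'))   # Revalidation with a stale tag
```

In a handler, a 304 result would be returned with an empty body and the same `ETag` and `Cache-Control` headers as the full response.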

Scaling Strategies

Horizontal Scaling

# Load balancer with health checks
class LoadBalancer:
    def __init__(self, servers: list):
        self.servers = servers
        self.current = 0

    def get_server(self):
        """Round-robin with health check."""
        healthy = self.get_healthy_servers()
        if not healthy:
            raise RuntimeError("No healthy servers available")

        server = healthy[self.current % len(healthy)]
        self.current += 1
        return server

    def get_healthy_servers(self):
        return [s for s in self.servers if s.is_healthy()]

    def add_server(self, server):
        self.servers.append(server)

    def remove_server(self, server_url: str):
        self.servers = [s for s in self.servers if s.url != server_url]

Database Sharding

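import hashlib

from sqlalchemy import create_engine
from sqlalchemy.engine import Engine


class ShardedDatabase:
    """Hash-based database sharding (modulo over the shard list)."""

    def __init__(self, shards: list[str]):
        self.shards = shards
        # One engine (and connection pool) per shard, created once and reused
        self._engines: dict[str, Engine] = {}

    def _get_shard(self, key: str) -> str:
        """Determine shard by hashing the key modulo the shard count.

        Note: adding or removing a shard remaps most keys.
        """
        hash_val = int(hashlib.md5(key.encode()).hexdigest(), 16)
        return self.shards[hash_val % len(self.shards)]

    def get_connection(self, user_id: str) -> Engine:
        """Get the database engine for a user's shard."""
        shard_url = self._get_shard(user_id)
        if shard_url not in self._engines:
            self._engines[shard_url] = create_engine(shard_url)
        return self._engines[shard_url]

    def migrate_shard(self, from_shard: str, to_shard: str, keys: list[str]):
        """Migrate keys between shards (outline only)."""
        # For each key: read from from_shard, write to to_shard,
        # update the routing table, then delete the old copy.
        raise NotImplementedError("migration steps depend on the schema")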
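The `_get_shard` method above picks a shard with a modulo over the shard count, so changing the number of shards remaps most keys. A consistent hash ring limits the movement to roughly the new shard's share of the keyspace. The following is a minimal sketch of that technique, not a drop-in replacement: the `HashRing` class, the shard names, and the choice of 100 virtual nodes per shard are all illustrative assumptions.

```python
import bisect
import hashlib


class HashRing:
    """Consistent hash ring with virtual nodes."""

    def __init__(self, nodes: list[str], vnodes: int = 100):
        self.vnodes = vnodes
        self._ring: list[tuple[int, str]] = []  # Sorted (hash, node) points
        for node in nodes:
            self.add_node(node)

    @staticmethod
    def _hash(value: str) -> int:
        return int(hashlib.md5(value.encode()).hexdigest(), 16)

    def add_node(self, node: str) -> None:
        # Each node owns several points on the ring to even out the load
        for i in range(self.vnodes):
            bisect.insort(self._ring, (self._hash(f"{node}#{i}"), node))

    def get_node(self, key: str) -> str:
        if not self._ring:
            raise RuntimeError("ring is empty")
        # First point clockwise from the key's hash, wrapping at the end
        idx = bisect.bisect_left(self._ring, (self._hash(key), ""))
        return self._ring[idx % len(self._ring)][1]


ring = HashRing(["shard-a", "shard-b", "shard-c"])
keys = [f"user-{i}" for i in range(1000)]
before = {k: ring.get_node(k) for k in keys}

ring.add_node("shard-d")
moved = [k for k in keys if ring.get_node(k) != before[k]]
print(f"{len(moved)} of {len(keys)} keys moved after adding a shard")
```

Every key that moves lands on the new shard; keys never shuffle between the existing shards, which is what keeps a migration bounded.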

Queue-Based Scaling

import asyncio
import json
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Callable, Optional

import redis.asyncio as aioredis


@dataclass
class Task:
    id: str
    name: str
    payload: dict
    priority: int = 0
    # Timezone-aware timestamp (datetime.utcnow is deprecated)
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    retries: int = 0


class AsyncTaskQueue:
    """Redis-backed async task queue for background processing."""

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = aioredis.from_url(redis_url)

    async def enqueue(self, task: Task) -> str:
        """Add task to queue."""
        serialized = json.dumps({
            "id": task.id,
            "name": task.name,
            "payload": task.payload,
            "priority": task.priority,
            "created_at": task.created_at.isoformat(),
            "retries": task.retries,  # Persist so the retry limit survives re-enqueue
        })

        await self.redis.zadd(
            "task_queue",
            {serialized: task.priority},
        )
        return task.id

    async def dequeue(self) -> Optional[Task]:
        """Get highest-priority task."""
        tasks = await self.redis.zpopmax("task_queue")
        if not tasks:
            return None

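        member, _score = tasks[0]
        data = json.loads(member)
        # Restore the datetime that enqueue() serialized as an ISO string
        data["created_at"] = datetime.fromisoformat(data["created_at"])
        return Task(**data)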

    async def process_loop(self, handler: Callable[[Task], Any], workers: int = 4):
        """Continuously process tasks with N workers."""

        async def worker(worker_id: int):
            while True:
                task = await self.dequeue()
                if task is None:
                    await asyncio.sleep(0.1)
                    continue

                try:
                    await handler(task)
                except Exception as e:
                    print(f"[Worker {worker_id}] Error processing {task.id}: {e}")
                    if task.retries < 3:
                        task.retries += 1
                        await self.enqueue(task)

        await asyncio.gather(*[worker(i) for i in range(workers)])

Auto-Scaling Configuration

# Auto-scaling policy (Kubernetes HPA)
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: api-server
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: api-server
  minReplicas: 3
  maxReplicas: 50
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
    - type: Pods
      pods:
        metric:
          name: requests_per_second
        target:
          type: AverageValue
          averageValue: 1000
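The HorizontalPodAutoscaler computes a proposed replica count for each metric as ceil(currentReplicas × currentValue / targetValue), takes the largest proposal, and clamps it to the configured bounds. The sketch below reproduces that arithmetic for the policy above; the function names and the sample metric readings are hypothetical, and the 10% tolerance mirrors the controller's default.

```python
import math


def desired_replicas(current_replicas: int, current_value: float,
                     target_value: float, tolerance: float = 0.1) -> int:
    """Proposed replica count for a single metric."""
    ratio = current_value / target_value
    if abs(ratio - 1.0) <= tolerance:
        return current_replicas  # Close enough to target: no change
    return math.ceil(current_replicas * ratio)


def hpa_decision(current_replicas: int, metrics: dict[str, tuple[float, float]],
                 min_replicas: int = 3, max_replicas: int = 50) -> int:
    """Take the largest per-metric proposal, clamped to the min/max bounds."""
    proposals = [
        desired_replicas(current_replicas, current, target)
        for current, target in metrics.values()
    ]
    return max(min_replicas, min(max_replicas, max(proposals)))


# Hypothetical readings: (current value, target value) per metric
readings = {
    "cpu": (90, 70),                     # 90% utilization against a 70% target
    "memory": (60, 80),                  # 60% utilization against an 80% target
    "requests_per_second": (1500, 1000),
}
decision = hpa_decision(current_replicas=10, metrics=readings)
print(decision)
```

With these readings the request rate drives the decision, because it yields the largest proposal of the three metrics.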

Frontend Optimization

Bundle Size Optimization

import React, { Suspense } from 'react';
import { Routes, Route } from 'react-router-dom';

// Dynamic imports for code splitting
const Dashboard = React.lazy(() => import('./Dashboard'));
const Analytics = React.lazy(() => import('./Analytics'));
const Settings = React.lazy(() => import('./Settings'));

function App() {
  return (
    <Suspense fallback={<Loading />}>
      <Routes>
        <Route path="/dashboard" element={<Dashboard />} />
        <Route path="/analytics" element={<Analytics />} />
        <Route path="/settings" element={<Settings />} />
      </Routes>
    </Suspense>
  );
}

// Tree-shakable imports: import only what you use (three alternatives)
// ❌ Bad: imports entire library
import _ from 'lodash';
const result = _.chunk(array, 2);

// ✅ Good: imports only the function
import chunk from 'lodash/chunk';
const result = chunk(array, 2);

// ✅ Best: ES module tree-shakable
import { chunk } from 'es-toolkit';
const result = chunk(array, 2);

Image Optimization

<!-- Modern image formats with responsive sizes -->
<img
  src="photo.avif"
  srcset="
    photo-320.avif 320w,
    photo-768.avif 768w,
    photo-1200.avif 1200w
  "
  sizes="
    (max-width: 320px) 100vw,
    (max-width: 768px) 100vw,
    1200px
  "
  loading="lazy"
  decoding="async"
  fetchpriority="low"
  alt="Description"
  width="1200"
  height="800"
/>

Critical CSS

<!-- Inline critical CSS, defer the rest -->
<head>
  <style>
    /* Critical CSS — above-the-fold styles */
    body { font-family: system-ui, sans-serif; margin: 0; }
    header { height: 60px; display: flex; align-items: center; }
    .hero { min-height: 400px; background: #f5f5f5; }
  </style>
  <link
    rel="stylesheet"
    href="/styles/full.css"
    media="print"
    onload="this.media='all'"
  />
</head>

Performance Budgets

| Metric | Good | Needs Improvement | Poor |
|---|---|---|---|
| Largest Contentful Paint (LCP) | < 2.5s | 2.5s - 4.0s | > 4.0s |
| First Input Delay (FID, replaced by INP as a Core Web Vital in March 2024) | < 100ms | 100ms - 300ms | > 300ms |
| Cumulative Layout Shift (CLS) | < 0.1 | 0.1 - 0.25 | > 0.25 |
| Interaction to Next Paint (INP) | < 200ms | 200ms - 500ms | > 500ms |
| Time to First Byte (TTFB) | < 800ms | 800ms - 1800ms | > 1800ms |
| First Contentful Paint (FCP) | < 1.8s | 1.8s - 3.0s | > 3.0s |
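One way to enforce these budgets is a small check that compares measured values against the "Good" thresholds and fails the build on any violation. The sketch below assumes the measurements have already been collected into a dictionary (for example, parsed from a lab test report); the metric keys and sample numbers are hypothetical.

```python
# "Good" thresholds from the table above (milliseconds, except unitless CLS)
BUDGETS = {
    "lcp_ms": 2500,
    "inp_ms": 200,
    "cls": 0.1,
    "ttfb_ms": 800,
    "fcp_ms": 1800,
}


def check_budgets(measured: dict[str, float],
                  budgets: dict[str, float] = BUDGETS) -> list[str]:
    """Return one message per budget that is exceeded or has no measurement."""
    violations = []
    for metric, limit in budgets.items():
        value = measured.get(metric)
        if value is None:
            violations.append(f"{metric}: no measurement")
        elif value > limit:
            violations.append(f"{metric}: {value} exceeds budget {limit}")
    return violations


# Hypothetical measurements from a single test run
sample = {"lcp_ms": 2900, "inp_ms": 150, "cls": 0.05, "ttfb_ms": 640, "fcp_ms": 1700}
violations = check_budgets(sample)
for message in violations:
    print(message)
```

In a CI job, the script would exit with a non-zero status when `violations` is non-empty so that the pipeline fails.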

Network and I/O Optimization

Connection Pooling and Reuse

import aiohttp
import asyncio


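class HttpClientPool:
    """Reusable HTTP connection pool."""

    def __init__(self, max_connections: int = 100):
        self.max_connections = max_connections
        self._session: aiohttp.ClientSession | None = None

    def _get_session(self) -> aiohttp.ClientSession:
        # Create the session lazily, inside a coroutine, so it binds to the
        # running event loop instead of whichever loop existed at import time
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession(
                connector=aiohttp.TCPConnector(
                    limit=self.max_connections,
                    ttl_dns_cache=300,
                    keepalive_timeout=30,
                    force_close=False,
                ),
                timeout=aiohttp.ClientTimeout(total=30),
            )
        return self._session

    async def get(self, url: str) -> dict:
        async with self._get_session().get(url) as response:
            response.raise_for_status()
            return await response.json()

    async def close(self):
        if self._session is not None:
            await self._session.close()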

Compression

# Nginx compression configuration
gzip on;
gzip_comp_level 6;
gzip_min_length 256;
gzip_types
  text/plain
  text/css
  text/javascript
  application/json
  application/javascript
  application/xml
  image/svg+xml;

# Brotli (preferred over gzip when available; requires the ngx_brotli module)
brotli on;
brotli_comp_level 6;
brotli_types
  text/plain
  text/css
  text/javascript
  application/json
  application/javascript;
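Compression level trades CPU time for smaller responses, which is why a mid-range level such as 6 is a common default. The sketch below measures that trade-off on a synthetic JSON payload using the standard-library `gzip` module. The payload and the helper name `compare_levels` are illustrative, and real numbers depend on the content being served.

```python
import gzip
import json
import time

# Synthetic, repetitive JSON payload similar in shape to an API response
payload = json.dumps(
    [{"id": i, "name": f"product-{i}", "in_stock": i % 3 == 0} for i in range(5000)]
).encode()


def compare_levels(data: bytes, levels=(1, 6, 9)) -> dict[int, tuple[int, float]]:
    """Return {level: (compressed size in bytes, seconds taken)}."""
    results = {}
    for level in levels:
        start = time.perf_counter()
        compressed = gzip.compress(data, compresslevel=level)
        elapsed = time.perf_counter() - start
        if gzip.decompress(compressed) != data:
            raise ValueError("round trip failed")
        results[level] = (len(compressed), elapsed)
    return results


results = compare_levels(payload)
print(f"original: {len(payload)} bytes")
for level, (size, elapsed) in results.items():
    print(f"level {level}: {size} bytes in {elapsed * 1000:.2f} ms")
```

Running the same comparison on representative production payloads is a reasonable way to pick `gzip_comp_level` rather than relying on a default.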

Conclusion

Performance optimization is iterative: profile to find bottlenecks, optimize the critical path, cache aggressively, and scale horizontally. Measure before and after every change. Focus on user-perceived performance, not just metrics.

Key principles to remember:

  • Always measure before optimizing
  • Profile in production-like environments
  • Cache at every level (L1 → L2 → CDN)
  • Scale horizontally before vertically
  • Frontend performance matters as much as backend
  • Set performance budgets and enforce them in CI
