Introduction
Scaling a SaaS from 100 to 1 million users requires deliberate architectural decisions at each stage. What works at 100 users fails at 10,000. This guide covers the journey.
Key observations:
- Most scaling pain arrives in predictable stages, roughly at each 10x jump in users
- Deliberate architecture dramatically reduces the cost of each jump
- The database is usually the first serious scaling bottleneck
- Well-placed caching can absorb the large majority of read load
What Is Scalability and Why It Matters
Scalability is the ability of a system to handle increased load without sacrificing performance or reliability. In SaaS, scalability means your application can grow with your business, from a handful of users to millions, without requiring a complete rewrite.
Vertical vs Horizontal Scaling
Vertical Scaling (Scaling Up)
- Add more resources to existing servers (CPU, RAM, storage)
- Simpler to implement
- Limited by hardware constraints
- Single point of failure
Horizontal Scaling (Scaling Out)
- Add more servers to distribute load
- More complex but unlimited potential
- Better fault tolerance
- Requires load balancing and state management
The Scalability Matrix
Scalability Challenges

Stage      Users      Requests/sec   Database size
1 (100)    100        10             1 GB
2 (1K)     1,000      100            10 GB
3 (10K)    10,000     1,000          100 GB
4 (100K)   100,000    10,000         1 TB
5 (1M+)    1M+        100K+          10 TB+

Each stage requires different architectural approaches.
Common Scaling Anti-Patterns
1. Premature Optimization
- ❌ Over-engineering for future scale
- ✅ Optimize when you hit bottlenecks
- ✅ Start simple and evolve
2. Single Point of Failure
- ❌ Single database, single server
- ✅ Redundancy at every layer
- ✅ Multi-AZ and multi-region deployments
3. Stateful Applications
- ❌ Storing session data in memory
- ✅ Use external session stores (Redis)
- ✅ Make applications stateless
4. No Monitoring
- ❌ Flying blind without metrics
- ✅ Instrument everything
- ✅ Set up alerts for anomalies
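To make anti-pattern 3 concrete, here is a minimal sketch of an external session store. The key prefix and TTL are assumptions; any client exposing `get`/`setex`/`delete` (a real `redis.Redis`, or a fake in tests) can be injected:

```python
import json
import uuid


class SessionStore:
    """Keep session state out of app memory so any instance can serve any user."""

    def __init__(self, client, prefix="session", ttl=1800):
        self.client = client  # e.g. redis.from_url(...); anything with get/setex/delete
        self.prefix = prefix
        self.ttl = ttl        # idle timeout in seconds

    def create(self, data):
        """Create a session and return its ID (the only thing the client keeps)."""
        session_id = uuid.uuid4().hex
        self.client.setex(f"{self.prefix}:{session_id}", self.ttl, json.dumps(data))
        return session_id

    def load(self, session_id):
        """Load session data, or None if expired/unknown."""
        raw = self.client.get(f"{self.prefix}:{session_id}")
        return json.loads(raw) if raw else None

    def destroy(self, session_id):
        """Log the session out everywhere at once."""
        self.client.delete(f"{self.prefix}:{session_id}")
```

Because every instance reads the same store, the load balancer can route any request anywhere, with no sticky sessions required.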
Scaling Stages
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ SaaS Scaling Journey โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ โ
โ Stage 1: 100-1K users โ
โ โโโ Monolithic app โ
โ โโโ Single PostgreSQL instance โ
โ โโโ Basic caching (Redis) โ
โ โโโ CDN for static assets โ
โ โ
โ Stage 2: 1K-10K users โ
โ โโโ Read replicas โ
โ โโโ Redis cluster โ
โ โโโ Application caching โ
โ โโโ Queue for async processing โ
โ โ
โ Stage 3: 10K-100K users โ
โ โโโ Microservices โ
โ โโโ Database sharding โ
โ โโโ Multi-region deployment โ
โ โโโ Message queues for reliability โ
โ โ
โ Stage 4: 100K-1M+ users โ
โ โโโ Event-driven architecture โ
โ โโโ Polyglot persistence โ
โ โโโ Edge computing โ
โ โโโ Advanced optimization โ
โ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
Stage 1: Foundation (100-1K users)
Monolithic Optimization
# docker-compose.yml for small scale
services:
app:
image: myapp:latest
deploy:
replicas: 2
resources:
limits:
          cpus: "1.0"
memory: 1G
environment:
- DATABASE_URL=postgresql://db:5432/app
- REDIS_URL=redis://cache:6379
depends_on:
- db
- cache
db:
image: postgres:15
volumes:
- pgdata:/var/lib/postgresql/data
environment:
- POSTGRES_DB=app
command: >
postgres
-c shared_buffers=256MB
-c effective_cache_size=1GB
-c work_mem=16MB
cache:
image: redis:7-alpine
command: redis-server --maxmemory 256mb --maxmemory-policy allkeys-lru
Database Optimization for Stage 1
At this stage, your database is the main bottleneck. Here’s how to optimize it:
1. Connection Pooling
#!/usr/bin/env python3
"""Database connection pooling."""
import psycopg2
from psycopg2 import pool
class ConnectionPool:
"""Manage database connection pool."""
def __init__(self, min_conn=5, max_conn=20):
self.pool = pool.ThreadedConnectionPool(
min_conn,
max_conn,
host='localhost',
database='app',
user='app_user',
password='password'
)
def get_connection(self):
"""Get connection from pool."""
return self.pool.getconn()
def return_connection(self, conn):
"""Return connection to pool."""
self.pool.putconn(conn)
def close_all(self):
"""Close all connections."""
self.pool.closeall()
# Usage
from contextlib import contextmanager

pool = ConnectionPool()

@contextmanager
def get_db_connection():
    """Yield a pooled connection and always return it to the pool."""
    conn = pool.get_connection()
    try:
        yield conn
    finally:
        pool.return_connection(conn)
2. Query Optimization
#!/usr/bin/env python3
"""Query optimization examples."""
import psycopg2
def optimize_queries():
"""Optimize database queries."""
conn = psycopg2.connect(
host='localhost',
database='app',
user='app_user',
password='password'
)
cursor = conn.cursor()
# Bad: SELECT * is slow and fetches unnecessary data
cursor.execute("SELECT * FROM users WHERE email = %s", ('[email protected]',))
# Good: Select only needed columns
cursor.execute(
"SELECT id, name, email, created_at FROM users WHERE email = %s",
('[email protected]',)
)
    # Bad: no index on the email column forces a sequential scan
    # Good: create the index once (idempotent with IF NOT EXISTS)
    cursor.execute("CREATE INDEX IF NOT EXISTS idx_users_email ON users(email)")
    # Bad: N+1 query problem -- one round trip per user
    cursor.execute("SELECT id FROM users")
    for (user_id,) in cursor.fetchall():
        cursor.execute("SELECT * FROM orders WHERE user_id = %s", (user_id,))
        orders = cursor.fetchall()
# Good: Use JOIN to fetch related data
cursor.execute("""
SELECT u.id, u.name, u.email, o.id as order_id, o.amount
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
WHERE u.email = %s
""", ('[email protected]',))
conn.commit()
cursor.close()
conn.close()
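When a JOIN is awkward (for example, the related rows live behind another service or on a different shard), the N+1 pattern can also be avoided by batch-loading with IN queries. A sketch that builds the statements; the table and column names are illustrative:

```python
def batched_in_query(table, column, ids, batch_size=500):
    """Yield (sql, params) pairs that fetch related rows in large batches
    instead of issuing one query per parent row."""
    for i in range(0, len(ids), batch_size):
        batch = ids[i:i + batch_size]
        placeholders = ", ".join(["%s"] * len(batch))
        sql = f"SELECT * FROM {table} WHERE {column} IN ({placeholders})"
        yield sql, tuple(batch)
```

Iterating `for sql, params in batched_in_query('orders', 'user_id', user_ids): cursor.execute(sql, params)` turns N single-row queries into roughly N/500 batched ones.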
3. Read Replicas
#!/usr/bin/env python3
"""Read replica configuration."""
import psycopg2
from psycopg2 import pool
class ReadReplicaPool:
"""Manage read replica connection pool."""
def __init__(self, replicas):
self.replicas = replicas
self.current_replica = 0
self.pools = {}
for replica in replicas:
self.pools[replica['name']] = pool.ThreadedConnectionPool(
5, 20,
host=replica['host'],
database=replica['database'],
user=replica['user'],
password=replica['password']
)
def get_replica_connection(self):
"""Get connection from replica (round-robin)."""
replica = self.replicas[self.current_replica]
self.current_replica = (self.current_replica + 1) % len(self.replicas)
return self.pools[replica['name']].getconn()
def return_connection(self, conn, replica_name):
"""Return connection to replica pool."""
self.pools[replica_name].putconn(conn)
Caching Strategy for Stage 1
#!/usr/bin/env python3
"""Caching implementation for small scale."""
import redis
import json
from functools import wraps
class SimpleCache:
"""Simple caching implementation."""
def __init__(self, redis_url='redis://localhost:6379'):
self.redis = redis.from_url(redis_url)
def cache(self, key_prefix, ttl=300):
"""Cache decorator."""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
cache_key = f"{key_prefix}:{str(args)}:{str(sorted(kwargs.items()))}"
# Check cache
cached = self.redis.get(cache_key)
if cached:
return json.loads(cached)
# Execute function
result = func(*args, **kwargs)
# Cache result
self.redis.setex(cache_key, ttl, json.dumps(result))
return result
return wrapper
return decorator
def invalidate(self, pattern):
"""Invalidate cache by pattern."""
keys = self.redis.keys(pattern)
if keys:
self.redis.delete(*keys)
# Usage
cache = SimpleCache()
@cache.cache('users', ttl=3600)
def get_user(user_id):
"""Get user from database with caching."""
# Database query
return db.query("SELECT * FROM users WHERE id = %s", (user_id,))
@cache.cache('products', ttl=1800)
def get_products(category=None):
"""Get products with caching."""
query = "SELECT * FROM products"
if category:
query += " WHERE category = %s"
return db.query(query, (category,) if category else ())
Monitoring for Stage 1
#!/usr/bin/env python3
"""Basic monitoring for small scale."""
import time
import psutil
from datetime import datetime
class SimpleMonitor:
"""Basic system monitoring."""
def __init__(self):
self.metrics = []
def record_metrics(self):
"""Record current system metrics."""
metrics = {
'timestamp': datetime.utcnow().isoformat(),
'cpu_percent': psutil.cpu_percent(),
'memory_percent': psutil.virtual_memory().percent,
'disk_percent': psutil.disk_usage('/').percent,
'network_io': psutil.net_io_counters()._asdict()
}
self.metrics.append(metrics)
return metrics
def check_thresholds(self, cpu_threshold=80, memory_threshold=80):
"""Check if metrics exceed thresholds."""
current = self.record_metrics()
alerts = []
if current['cpu_percent'] > cpu_threshold:
alerts.append(f"High CPU: {current['cpu_percent']}%")
if current['memory_percent'] > memory_threshold:
alerts.append(f"High Memory: {current['memory_percent']}%")
return alerts
def get_history(self, minutes=60):
"""Get metrics history."""
cutoff = datetime.utcnow().timestamp() - (minutes * 60)
return [
m for m in self.metrics
if datetime.fromisoformat(m['timestamp']).timestamp() > cutoff
]
Stage 2: Read Scaling (1K-10K)
Read Replicas
#!/usr/bin/env python3
"""Read/write splitting for PostgreSQL."""
import random
from contextlib import contextmanager
class ReadWriteRouter:
"""Route queries to appropriate database."""
def __init__(self, primary_config, replica_configs):
self.primary = primary_config
self.replicas = replica_configs
self.current_replica = 0
def get_replica(self):
"""Get next replica in round-robin."""
if not self.replicas:
return self.primary
replica = self.replicas[self.current_replica]
self.current_replica = (self.current_replica + 1) % len(self.replicas)
return replica
@contextmanager
def get_connection(self, read_only=False):
"""Get database connection."""
config = self.get_replica() if read_only else self.primary
connection = create_connection(config)
try:
yield connection
finally:
connection.close()
# Sketch: the same idea with SQLAlchemy, overriding Session.get_bind
# (primary_engine and replica_engines are assumed to be defined elsewhere)
from sqlalchemy.orm import Session

class RoutingSession(Session):
    def get_bind(self, mapper=None, clause=None, **kwargs):
        # Writes -- and anything issued during a flush -- must hit the primary
        if self._flushing:
            return primary_engine
        return random.choice(replica_engines)
Application-Level Caching
#!/usr/bin/env python3
"""Multi-layer caching implementation."""
import redis
import json
from functools import wraps
import hashlib
class CacheManager:
"""Manage multi-layer caching."""
    def __init__(self, redis_client, local_cache_size=1000):
        self.redis = redis_client
        self.local_cache = {}
        self.local_cache_size = local_cache_size

    def _generate_key(self, prefix, args, kwargs):
        """Build a deterministic cache key from the call signature."""
        raw = f"{prefix}:{args}:{sorted(kwargs.items())}"
        return f"{prefix}:{hashlib.md5(raw.encode()).hexdigest()}"
def cache(self, key_prefix, ttl=300):
"""Cache decorator."""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# Generate cache key
cache_key = self._generate_key(key_prefix, args, kwargs)
# Check local cache first
if cache_key in self.local_cache:
return self.local_cache[cache_key]
# Check Redis
cached = self.redis.get(cache_key)
if cached:
result = json.loads(cached)
self.local_cache[cache_key] = result
return result
# Execute function
result = func(*args, **kwargs)
# Store in caches
self.redis.setex(cache_key, ttl, json.dumps(result))
self.local_cache[cache_key] = result
# Evict local cache if needed
if len(self.local_cache) > self.local_cache_size:
oldest = next(iter(self.local_cache))
del self.local_cache[oldest]
return result
return wrapper
return decorator
def invalidate(self, pattern):
"""Invalidate cache by pattern."""
keys = self.redis.keys(pattern)
if keys:
self.redis.delete(*keys)
# Clear local cache matching pattern
self.local_cache = {
k: v for k, v in self.local_cache.items()
if not k.startswith(pattern.replace('*', ''))
}
Redis Cluster Setup
#!/usr/bin/env python3
"""Redis cluster configuration."""
from redis.cluster import RedisCluster, ClusterNode

class RedisClusterManager:
    """Manage Redis cluster."""
    def __init__(self, nodes):
        """
        nodes: List of {'host', 'port'} dicts
        """
        self.nodes = nodes
        # redis-py 4+ expects ClusterNode objects rather than plain dicts
        self.client = RedisCluster(
            startup_nodes=[ClusterNode(n['host'], n['port']) for n in nodes],
            decode_responses=True
        )
def get(self, key):
"""Get value from cluster."""
return self.client.get(key)
def set(self, key, value, ex=None):
"""Set value in cluster."""
return self.client.set(key, value, ex=ex)
def delete(self, key):
"""Delete key from cluster."""
return self.client.delete(key)
def invalidate_pattern(self, pattern):
"""Invalidate all keys matching pattern."""
for key in self.client.scan_iter(match=pattern):
self.client.delete(key)
Queue Processing
#!/usr/bin/env python3
"""Queue processing for async tasks."""
import redis
import json
import time
import uuid
from datetime import datetime
class TaskQueue:
"""Simple task queue using Redis."""
def __init__(self, redis_client, queue_name='tasks'):
self.redis = redis_client
self.queue_name = queue_name
def enqueue(self, task_type, payload):
"""Add task to queue."""
task = {
'id': str(uuid.uuid4()),
'type': task_type,
'payload': payload,
'created_at': datetime.utcnow().isoformat(),
'status': 'pending'
}
self.redis.lpush(f"{self.queue_name}:pending", json.dumps(task))
return task['id']
def dequeue(self):
"""Get task from queue."""
task_json = self.redis.rpop(f"{self.queue_name}:pending")
if task_json:
return json.loads(task_json)
return None
def complete(self, task_id, result=None):
"""Mark task as complete."""
task = self.redis.get(f"{self.queue_name}:active:{task_id}")
if task:
task = json.loads(task)
task['status'] = 'completed'
task['completed_at'] = datetime.utcnow().isoformat()
task['result'] = result
self.redis.set(
f"{self.queue_name}:completed:{task_id}",
json.dumps(task)
)
self.redis.delete(f"{self.queue_name}:active:{task_id}")
def process_tasks(self, handler, poll_interval=1):
"""Process tasks from queue."""
while True:
task = self.dequeue()
if task:
# Mark as active
self.redis.setex(
f"{self.queue_name}:active:{task['id']}",
300, # 5 minute TTL
json.dumps(task)
)
try:
result = handler(task)
self.complete(task['id'], result)
except Exception as e:
self.fail(task['id'], str(e))
time.sleep(poll_interval)
def fail(self, task_id, error):
"""Mark task as failed."""
task = self.redis.get(f"{self.queue_name}:active:{task_id}")
if task:
task = json.loads(task)
task['status'] = 'failed'
task['failed_at'] = datetime.utcnow().isoformat()
task['error'] = error
self.redis.set(
f"{self.queue_name}:failed:{task_id}",
json.dumps(task)
)
self.redis.delete(f"{self.queue_name}:active:{task_id}")
Health Checks
#!/usr/bin/env python3
"""Health check endpoints."""
import psutil
import redis
from datetime import datetime
class HealthChecker:
"""Check system health."""
def __init__(self, redis_client):
self.redis = redis_client
    def check_database(self):
        """Check database connectivity."""
        try:
            # `db` is assumed to be your application's database handle
            db.execute("SELECT 1")
            return True
        except Exception:
            return False
    def check_redis(self):
        """Check Redis connectivity."""
        try:
            self.redis.ping()
            return True
        except Exception:
            return False
def check_disk_space(self, threshold=90):
"""Check disk space."""
usage = psutil.disk_usage('/')
return usage.percent < threshold
def check_memory(self, threshold=90):
"""Check memory usage."""
return psutil.virtual_memory().percent < threshold
def get_health_status(self):
"""Get comprehensive health status."""
status = {
'timestamp': datetime.utcnow().isoformat(),
'database': self.check_database(),
'redis': self.check_redis(),
'disk_space': self.check_disk_space(),
'memory': self.check_memory(),
'overall': True
}
status['overall'] = all([
status['database'],
status['redis'],
status['disk_space'],
status['memory']
])
return status
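A health checker is usually exposed as an HTTP endpoint for load balancers and orchestrators. A small sketch of mapping the status dict above to a response; the 200/503 convention matches what ALB target groups and Kubernetes probes act on, and the framework wiring is left out:

```python
import json


def health_response(status):
    """Map a health-status dict to an (http_code, body) pair.

    Load balancers only act on the status code: 200 keeps the instance
    in rotation, 503 drains traffic away from it. The JSON body is for
    humans debugging *why* an instance was pulled.
    """
    code = 200 if status.get("overall") else 503
    return code, json.dumps(status)
```

Wire this to `GET /health` in your web framework and point the load balancer's health check at it.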
Stage 3: Scale Out (10K-100K)
Database Sharding
#!/usr/bin/env python3
"""Consistent hashing for database sharding."""
import hashlib
from bisect import bisect
class ShardRouter:
"""Route to database shards using consistent hashing."""
def __init__(self, shards):
self.shards = shards
self.ring = []
self.hash_to_shard = {}
# Build hash ring
for shard in shards:
for i in range(100): # Virtual nodes
key = f"{shard['name']}-{i}"
hash_value = self._hash(key)
self.ring.append((hash_value, shard))
self.hash_to_shard[hash_value] = shard
self.ring.sort(key=lambda x: x[0])
def _hash(self, key):
"""Generate consistent hash."""
return int(hashlib.md5(key.encode()).hexdigest(), 16)
def get_shard(self, key):
"""Get shard for key."""
hash_value = self._hash(key)
# Find first shard with hash >= key hash
idx = bisect(self.ring, (hash_value,))
if idx >= len(self.ring):
idx = 0
return self.ring[idx][1]
def add_shard(self, shard):
"""Add new shard."""
# Re-distribute keys (simplified - real implementation needs migration)
self.shards.append(shard)
def remove_shard(self, shard_name):
"""Remove shard."""
self.shards = [s for s in self.shards if s['name'] != shard_name]
Event-Driven Architecture
#!/usr/bin/env python3
"""Event-driven communication."""
from kafka import KafkaProducer, KafkaConsumer
import json
class EventBus:
"""Publish/subscribe event bus."""
def __init__(self, bootstrap_servers):
self.producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode()
)
self.consumers = {}
def publish(self, topic, event):
"""Publish event."""
self.producer.send(topic, value=event)
self.producer.flush()
def subscribe(self, topic, group_id, handler):
"""Subscribe to events."""
consumer = KafkaConsumer(
topic,
group_id=group_id,
value_deserializer=lambda v: json.loads(v.decode()),
auto_offset_reset='latest'
)
for message in consumer:
try:
handler(message.value)
except Exception as e:
# Handle error - retry or dead letter queue
print(f"Error processing: {e}")
# Usage
events = EventBus(['kafka:9092'])
# Publish events
events.publish('user.created', {
'type': 'user.created',
'user_id': '123',
'email': '[email protected]'
})
# Subscribe
def handle_user_created(event):
send_welcome_email(event['email'])
events.subscribe('user.created', 'email-service', handle_user_created)
Microservices Architecture
#!/usr/bin/env python3
"""Microservices communication."""
import requests
import json
from typing import Dict, Optional

class ServiceError(Exception):
    """Raised when a downstream service call fails."""

class ServiceTimeoutError(ServiceError):
    """Raised when a downstream service call times out."""
class MicroserviceClient:
"""Client for microservices communication."""
def __init__(self, services: Dict[str, str]):
"""
services: Dict of service_name -> base_url
Example: {'users': 'http://users-service:8080', 'orders': 'http://orders-service:8080'}
"""
self.services = services
self.timeout = 5 # seconds
def call_service(self, service_name: str, endpoint: str,
method: str = 'GET', data: Optional[Dict] = None) -> Dict:
"""Call another microservice."""
base_url = self.services.get(service_name)
if not base_url:
raise ValueError(f"Unknown service: {service_name}")
url = f"{base_url}{endpoint}"
try:
if method == 'GET':
response = requests.get(url, timeout=self.timeout)
elif method == 'POST':
response = requests.post(url, json=data, timeout=self.timeout)
elif method == 'PUT':
response = requests.put(url, json=data, timeout=self.timeout)
elif method == 'DELETE':
response = requests.delete(url, timeout=self.timeout)
else:
raise ValueError(f"Unsupported method: {method}")
response.raise_for_status()
return response.json()
except requests.exceptions.Timeout:
raise ServiceTimeoutError(f"Service {service_name} timed out")
except requests.exceptions.RequestException as e:
raise ServiceError(f"Error calling {service_name}: {str(e)}")
# Usage
services = {
'users': 'http://users-service:8080',
'orders': 'http://orders-service:8080',
'payments': 'http://payments-service:8080'
}
client = MicroserviceClient(services)
# Get user from users service
user = client.call_service('users', '/api/v1/users/123')
# Create order in orders service
order = client.call_service('orders', '/api/v1/orders', method='POST', data={
'user_id': 123,
'items': [{'product_id': 456, 'quantity': 2}],
'total': 199.99
})
Service Discovery
#!/usr/bin/env python3
"""Service discovery using Consul."""
import requests
import json
class ServiceDiscovery:
"""Service discovery using Consul."""
def __init__(self, consul_url='http://localhost:8500'):
self.consul_url = consul_url
def register_service(self, service_name, service_host, service_port,
tags=None, check_interval='10s'):
"""Register service with Consul."""
service_id = f"{service_name}-{service_host}-{service_port}"
payload = {
'ID': service_id,
'Name': service_name,
'Address': service_host,
'Port': service_port,
'Tags': tags or [],
'Check': {
'HTTP': f"http://{service_host}:{service_port}/health",
'Interval': check_interval
}
}
response = requests.put(
f"{self.consul_url}/v1/agent/service/register",
json=payload
)
return response.status_code == 200
def deregister_service(self, service_id):
"""Deregister service from Consul."""
response = requests.put(
f"{self.consul_url}/v1/agent/service/deregister/{service_id}"
)
return response.status_code == 200
def get_service(self, service_name):
"""Get service instances."""
response = requests.get(
f"{self.consul_url}/v1/health/service/{service_name}"
)
if response.status_code == 200:
return response.json()
return []
def get_all_services(self):
"""Get all registered services."""
response = requests.get(
f"{self.consul_url}/v1/agent/services"
)
if response.status_code == 200:
return response.json()
return {}
Circuit Breaker Pattern
#!/usr/bin/env python3
"""Circuit breaker implementation."""
import time
from enum import Enum
from threading import Lock

class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""
class CircuitState(Enum):
CLOSED = 'closed' # Normal operation
OPEN = 'open' # Circuit is open, requests fail fast
HALF_OPEN = 'half_open' # Testing if service recovered
class CircuitBreaker:
"""Circuit breaker for service calls."""
def __init__(self, failure_threshold=5, recovery_timeout=30,
half_open_max_calls=3):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.half_open_max_calls = half_open_max_calls
self._state = CircuitState.CLOSED
self._failure_count = 0
self._last_failure_time = None
self._half_open_calls = 0
self._lock = Lock()
@property
def state(self):
"""Get current circuit state."""
with self._lock:
if self._state == CircuitState.OPEN:
# Check if recovery timeout has passed
if self._last_failure_time and \
time.time() - self._last_failure_time > self.recovery_timeout:
self._state = CircuitState.HALF_OPEN
self._half_open_calls = 0
return self._state
def call(self, func, *args, **kwargs):
"""Call function with circuit breaker protection."""
if self.state == CircuitState.OPEN:
raise CircuitOpenError("Circuit is open")
try:
result = func(*args, **kwargs)
self._on_success()
return result
except Exception as e:
self._on_failure()
raise
    def _on_success(self):
        """Handle successful call."""
        with self._lock:
            if self._state == CircuitState.HALF_OPEN:
                # Count successful probes; enough of them close the circuit
                self._half_open_calls += 1
                if self._half_open_calls >= self.half_open_max_calls:
                    self._state = CircuitState.CLOSED
                    self._failure_count = 0
                    self._half_open_calls = 0
            else:
                self._failure_count = 0
    def _on_failure(self):
        """Handle failed call."""
        with self._lock:
            self._failure_count += 1
            self._last_failure_time = time.time()
            if self._state == CircuitState.HALF_OPEN:
                # A failed probe reopens the circuit immediately
                self._state = CircuitState.OPEN
                self._half_open_calls = 0
            elif self._failure_count >= self.failure_threshold:
                self._state = CircuitState.OPEN
# Usage
circuit = CircuitBreaker(failure_threshold=3, recovery_timeout=60)
def call_external_service():
# Your service call logic
pass
try:
result = circuit.call(call_external_service)
except CircuitOpenError:
# Return cached data or fallback
pass
Load Balancing
#!/usr/bin/env python3
"""Load balancing strategies."""
import random
from collections import defaultdict
class LoadBalancer:
"""Load balancer with multiple strategies."""
def __init__(self, servers):
self.servers = servers
self.request_count = defaultdict(int)
def select_server_round_robin(self):
"""Select server using round-robin."""
server = self.servers[self.request_count['round_robin'] % len(self.servers)]
self.request_count['round_robin'] += 1
return server
def select_server_random(self):
"""Select server randomly."""
return random.choice(self.servers)
def select_server_least_connections(self):
"""Select server with least active connections."""
return min(self.servers, key=lambda s: s.get('active_connections', 0))
def select_server_weighted(self):
"""Select server based on weights."""
total_weight = sum(s.get('weight', 1) for s in self.servers)
random_weight = random.uniform(0, total_weight)
cumulative = 0
for server in self.servers:
cumulative += server.get('weight', 1)
if random_weight <= cumulative:
return server
return self.servers[-1]
# Usage
servers = [
{'host': 'server1.example.com', 'port': 8080, 'weight': 2},
{'host': 'server2.example.com', 'port': 8080, 'weight': 1},
{'host': 'server3.example.com', 'port': 8080, 'weight': 1}
]
lb = LoadBalancer(servers)
# Use different strategies
server = lb.select_server_round_robin()
server = lb.select_server_random()
server = lb.select_server_least_connections()
server = lb.select_server_weighted()
Stage 4: Hyperscale (100K-1M+)
Multi-Region Architecture
# Terraform multi-region
provider "aws" {
alias = "primary"
region = "us-east-1"
}
provider "aws" {
alias = "secondary"
region = "us-west-2"
}
# Primary region resources
resource "aws_db_instance" "primary" {
provider = aws.primary
identifier = "app-primary"
multi_az = true
engine = "postgres"
instance_class = "db.r6g.xlarge"
backup_retention_period = 7
}
# Read replica in secondary region
resource "aws_db_instance" "replica" {
  provider = aws.secondary
  identifier = "app-replica"
  # Cross-region read replicas reference the source instance by ARN
  replicate_source_db = aws_db_instance.primary.arn
  instance_class = "db.r6g.xlarge"
}
# Global Accelerator for cross-region traffic failover (static anycast IPs, not DNS)
resource "aws_globalaccelerator_accelerator" "main" {
name = "app-accelerator"
enabled = true
}
Performance Optimization
#!/usr/bin/env python3
"""Performance monitoring and auto-scaling."""
import boto3
import time
class AutoScaler:
"""Auto-scale based on metrics."""
def __init__(self, config):
self.asg = boto3.client('autoscaling')
self.cloudwatch = boto3.client('cloudwatch')
self.config = config
def check_and_scale(self):
"""Check metrics and scale if needed."""
metrics = self.get_metrics()
if metrics['cpu_utilization'] > self.config['scale_up_threshold']:
self.scale_up()
elif metrics['cpu_utilization'] < self.config['scale_down_threshold']:
self.scale_down()
    def scale_up(self):
        """Scale out."""
        self.asg.execute_policy(
            AutoScalingGroupName=self.config['asg_name'],
            PolicyName='scale-out',
            HonorCooldown=True
        )
    def scale_down(self):
        """Scale in."""
        self.asg.execute_policy(
            AutoScalingGroupName=self.config['asg_name'],
            PolicyName='scale-in',
            HonorCooldown=True
        )
def get_metrics(self):
"""Get current metrics."""
response = self.cloudwatch.get_metric_statistics(
Namespace='AWS/EC2',
MetricName='CPUUtilization',
Dimensions=[
{'Name': 'AutoScalingGroupName', 'Value': self.config['asg_name']}
],
StartTime=time.time() - 300,
EndTime=time.time(),
Period=60,
Statistics=['Average']
)
return {
'cpu_utilization': response['Datapoints'][0]['Average'] if response['Datapoints'] else 0
}
Edge Computing
// Cloudflare Workers edge function for caching and routing
addEventListener('fetch', (event) => {
  event.respondWith(handleRequest(event));
});

async function handleRequest(event) {
  const request = event.request;
  const cache = caches.default;
  // Check the edge cache first
  let response = await cache.match(request);
  if (!response) {
    // Cache miss -- fetch from origin
    response = await fetch(request);
    // Cache successful responses
    if (response.ok) {
      response = new Response(response.body, response);
      response.headers.set('Cache-Control', 's-maxage=3600');
      event.waitUntil(cache.put(request, response.clone()));
    }
  }
  return response;
}
// Add to every response
async function addSecurityHeaders(request, response) {
const headers = {
'Strict-Transport-Security': 'max-age=31536000; includeSubDomains',
'X-Content-Type-Options': 'nosniff',
'X-Frame-Options': 'DENY',
'Content-Security-Policy': "default-src 'self'",
'X-XSS-Protection': '1; mode=block'
};
const newResponse = new Response(response.body, response);
for (const [key, value] of Object.entries(headers)) {
newResponse.headers.set(key, value);
}
return newResponse;
}
Database Optimization for Hyperscale
#!/usr/bin/env python3
"""Advanced database optimization."""
import time
import psycopg2
from psycopg2.extras import RealDictCursor
class HyperscaleDatabase:
"""Database optimized for hyperscale."""
def __init__(self, connection_string):
self.conn = psycopg2.connect(connection_string)
self.conn.autocommit = True
def execute_with_retry(self, query, params=None, max_retries=3):
"""Execute query with retry logic."""
for attempt in range(max_retries):
try:
cursor = self.conn.cursor(cursor_factory=RealDictCursor)
cursor.execute(query, params)
if query.strip().upper().startswith('SELECT'):
result = cursor.fetchall()
else:
self.conn.commit()
result = cursor.rowcount
cursor.close()
return result
except psycopg2.OperationalError as e:
if attempt == max_retries - 1:
raise
# Wait and retry
time.sleep(2 ** attempt)
def batch_insert(self, table, data, batch_size=1000):
"""Insert data in batches."""
if not data:
return
columns = list(data[0].keys())
placeholders = ', '.join(['%s'] * len(columns))
column_names = ', '.join(columns)
for i in range(0, len(data), batch_size):
batch = data[i:i + batch_size]
values = [tuple(row[col] for col in columns) for row in batch]
query = f"INSERT INTO {table} ({column_names}) VALUES ({placeholders})"
cursor = self.conn.cursor()
cursor.executemany(query, values)
self.conn.commit()
cursor.close()
def get_connection_pool(self):
"""Get connection pool for high concurrency."""
from psycopg2 import pool
return pool.ThreadedConnectionPool(
minconn=10,
maxconn=100,
dsn=self.conn.dsn
)
def enable_connection_caching(self):
"""Enable connection caching."""
# Use pgBouncer for connection pooling
# Configure pgBouncer to pool connections
pass
def implement_read_replica_routing(self):
"""Route read queries to replicas."""
# Use pgBouncer with different pools for read/write
# Or use SQLAlchemy with read/write splitting
pass
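The pgBouncer stubs above can be filled in with a configuration along these lines; pool sizes, hostnames, and database names are illustrative and need tuning against your real connection budget:

```ini
; pgbouncer.ini -- illustrative values
[databases]
; writes go to the primary, reads to a replica pool
app      = host=primary.db.internal port=5432 dbname=app
app_read = host=replica.db.internal port=5432 dbname=app

[pgbouncer]
listen_addr = 0.0.0.0
listen_port = 6432
; transaction pooling gives the highest connection reuse, but breaks
; session-level features (session prepared statements, advisory locks)
pool_mode = transaction
max_client_conn = 2000
default_pool_size = 50
```

The application then connects to port 6432, pointing read-only sessions at `app_read`, which pairs naturally with the read/write routing shown in Stage 2.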
CDN Optimization
#!/usr/bin/env python3
"""CDN optimization strategies."""
import boto3
import os
import time
class CDNManager:
"""Manage CDN for static assets."""
def __init__(self, cloudfront_distribution_id):
self.cloudfront = boto3.client('cloudfront')
self.distribution_id = cloudfront_distribution_id
def invalidate_cache(self, paths):
"""Invalidate CDN cache."""
response = self.cloudfront.create_invalidation(
DistributionId=self.distribution_id,
InvalidationBatch={
'Paths': {
'Quantity': len(paths),
'Items': paths
},
'CallerReference': str(time.time())
}
)
return response['Invalidation']['Id']
def optimize_static_assets(self, directory):
"""Optimize static assets for CDN."""
# Minify CSS/JS
# Compress images
# Set proper cache headers
for root, dirs, files in os.walk(directory):
for file in files:
if file.endswith(('.css', '.js')):
self.minify_file(os.path.join(root, file))
elif file.endswith(('.png', '.jpg', '.jpeg', '.gif')):
self.compress_image(os.path.join(root, file))
    def minify_file(self, filepath):
        """Minify CSS/JS file (in practice, use a real minifier such as esbuild)."""
        with open(filepath, 'r') as f:
            content = f.read()
        # Naive minification: drop blank lines and leading/trailing whitespace
        minified = '\n'.join(
            line.strip() for line in content.splitlines() if line.strip()
        )
        with open(filepath, 'w') as f:
            f.write(minified)
def compress_image(self, filepath):
"""Compress image file."""
# Use PIL or similar to compress
pass
def set_cache_headers(self, filepath, max_age=31536000):
"""Set cache headers for file."""
# Configure S3 bucket with proper cache headers
# Or configure CDN to cache for specified duration
pass
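The `set_cache_headers` stub can be made concrete with a helper that picks a Cache-Control value per asset type. The extension list and max-age values here are assumptions: fingerprinted assets (content hash in the filename) never change and can be marked immutable, while HTML entry points must revalidate quickly so deploys become visible:

```python
# Extensions assumed to be fingerprinted build artifacts (illustrative list)
LONG_LIVED = ('.css', '.js', '.png', '.jpg', '.jpeg', '.gif', '.woff2')


def cache_control_for(path, max_age=31536000):
    """Pick a Cache-Control header value for a static asset."""
    if path.endswith(LONG_LIVED):
        # Safe to cache for a year: the URL changes when the content does
        return f"public, max-age={max_age}, immutable"
    # HTML and everything else: cache briefly, then revalidate
    return "public, max-age=60, must-revalidate"
```

With boto3 this would be applied at upload time, e.g. `s3.upload_file(path, bucket, key, ExtraArgs={'CacheControl': cache_control_for(path)})`, or set as a response-header policy at the CDN.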
Monitoring and Observability
#!/usr/bin/env python3
"""Advanced monitoring and observability."""
import time
import json
import uuid
from datetime import datetime
from collections import defaultdict
class ObservabilitySystem:
"""Comprehensive observability system."""
def __init__(self):
self.metrics = defaultdict(list)
self.traces = []
self.logs = []
def record_metric(self, name, value, tags=None):
"""Record a metric."""
metric = {
'name': name,
'value': value,
'timestamp': datetime.utcnow().isoformat(),
'tags': tags or {}
}
self.metrics[name].append(metric)
def start_trace(self, name, context=None):
"""Start a new trace."""
return {
'trace_id': str(uuid.uuid4()),
'name': name,
'context': context or {},
'start_time': time.time(),
'spans': []
}
def add_span(self, trace, name, duration_ms, tags=None):
"""Add a span to trace."""
span = {
'name': name,
'duration_ms': duration_ms,
'start_time': time.time() - (duration_ms / 1000),
'tags': tags or {}
}
trace['spans'].append(span)
return span
def end_trace(self, trace, error=None):
"""End a trace."""
trace['end_time'] = time.time()
trace['total_duration_ms'] = (trace['end_time'] - trace['start_time']) * 1000
trace['error'] = error
self.traces.append(trace)
def log(self, level, message, context=None):
"""Log a message."""
log_entry = {
'timestamp': datetime.utcnow().isoformat(),
'level': level,
'message': message,
'context': context or {}
}
self.logs.append(log_entry)
def get_alerts(self):
"""Get current alerts."""
alerts = []
# Check for high error rates
if self._error_rate() > 0.05:
alerts.append({
'type': 'high_error_rate',
'severity': 'critical',
'message': f"Error rate is {self._error_rate() * 100:.1f}%"
})
# Check for slow response times
if self._avg_response_time() > 1000:
alerts.append({
'type': 'slow_response',
'severity': 'warning',
'message': f"Average response time is {self._avg_response_time()}ms"
})
return alerts
def _error_rate(self):
"""Calculate error rate."""
total_requests = len(self.metrics.get('requests', []))
error_requests = len([
m for m in self.metrics.get('requests', [])
if m.get('tags', {}).get('status', '').startswith('5')
])
return error_requests / total_requests if total_requests > 0 else 0
def _avg_response_time(self):
"""Calculate average response time."""
response_times = [
m['value'] for m in self.metrics.get('response_time', [])
]
return sum(response_times) / len(response_times) if response_times else 0
Auto-Scaling Configuration
# Kubernetes Horizontal Pod Autoscaler
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: app-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: app
minReplicas: 2
maxReplicas: 50
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
- type: Pods
pods:
metric:
name: http_requests_per_second
target:
type: AverageValue
averageValue: "100"
External Resources
- AWS Auto Scaling
- Kubernetes Scaling
- Database Sharding
- The Scale Book
- High Performance Browser Networking
Conclusion
Scaling a SaaS from 100 to 1 million users is a journey of deliberate architectural decisions. Each stage requires different strategies, tools, and trade-offs. The key is to start simple, monitor your metrics, and scale when you hit bottlenecks, not before.
Key takeaways:
- Start simple: Monolithic architecture works great for 100-1K users
- Monitor everything: You can’t scale what you don’t measure
- Scale horizontally: Add servers, not bigger servers
- Cache aggressively: Reduce database load with smart caching
- Decouple with events: Use event-driven architecture for scalability
- Plan for failure: Design for redundancy and failover
- Optimize databases: Indexes, read replicas, and sharding are essential
- Use CDNs: Offload static assets to edge networks
The most successful SaaS companies don’t try to predict their scaling needs; they build systems that can adapt as they grow. Start with what works, measure your performance, and scale strategically.