
SaaS Scalability: From 100 to 1 Million Users

Introduction

Scaling a SaaS from 100 to 1 million users requires deliberate architectural decisions at each stage. What works at 100 users fails at 10,000. This guide covers the journey.

Key Statistics (rules of thumb, not hard numbers):

  • Most startups that hit rapid growth stumble on scaling before they stumble on product
  • Deliberate early architecture dramatically reduces later scaling costs
  • The database is almost always the first scaling bottleneck
  • A well-placed cache can absorb the vast majority of read traffic

What Is Scalability and Why It Matters

Scalability is the ability of a system to handle increased load without sacrificing performance or reliability. In SaaS, scalability means your application can grow with your business, from a handful of users to millions, without requiring a complete rewrite.

Vertical vs Horizontal Scaling

Vertical Scaling (Scaling Up)

  • Add more resources to existing servers (CPU, RAM, storage)
  • Simpler to implement
  • Limited by hardware constraints
  • Single point of failure

Horizontal Scaling (Scaling Out)

  • Add more servers to distribute load
  • More complex but unlimited potential
  • Better fault tolerance
  • Requires load balancing and state management

The Scalability Matrix

Scalability Challenges

  Stage      Users      Requests/sec   Database Size
  -----      -----      ------------   -------------
  1. 100     100        10             1 GB
  2. 1K      1,000      100            10 GB
  3. 10K     10,000     1,000          100 GB
  4. 100K    100,000    10,000         1 TB
  5. 1M+     1M+        100K+          10 TB+

  Each stage requires different architectural approaches.
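The jumps in the table are easier to reason about with a quick back-of-envelope estimate. A minimal sketch, where the 10% peak-concurrency ratio and one request per second per active user are illustrative assumptions rather than measurements:

```python
def estimate_load(total_users, concurrent_ratio=0.10, req_per_active_user=1.0):
    """Rough peak-load estimate: assumes ~10% of users are active at peak
    and each active user issues about one request per second."""
    active_users = int(total_users * concurrent_ratio)
    requests_per_sec = active_users * req_per_active_user
    return active_users, requests_per_sec

for users in (100, 1_000, 10_000, 100_000, 1_000_000):
    active, rps = estimate_load(users)
    print(f"{users:>9} users -> ~{active:>7} active, ~{rps:>8.0f} req/s")
```

Plug in your own ratios once you have real traffic data; the point is that each 10x in users roughly means 10x in requests per second, which is what forces the stage transitions below.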

Common Scaling Anti-Patterns

1. Premature Optimization

  • โŒ Over-engineering for future scale
  • โœ… Optimize when you hit bottlenecks
  • โœ… Start simple and evolve

2. Single Point of Failure

  • โŒ Single database, single server
  • โœ… Redundancy at every layer
  • โœ… Multi-AZ and multi-region deployments

3. Stateful Applications

  • โŒ Storing session data in memory
  • โœ… Use external session stores (Redis)
  • โœ… Make applications stateless

4. No Monitoring

  • โŒ Flying blind without metrics
  • โœ… Instrument everything
  • โœ… Set up alerts for anomalies

Scaling Stages

SaaS Scaling Journey

  Stage 1: 100-1K users
  ├── Monolithic app
  ├── Single PostgreSQL instance
  ├── Basic caching (Redis)
  └── CDN for static assets

  Stage 2: 1K-10K users
  ├── Read replicas
  ├── Redis cluster
  ├── Application caching
  └── Queue for async processing

  Stage 3: 10K-100K users
  ├── Microservices
  ├── Database sharding
  ├── Multi-region deployment
  └── Message queues for reliability

  Stage 4: 100K-1M+ users
  ├── Event-driven architecture
  ├── Polyglot persistence
  ├── Edge computing
  └── Advanced optimization

Stage 1: Foundation (100-1K users)

Monolithic Optimization

# docker-compose.yml for small scale
services:
  app:
    image: myapp:latest
    deploy:
      replicas: 2
      resources:
        limits:
          cpus: "1.0"
          memory: 1G
    environment:
      - DATABASE_URL=postgresql://db:5432/app
      - REDIS_URL=redis://cache:6379
    depends_on:
      - db
      - cache

  db:
    image: postgres:15
    volumes:
      - pgdata:/var/lib/postgresql/data
    environment:
      - POSTGRES_DB=app
    command: >
      postgres
      -c shared_buffers=256MB
      -c effective_cache_size=1GB
      -c work_mem=16MB

  cache:
    image: redis:7-alpine
    command: redis-server --maxmemory 256mb --maxmemory-policy allkeys-lru

Database Optimization for Stage 1

At this stage, your database is the main bottleneck. Here’s how to optimize it:

1. Connection Pooling

#!/usr/bin/env python3
"""Database connection pooling."""

import psycopg2
from psycopg2 import pool

class ConnectionPool:
    """Manage database connection pool."""
    
    def __init__(self, min_conn=5, max_conn=20):
        self.pool = pool.ThreadedConnectionPool(
            min_conn,
            max_conn,
            host='localhost',
            database='app',
            user='app_user',
            password='password'
        )
    
    def get_connection(self):
        """Get connection from pool."""
        return self.pool.getconn()
    
    def return_connection(self, conn):
        """Return connection to pool."""
        self.pool.putconn(conn)
    
    def close_all(self):
        """Close all connections."""
        self.pool.closeall()

# Usage
from contextlib import contextmanager

pool = ConnectionPool()

@contextmanager
def get_db_connection():
    """Yield a pooled connection and always return it to the pool."""
    conn = pool.get_connection()
    try:
        yield conn
    finally:
        pool.return_connection(conn)

2. Query Optimization

#!/usr/bin/env python3
"""Query optimization examples."""

import psycopg2

def optimize_queries():
    """Optimize database queries."""
    
    conn = psycopg2.connect(
        host='localhost',
        database='app',
        user='app_user',
        password='password'
    )
    cursor = conn.cursor()
    
    # Bad: SELECT * fetches unnecessary data
    cursor.execute("SELECT * FROM users WHERE email = %s", ('[email protected]',))
    
    # Good: Select only needed columns
    cursor.execute(
        "SELECT id, name, email, created_at FROM users WHERE email = %s",
        ('[email protected]',)
    )
    
    # Bad: no index on the email column forces a sequential scan
    # Good: create one (idempotent, so re-runs don't fail)
    cursor.execute("CREATE INDEX IF NOT EXISTS idx_users_email ON users(email)")
    
    # Bad: N+1 query problem -- one extra query per user
    # (note: psycopg2's execute() returns None, so fetchall() is a separate call)
    cursor.execute("SELECT id FROM users")
    for (user_id,) in cursor.fetchall():
        cursor.execute("SELECT * FROM orders WHERE user_id = %s", (user_id,))
        orders = cursor.fetchall()
    
    # Good: Use JOIN to fetch related data
    cursor.execute("""
        SELECT u.id, u.name, u.email, o.id as order_id, o.amount
        FROM users u
        LEFT JOIN orders o ON u.id = o.user_id
        WHERE u.email = %s
    """, ('[email protected]',))
    
    conn.commit()
    cursor.close()
    conn.close()

3. Read Replicas

#!/usr/bin/env python3
"""Read replica configuration."""

import psycopg2
from psycopg2 import pool

class ReadReplicaPool:
    """Manage read replica connection pool."""
    
    def __init__(self, replicas):
        self.replicas = replicas
        self.current_replica = 0
        self.pools = {}
        
        for replica in replicas:
            self.pools[replica['name']] = pool.ThreadedConnectionPool(
                5, 20,
                host=replica['host'],
                database=replica['database'],
                user=replica['user'],
                password=replica['password']
            )
    
    def get_replica_connection(self):
        """Get connection from a replica (round-robin).
        Returns (conn, replica_name) so the caller can return it later."""
        replica = self.replicas[self.current_replica]
        self.current_replica = (self.current_replica + 1) % len(self.replicas)
        
        return self.pools[replica['name']].getconn(), replica['name']
    
    def return_connection(self, conn, replica_name):
        """Return connection to replica pool."""
        self.pools[replica_name].putconn(conn)

Caching Strategy for Stage 1

#!/usr/bin/env python3
"""Caching implementation for small scale."""

import redis
import json
from functools import wraps

class SimpleCache:
    """Simple caching implementation."""
    
    def __init__(self, redis_url='redis://localhost:6379'):
        self.redis = redis.from_url(redis_url)
    
    def cache(self, key_prefix, ttl=300):
        """Cache decorator."""
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                cache_key = f"{key_prefix}:{str(args)}:{str(sorted(kwargs.items()))}"
                
                # Check cache
                cached = self.redis.get(cache_key)
                if cached:
                    return json.loads(cached)
                
                # Execute function
                result = func(*args, **kwargs)
                
                # Cache result
                self.redis.setex(cache_key, ttl, json.dumps(result))
                
                return result
            return wrapper
        return decorator
    
    def invalidate(self, pattern):
        """Invalidate cache by pattern (SCAN avoids blocking Redis, unlike KEYS)."""
        keys = list(self.redis.scan_iter(match=pattern))
        if keys:
            self.redis.delete(*keys)

# Usage
cache = SimpleCache()

@cache.cache('users', ttl=3600)
def get_user(user_id):
    """Get user from database with caching."""
    # Database query
    return db.query("SELECT * FROM users WHERE id = %s", (user_id,))

@cache.cache('products', ttl=1800)
def get_products(category=None):
    """Get products with caching."""
    query = "SELECT * FROM products"
    if category:
        query += " WHERE category = %s"
    return db.query(query, (category,) if category else ())

Monitoring for Stage 1

#!/usr/bin/env python3
"""Basic monitoring for small scale."""

import time
import psutil
from datetime import datetime

class SimpleMonitor:
    """Basic system monitoring."""
    
    def __init__(self):
        self.metrics = []
    
    def record_metrics(self):
        """Record current system metrics."""
        metrics = {
            'timestamp': datetime.utcnow().isoformat(),
            'cpu_percent': psutil.cpu_percent(),
            'memory_percent': psutil.virtual_memory().percent,
            'disk_percent': psutil.disk_usage('/').percent,
            'network_io': psutil.net_io_counters()._asdict()
        }
        
        self.metrics.append(metrics)
        
        return metrics
    
    def check_thresholds(self, cpu_threshold=80, memory_threshold=80):
        """Check if metrics exceed thresholds."""
        current = self.record_metrics()
        
        alerts = []
        
        if current['cpu_percent'] > cpu_threshold:
            alerts.append(f"High CPU: {current['cpu_percent']}%")
        
        if current['memory_percent'] > memory_threshold:
            alerts.append(f"High Memory: {current['memory_percent']}%")
        
        return alerts
    
    def get_history(self, minutes=60):
        """Get metrics history."""
        cutoff = datetime.utcnow().timestamp() - (minutes * 60)
        
        return [
            m for m in self.metrics
            if datetime.fromisoformat(m['timestamp']).timestamp() > cutoff
        ]

Stage 2: Read Scaling (1K-10K users)

Read Replicas

#!/usr/bin/env python3
"""Read/write splitting for PostgreSQL."""

import random
from contextlib import contextmanager

class ReadWriteRouter:
    """Route queries to appropriate database."""
    
    def __init__(self, primary_config, replica_configs):
        self.primary = primary_config
        self.replicas = replica_configs
        self.current_replica = 0
    
    def get_replica(self):
        """Get next replica in round-robin."""
        if not self.replicas:
            return self.primary
        
        replica = self.replicas[self.current_replica]
        self.current_replica = (self.current_replica + 1) % len(self.replicas)
        return replica
    
    @contextmanager
    def get_connection(self, read_only=False):
        """Get database connection."""
        config = self.get_replica() if read_only else self.primary
        
        connection = create_connection(config)
        try:
            yield connection
        finally:
            connection.close()

# Sketch: routing Session for SQLAlchemy. primary_engine and
# get_replica_engine() are assumed to be configured elsewhere.
from sqlalchemy.orm import Session

class RoutingSession(Session):
    def get_bind(self, mapper=None, clause=None, **kwargs):
        # Flushes (writes) must hit the primary; plain reads can use a replica
        if self._flushing:
            return primary_engine
        return get_replica_engine()

Application-Level Caching

#!/usr/bin/env python3
"""Multi-layer caching implementation."""

import redis
import json
from functools import wraps
import hashlib

class CacheManager:
    """Manage multi-layer caching."""
    
    def __init__(self, redis_client, local_cache_size=1000):
        self.redis = redis_client
        self.local_cache = {}
        self.local_cache_size = local_cache_size
    
    def _generate_key(self, key_prefix, args, kwargs):
        """Build a deterministic cache key from the call arguments."""
        raw = f"{args}:{sorted(kwargs.items())}"
        return f"{key_prefix}:{hashlib.md5(raw.encode()).hexdigest()}"
    
    def cache(self, key_prefix, ttl=300):
        """Cache decorator."""
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                # Generate cache key
                cache_key = self._generate_key(key_prefix, args, kwargs)
                
                # Check local cache first
                if cache_key in self.local_cache:
                    return self.local_cache[cache_key]
                
                # Check Redis
                cached = self.redis.get(cache_key)
                if cached:
                    result = json.loads(cached)
                    self.local_cache[cache_key] = result
                    return result
                
                # Execute function
                result = func(*args, **kwargs)
                
                # Store in caches
                self.redis.setex(cache_key, ttl, json.dumps(result))
                self.local_cache[cache_key] = result
                
                # Evict local cache if needed
                if len(self.local_cache) > self.local_cache_size:
                    oldest = next(iter(self.local_cache))
                    del self.local_cache[oldest]
                
                return result
            return wrapper
        return decorator
    
    def invalidate(self, pattern):
        """Invalidate cache by pattern."""
        keys = list(self.redis.scan_iter(match=pattern))
        if keys:
            self.redis.delete(*keys)
        
        # Drop local cache entries sharing the pattern's prefix
        prefix = pattern.replace('*', '')
        self.local_cache = {
            k: v for k, v in self.local_cache.items()
            if not k.startswith(prefix)
        }

Redis Cluster Setup

#!/usr/bin/env python3
"""Redis cluster configuration."""

from redis.cluster import RedisCluster, ClusterNode

class RedisClusterManager:
    """Manage Redis cluster."""
    
    def __init__(self, nodes):
        """
        nodes: List of {'host', 'port'} dicts
        """
        self.nodes = nodes
        # redis-py expects ClusterNode objects for startup_nodes
        self.client = RedisCluster(
            startup_nodes=[ClusterNode(n['host'], n['port']) for n in nodes],
            decode_responses=True
        )
    
    def get(self, key):
        """Get value from cluster."""
        return self.client.get(key)
    
    def set(self, key, value, ex=None):
        """Set value in cluster."""
        return self.client.set(key, value, ex=ex)
    
    def delete(self, key):
        """Delete key from cluster."""
        return self.client.delete(key)
    
    def invalidate_pattern(self, pattern):
        """Invalidate all keys matching pattern."""
        for key in self.client.scan_iter(match=pattern):
            self.client.delete(key)

Queue Processing

#!/usr/bin/env python3
"""Queue processing for async tasks."""

import json
import time
import uuid
from datetime import datetime

import redis

class TaskQueue:
    """Simple task queue using Redis."""
    
    def __init__(self, redis_client, queue_name='tasks'):
        self.redis = redis_client
        self.queue_name = queue_name
    
    def enqueue(self, task_type, payload):
        """Add task to queue."""
        task = {
            'id': str(uuid.uuid4()),
            'type': task_type,
            'payload': payload,
            'created_at': datetime.utcnow().isoformat(),
            'status': 'pending'
        }
        
        self.redis.lpush(f"{self.queue_name}:pending", json.dumps(task))
        
        return task['id']
    
    def dequeue(self):
        """Get task from queue."""
        task_json = self.redis.rpop(f"{self.queue_name}:pending")
        
        if task_json:
            return json.loads(task_json)
        
        return None
    
    def complete(self, task_id, result=None):
        """Mark task as complete."""
        task = self.redis.get(f"{self.queue_name}:active:{task_id}")
        
        if task:
            task = json.loads(task)
            task['status'] = 'completed'
            task['completed_at'] = datetime.utcnow().isoformat()
            task['result'] = result
            
            self.redis.set(
                f"{self.queue_name}:completed:{task_id}",
                json.dumps(task)
            )
            
            self.redis.delete(f"{self.queue_name}:active:{task_id}")
    
    def process_tasks(self, handler, poll_interval=1):
        """Process tasks from queue."""
        while True:
            task = self.dequeue()
            
            if task:
                # Mark as active
                self.redis.setex(
                    f"{self.queue_name}:active:{task['id']}",
                    300,  # 5 minute TTL
                    json.dumps(task)
                )
                
                try:
                    result = handler(task)
                    self.complete(task['id'], result)
                except Exception as e:
                    self.fail(task['id'], str(e))
            
            time.sleep(poll_interval)
    
    def fail(self, task_id, error):
        """Mark task as failed."""
        task = self.redis.get(f"{self.queue_name}:active:{task_id}")
        
        if task:
            task = json.loads(task)
            task['status'] = 'failed'
            task['failed_at'] = datetime.utcnow().isoformat()
            task['error'] = error
            
            self.redis.set(
                f"{self.queue_name}:failed:{task_id}",
                json.dumps(task)
            )
            
            self.redis.delete(f"{self.queue_name}:active:{task_id}")

Health Checks

#!/usr/bin/env python3
"""Health check endpoints."""

from datetime import datetime

import psutil
import redis

class HealthChecker:
    """Check system health."""
    
    def __init__(self, redis_client):
        self.redis = redis_client
    
    def check_database(self):
        """Check database connectivity (db is an assumed app-wide handle)."""
        try:
            db.execute("SELECT 1")  # cheap liveness query
            return True
        except Exception:
            return False
    
    def check_redis(self):
        """Check Redis connectivity."""
        try:
            self.redis.ping()
            return True
        except Exception:
            return False
    
    def check_disk_space(self, threshold=90):
        """Check disk space."""
        usage = psutil.disk_usage('/')
        return usage.percent < threshold
    
    def check_memory(self, threshold=90):
        """Check memory usage."""
        return psutil.virtual_memory().percent < threshold
    
    def get_health_status(self):
        """Get comprehensive health status."""
        status = {
            'timestamp': datetime.utcnow().isoformat(),
            'database': self.check_database(),
            'redis': self.check_redis(),
            'disk_space': self.check_disk_space(),
            'memory': self.check_memory(),
            'overall': True
        }
        
        status['overall'] = all([
            status['database'],
            status['redis'],
            status['disk_space'],
            status['memory']
        ])
        
        return status
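The checker only becomes useful once a load balancer can poll it. A minimal, framework-agnostic sketch of the `/health` handler: it returns the status code and JSON body your web framework would send, and `checker` is an assumed `HealthChecker`-like instance with a `get_health_status()` method:

```python
import json

def health_endpoint(checker):
    """Return (http_status, json_body) for a /health route.
    A 503 tells the load balancer to stop routing traffic to this instance."""
    status = checker.get_health_status()
    http_status = 200 if status['overall'] else 503
    return http_status, json.dumps(status)
```

Wire this into whatever framework you use (Flask, FastAPI, etc.) and point your load balancer's health check at it with a short interval and a low unhealthy threshold.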

Stage 3: Scale Out (10K-100K users)

Database Sharding

#!/usr/bin/env python3
"""Consistent hashing for database sharding."""

import hashlib
from bisect import bisect

class ShardRouter:
    """Route to database shards using consistent hashing."""
    
    def __init__(self, shards):
        self.shards = shards
        self._build_ring()
    
    def _build_ring(self):
        """(Re)build the hash ring, with virtual nodes for even distribution."""
        self.ring = []
        for shard in self.shards:
            for i in range(100):  # Virtual nodes per shard
                key = f"{shard['name']}-{i}"
                self.ring.append((self._hash(key), shard))
        self.ring.sort(key=lambda x: x[0])
    
    def _hash(self, key):
        """Generate consistent hash."""
        return int(hashlib.md5(key.encode()).hexdigest(), 16)
    
    def get_shard(self, key):
        """Get shard for key."""
        hash_value = self._hash(key)
        
        # Find first ring entry with hash >= key hash, wrapping around
        idx = bisect(self.ring, (hash_value,))
        if idx >= len(self.ring):
            idx = 0
        
        return self.ring[idx][1]
    
    def add_shard(self, shard):
        """Add new shard (a real implementation must also migrate keys)."""
        self.shards.append(shard)
        self._build_ring()
    
    def remove_shard(self, shard_name):
        """Remove shard (its keys remap to the next shard on the ring)."""
        self.shards = [s for s in self.shards if s['name'] != shard_name]
        self._build_ring()

Event-Driven Architecture

#!/usr/bin/env python3
"""Event-driven communication."""

from kafka import KafkaProducer, KafkaConsumer
import json

class EventBus:
    """Publish/subscribe event bus."""
    
    def __init__(self, bootstrap_servers):
        self.bootstrap_servers = bootstrap_servers
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode()
        )
    
    def publish(self, topic, event):
        """Publish event."""
        self.producer.send(topic, value=event)
        self.producer.flush()
    
    def subscribe(self, topic, group_id, handler):
        """Subscribe to events (blocks, processing messages as they arrive)."""
        consumer = KafkaConsumer(
            topic,
            bootstrap_servers=self.bootstrap_servers,
            group_id=group_id,
            value_deserializer=lambda v: json.loads(v.decode()),
            auto_offset_reset='latest'
        )
        
        for message in consumer:
            try:
                handler(message.value)
            except Exception as e:
                # Handle error - retry or dead letter queue
                print(f"Error processing: {e}")

# Usage
events = EventBus(['kafka:9092'])

# Publish events
events.publish('user.created', {
    'type': 'user.created',
    'user_id': '123',
    'email': '[email protected]'
})

# Subscribe
def handle_user_created(event):
    send_welcome_email(event['email'])

events.subscribe('user.created', 'email-service', handle_user_created)

Microservices Architecture

#!/usr/bin/env python3
"""Microservices communication."""

import requests
from typing import Dict, Optional

class ServiceError(Exception):
    """Raised when a downstream service call fails."""

class ServiceTimeoutError(ServiceError):
    """Raised when a downstream service call times out."""

class MicroserviceClient:
    """Client for microservices communication."""
    
    def __init__(self, services: Dict[str, str]):
        """
        services: Dict of service_name -> base_url
        Example: {'users': 'http://users-service:8080', 'orders': 'http://orders-service:8080'}
        """
        self.services = services
        self.timeout = 5  # seconds
    
    def call_service(self, service_name: str, endpoint: str, 
                    method: str = 'GET', data: Optional[Dict] = None) -> Dict:
        """Call another microservice."""
        
        base_url = self.services.get(service_name)
        if not base_url:
            raise ValueError(f"Unknown service: {service_name}")
        
        url = f"{base_url}{endpoint}"
        
        try:
            if method == 'GET':
                response = requests.get(url, timeout=self.timeout)
            elif method == 'POST':
                response = requests.post(url, json=data, timeout=self.timeout)
            elif method == 'PUT':
                response = requests.put(url, json=data, timeout=self.timeout)
            elif method == 'DELETE':
                response = requests.delete(url, timeout=self.timeout)
            else:
                raise ValueError(f"Unsupported method: {method}")
            
            response.raise_for_status()
            return response.json()
        
        except requests.exceptions.Timeout:
            raise ServiceTimeoutError(f"Service {service_name} timed out")
        except requests.exceptions.RequestException as e:
            raise ServiceError(f"Error calling {service_name}: {str(e)}")

# Usage
services = {
    'users': 'http://users-service:8080',
    'orders': 'http://orders-service:8080',
    'payments': 'http://payments-service:8080'
}

client = MicroserviceClient(services)

# Get user from users service
user = client.call_service('users', '/api/v1/users/123')

# Create order in orders service
order = client.call_service('orders', '/api/v1/orders', method='POST', data={
    'user_id': 123,
    'items': [{'product_id': 456, 'quantity': 2}],
    'total': 199.99
})

Service Discovery

#!/usr/bin/env python3
"""Service discovery using Consul."""

import requests
import json

class ServiceDiscovery:
    """Service discovery using Consul."""
    
    def __init__(self, consul_url='http://localhost:8500'):
        self.consul_url = consul_url
    
    def register_service(self, service_name, service_host, service_port, 
                        tags=None, check_interval='10s'):
        """Register service with Consul."""
        
        service_id = f"{service_name}-{service_host}-{service_port}"
        
        payload = {
            'ID': service_id,
            'Name': service_name,
            'Address': service_host,
            'Port': service_port,
            'Tags': tags or [],
            'Check': {
                'HTTP': f"http://{service_host}:{service_port}/health",
                'Interval': check_interval
            }
        }
        
        response = requests.put(
            f"{self.consul_url}/v1/agent/service/register",
            json=payload
        )
        
        return response.status_code == 200
    
    def deregister_service(self, service_id):
        """Deregister service from Consul."""
        
        response = requests.put(
            f"{self.consul_url}/v1/agent/service/deregister/{service_id}"
        )
        
        return response.status_code == 200
    
    def get_service(self, service_name):
        """Get service instances."""
        
        response = requests.get(
            f"{self.consul_url}/v1/health/service/{service_name}"
        )
        
        if response.status_code == 200:
            return response.json()
        
        return []
    
    def get_all_services(self):
        """Get all registered services."""
        
        response = requests.get(
            f"{self.consul_url}/v1/agent/services"
        )
        
        if response.status_code == 200:
            return response.json()
        
        return {}

Circuit Breaker Pattern

#!/usr/bin/env python3
"""Circuit breaker implementation."""

import time
from enum import Enum
from threading import Lock

class CircuitState(Enum):
    CLOSED = 'closed'        # Normal operation
    OPEN = 'open'            # Circuit is open, requests fail fast
    HALF_OPEN = 'half_open'  # Testing if service recovered

class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""

class CircuitBreaker:
    """Circuit breaker for service calls."""
    
    def __init__(self, failure_threshold=5, recovery_timeout=30,
                 half_open_max_calls=3):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_max_calls = half_open_max_calls
        
        self._state = CircuitState.CLOSED
        self._failure_count = 0
        self._last_failure_time = None
        self._half_open_calls = 0
        self._lock = Lock()
    
    @property
    def state(self):
        """Get current circuit state."""
        with self._lock:
            if self._state == CircuitState.OPEN:
                # Check if recovery timeout has passed
                if self._last_failure_time and \
                   time.time() - self._last_failure_time > self.recovery_timeout:
                    self._state = CircuitState.HALF_OPEN
                    self._half_open_calls = 0
            
            return self._state
    
    def call(self, func, *args, **kwargs):
        """Call function with circuit breaker protection."""
        
        if self.state == CircuitState.OPEN:
            raise CircuitOpenError("Circuit is open")
        
        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        
        except Exception as e:
            self._on_failure()
            raise
    
    def _on_success(self):
        """Handle successful call."""
        with self._lock:
            if self._state == CircuitState.HALF_OPEN:
                self._half_open_calls += 1
                # Enough successful probes: the service has recovered
                if self._half_open_calls >= self.half_open_max_calls:
                    self._state = CircuitState.CLOSED
                    self._failure_count = 0
                    self._half_open_calls = 0
            else:
                self._failure_count = 0
    
    def _on_failure(self):
        """Handle failed call."""
        with self._lock:
            self._failure_count += 1
            self._last_failure_time = time.time()
            
            # Any failure while probing, or too many failures overall,
            # (re)opens the circuit
            if self._state == CircuitState.HALF_OPEN or \
               self._failure_count >= self.failure_threshold:
                self._state = CircuitState.OPEN
                self._half_open_calls = 0

# Usage
circuit = CircuitBreaker(failure_threshold=3, recovery_timeout=60)

def call_external_service():
    # Your service call logic
    pass

try:
    result = circuit.call(call_external_service)
except CircuitOpenError:
    # Return cached data or fallback
    pass

Load Balancing

#!/usr/bin/env python3
"""Load balancing strategies."""

import random
from collections import defaultdict

class LoadBalancer:
    """Load balancer with multiple strategies."""
    
    def __init__(self, servers):
        self.servers = servers
        self.request_count = defaultdict(int)
    
    def select_server_round_robin(self):
        """Select server using round-robin."""
        server = self.servers[self.request_count['round_robin'] % len(self.servers)]
        self.request_count['round_robin'] += 1
        return server
    
    def select_server_random(self):
        """Select server randomly."""
        return random.choice(self.servers)
    
    def select_server_least_connections(self):
        """Select server with least active connections."""
        return min(self.servers, key=lambda s: s.get('active_connections', 0))
    
    def select_server_weighted(self):
        """Select server based on weights."""
        total_weight = sum(s.get('weight', 1) for s in self.servers)
        random_weight = random.uniform(0, total_weight)
        
        cumulative = 0
        for server in self.servers:
            cumulative += server.get('weight', 1)
            if random_weight <= cumulative:
                return server
        
        return self.servers[-1]

# Usage
servers = [
    {'host': 'server1.example.com', 'port': 8080, 'weight': 2},
    {'host': 'server2.example.com', 'port': 8080, 'weight': 1},
    {'host': 'server3.example.com', 'port': 8080, 'weight': 1}
]

lb = LoadBalancer(servers)

# Use different strategies
server = lb.select_server_round_robin()
server = lb.select_server_random()
server = lb.select_server_least_connections()
server = lb.select_server_weighted()
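
When horizontal scaling introduces server-side session state, requests from the same client need to land on the same server. A minimal sketch of hash-based session affinity (standalone helper; hostnames are illustrative and not part of the LoadBalancer above):

```python
import hashlib

def select_server_by_client(servers, client_ip):
    """Hash the client IP so a given client always maps to the same server."""
    digest = hashlib.md5(client_ip.encode()).hexdigest()
    return servers[int(digest, 16) % len(servers)]

# Usage: the same IP always resolves to the same host
servers = ['server1.example.com', 'server2.example.com', 'server3.example.com']
server = select_server_by_client(servers, '203.0.113.7')
```

Note that simple modulo hashing reshuffles most clients when the server list changes; consistent hashing avoids that at the cost of extra complexity.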

Stage 4: Hyperscale (100K-1M+)

Multi-Region Architecture

# Terraform multi-region
provider "aws" {
  alias  = "primary"
  region = "us-east-1"
}

provider "aws" {
  alias  = "secondary"
  region = "us-west-2"
}

# Primary region resources
resource "aws_db_instance" "primary" {
  provider = aws.primary
  identifier = "app-primary"
  
  multi_az = true
  engine = "postgres"
  instance_class = "db.r6g.xlarge"
  
  backup_retention_period = 7
}

# Cross-region read replica in the secondary region
resource "aws_db_instance" "replica" {
  provider = aws.secondary
  identifier = "app-replica"
  
  # Cross-region replicas reference the source by ARN, not identifier
  replicate_source_db = aws_db_instance.primary.arn
  instance_class = "db.r6g.xlarge"
}

# Global Accelerator for anycast IPs and fast cross-region failover
resource "aws_globalaccelerator_accelerator" "main" {
  name            = "app-accelerator"
  enabled         = true
}

Performance Optimization

#!/usr/bin/env python3
"""Performance monitoring and auto-scaling."""

import boto3
import time

class AutoScaler:
    """Auto-scale based on metrics."""
    
    def __init__(self, config):
        self.asg = boto3.client('autoscaling')
        self.cloudwatch = boto3.client('cloudwatch')
        self.config = config
    
    def check_and_scale(self):
        """Check metrics and scale if needed."""
        metrics = self.get_metrics()
        
        if metrics['cpu_utilization'] > self.config['scale_up_threshold']:
            self.scale_up()
        elif metrics['cpu_utilization'] < self.config['scale_down_threshold']:
            self.scale_down()
    
    def scale_up(self):
        """Scale out."""
        self.asg.execute_policy(
            AutoScalingGroupName=self.config['asg_name'],
            PolicyName='scale-out',
            HonorCooldown=True
        )
    
    def scale_down(self):
        """Scale in via the matching scale-in policy."""
        self.asg.execute_policy(
            AutoScalingGroupName=self.config['asg_name'],
            PolicyName='scale-in',
            HonorCooldown=True
        )
    
    def get_metrics(self):
        """Get current metrics."""
        response = self.cloudwatch.get_metric_statistics(
            Namespace='AWS/EC2',
            MetricName='CPUUtilization',
            Dimensions=[
                {'Name': 'AutoScalingGroupName', 'Value': self.config['asg_name']}
            ],
            StartTime=time.time() - 300,
            EndTime=time.time(),
            Period=60,
            Statistics=['Average']
        )
        
        return {
            'cpu_utilization': response['Datapoints'][0]['Average'] if response['Datapoints'] else 0
        }

Edge Computing

// Edge computing with Cloudflare Workers (Service Worker syntax).

const cache = caches.default;

addEventListener('fetch', (event) => {
    event.respondWith(handleRequest(event));
});

// Edge function for caching and routing
async function handleRequest(event) {
    const request = event.request;
    const url = new URL(request.url);
    
    // Check cache first
    let response = await cache.match(request);
    
    if (!response) {
        // Cache miss - fetch from origin
        response = await fetch(request);
        
        // Cache successful responses
        if (response.ok) {
            response = new Response(response.body, response);
            response.headers.set('Cache-Control', 's-maxage=3600');
            event.waitUntil(cache.put(request, response.clone()));
        }
    }
    
    return response;
}

// Add to every response
async function addSecurityHeaders(request, response) {
    const headers = {
        'Strict-Transport-Security': 'max-age=31536000; includeSubDomains',
        'X-Content-Type-Options': 'nosniff',
        'X-Frame-Options': 'DENY',
        'Content-Security-Policy': "default-src 'self'",
        'X-XSS-Protection': '1; mode=block'
    };
    
    const newResponse = new Response(response.body, response);
    for (const [key, value] of Object.entries(headers)) {
        newResponse.headers.set(key, value);
    }
    
    return newResponse;
}

Database Optimization for Hyperscale

#!/usr/bin/env python3
"""Advanced database optimization."""

import time

import psycopg2
from psycopg2.extras import RealDictCursor

class HyperscaleDatabase:
    """Database optimized for hyperscale."""
    
    def __init__(self, connection_string):
        self.conn = psycopg2.connect(connection_string)
        self.conn.autocommit = True
    
    def execute_with_retry(self, query, params=None, max_retries=3):
        """Execute query with retry logic."""
        
        for attempt in range(max_retries):
            try:
                cursor = self.conn.cursor(cursor_factory=RealDictCursor)
                cursor.execute(query, params)
                
                if query.strip().upper().startswith('SELECT'):
                    result = cursor.fetchall()
                else:
                    self.conn.commit()
                    result = cursor.rowcount
                
                cursor.close()
                return result
            
            except psycopg2.OperationalError as e:
                if attempt == max_retries - 1:
                    raise
                # Wait and retry
                time.sleep(2 ** attempt)
    
    def batch_insert(self, table, data, batch_size=1000):
        """Insert data in batches."""
        
        if not data:
            return
        
        columns = list(data[0].keys())
        placeholders = ', '.join(['%s'] * len(columns))
        column_names = ', '.join(columns)
        
        for i in range(0, len(data), batch_size):
            batch = data[i:i + batch_size]
            values = [tuple(row[col] for col in columns) for row in batch]
            
            query = f"INSERT INTO {table} ({column_names}) VALUES ({placeholders})"
            
            cursor = self.conn.cursor()
            cursor.executemany(query, values)
            self.conn.commit()
            cursor.close()
    
    def get_connection_pool(self):
        """Get connection pool for high concurrency."""
        
        from psycopg2 import pool
        
        return pool.ThreadedConnectionPool(
            minconn=10,
            maxconn=100,
            dsn=self.conn.dsn
        )
    
    def enable_connection_caching(self):
        """Enable connection caching."""
        
        # Use pgBouncer for connection pooling
        # Configure pgBouncer to pool connections
        pass
    
    def implement_read_replica_routing(self):
        """Route read queries to replicas."""
        
        # Use pgBouncer with different pools for read/write
        # Or use SQLAlchemy with read/write splitting
        pass
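
A minimal sketch of the read/write split the stubs above describe, assuming one primary DSN and a list of replica DSNs (the actual connections would come from psycopg2 or a pooler such as pgBouncer; DSNs are illustrative):

```python
import itertools

class ReadWriteRouter:
    """Route SELECT statements to read replicas, everything else to primary."""

    def __init__(self, primary_dsn, replica_dsns):
        self.primary_dsn = primary_dsn
        # Round-robin over replicas to spread read load evenly
        self._replicas = itertools.cycle(replica_dsns)

    def dsn_for(self, query):
        """Pick a DSN based on the statement type."""
        if query.lstrip().upper().startswith('SELECT'):
            return next(self._replicas)
        return self.primary_dsn

# Usage
router = ReadWriteRouter(
    'postgresql://primary.internal/app',
    ['postgresql://replica1.internal/app', 'postgresql://replica2.internal/app']
)
read_dsn = router.dsn_for('SELECT * FROM users')   # goes to a replica
write_dsn = router.dsn_for('UPDATE users SET ...') # goes to the primary
```

In practice, routing on statement prefixes is fragile (CTEs that write, replication lag for read-your-writes); SQLAlchemy's engine-per-role pattern or separate pgBouncer pools per role are the production-grade versions of this idea.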

CDN Optimization

#!/usr/bin/env python3
"""CDN optimization strategies."""

import os
import time

import boto3

class CDNManager:
    """Manage CDN for static assets."""
    
    def __init__(self, cloudfront_distribution_id):
        self.cloudfront = boto3.client('cloudfront')
        self.distribution_id = cloudfront_distribution_id
    
    def invalidate_cache(self, paths):
        """Invalidate CDN cache."""
        
        response = self.cloudfront.create_invalidation(
            DistributionId=self.distribution_id,
            InvalidationBatch={
                'Paths': {
                    'Quantity': len(paths),
                    'Items': paths
                },
                'CallerReference': str(time.time())
            }
        )
        
        return response['Invalidation']['Id']
    
    def optimize_static_assets(self, directory):
        """Optimize static assets for CDN."""
        
        # Minify CSS/JS
        # Compress images
        # Set proper cache headers
        
        for root, dirs, files in os.walk(directory):
            for file in files:
                if file.endswith(('.css', '.js')):
                    self.minify_file(os.path.join(root, file))
                elif file.endswith(('.png', '.jpg', '.jpeg', '.gif')):
                    self.compress_image(os.path.join(root, file))
    
    def minify_file(self, filepath):
        """Minify CSS/JS file."""
        
        with open(filepath, 'r') as f:
            content = f.read()
        
        # Remove whitespace, comments, etc. (use a real minifier in production)
        minified = self._minify(content)
        
        with open(filepath, 'w') as f:
            f.write(minified)
    
    def _minify(self, content):
        """Naive minification: drop blank lines and leading indentation."""
        return '\n'.join(
            line.strip() for line in content.splitlines() if line.strip()
        )
    
    def compress_image(self, filepath):
        """Compress image file."""
        
        # Use PIL or similar to compress
        pass
    
    def set_cache_headers(self, filepath, max_age=31536000):
        """Set cache headers for file."""
        
        # Configure S3 bucket with proper cache headers
        # Or configure CDN to cache for specified duration
        pass
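
The `set_cache_headers` stub above boils down to choosing the right `Cache-Control` value per asset type. A minimal sketch (the extension list and max-age values are illustrative, not part of the class):

```python
def cache_control_for(filepath, max_age=31536000):
    """Long-lived caching for static assets, revalidation for everything else."""
    static_extensions = ('.css', '.js', '.png', '.jpg', '.jpeg', '.gif', '.woff2')
    if filepath.endswith(static_extensions):
        # Safe to cache for a year when filenames contain a content hash
        return f'public, max-age={max_age}, immutable'
    # HTML and API responses should always be revalidated at the edge
    return 'no-cache'

# Usage
cache_control_for('app.3f2a1c.js')  # 'public, max-age=31536000, immutable'
cache_control_for('index.html')     # 'no-cache'
```

The `immutable` directive only makes sense alongside fingerprinted filenames; without a content hash in the name, a shorter max-age is the safer choice.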

Monitoring and Observability

#!/usr/bin/env python3
"""Advanced monitoring and observability."""

import time
import uuid
from datetime import datetime
from collections import defaultdict

class ObservabilitySystem:
    """Comprehensive observability system."""
    
    def __init__(self):
        self.metrics = defaultdict(list)
        self.traces = []
        self.logs = []
    
    def record_metric(self, name, value, tags=None):
        """Record a metric."""
        
        metric = {
            'name': name,
            'value': value,
            'timestamp': datetime.utcnow().isoformat(),
            'tags': tags or {}
        }
        
        self.metrics[name].append(metric)
    
    def start_trace(self, name, context=None):
        """Start a new trace."""
        
        return {
            'trace_id': str(uuid.uuid4()),
            'name': name,
            'context': context or {},
            'start_time': time.time(),
            'spans': []
        }
    
    def add_span(self, trace, name, duration_ms, tags=None):
        """Add a span to trace."""
        
        span = {
            'name': name,
            'duration_ms': duration_ms,
            'start_time': time.time() - (duration_ms / 1000),
            'tags': tags or {}
        }
        
        trace['spans'].append(span)
        
        return span
    
    def end_trace(self, trace, error=None):
        """End a trace."""
        
        trace['end_time'] = time.time()
        trace['total_duration_ms'] = (trace['end_time'] - trace['start_time']) * 1000
        trace['error'] = error
        
        self.traces.append(trace)
    
    def log(self, level, message, context=None):
        """Log a message."""
        
        log_entry = {
            'timestamp': datetime.utcnow().isoformat(),
            'level': level,
            'message': message,
            'context': context or {}
        }
        
        self.logs.append(log_entry)
    
    def get_alerts(self):
        """Get current alerts."""
        
        alerts = []
        
        # Check for high error rates
        if self._error_rate() > 0.05:
            alerts.append({
                'type': 'high_error_rate',
                'severity': 'critical',
                'message': f"Error rate is {self._error_rate() * 100:.1f}%"
            })
        
        # Check for slow response times
        if self._avg_response_time() > 1000:
            alerts.append({
                'type': 'slow_response',
                'severity': 'warning',
                'message': f"Average response time is {self._avg_response_time()}ms"
            })
        
        return alerts
    
    def _error_rate(self):
        """Calculate error rate."""
        
        total_requests = len(self.metrics.get('requests', []))
        error_requests = len([
            m for m in self.metrics.get('requests', [])
            if m.get('tags', {}).get('status', '').startswith('5')
        ])
        
        return error_requests / total_requests if total_requests > 0 else 0
    
    def _avg_response_time(self):
        """Calculate average response time."""
        
        response_times = [
            m['value'] for m in self.metrics.get('response_time', [])
        ]
        
        return sum(response_times) / len(response_times) if response_times else 0

Auto-Scaling Configuration

# Kubernetes Horizontal Pod Autoscaler
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: app-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: app
  minReplicas: 2
  maxReplicas: 50
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
    - type: Pods
      pods:
        metric:
          name: http_requests_per_second
        target:
          type: AverageValue
          averageValue: "100"
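
Under the hood, the HPA scales with a simple proportional rule, desired = ceil(current * currentMetric / targetMetric), clamped to the min/max bounds in the spec. A quick sketch of the arithmetic (values illustrative):

```python
import math

def desired_replicas(current_replicas, current_metric, target_metric,
                     min_replicas=2, max_replicas=50):
    """Kubernetes HPA scaling rule: scale proportionally to metric pressure."""
    desired = math.ceil(current_replicas * current_metric / target_metric)
    return max(min_replicas, min(max_replicas, desired))

# Usage: 4 pods running at 90% CPU against a 70% target
desired_replicas(4, 90, 70)  # 6
```

When several metrics are configured, as in the manifest above, the HPA evaluates each one and takes the largest proposed replica count.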

Conclusion

Scaling a SaaS from 100 to 1 million users is a journey of deliberate architectural decisions. Each stage requires different strategies, tools, and trade-offs. The key is to start simple, monitor your metrics, and scale when you hit bottlenecksโ€”not before.

Key takeaways:

  1. Start simple: Monolithic architecture works great for 100-1K users
  2. Monitor everything: You can’t scale what you don’t measure
  3. Scale horizontally: Add servers, not bigger servers
  4. Cache aggressively: Reduce database load with smart caching
  5. Decouple with events: Use event-driven architecture for scalability
  6. Plan for failure: Design for redundancy and failover
  7. Optimize databases: Indexes, read replicas, and sharding are essential
  8. Use CDNs: Offload static assets to edge networks

The most successful SaaS companies don’t try to predict their scaling needsโ€”they build systems that can adapt as they grow. Start with what works, measure your performance, and scale strategically.
