⚡ Calmops

Database Migration Strategies: Complete Guide for 2026

Introduction

Database migrations are among the most challenging operations in software engineering. Whether you’re migrating from one database system to another, upgrading to a new version, or refactoring your schema, the complexity and risk are substantial. A poorly executed migration can result in data loss, service outages, and significant business impact.

This guide covers database migrations end to end, from planning and strategy selection to implementation and validation. You’ll learn about different migration approaches, practical tools, and battle-tested patterns that minimize risk.

Modern applications often need to migrate databases for various reasons: scaling requirements, cost optimization, feature access, or legacy system modernization. Understanding the trade-offs between different approaches helps you choose the right strategy for your specific situation.

Migration Strategy Overview

┌───────────────────────────────────────────────┐
│         Database Migration Strategies         │
├───────────────────────────────────────────────┤
│                                               │
│              Migration Approaches             │
│                                               │
│   ┌───────────────┐      ┌───────────────┐    │
│   │   Big Bang    │      │  Incremental  │    │
│   │   Migration   │      │  (Strangler)  │    │
│   ├───────────────┤      ├───────────────┤    │
│   │ • All at once │      │ • Piece by    │    │
│   │ • Short       │      │   piece       │    │
│   │   downtime    │      │ • Longer      │    │
│   │ • High risk   │      │   timeline    │    │
│   │ • Small DBs   │      │ • Lower risk  │    │
│   │               │      │ • Large DBs   │    │
│   └───────────────┘      └───────────────┘    │
│                                               │
│               Hybrid Approaches               │
│   • Blue-Green Deployment                     │
│   • Canary Release                            │
│   • Shadow Database                           │
│                                               │
└───────────────────────────────────────────────┘

Strategy Comparison

Strategy     Downtime  Risk      Complexity  Best For
Big Bang     Hours     High      Low         Small databases
Incremental  Minutes   Medium    Medium      Large databases
Blue-Green   Minutes   Low       High        Mission-critical systems
Canary       None      Very low  Very high   Large scale
Shadow       None      Very low  Very high   Validation

Zero-Downtime Migration Patterns

The Strangler Fig Pattern

The Strangler Fig pattern gradually replaces a system by running both old and new systems in parallel:

┌───────────────────────────────────────────────┐
│            Strangler Fig Migration            │
├───────────────────────────────────────────────┤
│                                               │
│          ┌─────────────────────────┐          │
│          │    Migration Router     │          │
│          │ (routes traffic between │          │
│          │  old and new systems)   │          │
│          └────────────┬────────────┘          │
│           ┌───────────┴──────────┐            │
│           ▼                      ▼            │
│   ┌───────────────┐      ┌───────────────┐    │
│   │ Legacy System │      │ New Database  │    │
│   │ • Old schema  │      │ • New schema  │    │
│   │ • Full        │      │ • Migrated    │    │
│   │   features    │      │   data        │    │
│   └───────┬───────┘      └───────┬───────┘    │
│           └───────────┬──────────┘            │
│                       ▼                       │
│          ┌─────────────────────────┐          │
│          │    Data Synchronizer    │          │
│          │ (keeps new DB in sync)  │          │
│          └─────────────────────────┘          │
│                                               │
│   Phase 1: Write to both, read from legacy    │
│   Phase 2: Read from both, validate results   │
│   Phase 3: Read and write to new database     │
│   Phase 4: Decommission legacy system         │
│                                               │
└───────────────────────────────────────────────┘

Implementation

from dataclasses import dataclass
from typing import Callable, Dict, List, Optional
from enum import Enum
import threading
import time


class MigrationPhase(Enum):
    """Migration phases in strangler pattern."""
    INITIAL = "initial"
    DUAL_WRITE = "dual_write"
    DUAL_READ = "dual_read"
    SWITCHOVER = "switchover"
    COMPLETE = "complete"


@dataclass
class MigrationConfig:
    """Configuration for database migration."""
    source_db: str
    target_db: str
    batch_size: int = 1000
    validate_percentage: float = 1.0
    error_threshold: float = 0.01
    retry_attempts: int = 3


class ZeroDowntimeMigrator:
    """Implements zero-downtime database migration."""
    
    def __init__(self, config: MigrationConfig):
        self.config = config
        self.phase = MigrationPhase.INITIAL
        self.errors = []
        self.stats = {
            'records_migrated': 0,
            'records_validated': 0,
            'records_failed': 0
        }
        self._lock = threading.Lock()
    
    def begin_dual_write(self):
        """Start dual write phase."""
        self.phase = MigrationPhase.DUAL_WRITE
        print("Phase: Dual Write - Writing to both databases")
        
        # Enable dual write in application
        # All inserts/updates go to both databases
    
    def begin_dual_read(self):
        """Start dual read phase."""
        self.phase = MigrationPhase.DUAL_READ
        print("Phase: Dual Read - Reading from both databases")
        
        # Enable reads from both databases
        # Compare results and log discrepancies
    
    def begin_switchover(self):
        """Start switchover phase."""
        self.phase = MigrationPhase.SWITCHOVER
        print("Phase: Switchover - Switching to new database")
        
        # Route reads to new database
        # Keep writes to both for rollback capability
    
    def complete_migration(self):
        """Complete migration."""
        self.phase = MigrationPhase.COMPLETE
        print("Phase: Complete - Migration finished")
        
        # Remove dual write capability
    
    def migrate_data(self, table: str, transform: Optional[Callable] = None):
        """Migrate data from source to target."""
        
        offset = 0
        while True:
            # Read batch from source
            rows = self.read_batch(table, offset, self.config.batch_size)
            
            if not rows:
                break
            
            # Transform if needed
            if transform:
                rows = [transform(row) for row in rows]
            
            # Write to target
            try:
                self.write_batch(table, rows)
                self.stats['records_migrated'] += len(rows)
                print(f"Migrated {offset + len(rows)} records from {table}")
            except Exception as e:
                self.handle_error(e, rows)
            finally:
                # Always advance past this batch so a failing batch
                # cannot be re-read forever
                offset += len(rows)
            
            # Small delay to reduce load
            time.sleep(0.1)
    
    def validate_migration(self, table: str) -> Dict:
        """Validate migrated data."""
        
        source_count = self.count_records(table, 'source')
        target_count = self.count_records(table, 'target')
        
        validation = {
            'table': table,
            'source_count': source_count,
            'target_count': target_count,
            'match': source_count == target_count,
            'difference': abs(source_count - target_count)
        }
        
        if validation['match']:
            self.stats['records_validated'] += target_count
        else:
            self.errors.append(f"Count mismatch in {table}: {validation}")
        
        return validation
    
    def read_batch(self, table: str, offset: int, limit: int) -> List[Dict]:
        """Read a batch from the source database."""
        # Implementation depends on database
        raise NotImplementedError
    
    def write_batch(self, table: str, rows: List[Dict]):
        """Write a batch to the target database."""
        # Implementation depends on database
        raise NotImplementedError
    
    def count_records(self, table: str, db: str) -> int:
        """Count records in a table ('source' or 'target')."""
        # Implementation depends on database
        raise NotImplementedError
    
    def handle_error(self, error: Exception, rows: List[Dict]):
        """Handle migration error."""
        with self._lock:
            self.stats['records_failed'] += len(rows)
            self.errors.append({
                'error': str(error),
                'rows': len(rows),
                'timestamp': time.time()
            })
            
            attempted = self.stats['records_migrated'] + self.stats['records_failed']
            error_rate = self.stats['records_failed'] / attempted if attempted else 1.0
            
            if error_rate > self.config.error_threshold:
                raise RuntimeError(f"Error threshold exceeded: {error_rate:.2%}")
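The "enable dual write" step above is left to the application. One common way to wire it in is a thin repository wrapper that writes to both databases and routes reads by phase. This is a minimal sketch: the `DualWriteRepository` class and its backend `insert`/`get` methods are illustrative names, not from any library.

```python
class DualWriteRepository:
    """Writes go to both databases; reads follow the migration phase."""

    def __init__(self, legacy_db, new_db, read_from_new=False):
        self.legacy_db = legacy_db
        self.new_db = new_db
        self.read_from_new = read_from_new

    def write(self, table, row):
        # The legacy write stays authoritative; a failed new-DB write is
        # recorded for reconciliation instead of failing the request.
        self.legacy_db.insert(table, row)
        try:
            self.new_db.insert(table, row)
        except Exception as exc:
            self.record_discrepancy(table, row, exc)

    def read(self, table, key):
        db = self.new_db if self.read_from_new else self.legacy_db
        return db.get(table, key)

    def record_discrepancy(self, table, row, exc):
        # In practice: persist to a reconciliation queue, not stdout
        print(f"dual-write failure on {table}: {exc}")
```

Flipping `read_from_new` corresponds to the switchover phase; because writes still reach both databases, rollback is just flipping it back.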

Schema Migration

Using Alembic

# alembic/env.py
from alembic import context
from sqlalchemy import engine_from_config, pool
from logging.config import fileConfig

# Import your models so all tables register on Base.metadata
from myapp.models import Base
from myapp import models  # noqa: F401

# Alembic Config object
config = context.config

# Setup logging
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

target_metadata = Base.metadata


def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode."""
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )

    with context.begin_transaction():
        context.run_migrations()


def run_migrations_online() -> None:
    """Run migrations in 'online' mode."""
    connectable = engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    with connectable.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=target_metadata
        )

        with context.begin_transaction():
            context.run_migrations()


if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()
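With env.py in place, day-to-day work is driven by the Alembic CLI; the revision message below is just an example:

```shell
# Generate a new revision by diffing models against the database
alembic revision --autogenerate -m "add users table"

# Apply all pending migrations
alembic upgrade head

# Roll back the most recent migration
alembic downgrade -1

# Inspect the current revision and the history
alembic current
alembic history --verbose
```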

Migration Examples

# migrations/versions/001_add_users.py
"""Add users table

Revision ID: 001
Revises: 
Create Date: 2026-03-13 10:00:00

"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# revision identifiers
revision = '001'
down_revision = None
branch_labels = None
depends_on = None


def upgrade() -> None:
    # Create users table
    op.create_table(
        'users',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('email', sa.String(255), nullable=False),
        sa.Column('name', sa.String(100), nullable=True),
        sa.Column('password_hash', sa.String(255), nullable=False),
        sa.Column('is_active', sa.Boolean(), default=True),
        sa.Column('created_at', sa.DateTime(), server_default=sa.func.now()),
        sa.Column('updated_at', sa.DateTime(), server_default=sa.func.now(), onupdate=sa.func.now()),
        sa.PrimaryKeyConstraint('id')
    )
    
    # Add indexes
    op.create_index('ix_users_email', 'users', ['email'], unique=True)
    op.create_index('ix_users_created_at', 'users', ['created_at'])
    
    # Add constraints
    op.create_check_constraint(
        'ck_users_email_valid',
        'users',
        sa.text("email ~* '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}$'")
    )


def downgrade() -> None:
    op.drop_index('ix_users_created_at', table_name='users')
    op.drop_index('ix_users_email', table_name='users')
    op.drop_table('users')

Advanced Schema Changes

# Safe migration patterns

def upgrade():
    """Safe patterns for schema changes."""
    
    # 1. Adding columns (safe - always)
    op.add_column('users', sa.Column('phone', sa.String(20)))
    
    # 2. Adding NOT NULL column (need default)
    op.add_column('users', sa.Column('status', sa.String(20), server_default='active'))
    op.alter_column('users', 'status', nullable=False)
    
    # 3. Renaming tables (need care)
    op.rename_table('user', 'users')
    
    # 4. Adding foreign key (need index first)
    # First create index (if not exists)
    op.create_index('ix_orders_user_id', 'orders', ['user_id'])
    
    # Then add foreign key
    op.create_foreign_key(
        'fk_orders_user_id',
        'orders', 'users',
        ['user_id'], ['id']
    )
    
    # 5. Changing column type (expensive - use batch)
    # For PostgreSQL, use USING clause
    op.alter_column('users', 'email',
                    existing_type=sa.String(100),
                    type_=sa.String(255),
                    postgresql_using='email::character varying(255)')
    
    # 6. Renaming/dropping a column safely (back up first):
    #    add the new column, copy the data, then drop the old one
    op.add_column('users', sa.Column('new_field', sa.String(100)))
    op.execute("UPDATE users SET new_field = old_field")
    op.alter_column('users', 'new_field', nullable=False)
    op.drop_column('users', 'old_field')


def add_column_with_backfill():
    """Add column with data backfill."""
    
    # Step 1: Add nullable column
    op.add_column('orders', sa.Column('customer_name', sa.String(255)))
    
    # Step 2: Backfill data
    op.execute("""
        UPDATE orders 
        SET customer_name = customers.name
        FROM customers
        WHERE orders.customer_id = customers.id
    """)
    
    # Step 3: Add NOT NULL constraint
    op.alter_column('orders', 'customer_name', nullable=False)
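On large tables, the single UPDATE above can hold locks and generate a huge transaction; a common variant backfills in bounded key ranges, one transaction per range. A sketch of the range helper (the `batch_ranges` name is ours):

```python
def batch_ranges(min_id, max_id, batch_size):
    """Yield inclusive (start, end) id ranges covering [min_id, max_id]."""
    start = min_id
    while start <= max_id:
        end = min(start + batch_size - 1, max_id)
        yield start, end
        start = end + 1
```

Each (start, end) pair then parameterizes an `UPDATE ... WHERE id BETWEEN :start AND :end` run in its own transaction, keeping lock times short.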

Data Migration

ETL Pipeline

from typing import Dict
import logging


class DataMigration:
    """Data migration from source to target database."""
    
    def __init__(self, source_config: Dict, target_config: Dict):
        self.source_config = source_config
        self.target_config = target_config
        self.logger = logging.getLogger(__name__)
        self.stats = {'inserted': 0, 'updated': 0, 'skipped': 0, 'errors': 0}
    
    def migrate_table(
        self,
        table_name: str,
        transform: bool = True,
        batch_size: int = 1000
    ):
        """Migrate entire table."""
        
        self.logger.info(f"Starting migration for table: {table_name}")
        
        offset = 0
        while True:
            # Read batch
            rows = self.read_batch(table_name, offset, batch_size)
            
            if not rows:
                break
            
            # Transform if needed
            if transform:
                rows = [self.transform_row(row) for row in rows]
            
            # Write batch
            try:
                self.write_batch(table_name, rows)
                offset += len(rows)
                self.stats['inserted'] += len(rows)
                
                self.logger.info(f"Migrated {offset} rows from {table_name}")
                
            except Exception as e:
                self.logger.error(f"Error writing batch: {e}")
                self.stats['errors'] += len(rows)
            
            # Progress reporting
            if offset % 10000 == 0:
                self.log_progress(table_name, offset)
        
        return self.stats
    
    def read_batch(self, table: str, offset: int, limit: int) -> list:
        """Read a batch from the source database."""
        # Implementation depends on database
        raise NotImplementedError
    
    def write_batch(self, table: str, rows: list):
        """Write a batch to the target database."""
        # Implementation depends on database
        raise NotImplementedError
    
    def transform_row(self, row: Dict) -> Dict:
        """Transform row for target schema."""
        # Apply transformations
        transformed = {
            'id': row['id'],
            'email': row['email'].lower(),
            'name': row['name'].strip(),
            'created_at': row['created_date'],
            'updated_at': row.get('modified_date', row['created_date'])
        }
        
        return transformed
    
    def validate_migration(self, table_name: str) -> Dict:
        """Validate migrated data."""
        
        source_count = self.get_count(table_name, 'source')
        target_count = self.get_count(table_name, 'target')
        
        return {
            'table': table_name,
            'source_count': source_count,
            'target_count': target_count,
            'match': source_count == target_count
        }
    
    def get_count(self, table: str, db: str) -> int:
        """Count rows in a table ('source' or 'target')."""
        # Implementation depends on database
        raise NotImplementedError
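Long-running migrations should also survive restarts: persisting the last committed offset between batches lets `migrate_table` resume instead of starting over. A minimal sketch (the checkpoint file format and helper names are ours):

```python
import json
import os

def load_checkpoint(path: str) -> int:
    """Return the last saved offset, or 0 when starting fresh."""
    if os.path.exists(path):
        with open(path) as f:
            return json.load(f)['offset']
    return 0

def save_checkpoint(path: str, offset: int) -> None:
    """Persist the migration offset atomically via rename."""
    tmp = path + '.tmp'
    with open(tmp, 'w') as f:
        json.dump({'offset': offset}, f)
    os.replace(tmp, path)
```

The write-to-temp-then-rename step avoids a half-written checkpoint if the process dies mid-save.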

Parallel Migration

from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict
import threading


class ParallelMigrator:
    """Parallel data migration for large datasets."""
    
    def __init__(self, max_workers: int = 4):
        self.max_workers = max_workers
        self.lock = threading.Lock()
    
    def migrate_parallel(
        self,
        table: str,
        partition_column: str,
        partitions: int
    ):
        """Migrate data in parallel using partitions."""
        
        futures = []
        
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit partition jobs
            for partition_id in range(partitions):
                future = executor.submit(
                    self.migrate_partition,
                    table,
                    partition_column,
                    partition_id,
                    partitions
                )
                futures.append(future)
            
            # Wait for completion
            results = []
            for future in as_completed(futures):
                result = future.result()
                results.append(result)
        
        return results
    
    def migrate_partition(self, table: str, column: str, partition_id: int, total: int) -> Dict:
        """Migrate a single partition."""
        
        # Calculate partition range
        min_id = self.get_partition_min(column, partition_id, total)
        max_id = self.get_partition_max(column, partition_id, total)
        
        # Migrate rows in range
        offset = 0
        batch_size = 1000
        migrated = 0
        
        while True:
            rows = self.read_range(table, column, min_id, max_id, offset, batch_size)
            
            if not rows:
                break
            
            self.write_batch(table, rows)
            migrated += len(rows)
            offset += len(rows)  # advance by rows actually read, not batch_size
        
        return {
            'partition': partition_id,
            'migrated': migrated
        }
    
    # get_partition_min, get_partition_max, read_range and write_batch
    # are database-specific and omitted here.
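When the partition column is a dense integer key, the partition-bound helpers reduce to simple arithmetic over the table's MIN and MAX. A sketch (the `partition_bounds` name is ours):

```python
def partition_bounds(min_id: int, max_id: int,
                     partition_id: int, partitions: int) -> tuple:
    """Split the inclusive range [min_id, max_id] into `partitions`
    contiguous, roughly equal sub-ranges; return the one requested."""
    total = max_id - min_id + 1
    size = -(-total // partitions)  # ceiling division
    start = min_id + partition_id * size
    end = min(start + size - 1, max_id)
    return start, end
```

Note that ranges are equal in id span, not row count; heavily skewed keys may need histogram-based partitioning instead.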

Migration Best Practices

Practice              Implementation
Plan thoroughly       Document every step
Test in staging       Mirror the production setup
Use transactions      Ensure atomicity
Monitor progress      Track every metric
Have a rollback plan  Test rollback procedures
Communicate           Keep stakeholders informed
Do dry runs           Practice before production
Use checksums         Verify data integrity
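The checksum practice can go beyond comparing row counts: hash each row and combine the hashes so the result is independent of row order. A minimal sketch (`table_checksum` is our name; it assumes rows are JSON-serializable dicts):

```python
import hashlib
import json

def table_checksum(rows) -> int:
    """Order-independent checksum over a table's rows."""
    digest = 0
    for row in rows:
        canonical = json.dumps(row, sort_keys=True).encode()
        # XOR-combining per-row hashes makes row order irrelevant
        digest ^= int.from_bytes(hashlib.sha256(canonical).digest()[:8], 'big')
    return digest
```

Running this over matching key ranges on source and target catches silent corruption that equal row counts would miss; equal checksums strongly suggest, though do not prove, identical data.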

Tools and Resources

Category          Tools
Schema migration  Alembic, Flyway, Liquibase
Data migration    AWS DMS, Azure Database Migration Service, Google Database Migration Service
ETL               Apache Airflow, dbt, Singer
Validation        Great Expectations, dbt tests
Monitoring        Percona PMM, Datadog, New Relic

Conclusion

Database migrations are complex but manageable with proper planning and execution. The key is choosing the right strategy for your specific situation and following proven patterns.

Key takeaways:

  1. Choose the right strategy - Big Bang for small DBs, incremental for large
  2. Minimize downtime - Use blue-green or canary releases
  3. Validate continuously - Check data integrity at every step
  4. Plan for rollback - Always have a way to revert
  5. Test thoroughly - Practice in staging before production
  6. Monitor everything - Track progress and errors
  7. Communicate - Keep everyone informed

By following these patterns and practices, you’ll execute successful database migrations with minimal risk.
