Skip to main content
โšก Calmops

Secrets Management and Rotation: Vault, AWS Secrets Manager, Azure Key Vault

Secrets Management and Rotation: Vault, AWS Secrets Manager, Azure Key Vault

Secrets (passwords, API keys, certificates) need protection and rotation. This guide covers managing secrets across cloud platforms with automated rotation.


HashiCorp Vault Setup

Vault Initialization and Unsealing

import hvac
from typing import Dict, List

class VaultManager:
    """Manage secrets with HashiCorp Vault"""
    
    def __init__(self, vault_addr: str = "http://localhost:8200"):
        self.client = hvac.Client(url=vault_addr)
    
    def initialize_vault(self, shares: int = 5, threshold: int = 3) -> Dict:
        """Initialize Vault (one-time operation)"""
        
        result = self.client.sys.initialize(
            secret_shares=shares,
            secret_threshold=threshold
        )
        
        return {
            'keys': result['keys'],
            'root_token': result['root_token'],
            'keys_base64': result['keys_base64']
        }
    
    def unseal_vault(self, keys: List[str]):
        """Unseal Vault with key shares"""
        
        for key in keys:
            self.client.sys.submit_unseal_key(key)
    
    def enable_secret_engine(self, engine_type: str = "kv", path: str = "secret"):
        """Enable secret engine"""
        
        self.client.sys.enable_secrets_engine(
            backend_type=engine_type,
            path=path,
            options={'version': 2}
        )
    
    def store_secret(self, path: str, secret_data: Dict):
        """Store secret in Vault"""
        
        response = self.client.secrets.kv.v2.create_or_update_secret(
            path=path,
            secret=secret_data
        )
        
        return response
    
    def retrieve_secret(self, path: str) -> Dict:
        """Retrieve secret from Vault"""
        
        response = self.client.secrets.kv.v2.read_secret_version(path=path)
        
        return response['data']['data']
    
    def list_secrets(self, path: str = ""):
        """List all secrets at path"""
        
        response = self.client.secrets.kv.v2.list_secrets(path=path)
        
        return response['data']['keys']

# Usage
vault = VaultManager()

# Store database credentials
vault.store_secret('database/prod', {
    'username': 'admin',
    'password': 'super_secure_password',
    'host': 'db.example.com',
    'port': 5432
})

# Retrieve when needed
db_creds = vault.retrieve_secret('database/prod')

Dynamic Database Credentials

class VaultDynamicSecrets:
    """Generate dynamic secrets that self-destruct"""
    
    def __init__(self, vault_client):
        self.client = vault_client
    
    def enable_database_engine(self):
        """Enable database secret engine"""
        
        self.client.sys.enable_secrets_engine(
            backend_type='database',
            path='database'
        )
    
    def configure_postgres_connection(self):
        """Configure PostgreSQL connection"""
        
        self.client.write(
            path='database/config/postgres',
            connection_url='postgresql://{{username}}:{{password}}@postgres.example.com:5432/mydb',
            allowed_roles=['readonly', 'readwrite'],
            username='vault_admin',
            password='admin_password'
        )
    
    def create_database_role(self, role_name: str, ttl: str = '1h'):
        """Create database role with automatic expiry"""
        
        # Read-only role
        self.client.write(
            path=f'database/roles/{role_name}',
            db_name='postgres',
            creation_statements=[
                "CREATE ROLE \"{{name}}\" WITH LOGIN PASSWORD '{{password}}' VALID UNTIL '{{expiration}}'",
                "GRANT SELECT ON ALL TABLES IN SCHEMA public TO \"{{name}}\""
            ],
            default_ttl=ttl,
            max_ttl='24h'
        )
    
    def get_dynamic_credentials(self, role_name: str) -> Dict:
        """Get temporary database credentials"""
        
        response = self.client.read(f'database/creds/{role_name}')
        
        return {
            'username': response['data']['username'],
            'password': response['data']['password'],
            'ttl': response['lease_duration']  # Seconds
        }

# Usage
dynamic_secrets = VaultDynamicSecrets(vault.client)

# Create role
dynamic_secrets.create_database_role('app_readonly', ttl='1h')

# Get temporary credentials (valid for 1 hour)
creds = dynamic_secrets.get_dynamic_credentials('app_readonly')

# Use credentials...
# Vault automatically revokes after TTL

AWS Secrets Manager

Store and Rotate Secrets

import boto3
import json
from datetime import datetime

class AWSSecretsManager:
    """Manage secrets in AWS Secrets Manager"""
    
    def __init__(self):
        self.client = boto3.client('secretsmanager')
    
    def create_secret(self, name: str, secret_value: dict, tags: dict = None) -> str:
        """Create new secret"""
        
        response = self.client.create_secret(
            Name=name,
            Description='Application secret',
            SecretString=json.dumps(secret_value),
            Tags=[{'Key': k, 'Value': v} for k, v in (tags or {}).items()],
            AddReplicaRegions=[
                {'Region': 'us-west-2'},
                {'Region': 'eu-west-1'}
            ]
        )
        
        return response['ARN']
    
    def get_secret(self, secret_id: str) -> dict:
        """Retrieve secret value"""
        
        response = self.client.get_secret_value(SecretId=secret_id)
        
        return json.loads(response['SecretString'])
    
    def rotate_secret(self, secret_id: str, lambda_function_arn: str):
        """Enable automatic secret rotation"""
        
        response = self.client.rotate_secret(
            SecretId=secret_id,
            RotationLambdaARN=lambda_function_arn,
            RotationRules={
                'AutomaticallyAfterDays': 30,
                'Duration': '3h',
                'ScheduleExpression': 'rate(30 days)'
            }
        )
        
        return response
    
    def list_secrets(self) -> List[dict]:
        """List all secrets"""
        
        response = self.client.list_secrets()
        
        return response['SecretList']
    
    def delete_secret(self, secret_id: str, recovery_window: int = 7) -> str:
        """Schedule secret deletion"""
        
        response = self.client.delete_secret(
            SecretId=secret_id,
            RecoveryWindowInDays=recovery_window
        )
        
        return response['DeletionDate']

# Rotation Lambda function
def lambda_rotate_secret(event, context):
    """AWS Lambda for secret rotation"""
    
    service_client = boto3.client('secretsmanager')
    secret_id = event['SecretId']
    token = event['ClientRequestToken']
    step = event['Step']
    
    # Get current secret
    current_secret = service_client.get_secret_value(
        SecretId=secret_id,
        VersionId=token
    )
    
    secret_dict = json.loads(current_secret['SecretString'])
    
    if step == "create":
        # Generate new secret
        new_password = generate_secure_password()
        secret_dict['password'] = new_password
        
        # Store with new version
        service_client.put_secret_value(
            SecretId=secret_id,
            ClientRequestToken=token,
            SecretString=json.dumps(secret_dict),
            VersionStages=['AWSPENDING']
        )
    
    elif step == "set":
        # Update actual resource
        update_database_password(
            username=secret_dict['username'],
            password=secret_dict['password']
        )
    
    elif step == "test":
        # Test new credentials
        test_database_connection(
            username=secret_dict['username'],
            password=secret_dict['password']
        )
    
    elif step == "finish":
        # Mark version as current
        service_client.update_secret_version_stage(
            SecretId=secret_id,
            VersionStage='AWSCURRENT',
            MoveToVersionId=token,
            RemoveFromVersionId=current_secret['VersionId']
        )
    
    return {"statusCode": 200}

def generate_secure_password(length: int = 32) -> str:
    """Generate cryptographically secure password"""
    import secrets
    import string
    
    alphabet = string.ascii_letters + string.digits + "!@#$%^&*"
    return ''.join(secrets.choice(alphabet) for i in range(length))

Azure Key Vault

Secrets, Keys, and Certificates

from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient
from azure.keyvault.keys import KeyClient
from azure.keyvault.certificates import CertificateClient
from datetime import datetime, timedelta

class AzureKeyVaultManager:
    """Manage secrets in Azure Key Vault"""
    
    def __init__(self, vault_url: str):
        credential = DefaultAzureCredential()
        
        self.secret_client = SecretClient(vault_url=vault_url, credential=credential)
        self.key_client = KeyClient(vault_url=vault_url, credential=credential)
        self.cert_client = CertificateClient(vault_url=vault_url, credential=credential)
    
    # Secrets
    def set_secret(self, name: str, value: str, expiration_days: int = 365) -> None:
        """Store secret with expiration"""
        
        expires_on = datetime.utcnow() + timedelta(days=expiration_days)
        
        self.secret_client.set_secret(
            name=name,
            value=value,
            enabled=True,
            expires_on=expires_on,
            tags={
                'environment': 'production',
                'created_by': 'automation'
            }
        )
    
    def get_secret(self, name: str) -> str:
        """Retrieve secret"""
        
        secret = self.secret_client.get_secret(name)
        
        return secret.value
    
    def list_secrets(self):
        """List all secrets"""
        
        return list(self.secret_client.list_properties_of_secrets())
    
    # Keys (for encryption/signing)
    def create_key(self, key_name: str) -> None:
        """Create RSA key for encryption"""
        
        from azure.keyvault.keys import KeyType, KeyCurve
        
        self.key_client.create_rsa_key(
            name=key_name,
            size=4096,
            expires_on=datetime.utcnow() + timedelta(days=365),
            tags={'purpose': 'data_encryption'}
        )
    
    def encrypt_data(self, key_name: str, data: bytes) -> bytes:
        """Encrypt data with key"""
        
        from azure.keyvault.keys.crypto import EncryptionAlgorithm
        
        key = self.key_client.get_key(key_name)
        crypto_client = self.key_client.get_cryptography_client(key)
        
        result = crypto_client.encrypt(
            EncryptionAlgorithm.rsa_oaep,
            data
        )
        
        return result.ciphertext
    
    # Certificates
    def import_certificate(self, cert_name: str, cert_bytes: bytes, password: str):
        """Import certificate"""
        
        self.cert_client.import_certificate(
            name=cert_name,
            certificate_bytes=cert_bytes,
            password=password
        )
    
    def get_certificate(self, cert_name: str):
        """Retrieve certificate"""
        
        return self.cert_client.get_certificate(cert_name)

# Managed identity in Azure App Service
vault_url = "https://myvault.vault.azure.net/"
vault_manager = AzureKeyVaultManager(vault_url)

# Retrieve database password
db_password = vault_manager.get_secret("database-password")

# Use in app...

Encryption at Rest

Field-Level Encryption

from cryptography.fernet import Fernet
import base64
import hashlib

class FieldLevelEncryption:
    """Encrypt sensitive fields in database"""
    
    def __init__(self, master_key: str):
        # Derive key from master key
        hash_obj = hashlib.sha256(master_key.encode())
        key = base64.urlsafe_b64encode(hash_obj.digest())
        self.cipher_suite = Fernet(key)
    
    def encrypt_field(self, value: str) -> str:
        """Encrypt field value"""
        
        encrypted = self.cipher_suite.encrypt(value.encode())
        return encrypted.decode()
    
    def decrypt_field(self, encrypted_value: str) -> str:
        """Decrypt field value"""
        
        decrypted = self.cipher_suite.decrypt(encrypted_value.encode())
        return decrypted.decode()

class User:
    """Database model with encrypted fields"""
    
    def __init__(self, name: str, ssn: str, credit_card: str):
        self.name = name
        self.ssn = ssn  # Will be encrypted
        self.credit_card = credit_card  # Will be encrypted
        
        self.encryption = FieldLevelEncryption("master-secret-key")
    
    def save_to_db(self):
        """Save with encrypted fields"""
        
        encrypted_ssn = self.encryption.encrypt_field(self.ssn)
        encrypted_cc = self.encryption.encrypt_field(self.credit_card)
        
        # Save to database
        db.execute("""
            INSERT INTO users (name, ssn, credit_card)
            VALUES (?, ?, ?)
        """, (self.name, encrypted_ssn, encrypted_cc))
    
    def load_from_db(self, user_id: int):
        """Load from database and decrypt"""
        
        row = db.execute(
            "SELECT * FROM users WHERE id = ?", (user_id,)
        ).fetchone()
        
        self.name = row['name']
        self.ssn = self.encryption.decrypt_field(row['ssn'])
        self.credit_card = self.encryption.decrypt_field(row['credit_card'])

# Usage
user = User("John Doe", "123-45-6789", "1234-5678-9012-3456")
user.save_to_db()

# Retrieve and decrypt
loaded_user = User("", "", "")
loaded_user.load_from_db(1)
print(loaded_user.ssn)  # Decrypted

Audit and Monitoring

Secret Access Logging

import logging
from datetime import datetime
from typing import Dict

class SecretAuditLog:
    """Log all secret access"""
    
    def __init__(self, log_storage):
        self.log_storage = log_storage
        self.logger = logging.getLogger('secrets')
    
    def log_access(self, 
                   user_id: str,
                   secret_name: str,
                   action: str,
                   result: str,
                   context: Dict = None):
        """Log secret access"""
        
        log_entry = {
            'timestamp': datetime.utcnow().isoformat(),
            'user_id': user_id,
            'secret_name': secret_name,
            'action': action,  # GET, CREATE, DELETE, ROTATE
            'result': result,  # SUCCESS, FAILED, DENIED
            'ip_address': context.get('ip') if context else None,
            'user_agent': context.get('user_agent') if context else None
        }
        
        # Log to file
        self.logger.info(json.dumps(log_entry))
        
        # Store in audit storage
        self.log_storage.store(log_entry)
    
    def audit_report(self, days: int = 7) -> List[Dict]:
        """Generate audit report"""
        
        cutoff_date = datetime.utcnow() - timedelta(days=days)
        
        return self.log_storage.query_logs(
            timestamp_after=cutoff_date,
            order_by='timestamp DESC'
        )
    
    def detect_anomalies(self) -> List[Dict]:
        """Detect suspicious access patterns"""
        
        # Get recent logs
        recent_logs = self.log_storage.query_logs(
            timestamp_after=datetime.utcnow() - timedelta(hours=24)
        )
        
        anomalies = []
        
        # Check for failed access attempts
        failed_attempts = {}
        for log in recent_logs:
            if log['result'] == 'FAILED':
                key = (log['user_id'], log['secret_name'])
                failed_attempts[key] = failed_attempts.get(key, 0) + 1
        
        # Alert if >5 failed attempts
        for (user_id, secret_name), count in failed_attempts.items():
            if count > 5:
                anomalies.append({
                    'type': 'BRUTE_FORCE',
                    'user_id': user_id,
                    'secret_name': secret_name,
                    'attempts': count
                })
        
        # Check for access outside business hours
        for log in recent_logs:
            hour = datetime.fromisoformat(log['timestamp']).hour
            if hour not in range(9, 18) and log['result'] == 'SUCCESS':
                anomalies.append({
                    'type': 'UNUSUAL_HOUR',
                    'user_id': log['user_id'],
                    'timestamp': log['timestamp']
                })
        
        return anomalies

# Usage
audit = SecretAuditLog(log_storage)

# Log access
audit.log_access(
    user_id='user123',
    secret_name='database-password',
    action='GET',
    result='SUCCESS',
    context={'ip': '192.168.1.1', 'user_agent': 'curl/7.68.0'}
)

# Check for anomalies
anomalies = audit.detect_anomalies()
if anomalies:
    notify_security_team(anomalies)

Secret Scanning in CI/CD

# .github/workflows/secret-scanning.yml
name: Secret Scanning

on: [push, pull_request]

jobs:
  secret-scan:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
        with:
          fetch-depth: 0  # Full history for scanning
      
      - name: Scan for secrets with GitGuardian
        uses: GitGuardian/ggshield-action@master
        env:
          GITGUARDIAN_API_KEY: ${{ secrets.GITGUARDIAN_API_KEY }}
      
      - name: Scan for secrets with TruffleHog
        uses: trufflesecurity/trufflehog@main
        with:
          path: ./
          base: ${{ github.event.repository.default_branch }}
          head: HEAD
      
      - name: Scan with Gitleaks
        uses: gitleaks/gitleaks-action@v2

Glossary

  • Secret Rotation: Regularly changing credentials
  • TTL: Time To Live (expiration)
  • Dynamic Secrets: Automatically generated and expired
  • Master Key: Key that encrypts other keys
  • Audit Log: Record of all access

Resources

Comments