Secrets Management and Rotation: Vault, AWS Secrets Manager, Azure Key Vault
Secrets (passwords, API keys, certificates) need protection and rotation. This guide covers managing secrets across cloud platforms with automated rotation.
HashiCorp Vault Setup
Vault Initialization and Unsealing
import hvac
from typing import Dict, List
class VaultManager:
    """Thin wrapper around an ``hvac.Client`` for common Vault operations.

    Covers one-time initialization, unsealing, mounting a secrets engine,
    and storing / reading / listing secrets through the KV version 2 API.
    """

    def __init__(self, vault_addr: str = "http://localhost:8200"):
        """Create the underlying hvac client.

        Args:
            vault_addr: Base URL of the Vault server.
        """
        self.client = hvac.Client(url=vault_addr)

    def initialize_vault(self, shares: int = 5, threshold: int = 3) -> Dict:
        """Initialize Vault (one-time operation).

        Args:
            shares: Number of unseal key shares to generate.
            threshold: Number of shares required to unseal.

        Returns:
            Dict with the ``keys``, ``root_token`` and ``keys_base64``
            entries taken from the initialization response.
        """
        result = self.client.sys.initialize(
            secret_shares=shares,
            secret_threshold=threshold,
        )
        return {
            'keys': result['keys'],
            'root_token': result['root_token'],
            'keys_base64': result['keys_base64'],
        }

    def unseal_vault(self, keys: List[str]):
        """Submit unseal key shares until Vault reports it is unsealed.

        Args:
            keys: Unseal key shares, submitted in the given order.
        """
        for key in keys:
            status = self.client.sys.submit_unseal_key(key)
            # Stop as soon as the response says the seal is open so that no
            # more key shares than necessary are sent over the wire.
            # NOTE(review): assumes the response is a dict carrying a
            # boolean 'sealed' field; any other shape keeps the old
            # behaviour of submitting every key.
            if isinstance(status, dict) and status.get('sealed') is False:
                break

    def enable_secret_engine(self, engine_type: str = "kv", path: str = "secret"):
        """Mount a secrets engine.

        Args:
            engine_type: Backend type to mount (default ``"kv"``).
            path: Mount path for the engine.
        """
        # The 'version' option selects the KV engine version; it has no
        # meaning for other backends, so it is only sent for "kv".
        options = {'version': 2} if engine_type == "kv" else None
        self.client.sys.enable_secrets_engine(
            backend_type=engine_type,
            path=path,
            options=options,
        )

    def store_secret(self, path: str, secret_data: Dict):
        """Create or update a KV v2 secret.

        Args:
            path: Secret path inside the KV mount.
            secret_data: Key/value pairs to store.

        Returns:
            The response returned by the hvac call.
        """
        return self.client.secrets.kv.v2.create_or_update_secret(
            path=path,
            secret=secret_data,
        )

    def retrieve_secret(self, path: str) -> Dict:
        """Read a KV v2 secret and return its stored key/value pairs.

        Args:
            path: Secret path inside the KV mount.
        """
        response = self.client.secrets.kv.v2.read_secret_version(path=path)
        # KV v2 nests the payload under data -> data.
        return response['data']['data']

    def list_secrets(self, path: str = ""):
        """List the secret keys found directly under ``path``."""
        response = self.client.secrets.kv.v2.list_secrets(path=path)
        return response['data']['keys']
# Example: write a set of database credentials, then read them back
vault = VaultManager()
# Persist the production database credentials under database/prod
vault.store_secret(
    path='database/prod',
    secret_data=dict(
        username='admin',
        password='super_secure_password',
        host='db.example.com',
        port=5432,
    ),
)
# Fetch the same secret at the point of use
db_creds = vault.retrieve_secret(path='database/prod')
Dynamic Database Credentials
class VaultDynamicSecrets:
    """Issue short-lived database credentials through Vault's database engine."""

    def __init__(self, vault_client):
        """Store the client used for all Vault calls.

        Args:
            vault_client: Client exposing ``sys.enable_secrets_engine``,
                ``write`` and ``read`` (e.g. an ``hvac.Client``).
        """
        self.client = vault_client

    def enable_database_engine(self):
        """Mount the database secrets engine at ``database``."""
        self.client.sys.enable_secrets_engine(
            backend_type='database',
            path='database',
        )

    def configure_postgres_connection(
        self,
        connection_url: str = 'postgresql://{{username}}:{{password}}@postgres.example.com:5432/mydb',
        username: str = 'vault_admin',
        password: str = 'admin_password',
    ):
        """Register the PostgreSQL connection under ``database/config/postgres``.

        Args:
            connection_url: Connection string; ``{{username}}`` and
                ``{{password}}`` are template placeholders.
            username: Database account used for the connection.
            password: Password for ``username``. The default is the
                tutorial's placeholder value; pass a real credential
                instead of relying on it.
        """
        self.client.write(
            path='database/config/postgres',
            # The database engine needs to know which plugin drives this
            # connection; the config request is incomplete without it.
            plugin_name='postgresql-database-plugin',
            connection_url=connection_url,
            allowed_roles=['readonly', 'readwrite'],
            username=username,
            password=password,
        )

    def create_database_role(self, role_name: str, ttl: str = '1h', max_ttl: str = '24h'):
        """Create a read-only database role whose credentials expire.

        Args:
            role_name: Name of the role, written to ``database/roles/<role_name>``.
            ttl: Default lease duration for issued credentials.
            max_ttl: Upper bound on the lease duration.
        """
        # Read-only role: the generated login is only granted SELECT.
        self.client.write(
            path=f'database/roles/{role_name}',
            db_name='postgres',  # matches the name used in database/config/postgres
            creation_statements=[
                "CREATE ROLE \"{{name}}\" WITH LOGIN PASSWORD '{{password}}' VALID UNTIL '{{expiration}}'",
                "GRANT SELECT ON ALL TABLES IN SCHEMA public TO \"{{name}}\"",
            ],
            default_ttl=ttl,
            max_ttl=max_ttl,
        )

    def get_dynamic_credentials(self, role_name: str) -> Dict:
        """Request temporary credentials for ``role_name``.

        Returns:
            Dict with ``username``, ``password`` and ``ttl`` (the lease
            duration reported by the response, in seconds).
        """
        response = self.client.read(f'database/creds/{role_name}')
        return {
            'username': response['data']['username'],
            'password': response['data']['password'],
            'ttl': response['lease_duration'],  # Seconds
        }
# Example: issue short-lived read-only database credentials
dynamic_secrets = VaultDynamicSecrets(vault_client=vault.client)
# Define the role that credentials will be issued for
dynamic_secrets.create_database_role(role_name='app_readonly', ttl='1h')
# Request credentials for that role (the role's TTL is one hour)
creds = dynamic_secrets.get_dynamic_credentials(role_name='app_readonly')
# ... connect to the database with creds['username'] / creds['password'] ...
# Vault revokes the credentials on its own once the TTL has elapsed
AWS Secrets Manager
Store and Rotate Secrets
import boto3
import json
from datetime import datetime
class AWSSecretsManager:
    """Manage secrets in AWS Secrets Manager through a boto3 client."""

    def __init__(self):
        """Create the ``secretsmanager`` client with boto3's default configuration."""
        self.client = boto3.client('secretsmanager')

    def create_secret(self, name: str, secret_value: dict, tags: dict = None) -> str:
        """Create a new secret, replicated to us-west-2 and eu-west-1.

        Args:
            name: Secret name.
            secret_value: Mapping stored as a JSON string.
            tags: Optional tag key/value pairs.

        Returns:
            The ARN of the created secret.
        """
        response = self.client.create_secret(
            Name=name,
            Description='Application secret',
            SecretString=json.dumps(secret_value),
            Tags=[{'Key': k, 'Value': v} for k, v in (tags or {}).items()],
            AddReplicaRegions=[
                {'Region': 'us-west-2'},
                {'Region': 'eu-west-1'},
            ],
        )
        return response['ARN']

    def get_secret(self, secret_id: str) -> dict:
        """Fetch a secret and decode its JSON ``SecretString``."""
        response = self.client.get_secret_value(SecretId=secret_id)
        return json.loads(response['SecretString'])

    def rotate_secret(self, secret_id: str, lambda_function_arn: str):
        """Enable automatic rotation every 30 days with a 3-hour window.

        Args:
            secret_id: Name or ARN of the secret.
            lambda_function_arn: ARN of the rotation Lambda function.

        Returns:
            The raw ``rotate_secret`` response.
        """
        response = self.client.rotate_secret(
            SecretId=secret_id,
            RotationLambdaARN=lambda_function_arn,
            # The schedule may be given as AutomaticallyAfterDays or as
            # ScheduleExpression, but not both; ScheduleExpression is kept
            # because the rotation window (Duration) is defined relative
            # to it.
            RotationRules={
                'Duration': '3h',
                'ScheduleExpression': 'rate(30 days)',
            },
        )
        return response

    def list_secrets(self) -> List[dict]:
        """List all secrets, following pagination to the last page."""
        # A single list_secrets call returns only one page of results.
        paginator = self.client.get_paginator('list_secrets')
        found: List[dict] = []
        for page in paginator.paginate():
            found.extend(page['SecretList'])
        return found

    def delete_secret(self, secret_id: str, recovery_window: int = 7) -> datetime:
        """Schedule a secret for deletion.

        Args:
            secret_id: Name or ARN of the secret.
            recovery_window: Days during which the deletion can be undone.

        Returns:
            The ``DeletionDate`` from the response (boto3 returns it as a
            datetime, not a string).
        """
        response = self.client.delete_secret(
            SecretId=secret_id,
            RecoveryWindowInDays=recovery_window,
        )
        return response['DeletionDate']
# Rotation Lambda function
def lambda_rotate_secret(event, context):
    """AWS Lambda handler that performs one step of a secret rotation.

    The handler is invoked once per rotation step. ``event['Step']`` may be
    one of the Secrets Manager step names (``createSecret``, ``setSecret``,
    ``testSecret``, ``finishSecret``) or the short aliases ``create``,
    ``set``, ``test``, ``finish``.

    Args:
        event: Dict with ``SecretId``, ``ClientRequestToken`` and ``Step``.
        context: Lambda context object (unused).

    Returns:
        ``{"statusCode": 200}`` when the step completes.

    Raises:
        ValueError: If ``event['Step']`` is not a recognised step.
    """
    service_client = boto3.client('secretsmanager')
    secret_id = event['SecretId']
    token = event['ClientRequestToken']
    raw_step = event['Step']
    # Secrets Manager sends the long names; the short ones are kept as
    # aliases because they are what this handler originally compared against.
    step = {
        'createSecret': 'create',
        'setSecret': 'set',
        'testSecret': 'test',
        'finishSecret': 'finish',
    }.get(raw_step, raw_step)

    if step == "create":
        try:
            # If a pending version already exists for this token (e.g. the
            # step is being retried), reuse it instead of generating again.
            service_client.get_secret_value(
                SecretId=secret_id,
                VersionId=token,
                VersionStage='AWSPENDING'
            )
        except service_client.exceptions.ResourceNotFoundException:
            # Start from the live secret: the pending version does not
            # exist yet during this step.
            current_secret = service_client.get_secret_value(
                SecretId=secret_id,
                VersionStage='AWSCURRENT'
            )
            secret_dict = json.loads(current_secret['SecretString'])
            # Generate new secret
            secret_dict['password'] = generate_secure_password()
            # Store with new version
            service_client.put_secret_value(
                SecretId=secret_id,
                ClientRequestToken=token,
                SecretString=json.dumps(secret_dict),
                VersionStages=['AWSPENDING']
            )
    elif step in ("set", "test"):
        # Both steps operate on the pending (not yet live) credentials.
        pending_secret = service_client.get_secret_value(
            SecretId=secret_id,
            VersionId=token,
            VersionStage='AWSPENDING'
        )
        secret_dict = json.loads(pending_secret['SecretString'])
        if step == "set":
            # Update actual resource
            update_database_password(
                username=secret_dict['username'],
                password=secret_dict['password']
            )
        else:
            # Test new credentials
            test_database_connection(
                username=secret_dict['username'],
                password=secret_dict['password']
            )
    elif step == "finish":
        # Find which version currently carries AWSCURRENT so the label can
        # be moved off it and onto the version created in this rotation.
        metadata = service_client.describe_secret(SecretId=secret_id)
        current_version = None
        for version_id, stages in metadata['VersionIdsToStages'].items():
            if 'AWSCURRENT' in stages:
                current_version = version_id
                break
        if current_version != token:
            # Mark version as current
            stage_args = {
                'SecretId': secret_id,
                'VersionStage': 'AWSCURRENT',
                'MoveToVersionId': token,
            }
            if current_version is not None:
                stage_args['RemoveFromVersionId'] = current_version
            service_client.update_secret_version_stage(**stage_args)
    else:
        # Reporting success for an unknown step would hide a rotation that
        # never actually ran.
        raise ValueError(f"Invalid rotation step: {raw_step!r}")
    return {"statusCode": 200}
def generate_secure_password(length: int = 32) -> str:
    """Build a random password of ``length`` characters.

    Every character is drawn independently with ``secrets.choice`` from
    ASCII letters, digits and the symbols ``!@#$%^&*``.
    """
    import secrets
    import string

    symbols = "!@#$%^&*"
    pool = "".join((string.ascii_letters, string.digits, symbols))
    picked = []
    for _ in range(length):
        picked.append(secrets.choice(pool))
    return "".join(picked)
Azure Key Vault
Secrets, Keys, and Certificates
from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient
from azure.keyvault.keys import KeyClient
from azure.keyvault.certificates import CertificateClient
from datetime import datetime, timedelta
class AzureKeyVaultManager:
    """Manage secrets, keys and certificates in one Azure Key Vault."""

    def __init__(self, vault_url: str):
        """Create secret, key and certificate clients sharing one credential.

        Args:
            vault_url: URL of the vault, e.g. ``https://<name>.vault.azure.net/``.
        """
        credential = DefaultAzureCredential()
        self.secret_client = SecretClient(vault_url=vault_url, credential=credential)
        self.key_client = KeyClient(vault_url=vault_url, credential=credential)
        self.cert_client = CertificateClient(vault_url=vault_url, credential=credential)

    @staticmethod
    def _expiry_in(days: int) -> datetime:
        """Return a timezone-aware UTC datetime ``days`` from now."""
        # Local import: the surrounding file imports only datetime and
        # timedelta from this module.
        from datetime import timezone
        return datetime.now(timezone.utc) + timedelta(days=days)

    # Secrets
    def set_secret(self, name: str, value: str, expiration_days: int = 365) -> None:
        """Store an enabled secret that expires after ``expiration_days``.

        Args:
            name: Secret name.
            value: Secret value.
            expiration_days: Days from now until the secret expires.
        """
        self.secret_client.set_secret(
            name=name,
            value=value,
            enabled=True,
            # Timezone-aware so the expiry is unambiguous UTC.
            expires_on=self._expiry_in(expiration_days),
            tags={
                'environment': 'production',
                'created_by': 'automation',
            },
        )

    def get_secret(self, name: str) -> str:
        """Return the value of the secret called ``name``."""
        secret = self.secret_client.get_secret(name)
        return secret.value

    def list_secrets(self):
        """Return the properties of all secrets as a list."""
        return list(self.secret_client.list_properties_of_secrets())

    # Keys (for encryption/signing)
    def create_key(self, key_name: str) -> None:
        """Create a 4096-bit RSA key that expires in 365 days.

        Args:
            key_name: Name of the key to create.
        """
        self.key_client.create_rsa_key(
            name=key_name,
            size=4096,
            expires_on=self._expiry_in(365),
            tags={'purpose': 'data_encryption'},
        )

    def encrypt_data(self, key_name: str, data: bytes) -> bytes:
        """Encrypt ``data`` with the named key using RSA-OAEP.

        Args:
            key_name: Name of the key in the vault.
            data: Plaintext bytes.

        Returns:
            The ciphertext bytes from the encrypt result.
        """
        from azure.keyvault.keys.crypto import EncryptionAlgorithm
        # get_cryptography_client is given the key's *name*; a key object
        # passed here would not resolve to a valid key identifier.
        crypto_client = self.key_client.get_cryptography_client(key_name)
        result = crypto_client.encrypt(
            EncryptionAlgorithm.rsa_oaep,
            data,
        )
        return result.ciphertext

    # Certificates
    def import_certificate(self, cert_name: str, cert_bytes: bytes, password: str):
        """Import an existing certificate into the vault.

        Args:
            cert_name: Name to store the certificate under.
            cert_bytes: Raw certificate bytes.
            password: Password passed to the import call.
        """
        self.cert_client.import_certificate(
            name=cert_name,
            certificate_bytes=cert_bytes,
            password=password,
        )

    def get_certificate(self, cert_name: str):
        """Return the certificate object stored under ``cert_name``."""
        return self.cert_client.get_certificate(cert_name)
# Example for Azure App Service with a managed identity
vault_url = "https://myvault.vault.azure.net/"
vault_manager = AzureKeyVaultManager(vault_url=vault_url)
# Look up the database password by its secret name
db_password = vault_manager.get_secret(name="database-password")
# ... hand db_password to the application's database connection ...
Encryption at Rest
Field-Level Encryption
from cryptography.fernet import Fernet
import base64
import hashlib
class FieldLevelEncryption:
    """Encrypt and decrypt individual string fields with Fernet."""

    def __init__(self, master_key: str):
        """Build the Fernet cipher from ``master_key``.

        The Fernet key is the URL-safe base64 encoding of the SHA-256
        digest of the master key.
        """
        # NOTE(review): a single unsalted SHA-256 pass is not a
        # key-stretching KDF; changing it would invalidate existing
        # ciphertext, so it is left as is.
        digest = hashlib.sha256(master_key.encode()).digest()
        self.cipher_suite = Fernet(base64.urlsafe_b64encode(digest))

    def encrypt_field(self, value: str) -> str:
        """Return the encrypted token for ``value`` as text."""
        plaintext = value.encode()
        token = self.cipher_suite.encrypt(plaintext)
        return token.decode()

    def decrypt_field(self, encrypted_value: str) -> str:
        """Return the plaintext for a token produced by ``encrypt_field``."""
        token = encrypted_value.encode()
        plaintext = self.cipher_suite.decrypt(token)
        return plaintext.decode()
class User:
    """Database model whose ``ssn`` and ``credit_card`` are stored encrypted."""

    def __init__(self, name: str, ssn: str, credit_card: str,
                 encryption: "FieldLevelEncryption" = None):
        """Create a user holding plaintext values in memory.

        Args:
            name: Display name (stored unencrypted).
            ssn: Social security number; encrypted on save.
            credit_card: Card number; encrypted on save.
            encryption: Object providing ``encrypt_field`` / ``decrypt_field``.
                When omitted, one is built from the tutorial's placeholder
                master key, as before.
        """
        self.name = name
        self.ssn = ssn  # Will be encrypted
        self.credit_card = credit_card  # Will be encrypted
        if encryption is None:
            # Backward-compatible default. NOTE(review): this key is a
            # literal in source code; real callers should pass an
            # `encryption` built from a key held in a secrets manager.
            encryption = FieldLevelEncryption("master-secret-key")
        self.encryption = encryption

    def save_to_db(self):
        """Insert this user, encrypting ``ssn`` and ``credit_card`` first."""
        encrypted_ssn = self.encryption.encrypt_field(self.ssn)
        encrypted_cc = self.encryption.encrypt_field(self.credit_card)
        # Save to database
        # NOTE(review): `db` is not defined in this snippet; presumably a
        # module-level database connection — confirm.
        db.execute("""
INSERT INTO users (name, ssn, credit_card)
VALUES (?, ?, ?)
""", (self.name, encrypted_ssn, encrypted_cc))

    def load_from_db(self, user_id: int):
        """Load the row with ``user_id`` into this object, decrypting fields.

        Raises:
            LookupError: If no row exists for ``user_id``.
        """
        row = db.execute(
            "SELECT * FROM users WHERE id = ?", (user_id,)
        ).fetchone()
        if row is None:
            # Fail with a clear message instead of a TypeError from
            # subscripting None below.
            raise LookupError(f"No user found with id {user_id}")
        self.name = row['name']
        self.ssn = self.encryption.decrypt_field(row['ssn'])
        self.credit_card = self.encryption.decrypt_field(row['credit_card'])
# Example: persist a user, then read the record back
user = User(name="John Doe", ssn="123-45-6789", credit_card="1234-5678-9012-3456")
user.save_to_db()
# Start from an empty object and populate it from row 1
loaded_user = User(name="", ssn="", credit_card="")
loaded_user.load_from_db(user_id=1)
print(loaded_user.ssn)  # plaintext again after decryption
Audit and Monitoring
Secret Access Logging
import logging
from datetime import datetime
from typing import Dict
class SecretAuditLog:
    """Record secret accesses and analyse them for suspicious patterns."""

    def __init__(self, log_storage):
        """Keep the audit store and obtain the ``secrets`` logger.

        Args:
            log_storage: Object providing ``store(entry)`` and
                ``query_logs(**filters)``.
        """
        self.logger = logging.getLogger('secrets')
        self.log_storage = log_storage

    def log_access(self,
                   user_id: str,
                   secret_name: str,
                   action: str,
                   result: str,
                   context: Dict = None):
        """Write one access record to the logger and to the audit store.

        Args:
            user_id: Who accessed the secret.
            secret_name: Which secret was accessed.
            action: GET, CREATE, DELETE or ROTATE.
            result: SUCCESS, FAILED or DENIED.
            context: Optional mapping; its ``ip`` and ``user_agent``
                entries are copied into the record.
        """
        record = {'timestamp': datetime.utcnow().isoformat()}
        record['user_id'] = user_id
        record['secret_name'] = secret_name
        record['action'] = action
        record['result'] = result
        # A missing or empty context yields None for both fields.
        request_info = context if context else {}
        record['ip_address'] = request_info.get('ip')
        record['user_agent'] = request_info.get('user_agent')

        # The logger receives the JSON form, the audit store the dict.
        self.logger.info(json.dumps(record))
        self.log_storage.store(record)

    def audit_report(self, days: int = 7) -> List[Dict]:
        """Return the stored entries from the last ``days`` days, newest first."""
        filters = {
            'timestamp_after': datetime.utcnow() - timedelta(days=days),
            'order_by': 'timestamp DESC',
        }
        return self.log_storage.query_logs(**filters)

    def detect_anomalies(self) -> List[Dict]:
        """Scan the last 24 hours of entries for suspicious access.

        Returns:
            A list holding first one ``BRUTE_FORCE`` item for every
            (user, secret) pair with more than five FAILED entries, then
            one ``UNUSUAL_HOUR`` item for every SUCCESS entry whose
            timestamp hour lies outside 9-17.
        """
        since = datetime.utcnow() - timedelta(hours=24)
        entries = self.log_storage.query_logs(timestamp_after=since)

        # Pass 1: count failures per (user, secret) pair.
        failures = {}
        for entry in entries:
            if entry['result'] != 'FAILED':
                continue
            pair = (entry['user_id'], entry['secret_name'])
            failures[pair] = failures.get(pair, 0) + 1

        findings = [
            {
                'type': 'BRUTE_FORCE',
                'user_id': uid,
                'secret_name': secret,
                'attempts': total,
            }
            for (uid, secret), total in failures.items()
            if total > 5
        ]

        # Pass 2: successful reads outside the 09:00-17:59 window.
        for entry in entries:
            hour = datetime.fromisoformat(entry['timestamp']).hour
            off_hours = hour < 9 or hour >= 18
            if off_hours and entry['result'] == 'SUCCESS':
                findings.append({
                    'type': 'UNUSUAL_HOUR',
                    'user_id': entry['user_id'],
                    'timestamp': entry['timestamp'],
                })
        return findings
# Example: record one access, then scan for suspicious activity
audit = SecretAuditLog(log_storage)
# Record a successful read of the database password
audit.log_access(
    'user123',
    'database-password',
    'GET',
    'SUCCESS',
    {'ip': '192.168.1.1', 'user_agent': 'curl/7.68.0'},
)
# Alert the security team only when something was flagged
anomalies = audit.detect_anomalies()
if anomalies:
    notify_security_team(anomalies)
Secret Scanning in CI/CD
# .github/workflows/secret-scanning.yml
# Runs three independent secret scanners on every push and pull request.
name: Secret Scanning
on: [push, pull_request]
jobs:
  secret-scan:
    runs-on: ubuntu-latest
    steps:
      # v2 of the checkout action is outdated; use a current major version.
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0  # Full history for scanning
      - name: Scan for secrets with GitGuardian
        uses: GitGuardian/ggshield-action@master
        env:
          GITGUARDIAN_API_KEY: ${{ secrets.GITGUARDIAN_API_KEY }}
      - name: Scan for secrets with TruffleHog
        uses: trufflesecurity/trufflehog@main
        with:
          path: ./
          base: ${{ github.event.repository.default_branch }}
          head: HEAD
      - name: Scan with Gitleaks
        uses: gitleaks/gitleaks-action@v2
        env:
          # The Gitleaks action needs the workflow token to scan pull requests.
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
Glossary
- Secret Rotation: Replacing credentials on a regular schedule so that a leaked value stops working
- TTL: Time To Live (expiration)
- Dynamic Secrets: Credentials generated on demand that expire automatically once their TTL elapses
- Master Key: Key that encrypts other keys
- Audit Log: Record of all access
Resources
- HashiCorp Vault Documentation
- AWS Secrets Manager Guide
- Azure Key Vault Documentation
- OWASP Secrets Management
Comments