Skip to main content
โšก Calmops

Cloud Security: AWS, Azure, and GCP Best Practices

Cloud Security: AWS, Azure, and GCP Best Practices

Cloud security requires understanding native security services, identity management, encryption, and compliance frameworks. This guide covers practical security implementations across all three major cloud providers.


Table of Contents

  1. Identity and Access Management (IAM)
  2. Network Security
  3. Data Encryption and Protection
  4. Compliance and Audit Logging
  5. Incident Response and Recovery
  6. Multi-Cloud Security Strategy

1. Identity and Access Management (IAM)

Principle of Least Privilege

The foundation of cloud security is granting minimal necessary permissions:

AWS IAM Implementation:

import boto3
import json

iam_client = boto3.client('iam')

# Create policy with least privilege
policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::my-bucket/*",
                "arn:aws:s3:::my-bucket"
            ],
            "Condition": {
                "IpAddress": {
                    "aws:SourceIp": "10.0.0.0/8"
                }
            }
        }
    ]
}

# Attach to role
iam_client.put_role_policy(
    RoleName='app-role',
    PolicyName='s3-read-only',
    PolicyDocument=json.dumps(policy_document)
)

# Audit: List all permissions
roles = iam_client.list_roles()['Roles']
for role in roles:
    print(f"Role: {role['RoleName']}")
    policies = iam_client.list_attached_role_policies(RoleName=role['RoleName'])
    for policy in policies['AttachedPolicies']:
        print(f"  - {policy['PolicyName']}")

Azure RBAC (Role-Based Access Control):

from azure.identity import DefaultAzureCredential
from azure.mgmt.authorization import AuthorizationManagementClient
from azure.mgmt.resource import ResourceManagementClient

credential = DefaultAzureCredential()
auth_client = AuthorizationManagementClient(credential, subscription_id)

# Assign custom role with minimal permissions
role_definition = {
    "properties": {
        "roleName": "Custom Storage Reader",
        "description": "Read-only access to specific storage accounts",
        "type": "CustomRole",
        "permissions": [
            {
                "actions": ["Microsoft.Storage/storageAccounts/read"],
                "notActions": []
            }
        ],
        "assignableScopes": ["/subscriptions/{subscriptionId}"]
    }
}

# Create custom role
created_role = auth_client.role_definitions.create_or_update(
    scope=f"/subscriptions/{subscription_id}",
    role_definition_name=role_name,
    role_definition=role_definition
)

# Assign role to principal
auth_client.role_assignments.create(
    scope=f"/subscriptions/{subscription_id}",
    role_assignment_name=assignment_name,
    parameters={
        "roleDefinitionId": created_role.id,
        "principalId": principal_id
    }
)

GCP IAM with Custom Roles:

from google.cloud import iam_v1
from google.iam.v1 import iam_pb2

def create_custom_role(project_id, role_id):
    """Create custom role with specific permissions"""
    client = iam_v1.IAMClient()
    
    role = iam_pb2.Role(
        title="Custom Storage Reader",
        description="Read-only access to Cloud Storage",
        includedPermissions=[
            "storage.buckets.get",
            "storage.objects.get",
            "storage.objects.list"
        ]
    )
    
    request = iam_v1.CreateRoleRequest(
        parent=f"projects/{project_id}",
        role_id=role_id,
        role=role
    )
    
    return client.create_role(request=request)

# Audit role assignments
def audit_iam_bindings(project_id):
    """Audit all IAM bindings in project"""
    from google.cloud import resourcemanager_v3
    
    client = resourcemanager_v3.ProjectsClient()
    resource = resourcemanager_v3.GetIamPolicyRequest(
        resource=f"projects/{project_id}"
    )
    
    policy = client.get_iam_policy(request=resource)
    
    for binding in policy.bindings:
        print(f"Role: {binding.role}")
        for member in binding.members:
            print(f"  - {member}")

MFA and Credential Management

AWS MFA Enforcement:

# Force MFA for console access
mfa_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowViewAccountInfo",
            "Effect": "Allow",
            "Action": [
                "iam:GetAccountSummary",
                "iam:ListVirtualMFADevices"
            ],
            "Resource": "*"
        },
        {
            "Sid": "AllowManageOwnVirtualMFADevice",
            "Effect": "Allow",
            "Action": [
                "iam:CreateVirtualMFADevice",
                "iam:DeleteVirtualMFADevice"
            ],
            "Resource": "arn:aws:iam::*:mfa/${aws:username}"
        },
        {
            "Sid": "AllowManageOwnUserMFA",
            "Effect": "Allow",
            "Action": [
                "iam:DeactivateMFADevice",
                "iam:ListMFADevices",
                "iam:ResyncMFADevice"
            ],
            "Resource": "arn:aws:iam::*:user/${aws:username}"
        },
        {
            "Sid": "DenyAllExceptListedIfNoMFA",
            "Effect": "Deny",
            "NotAction": [
                "iam:CreateVirtualMFADevice",
                "iam:EnableMFADevice",
                "iam:ListMFADevices",
                "iam:ListUsers",
                "iam:ListVirtualMFADevices",
                "iam:ResyncMFADevice",
                "sts:GetSessionToken"
            ],
            "Resource": "*",
            "Condition": {
                "BoolIfExists": {
                    "aws:MultiFactorAuthPresent": "false"
                }
            }
        }
    ]
}

Service Account Key Rotation (GCP):

from google.cloud import iam_v1
from google.oauth2 import service_account
import json

def rotate_service_account_keys(project_id, service_account_email):
    """Rotate service account keys - delete old, create new"""
    iam_client = iam_v1.IAMClient()
    
    # List existing keys
    request = iam_v1.ListServiceAccountKeysRequest(
        name=f"projects/-/serviceAccounts/{service_account_email}"
    )
    keys = iam_client.list_service_account_keys(request=request)
    
    # Delete keys older than 90 days
    import datetime
    cutoff_date = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=90)
    
    for key in keys.keys:
        if key.valid_after_time and key.valid_after_time.timestamp() < cutoff_date.timestamp():
            print(f"Deleting old key: {key.name}")
            iam_client.delete_service_account_key(name=key.name)
    
    # Create new key
    new_key = iam_client.create_service_account_key(
        name=f"projects/-/serviceAccounts/{service_account_email}"
    )
    
    return new_key

2. Network Security

Virtual Private Cloud (VPC) Configuration

AWS VPC with Security Groups:

import boto3

ec2 = boto3.client('ec2')

# Create security group with strict ingress rules
sg_response = ec2.create_security_group(
    GroupName='app-sg',
    Description='Security group for application',
    VpcId='vpc-12345678'
)

sg_id = sg_response['GroupId']

# Allow only HTTPS from CloudFront
ec2.authorize_security_group_ingress(
    GroupId=sg_id,
    IpPermissions=[
        {
            'IpProtocol': 'tcp',
            'FromPort': 443,
            'ToPort': 443,
            'IpRanges': [{'CidrIp': '0.0.0.0/0', 'Description': 'HTTPS from CloudFront'}]
        },
        {
            'IpProtocol': 'tcp',
            'FromPort': 80,
            'ToPort': 80,
            'IpRanges': [{'CidrIp': '0.0.0.0/0', 'Description': 'HTTP from CloudFront'}]
        }
    ]
)

# Deny all by default (implicit)
# Only explicit allows go through

Azure Network Security Groups (NSG):

from azure.mgmt.network import NetworkManagementClient

network_client = NetworkManagementClient(credential, subscription_id)

nsg_parameters = {
    "location": "eastus",
    "security_rules": [
        {
            "name": "AllowHTTPS",
            "properties": {
                "protocol": "Tcp",
                "sourcePortRange": "*",
                "destinationPortRange": "443",
                "sourceAddressPrefix": "*",
                "destinationAddressPrefix": "*",
                "access": "Allow",
                "priority": 100,
                "direction": "Inbound"
            }
        },
        {
            "name": "DenySSH",
            "properties": {
                "protocol": "Tcp",
                "sourcePortRange": "*",
                "destinationPortRange": "22",
                "sourceAddressPrefix": "*",
                "destinationAddressPrefix": "*",
                "access": "Deny",
                "priority": 200,
                "direction": "Inbound"
            }
        }
    ]
}

network_client.network_security_groups.begin_create_or_update(
    resource_group_name='my-rg',
    network_security_group_name='app-nsg',
    parameters=nsg_parameters
)

GCP Firewall Rules:

from google.cloud import compute_v1

def create_firewall_rule(project_id, rule_name):
    """Create firewall rule allowing only HTTPS"""
    firewall_rule = compute_v1.Firewall(
        name=rule_name,
        direction="INGRESS",
        priority=1000,
        network=f"projects/{project_id}/global/networks/default",
        allowed=[
            compute_v1.Allowed(
                I_p_protocol="tcp",
                ports=["443"]
            )
        ],
        source_ranges=["0.0.0.0/0"],
        target_tags=["web-server"]
    )
    
    client = compute_v1.FirewallsClient()
    operation = client.insert(project=project_id, firewall_resource=firewall_rule)
    
    return operation

DDoS Protection and WAF

AWS WAF Configuration:

import boto3

waf = boto3.client('wafv2')

# Create IP set for rate limiting
ip_set = waf.create_ip_set(
    Name='rate-limit-ips',
    Scope='CLOUDFRONT',
    IPAddressVersion='IPV4',
    Addresses=[]
)

# Create WAF rules
rule_group = waf.create_rule_group(
    Name='rate-limit-rules',
    Scope='CLOUDFRONT',
    Capacity=2,
    Rules=[
        {
            'Name': 'RateLimitRule',
            'Priority': 1,
            'Statement': {
                'RateBasedStatement': {
                    'Limit': 2000,
                    'AggregateKeyType': 'IP',
                    'ScopeDownStatement': {
                        'ByteMatchStatement': {
                            'FieldToMatch': {'UriPath': {}},
                            'TextTransformations': [
                                {'Priority': 0, 'Type': 'NONE'}
                            ],
                            'PositionalConstraint': 'STARTS_WITH',
                            'SearchString': b'/api/'
                        }
                    }
                }
            },
            'Action': {'Block': {'CustomResponse': {}}},
            'VisibilityConfig': {
                'SampledRequestsEnabled': True,
                'CloudWatchMetricsEnabled': True,
                'MetricName': 'RateLimitRule'
            }
        }
    ],
    VisibilityConfig={
        'SampledRequestsEnabled': True,
        'CloudWatchMetricsEnabled': True,
        'MetricName': 'rate-limit-rules'
    }
)

# Attach to CloudFront
acl = waf.create_web_acl(
    Name='api-protection',
    Scope='CLOUDFRONT',
    DefaultAction={'Allow': {}},
    Rules=[
        {
            'Name': 'RateLimitRule',
            'Priority': 1,
            'Statement': {
                'RuleGroupReferenceStatement': {
                    'Arn': rule_group['Summary']['ARN']
                }
            },
            'OverrideAction': {'None': {}},
            'VisibilityConfig': {
                'SampledRequestsEnabled': True,
                'CloudWatchMetricsEnabled': True,
                'MetricName': 'RateLimitRule'
            }
        }
    ],
    VisibilityConfig={
        'SampledRequestsEnabled': True,
        'CloudWatchMetricsEnabled': True,
        'MetricName': 'api-protection'
    }
)

3. Data Encryption and Protection

Encryption at Rest

AWS S3 Encryption:

import boto3

s3 = boto3.client('s3')

# Enable default encryption on bucket
s3.put_bucket_encryption(
    Bucket='my-sensitive-data',
    ServerSideEncryptionConfiguration={
        'Rules': [
            {
                'ApplyServerSideEncryptionByDefault': {
                    'SSEAlgorithm': 'aws:kms',
                    'KMSMasterKeyID': 'arn:aws:kms:us-east-1:123456789012:key/12345678'
                },
                'BucketKeyEnabled': True
            }
        ]
    }
)

# Block unencrypted uploads
s3.put_bucket_policy(
    Bucket='my-sensitive-data',
    Policy=json.dumps({
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "DenyUnencryptedObjectUploads",
                "Effect": "Deny",
                "Principal": "*",
                "Action": "s3:PutObject",
                "Resource": "arn:aws:s3:::my-sensitive-data/*",
                "Condition": {
                    "StringNotEquals": {
                        "s3:x-amz-server-side-encryption": "aws:kms"
                    }
                }
            }
        ]
    })
)

# Enable versioning for compliance
s3.put_bucket_versioning(
    Bucket='my-sensitive-data',
    VersioningConfiguration={'Status': 'Enabled'}
)

Azure Encryption with Customer-Managed Keys:

from azure.mgmt.storage import StorageManagementClient
from azure.keyvault.keys import KeyClient

# Create storage account with encryption
storage_client = StorageManagementClient(credential, subscription_id)

storage_account = storage_client.storage_accounts.begin_create(
    resource_group_name='my-rg',
    account_name='mystorage',
    parameters={
        "location": "eastus",
        "kind": "StorageV2",
        "sku": {"name": "Standard_GRS"},
        "encryption": {
            "services": {
                "blob": {"enabled": True, "keyType": "Account"},
                "file": {"enabled": True, "keyType": "Account"}
            },
            "keySource": "Microsoft.Keyvault",
            "keyvaultproperties": {
                "keyvaulturi": "https://myvault.vault.azure.net/",
                "keyname": "storage-key",
                "keyversion": "12345678901234567890"
            }
        }
    }
)

GCP Customer-Managed Encryption Keys (CMEK):

from google.cloud import storage
from google.cloud import kms_v1

def create_bucket_with_cmek(project_id, bucket_name, kms_key_name):
    """Create storage bucket with CMEK"""
    storage_client = storage.Client(project=project_id)
    
    bucket = storage.Bucket(storage_client, name=bucket_name)
    bucket.location = "US"
    bucket.encryption_key_name = kms_key_name
    
    created_bucket = storage_client.create_bucket(bucket)
    
    print(f"Created bucket {created_bucket.name} with CMEK")
    return created_bucket

# Verify encryption
bucket = storage_client.get_bucket('my-bucket')
if bucket.encryption_key_name:
    print(f"Encryption key: {bucket.encryption_key_name}")

Encryption in Transit

AWS Certificate Management with Auto-Renewal:

import boto3

acm = boto3.client('acm')

# Request certificate
cert_response = acm.request_certificate(
    DomainName='example.com',
    SubjectAlternativeNames=['*.example.com'],
    ValidationMethod='DNS',
    Tags=[
        {'Key': 'Environment', 'Value': 'Production'}
    ]
)

cert_arn = cert_response['CertificateArn']

# Monitor certificate renewal
def check_certificate_renewal(cert_arn):
    response = acm.describe_certificate(CertificateArn=cert_arn)
    cert = response['Certificate']
    
    print(f"Status: {cert['Status']}")
    print(f"Expiration: {cert['NotAfter']}")
    
    if 'RenewalEligibility' in cert:
        print(f"Renewal Eligible: {cert['RenewalEligibility']}")

Azure TLS/HTTPS Configuration:

from azure.mgmt.web import WebSiteManagementClient

app_service_client = WebSiteManagementClient(credential, subscription_id)

# Force HTTPS only
app_service_client.web_apps.update(
    resource_group_name='my-rg',
    name='my-app',
    site_envelope={
        "https_only": True,
        "client_cert_mode": "Required"
    }
)

# Bind custom domain with SSL
app_service_client.web_apps.create_or_update_host_name_binding(
    resource_group_name='my-rg',
    name='my-app',
    host_name='example.com',
    host_name_binding_resource={
        "ssl_state": "SniEnabled",
        "thumbprint": "certificate-thumbprint"
    }
)

4. Compliance and Audit Logging

Centralized Logging

AWS CloudTrail and CloudWatch:

import boto3

cloudtrail = boto3.client('cloudtrail')
s3 = boto3.client('s3')

# Enable CloudTrail
cloudtrail.start_logging(Name='organization-trail')

# Create S3 bucket for logs
s3.create_bucket(Bucket='cloudtrail-logs-bucket')

# Apply bucket policy for CloudTrail
s3.put_bucket_policy(
    Bucket='cloudtrail-logs-bucket',
    Policy=json.dumps({
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AWSCloudTrailAclCheck",
                "Effect": "Allow",
                "Principal": {
                    "Service": "cloudtrail.amazonaws.com"
                },
                "Action": "s3:GetBucketAcl",
                "Resource": "arn:aws:s3:::cloudtrail-logs-bucket"
            },
            {
                "Sid": "AWSCloudTrailWrite",
                "Effect": "Allow",
                "Principal": {
                    "Service": "cloudtrail.amazonaws.com"
                },
                "Action": "s3:PutObject",
                "Resource": "arn:aws:s3:::cloudtrail-logs-bucket/*",
                "Condition": {
                    "StringEquals": {
                        "s3:x-amz-acl": "bucket-owner-full-control"
                    }
                }
            }
        ]
    })
)

# Create CloudWatch alarm for suspicious activity
cloudwatch = boto3.client('cloudwatch')
logs = boto3.client('logs')

# Create metric filter
logs.put_metric_filter(
    logGroupName='/aws/cloudtrail/organization-trail',
    filterName='UnauthorizedAPICallsMetricFilter',
    filterPattern='{ ($.errorCode = "*UnauthorizedOperation") || ($.errorCode = "AccessDenied*") }',
    metricTransformations=[
        {
            'metricName': 'UnauthorizedAPICallsCount',
            'metricNamespace': 'CloudTrailMetrics',
            'metricValue': '1'
        }
    ]
)

# Alarm on suspicious activity
cloudwatch.put_metric_alarm(
    AlarmName='UnauthorizedAPICallsAlarm',
    MetricName='UnauthorizedAPICallsCount',
    Namespace='CloudTrailMetrics',
    Statistic='Sum',
    Period=300,
    EvaluationPeriods=1,
    Threshold=1,
    ComparisonOperator='GreaterThanOrEqualToThreshold',
    AlarmActions=['arn:aws:sns:us-east-1:123456789012:security-alerts']
)

Azure Monitor and Log Analytics:

from azure.mgmt.monitor import MonitorManagementClient
from azure.mgmt.loganalytics import LogAnalyticsManagementClient

# Create Log Analytics workspace
loganalytics_client = LogAnalyticsManagementClient(credential, subscription_id)

workspace = loganalytics_client.workspaces.begin_create_or_update(
    resource_group_name='my-rg',
    workspace_name='security-logs',
    parameters={
        "location": "eastus",
        "sku": {
            "name": "PerGB2018"
        },
        "retention_in_days": 90
    }
)

# Configure diagnostic settings for multiple resources
monitor_client = MonitorManagementClient(credential, subscription_id)

diagnostic_settings = {
    "location": "eastus",
    "logs": [
        {
            "category": "Administrative",
            "enabled": True,
            "retention_policy": {
                "enabled": True,
                "days": 90
            }
        },
        {
            "category": "Security",
            "enabled": True,
            "retention_policy": {
                "enabled": True,
                "days": 90
            }
        }
    ],
    "metrics": [
        {
            "enabled": True,
            "retention_policy": {
                "enabled": True,
                "days": 30
            }
        }
    ],
    "workspace_id": workspace.id
}

monitor_client.diagnostic_settings.create_or_update(
    resource_uri=f"/subscriptions/{subscription_id}",
    name="security-diagnostics",
    parameters=diagnostic_settings
)

GCP Cloud Audit Logs:

from google.cloud import logging_v2

def enable_audit_logs(project_id):
    """Enable audit logging for security investigation"""
    client = logging_v2.Client(project=project_id)
    
    # Get cloud logger
    logger = client.logger('projects/{}/logs/cloudaudit.googleapis.com'.format(project_id))
    
    # Create log sink for data access events
    sink_name = 'audit-log-sink'
    destination = 'bigquery.googleapis.com/projects/{}/datasets/audit_logs'.format(project_id)
    
    sink = client.sink(sink_name, filter_="""
        resource.type="k8s_cluster"
        AND protoPayload.serviceName="container.googleapis.com"
        AND protoPayload.methodName=~"io.k8s.core.*"
    """, destination_bucket=destination)
    
    if not sink.exists():
        sink.create()
    
    return sink

5. Incident Response and Recovery

Automated Response to Threats

AWS Lambda for Automated Response:

import boto3
import json

def lambda_handler(event, context):
    """Automatically respond to suspicious CloudTrail events"""
    
    ec2 = boto3.client('ec2')
    sns = boto3.client('sns')
    
    # Parse CloudTrail event
    cloudtrail_event = json.loads(event['Records'][0]['Sns']['Message'])
    
    source_ip = cloudtrail_event['sourceIPAddress']
    event_name = cloudtrail_event['eventName']
    principal = cloudtrail_event['userIdentity']['principalId']
    
    # Quarantine suspicious instance
    if event_name == 'RunInstances':
        instance_ids = [r['instanceId'] for r in cloudtrail_event.get('responseElements', {}).get('instancesSet', [])]
        
        # Create security group that blocks all traffic
        sg_response = ec2.create_security_group(
            GroupName='quarantine',
            Description='Quarantine group'
        )
        
        # Revoke all ingress/egress rules
        ec2.revoke_security_group_egress(
            GroupId=sg_response['GroupId'],
            IpPermissions=[{
                'IpProtocol': '-1',
                'FromPort': 0,
                'ToPort': 65535,
                'IpRanges': [{'CidrIp': '0.0.0.0/0'}]
            }]
        )
        
        # Apply quarantine group
        for instance_id in instance_ids:
            ec2.modify_instance_attribute(
                InstanceId=instance_id,
                Groups=[sg_response['GroupId']]
            )
        
        # Alert security team
        sns.publish(
            TopicArn='arn:aws:sns:us-east-1:123456789012:security-alerts',
            Subject='SECURITY ALERT: Potential Suspicious Instance',
            Message=f"""
            Suspected suspicious instance activity detected.
            
            Principal: {principal}
            Event: {event_name}
            Source IP: {source_ip}
            Instances: {', '.join(instance_ids)}
            
            Instances have been moved to quarantine group.
            """
        )
    
    return {
        'statusCode': 200,
        'body': json.dumps('Incident response triggered')
    }

Backup and Recovery Strategy

AWS Backup Plan:

import boto3

backup = boto3.client('backup')

# Create backup vault
vault = backup.create_backup_vault(
    BackupVaultName='production-vault',
    BackupVaultTags={
        'Environment': 'Production',
        'Compliance': 'PCI-DSS'
    }
)

# Create backup plan with encryption
backup_plan = {
    "BackupPlanName": "production-daily",
    "Rules": [
        {
            "RuleName": "DailyBackups",
            "TargetBackupVaultName": vault['BackupVaultArn'],
            "ScheduleExpression": "cron(0 2 * * ? *)",  # 2 AM daily
            "StartWindowMinutes": 60,
            "CompletionWindowMinutes": 120,
            "Lifecycle": {
                "DeleteAfterDays": 30,
                "MoveToColdStorageAfterDays": 7
            },
            "EnableContinuousBackup": True,
            "RecoveryPointTags": {
                "BackupType": "Automated"
            }
        }
    ]
}

backup.create_backup_plan(BackupPlan=backup_plan)

# Assign resources to backup plan
backup.start_backup_job(
    BackupVaultName='production-vault',
    ResourceArn='arn:aws:rds:us-east-1:123456789012:db:production-db',
    RecoveryPointTags={
        'BackupType': 'Manual',
        'Reason': 'Pre-deployment'
    }
)

6. Multi-Cloud Security Strategy

Cross-Cloud Identity Federation

AWS STS AssumeRole with External Identity:

import boto3

sts = boto3.client('sts')

# Assume role in another AWS account
response = sts.assume_role(
    RoleArn='arn:aws:iam::OTHER-ACCOUNT:role/CrossAccountRole',
    RoleSessionName='cross-account-session',
    DurationSeconds=3600,
    SerialNumber='arn:aws:iam::THIS-ACCOUNT:mfa/[email protected]',
    TokenCode='123456'  # MFA token
)

# Use temporary credentials
temporary_credentials = response['Credentials']

# Switch to other account
cross_account_s3 = boto3.client(
    's3',
    aws_access_key_id=temporary_credentials['AccessKeyId'],
    aws_secret_access_key=temporary_credentials['SecretAccessKey'],
    aws_session_token=temporary_credentials['SessionToken']
)

Unified Security Posture

import boto3
from concurrent.futures import ThreadPoolExecutor

class MultiCloudSecurityAudit:
    def __init__(self):
        self.aws_client = boto3.client('securityhub')
        self.findings = []
    
    def audit_all_clouds(self):
        """Aggregate security findings from all clouds"""
        
        with ThreadPoolExecutor(max_workers=3) as executor:
            # AWS Security Hub
            executor.submit(self.get_aws_findings)
            # Azure Security Center
            executor.submit(self.get_azure_findings)
            # GCP Security Command Center
            executor.submit(self.get_gcp_findings)
    
    def get_aws_findings(self):
        """Get AWS Security Hub findings"""
        response = self.aws_client.get_findings(
            Filters={
                'SeverityLabel': [{'Value': 'CRITICAL', 'Comparison': 'EQUALS'}],
                'RecordState': [{'Value': 'ACTIVE', 'Comparison': 'EQUALS'}]
            }
        )
        
        for finding in response['Findings']:
            self.findings.append({
                'cloud': 'AWS',
                'severity': finding['Severity']['Label'],
                'type': finding['Types'],
                'resource': finding['Resources'][0]['Id']
            })
    
    def get_azure_findings(self):
        """Get Azure Security Center findings"""
        from azure.mgmt.security import SecurityCenterClient
        
        client = SecurityCenterClient(credential, subscription_id)
        alerts = client.alerts.list()
        
        for alert in alerts:
            self.findings.append({
                'cloud': 'Azure',
                'severity': alert.alert_severity,
                'type': alert.alert_type,
                'resource': alert.affected_resource
            })
    
    def get_gcp_findings(self):
        """Get GCP Security Command Center findings"""
        from google.cloud import securitycenter_v1
        
        client = securitycenter_v1.SecurityCenterClient()
        parent = f"organizations/{ORG_ID}"
        
        findings = client.list_findings(request={
            "parent": parent,
            "filter": 'state="ACTIVE" AND severity="CRITICAL"'
        })
        
        for result in findings:
            self.findings.append({
                'cloud': 'GCP',
                'severity': result.finding.severity.name,
                'type': result.finding.finding_class,
                'resource': result.finding.resource_name
            })
    
    def generate_report(self):
        """Generate unified security report"""
        by_cloud = {}
        for finding in self.findings:
            cloud = finding['cloud']
            by_cloud[cloud] = by_cloud.get(cloud, 0) + 1
        
        print("Security Findings Summary")
        print("=" * 50)
        for cloud, count in by_cloud.items():
            print(f"{cloud}: {count} critical findings")

Key Takeaways

  1. Least Privilege: Always grant minimum necessary permissions
  2. Multi-Factor Authentication: Enforce MFA across all clouds
  3. Encryption: Encrypt data at rest and in transit
  4. Centralized Logging: Aggregate logs for security analysis
  5. Automated Response: Use functions/serverless for incident response
  6. Regular Audits: Continuous compliance monitoring
  7. Disaster Recovery: Maintain backup and recovery plans
  8. Multi-Cloud Strategy: Implement consistent security across clouds

Glossary

  • IAM: Identity and Access Management
  • MFA: Multi-Factor Authentication
  • VPC: Virtual Private Cloud
  • WAF: Web Application Firewall
  • CMEK: Customer-Managed Encryption Keys
  • SIEM: Security Information and Event Management
  • DLP: Data Loss Prevention
  • RTO: Recovery Time Objective
  • RPO: Recovery Point Objective

Resources

Comments