Skip to main content
⚡ Calmops

Zero Trust Architecture: Implementation Guide

Zero Trust Architecture: Implementation Guide

Zero Trust replaces implicit trust with explicit verification. This guide covers designing and implementing zero trust architectures that verify every access request.


Core Principles

  1. Verify Explicitly: Use all available data points
  2. Least-Privilege Access: Limit access to the minimum necessary
  3. Assume Breach: Design as if compromise is inevitable
  4. Secure Every Layer: Application, network, data, identity
  5. Continuous Verification: Ongoing authentication and authorization

Identity Verification Layer

Multi-Step Authentication

import hashlib
import hmac
import time
from cryptography.fernet import Fernet

class ZeroTrustAuthenticator:
    """Multi-step authenticator: password, TOTP, device check and risk scoring.

    The lookup/verification hooks called below (``get_password_hash``,
    ``verify_password``, ``verify_totp``, ``verify_device``,
    ``get_user_location``, ``get_previous_location``, ``calculate_distance``,
    ``is_normal_access_time``, ``is_known_device``, ``is_safe_network``) are
    not defined in this class and must be supplied by a subclass or mixin.
    """

    def __init__(self, secret_key):
        self.secret = secret_key.encode()
        # NOTE(review): a fresh random Fernet key per instance means anything
        # encrypted with self.cipher cannot be decrypted by another instance.
        self.cipher = Fernet(Fernet.generate_key())

    def generate_totp(self, user_id, secret):
        """Generate the current time-based one-time password for ``secret``.

        ``user_id`` is currently unused; the code depends only on ``secret``.
        """
        import pyotp

        totp = pyotp.TOTP(secret)
        return totp.now()

    def verify_step_up(self, user_id, password, otp, device_id):
        """Run every verification step; stop at the first failure.

        Returns:
            ``(ok, message)`` where ``message`` names the failing step or
            reports success.
        """
        # 1. Password verification
        stored_hash = self.get_password_hash(user_id)
        if not self.verify_password(password, stored_hash):
            return False, "Invalid password"

        # 2. TOTP verification
        if not self.verify_totp(user_id, otp):
            return False, "Invalid OTP"

        # 3. Device fingerprint
        if not self.verify_device(user_id, device_id):
            return False, "Unregistered device"

        # 4. Risk analysis
        risk_score = self.calculate_risk_score(user_id)
        if risk_score > 0.8:
            return False, "Suspicious activity detected"

        return True, "Authentication successful"

    def calculate_risk_score(self, user_id):
        """Risk-based adaptive authentication score in ``[0, 1]``.

        Each triggered signal adds its weight; the total is capped at 1.0.
        """
        risk_factors = []

        # Check location change
        current_location = self.get_user_location()
        previous_location = self.get_previous_location(user_id)

        if self.calculate_distance(current_location, previous_location) > 1000:  # km
            risk_factors.append(0.3)  # 30% risk

        # Check time anomaly
        if not self.is_normal_access_time(user_id):
            risk_factors.append(0.2)

        # Check device
        if not self.is_known_device(user_id):
            risk_factors.append(0.4)

        # Check network
        if not self.is_safe_network(user_id):
            risk_factors.append(0.25)

        # Risks accumulate. Averaging would cap the score at the largest single
        # weight (0.4), which would make verify_step_up's 0.8 cut-off unreachable.
        return min(1.0, sum(risk_factors))

Passwordless Authentication

from fastapi import FastAPI, HTTPException
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
import base64

# FastAPI application; the /register/start and /auth/start routes are attached to it.
app = FastAPI()

class WebAuthnHandler:
    """WebAuthn (FIDO2) passwordless authentication.

    Storage hooks used below (``store_challenge``, ``retrieve_challenge``,
    ``store_credential``, ``get_user_credentials``, ``update_sign_count``)
    are not defined in this class and must be supplied by a subclass or
    mixin. ``webauthn`` is imported lazily inside each method that needs it,
    so every method imports the names it uses itself.
    """

    def __init__(self):
        self.origins = ["https://example.com"]
        self.rp_id = "example.com"
        self.rp_name = "Example Corp"

    def initiate_registration(self, user_id: str, username: str):
        """Start passwordless registration.

        Args:
            user_id: Identifier of the account being registered; also the key
                under which the challenge is stored.
            username: Account name passed to the authenticator.

        Returns:
            The options object from ``webauthn.generate_registration_options``.
        """
        import secrets

        import webauthn
        from webauthn.helpers.structs import (
            AuthenticatorAttachment,
            AuthenticatorSelectionCriteria,
            ResidentKeyRequirement,
            UserVerificationRequirement,
        )

        # Unpredictable challenge from a CSPRNG; the WebAuthn spec recommends
        # at least 16 bytes.
        challenge = secrets.token_bytes(32)

        registration_data = webauthn.generate_registration_options(
            rp_id=self.rp_id,
            rp_name=self.rp_name,
            # NOTE(review): newer py_webauthn releases expect user_id as bytes;
            # confirm against the installed version.
            user_id=user_id,
            user_name=username,
            challenge=challenge,
            authenticator_selection=AuthenticatorSelectionCriteria(
                authenticator_attachment=AuthenticatorAttachment.PLATFORM,
                resident_key=ResidentKeyRequirement.PREFERRED,
                user_verification=UserVerificationRequirement.REQUIRED,
            ),
        )

        # Keep the challenge so complete_registration can verify against it.
        self.store_challenge(user_id, challenge)

        return registration_data

    def complete_registration(self, user_id: str, credential):
        """Verify the authenticator's registration response and store the key.

        Returns:
            ``{"success": True}``; verification failures raise from
            ``webauthn.verify_registration_response``.
        """
        import webauthn

        challenge = self.retrieve_challenge(user_id)

        verified_credential = webauthn.verify_registration_response(
            credential=credential,
            expected_challenge=challenge,
            expected_origin=self.origins[0],
            expected_rp_id=self.rp_id,
            # The registration options demand user verification; enforce it
            # server-side as well, or a response without the UV flag passes.
            require_user_verification=True,
        )

        # Store credential for user
        self.store_credential(
            user_id,
            verified_credential.credential_id,
            verified_credential.credential_public_key,
        )

        return {"success": True}

    def initiate_authentication(self, username: str):
        """Start passwordless authentication for ``username``.

        Returns:
            The options object from ``webauthn.generate_authentication_options``,
            restricted to the user's registered credentials.
        """
        import secrets

        import webauthn
        from webauthn.helpers.structs import (
            PublicKeyCredentialDescriptor,
            UserVerificationRequirement,
        )

        challenge = secrets.token_bytes(32)

        # Get registered credentials for user
        user_credentials = self.get_user_credentials(username)

        authentication_data = webauthn.generate_authentication_options(
            rp_id=self.rp_id,
            challenge=challenge,
            allow_credentials=[
                PublicKeyCredentialDescriptor(
                    type="public-key",
                    id=cred['credential_id']
                )
                for cred in user_credentials
            ],
            user_verification=UserVerificationRequirement.REQUIRED,
        )

        self.store_challenge(username, challenge)

        return authentication_data

    def complete_authentication(self, username: str, assertion):
        """Verify an authentication assertion and advance the sign counter.

        Returns:
            ``{"success": True, "user_id": username}`` on success.

        Raises:
            ValueError: if the user has no registered credentials.
        """
        challenge = self.retrieve_challenge(username)
        credentials = self.get_user_credentials(username)
        if not credentials:
            # Clear error instead of an IndexError on credentials[0].
            raise ValueError(
                f"No registered WebAuthn credentials for user {username!r}"
            )

        import webauthn

        # NOTE(review): verification uses the first stored credential only;
        # users with several authenticators need the stored credential whose
        # credential_id matches the assertion's id.
        stored = credentials[0]
        verified_assertion = webauthn.verify_authentication_response(
            credential=assertion,
            expected_challenge=challenge,
            expected_origin=self.origins[0],
            expected_rp_id=self.rp_id,
            credential_public_key=stored['public_key'],
            credential_current_sign_count=stored['sign_count'],
            # Matches UserVerificationRequirement.REQUIRED in the options.
            require_user_verification=True,
        )

        # py_webauthn reports the authenticator's counter as new_sign_count.
        self.update_sign_count(username, verified_assertion.new_sign_count)

        return {"success": True, "user_id": username}

@app.post("/register/start")
async def start_registration(username: str):
    """Begin WebAuthn registration; the username doubles as the user ID."""
    return WebAuthnHandler().initiate_registration(username, username)

@app.post("/auth/start")
async def start_auth(username: str):
    """Begin WebAuthn authentication for an already-registered username."""
    return WebAuthnHandler().initiate_authentication(username)

Network Segmentation

Microsegmentation

import boto3
from typing import List, Dict

class MicrosegmentationManager:
    """Implement microsegmentation in an AWS EC2 environment.

    Creates one security group per tier (frontend, app, database) and allows
    only the tier-to-tier traffic each one needs.
    """

    def __init__(self):
        self.ec2 = boto3.client('ec2')

    def create_isolation_groups(self, workloads: Dict[str, List[str]]):
        """Create the three tier security groups and their ingress rules.

        Args:
            workloads: Currently unused by this method.
                NOTE(review): presumably meant to map tiers to the instances
                to attach; confirm before relying on it.

        Returns:
            Mapping of tier name (``'frontend'``, ``'app'``, ``'database'``)
            to security group ID.
        """
        # Database tier
        db_sg = self.ec2.create_security_group(
            GroupName='db-tier-isolated',
            Description='Database tier - isolated'
        )

        # App tier
        app_sg = self.ec2.create_security_group(
            GroupName='app-tier-isolated',
            Description='Application tier - isolated'
        )

        # Frontend tier
        frontend_sg = self.ec2.create_security_group(
            GroupName='frontend-tier-isolated',
            Description='Frontend tier - isolated'
        )

        # Allow only necessary traffic
        # Frontend -> App: 443 only
        self.ec2.authorize_security_group_ingress(
            GroupId=app_sg['GroupId'],
            IpPermissions=[{
                'IpProtocol': 'tcp',
                'FromPort': 443,
                'ToPort': 443,
                'UserIdGroupPairs': [{'GroupId': frontend_sg['GroupId']}]
            }]
        )

        # App -> Database: 5432 (PostgreSQL) only
        self.ec2.authorize_security_group_ingress(
            GroupId=db_sg['GroupId'],
            IpPermissions=[{
                'IpProtocol': 'tcp',
                'FromPort': 5432,
                'ToPort': 5432,
                'UserIdGroupPairs': [{'GroupId': app_sg['GroupId']}]
            }]
        )

        # Ingress not listed above is denied implicitly. Egress stays open until
        # enforce_zero_trust_rules removes the default allow-all rule.
        return {
            'frontend': frontend_sg['GroupId'],
            'app': app_sg['GroupId'],
            'database': db_sg['GroupId']
        }

    def enforce_zero_trust_rules(self, sg_ids: Dict[str, str]):
        """Replace default allow-all egress with per-tier allow-lists.

        Args:
            sg_ids: Tier name -> security group ID, as returned by
                ``create_isolation_groups``. Tiers other than ``'frontend'``
                and ``'app'`` (e.g. ``'database'``) end up with no egress rules.

        Raises:
            The EC2 client's ``ClientError`` for any failure other than the
            default egress rule already being absent.
        """
        for tier, sg_id in sg_ids.items():
            self._revoke_default_egress(sg_id)

            # Only allow specific egress
            if tier == 'frontend':
                # Only to app tier
                self.ec2.authorize_security_group_egress(
                    GroupId=sg_id,
                    IpPermissions=[{
                        'IpProtocol': 'tcp',
                        'FromPort': 443,
                        'ToPort': 443,
                        'UserIdGroupPairs': [{'GroupId': sg_ids['app']}]
                    }]
                )

            elif tier == 'app':
                # To database and external APIs
                self.ec2.authorize_security_group_egress(
                    GroupId=sg_id,
                    IpPermissions=[
                        {
                            'IpProtocol': 'tcp',
                            'FromPort': 5432,
                            'ToPort': 5432,
                            'UserIdGroupPairs': [{'GroupId': sg_ids['database']}]
                        },
                        {
                            'IpProtocol': 'tcp',
                            'FromPort': 443,
                            'ToPort': 443,
                            # CIDR blocks go in IpRanges; a bare 'CidrIp' key
                            # is rejected by boto3 parameter validation.
                            'IpRanges': [{'CidrIp': '0.0.0.0/0'}]  # External APIs only
                        }
                    ]
                )

    def _revoke_default_egress(self, sg_id: str) -> None:
        """Remove the default allow-all egress rule, tolerating its absence."""
        try:
            self.ec2.revoke_security_group_egress(
                GroupId=sg_id,
                IpPermissions=[{
                    'IpProtocol': '-1',
                    'IpRanges': [{'CidrIp': '0.0.0.0/0'}]
                }]
            )
        except self.ec2.exceptions.ClientError as err:
            # A missing rule (e.g. on a re-run) is fine. Any other failure must
            # surface instead of silently leaving egress wide open.
            code = err.response.get('Error', {}).get('Code')
            if code != 'InvalidPermission.NotFound':
                raise

Network Access Control

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional

@dataclass
class AccessPolicy:
    """A grant allowing ``source`` to perform ``action`` on ``resource``."""

    source: str  # User/service
    resource: str  # Target resource
    action: str  # Read/write/delete
    conditions: dict  # Time, location, device (not evaluated by ZeroTrustPolicyEngine)
    expires_at: datetime  # Compared with naive datetime.now() by the engine
    justification: str  # Reason recorded when the grant was created


class ZeroTrustPolicyEngine:
    """Policy engine for zero trust access.

    Environment checks (``is_allowed_location``, ``is_trusted_device``,
    ``is_safe_network``) and JIT workflow hooks (``verify_requester``,
    ``requires_approval``, ``request_approval``) are not defined in this
    class and must be supplied by a subclass or mixin.
    """

    def __init__(self):
        self.policies: List[AccessPolicy] = []
        self.audit_log = []

    def evaluate_access(self,
                        source: str,
                        resource: str,
                        action: str,
                        context: dict) -> tuple[bool, str]:
        """Evaluate an access request against environment checks and policies.

        Every decision, allowed or denied, is written to ``audit_log``.

        Returns:
            ``(allowed, reason)``.
        """
        # 1. Check time condition
        if not self.is_within_time_window(context.get('time')):
            return self._deny(source, resource, action, context,
                              "Access outside allowed time window")

        # 2. Check location
        if not self.is_allowed_location(source, context.get('location')):
            return self._deny(source, resource, action, context,
                              "Access from unauthorized location")

        # 3. Check device
        if not self.is_trusted_device(source, context.get('device_id')):
            return self._deny(source, resource, action, context,
                              "Unregistered device")

        # 4. Check network
        if not self.is_safe_network(context.get('network')):
            return self._deny(source, resource, action, context,
                              "Unsafe network detected")

        # 5. Check policy exists and is valid
        matching_policy = self.find_matching_policy(source, resource, action)
        if matching_policy is None:
            return self._deny(source, resource, action, context,
                              "No matching policy")

        if matching_policy.expires_at < datetime.now():
            return self._deny(source, resource, action, context,
                              "Policy expired")

        # 6. Log access
        self.log_access(source, resource, action, "ALLOWED", context)

        return True, "Access granted"

    def _deny(self, source, resource, action, context, reason: str) -> tuple[bool, str]:
        """Audit a denied request and return ``(False, reason)``."""
        self.log_access(source, resource, action, f"DENIED - {reason}", context)
        return False, reason

    def request_access(self,
                       source: str,
                       resource: str,
                       action: str,
                       justification: str,
                       duration_hours: int = 1) -> Optional[AccessPolicy]:
        """Request just-in-time (JIT) access.

        Returns:
            The new policy (also appended to ``policies``), or ``None`` if the
            requester fails verification or approval is rejected.
        """
        # Verify request
        if not self.verify_requester(source):
            return None

        # Create policy
        policy = AccessPolicy(
            source=source,
            resource=resource,
            action=action,
            conditions={},
            expires_at=datetime.now() + timedelta(hours=duration_hours),
            justification=justification
        )

        # Check with approval workflow
        if self.requires_approval(source, resource):
            approval = self.request_approval(policy)
            if not approval:
                self.log_access(source, resource, action, "DENIED - APPROVAL REJECTED", {})
                return None

        # Grant access
        self.policies.append(policy)
        self.log_access(source, resource, action, "APPROVED", {})

        return policy

    def is_within_time_window(self, access_time: Optional[str]) -> bool:
        """Return True during business hours (09:00-16:59 server local time).

        ``access_time`` is only checked for presence; its value is not parsed.
        The hour comes from the server clock.
        """
        if not access_time:
            return False

        current_hour = datetime.now().hour
        # Allow access during business hours (9-17)
        return 9 <= current_hour < 17

    def find_matching_policy(self, source: str, resource: str, action: str) -> Optional[AccessPolicy]:
        """Find the policy for ``(source, resource, action)``.

        Prefers an unexpired policy, so a renewed JIT grant is not shadowed by
        an older expired one. Falls back to the first expired match so callers
        can still report "Policy expired". Returns ``None`` if nothing matches.
        """
        now = datetime.now()
        expired_match = None
        for policy in self.policies:
            if (policy.source == source and
                    policy.resource == resource and
                    policy.action == action):
                if policy.expires_at >= now:
                    return policy
                if expired_match is None:
                    expired_match = policy
        return expired_match

    def log_access(self, source, resource, action, result, context):
        """Append one audit record for an access decision."""
        self.audit_log.append({
            'timestamp': datetime.now(),
            'source': source,
            'resource': resource,
            'action': action,
            'result': result,
            'context': context
        })

Continuous Verification

Behavioral Analytics

import numpy as np
from typing import Dict, List
from collections import defaultdict

class BehavioralAnalytics:
    """Detect anomalous behavior in real-time.

    A per-user profile records the hour, location and resource of past
    events. A new event is scored on four signals, each from 0 (familiar)
    to 1 (novel). It is flagged when the mean score exceeds
    ``anomaly_threshold``.
    """

    def __init__(self, baseline_window_days: int = 30):
        # Stored for callers; this class does not filter events by age itself.
        self.baseline_window = baseline_window_days
        self.user_profiles: Dict[str, dict] = defaultdict(lambda: {
            'access_times': [],
            'locations': [],
            'resources': [],
            'access_patterns': defaultdict(int)
        })
        self.anomaly_threshold = 0.85  # mean score above which an event is flagged

    def build_baseline(self, user_id: str, historical_events: List[dict]) -> dict:
        """Fold historical events into the user's behavioral profile.

        Args:
            user_id: Profile to extend (created on first use).
            historical_events: Dicts with ``'timestamp'`` (an object with an
                ``.hour`` attribute, e.g. ``datetime``), ``'location'`` and
                ``'resource'``.

        Returns:
            Summary statistics of the updated profile.
        """
        profile = self.user_profiles[user_id]

        for event in historical_events:
            profile['access_times'].append(event['timestamp'].hour)
            profile['locations'].append(event['location'])
            profile['resources'].append(event['resource'])

            # Build access pattern (which resources accessed from which location)
            key = f"{event['location']}::{event['resource']}"
            profile['access_patterns'][key] += 1

        return self._calculate_statistics(profile)

    @staticmethod
    def _calculate_statistics(profile: dict) -> dict:
        """Summarize a profile: event count, hour mean/std, distinct counts."""
        hours = profile['access_times']
        return {
            'event_count': len(hours),
            'mean_hour': float(np.mean(hours)) if hours else None,
            'std_hour': float(np.std(hours)) if hours else None,
            'unique_locations': len(set(profile['locations'])),
            'unique_resources': len(set(profile['resources'])),
            'unique_patterns': len(profile['access_patterns']),
        }

    def detect_anomaly(self, user_id: str, event: dict) -> tuple[bool, float]:
        """Score one access event against the user's baseline.

        Returns:
            ``(is_anomaly, mean_score)``. Users without a profile are reported
            as ``(True, 1.0)``.
        """
        # .get avoids the defaultdict creating an empty profile for strangers.
        profile = self.user_profiles.get(user_id)
        if not profile:
            return True, 1.0  # Unknown user = anomaly

        pattern_key = f"{event['location']}::{event['resource']}"
        anomaly_scores = [
            self._score_time_anomaly(event['timestamp'].hour, profile),
            self._score_location_anomaly(event['location'], profile),
            self._score_resource_anomaly(event['resource'], profile),
            self._score_pattern_anomaly(pattern_key, profile),
        ]

        avg_anomaly = np.mean(anomaly_scores)

        # Plain bool, so callers can compare with `is True` / `is False`.
        # A True result is the cue for adaptive (step-up) authentication.
        return bool(avg_anomaly > self.anomaly_threshold), avg_anomaly

    def _score_time_anomaly(self, hour: int, profile: dict) -> float:
        """Score time-based anomaly (0-1, 1=anomaly) via a z-score capped at 3."""
        access_times = profile['access_times']
        if not access_times:
            return 0.0

        # NOTE(review): hours are treated linearly, so 23:00 and 00:00 look far
        # apart; circular statistics would suit night-shift users better.
        mean_hour = np.mean(access_times)
        std_hour = np.std(access_times)

        if std_hour == 0:
            return 0.0 if hour == mean_hour else 1.0

        z_score = abs((hour - mean_hour) / std_hour)
        return min(1.0, z_score / 3)  # Normalize to 0-1

    def _score_location_anomaly(self, location: str, profile: dict) -> float:
        """Return 1.0 for a never-seen location, else 0.0 (0.0 with no history)."""
        locations = profile['locations']
        if not locations:
            return 0.0

        return 0.0 if location in set(locations) else 1.0

    def _score_resource_anomaly(self, resource: str, profile: dict) -> float:
        """Return 1.0 for a never-seen resource, else 0.0 (0.0 with no history)."""
        resources = profile['resources']
        if not resources:
            return 0.0

        return 0.0 if resource in set(resources) else 1.0

    def _score_pattern_anomaly(self, pattern: str, profile: dict) -> float:
        """Return 1.0 for a new location/resource pair, else 0.0.

        Like the other scorers, an empty history scores 0.0.
        """
        patterns = profile['access_patterns']
        if not patterns:
            return 0.0

        return 0.0 if pattern in patterns else 1.0

# Usage
analytics = BehavioralAnalytics()

# Build baseline. These synthetic events stand in for ~30 days of real access
# logs; each event needs a 'timestamp' (datetime), a 'location' and a 'resource'.
historical_events = [
    {
        'timestamp': datetime(2024, 1, day, 9 + day % 8),
        'location': 'NYC',
        'resource': 'database',
    }
    for day in range(1, 31)
]
analytics.build_baseline('user123', historical_events)

# Monitor access
new_access = {'timestamp': datetime.now(), 'location': 'NYC', 'resource': 'database'}
is_anomaly, score = analytics.detect_anomaly('user123', new_access)

if is_anomaly:
    print(f"Anomalous access detected (score: {score:.2f})")
    # Trigger MFA, require approval, etc.

Putting It Together

class ZeroTrustAccessController:
    """Unified zero trust access control: identity, behavior, then policy."""

    # Credential fields consumed by identity verification. They are stripped
    # before the context reaches the policy engine, whose log_access copies
    # the context into its audit log.
    _CREDENTIAL_KEYS = ('password', 'otp')

    # Anomaly score above which access is refused pending extra verification.
    HIGH_ANOMALY_SCORE = 0.95

    def __init__(self):
        # NOTE(review): placeholder secret; load it from a secrets manager.
        self.authenticator = ZeroTrustAuthenticator(secret_key="...")
        self.policy_engine = ZeroTrustPolicyEngine()
        self.analytics = BehavioralAnalytics()

    def grant_access(self, user_id: str, resource: str, action: str, context: dict) -> bool:
        """Unified access decision.

        Args:
            context: Request data; ``password``, ``otp``, ``device_id`` and
                ``location`` are read here, and the rest (minus credentials)
                is passed to the policy engine. The caller's dict is not
                modified.

        Returns:
            True only if identity, behavior and policy checks all pass.
        """
        # 1. Verify identity
        auth_ok, _auth_msg = self.authenticator.verify_step_up(
            user_id,
            context.get('password'),
            context.get('otp'),
            context.get('device_id')
        )
        if not auth_ok:
            return False

        # 2. Check behavioral anomaly
        is_anomaly, anomaly_score = self.analytics.detect_anomaly(user_id, {
            'timestamp': datetime.now(),
            'location': context.get('location'),
            'resource': resource
        })
        if is_anomaly and anomaly_score > self.HIGH_ANOMALY_SCORE:
            # No step-up or approval flow is wired in here, so fail closed
            # rather than flag the request and grant it anyway.
            print(f"High anomaly score ({anomaly_score}), denying access pending additional verification")
            return False

        # 3. Check policy, without leaking credentials into the audit trail
        policy_context = {
            key: value
            for key, value in context.items()
            if key not in self._CREDENTIAL_KEYS
        }
        can_access, _policy_msg = self.policy_engine.evaluate_access(
            user_id, resource, action, policy_context
        )

        return can_access

Glossary

  • Zero Trust: Security model verifying every access request
  • Microsegmentation: Dividing network into isolated security zones
  • JIT: Just-In-Time temporary access
  • MFA: Multi-Factor Authentication
  • RBAC: Role-Based Access Control
  • WebAuthn: Passwordless authentication standard

Resources

Comments