Zero Trust Architecture: Implementation Guide
Zero Trust replaces implicit trust with explicit verification. This guide covers designing and implementing zero trust architectures that verify every access request.
Core Principles
- Verify Explicitly: Use all available data points
- Least-Privilege Access: Limit each identity's access to the minimum necessary for its task
- Assume Breach: Design as if compromise is inevitable
- Secure Every Layer: Application, network, data, identity
- Continuous Verification: Ongoing authentication and authorization
Identity Verification Layer
Multi-Step Authentication
import hashlib
import hmac
import time
from cryptography.fernet import Fernet
class ZeroTrustAuthenticator:
    """Multi-step authenticator: password, TOTP, device and risk checks.

    The lookup helpers called below (``get_password_hash``,
    ``verify_password``, ``verify_totp``, ``verify_device``,
    ``get_user_location``, ``get_previous_location``, ``calculate_distance``,
    ``is_normal_access_time``, ``is_known_device``, ``is_safe_network``) are
    not defined in this class as shown; a subclass or mixin must provide them.
    """

    # Risk contributed by each signal that fires; the total is capped at 1.0.
    RISK_WEIGHT_LOCATION = 0.3
    RISK_WEIGHT_TIME = 0.2
    RISK_WEIGHT_DEVICE = 0.4
    RISK_WEIGHT_NETWORK = 0.25
    # Distance (km) between the current and previous location that counts as
    # a suspicious location change.
    MAX_TRAVEL_DISTANCE_KM = 1000
    # Scores strictly above this value cause verify_step_up to reject.
    RISK_THRESHOLD = 0.8

    def __init__(self, secret_key):
        """Store the encoded secret and create a Fernet cipher.

        Args:
            secret_key: Text secret; stored UTF-8 encoded in ``self.secret``.
        """
        self.secret = secret_key.encode()
        # NOTE(review): the cipher key is freshly generated per instance and
        # is not derived from ``secret_key``, so anything encrypted with it
        # cannot be decrypted by another instance - confirm this is intended.
        self.cipher = Fernet(Fernet.generate_key())

    def generate_totp(self, user_id, secret):
        """Generate a time-based one-time password for ``secret``.

        ``user_id`` is accepted for interface symmetry but is not used.
        """
        import pyotp

        return pyotp.TOTP(secret).now()

    def verify_step_up(self, user_id, password, otp, device_id):
        """Run the verification steps in order, stopping at the first failure.

        Returns:
            ``(ok, message)`` where ``ok`` is True only when the password,
            OTP and device checks pass and the risk score does not exceed
            ``RISK_THRESHOLD``.
        """
        # 1. Password verification
        stored_hash = self.get_password_hash(user_id)
        if not self.verify_password(password, stored_hash):
            return False, "Invalid password"

        # 2. TOTP verification
        if not self.verify_totp(user_id, otp):
            return False, "Invalid OTP"

        # 3. Device fingerprint
        if not self.verify_device(user_id, device_id):
            return False, "Unregistered device"

        # 4. Risk analysis
        if self.calculate_risk_score(user_id) > self.RISK_THRESHOLD:
            return False, "Suspicious activity detected"

        return True, "Authentication successful"

    def calculate_risk_score(self, user_id):
        """Return an additive risk score in the range 0 to 1.

        Each signal that fires adds its weight and the sum is capped at 1.0.
        Averaging the weights instead would bound the score by the largest
        single weight (0.4), so the rejection threshold of 0.8 could never
        be reached.
        """
        score = 0.0

        # Location change
        current_location = self.get_user_location()
        previous_location = self.get_previous_location(user_id)
        distance_km = self.calculate_distance(current_location, previous_location)
        if distance_km > self.MAX_TRAVEL_DISTANCE_KM:
            score += self.RISK_WEIGHT_LOCATION

        # Time anomaly
        if not self.is_normal_access_time(user_id):
            score += self.RISK_WEIGHT_TIME

        # Unknown device
        if not self.is_known_device(user_id):
            score += self.RISK_WEIGHT_DEVICE

        # Unsafe network
        if not self.is_safe_network(user_id):
            score += self.RISK_WEIGHT_NETWORK

        return min(1.0, score)
Passwordless Authentication
from fastapi import FastAPI, HTTPException
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
import base64
# Application object on which the @app.post route handlers in this file register.
app = FastAPI()
class WebAuthnHandler:
    """WebAuthn (FIDO2) passwordless authentication.

    Wraps the registration and authentication ceremonies of the ``webauthn``
    package.  The persistence helpers called below (``store_challenge``,
    ``retrieve_challenge``, ``store_credential``, ``get_user_credentials``,
    ``update_sign_count``) are not defined in this class as shown; a subclass
    or mixin must provide them.
    """

    # Number of random bytes in each challenge.
    CHALLENGE_BYTES = 32

    def __init__(self):
        self.origins = ["https://example.com"]
        self.rp_id = "example.com"
        self.rp_name = "Example Corp"

    def initiate_registration(self, user_id: str, username: str):
        """Start passwordless registration.

        Generates a random challenge, builds the registration options for a
        platform authenticator with required user verification, stores the
        challenge under ``user_id`` and returns the options.
        """
        import secrets

        import webauthn
        from webauthn.helpers.structs import (
            AuthenticatorAttachment,
            AuthenticatorSelectionCriteria,
            ResidentKeyRequirement,
            UserVerificationRequirement,
        )

        challenge = secrets.token_bytes(self.CHALLENGE_BYTES)
        # NOTE(review): newer py_webauthn releases expect ``user_id`` as bytes;
        # confirm against the installed version.
        registration_data = webauthn.generate_registration_options(
            rp_id=self.rp_id,
            rp_name=self.rp_name,
            user_id=user_id,
            user_name=username,
            challenge=challenge,
            authenticator_selection=AuthenticatorSelectionCriteria(
                authenticator_attachment=AuthenticatorAttachment.PLATFORM,
                resident_key=ResidentKeyRequirement.PREFERRED,
                user_verification=UserVerificationRequirement.REQUIRED,
            ),
        )

        # Store challenge temporarily so complete_registration can check it.
        self.store_challenge(user_id, challenge)
        return registration_data

    def complete_registration(self, user_id: str, credential):
        """Complete passwordless registration.

        Verifies the authenticator response against the stored challenge and
        persists the resulting credential id and public key for ``user_id``.
        """
        import webauthn

        challenge = self.retrieve_challenge(user_id)
        verified_credential = webauthn.verify_registration_response(
            credential=credential,
            expected_challenge=challenge,
            expected_origin=self.origins[0],
            expected_rp_id=self.rp_id,
        )

        # Store credential for user
        self.store_credential(
            user_id,
            verified_credential.credential_id,
            verified_credential.credential_public_key,
        )
        return {"success": True}

    def initiate_authentication(self, username: str):
        """Start passwordless authentication.

        Builds authentication options restricted to the credentials already
        registered for ``username`` and stores the challenge under that name.
        """
        import secrets

        import webauthn
        # These structs must be imported here: imports made inside another
        # method are local to that method and not visible in this one.
        from webauthn.helpers.structs import (
            PublicKeyCredentialDescriptor,
            UserVerificationRequirement,
        )

        challenge = secrets.token_bytes(self.CHALLENGE_BYTES)

        # Get registered credentials for user
        user_credentials = self.get_user_credentials(username)
        allowed = [
            PublicKeyCredentialDescriptor(type="public-key", id=cred['credential_id'])
            for cred in user_credentials
        ]

        authentication_data = webauthn.generate_authentication_options(
            rp_id=self.rp_id,
            challenge=challenge,
            allow_credentials=allowed,
            user_verification=UserVerificationRequirement.REQUIRED,
        )

        self.store_challenge(username, challenge)
        return authentication_data

    def complete_authentication(self, username: str, assertion):
        """Complete passwordless authentication.

        Raises:
            LookupError: if ``username`` has no registered credentials.
        """
        challenge = self.retrieve_challenge(username)
        credentials = self.get_user_credentials(username)
        if not credentials:
            raise LookupError(f"No registered WebAuthn credentials for {username!r}")

        import webauthn

        # NOTE(review): only the first stored credential is checked; a user
        # with several authenticators needs the entry matching the assertion's
        # credential id - the assertion's shape is not visible here.
        stored = credentials[0]
        verified_assertion = webauthn.verify_authentication_response(
            credential=assertion,
            expected_challenge=challenge,
            expected_origin=self.origins[0],
            expected_rp_id=self.rp_id,
            credential_public_key=stored['public_key'],
            credential_current_sign_count=stored['sign_count'],
        )

        # Persist the counter reported by the authenticator; the verification
        # result exposes it as ``new_sign_count``.
        self.update_sign_count(username, verified_assertion.new_sign_count)
        return {"success": True, "user_id": username}
@app.post("/register/start")
async def start_registration(username: str):
    """Begin a WebAuthn registration ceremony.

    The username is passed as both the user id and the user name.
    """
    registration_options = WebAuthnHandler().initiate_registration(
        user_id=username,
        username=username,
    )
    return registration_options
@app.post("/auth/start")
async def start_auth(username: str):
    """Begin a WebAuthn authentication ceremony for ``username``."""
    authentication_options = WebAuthnHandler().initiate_authentication(username=username)
    return authentication_options
Network Segmentation
Microsegmentation
import boto3
from typing import List, Dict
class MicrosegmentationManager:
    """Implement microsegmentation in a cloud environment via EC2 security groups."""

    ANYWHERE_IPV4 = '0.0.0.0/0'

    def __init__(self):
        self.ec2 = boto3.client('ec2')

    def create_isolation_groups(self, workloads: Dict[str, List[str]]):
        """Create one security group per tier and wire the allowed ingress.

        Allowed paths: frontend -> app on TCP 443 and app -> database on
        TCP 5432.  No other ingress rule is added.

        Args:
            workloads: Currently unused; kept for interface compatibility.

        Returns:
            Dict mapping ``'frontend'``, ``'app'`` and ``'database'`` to the
            created security group ids.
        """
        db_sg_id = self._create_group('db-tier-isolated', 'Database tier - isolated')
        app_sg_id = self._create_group('app-tier-isolated', 'Application tier - isolated')
        frontend_sg_id = self._create_group('frontend-tier-isolated', 'Frontend tier - isolated')

        # Frontend -> App: 443 only
        self.ec2.authorize_security_group_ingress(
            GroupId=app_sg_id,
            IpPermissions=[self._tcp_group_rule(443, frontend_sg_id)],
        )

        # App -> Database: 5432 (PostgreSQL) only
        self.ec2.authorize_security_group_ingress(
            GroupId=db_sg_id,
            IpPermissions=[self._tcp_group_rule(5432, app_sg_id)],
        )

        # No further ingress rules are added for any tier.
        return {
            'frontend': frontend_sg_id,
            'app': app_sg_id,
            'database': db_sg_id,
        }

    def enforce_zero_trust_rules(self, sg_ids: Dict[str, str]):
        """Replace each group's allow-all egress with tier-specific egress.

        Every group in ``sg_ids`` has its allow-all IPv4 egress rule revoked.
        The ``'frontend'`` tier is then allowed TCP 443 to the app group; the
        ``'app'`` tier is allowed TCP 5432 to the database group and TCP 443
        to any IPv4 address.  Other tiers get no egress rule.

        Raises:
            ClientError: if revoking fails for any reason other than the
                allow-all rule already being absent.
        """
        for tier, sg_id in sg_ids.items():
            self._revoke_default_egress(sg_id)

            if tier == 'frontend':
                # Only to app tier
                self.ec2.authorize_security_group_egress(
                    GroupId=sg_id,
                    IpPermissions=[self._tcp_group_rule(443, sg_ids['app'])],
                )
            elif tier == 'app':
                # To database and external APIs
                self.ec2.authorize_security_group_egress(
                    GroupId=sg_id,
                    IpPermissions=[
                        self._tcp_group_rule(5432, sg_ids['database']),
                        {
                            'IpProtocol': 'tcp',
                            'FromPort': 443,
                            'ToPort': 443,
                            # HTTPS to external APIs; CIDRs belong in IpRanges.
                            'IpRanges': [{'CidrIp': self.ANYWHERE_IPV4}],
                        },
                    ],
                )

    def _create_group(self, name: str, description: str) -> str:
        """Create a security group and return its id."""
        response = self.ec2.create_security_group(GroupName=name, Description=description)
        return response['GroupId']

    @staticmethod
    def _tcp_group_rule(port: int, peer_group_id: str) -> dict:
        """Build a single-port TCP permission whose peer is another security group."""
        return {
            'IpProtocol': 'tcp',
            'FromPort': port,
            'ToPort': port,
            'UserIdGroupPairs': [{'GroupId': peer_group_id}],
        }

    def _revoke_default_egress(self, sg_id: str) -> None:
        """Revoke the allow-all IPv4 egress rule, tolerating its absence."""
        try:
            self.ec2.revoke_security_group_egress(
                GroupId=sg_id,
                IpPermissions=[{
                    'IpProtocol': '-1',
                    # A bare 'CidrIp' key is not part of the IpPermissions
                    # structure; the CIDR must be nested under 'IpRanges'.
                    'IpRanges': [{'CidrIp': self.ANYWHERE_IPV4}],
                }],
            )
        except self.ec2.exceptions.ClientError as err:
            # A rule that is already gone is fine; any other failure would
            # leave allow-all egress in place, so it must not be hidden.
            if err.response.get('Error', {}).get('Code') != 'InvalidPermission.NotFound':
                raise
Network Access Control
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional
@dataclass
class AccessPolicy:
    """One access grant: who may do what to which resource, and until when."""

    # User or service the grant applies to.
    source: str
    # Target resource.
    resource: str
    # Operation permitted: read, write or delete.
    action: str
    # Extra constraints such as time, location or device.
    conditions: dict
    # Moment after which the grant no longer applies.
    expires_at: datetime
    # Reason given for the grant.
    justification: str
class ZeroTrustPolicyEngine:
    """Policy engine for zero trust access.

    Keeps granted policies in ``self.policies`` and an in-memory audit trail
    in ``self.audit_log``.  The helpers ``is_allowed_location``,
    ``is_trusted_device``, ``is_safe_network``, ``verify_requester``,
    ``requires_approval`` and ``request_approval`` are not defined in this
    class as shown; a subclass or mixin must provide them.
    """

    # Access window, in local server hours: start inclusive, end exclusive.
    BUSINESS_HOURS_START = 9
    BUSINESS_HOURS_END = 17

    def __init__(self):
        self.policies: List[AccessPolicy] = []
        self.audit_log = []

    def evaluate_access(self,
                        source: str,
                        resource: str,
                        action: str,
                        context: dict) -> tuple[bool, str]:
        """Evaluate an access request against context checks and policies.

        Checks run in order (time, location, device, network, policy) and
        stop at the first failure.  Every outcome, allowed or denied, is
        written to the audit log.

        Returns:
            ``(allowed, message)``.
        """
        # 1. Check time condition
        if not self.is_within_time_window(context.get('time')):
            return self._deny(source, resource, action,
                              "Access outside allowed time window", context)

        # 2. Check location
        if not self.is_allowed_location(source, context.get('location')):
            return self._deny(source, resource, action,
                              "Access from unauthorized location", context)

        # 3. Check device
        if not self.is_trusted_device(source, context.get('device_id')):
            return self._deny(source, resource, action, "Unregistered device", context)

        # 4. Check network
        if not self.is_safe_network(context.get('network')):
            return self._deny(source, resource, action, "Unsafe network detected", context)

        # 5. Check policy exists and is valid
        matching_policy = self.find_matching_policy(source, resource, action)
        if not matching_policy:
            return self._deny(source, resource, action, "No matching policy", context)
        if matching_policy.expires_at < datetime.now():
            return self._deny(source, resource, action, "Policy expired", context)

        # 6. Log access
        self.log_access(source, resource, action, "ALLOWED", context)
        return True, "Access granted"

    def _deny(self, source, resource, action, reason, context):
        """Record a denial in the audit log and return the ``(False, reason)`` result."""
        self.log_access(source, resource, action, f"DENIED - {reason.upper()}", context)
        return False, reason

    def request_access(self,
                       source: str,
                       resource: str,
                       action: str,
                       justification: str,
                       duration_hours: int = 1) -> "Optional[AccessPolicy]":
        """Request just-in-time (JIT) access.

        Returns:
            The granted policy, or None when the requester cannot be verified
            or the approval workflow rejects the request.
        """
        # Verify request
        if not self.verify_requester(source):
            self.log_access(source, resource, action, "DENIED - REQUESTER NOT VERIFIED", {})
            return None

        # Create policy
        policy = AccessPolicy(
            source=source,
            resource=resource,
            action=action,
            conditions={},
            expires_at=datetime.now() + timedelta(hours=duration_hours),
            justification=justification,
        )

        # Check with approval workflow
        if self.requires_approval(source, resource):
            if not self.request_approval(policy):
                self.log_access(source, resource, action, "DENIED - APPROVAL REJECTED", {})
                return None

        # Grant access
        self.policies.append(policy)
        self.log_access(source, resource, action, "APPROVED", {})
        return policy

    def is_within_time_window(self, access_time: Optional[str]) -> bool:
        """Check whether access is currently allowed.

        ``access_time`` is only checked for presence; the decision uses the
        server's own clock, not the supplied value.
        """
        if not access_time:
            return False
        current_hour = datetime.now().hour
        return self.BUSINESS_HOURS_START <= current_hour < self.BUSINESS_HOURS_END

    def find_matching_policy(self, source: str, resource: str, action: str) -> "Optional[AccessPolicy]":
        """Find the policy matching ``source``/``resource``/``action``.

        An unexpired match is preferred so that a stale grant cannot shadow a
        newer one.  If every match has expired, the first of them is returned
        so the caller can report the expiry.  Returns None when nothing
        matches.
        """
        now = datetime.now()
        first_expired = None
        for policy in self.policies:
            if (policy.source, policy.resource, policy.action) != (source, resource, action):
                continue
            if policy.expires_at >= now:
                return policy
            if first_expired is None:
                first_expired = policy
        return first_expired

    def log_access(self, source, resource, action, result, context):
        """Append one audit record for an access attempt."""
        self.audit_log.append({
            'timestamp': datetime.now(),
            'source': source,
            'resource': resource,
            'action': action,
            'result': result,
            'context': context,
        })
Continuous Verification
Behavioral Analytics
import numpy as np
from typing import Dict, List
from collections import defaultdict
class BehavioralAnalytics:
    """Detect anomalous behavior in real-time.

    Keeps a per-user profile of past access hours, locations, resources and
    location/resource pairs, and scores new events against it.
    """

    HOURS_PER_DAY = 24

    def __init__(self, baseline_window_days: int = 30):
        # Stored for reference; the visible code does not filter by it.
        self.baseline_window = baseline_window_days
        self.user_profiles: Dict[str, dict] = defaultdict(lambda: {
            'access_times': [],
            'locations': [],
            'resources': [],
            'access_patterns': defaultdict(int)
        })
        self.anomaly_threshold = 0.85  # 85% confidence threshold

    def build_baseline(self, user_id: str, historical_events: List[dict]):
        """Add historical events to the user's profile.

        Each event must provide ``'timestamp'`` (an object with an ``hour``
        attribute), ``'location'`` and ``'resource'``.

        Returns:
            Summary statistics of the updated profile.
        """
        profile = self.user_profiles[user_id]

        for event in historical_events:
            profile['access_times'].append(event['timestamp'].hour)
            profile['locations'].append(event['location'])
            profile['resources'].append(event['resource'])

            # Build access pattern (which resources accessed from which location)
            key = f"{event['location']}::{event['resource']}"
            profile['access_patterns'][key] += 1

        return self._calculate_statistics(profile)

    def _calculate_statistics(self, profile: dict) -> dict:
        """Summarize a profile: event count, typical hour and distinct values."""
        mean_hour, std_hour = self._circular_hour_stats(profile['access_times'])
        return {
            'event_count': len(profile['access_times']),
            'mean_hour': mean_hour,
            'std_hour': std_hour,
            'unique_locations': len(set(profile['locations'])),
            'unique_resources': len(set(profile['resources'])),
            'unique_patterns': len(profile['access_patterns']),
        }

    def detect_anomaly(self, user_id: str, event: dict) -> tuple[bool, float]:
        """Score one event against the user's baseline.

        Returns:
            ``(is_anomaly, score)``.  The score is the mean of the time,
            location, resource and pattern scores.  A user with no profile,
            or a profile holding no events, is reported as ``(True, 1.0)``.
        """
        # .get() avoids creating an empty profile for unknown users.
        profile = self.user_profiles.get(user_id)
        if not profile or not profile['access_times']:
            return True, 1.0  # No baseline to compare against = anomaly

        pattern_key = f"{event['location']}::{event['resource']}"
        anomaly_scores = [
            self._score_time_anomaly(event['timestamp'].hour, profile),
            self._score_location_anomaly(event['location'], profile),
            self._score_resource_anomaly(event['resource'], profile),
            self._score_pattern_anomaly(pattern_key, profile),
        ]

        # Convert numpy scalars to the plain types the signature promises.
        avg_anomaly = float(np.mean(anomaly_scores))
        return avg_anomaly > self.anomaly_threshold, avg_anomaly

    @classmethod
    def _hour_offset(cls, hour, reference):
        """Signed shortest distance in hours around the clock, in [-12, 12)."""
        half_day = cls.HOURS_PER_DAY / 2
        return (hour - reference + half_day) % cls.HOURS_PER_DAY - half_day

    @classmethod
    def _circular_hour_stats(cls, hours):
        """Return ``(mean_hour, std_hour)`` treating the hour as circular.

        ``mean_hour`` is None when there are no hours or when they are spread
        so evenly around the clock that no typical hour exists.
        """
        if not hours:
            return None, 0.0
        values = np.asarray(hours, dtype=float)
        angles = values * (2 * np.pi / cls.HOURS_PER_DAY)
        sin_mean = np.sin(angles).mean()
        cos_mean = np.cos(angles).mean()
        if np.hypot(sin_mean, cos_mean) < 1e-9:
            return None, 0.0
        mean_hour = float(
            np.arctan2(sin_mean, cos_mean) * cls.HOURS_PER_DAY / (2 * np.pi)
        ) % cls.HOURS_PER_DAY
        if mean_hour >= cls.HOURS_PER_DAY:  # rounding can yield exactly 24.0
            mean_hour = 0.0
        deviations = cls._hour_offset(values, mean_hour)
        return mean_hour, float(np.sqrt(np.mean(deviations ** 2)))

    def _score_time_anomaly(self, hour: int, profile: dict) -> float:
        """Score time-based anomaly (0-1, 1=anomaly).

        Uses the circular distance from the typical access hour so that, for
        example, 23:00 and 01:00 are two hours apart rather than 22.
        """
        access_times = profile['access_times']
        if not access_times:
            return 0.0

        mean_hour, std_hour = self._circular_hour_stats(access_times)
        if mean_hour is None:
            return 0.0  # No typical hour: every hour is equally expected

        offset = abs(float(self._hour_offset(hour, mean_hour)))
        if std_hour < 1e-9:
            # All baseline accesses share one hour: exact match or anomaly.
            return 0.0 if offset < 1e-6 else 1.0

        z_score = offset / std_hour
        return min(1.0, z_score / 3)  # Three standard deviations map to 1.0

    def _score_location_anomaly(self, location: str, profile: dict) -> float:
        """Score location-based anomaly: 1.0 for a location not seen before."""
        locations = profile['locations']
        if not locations:
            return 0.0
        return 0.0 if location in set(locations) else 1.0

    def _score_resource_anomaly(self, resource: str, profile: dict) -> float:
        """Score resource access anomaly: 1.0 for a resource not seen before."""
        resources = profile['resources']
        if not resources:
            return 0.0
        return 0.0 if resource in set(resources) else 1.0

    def _score_pattern_anomaly(self, pattern: str, profile: dict) -> float:
        """Score access pattern anomaly: 1.0 for an unseen location/resource pair."""
        return 0.0 if pattern in profile['access_patterns'] else 1.0
# Usage
analytics = BehavioralAnalytics()

# Build baseline.  The events below are synthetic so the example can run as
# written; in practice, pass the user's real access logs (about 30 days).
baseline_start = datetime.now() - timedelta(days=30)
historical_events = [
    {
        'timestamp': (baseline_start + timedelta(days=day)).replace(
            hour=9 + day % 8, minute=0, second=0, microsecond=0
        ),
        'location': 'NYC',
        'resource': 'database',
    }
    for day in range(30)
]
analytics.build_baseline('user123', historical_events)

# Monitor access
new_access = {'timestamp': datetime.now(), 'location': 'NYC', 'resource': 'database'}
is_anomaly, score = analytics.detect_anomaly('user123', new_access)
if is_anomaly:
    print(f"Anomalous access detected (score: {score:.2f})")
    # Trigger MFA, require approval, etc.
Putting It Together
class ZeroTrustAccessController:
    """Unified zero trust access control.

    Chains identity verification, behavioral anomaly detection and policy
    evaluation into a single allow/deny decision.
    """

    # Anomaly scores strictly above this value block the request.
    HIGH_ANOMALY_THRESHOLD = 0.95

    def __init__(self):
        # NOTE(review): "..." is a placeholder secret - load the real value
        # from configuration or a secret store.
        self.authenticator = ZeroTrustAuthenticator(secret_key="...")
        self.policy_engine = ZeroTrustPolicyEngine()
        self.analytics = BehavioralAnalytics()

    def grant_access(self, user_id: str, resource: str, action: str, context: dict) -> bool:
        """Unified access decision.

        Args:
            user_id: Identity requesting access.
            resource: Target resource.
            action: Requested operation.
            context: Request context.  ``'password'``, ``'otp'``,
                ``'device_id'`` and ``'location'`` are read from it, and it
                is passed on to the policy engine.  On a high-anomaly denial
                ``context['requires_approval']`` is set to True so the caller
                can start an approval flow.

        Returns:
            True only if authentication succeeds, the event is not highly
            anomalous, and the policy engine allows the request.
        """
        import logging

        logger = logging.getLogger(__name__)

        # 1. Verify identity
        auth_result, auth_msg = self.authenticator.verify_step_up(
            user_id,
            context.get('password'),
            context.get('otp'),
            context.get('device_id'),
        )
        if not auth_result:
            logger.warning("Access denied for %s: %s", user_id, auth_msg)
            return False

        # 2. Check behavioral anomaly
        is_anomaly, anomaly_score = self.analytics.detect_anomaly(user_id, {
            'timestamp': datetime.now(),
            'location': context.get('location'),
            'resource': resource,
        })
        if is_anomaly and anomaly_score > self.HIGH_ANOMALY_THRESHOLD:
            # Fail closed: nothing downstream reads the flag, so continuing
            # would grant access without the extra verification it asks for.
            logger.warning(
                "High anomaly score (%s) for %s, requiring additional verification",
                anomaly_score, user_id,
            )
            context['requires_approval'] = True
            return False

        # 3. Check policy
        can_access, policy_msg = self.policy_engine.evaluate_access(
            user_id, resource, action, context
        )
        if not can_access:
            logger.info("Access denied for %s: %s", user_id, policy_msg)
        return can_access
Glossary
- Zero Trust: Security model verifying every access request
- Microsegmentation: Dividing network into isolated security zones
- JIT: Just-In-Time access, a temporary grant that expires after a set duration
- MFA: Multi-Factor Authentication
- RBAC: Role-Based Access Control
- WebAuthn: W3C Web Authentication standard for public-key credentials, used here for passwordless login
Resources
- Zero Trust Architecture (NIST SP 800-207)
- Zero Trust Maturity Model
- BeyondCorp: A new approach to enterprise security
Comments