Introduction
Deploying AI agents without governance is deploying risk at scale. Governance is the infrastructure that lets you scale AI safely.
This guide covers practical governance: the policies you need, the technical controls that enforce them, and how to implement governance-as-code so it doesn’t slow you down.
What Governance Actually Means
Governance = Policies + Controls + Monitoring + Accountability
Policies: What agents are allowed to do
Controls: Technical enforcement of policies
Monitoring: Detecting when policies are violated
Accountability: Who is responsible when things go wrong
Risk Classification: Start Here
Before writing policies, classify your agents by risk:
from enum import Enum
from dataclasses import dataclass
class RiskLevel(Enum):
    """Coarse risk tier for an agent, used to pick controls and approval rules."""
    LOW = "low"            # Read-only, no external actions
    MEDIUM = "medium"      # Writes data / limited external actions (reconstructed from garbled source)
    HIGH = "high"          # Financial, medical, legal, or irreversible actions
    CRITICAL = "critical"  # Autonomous decisions affecting many people
@dataclass
class AgentRiskProfile:
    """Declares what an agent may do and when a human must approve its actions.

    Fields reconstructed from the example profiles below; several declaration
    lines were garbled in the source.
    """
    name: str
    risk_level: RiskLevel
    can_write_data: bool
    can_send_external_messages: bool
    can_execute_code: bool
    can_make_financial_transactions: bool
    max_transaction_amount: float
    requires_human_approval: bool
    # Human-readable approval trigger, e.g. "never", "refunds > $100",
    # "anything affecting > 100 users".
    approval_threshold: str
# Examples — one profile per deployed agent, keyed by agent id.
AGENT_PROFILES = {
    "knowledge-base": AgentRiskProfile(
        name="Knowledge Base Q&A",
        risk_level=RiskLevel.LOW,
        can_write_data=False,
        can_send_external_messages=False,
        can_execute_code=False,
        can_make_financial_transactions=False,
        max_transaction_amount=0,
        requires_human_approval=False,
        approval_threshold="never",
    ),
    "customer-support": AgentRiskProfile(
        name="Customer Support Agent",
        risk_level=RiskLevel.MEDIUM,
        # NOTE(review): can_write_data was garbled in the source; True assumed
        # for a support agent that updates tickets — confirm.
        can_write_data=True,
        can_send_external_messages=True,
        can_execute_code=False,
        can_make_financial_transactions=False,
        max_transaction_amount=0,
        requires_human_approval=True,
        approval_threshold="refunds > $100",
    ),
    "financial-advisor": AgentRiskProfile(
        name="Financial Advisory Agent",
        risk_level=RiskLevel.HIGH,
        can_write_data=True,
        can_send_external_messages=True,
        can_execute_code=False,
        can_make_financial_transactions=True,
        max_transaction_amount=1000.0,
        requires_human_approval=True,
        approval_threshold="all transactions",
    ),
}
Policy Definition as Code
Define policies in code, not documents:
from typing import Callable
from dataclasses import dataclass, field
import json
import time
@dataclass
class Policy:
    """A named rule the PolicyEngine enforces before an agent action runs."""
    name: str
    description: str
    # Agent ids this policy applies to; ["all"] means every agent
    # (reconstructed — see the registrations below).
    applies_to: list[str]
    # check(agent_name, action, context) -> (allowed: bool, reason: str)
    check: Callable
@dataclass
class PolicyViolation:
    """Record of a single blocked action, kept for audit and alerting."""
    policy_name: str
    agent_name: str
    action: str
    reason: str
    timestamp: float = field(default_factory=time.time)
    severity: str = "medium"  # low, medium, high, critical
class PolicyEngine:
def __init__(self):
self.policies: list[Policy] = []
self.violations: list[PolicyViolation] = []
def register(self, policy: Policy):
self.policies.append(policy)
tr, context: dict) -> tuple[bool, list[str]]:
"""Evaluate all applicable policies. Returns (allowed, reasons)."""
reasons = []
allowed = True
for policy in self.policies:
:
is_allowed, reason = policy.check(agent_name, action, context)
if not is_allowed:
allowed = False
reasons.append(f"{policy.name}: {reason}")
self.violations.append(PolicyViolation(
cy_name=policy.name,
agname=agent_name,
n=action,
reason=reason,
))
ns
# Define concrete policies
engine = PolicyEngine()

# Policy 1: No PII in external messages
# NOTE(review): contains_pii() is assumed to be defined elsewhere in the
# project — it is not visible in this file.
engine.register(Policy(
    name="no-pii-external",
    description="Agents cannot send PII to external services",
    applies_to=["all"],
    check=lambda agent, action, ctx: (
        (True, "OK") if action != "send_external_message"
        else (not contains_pii(ctx.get("message", "")),
              "Message contains PII and cannot be sent externally")
    ),
))

# Policy 2: Transaction limits
engine.register(Policy(
    name="transaction-limits",
    description="Agents cannot execute transactions above their limit",
    applies_to=["all"],
    check=lambda agent, action, ctx: (
        (True, "OK") if action != "execute_transaction"
        else (ctx.get("amount", 0) <= AGENT_PROFILES[agent].max_transaction_amount,
              f"Transaction ${ctx.get('amount')} exceeds limit ${AGENT_PROFILES[agent].max_transaction_amount}")
    ),
))
# Policy 3: Business hours for high-risk actions
# NOTE(review): is_business_hours() is assumed to be defined elsewhere in the
# project. The closing of this call was lost to garbling and is restored here.
engine.register(Policy(
    name="business-hours-only",
    description="High-risk actions are only allowed during business hours",
    applies_to=["financial-advisor"],
    check=lambda agent, action, ctx: (
        (True, "OK") if action not in ["execute_transaction", "send_contract"]
        else (is_business_hours(), "High-risk actions only allowed during business hours (9am-5pm UTC)")
    ),
))
# Usage
allowed, reasons = engine.evaluate(
    agent_name="financial-advisor",
    action="execute_transaction",
    context={"amount": 5000, "recipient": "user@example.com"},
)
if not allowed:
    print(f"Action blocked: {reasons}")

Audit Logging
Every agent action must be logged for compliance and debugging:

import json
import uuid
import time
from dataclasses import dataclass, asdict
from typing import Any

@dataclass
class AuditEvent:
    event_id: str
    timestamp: float
    agent_name: str
    agent_version: str
    session_id: str
    user_id: str | None
    action: str
    tool_name: str | None
    tool_args: dict | None
    tool_result: str | None
    policy_checks: list[dict]
    allowed: bool
    duration_ms: float
    model: str
    input_tokens: int
    output_tokens: int
    metadata: dict

class AuditLogger:
    def __init__(self, backend="file", log_path="/var/log/ai-agents/audit.jsonl"):
        self.backend = backend
        self.log_path = log_path

    def log(self, event: AuditEvent):
        record = asdict(event)
        if self.backend == "file":
            with open(self.log_path, "a") as f:
                f.write(json.dumps(record) + "\n")
        elif self.backend == "elasticsearch":
            # Send to Elasticsearch for searchable audit trail
            import httpx
            httpx.post(
                f"{ES_URL}/ai-audit-{time.strftime('%Y.%m')}/_doc",
                json=record,
                headers={"Authorization": f"Bearer {ES_TOKEN}"},
            )
        elif self.backend == "cloudwatch":
            import boto3
            logs = boto3.client("logs")
            logs.put_log_events(
                logGroupName="/ai-agents/audit",
                logStreamName=event.agent_name,
                logEvents=[{
                    "timestamp": int(event.timestamp * 1000),
                    "message": json.dumps(record),
                }],
            )

Decorator for automatic audit logging:

def audited(agent_name: str, version: str):
    def decorator(func):
        async def wrapper(*args, **kwargs):
            start = time.time()
            event_id = str(uuid.uuid4())
            try:
                result = await func(*args, **kwargs)
                duration = (time.time() - start) * 1000
                audit_logger.log(AuditEvent(
                    event_id=event_id,
                    timestamp=start,
                    agent_name=agent_name,
                    agent_version=version,
                    session_id=kwargs.get("session_id", "unknown"),
                    user_id=kwargs.get("user_id"),
                    action=func.__name__,
                    tool_name=kwargs.get("tool_name"),
                    tool_args=kwargs.get("tool_args"),
                    tool_result=str(result)[:500] if result else None,
                    policy_checks=[],
                    allowed=True,
                    duration_ms=duration,
                    model=kwargs.get("model", "unknown"),
                    input_tokens=kwargs.get("input_tokens", 0),
                    output_tokens=kwargs.get("output_tokens", 0),
                    metadata={},
                ))
                return result
            except PolicyViolation as e:
                audit_logger.log(AuditEvent(
                    event_id=event_id,
                    timestamp=start,
                    agent_name=agent_name,
                    agent_version=version,
                    session_id=kwargs.get("session_id", "unknown"),
                    user_id=kwargs.get("user_id"),
                    action=func.__name__,
                    tool_name=None,
                    tool_args=None,
                    tool_result=None,
                    policy_checks=[{"policy": e.policy_name, "result": "blocked"}],
                    allowed=False,
                    duration_ms=(time.time() - start) * 1000,
                    model="",
                    input_tokens=0,
                    output_tokens=0,
                    metadata={"violation_reason": e.reason},
                ))
                raise
        return wrapper
    return decorator

Human-in-the-Loop Controls
For high-risk actions, require human approval:

import asyncio
from enum import Enum

class ApprovalStatus(Enum):
    PENDING = "pending"
    APPROVED = "approved"
    REJECTED = "rejected"
    TIMEOUT = "timeout"

class HumanApprovalGate:
    """Pause agent execution and wait for human approval."""

    def __init__(self, notification_service, timeout_seconds: int = 300):
        self.notifier = notification_service
        self.timeout = timeout_seconds
        self.pending: dict[str, asyncio.Future] = {}

    async def request_approval(
        self,
        action_id: str,
        agent_name: str,
        action: str,
        context: dict,
        approver_email: str,
    ) -> ApprovalStatus:
        """Request human approval and wait for response."""
        # Create a future that will be resolved when human responds
        future = asyncio.get_event_loop().create_future()
        self.pending[action_id] = future

        # Notify the approver
        await self.notifier.send_approval_request(
            to=approver_email,
            subject=f"Approval Required: {agent_name} wants to {action}",
            body=f"""
            Agent: {agent_name}
            Action: {action}
            Context: {json.dumps(context, indent=2)}

            Approve: https://admin.example.com/approve/{action_id}
            Reject: https://admin.example.com/reject/{action_id}

            This request expires in {self.timeout // 60} minutes.
            """,
            action_id=action_id,
        )

        try:
            # Wait for approval with timeout
            status = await asyncio.wait_for(future, timeout=self.timeout)
            del self.pending[action_id]
            return status
        except asyncio.TimeoutError:
            del self.pending[action_id]
            return ApprovalStatus.TIMEOUT

    def resolve(self, action_id: str, approved: bool):
        """Called when human approves or rejects."""
        if action_id in self.pending:
            status = ApprovalStatus.APPROVED if approved else ApprovalStatus.REJECTED
            self.pending[action_id].set_result(status)

# Usage in agent
approval_gate = HumanApprovalGate(notification_service, timeout_seconds=300)

async def execute_with_approval(agent_name: str, action: str, context: dict):
    profile = AGENT_PROFILES[agent_name]
    if profile.requires_human_approval:
        action_id = str(uuid.uuid4())
        status = await approval_gate.request_approval(
            action_id=action_id,
            agent_name=agent_name,
            action=action,
            context=context,
            approver_email="ai-approvals@company.com",
        )
        if status != ApprovalStatus.APPROVED:
            raise PermissionError(f"Action {action} was {status.value}")
    # Execute the action
    return await execute_action(action, context)

EU AI Act Compliance
The EU AI Act (effective 2026) requires specific controls for high-risk AI systems:

from dataclasses import dataclass

@dataclass
class EUAIActCompliance:
    """Track EU AI Act compliance requirements."""
    # Article 9: Risk management system
    risk_assessment_completed: bool = False
    risk_assessment_date: str = ""
    residual_risks_documented: bool = False
    # Article 10: Data governance
    training_data_documented: bool = False
    data_quality_measures: bool = False
    bias_testing_completed: bool = False
    # Article 11: Technical documentation
    system_description: str = ""
    intended_purpose: str = ""
    performance_metrics: dict = None
    # Article 12: Record-keeping
    audit_logs_enabled: bool = False
    log_retention_days: int = 0  # minimum 6 months for high-risk
    # Article 13: Transparency
    users_informed_of_ai: bool = False
    ai_disclosure_text: str = ""
    # Article 14: Human oversight
    human_oversight_measures: list[str] = None
    override_capability: bool = False
    # Article 15: Accuracy and robustness
    accuracy_metrics: dict = None
    adversarial_testing: bool = False

def generate_compliance_report(agent_name: str, compliance: EUAIActCompliance) -> dict:
    """Generate EU AI Act compliance report."""
    checks = {
        "risk_management": compliance.risk_assessment_completed and compliance.residual_risks_documented,
        "data_governance": compliance.training_data_documented and compliance.bias_testing_completed,
        "documentation": bool(compliance.system_description and compliance.intended_purpose),
        "record_keeping": compliance.audit_logs_enabled and compliance.log_retention_days >= 180,
        "transparency": compliance.users_informed_of_ai,
        "human_oversight": compliance.override_capability and bool(compliance.human_oversight_measures),
        "accuracy": bool(compliance.accuracy_metrics and compliance.adversarial_testing),
    }
    passed = sum(checks.values())
    total = len(checks)
    return {
        "agent": agent_name,
        "compliance_score": f"{passed}/{total}",
        "compliant": passed == total,
        "checks": checks,
        "gaps": [k for k, v in checks.items() if not v],
    }

Governance Dashboard
Monitor governance health across all agents:

# Prometheus metrics for governance
from prometheus_client import Counter, Histogram, Gauge

policy_violations = Counter(
    "ai_agent_policy_violations_total",
    "Total policy violations",
    ["agent_name", "policy_name", "severity"],
)
approval_requests = Counter(
    "ai_agent_approval_requests_total",
    "Total human approval requests",
    ["agent_name", "status"],  # approved, rejected, timeout
)
agent_actions = Counter(
    "ai_agent_actions_total",
    "Total agent actions",
    ["agent_name", "action", "allowed"],
)

Alert rules (Prometheus):

ALERT_RULES = """
groups:
  - name: ai-governance
    rules:
      - alert: HighPolicyViolationRate
        expr: rate(ai_agent_policy_violations_total[5m]) > 0.1
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Agent {{ $labels.agent_name }} has high policy violation rate"
      - alert: ApprovalTimeout
        expr: rate(ai_agent_approval_requests_total{status="timeout"}[1h]) > 5
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Many approval requests timing out — check notification system"
      - alert: UnauthorizedToolAccess
        expr: ai_agent_policy_violations_total{policy_name="tool-access"} > 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Agent attempted unauthorized tool access"
"""

Governance Checklist
Before deploying any AI agent:

Risk Assessment: [ ] Classified agent risk level (low/medium/high/critical) [ ] Documented potential failure modes [ ] Identified affected stakeholders
Policy Definition: [ ] Defined allowed tools and actions [ ] Set transaction/action limits [ ] Specified human approval requirements [ ] Documented data access restrictions
Technical Controls: [ ] Policy engine implemented and tested [ ] Audit logging enabled (min 6 months retention) [ ] Human approval gate for high-risk actions [ ] Rate limiting on tool calls [ ] Output filtering for sensitive data
Monitoring: [ ] Prometheus metrics for policy violations [ ] Alerts for unusual behavior [ ] Dashboard for governance health
Compliance: [ ] EU AI Act requirements assessed (if applicable) [ ] GDPR data handling documented [ ] Sector-specific requirements reviewed [ ] Incident response plan documented

Resources
- [EU AI Act Official Text](https://artificialintelligenceact.eu/)
- [NIST AI Risk Management Framework](https://www.nist.gov/itl/ai-risk-management-framework)
- [OWASP LLM Top 10](https://owasp.org/www-project-top-10-for-large-language-model-applications/)
- [Anthropic: Responsible Scaling Policy](https://www.anthropic.com/index.php/core-views-on-ai-safety)

Comments