Skip to main content
⚡ Calmops

AI Agent Governance: Policies, Controls, and Compliance for Production Deployments

Introduction

Deploying AI agents in production requires governance: it’s the infrastructure that lets you scale AI safely.

This guide covers practical governance: the policies you need, the technical controls that enforce them, and how to implement governance-as-code so it doesn’t slow you down.

What Governance Actually Means

Governance = Policies + Controls + Monitoring + Accountability

Policies:  What agents are allowed to do
Controls:  Technical enforcement of policies
Monitoring: Detecting when policies are violated
Accountability: Who is responsible when things go wrong

Classification: Start Here

Before writing policies, classify your agents by risk:

from enum import Enum
from dataclasses import dataclass

class RiskLevel(Enum):
    LOW    = "low"     # Read-only, no external actions
ions
    HIGH   = "high"    # Financial, medical, legal, or irreversible actions
    CRITICAL = "critical"  # Autonomous decisions affecting many people

@dataclass
class AgentRiskProfile:
tr
    risk_level: RiskLevel
 bool
    can_send_external_messages: bool
    can_execute_code: bool
    can_make_financial_transactions: bool
    max_transaction_amount: float
    requires_human_approval: bool
g > 100 users"

# Examples
AGENT_PROFILES = {
skProfile(
        name="Knowledge Base Q&A",
vel=RiskLevel.LOW,
        can_write_data=False,
        can_send_external_messages=False,
        can_execute_code=False,

        max_transaction_amount=0,
        requires_human_approval=False,
        approval_threshold="never",
    ),
    "customer-support": AgentRiskProfile(
tomer Support Agent",
        risk_level=RiskLevel.MEDIUM,

        can_send_external_messages=True,
 can_execute_code=False,
        can_make_financial_transactilse,
        max_transaction_amount=0,
        requires_human_approval=True,
l_threshold="refunds > $100",
    ),
   "financial-advisor": AgentRiskProfile(
        name="Financial Advisory Agent",
_level=RiskLevel.HIGH,
        can_write_data=True,
 can_send_external_messages=True,
        can_execute_code=False,
make_financial_transactions=True,
        max_transaction_amount=1000.0,
        requires_human_approval=True,
        approval_threshold="all transactions",
,
}

Policy Definition as Code

Define policies in code, not documents:

from typing import Callable
from dataclasses import dataclass, field
import json
import time

@dataclass
class Policy:
 name: str
    description: str
s
    check: Callable  # returns (allowed: bool, reason: str)

@dataclass
class PolicyViolation:
    policy_name: str
    agent_name: str
    action: str
tr
    timestamp: float = field(default_factory=time.time)
edium, high, critical

class PolicyEngine:
    def __init__(self):
        self.policies: list[Policy] = []
        self.violations: list[PolicyViolation] = []

    def register(self, policy: Policy):
        self.policies.append(policy)

tr, context: dict) -> tuple[bool, list[str]]:
        """Evaluate all applicable policies. Returns (allowed, reasons)."""
        reasons = []
        allowed = True

        for policy in self.policies:
:
                is_allowed, reason = policy.check(agent_name, action, context)
                if not is_allowed:
                    allowed = False
           reasons.append(f"{policy.name}: {reason}")
                    self.violations.append(PolicyViolation(
cy_name=policy.name,
                        agname=agent_name,
n=action,
                        reason=reason,
                    ))

ns


# Define concrete policies
licyEngine()

essages
engine.register(Policy(
l",
    description="Agents cannot send PII to external services",
ies_to=["all"],
    check=lambda agent, action, ctx: (
f action != "send_external_message"
        else (not contains_pii(ctx.get("message", "")),
ally")
    )
))

n limits
engine.register(Policy(
,
    description="Agents cannot execute transactions above their limit",

    check=lambda agent, action, ctx: (

        else (ctx.get("amount", 0) <= AGENT_PROFILES[agent].max_transaction_amount,
              f"Transaction ${ctx.get('amount')} exceeds limit ${AGENT_PROFILES[agent].max_transaction_amount}")
    )
))

# Policy 3: Business hours for high-risk actions
engine.register(Policy(
    name="business-hours-only",
    descriptiss hours",
    applies_to=["financial-advisor"],
    check=lambda agent, action, ctx: (
(True, "OK") if action not in ["execute_transaction", "send_contract"]
        else (is_business_hours(), "High-risk actions only allowed during business hours (9am-5pm UTC)")
thropic: Responsible Scaling Policy](https://www.anthropic.com/index.php/core-views-on-ai-safety)
-applications/)
- [Anttps://owasp.org/www-project-top-10-for-large-language-modelrtificialintelligenceact.eu/)
- [NIST AI Risk Management Framework](https://www.nist.gov/itl/ai-risk-management-framework)
- [OWASP LLM Top 10](hs reviewed
  [ ] Incident response plan documented

Resources

  • [EU AI Act Official Text](https://a [ ] Alerts for unusual behavior [ ] Dashboard for governance health

Compliance: [ ] EU AI Act requirements assessed (if applicable) [ ] GDPR data handling documented [ ] Sector-specific requirementfor high-risk actions [ ] Rate limiting on tool calls [ ] Output filtering for sensitive data

Monitoring: [ ] Prometheus metrics for policy violations [ ] Policy engine implemented and tested [ ] Audit logging enabled (min 6 months retention) [ ] Human approval gate d human approval requirements [ ] Documented data access restrictions

Technical Controls: its [ ] Specifie [ ] Defined allowed tools and actions [ ] Set transaction/action limPolicy Definition: ure modes [ ] Identified affected stakeholders

/medium/high/critical) [ ] Documented potential failploying any AI agent:

Risk Assessment: [ ] Classified agent risk level (low

Before deool access"
"""

Governance Checklist

  severity: critical
    annotations:
      summary: "Agent attempted unauthorized t0
    for: 1m
    labels:
olations_total{policy_name="tool-access"} > ystem"

  - alert: UnauthorizedToolAccess
    expr: ai_agent_policy_viion s requests timing out โ€” check notificat        annotations:
      summary: "Many approval labels:
      severity: warning

out expr: rate(ai_agent_approval_requests_total{status=“timeout”}[1h]) > 5 for: 10m agent_name }} has high policy violation rate"

  - alert: ApprovalTimes:
      severity: warning
    annotations:
      summary: "Agent {{ $labels.
    expr: rate(ai_agent_policy_violations_total[5m]) > 0.1
    for: 5m
    labelULES = """

groups:

  • name: ai-governance rules:
    • alert: HighPolicyViolationRatetal", “Total agent actions”, [“agent_name”, “action”, “allowed”] )

Alert rules (Prometheus)

ALERT_RCounter( “ai_agent_actions_to, rejected, timeout )

agent_actions = [“agent_name”, “status”] # approved “Total human approval requests”, sts = Counter( “ai_agent_approval_requests_total”, s”, [“agent_name”, “policy_name”, “severity”] )

approval_requeions = Counter( “ai_agent_policy_violations_total”, “Total policy violationt Counter, Histogram, Gauge

policy_violat for governance from prometheus_client impornance Dashboard

Monitor governance health across all agents:

# Prometheus metrics],
    }

Govert vems() if no checks.itecks”: checks,

    "gaps": [k for k, v int": passed == total,
    "ch_score": f"{passed}/{total}",
    "complian
    "agent": agent_name,
    "complianceing,
}

passed = sum(checks.values())
total = len(checks)

return {trics) and compliance.adversarial_test),
    "accuracy": bool(compliance.accuracy_mebility and bool(compliance.human_oversight_measuresformed_of_ai,
    "human_oversight": compliance.override_capa   "transparency": compliance.users_iniance.log_retention_days >= 180,
 stem_description and compliance.intended_purpose),
    "record_keeping": compliance.audit_logs_enabled and complompleted,
    "documentation": bool(compliance.sy   "data_governance": compliance.training_data_documented and compliance.bias_testing_cd and compliance.residual_risks_documented,
 k_management": compliance.risk_assessment_completect compliance report."""

checks = {
    "ris -> dict:
"""Generate EU AI Aame: str, compliance: EUAIActCompliance)g: bool = False

def generate_compliance_report(agent_ny and robustness accuracy_metrics: dict = None adversarial_testiny: bool = False

# Article 15: Accurac = None
override_capabilit14: Human oversight
human_oversight_measures: list[str]xt: str = ""

# Article ers_informed_of_ai: bool = False
ai_disclosure_teisk

# Article 13: Transparency
us: Record-keeping
audit_logs_enabled: bool = False
log_retention_days: int = 0  # minimum 6 months for high-r
system_description: str = ""
intended_purpose: str = ""
performance_metrics: dict = None

# Article 12l = False

# Article 11: Technical documentation: bool = False
data_quality_measures: bool = False
bias_testing_completed: booresidual_risks_documented: bool = False

# Article 10: Data governance
training_data_documented
risk_assessment_date: str = ""
Act compliance requirements."""

# Article 9: Risk management system
risk_assessment_completed: bool = Falseaclasses import dataclass

@dataclass class EUAIActCompliance: “““Track EU AI

The EU AI Act (effective 2026) requires specific controls for high-risk AI systems:

from datecute the action
    return await execute_action(action, context)

EU AI Act Compliance was {status.value}”)

# Ex           raise PermissionError(f"Action {action}f status != ApprovalStatus.APPROVED:

tion, context=context, approver_email=“[email protected]”, )

    iaction_id=action_id,
        agent_name=agent_name,
        action=ac:
    action_id = str(uuid.uuid4())
    status = await approval_gate.request_approval(
        dict):
profile = AGENT_PROFILES[agent_name]

if profile.requires_human_approvalpproval(agent_name: str, action: str, context: econds=300)

async def execute_with_aal_gate = HumanApprovalGate(notification_service, timeout_sending[action_id]

Usage in agent

approvf.pending[action_id].set_result(status) del self.palStatus.REJECTED sel status = ApprovalStatus.APPROVED if approved else Approvs or rejects.””" if action_id in self.pending: “““Called when human approvetion_id: str, approved: bool): id] return ApprovalStatus.TIMEOUT

def resolve(self, ac   del self.pending[action_status
    except asyncio.TimeoutError:
     out=self.timeout)
        return Wait for approval with timeout
        status = await asyncio.wait_for(future, timeion_id,
    )

    try:
        # nutes.
        """,
        action_id=actres in {self.timeout // 60} mis://admin.example.com/reject/{action_id}

This request expiove: https://admin.example.com/approve/{action_id} Reject: http Agent: {agent_name} Action: {action} Context: {json.dumps(context, indent=2)}

Appr to {action}”, body=f”““pprover_email, subject=f"Approval Required: {agent_name} wantslf.notifier.send_approval_request( to=aeate_future() self.pending[action_id] = future

    # Notify the approver
    await sesponds
    future = asyncio.get_event_loop().cr approval and wait for response."""

    # Create a future that will be resolved when human reuest human    ) -> ApprovalStatus:
    """Req str,
    action: str,
    context: dict,
    approver_email: str,

ture] = {}

async def request_approval(
    self,
    action_id: str,
    agent_name:ending: dict[str, asyncio.Fu     self.timeout = timeout_seconds
    self.ptifier = notification_service

, timeout_seconds: int = 300): self.no

def __init__(self, notification_service""Pause agent execution and wait for human approval."""ding"
APPROVED = "approved"
REJECTED = "rejected"
TIMEOUT  = "timeout"

class HumanApprovalGate: “hon import asyncio from enum import Enum

class ApprovalStatus(Enum): PENDING = “pen actions, require human approval:

                raise

        return wrapper
    return decorator

Human-in-the-Loop Controls

For high-risks=0, output_tokens=0, metadata={“violation_reason”: e.reason} model=””, input_tokenuration_ms=(time.time() - start) * 1000, _name, “result”: “blocked”}], allowed=False, d policy_checks=[{“policy”: e.policyNone, tool_result=None, tool_name=None, tool_args= action=func.name, .get(“user_id”), “, “unknown”), user_id=kwargsn, session_id=kwargs.get(“session_ident_name, agent_version=versiot, agent_name=ag event_id=event_id, timestamp=star audit_logger.log(AuditEvent( except PolicyViolation as e: return result

          metadata={}
            ))

kwargs.get(“output_tokens”, 0), t_tokens”, 0), output_tokens= input_tokens=kwargs.get(“inpu model=kwargs.get(“model”, “unknown”), olicy_checks=[], allowed=True, duration_ms=duration, one, presult=str(result)[:500] if result else Ns”), tool_ tool_args=kwargs.get(“tool_arg tool_name=kwargs.get(“tool_name”),

                action=func.__name__,
                      user_id=kwargs.get("user_id"),kwargs.get("session_id", "unknown"),
                 session_id=,

me, agent_version=version agent_name=agent_na timestamp=start, g(AuditEvent( event_id=event_id, - start) * 1000

            audit_logger.lo**kwargs)
            duration = (time.time()           try:
            result = await func(*args,        event_id = str(uuid.uuid4())

**kwargs): start = time.time() nc def wrapper(*args, str): def decorator(func): asy def audited(agent_name: str, version: message”: json.dumps(record) }] )

Decorator for automatic audit logging “nt(event.timestamp * 1000),

        ogEvents=[{
                "timestamp": igStreamName=event.agent_name,
            lroupName="/ai-agents/audit",
            lologs.put_log_events(
            logGto3
        logs = boto3.client("logs")
        f.backend == "cloudwatch":
        import boES_TOKEN}"}
        )

    elif sel      headers={"Authorization": f"Bearer {   _doc",
            json=record,
   {ES_URL}/ai-audit-{time.strftime('%Y.%m')}/   httpx.post(
            f"earchable audit trail
        import httpx
         # Send to Elasticsearch for s":
     self.backend == "elasticsearch.dumps(record) + "\n")

    elif with open(self.log_path, "a") as f:
            f.write(jsonf self.backend == "file":
       ent):
    record = asdict(event)

    ilog_path

def log(self, event: AuditEvt.jsonl"):
    self.backend = backend
    self.log_path = 
output_tokens: int
metadata: dict

class AuditLogger: def init(self, backend=“file”, log_path="/var/log/ai-agents/audition_ms: float model: str input_tokens: intn_id: str user_id: str | None action: str tool_name: str | None tool_args: dict | None tool_result: str | None policy_checks: list[dict] allowed: bool dura @dataclass class AuditEvent: event_id: str timestamp: float agent_name: str agent_version: str sessioaclasses import dataclass, asdict from typing import Any must be logged for compliance and debugging:

import json
import uuid
import time
from dat
    print(f"Action blocked: {reasons}")

Audit Logging

Every agent action action=“execute_transaction”, context={“amount”: 5000, “recipient”: “[email protected]”} )

if not allowed:.evaluate( agent_name=“financial-advisor”, ) ))

Usage

allowed, reasons = engine

Comments