Skip to main content

AI Agent Governance: Policies, Controls, and Compliance for Production Deployments

Created: March 2, 2026 Larry Qu 7 min read

Introduction

Deploying AI agents to production requires governance. It’s the infrastructure that lets you scale AI safely.

This guide covers practical governance: the policies you need, the technical controls that enforce them, and how to implement governance-as-code so it doesn’t slow you down.

What Governance Actually Means

Governance = Policies + Controls + Monitoring + Accountability

Policies: What agents are allowed to do
Controls:  Technical enforcement of policies
Monitoring: Detecting when policies are violated
Accountability: Who is responsible when things go wrong

Classification: Start Here

Before writing policies, classify your agents by risk:

from enum import Enum
from dataclasses import dataclass

class RiskLevel(Enum):
    """Risk tier assigned to an agent, ordered from least to most dangerous."""

    LOW      = "low"       # Read-only, no external actions
    # NOTE(review): the MEDIUM line was truncated in the source; the member is
    # required by the "customer-support" profile, its comment is reconstructed.
    MEDIUM   = "medium"    # Limited external actions (e.g. sending messages)
    HIGH     = "high"      # Financial, medical, legal, or irreversible actions
    CRITICAL = "critical"  # Autonomous decisions affecting many people

@dataclass
class AgentRiskProfile:
    """Describe what a single agent may do and when a human must sign off.

    One instance is created per agent and stored in ``AGENT_PROFILES``.
    """

    name: str  # human-readable agent name
    risk_level: RiskLevel
    can_write_data: bool
    can_send_external_messages: bool
    can_execute_code: bool
    can_make_financial_transactions: bool
    max_transaction_amount: float  # upper bound checked before executing a transaction
    requires_human_approval: bool
    # Free-text condition that triggers approval,
    # e.g. "never", "refunds > $100", "affecting > 100 users".
    approval_threshold: str

# Examples: one risk profile per deployed agent, keyed by the agent's identifier.
AGENT_PROFILES = {
    # NOTE(review): this entry's key was lost in extraction; "knowledge-base-qa"
    # is a reconstruction following the naming of the other keys — confirm.
    "knowledge-base-qa": AgentRiskProfile(
        name="Knowledge Base Q&A",
        risk_level=RiskLevel.LOW,
        can_write_data=False,
        can_send_external_messages=False,
        can_execute_code=False,
        # NOTE(review): line lost in extraction; False assumed for a LOW-risk
        # agent with a zero transaction limit — confirm.
        can_make_financial_transactions=False,
        max_transaction_amount=0,
        requires_human_approval=False,
        approval_threshold="never",
    ),
    "customer-support": AgentRiskProfile(
        name="Customer Support Agent",
        risk_level=RiskLevel.MEDIUM,
        # NOTE(review): line lost in extraction; the original value of
        # can_write_data for this agent is unknown — confirm.
        can_write_data=True,
        can_send_external_messages=True,
        can_execute_code=False,
        can_make_financial_transactions=False,
        max_transaction_amount=0,
        requires_human_approval=True,
        approval_threshold="refunds > $100",
    ),
    "financial-advisor": AgentRiskProfile(
        name="Financial Advisory Agent",
        risk_level=RiskLevel.HIGH,
        can_write_data=True,
        can_send_external_messages=True,
        can_execute_code=False,
        can_make_financial_transactions=True,
        max_transaction_amount=1000.0,
        requires_human_approval=True,
        approval_threshold="all transactions",
    ),
}

Policy Definition as Code

Define policies in code, not documents:

from typing import Callable
from dataclasses import dataclass, field
import json
import time

@dataclass
class Policy:
    """A named rule that decides whether an agent may perform an action."""

    name: str
    description: str
    applies_to: list[str]  # agent names, or ["all"] to apply to every agent
    # Called as check(agent_name, action, context);
    # returns (allowed: bool, reason: str)
    check: Callable


@dataclass
class PolicyViolation:
    """Record of one denied action, appended by ``PolicyEngine.evaluate``."""

    policy_name: str
    agent_name: str
    action: str
    reason: str
    timestamp: float = field(default_factory=time.time)
    # NOTE(review): the field line was truncated in the source; a default is
    # required because it follows a defaulted field and evaluate() does not
    # pass it. The default value "medium" is a reconstruction — confirm.
    severity: str = "medium"  # low, medium, high, critical


class PolicyEngine:
    """Hold registered policies and evaluate agent actions against them."""

    def __init__(self):
        self.policies: list[Policy] = []
        self.violations: list[PolicyViolation] = []

    def register(self, policy: Policy):
        """Add a policy to the set consulted by ``evaluate``."""
        self.policies.append(policy)

    def evaluate(self, agent_name: str, action: str, context: dict) -> tuple[bool, list[str]]:
        """Evaluate all applicable policies. Returns (allowed, reasons).

        A policy is applicable when its ``applies_to`` list contains
        ``"all"`` or the agent's name. Every applicable policy is checked
        (no short-circuit), so ``reasons`` lists every failed policy and
        each failure is also recorded in ``self.violations``.
        """
        reasons = []
        allowed = True

        for policy in self.policies:
            if "all" in policy.applies_to or agent_name in policy.applies_to:
                is_allowed, reason = policy.check(agent_name, action, context)
                if not is_allowed:
                    allowed = False
                    reasons.append(f"{policy.name}: {reason}")
                    self.violations.append(PolicyViolation(
                        policy_name=policy.name,
                        agent_name=agent_name,
                        action=action,
                        reason=reason,
                    ))

        return allowed, reasons


# Define concrete policies
licyEngine()

essages
engine.register(Policy(
l",
    description="Agents cannot send PII to external services",
ies_to=["all"],
    check=lambda agent, action, ctx: (
f action != "send_external_message"
        else (not contains_pii(ctx.get("message", "")),
ally")
    )
))

n limits
engine.register(Policy(
,
    description="Agents cannot execute transactions above their limit",

    check=lambda agent, action, ctx: (

        else (ctx.get("amount", 0) <= AGENT_PROFILES[agent].max_transaction_amount,
              f"Transaction ${ctx.get('amount')} exceeds limit ${AGENT_PROFILES[agent].max_transaction_amount}")
    )
))

# Policy 3: Business hours for high-risk actions
engine.register(Policy(
    name="business-hours-only",
    descriptiss hours",
    applies_to=["financial-advisor"],
    check=lambda agent, action, ctx: (
(True, "OK") if action not in ["execute_transaction", "send_contract"]
        else (is_business_hours(), "High-risk actions only allowed during business hours (9am-5pm UTC)")
thropic: Responsible Scaling Policy](https://www.anthropic.com/index.php/core-views-on-ai-safety)
-applications/)
- [Anttps://owasp.org/www-project-top-10-for-large-language-modelrtificialintelligenceact.eu/)
- [NIST AI Risk Management Framework](https://www.nist.gov/itl/ai-risk-management-framework)
- [OWASP LLM Top 10](hs reviewed
  [ ] Incident response plan documented

Resources

  • [EU AI Act Official Text](https://a [ ] Alerts for unusual behavior [ ] Dashboard for governance health

Compliance: [ ] EU AI Act requirements assessed (if applicable) [ ] GDPR data handling documented [ ] Sector-specific requirementfor high-risk actions [ ] Rate limiting on tool calls [ ] Output filtering for sensitive data

Monitoring: [ ] Prometheus metrics for policy violations [ ] Policy engine implemented and tested [ ] Audit logging enabled (min 6 months retention) [ ] Human approval gate d human approval requirements [ ] Documented data access restrictions

Technical Controls: its [ ] Specifie [ ] Defined allowed tools and actions [ ] Set transaction/action limPolicy Definition: ure modes [ ] Identified affected stakeholders

/medium/high/critical) [ ] Documented potential failploying any AI agent:

Risk Assessment: [ ] Classified agent risk level (low

Before deool access"
"""

Governance Checklist

  severity: critical
    annotations:
      summary: "Agent attempted unauthorized t0
    for: 1m
    labels:
olations_total{policy_name="tool-access"} > ystem"

  - alert: UnauthorizedToolAccess
    expr: ai_agent_policy_viion s requests timing out — check notificat        annotations:
      summary: "Many approval labels:
      severity: warning

out expr: rate(ai_agent_approval_requests_total{status=“timeout”}[1h]) > 5 for: 10m agent_name }} has high policy violation rate"

  - alert: ApprovalTimes:
      severity: warning
    annotations:
      summary: "Agent {{ $labels.
    expr: rate(ai_agent_policy_violations_total[5m]) > 0.1
    for: 5m
    labelULES = """

groups:

  • name: ai-governance rules:
    • alert: HighPolicyViolationRatetal", “Total agent actions”, [“agent_name”, “action”, “allowed”] )

Alert rules (Prometheus)

ALERT_RCounter( “ai_agent_actions_to, rejected, timeout )

agent_actions = [“agent_name”, “status”] # approved “Total human approval requests”, sts = Counter( “ai_agent_approval_requests_total”, s”, [“agent_name”, “policy_name”, “severity”] )

approval_requeions = Counter( “ai_agent_policy_violations_total”, “Total policy violationt Counter, Histogram, Gauge

policy_violat for governance from prometheus_client impornance Dashboard

Monitor governance health across all agents:

# Prometheus metrics],
    }

Govert vems() if no checks.itecks”: checks

    "gaps": [k for k, v int": passed == total,
    "ch_score": f"{passed}/{total}",
    "complian
    "agent": agent_name,
    "complianceing,
}

passed = sum(checks.values())
total = len(checks)

return {trics) and compliance.adversarial_test),
    "accuracy": bool(compliance.accuracy_mebility and bool(compliance.human_oversight_measuresformed_of_ai,
    "human_oversight": compliance.override_capa   "transparency": compliance.users_iniance.log_retention_days >= 180,
 stem_description and compliance.intended_purpose),
    "record_keeping": compliance.audit_logs_enabled and complompleted,
    "documentation": bool(compliance.sy   "data_governance": compliance.training_data_documented and compliance.bias_testing_cd and compliance.residual_risks_documented,
 k_management": compliance.risk_assessment_completect compliance report."""

checks = {
    "ris -> dict:
"""Generate EU AI Aame: str, compliance: EUAIActCompliance)g: bool = False

def generate_compliance_report(agent_ny and robustness accuracy_metrics: dict = None adversarial_testiny: bool = False

# Article 15: Accurac = None
override_capabilit14: Human oversight
human_oversight_measures: list[str]xt: str = ""

# Article ers_informed_of_ai: bool = False
ai_disclosure_teisk

# Article 13: Transparency
us: Record-keeping
audit_logs_enabled: bool = False
log_retention_days: int = 0  # minimum 6 months for high-r
system_description: str = ""
intended_purpose: str = ""
performance_metrics: dict = None

# Article 12l = False

# Article 11: Technical documentation: bool = False
data_quality_measures: bool = False
bias_testing_completed: booresidual_risks_documented: bool = False

# Article 10: Data governance
training_data_documented
risk_assessment_date: str = ""
Act compliance requirements."""

# Article 9: Risk management system
risk_assessment_completed: bool = Falseaclasses import dataclass

@dataclass class EUAIActCompliance: “““Track EU AI

The EU AI Act (effective 2026) requires specific controls for high-risk AI systems:

from datecute the action
    return await execute_action(action, context)

EU AI Act Compliance was {status.value}”)

# Ex           raise PermissionError(f"Action {action}f status != ApprovalStatus.APPROVED:

tion, context=context, approver_email="[email protected]”, )

    iaction_id=action_id,
        agent_name=agent_name,
        action=ac:
    action_id = str(uuid.uuid4())
    status = await approval_gate.request_approval(
        dict):
profile = AGENT_PROFILES[agent_name]

if profile.requires_human_approvalpproval(agent_name: str, action: str, context: econds=300)

async def execute_with_aal_gate = HumanApprovalGate(notification_service, timeout_sending[action_id]

Usage in agent

approvf.pending[action_id].set_result(status) del self.palStatus.REJECTED sel status = ApprovalStatus.APPROVED if approved else Approvs or rejects.”"" if action_id in self.pending: “““Called when human approvetion_id: str, approved: bool): id] return ApprovalStatus.TIMEOUT

def resolve(self, ac   del self.pending[action_status
    except asyncio.TimeoutError:
     out=self.timeout)
        return Wait for approval with timeout
        status = await asyncio.wait_for(future, timeion_id,
    )

    try:
        # nutes.
        """,
        action_id=actres in {self.timeout // 60} mis://admin.example.com/reject/{action_id}

This request expiove: https://admin.example.com/approve/{action_id} Reject: http Agent: {agent_name} Action: {action} Context: {json.dumps(context, indent=2)}

Appr to {action}”, body=f”““pprover_email, subject=f"Approval Required: {agent_name} wantslf.notifier.send_approval_request( to=aeate_future() self.pending[action_id] = future

    # Notify the approver
    await sesponds
    future = asyncio.get_event_loop().cr approval and wait for response."""

    # Create a future that will be resolved when human reuest human    ) -> ApprovalStatus:
    """Req str,
    action: str,
    context: dict,
    approver_email: str,

ture] = {}

async def request_approval(
    self,
    action_id: str,
    agent_name:ending: dict[str, asyncio.Fu     self.timeout = timeout_seconds
    self.ptifier = notification_service

, timeout_seconds: int = 300): self.no

def __init__(self, notification_service""Pause agent execution and wait for human approval."""ding"
APPROVED = "approved"
REJECTED = "rejected"
TIMEOUT  = "timeout"

class HumanApprovalGate: “hon import asyncio from enum import Enum

class ApprovalStatus(Enum): PENDING = “pen actions, require human approval:

                raise

        return wrapper
    return decorator
```python
## Human-in-the-Loop Controls

For high-risks=0,
                    output_tokens=0,
                    metadata={"violation_reason": e.reason}
         model="",
                    input_tokenuration_ms=(time.time() - start) * 1000,
            _name, "result": "blocked"}],
                    allowed=False,
                    d                 policy_checks=[{"policy": e.policyNone,
                    tool_result=None,
                       tool_name=None,
                    tool_args=     action=func.__name__,
.get("user_id"),
               ", "unknown"),
                    user_id=kwargsn,
                    session_id=kwargs.get("session_ident_name,
                    agent_version=versiot,
                    agent_name=ag     event_id=event_id,
                    timestamp=star             audit_logger.log(AuditEvent(
                        except PolicyViolation as e:
                  return result

              metadata={}
                ))
 kwargs.get("output_tokens", 0),
         t_tokens", 0),
                    output_tokens=                    input_tokens=kwargs.get("inpu          model=kwargs.get("model", "unknown"),
olicy_checks=[],
                    allowed=True,
                    duration_ms=duration,
          one,
                    presult=str(result)[:500] if result else Ns"),
                    tool_              tool_args=kwargs.get("tool_arg             tool_name=kwargs.get("tool_name"),
      
                    action=func.__name__,
                          user_id=kwargs.get("user_id"),kwargs.get("session_id", "unknown"),
                     session_id=,
me,
                    agent_version=version                   agent_name=agent_na              timestamp=start,
 g(AuditEvent(
                    event_id=event_id,
      - start) * 1000

                audit_logger.lo**kwargs)
                duration = (time.time()           try:
                result = await func(*args,        event_id = str(uuid.uuid4())

  **kwargs):
            start = time.time()
     nc def wrapper(*args, str):
    def decorator(func):
        asy
def audited(agent_name: str, version: message": json.dumps(record)
                }]
            )

# Decorator for automatic audit logging        "nt(event.timestamp * 1000),
            ogEvents=[{
                    "timestamp": igStreamName=event.agent_name,
                lroupName="/ai-agents/audit",
                lologs.put_log_events(
                logGto3
            logs = boto3.client("logs")
            f.backend == "cloudwatch":
            import boES_TOKEN}"}
            )

        elif sel      headers={"Authorization": f"Bearer {   _doc",
                json=record,
       {ES_URL}/ai-audit-{time.strftime('%Y.%m')}/   httpx.post(
                f"earchable audit trail
            import httpx
             # Send to Elasticsearch for s":
         self.backend == "elasticsearch.dumps(record) + "\n")

        elif with open(self.log_path, "a") as f:
                f.write(jsonf self.backend == "file":
           ent):
        record = asdict(event)

        ilog_path

    def log(self, event: AuditEvt.jsonl"):
        self.backend = backend
        self.log_path = 
    output_tokens: int
    metadata: dict

class AuditLogger:
    def __init__(self, backend="file", log_path="/var/log/ai-agents/audition_ms: float
    model: str
    input_tokens: intn_id: str
    user_id: str | None
    action: str
    tool_name: str | None
    tool_args: dict | None
    tool_result: str | None
    policy_checks: list[dict]
    allowed: bool
    dura
@dataclass
class AuditEvent:
    event_id: str
    timestamp: float
    agent_name: str
    agent_version: str
    sessioaclasses import dataclass, asdict
from typing import Any
 must be logged for compliance and debugging:

```text

import json
import uuid
import time
from dat
    print(f"Action blocked: {reasons}")

Audit Logging

Every agent action action=“execute_transaction”, context={“amount”: 5000, “recipient”: “[email protected]”} )

if not allowed:.evaluate( agent_name=“financial-advisor”, ) ))

Usage

allowed, reasons = engine

Comments

👍 Was this article helpful?