OPA/Rego: Policy as Code Deep Dive

Introduction

Policy as code enables organizations to define, version, and enforce policies programmatically. The Open Policy Agent (OPA) provides a unified framework for policy enforcement across the stack.

This guide covers OPA's architecture, the Rego policy language, and practical implementation for cloud-native security.


Understanding OPA

What is OPA?

OPA is an open-source policy engine that enables:

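  • Decentralized policy: Policies are defined and evaluated close to the applications they govern
  • Runtime enforcement: Policies are checked at request time, for example on each API call
  • Static analysis: Configuration files are validated before deployment
  • Custom logic: Complex rules are expressed declaratively in Rego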

Architecture

┌──────────────────────────────────────────────────────────────┐
│                    Policy Decision Point                     │
│                                                              │
│    Request ──► OPA Engine ──► Decision (Allow/Deny)          │
│                                                              │
│  ┌────────────────────────────────────────────────────────┐  │
│  │                     Rego Policies                      │  │
│  │  package kubernetes.admission                          │  │
│  │                                                        │  │
│  │  deny[msg] {                                           │  │
│  │    input.request.kind.kind == "Pod"                    │  │
│  │    c := input.request.object.spec.containers[_]        │  │
│  │    c.securityContext.privileged                        │  │
│  │    msg := "Privileged pods not allowed"                │  │
│  │  }                                                     │  │
│  └────────────────────────────────────────────────────────┘  │
└──────────────────────────────────────────────────────────────┘
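
The request-to-decision flow in the diagram can also be exercised over HTTP when OPA runs as a server (`opa run --server`, which listens on port 8181 by default). The following is a minimal sketch, not a definitive client: it assumes a boolean rule such as `data.authz.allow`, and the names `queryOPA`, `parseDecision`, and `decisionResponse` are illustrative rather than part of any OPA library. It posts the input to the Data API and treats an undefined decision (a response with no `result` key) as a deny.

```go
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
)

// decisionResponse mirrors the {"result": ...} envelope of OPA's Data API.
// Result is a pointer so that an undefined decision (no "result" key)
// can be told apart from an explicit false.
type decisionResponse struct {
	Result *bool `json:"result"`
}

// parseDecision extracts a boolean decision and treats "undefined" as deny.
func parseDecision(body []byte) (bool, error) {
	var resp decisionResponse
	if err := json.Unmarshal(body, &resp); err != nil {
		return false, fmt.Errorf("decoding OPA response: %w", err)
	}
	if resp.Result == nil {
		return false, nil
	}
	return *resp.Result, nil
}

// queryOPA posts the input document to <baseURL>/v1/data/<rulePath>,
// for example rulePath "authz/allow" for the rule data.authz.allow.
func queryOPA(ctx context.Context, baseURL, rulePath string, input interface{}) (bool, error) {
	payload, err := json.Marshal(map[string]interface{}{"input": input})
	if err != nil {
		return false, err
	}
	req, err := http.NewRequestWithContext(ctx, http.MethodPost,
		baseURL+"/v1/data/"+rulePath, bytes.NewReader(payload))
	if err != nil {
		return false, err
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return false, err
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return false, err
	}
	if resp.StatusCode != http.StatusOK {
		return false, fmt.Errorf("OPA returned %s", resp.Status)
	}
	return parseDecision(body)
}

func main() {
	// Offline demonstration of the response handling only;
	// queryOPA itself needs a running OPA server.
	allowed, _ := parseDecision([]byte(`{"result": true}`))
	undefined, _ := parseDecision([]byte(`{}`))
	fmt.Println(allowed, undefined) // true false
}
```

Against a local server, a call would look like `queryOPA(ctx, "http://localhost:8181", "authz/allow", input)`. Denying on an undefined result keeps the client fail-closed when a policy has not been loaded yet.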

The Rego Language

Basic Syntax

# Simple rule
package example

allow {
    input.method == "GET"
}

# Rule with condition
deny[msg] {
    input.user.role == "admin"
    msg := "Admin cannot perform this action"
}

# Complex rule
allowed_resources[resource] {
    role := input.user.roles[_]
    resource := role.permissions[_]
    resource.type == input.request.type
}

Data Structures

# Objects
user := {
    "name": "john",
    "roles": ["admin", "developer"]
}

# Arrays
names := ["alice", "bob", "charlie"]

# Array comprehensions
numbers := [1, 2, 3]
squares := [x * x | x := numbers[_]; x > 0]

# Sets
unique_roles := {role | role := input.user.roles[_]}

Built-in Functions

# String functions
lower("HELLO")                      # "hello"
upper("hello")                      # "HELLO"
contains("hello world", "world")    # true
trim_space("  hello  ")             # "hello" (trim takes a second cutset argument)

# Array functions
count([1, 2, 3])                  # 3
sum([1, 2, 3])                     # 6
sort([3, 1, 2])                    # [1, 2, 3]

# Object functions
obj := {"a": 1, "b": 2}
object.keys(obj)                  # {"a", "b"} (a set)
[v | v := obj[_]]                 # [1, 2]

# Input document fields (not functions; the shape is defined by the caller)
input.request.time                # timestamp
input.request.sourceIP             # IP address

Kubernetes Admission Control

Gatekeeper Installation

# Install Gatekeeper
kubectl apply -f https://raw.githubusercontent.com/open-policy-agent/gatekeeper/release-3.15/deploy/gatekeeper.yaml

# The manifest above already bundles Gatekeeper's CRDs, including ConstraintTemplate.
# Templates and constraints themselves are applied separately, as shown in the next sections.

# Verify installation
kubectl get pods -n gatekeeper-system

Constraint Templates

# k8srequiredlabels-template.yaml
apiVersion: templates.gatekeeper.sh/v1
kind: ConstraintTemplate
metadata:
  name: k8srequiredlabels
spec:
  crd:
    spec:
      names:
        kind: K8sRequiredLabels
      validation:
        openAPIV3Schema:
          type: object
          properties:
            labels:
              type: array
              items:
                type: object
                properties:
                  key:
                    type: string
                  value:
                    type: string
  targets:
    - target: admission.k8s.gatekeeper.sh
      rego: |
        package k8srequiredlabels

        violation[{"msg": msg}] {
          # metadata.labels is a map of key to value, so iterate over its keys
          provided := {k | input.review.object.metadata.labels[k]}
          required := {k | k := input.parameters.labels[_].key}
          missing := required - provided
          count(missing) > 0
          msg := sprintf("Required labels %v not found", [missing])
        }

Constraint

# require-labels-constraint.yaml
apiVersion: constraints.gatekeeper.sh/v1beta1
kind: K8sRequiredLabels
metadata:
  name: require-namespace-labels
spec:
  match:
    kinds:
      - apiGroups: [""]
        kinds: ["Namespace"]
  parameters:
    labels:
      - key: "environment"
      - key: "team"
      - key: "cost-center"

Practical Policy Examples

Prevent Privileged Containers

# privileged-pod-template.yaml
apiVersion: templates.gatekeeper.sh/v1
kind: ConstraintTemplate
metadata:
  name: k8sprivilegedcontainer
spec:
  crd:
    spec:
      names:
        kind: K8sPrivilegedContainer
  targets:
    - target: admission.k8s.gatekeeper.sh
      rego: |
        package k8sprivilegedcontainer

        violation[{"msg": msg, "details": details}] {
          container := input.review.object.spec.containers[_]
          container.securityContext.privileged == true
          msg := sprintf("Privileged container %v not allowed in %v", [container.name, input.review.object.metadata.name])
          details := {"container": container.name}
        }

Require Resource Limits

package k8scontainerlimits

violation[{"msg": msg}] {
  container := input.review.object.spec.containers[_]
  not container.resources.limits
  msg := sprintf("Container %v has no resource limits", [container.name])
}

violation[{"msg": msg}] {
  container := input.review.object.spec.containers[_]
  not container.resources.limits.memory
  msg := sprintf("Container %v has no memory limit", [container.name])
}

Restrict Image Sources

package k8sregistry

allowed_registries := [
  "ghcr.io",
  "docker.io",
  "registry.internal.company.com"
]

violation[{"msg": msg}] {
  container := input.review.object.spec.containers[_]
  image := container.image
  not from_allowed_registry(image)
  msg := sprintf("Image %v not from allowed registry", [image])
}

# True when the image starts with "<registry>/" for any allowed registry
from_allowed_registry(image) {
  registry := allowed_registries[_]
  startswith(image, sprintf("%s/", [registry]))
}

Block Latest Tag

package k8slatesttag

violation[{"msg": msg}] {
  container := input.review.object.spec.containers[_]
  endswith(container.image, ":latest")
  msg := sprintf("Image %v using latest tag is not allowed", [container.image])
}

Terraform Validation

Conftest Installation

# Install conftest
brew install conftest

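# Or run the official container image (Conftest is a Go binary and is not distributed via pip)
docker run --rm -v "$(pwd)":/project openpolicyagent/conftest test main.tf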

Policy Example

# policy.rego
# Assumes the input shape produced by Conftest's HCL2 parser:
# input.resource.<type>.<name>.<attribute>
package main

deny[msg] {
  instance := input.resource.aws_instance["admin"]
  instance.instance_type == "m5.24xlarge"
  msg := "Large instances not allowed for admin server"
}

warn[msg] {
  bucket := input.resource.aws_s3_bucket[name]
  not bucket.server_side_encryption_configuration
  msg := sprintf("S3 bucket %v should have encryption enabled", [name])
}

Usage

# main.tf
resource "aws_instance" "web" {
  ami           = "ami-12345"
  instance_type = "t3.micro"
  
  tags = {
    Name = "web-server"
  }
}

# Validate (Conftest looks for policies in ./policy by default)
conftest test main.tf

# With custom policy
conftest test main.tf -p policy.rego

API Authorization

OPA as Authorization Engine

# opa-deployment.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: authz-policy
data:
  authz.rego: |
    package authz
    
    default allow = false
    
    allow {
      input.method == "GET"
      input.user.roles[_] == "reader"
    }
    
    allow {
      input.method == "POST"
      input.user.roles[_] == "writer"
    }
    
    allow {
      input.user.roles[_] == "admin"
    }

Integration

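package main

import (
    "context"
    "fmt"

    "github.com/open-policy-agent/opa/rego"
)

// Authorize evaluates data.authz.allow for the given input.
// It assumes the policy above is saved as authz.rego in the working directory.
func Authorize(ctx context.Context, input map[string]interface{}) (bool, error) {
    query := "data.authz.allow"

    r := rego.New(
        rego.Query(query),
        rego.Load([]string{"authz.rego"}, nil), // load the policy module
        rego.Input(input),
    )

    rs, err := r.Eval(ctx)
    if err != nil {
        return false, fmt.Errorf("evaluating %s: %w", query, err)
    }

    // An empty result set means the query was undefined: deny.
    if len(rs) == 0 || len(rs[0].Expressions) == 0 {
        return false, nil
    }

    // Guard the type assertion so an unexpected value cannot panic.
    allowed, ok := rs[0].Expressions[0].Value.(bool)
    if !ok {
        return false, fmt.Errorf("unexpected result type %T", rs[0].Expressions[0].Value)
    }
    return allowed, nil
}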

Testing Rego Policies

Unit Tests

# Tests live in the same package as the policy so that `allow` resolves
package authz

test_allow_admin {
    allow with input as {"method": "POST", "user": {"roles": ["admin"]}}
}

test_allow_reader {
    allow with input as {"method": "GET", "user": {"roles": ["reader"]}}
}

test_deny_writer_get {
    not allow with input as {"method": "GET", "user": {"roles": ["writer"]}}
}

Run Tests

# Run with OPA (loads the policy and test files in the current directory)
opa test . -v

# Run with conftest
conftest verify --policy policy/

Implementation Checklist

Setup

  • Install OPA/Conftest/Gatekeeper
  • Configure initial policies
  • Set up policy testing

Policy Development

  • Document policy requirements
  • Write Rego policies
  • Test thoroughly
  • Version in Git

Enforcement

  • Deploy to test environment
  • Monitor violations
  • Tune policies
  • Deploy to production

Summary

OPA provides powerful policy enforcement:

  1. Rego is expressive for complex rules
  2. Gatekeeper integrates natively with Kubernetes
  3. Conftest validates configuration files
  4. Testing ensures policy correctness

Start with simple policies, then add complexity as understanding grows.

