Introduction
Policy as code enables organizations to define, version, and enforce policies programmatically. The Open Policy Agent (OPA) provides a unified framework for policy enforcement across the stack.
This guide covers OPA’s architecture, the Rego policy language, and practical implementation for cloud-native security.
Understanding OPA
What is OPA?
OPA is an open-source policy engine that enables:
- Decentralized policy: Policies defined close to applications
- Runtime enforcement: Check policies at API calls
- Static analysis: Validate configs before deployment
- Custom logic: Express complex policies easily
Architecture
┌─────────────────────────────────────────────────────────┐
│                  Policy Decision Point                  │
│                                                         │
│  Request ──► OPA Engine ──► Decision (Allow/Deny)       │
│                                                         │
│  ┌───────────────────────────────────────────────────┐  │
│  │ Rego Policies                                     │  │
│  │ package kubernetes.admission                      │  │
│  │                                                   │  │
│  │ deny[msg] {                                       │  │
│  │     input.request.kind.kind == "Pod"              │  │
│  │     c := input.request.object.spec.containers[_]  │  │
│  │     c.securityContext.privileged == true          │  │
│  │     msg := "Privileged pods not allowed"          │  │
│  │ }                                                 │  │
│  └───────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────┘
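When OPA runs as a standalone server, the request/decision flow in the diagram maps onto its REST Data API: the caller POSTs `{"input": ...}` to `/v1/data/<package path>/<rule>` and reads the decision from the `result` field. The following is a minimal Go sketch of that round trip. The `queryDeny` helper and the `newStubOPA` test server are hypothetical names introduced here for illustration; the stub only stands in for a real OPA instance so the example runs on its own.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// decisionResponse mirrors the envelope the Data API returns: {"result": ...}.
// For a set rule such as deny[msg], the result is a JSON array of messages.
type decisionResponse struct {
	Result []string `json:"result"`
}

// queryDeny (hypothetical helper) sends the input document to OPA and
// returns the messages produced by data.kubernetes.admission.deny.
func queryDeny(baseURL string, input any) ([]string, error) {
	body, err := json.Marshal(map[string]any{"input": input})
	if err != nil {
		return nil, err
	}
	resp, err := http.Post(baseURL+"/v1/data/kubernetes/admission/deny",
		"application/json", bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("unexpected status %d", resp.StatusCode)
	}
	var out decisionResponse
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		return nil, err
	}
	return out.Result, nil
}

// newStubOPA (hypothetical) imitates an OPA server that always reports one
// violation. Replace its URL with a real OPA address in practice.
func newStubOPA() *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		fmt.Fprint(w, `{"result": ["Privileged pods not allowed"]}`)
	}))
}

func main() {
	srv := newStubOPA()
	defer srv.Close()

	input := map[string]any{
		"request": map[string]any{"kind": map[string]any{"kind": "Pod"}},
	}
	msgs, err := queryDeny(srv.URL, input)
	if err != nil {
		panic(err)
	}
	fmt.Println(msgs)
}
```

An empty `result` array means no deny rule fired, which the caller would normally treat as "allow".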
The Rego Language
Basic Syntax
# Simple rule: allow is true when every expression in the body holds
package example

allow {
    input.method == "GET"
}

# Rule with condition: adds msg to the deny set when the body holds
deny[msg] {
    input.user.role == "admin"
    msg := "Admin cannot perform this action"
}

# Complex rule: [_] iterates over every role and every permission
allowed_resources[resource] {
    role := input.user.roles[_]
    resource := role.permissions[_]
    resource.type == input.request.type
}
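For readers coming from imperative languages, it may help to see what the `allowed_resources` rule computes. Each `[_]` behaves like a loop, and the rule head collects every value for which the whole body holds, with duplicates collapsing because the result is a set. The Go sketch below is only an analogy: the `Role` and `Permission` types are assumptions inferred from the field names in the rule, not part of OPA.

```go
package main

import "fmt"

// Permission and Role are hypothetical shapes inferred from the Rego rule.
type Permission struct {
	Type string
	Name string
}

type Role struct {
	Permissions []Permission
}

// allowedResources mirrors the rule body: iterate over all roles, then all
// permissions, keep those whose type matches the request, and drop
// duplicates the way a Rego set would.
func allowedResources(roles []Role, requestType string) []Permission {
	seen := map[Permission]bool{}
	var out []Permission
	for _, role := range roles { // role := input.user.roles[_]
		for _, p := range role.Permissions { // resource := role.permissions[_]
			if p.Type == requestType && !seen[p] { // resource.type == input.request.type
				seen[p] = true
				out = append(out, p)
			}
		}
	}
	return out
}

func main() {
	roles := []Role{
		{Permissions: []Permission{{"document", "reports"}, {"queue", "jobs"}}},
		{Permissions: []Permission{{"document", "reports"}, {"document", "invoices"}}},
	}
	fmt.Println(allowedResources(roles, "document"))
}
```

In Rego the loops are implicit: the engine searches for every binding of the variables that makes the body true, so the policy states what must hold rather than how to iterate.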
Data Structures
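# Objects
user := {
    "name": "john",
    "roles": ["admin", "developer"]
}
# Arrays
names := ["alice", "bob", "charlie"]
numbers := [3, -1, 2]
# Array comprehension: squares of the positive entries of numbers
squares := [x * x | x := numbers[_]; x > 0]
# Set comprehension: duplicate roles collapse automatically
unique_roles := {role | role := input.user.roles[_]}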
Built-in Functions
# String functions
lower("HELLO") # "hello"
upper("hello") # "HELLO"
contains("hello world", "world") # true
trim_space(" hello ") # "hello" (trim takes a second argument: the characters to cut)
# Aggregate functions
count([1, 2, 3]) # 3
sum([1, 2, 3]) # 6
sort([3, 1, 2]) # [1, 2, 3]
# Object functions
obj := {"a": 1, "b": 2}
object.keys(obj) # {"a", "b"} (a set)
[v | v := obj[_]] # values, collected with a comprehension
# Input document fields (not functions; their shape is whatever the caller sends)
input.request.time # timestamp
input.request.sourceIP # IP address
Kubernetes Admission Control
Gatekeeper Installation
# Install Gatekeeper
kubectl apply -f https://raw.githubusercontent.com/open-policy-agent/gatekeeper/release-3.15/deploy/gatekeeper.yaml
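# Gatekeeper's CRDs (including ConstraintTemplate) are normally bundled in gatekeeper.yaml above.
# The separate CRD manifest below is optional; check that it exists for the release you install.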
kubectl apply -f https://raw.githubusercontent.com/open-policy-agent/gatekeeper/release-3.15/deploy/gatekeeper-crds.yaml
# Verify installation
kubectl get pods -n gatekeeper-system
Constraint Templates
# k8srequiredlabels-template.yaml
apiVersion: templates.gatekeeper.sh/v1
kind: ConstraintTemplate
metadata:
  name: k8srequiredlabels
spec:
  crd:
    spec:
      names:
        kind: K8sRequiredLabels
      validation:
        openAPIV3Schema:
          type: object
          properties:
            labels:
              type: array
              items:
                type: object
                properties:
                  key:
                    type: string
                  value:
                    type: string
  targets:
    - target: admission.k8s.gatekeeper.sh
      rego: |
        package k8srequiredlabels

        violation[{"msg": msg}] {
          # metadata.labels is a map, so collect its keys
          provided := {k | input.review.object.metadata.labels[k]}
          required := {k | k := input.parameters.labels[_].key}
          missing := required - provided
          count(missing) > 0
          msg := sprintf("Required labels %v not found", [missing])
        }
Constraint
# require-labels-constraint.yaml
apiVersion: constraints.gatekeeper.sh/v1beta1
kind: K8sRequiredLabels
metadata:
  name: require-namespace-labels
spec:
  match:
    kinds:
      - apiGroups: [""]
        kinds: ["Namespace"]
  parameters:
    labels:
      - key: "environment"
      - key: "team"
      - key: "cost-center"
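To make the template's logic concrete, the sketch below reproduces the set difference it performs: take the label keys the constraint requires, subtract the keys present on the object, and report whatever is left. This is an illustrative Go analogy, not Gatekeeper code; `missingLabels` is a hypothetical helper, and the sample namespace labels are made up for the example.

```go
package main

import (
	"fmt"
	"sort"
)

// missingLabels returns the required keys that are absent from the
// object's labels, sorted for stable output. It corresponds to
// `missing := required - provided` in the Rego template.
func missingLabels(labels map[string]string, required []string) []string {
	var missing []string
	for _, key := range required {
		if _, ok := labels[key]; !ok {
			missing = append(missing, key)
		}
	}
	sort.Strings(missing)
	return missing
}

func main() {
	// Hypothetical Namespace carrying only one of the three required labels.
	nsLabels := map[string]string{"environment": "prod"}
	required := []string{"environment", "team", "cost-center"}

	if missing := missingLabels(nsLabels, required); len(missing) > 0 {
		fmt.Printf("Required labels %v not found\n", missing)
	}
}
```

Note that only label keys are compared. The `value` field declared in the template's schema is not checked by this rule, so enforcing specific values would need an additional condition.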
Practical Policy Examples
Prevent Privileged Containers
# privileged-pod-template.yaml
apiVersion: templates.gatekeeper.sh/v1
kind: ConstraintTemplate
metadata:
  name: k8sprivilegedcontainer
spec:
  crd:
    spec:
      names:
        kind: K8sPrivilegedContainer
  targets:
    - target: admission.k8s.gatekeeper.sh
      rego: |
        package k8sprivilegedcontainer

        violation[{"msg": msg, "details": details}] {
          container := input.review.object.spec.containers[_]
          container.securityContext.privileged == true
          msg := sprintf("Privileged container %v not allowed in %v", [container.name, input.review.object.metadata.name])
          details := {"container": container.name}
        }
Require Resource Limits
package k8scontainerlimits

# No limits block at all
violation[{"msg": msg}] {
    container := input.review.object.spec.containers[_]
    not container.resources.limits
    msg := sprintf("Container %v has no resource limits", [container.name])
}

# No memory limit (also fires when the limits block is missing)
violation[{"msg": msg}] {
    container := input.review.object.spec.containers[_]
    not container.resources.limits.memory
    msg := sprintf("Container %v has no memory limit", [container.name])
}
Restrict Image Sources
package k8sregistry

allowed_registries := [
    "ghcr.io",
    "docker.io",
    "registry.internal.company.com"
]

violation[{"msg": msg}] {
    container := input.review.object.spec.containers[_]
    image := container.image
    not from_allowed_registry(image)
    msg := sprintf("Image %v not from allowed registry", [image])
}

# True when the image starts with "<registry>/" for any allowed registry
from_allowed_registry(image) {
    startswith(image, concat("", [allowed_registries[_], "/"]))
}
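One subtlety with prefix matching is that short image names carry no registry at all: `nginx:1.25` resolves to Docker Hub, yet it does not start with `docker.io/`, so the policy above rejects it. The Go sketch below shows the conventional resolution rule (the first path component is a registry only if it contains a dot or colon, or is `localhost`). `registryOf` and `isAllowed` are hypothetical helpers written for this illustration, not part of OPA or Gatekeeper.

```go
package main

import (
	"fmt"
	"strings"
)

// registryOf returns the registry host an image reference resolves to,
// following the Docker convention: the first path component is treated as
// a registry only if it looks like a host name (contains "." or ":") or is
// "localhost". Everything else defaults to Docker Hub.
func registryOf(image string) string {
	first, _, found := strings.Cut(image, "/")
	if !found {
		return "docker.io" // e.g. "nginx:1.25"
	}
	if strings.ContainsAny(first, ".:") || first == "localhost" {
		return first // e.g. "ghcr.io/org/app:v1"
	}
	return "docker.io" // e.g. "library/nginx"
}

// isAllowed compares the resolved registry against the allow list exactly.
func isAllowed(image string, allowed []string) bool {
	reg := registryOf(image)
	for _, a := range allowed {
		if reg == a {
			return true
		}
	}
	return false
}

func main() {
	allowed := []string{"ghcr.io", "docker.io", "registry.internal.company.com"}
	images := []string{"nginx:1.25", "ghcr.io/org/app:v1", "quay.io/org/app:v1"}
	for _, img := range images {
		fmt.Printf("%-22s registry=%-10s allowed=%v\n", img, registryOf(img), isAllowed(img, allowed))
	}
}
```

Whether short names should be accepted is a policy decision. Requiring fully qualified image names, as the Rego rule effectively does, is the stricter and arguably clearer choice.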
Block Latest Tag
package k8slatesttag

violation[{"msg": msg}] {
    container := input.review.object.spec.containers[_]
    endswith(container.image, ":latest")
    msg := sprintf("Image %v using latest tag is not allowed", [container.image])
}
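The rule above catches an explicit `:latest` suffix, but an image with no tag at all (for example `nginx`) also resolves to `latest`. The Go sketch below illustrates a fuller check under that assumption; `usesLatest` is a hypothetical helper, and the same conditions could be expressed as an additional Rego rule.

```go
package main

import (
	"fmt"
	"strings"
)

// usesLatest reports whether an image reference resolves to the "latest"
// tag, either explicitly or because neither a tag nor a digest was given.
func usesLatest(image string) bool {
	if strings.Contains(image, "@") {
		return false // pinned by digest, e.g. app@sha256:...
	}
	// Look for a tag only in the last path component, so that a registry
	// port such as "localhost:5000/app" is not mistaken for a tag.
	last := image[strings.LastIndex(image, "/")+1:]
	_, tag, hasTag := strings.Cut(last, ":")
	return !hasTag || tag == "latest"
}

func main() {
	for _, img := range []string{"nginx", "nginx:latest", "nginx:1.25", "localhost:5000/app"} {
		fmt.Printf("%-20s usesLatest=%v\n", img, usesLatest(img))
	}
}
```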
Terraform Validation
Conftest Installation
# Install conftest
brew install conftest
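# Or run it from the official container image (conftest is a Go binary, not a pip package)
docker run --rm -v $(pwd):/project openpolicyagent/conftest test main.tf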
Policy Example
# policy.rego
package main

# conftest exposes Terraform resources as input.resource.<type>.<name>;
# run `conftest parse main.tf` to confirm the exact shape for your files.
deny[msg] {
    instance := input.resource.aws_instance[name]
    name == "admin"
    instance.instance_type == "m5.24xlarge"
    msg := "Large instances not allowed for admin server"
}

warn[msg] {
    bucket := input.resource.aws_s3_bucket[name]
    not bucket.server_side_encryption_configuration
    msg := sprintf("S3 bucket %v should have encryption enabled", [name])
}
Usage
# main.tf
resource "aws_instance" "web" {
  ami           = "ami-12345"
  instance_type = "t3.micro"
  tags = {
    Name = "web-server"
  }
}
# Validate (conftest loads policies from ./policy by default)
conftest test main.tf
# With an explicit policy file
conftest test main.tf -p policy.rego
API Authorization
OPA as Authorization Engine
# opa-deployment.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: authz-policy
data:
  authz.rego: |
    package authz

    default allow = false

    allow {
      input.method == "GET"
      input.user.roles[_] == "reader"
    }

    allow {
      input.method == "POST"
      input.user.roles[_] == "writer"
    }

    allow {
      input.user.roles[_] == "admin"
    }
Integration
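package main

import (
	"context"
	"fmt"

	"github.com/open-policy-agent/opa/rego"
)

// Authorize evaluates data.authz.allow against the given input document.
func Authorize(ctx context.Context, input map[string]interface{}) (bool, error) {
	r := rego.New(
		rego.Query("data.authz.allow"),
		// Load the policy from disk; the path is an assumption. Without a
		// loaded module the query is undefined and every request is denied.
		rego.Load([]string{"authz.rego"}, nil),
		rego.Input(input),
	)
	rs, err := r.Eval(ctx)
	if err != nil {
		return false, fmt.Errorf("evaluating policy: %w", err)
	}
	// An empty result set means the rule was undefined: treat it as deny.
	if len(rs) == 0 || len(rs[0].Expressions) == 0 {
		return false, nil
	}
	allowed, ok := rs[0].Expressions[0].Value.(bool)
	if !ok {
		return false, fmt.Errorf("unexpected decision type %T", rs[0].Expressions[0].Value)
	}
	return allowed, nil
}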
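An authorization function like the one above is typically called from HTTP middleware that builds the input document from the request and fails closed on errors. The following Go sketch shows one way this could look using only the standard library. The `X-Roles` header, the `RequireAuthz` middleware, and the `stubAuthorize` function are assumptions made for illustration: in a real service the roles would come from a verified token, and the stub (which mirrors the `authz` policy in plain Go so the example runs without OPA) would be replaced by an OPA-backed function.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// AuthorizeFunc has the same shape as an OPA-backed authorization function.
type AuthorizeFunc func(ctx context.Context, input map[string]interface{}) (bool, error)

// RequireAuthz builds the policy input from the request and only calls
// next when the decision is "allow". Errors result in a 500 (fail closed).
func RequireAuthz(authorize AuthorizeFunc, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// X-Roles is a hypothetical header used only for this sketch.
		var roles []interface{}
		for _, role := range strings.Split(r.Header.Get("X-Roles"), ",") {
			if role = strings.TrimSpace(role); role != "" {
				roles = append(roles, role)
			}
		}
		input := map[string]interface{}{
			"method": r.Method,
			"user":   map[string]interface{}{"roles": roles},
		}
		allowed, err := authorize(r.Context(), input)
		if err != nil {
			http.Error(w, "policy evaluation failed", http.StatusInternalServerError)
			return
		}
		if !allowed {
			http.Error(w, "forbidden", http.StatusForbidden)
			return
		}
		next.ServeHTTP(w, r)
	})
}

// stubAuthorize stands in for the OPA call and mirrors the authz policy.
func stubAuthorize(_ context.Context, input map[string]interface{}) (bool, error) {
	user, _ := input["user"].(map[string]interface{})
	roles, _ := user["roles"].([]interface{})
	for _, role := range roles {
		switch {
		case role == "admin":
			return true, nil
		case role == "reader" && input["method"] == "GET":
			return true, nil
		case role == "writer" && input["method"] == "POST":
			return true, nil
		}
	}
	return false, nil
}

// statusFor sends one request through the middleware and returns the status.
func statusFor(method, roles string) int {
	h := RequireAuthz(stubAuthorize, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "ok")
	}))
	req := httptest.NewRequest(method, "/", nil)
	req.Header.Set("X-Roles", roles)
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)
	return rec.Code
}

func main() {
	fmt.Println(statusFor("GET", "reader"))
	fmt.Println(statusFor("POST", "reader"))
}
```

Keeping the middleware dependent on a function type rather than on OPA directly makes it easy to test the HTTP layer with a stub and the policy with `opa test`.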
Testing Rego Policies
Unit Tests
# Tests live in the same package as the policy so they can reference allow directly
package authz

test_allow_admin {
    allow with input as {"method": "POST", "user": {"roles": ["admin"]}}
}

test_allow_reader {
    allow with input as {"method": "GET", "user": {"roles": ["reader"]}}
}

test_deny_writer_get {
    not allow with input as {"method": "GET", "user": {"roles": ["writer"]}}
}
Run Tests
# Run with OPA (pass the directory so both policy and test files are loaded)
opa test . -v
# Run with conftest
conftest verify --policy policy/
Implementation Checklist
Setup
- Install OPA/Conftest/Gatekeeper
- Configure initial policies
- Set up policy testing
Policy Development
- Document policy requirements
- Write Rego policies
- Test thoroughly
- Version in Git
Enforcement
- Deploy to test environment
- Monitor violations
- Tune policies
- Deploy to production
Summary
OPA provides powerful policy enforcement:
- Rego is expressive for complex rules
- Gatekeeper integrates natively with Kubernetes
- Conftest validates configuration files
- Testing ensures policy correctness
Start with simple policies, then add complexity as understanding grows.