⚡ Calmops

Kubernetes Operators: Automating Complex Application Management

Introduction

Kubernetes Operators are a powerful extension mechanism that allows you to encode operational knowledge into software. They automate complex application deployment, configuration, and management tasks that would otherwise require manual intervention.

Understanding Operators

┌─────────────────────────────────────────────────────────────────┐
│                 Kubernetes Operator Concept                     │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Traditional Deployment:                                        │
│                                                                 │
│  ┌──────────────┐                                               │
│  │    YAML      │ ──▶ kubectl apply ──▶ K8s Cluster             │
│  │  Manifests   │                                               │
│  └──────────────┘                                               │
│                                                                 │
│  Problem: Complex apps need:                                    │
│  - Automated backup and restore                                 │
│  - Failover and recovery                                        │
│  - Configuration updates                                        │
│  - Custom scaling logic                                         │
│  - Monitoring and alerts                                        │
│                                                                 │
│  Solution: Operator                                             │
│                                                                 │
│  ┌──────────────────┐                                           │
│  │ Custom Resource  │ ──▶ Operator ──▶ K8s Resources            │
│  │   (MyApp)        │     (Controller)   (Pods, Services)       │
│  └──────────────────┘                                           │
│                                                                 │
│  The Operator understands:                                      │
│  - How to install the application                               │
│  - How to upgrade it                                            │
│  - How to handle failures                                       │
│  - How to scale based on metrics                                │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
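The "Solution: Operator" half of the diagram boils down to a translation step: read a custom resource and emit ordinary Kubernetes objects. The sketch below assumes a hypothetical `MyApp` resource whose spec has `image`, `replicas` and `port` fields (none of these are defined by this article); `render_children` is an illustrative helper, and a real operator would then send the resulting manifests to the API server.

```python
from typing import Any


def render_children(myapp: dict[str, Any]) -> list[dict[str, Any]]:
    """Expand a hypothetical MyApp custom resource into built-in resources."""
    name = myapp["metadata"]["name"]
    namespace = myapp["metadata"].get("namespace", "default")
    spec = myapp["spec"]
    port = spec.get("port", 8080)
    labels = {"app": name}  # shared by the Pod template and the Service selector

    deployment = {
        "apiVersion": "apps/v1",
        "kind": "Deployment",
        "metadata": {"name": name, "namespace": namespace, "labels": labels},
        "spec": {
            "replicas": spec.get("replicas", 1),
            "selector": {"matchLabels": labels},
            "template": {
                "metadata": {"labels": labels},
                "spec": {"containers": [{
                    "name": name,
                    "image": spec["image"],
                    "ports": [{"containerPort": port}],
                }]},
            },
        },
    }
    service = {
        "apiVersion": "v1",
        "kind": "Service",
        "metadata": {"name": name, "namespace": namespace, "labels": labels},
        "spec": {"selector": labels, "ports": [{"port": port, "targetPort": port}]},
    }
    return [deployment, service]


myapp = {
    "apiVersion": "example.com/v1",
    "kind": "MyApp",
    "metadata": {"name": "shop", "namespace": "demo"},
    "spec": {"image": "nginx:1.25", "replicas": 2, "port": 80},
}
children = render_children(myapp)
print([c["kind"] for c in children])  # ['Deployment', 'Service']
```

Keeping this step a pure function makes the "what should exist" logic testable without a cluster; the controller's job is then only to make the cluster match its output.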

Operator Architecture

┌─────────────────────────────────────────────────────────────────┐
│                    Operator Architecture                        │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │              Kubernetes API Server                        │  │
│  │  ┌─────────────────┐  ┌─────────────────────────────┐     │  │
│  │  │ Built-in        │  │ Custom Resource Definition  │     │  │
│  │  │ Resources       │  │ (CRD): MyApp                │     │  │
│  │  │ (Pod, Service)  │  └─────────────────────────────┘     │  │
│  │  └─────────────────┘                                      │  │
│  └───────────────────────────────────────────────────────────┘  │
│                              ▲                                  │
│                              │ Watch                            │
│                              │                                  │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │                 Operator / Controller                     │  │
│  │  ┌─────────────────────────────────────────────────────┐  │  │
│  │  │              Reconciliation Loop                    │  │  │
│  │  │                                                     │  │  │
│  │  │  1. Watch Custom Resources                          │  │  │
│  │  │  2. Detect changes                                  │  │  │
│  │  │  3. Read current state                              │  │  │
│  │  │  4. Calculate desired state                         │  │  │
│  │  │  5. Make changes to cluster                         │  │  │
│  │  │  6. Update status                                   │  │  │
│  │  │  7. Repeat                                          │  │  │
│  │  └─────────────────────────────────────────────────────┘  │  │
│  └───────────────────────────────────────────────────────────┘  │
│                              │                                  │
│                              ▼                                  │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │              Kubernetes Resources Created                 │  │
│  │  ┌─────────┐  ┌──────────┐  ┌────────────┐  ┌─────────┐   │  │
│  │  │  Pods   │  │ Services │  │ ConfigMaps │  │ Secrets │   │  │
│  │  └─────────┘  └──────────┘  └────────────┘  └─────────┘   │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
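The reconciliation loop above is level-triggered: each pass compares the whole desired state with what currently exists and changes only what differs, so running it twice in a row is harmless. The sketch below models one pass against a hypothetical in-memory `FakeCluster` instead of a real API server; `reconcile`, the `cr` layout and the field names are illustrative assumptions. In a real controller, steps 1, 2 and 7 (watch, detect, repeat) are driven by watch events, so here the function is simply called repeatedly.

```python
from dataclasses import dataclass, field


@dataclass
class FakeCluster:
    """Hypothetical in-memory stand-in for the API server."""
    objects: dict = field(default_factory=dict)  # (kind, name) -> spec
    writes: int = 0                              # number of create/update calls

    def get(self, kind: str, name: str):
        return self.objects.get((kind, name))

    def apply(self, kind: str, name: str, spec: dict) -> None:
        self.objects[(kind, name)] = dict(spec)
        self.writes += 1


def reconcile(cluster: FakeCluster, cr: dict) -> dict:
    """One level-triggered pass over a Database-like custom resource."""
    name = cr["name"]
    # Desired state is derived from the custom resource's spec
    desired = {
        ("StatefulSet", name): {"replicas": cr["spec"]["replicas"],
                                "image": cr["spec"]["image"]},
        ("Service", name): {"port": 5432},
    }
    for (kind, obj_name), spec in desired.items():
        current = cluster.get(kind, obj_name)    # read current state
        if current != spec:                      # compare with desired state
            cluster.apply(kind, obj_name, spec)  # change only what drifted
    # Report what was observed so users can follow progress in .status
    return {"phase": "Running", "replicas": cr["spec"]["replicas"]}


cluster = FakeCluster()
cr = {"name": "db1", "spec": {"replicas": 3, "image": "postgres:14"}}
reconcile(cluster, cr)
reconcile(cluster, cr)            # nothing drifted, so no writes
print(cluster.writes)             # 2
cr["spec"]["replicas"] = 1
print(reconcile(cluster, cr))     # {'phase': 'Running', 'replicas': 1}
print(cluster.writes)             # 3
```

Because each pass recomputes everything from the spec, a missed or duplicated event does not matter: the next pass still converges on the same result.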

Python Operator (kopf)

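import kopf
import kubernetes
from dataclasses import dataclass
from kubernetes.client.exceptions import ApiException

# Run with: kopf run --verbose operator.py
# (--verbose / --debug control kopf's log level.)

# kopf does NOT create the Custom Resource Definition; apply crd.yaml first.
# apiVersion: example.com/v1
# kind: Database

# Replica count and per-container resources for each size tier
SIZE_REPLICAS = {'small': 1, 'medium': 2, 'large': 3}
SIZE_RESOURCES = {
    'small': {'cpu': '500m', 'memory': '512Mi'},
    'medium': {'cpu': '1', 'memory': '1Gi'},
    'large': {'cpu': '2', 'memory': '2Gi'},
}


@dataclass
class DatabaseSpec:
    """Database specification (documents the expected spec fields)."""
    version: str
    size: str  # small, medium, large
    backup_enabled: bool = False
    backup_schedule: str = "0 2 * * *"  # Daily at 2 AM


@dataclass
class DatabaseStatus:
    """Database status (documents the fields written to .status)."""
    ready: bool = False
    endpoint: str = ""
    version: str = ""
    phase: str = "Creating"


@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, **kwargs):
    """Load cluster credentials once, before any handler runs."""
    try:
        kubernetes.config.load_incluster_config()  # running inside a Pod
    except kubernetes.config.ConfigException:
        kubernetes.config.load_kube_config()       # running locally


@kopf.on.create('databases.example.com')
@kopf.on.update('databases.example.com')
def database_create_or_update(spec, name, namespace, patch, **kwargs):
    """Handle Database resource creation or update."""
    version = spec.get('version', '14')
    size = spec.get('size', 'small')
    backup_enabled = spec.get('backup_enabled', False)

    try:
        # Create or update the StatefulSet and its headless Service
        _apply_statefulset(name, namespace, version, size)
        _apply_service(name, namespace)

        # Handle backups if enabled
        if backup_enabled:
            _setup_backup(name, namespace, spec.get('backup_schedule', '0 2 * * *'))
    except ApiException as e:
        # Record the failure, then ask kopf to retry the handler in 30 s
        patch.status['ready'] = False
        patch.status['phase'] = 'Failed'
        patch.status['error'] = str(e)
        raise kopf.TemporaryError(f"Failed to reconcile Database: {e}", delay=30)

    # Write .status through `patch`: a handler's return value is stored by
    # kopf under .status.<handler-id>, not merged into .status itself.
    # The resources now exist, although the Pods may still be starting.
    patch.status['ready'] = True
    patch.status['endpoint'] = f'{name}.{namespace}.svc.cluster.local'
    patch.status['version'] = version
    patch.status['phase'] = 'Running'
    patch.status['error'] = None  # clear an error left by an earlier failure


@kopf.on.delete('databases.example.com')
def database_delete(name, namespace, logger, **kwargs):
    """Handle Database resource deletion.

    kopf keeps its own finalizer on the object until this handler succeeds.
    """
    _delete_statefulset(name, namespace)
    _delete_service(name, namespace)
    _cleanup_backups(name, namespace)
    logger.info(f"Cleaned up Database {namespace}/{name}")


@kopf.on.field('databases.example.com', field='spec.size')
def database_scale(name, namespace, old, new, logger, **kwargs):
    """Handle size changes (scaling) by patching only the replica count."""
    if old is None:
        return  # initial creation is handled by database_create_or_update
    replicas = SIZE_REPLICAS.get(new, 1)
    kubernetes.client.AppsV1Api().patch_namespaced_stateful_set(
        name, namespace, {'spec': {'replicas': replicas}})
    logger.info(f"Scaled {name} from {old!r} to {new!r} ({replicas} replicas)")


def _apply_statefulset(name: str, namespace: str, version: str, size: str):
    """Create the database StatefulSet, or patch it if it already exists."""
    replicas = SIZE_REPLICAS.get(size, 1)
    # Resource requests/limits based on size
    resources = SIZE_RESOURCES.get(size, SIZE_RESOURCES['small'])
    # Note: extra replicas are independent Postgres servers; replication
    # between them would have to be configured separately.

    statefulset = {
        'apiVersion': 'apps/v1',
        'kind': 'StatefulSet',
        'metadata': {
            'name': name,
            'namespace': namespace
        },
        'spec': {
            'serviceName': name,
            'replicas': replicas,
            'selector': {
                'matchLabels': {'app': name}
            },
            'template': {
                'metadata': {
                    'labels': {'app': name}
                },
                'spec': {
                    'containers': [{
                        'name': 'postgres',
                        'image': f'postgres:{version}',
                        'ports': [{'containerPort': 5432}],
                        'env': [
                            {'name': 'POSTGRES_DB', 'value': name},
                            {'name': 'POSTGRES_USER', 'value': name},
                            # The postgres image will not start without a password.
                            # Assumes a Secret "<name>-credentials" with key "password".
                            {'name': 'POSTGRES_PASSWORD', 'valueFrom': {'secretKeyRef': {
                                'name': f'{name}-credentials', 'key': 'password'}}},
                        ],
                        'resources': {
                            'requests': resources,
                            'limits': resources
                        }
                    }]
                }
            }
        }
    }
    # Set the Database as owner so Kubernetes garbage-collects the StatefulSet
    kopf.adopt(statefulset)

    api = kubernetes.client.AppsV1Api()
    try:
        api.read_namespaced_stateful_set(name, namespace)
    except ApiException as e:
        if e.status != 404:
            raise  # a real API error, not "missing"
        api.create_namespaced_stateful_set(namespace, statefulset)
    else:
        # Already exists: push the desired version, replicas and resources
        api.patch_namespaced_stateful_set(name, namespace, statefulset)


def _apply_service(name: str, namespace: str):
    """Create the headless Service for the database if it does not exist."""
    service = {
        'apiVersion': 'v1',
        'kind': 'Service',
        'metadata': {'name': name, 'namespace': namespace},
        'spec': {
            'selector': {'app': name},
            'ports': [
                {'name': 'postgres', 'port': 5432, 'targetPort': 5432}
            ],
            'clusterIP': 'None'  # Headless service for StatefulSet
        }
    }
    kopf.adopt(service)

    api = kubernetes.client.CoreV1Api()
    try:
        api.read_namespaced_service(name, namespace)
    except ApiException as e:
        if e.status != 404:
            raise
        api.create_namespaced_service(namespace, service)


def _setup_backup(name: str, namespace: str, schedule: str):
    """Set up automated backups using a CronJob."""
    # Implementation for backup CronJob (not shown)
    pass


def _delete_statefulset(name: str, namespace: str):
    """Delete the StatefulSet, ignoring "not found"."""
    api = kubernetes.client.AppsV1Api()
    try:
        api.delete_namespaced_stateful_set(name, namespace)
    except ApiException as e:
        if e.status != 404:
            raise


def _delete_service(name: str, namespace: str):
    """Delete the Service, ignoring "not found"."""
    api = kubernetes.client.CoreV1Api()
    try:
        api.delete_namespaced_service(name, namespace)
    except ApiException as e:
        if e.status != 404:
            raise


def _cleanup_backups(name: str, namespace: str):
    """Clean up backups (not shown)."""
    pass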
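`_setup_backup` is left as a stub above. One way to fill it in is to render a `batch/v1` CronJob from `backup_schedule`. This is a sketch only: `build_backup_cronjob`, the `pg_dump` command line and the `<name>-credentials` Secret are assumptions, not part of the original operator.

```python
def build_backup_cronjob(name: str, namespace: str, schedule: str,
                         version: str = "14") -> dict:
    """Return a batch/v1 CronJob manifest that runs pg_dump on a schedule."""
    return {
        "apiVersion": "batch/v1",
        "kind": "CronJob",
        "metadata": {"name": f"{name}-backup", "namespace": namespace},
        "spec": {
            "schedule": schedule,           # cron syntax, e.g. "0 2 * * *"
            "concurrencyPolicy": "Forbid",  # skip a run while the previous one is active
            "jobTemplate": {"spec": {"template": {"spec": {
                "restartPolicy": "OnFailure",
                "containers": [{
                    "name": "backup",
                    "image": f"postgres:{version}",
                    # Connects through the headless Service named after the database.
                    # Hypothetical: the dump goes to stdout; a real job would write
                    # it to a volume or to object storage.
                    "command": ["pg_dump", "-h", name, "-U", name, name],
                    "env": [{
                        "name": "PGPASSWORD",
                        # Assumed Secret "<name>-credentials" with key "password"
                        "valueFrom": {"secretKeyRef": {
                            "name": f"{name}-credentials", "key": "password"}},
                    }],
                }],
            }}}},
        },
    }


manifest = build_backup_cronjob("my-production-db", "production", "0 2 * * *")
print(manifest["metadata"]["name"])  # my-production-db-backup
```

Inside the handler, the manifest could then be created with `kubernetes.client.BatchV1Api().create_namespaced_cron_job(namespace, manifest)` in recent versions of the Python client.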

Go Operator (Controller Runtime)

package main

import (
    "context"
    "fmt"
    "os"

    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/util/intstr"
    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    clientgoscheme "k8s.io/client-go/kubernetes/scheme"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
    "sigs.k8s.io/controller-runtime/pkg/log"
    "sigs.k8s.io/controller-runtime/pkg/log/zap"

    examplecomv1 "example.com/api/v1"
)

// scheme registers the built-in types plus the Database type
var scheme = runtime.NewScheme()

func init() {
    utilruntime.Must(clientgoscheme.AddToScheme(scheme))
    // AddToScheme is generated for the API package (e.g. by Kubebuilder)
    utilruntime.Must(examplecomv1.AddToScheme(scheme))
}

// DatabaseReconciler reconciles a Database object
type DatabaseReconciler struct {
    client.Client
    Scheme *runtime.Scheme
}

//+kubebuilder:rbac:groups=example.com,resources=databases,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=example.com,resources=databases/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch;create;update;patch;delete

func (r *DatabaseReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    logger := log.FromContext(ctx)

    // Fetch the Database instance
    db := &examplecomv1.Database{}
    if err := r.Get(ctx, req.NamespacedName, db); err != nil {
        // NotFound means it was deleted; owned objects are garbage-collected
        return ctrl.Result{}, client.IgnoreNotFound(err)
    }

    // Desired replica count for the requested size
    replicas := getReplicas(db.Spec.Size)

    // Create or update StatefulSet. CreateOrUpdate looks the object up by
    // name/namespace, so those must be set before the call.
    sts := &appsv1.StatefulSet{ObjectMeta: metav1.ObjectMeta{Name: db.Name, Namespace: db.Namespace}}
    if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, sts, func() error {
        return r.mutateStatefulSet(db, sts)
    }); err != nil {
        logger.Error(err, "Failed to create/update StatefulSet")
        return ctrl.Result{}, err
    }

    // Create or update Service
    svc := &corev1.Service{ObjectMeta: metav1.ObjectMeta{Name: db.Name, Namespace: db.Namespace}}
    if _, err := controllerutil.CreateOrUpdate(ctx, r.Client, svc, func() error {
        return r.mutateService(db, svc)
    }); err != nil {
        logger.Error(err, "Failed to create/update Service")
        return ctrl.Result{}, err
    }

    // Update status. Ready only once every replica reports ready; because the
    // controller Owns the StatefulSet, its status changes trigger a new reconcile.
    db.Status.Ready = sts.Status.ReadyReplicas == replicas
    db.Status.Endpoint = fmt.Sprintf("%s.%s.svc.cluster.local", db.Name, db.Namespace)
    db.Status.Version = db.Spec.Version
    db.Status.Replicas = replicas

    // Status().Update requires the status subresource to be enabled on the CRD
    if err := r.Status().Update(ctx, db); err != nil {
        logger.Error(err, "Failed to update Database status")
        return ctrl.Result{}, err
    }

    return ctrl.Result{}, nil
}

func (r *DatabaseReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&examplecomv1.Database{}).
        Owns(&appsv1.StatefulSet{}).
        Owns(&corev1.Service{}).
        Complete(r)
}

// mutateStatefulSet sets the desired fields on sts. CreateOrUpdate calls it
// for both create and update, so it must not reset ObjectMeta (name,
// resourceVersion) or the update will fail.
func (r *DatabaseReconciler) mutateStatefulSet(db *examplecomv1.Database, sts *appsv1.StatefulSet) error {
    replicas := getReplicas(db.Spec.Size)
    labels := map[string]string{"app": db.Name}

    sts.Spec.ServiceName = db.Name
    sts.Spec.Replicas = &replicas
    // The selector is immutable after creation, so only set it once
    if sts.Spec.Selector == nil {
        sts.Spec.Selector = &metav1.LabelSelector{MatchLabels: labels}
    }
    sts.Spec.Template.Labels = labels
    sts.Spec.Template.Spec.Containers = []corev1.Container{{
        Name:  "postgres",
        Image: fmt.Sprintf("postgres:%s", db.Spec.Version),
        Ports: []corev1.ContainerPort{{
            Name:          "postgres",
            ContainerPort: 5432,
        }},
        Env: []corev1.EnvVar{
            {Name: "POSTGRES_DB", Value: db.Name},
            {Name: "POSTGRES_USER", Value: db.Name},
            // The postgres image will not start without a password.
            // Assumes a Secret "<name>-credentials" with key "password".
            {Name: "POSTGRES_PASSWORD", ValueFrom: &corev1.EnvVarSource{
                SecretKeyRef: &corev1.SecretKeySelector{
                    LocalObjectReference: corev1.LocalObjectReference{Name: db.Name + "-credentials"},
                    Key:                  "password",
                },
            }},
        },
    }}

    // Owner reference: deleting the Database garbage-collects the StatefulSet
    return controllerutil.SetControllerReference(db, sts, r.Scheme)
}

func (r *DatabaseReconciler) mutateService(db *examplecomv1.Database, svc *corev1.Service) error {
    svc.Spec.Selector = map[string]string{"app": db.Name}
    svc.Spec.Ports = []corev1.ServicePort{{
        Name:       "postgres",
        Port:       5432,
        TargetPort: intstr.FromInt(5432),
    }}
    // Headless Service for the StatefulSet (clusterIP cannot change later)
    svc.Spec.ClusterIP = corev1.ClusterIPNone

    return controllerutil.SetControllerReference(db, svc, r.Scheme)
}

func getReplicas(size string) int32 {
    switch size {
    case "small":
        return 1
    case "medium":
        return 2
    case "large":
        return 3
    default:
        return 1
    }
}

func main() {
    ctrl.SetLogger(zap.New())
    setupLog := ctrl.Log.WithName("setup")

    mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
        Scheme:           scheme,
        LeaderElection:   true,
        LeaderElectionID: "example.com",
    })
    if err != nil {
        setupLog.Error(err, "unable to create manager")
        os.Exit(1)
    }

    if err = (&DatabaseReconciler{
        Client: mgr.GetClient(),
        Scheme: mgr.GetScheme(),
    }).SetupWithManager(mgr); err != nil {
        setupLog.Error(err, "unable to set up Database controller")
        os.Exit(1)
    }

    if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
        setupLog.Error(err, "manager exited with error")
        os.Exit(1)
    }
}
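When `Reconcile` returns a non-nil error, controller-runtime puts the request back on its work queue with rate limiting. As far as we know, the default per-item limiter is client-go's exponential failure limiter: a 5 ms base delay that doubles on every consecutive failure up to a 1000 s cap, with the counter reset after a successful reconcile. The Python model below is a sketch of that per-item backoff only; the class name is hypothetical and the queue-wide rate limit is left out.

```python
class ItemExponentialBackoff:
    """Per-item exponential retry delay, modelled on client-go's
    ItemExponentialFailureRateLimiter (hypothetical Python port)."""

    def __init__(self, base: float = 0.005, max_delay: float = 1000.0):
        self.base = base            # seconds before the first retry
        self.max_delay = max_delay  # upper bound on any single delay
        self.failures: dict[str, int] = {}

    def when(self, item: str) -> float:
        """Record one more failure for `item` and return the retry delay."""
        exp = self.failures.get(item, 0)
        self.failures[item] = exp + 1
        # Cap the exponent so the float math cannot overflow; the result
        # is clamped to max_delay anyway.
        return min(self.base * 2 ** min(exp, 62), self.max_delay)

    def forget(self, item: str) -> None:
        """Reset the counter, as happens after a successful reconcile."""
        self.failures.pop(item, None)


limiter = ItemExponentialBackoff()
key = "production/my-production-db"
print([limiter.when(key) for _ in range(4)])  # [0.005, 0.01, 0.02, 0.04]
limiter.forget(key)
print(limiter.when(key))                      # 0.005
```

Because the counter is kept per object key, one broken Database backs off on its own without slowing down reconciles of healthy ones.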

Custom Resource Definition

# crd.yaml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: databases.example.com
spec:
  group: example.com
  names:
    kind: Database
    plural: databases
    shortNames:
      - db
  scope: Namespaced
  versions:
    - name: v1
      served: true
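      storage: true
      # Enables the /status endpoint used by r.Status().Update() in the Go operator
      subresources:
        status: {}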
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              required:
                - version
                - size
              properties:
                version:
                  type: string
                  description: "PostgreSQL version"
                  example: "14"
                size:
                  type: string
                  enum: [small, medium, large]
                  description: "Database size/tier"
                backup_enabled:
                  type: boolean
                  default: false
                backup_schedule:
                  type: string
                  default: "0 2 * * *"
            status:
              type: object
              properties:
                ready:
                  type: boolean
                endpoint:
                  type: string
                version:
                  type: string
                replicas:
                  type: integer
                phase:
                  type: string
                error:
                  type: string
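With this schema, the API server does part of the operator's job before any operator code runs: it drops fields the schema does not declare (pruning, unless the client requests strict field validation), fills in `default` values, and rejects values that break `required` or `enum`. The function below is a hand-written approximation of those three steps for this particular spec, for illustration only; `SPEC_SCHEMA` and `admit_spec` are hypothetical, and the real API server produces its own error messages.

```python
# Hand-written mirror of the spec schema in crd.yaml (illustrative only)
SPEC_SCHEMA = {
    "required": ["version", "size"],
    "properties": {
        "version": {"type": str},
        "size": {"type": str, "enum": ["small", "medium", "large"]},
        "backup_enabled": {"type": bool, "default": False},
        "backup_schedule": {"type": str, "default": "0 2 * * *"},
    },
}


def admit_spec(spec: dict) -> dict:
    """Prune unknown fields, apply defaults, then validate; raise ValueError on failure."""
    props = SPEC_SCHEMA["properties"]
    # 1. Pruning: fields not declared in the schema are dropped
    out = {k: v for k, v in spec.items() if k in props}
    # 2. Defaulting: missing fields that have a default are filled in
    for field_name, rules in props.items():
        if field_name not in out and "default" in rules:
            out[field_name] = rules["default"]
    # 3. Validation: required fields, types and enums
    for field_name in SPEC_SCHEMA["required"]:
        if field_name not in out:
            raise ValueError(f"spec.{field_name}: required")
    for field_name, value in out.items():
        rules = props[field_name]
        if type(value) is not rules["type"]:
            raise ValueError(f"spec.{field_name}: expected {rules['type'].__name__}")
        if "enum" in rules and value not in rules["enum"]:
            raise ValueError(f"spec.{field_name}: unsupported value {value!r}")
    return out


print(admit_spec({"version": "14", "size": "large", "replicas": 5}))
# {'version': '14', 'size': 'large', 'backup_enabled': False, 'backup_schedule': '0 2 * * *'}
```

One practical consequence: an unquoted `version: 14` in a manifest is a YAML integer and fails the `type: string` check, which is why the example manifest quotes it.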

Using the Operator

# database.yaml
apiVersion: example.com/v1
kind: Database
metadata:
  name: my-production-db
  namespace: production
spec:
  version: "14"
  size: large
  backup_enabled: true
  backup_schedule: "0 2 * * *"
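
# Install the CRD, then deploy the operator
kubectl apply -f crd.yaml
kubectl apply -f operator.yaml

# Deploy database (the "production" namespace must already exist)
kubectl apply -f database.yaml

# Check status (the resource lives in the "production" namespace)
kubectl get database my-production-db -n production

# Scale database
kubectl patch database my-production-db -n production -p '{"spec":{"size":"medium"}}' --type=merge

# Delete database
kubectl delete database my-production-db -n production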
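`kubectl patch --type=merge` sends a JSON Merge Patch (RFC 7386); custom resources do not support kubectl's default strategic merge patch, hence the flag. Under merge-patch rules, objects are merged key by key, `null` deletes a key, and lists are replaced wholesale. A small sketch of those rules follows; `merge_patch` is a hypothetical helper, not a kubectl or client-library function.

```python
def merge_patch(target, patch):
    """Apply an RFC 7386 JSON Merge Patch and return the new document."""
    if not isinstance(patch, dict):
        return patch                      # non-objects replace the target outright
    result = dict(target) if isinstance(target, dict) else {}
    for key, value in patch.items():
        if value is None:
            result.pop(key, None)         # null deletes the key
        else:
            result[key] = merge_patch(result.get(key), value)
    return result


db = {"spec": {"version": "14", "size": "large", "backup_enabled": True}}
patched = merge_patch(db, {"spec": {"size": "medium"}})
print(patched["spec"])  # {'version': '14', 'size': 'medium', 'backup_enabled': True}
```

This is why the scale command above touches only `spec.size` and leaves `version` and the backup settings alone; the flip side is that patching a list field (such as finalizers) must send the complete list.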

Common Operator Patterns

import kopf
from datetime import datetime, timezone

# Finalizers - Ensure cleanup
# kopf puts its own finalizer on objects that have a (non-optional)
# @kopf.on.delete handler and removes it only after the handler has
# succeeded, so no manual finalizer bookkeeping is needed. If the handler
# raises kopf.TemporaryError, kopf retries it and the object stays.
@kopf.on.delete('myresources.example.com')
def cleanup_on_delete(spec, name, namespace, logger, **kwargs):
    # Perform cleanup before deletion (_cleanup_resources is a placeholder)
    _cleanup_resources(spec)
    logger.info(f"Cleaned up {namespace}/{name}")

# Status Conditions
def update_status(conditions: list, condition_type: str, status: bool,
                  reason: str, message: str = "") -> list:
    """Set a Kubernetes-style condition in a status.conditions list."""
    now = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
    # Condition status is the string "True"/"False"/"Unknown", not a bool
    new_status = 'True' if status else 'False'

    # Find existing condition
    existing = next((c for c in conditions if c.get('type') == condition_type), None)

    if existing:
        # lastTransitionTime only changes when the status actually flips
        if existing.get('status') != new_status:
            existing['lastTransitionTime'] = now
        existing['status'] = new_status
        existing['reason'] = reason
        existing['message'] = message
    else:
        conditions.append({
            'type': condition_type,
            'status': new_status,
            'reason': reason,
            'message': message,
            'lastTransitionTime': now
        })

    return conditions
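Finalizers work because deletion is two-phase: a delete request on an object that still lists finalizers only sets `metadata.deletionTimestamp`, and the object is actually removed once every finalizer has been taken off the list. The model below illustrates that lifecycle with a hypothetical `Obj` class and helper functions; it is a simplification, not the API server's code.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Obj:
    """Minimal model of an API object's deletion-related metadata (illustrative)."""
    name: str
    finalizers: list = field(default_factory=list)
    deletion_timestamp: Optional[str] = None
    exists: bool = True


def request_delete(obj: Obj, now: str) -> None:
    # With no finalizers the object goes away at once; otherwise it is only marked
    if not obj.finalizers:
        obj.exists = False
    elif obj.deletion_timestamp is None:
        obj.deletion_timestamp = now


def remove_finalizer(obj: Obj, finalizer: str) -> None:
    # A controller calls this after its cleanup succeeded
    obj.finalizers = [f for f in obj.finalizers if f != finalizer]
    if obj.deletion_timestamp is not None and not obj.finalizers:
        obj.exists = False


o = Obj("db1", finalizers=["my-operator/finalizer"])
request_delete(o, "2024-01-01T00:00:00Z")
print(o.exists, o.deletion_timestamp)  # True 2024-01-01T00:00:00Z
remove_finalizer(o, "my-operator/finalizer")
print(o.exists)  # False
```

While the object is marked but not yet removed, controllers still see it, which is the window in which a delete handler runs its cleanup.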

Best Practices

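# Operator best practices
# 1. Use finalizers so external resources are cleaned up before the object disappears
# 2. Report progress through status conditions
# 3. Return reconcile errors so they are retried with backoff, instead of swallowing them
# 4. Add structured logging that includes the object's namespace and name
# 5. Make reconciliation idempotent: running it again on unchanged input changes nothing
# 6. Add validating webhooks (or CRD schema rules) for input validation
# 7. Use leader election when running several operator replicas for HA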

Conclusion

Kubernetes Operators extend Kubernetes to manage complex applications automatically. By encoding operational knowledge into controllers, operators can handle deployment, scaling, backups, and recovery without manual intervention.

Key takeaways:

  • Operators use Custom Resource Definitions to extend Kubernetes
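  • The reconciliation loop repeatedly compares the actual cluster state with the desired state declared in the custom resource and corrects any drift
  • Use operator frameworks such as Operator SDK, Kubebuilder (controller-runtime), or kopf instead of hand-writing watch and retry logic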
  • Implement finalizers for proper cleanup
  • Add status conditions for better observability
