
Logging Best Practices: Structured Logging for Production

Good logs are invaluable when debugging production issues. Bad logs are noise that obscures the signal. This guide covers logging best practices that make debugging easier and systems more observable.

Log Levels

Use appropriate log levels consistently:

The Hierarchy

Level     When to Use                          Example
DEBUG     Detailed info for debugging          Loop iterations, variable values
INFO      Normal operation events              User logged in, order placed
WARNING   Something unexpected, but handled    Retry succeeded, deprecated API used
ERROR     Error that affected current request  Database timeout, validation failed
CRITICAL  System cannot function               Cannot connect to database

Guidelines

import logging

logger = logging.getLogger(__name__)

# DEBUG - Only when debugging
for item in items:  # OK at DEBUG: filtered out when the level is INFO or higher
    logger.debug(f"Processing item {item.id}")
    process(item)

# INFO - Normal operations
logger.info(f"User {user_id} logged in from {ip_address}")
logger.info(f"Order {order_id} created for ${total}")

# WARNING - Recoverable issues
logger.warning(f"Rate limit approaching: {requests_count}/1000")
logger.warning("Using deprecated API v1, please migrate to v2")

# ERROR - Failures
logger.error(f"Failed to process order {order_id}: {str(e)}")
logger.error("Database connection failed after 3 retries")

# CRITICAL - System failures
logger.critical("Database unavailable - cannot serve requests")

What NOT to Log

# DON'T log sensitive data
logger.info(f"User login: {email}, password: {password}")  # BAD!
logger.info(f"Payment: card={credit_card_number}")  # BAD!

# DON'T log at wrong level
logger.error(f"Cache miss for key: {key}")  # Should be DEBUG
logger.debug("Server started")  # Should be INFO

# DON'T log excessively
for i in range(10000):  # Don't log every iteration
    process(items[i])
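If secrets can still slip through review, a redaction filter is a defensive backstop. A sketch: the key names it matches (`password`, `token`, `card`) are assumptions for illustration, and the real fix remains never passing secrets to the logger at all.

```python
import logging
import re

# Matches key=value pairs for a few assumed sensitive key names
SENSITIVE = re.compile(r"\b(password|token|card)=\S+", re.IGNORECASE)

class RedactingFilter(logging.Filter):
    def filter(self, record):
        # Rewrite the message in place before any handler formats it
        record.msg = SENSITIVE.sub(r"\1=[REDACTED]", str(record.msg))
        return True

handler = logging.StreamHandler()
handler.addFilter(RedactingFilter())
```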

Structured Logging

Structured logs are machine-readable records, typically JSON, with consistent field names:

Plain Text vs Structured

# BAD - Plain text, hard to parse
logger.info("User user@example.com placed order 12345 for $99.99")

# GOOD - Structured JSON
logger.info(
    "Order placed",
    extra={
        "event_type": "order_placed",
        "user_id": "user-123",
        "order_id": "order-12345",
        "total": 99.99,
        "currency": "USD",
        "items_count": 3
    }
)

# Output:
# {"timestamp":"2024-01-15T10:00:00Z","level":"INFO","message":"Order placed",
#  "event_type":"order_placed","user_id":"user-123","order_id":"order-12345",
#  "total":99.99,"items_count":3}
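One payoff of structured output: querying logs becomes a JSON parse plus a predicate, instead of a fragile regex. For example, using the output line above:

```python
import json

# One JSON object per line, as emitted by the structured logger
line = ('{"timestamp":"2024-01-15T10:00:00Z","level":"INFO","message":"Order placed",'
        '"event_type":"order_placed","user_id":"user-123","order_id":"order-12345",'
        '"total":99.99,"items_count":3}')

event = json.loads(line)
is_big_order = event["event_type"] == "order_placed" and event["total"] > 50
```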

Python Structured Logging

import json
import logging
from datetime import datetime, timezone

class StructuredFormatter(logging.Formatter):
    def format(self, record):
        # Use the record's own creation time; utcnow() is deprecated
        timestamp = datetime.fromtimestamp(record.created, tz=timezone.utc)
        log_data = {
            "timestamp": timestamp.isoformat().replace("+00:00", "Z"),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno
        }
        
        # Add extra fields
        if hasattr(record, "user_id"):
            log_data["user_id"] = record.user_id
        if hasattr(record, "order_id"):
            log_data["order_id"] = record.order_id
        if record.exc_info:
            log_data["exception"] = self.formatException(record.exc_info)
        
        return json.dumps(log_data)

# Usage
handler = logging.StreamHandler()
handler.setFormatter(StructuredFormatter())
logger = logging.getLogger("orders")
logger.addHandler(handler)
logger.setLevel(logging.INFO)

logger.info("Order placed", extra={"order_id": "123", "user_id": "456"})

Using Structlog

import structlog

structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer()
    ],
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
)

logger = structlog.get_logger()

# Easy structured logging
logger.info("order_placed",
    order_id="12345",
    user_id="user-456",
    total=99.99,
    items=["item-1", "item-2"]
)

Correlation IDs

Link related log entries across services and requests:

Implementing Correlation ID

import uuid
from contextvars import ContextVar

correlation_id: ContextVar[str] = ContextVar("correlation_id", default="")

def get_correlation_id():
    return correlation_id.get()

def set_correlation_id(correlation_id_value=None):
    if correlation_id_value is None:
        correlation_id_value = str(uuid.uuid4())
    correlation_id.set(correlation_id_value)
    return correlation_id_value

# Middleware to set correlation ID
class CorrelationMiddleware:
    def __init__(self, app):
        self.app = app
    
    async def __call__(self, scope, receive, send):
        # Get from request or generate new
        cid = None
        for key, value in scope.get("headers", []):
            if key == b"x-correlation-id":
                cid = value.decode()
                break
        
        if not cid:
            cid = str(uuid.uuid4())
        
        correlation_id.set(cid)
        
        # Add to response headers
        async def send_wrapper(message):
            if message["type"] == "http.response.start":
                message["headers"].append(
                    (b"x-correlation-id", cid.encode())
                )
            await send(message)
        
        await self.app(scope, receive, send_wrapper)

# Use in logging
def log_with_correlation(logger, message, **kwargs):
    logger.info(
        message,
        extra={
            "correlation_id": get_correlation_id(),
            **kwargs
        }
    )
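Rather than threading the ID through a helper on every call, a logging.Filter can stamp it onto every record automatically. A sketch, self-contained for clarity (it redeclares the same ContextVar defined above):

```python
import logging
from contextvars import ContextVar

correlation_id: ContextVar[str] = ContextVar("correlation_id", default="")

class CorrelationFilter(logging.Filter):
    def filter(self, record):
        # Every record passing through this handler gets the current ID
        record.correlation_id = correlation_id.get()
        return True

handler = logging.StreamHandler()
handler.addFilter(CorrelationFilter())
handler.setFormatter(logging.Formatter("[%(correlation_id)s] %(levelname)s %(message)s"))

log = logging.getLogger("corr-demo")
log.addHandler(handler)

correlation_id.set("a1b2c3d4")
log.warning("payment retried")  # printed as: [a1b2c3d4] WARNING payment retried
```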

Distributed Tracing Integration

from opentelemetry import trace

tracer = trace.get_tracer(__name__)

def log_with_trace(logger, message, **kwargs):
    span = trace.get_current_span()
    
    trace_id = format(span.get_span_context().trace_id, "032x")
    span_id = format(span.get_span_context().span_id, "016x")
    
    logger.info(
        message,
        extra={
            "trace_id": trace_id,
            "span_id": span_id,
            **kwargs
        }
    )

@app.get("/orders/{order_id}")
async def get_order(order_id: str):
    with tracer.start_as_current_span("get_order") as span:
        span.set_attribute("order_id", order_id)
        
        log_with_trace(logger, "Fetching order", order_id=order_id)
        
        order = order_service.get(order_id)
        
        log_with_trace(logger, "Order fetched", order_id=order_id)
        
        return order

Log Management Pipeline

ELK Stack

┌─────────┐    ┌──────────┐    ┌─────────────┐    ┌─────────┐
│   App   │───▶│  Beats   │───▶│Elasticsearch│───▶│ Kibana  │
│  Logs   │    │(Filebeat)│    │             │    │         │
└─────────┘    └──────────┘    └─────────────┘    └─────────┘

# filebeat.yml
filebeat.inputs:
  - type: log
    paths:
      - /var/log/app/*.log
    json.keys_under_root: true
    json.add_error_key: true

output.elasticsearch:
  hosts: ["elasticsearch:9200"]
  index: "app-logs-%{[fields.environment]}-%{+yyyy.MM.dd}"

processors:
  - add_fields:
      target: fields
      fields:
        environment: production

Loki Stack

# docker-compose.yml
# (Loki and Promtail are configured via mounted files, not a compose "config" key)
services:
  loki:
    image: grafana/loki:latest
    ports:
      - "3100:3100"
    command: -config.file=/etc/loki/local-config.yaml
    volumes:
      - ./loki-config.yaml:/etc/loki/local-config.yaml

  promtail:
    image: grafana/promtail:latest
    command: -config.file=/etc/promtail/config.yaml
    volumes:
      - /var/log:/var/log
      - ./promtail-config.yaml:/etc/promtail/config.yaml

  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"

# loki-config.yaml
server:
  http_listen_port: 3100
common:
  path_prefix: /loki
  storage:
    filesystem:
      chunks_directory: /loki/chunks
      rules_directory: /loki/rules
  replication_factor: 1
  ring:
    instance_addr: 127.0.0.1
    kvstore:
      store: inmemory

# promtail-config.yaml
clients:
  - url: http://loki:3100/loki/api/v1/push

What to Log

Request/Response Logging

import time
from fastapi import Request

@app.middleware("http")
async def log_requests(request: Request, call_next):
    start_time = time.time()
    
    logger.info(
        "Request started",
        extra={
            "method": request.method,
            "path": request.url.path,
            "query_params": dict(request.query_params),
            "correlation_id": get_correlation_id()
        }
    )
    
    response = await call_next(request)
    
    duration = time.time() - start_time
    
    logger.info(
        "Request completed",
        extra={
            "method": request.method,
            "path": request.url.path,
            "status_code": response.status_code,
            "duration_ms": int(duration * 1000),
            "correlation_id": get_correlation_id()
        }
    )
    
    return response

Business Events

class OrderService:
    def create_order(self, customer_id, items):
        order = Order(customer_id=customer_id, items=items)
        
        # Log business event
        logger.info(
            "Order created",
            extra={
                "event_type": "order_created",
                "order_id": order.id,
                "customer_id": customer_id,
                "total": order.total,
                "items_count": len(items),
                "correlation_id": get_correlation_id()
            }
        )
        
        return order
    
    def cancel_order(self, order_id, reason):
        logger.warning(
            "Order cancelled",
            extra={
                "event_type": "order_cancelled",
                "order_id": order_id,
                "reason": reason,
                "correlation_id": get_correlation_id()
            }
        )

Errors

import traceback

try:
    result = risky_operation()
except ValueError as e:
    logger.error(
        "Validation failed",
        extra={
            "error_type": "validation_error",
            "error_message": str(e),
            "correlation_id": get_correlation_id(),
            # Don't log sensitive data!
        }
    )
    raise

except Exception as e:
    logger.critical(
        "Unexpected error",
        extra={
            "error_type": type(e).__name__,
            "error_message": str(e),
            "traceback": traceback.format_exc(),
            "correlation_id": get_correlation_id()
        }
    )
    raise

Log Analysis Patterns

Finding Errors

# Kibana query
level:ERROR OR level:CRITICAL

# With time range
@timestamp:[now-1h TO now] AND level:ERROR

Finding User Activity

# All logs for a user
user_id:user-123

# All logs for an order
order_id:order-456

# Using correlation ID
correlation_id:a1b2c3d4-e5f6

Performance Analysis

# Slow requests
duration_ms:>5000

# By endpoint
avg(duration_ms) by path
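For quick offline analysis, the same numbers can be pulled straight out of a JSON-lines log file. A sketch assuming the `duration_ms` field emitted by the request-logging middleware above:

```python
import json

# A few sample log lines; in practice, read these from a file
lines = [
    '{"message": "Request completed", "path": "/orders", "duration_ms": 120}',
    '{"message": "Request completed", "path": "/orders", "duration_ms": 340}',
    '{"message": "Request started", "path": "/orders"}',
    '{"message": "Request completed", "path": "/orders", "duration_ms": 95}',
]

durations = sorted(
    event["duration_ms"]
    for event in map(json.loads, lines)
    if "duration_ms" in event
)

# Nearest-rank p95 - crude, but fine for eyeballing slow endpoints
p95 = durations[min(len(durations) - 1, int(0.95 * len(durations)))]
print(p95)  # 340
```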

Best Practices Checklist

  • Use structured logging (JSON)
  • Include correlation IDs
  • Use appropriate log levels
  • Log business events, not every detail
  • Never log sensitive data (passwords, tokens, PII)
  • Include context (user_id, order_id, etc.)
  • Set up log rotation
  • Use a log management system
  • Create dashboards for common queries
  • Set up alerts for errors

Conclusion

Effective logging is essential for production systems:

  • Use structured JSON logging
  • Include correlation IDs for request tracing
  • Log at appropriate levels
  • Never log sensitive data
  • Use a centralized log management system
  • Create dashboards and alerts

Start simple, add correlation IDs, then expand to full observability.
