Good logs are invaluable when debugging production issues. Bad logs are noise that obscures the signal. This guide covers logging best practices that make debugging easier and systems more observable.
Log Levels
Use appropriate log levels consistently:
The Hierarchy
| Level | When to Use | Example |
|---|---|---|
| DEBUG | Detailed info for debugging | Loop iterations, variable values |
| INFO | Normal operation events | User logged in, order placed |
| WARNING | Something unexpected, but handled | Retry succeeded, deprecated API used |
| ERROR | Error that affected current request | Database timeout, validation failed |
| CRITICAL | System cannot function | Cannot connect to database |
Guidelines
import logging
logger = logging.getLogger(__name__)
# DEBUG - Only when debugging
for item in items: # Don't log every iteration!
logger.debug(f"Processing item {item.id}")
process(item)
# INFO - Normal operations
logger.info(f"User {user_id} logged in from {ip_address}")
logger.info(f"Order {order_id} created for ${total}")
# WARNING - Recoverable issues
logger.warning(f"Rate limit approaching: {requests_count}/1000")
logger.warning("Using deprecated API v1, please migrate to v2")
# ERROR - Failures
logger.error(f"Failed to process order {order_id}: {str(e)}")
logger.error("Database connection failed after 3 retries")
# CRITICAL - System failures
logger.critical("Database unavailable - cannot serve requests")
What NOT to Log
# DON'T log sensitive data
logger.info(f"User login: {email}, password: {password}") # BAD!
logger.info(f"Payment: card={credit_card_number}") # BAD!
# DON'T log at wrong level
logger.error(f"Cache miss for key: {key}") # Should be DEBUG
logger.debug("Server started") # Should be INFO - a normal lifecycle event
# DON'T log excessively
for i in range(10000): # Don't log every iteration
process(items[i])
Structured Logging
Structured logs are machine-readable JSON with consistent fields:
Plain Text vs Structured
# BAD - Plain text, hard to parse
logger.info("User user@example.com placed order 12345 for $99.99")
# GOOD - Structured JSON
logger.info(
"Order placed",
extra={
"event_type": "order_placed",
"user_id": "user-123",
"user_email": "user@example.com",
"order_id": "order-12345",
"total": 99.99,
"currency": "USD",
"items_count": 3
}
)
# Output:
# {"timestamp":"2024-01-15T10:00:00Z","level":"INFO","message":"Order placed",
# "event_type":"order_placed","user_id":"user-123","order_id":"order-12345",
# "total":99.99,"items_count":3}
Python Structured Logging
import json
import logging
from datetime import datetime, timezone
class StructuredFormatter(logging.Formatter):
    """Render log records as single-line JSON documents.

    Always emits the core fields (timestamp, level, logger, message, module,
    function, line). Adds the contextual fields ``user_id`` / ``order_id``
    when the caller attached them via ``extra={...}``, and a formatted
    traceback when exception info is present on the record.
    """

    def format(self, record):
        """Return *record* serialized as a JSON string."""
        log_data = {
            # Timezone-aware UTC: datetime.utcnow() is deprecated (3.12+).
            # isoformat() emits "+00:00"; normalize to the conventional "Z".
            "timestamp": datetime.now(timezone.utc)
                .isoformat()
                .replace("+00:00", "Z"),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno,
        }
        # Contextual fields passed through logger.*(..., extra={...}).
        if hasattr(record, "user_id"):
            log_data["user_id"] = record.user_id
        if hasattr(record, "order_id"):
            log_data["order_id"] = record.order_id
        # Full traceback for logger.exception() / exc_info=True calls.
        if record.exc_info:
            log_data["exception"] = self.formatException(record.exc_info)
        return json.dumps(log_data)
# Usage: wire the JSON formatter onto a stream handler (stderr by default).
handler = logging.StreamHandler()
handler.setFormatter(StructuredFormatter())
logger = logging.getLogger("orders")
# NOTE(review): re-running this snippet adds a duplicate handler to the
# same named logger — run the setup once per process.
logger.addHandler(handler)
logger.setLevel(logging.INFO)
# Extra fields become top-level JSON keys via the formatter's hasattr checks.
logger.info("Order placed", extra={"order_id": "123", "user_id": "456"})
Using Structlog
import structlog
# One-time, process-wide structlog configuration.
structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,                 # honor stdlib log levels
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),  # %-style args support
        structlog.processors.TimeStamper(fmt="iso"),      # ISO-8601 timestamps
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,             # render exc_info tuples
        structlog.processors.JSONRenderer()               # final output is JSON
    ],
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),      # route through stdlib logging
)
logger = structlog.get_logger()
# Easy structured logging: keyword arguments become JSON fields directly.
logger.info("order_placed",
    order_id="12345",
    user_id="user-456",
    total=99.99,
    items=["item-1", "item-2"]
)
Correlation IDs
Link related log entries across services and requests:
Implementing Correlation ID
import uuid
from contextvars import ContextVar
from functools import wraps
# Context-local storage: each async task / thread sees its own value.
correlation_id: ContextVar[str] = ContextVar("correlation_id", default="")


def get_correlation_id():
    """Return the correlation ID bound to the current context ("" if unset)."""
    return correlation_id.get()


def set_correlation_id(correlation_id_value=None):
    """Bind a correlation ID to the current context and return it.

    Generates a fresh UUID4 string when no value is supplied.
    """
    value = correlation_id_value if correlation_id_value is not None else str(uuid.uuid4())
    correlation_id.set(value)
    return value
# Middleware to set correlation ID
class CorrelationMiddleware:
    """ASGI middleware that propagates an ``x-correlation-id`` header.

    Reuses the inbound header value when present, otherwise generates a
    UUID4; binds it to the ``correlation_id`` ContextVar for the request
    and echoes it back on the response headers.
    """

    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        # Prefer the caller-supplied correlation ID, if any.
        # ASGI header names/values are lowercase byte strings.
        incoming = next(
            (
                value.decode()
                for key, value in scope.get("headers", [])
                if key == b"x-correlation-id"
            ),
            None,
        )
        cid = incoming or str(uuid.uuid4())
        correlation_id.set(cid)

        async def send_wrapper(message):
            # Echo the ID on the response-start message so clients and
            # downstream services can correlate with server logs.
            if message["type"] == "http.response.start":
                message["headers"].append(
                    (b"x-correlation-id", cid.encode())
                )
            await send(message)

        await self.app(scope, receive, send_wrapper)
# Use in logging
def log_with_correlation(logger, message, **kwargs):
logger.info(
message,
extra={
"correlation_id": get_correlation_id(),
**kwargs
}
)
Distributed Tracing Integration
from opentelemetry import trace
tracer = trace.get_tracer(__name__)
def log_with_trace(logger, message, **kwargs):
    """Log *message* at INFO with the active OpenTelemetry trace/span IDs."""
    ctx = trace.get_current_span().get_span_context()
    logger.info(
        message,
        extra={
            "trace_id": format(ctx.trace_id, "032x"),  # 128-bit, zero-padded hex
            "span_id": format(ctx.span_id, "016x"),    # 64-bit, zero-padded hex
            **kwargs,
        },
    )
@app.get("/orders/{order_id}")
async def get_order(order_id: str):
    """Fetch a single order, logging before and after inside a traced span."""
    with tracer.start_as_current_span("get_order") as span:
        # Tag the span so traces can be filtered by order ID.
        span.set_attribute("order_id", order_id)
        log_with_trace(logger, "Fetching order", order_id=order_id)
        order = order_service.get(order_id)
        log_with_trace(logger, "Order fetched", order_id=order_id)
        return order
Log Management Pipeline
ELK Stack
┌──────────┐    ┌──────────┐    ┌──────────────┐    ┌──────────┐
│   App    │───▶│  Beats   │───▶│Elasticsearch │───▶│  Kibana  │
│   Logs   │    │(Filebeat)│    │              │    │          │
└──────────┘    └──────────┘    └──────────────┘    └──────────┘
# filebeat.yml
filebeat.inputs:
- type: log
paths:
- /var/log/app/*.log
json.keys_under_root: true
json.add_error_key: true
output.elasticsearch:
hosts: ["elasticsearch:9200"]
index: "app-logs-%{[fields.environment]}-%{+yyyy.MM.dd}"
processors:
- add_fields:
target: fields
fields:
environment: production
Loki Stack
# docker-compose.yml
services:
loki:
image: grafana/loki:latest
ports:
- "3100:3100"
config:
server:
http_listen_port: 3100
common:
path_prefix: /loki
storage:
filesystem:
chunks_directory: /loki/chunks
rules_directory: /loki/rules
replication_factor: 1
ring:
instance_addr: 127.0.0.1
kvstore:
store: inmemory
promtail:
image: grafana/promtail:latest
volumes:
- /var/log:/var/log
config:
clients:
- url: http://loki:3100/loki/api/v1/push
grafana:
image: grafana/grafana:latest
ports:
- "3000:3000"
What to Log
Request/Response Logging
import time
from fastapi import Request, Response
@app.middleware("http")
async def log_requests(request: Request, call_next):
    """Log the start and the completion (status + latency) of each request."""
    started = time.time()
    method = request.method
    path = request.url.path
    logger.info(
        "Request started",
        extra={
            "method": method,
            "path": path,
            "query_params": dict(request.query_params),
            "correlation_id": get_correlation_id(),
        },
    )
    response = await call_next(request)
    elapsed_ms = int((time.time() - started) * 1000)
    logger.info(
        "Request completed",
        extra={
            "method": method,
            "path": path,
            "status_code": response.status_code,
            "duration_ms": elapsed_ms,
            # Re-read: inner middleware may have (re)bound the ContextVar.
            "correlation_id": get_correlation_id(),
        },
    )
    return response
Business Events
class OrderService:
    """Order operations that emit structured business events."""

    def create_order(self, customer_id, items):
        """Create an order and log an ``order_created`` business event."""
        order = Order(customer_id=customer_id, items=items)
        # Log business event
        event = {
            "event_type": "order_created",
            "order_id": order.id,
            "customer_id": customer_id,
            "total": order.total,
            "items_count": len(items),
            "correlation_id": get_correlation_id(),
        }
        logger.info("Order created", extra=event)
        return order

    def cancel_order(self, order_id, reason):
        """Log an ``order_cancelled`` business event at WARNING level."""
        logger.warning(
            "Order cancelled",
            extra={
                "event_type": "order_cancelled",
                "order_id": order_id,
                "reason": reason,
                "correlation_id": get_correlation_id(),
            },
        )
Errors
try:
    result = risky_operation()
except ValueError as e:
    # Expected, request-scoped failure: log at ERROR with a stable
    # error_type so dashboards can aggregate on it.
    logger.error(
        "Validation failed",
        extra={
            "error_type": "validation_error",
            "error_message": str(e),
            "correlation_id": get_correlation_id(),
            # Don't log sensitive data!
        }
    )
    raise
except Exception as e:
    # Unknown failure: log at CRITICAL with the full traceback, then
    # re-raise so the caller's error handling still runs.
    logger.critical(
        "Unexpected error",
        extra={
            "error_type": type(e).__name__,
            "error_message": str(e),
            "traceback": traceback.format_exc(),
            "correlation_id": get_correlation_id()
        }
    )
    raise
Log Analysis Patterns
Finding Errors
# Kibana query
level:ERROR OR level:CRITICAL
# With time range
@timestamp:[now-1h TO now] AND level:ERROR
Finding User Activity
# All logs for a user
user_id:user-123
# All logs for an order
order_id:order-456
# Using correlation ID
correlation_id:a1b2c3d4-e5f6
Performance Analysis
# Slow requests
duration_ms:>5000
# By endpoint
avg(duration_ms) by path
Best Practices Checklist
- Use structured logging (JSON)
- Include correlation IDs
- Use appropriate log levels
- Log business events, not every detail
- Never log sensitive data (passwords, tokens, PII)
- Include context (user_id, order_id, etc.)
- Set up log rotation
- Use a log management system
- Create dashboards for common queries
- Set up alerts for errors
Conclusion
Effective logging is essential for production systems:
- Use structured JSON logging
- Include correlation IDs for request tracing
- Log at appropriate levels
- Never log sensitive data
- Use a centralized log management system
- Create dashboards and alerts
Start simple, add correlation IDs, then expand to full observability.
External Resources
Related Articles
- Observability - Full observability stack
- Distributed Tracing - Tracing across services
Comments