Logging: ELK Stack and Splunk for Python Applications
Centralized logging is essential for debugging, monitoring, and auditing production systems. The ELK Stack (Elasticsearch, Logstash, Kibana) and Splunk provide powerful platforms for log aggregation and analysis. This guide covers practical patterns for implementing centralized logging.
Structured Logging
JSON Logging
import json
import logging
from datetime import datetime, timezone
class JSONFormatter(logging.Formatter):
    """Format log records as single-line JSON documents.

    Emits a fixed field set (timestamp, level, logger, message, module,
    function, line), plus the formatted traceback when exception info is
    attached, plus any dict the caller stored on the record as
    ``extra_fields``.
    """

    def format(self, record):
        """Return the record serialized as a JSON string."""
        log_data = {
            # Timezone-aware UTC timestamp; the original datetime.utcnow()
            # is deprecated (Python 3.12+) and returned a naive datetime.
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage(),
            'module': record.module,
            'function': record.funcName,
            'line': record.lineno,
        }
        # Attach the formatted traceback when exception info is present.
        if record.exc_info:
            log_data['exception'] = self.formatException(record.exc_info)
        # Merge caller-supplied structured fields (set via record.extra_fields).
        if hasattr(record, 'extra_fields'):
            log_data.update(record.extra_fields)
        return json.dumps(log_data)
def setup_json_logging(name, log_file=None):
    """Configure and return a logger that emits JSON-formatted records.

    Args:
        name: Logger name (dotted hierarchy as usual for stdlib logging).
        log_file: Optional path; when given, logs also go to this file.

    Returns:
        The configured ``logging.Logger``.

    Idempotent: the original attached a fresh handler set on every call,
    so calling it twice made each message print twice. If the named
    logger already has handlers, it is returned unchanged.
    """
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)
    if logger.handlers:
        return logger
    # Console handler
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(JSONFormatter())
    logger.addHandler(console_handler)
    # File handler (only when a path was supplied)
    if log_file:
        file_handler = logging.FileHandler(log_file)
        file_handler.setFormatter(JSONFormatter())
        logger.addHandler(file_handler)
    return logger
# Usage
logger = setup_json_logging('myapp', 'app.log')
logger.info("Application started")

# With extra fields: build a LogRecord by hand so a custom dict can be
# attached as ``extra_fields`` before the logger dispatches it.
record = logging.LogRecord(
    name='myapp',
    level=logging.INFO,
    pathname='',
    lineno=0,
    msg='User login',
    args=(),
    exc_info=None
)
# JSONFormatter merges this dict into the emitted JSON document.
record.extra_fields = {'user_id': 123, 'ip': '192.168.1.1'}
logger.handle(record)
Structured Logging with structlog
import structlog

# Configure structlog to route events through the stdlib logging machinery
# and render each event as a single JSON document.
structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,                 # honor stdlib log levels
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),      # ISO-8601 timestamps
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.JSONRenderer()               # final output is JSON
    ],
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
    cache_logger_on_first_use=True,
)
logger = structlog.get_logger()

# Usage: event name first, structured fields as keyword arguments.
logger.info("user_login", user_id=123, ip="192.168.1.1")
logger.error("payment_failed", user_id=456, amount=99.99, reason="insufficient_funds")
ELK Stack Integration
Sending Logs to Elasticsearch
import logging
from pythonjsonlogger import jsonlogger
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
class ElasticsearchHandler(logging.Handler):
    """Logging handler that indexes each record into Elasticsearch.

    Index names roll over daily (e.g. ``logs-2024.01.31``). The original
    appended ``record.created`` — a float epoch timestamp — to the index
    name, which silently created a brand-new index for every single log
    record.
    """

    def __init__(self, hosts, index_name='logs'):
        super().__init__()
        self.es = Elasticsearch(hosts)
        self.index_name = index_name

    def emit(self, record):
        """Format the record and index it; never raise from logging."""
        try:
            log_entry = self.format(record)
            # Daily index derived from the record's creation time (UTC),
            # matching the Logstash convention used elsewhere in this guide.
            day = datetime.fromtimestamp(record.created, timezone.utc).strftime('%Y.%m.%d')
            self.es.index(
                index=f"{self.index_name}-{day}",
                body=log_entry
            )
        except Exception:
            # Standard logging contract: report via handleError, don't crash the app.
            self.handleError(record)
def setup_elasticsearch_logging(hosts=None):
    """Create the 'elasticsearch_logger' wired to an ElasticsearchHandler.

    Args:
        hosts: Elasticsearch host list; defaults to ``['localhost:9200']``.
            (Default moved from a mutable list literal to ``None``.)

    Returns:
        The configured ``logging.Logger``.

    Idempotent: returns the existing logger unchanged if it already has
    handlers, instead of stacking a duplicate handler per call.
    """
    logger = logging.getLogger('elasticsearch_logger')
    logger.setLevel(logging.DEBUG)
    if logger.handlers:
        return logger
    # Elasticsearch handler with a JSON formatter
    es_handler = ElasticsearchHandler(hosts if hosts is not None else ['localhost:9200'])
    es_handler.setFormatter(jsonlogger.JsonFormatter())
    logger.addHandler(es_handler)
    return logger
# Usage
logger = setup_elasticsearch_logging()
# Keys passed via ``extra`` become attributes on the LogRecord,
# which the JSON formatter serializes into the indexed document.
logger.info("Application event", extra={'user_id': 123})
Logstash Configuration
# logstash.conf
# Tail the application's JSON log files and ship them to Elasticsearch.
input {
  file {
    path => "/var/log/myapp/*.log"
    start_position => "beginning"   # read existing file content on first run
    codec => json                   # each line is parsed as a JSON document
  }
}
filter {
  if [type] == "python_app" {
    mutate {
      # Daily index name, e.g. python-app-2024.01.31
      add_field => { "[@metadata][index_name]" => "python-app-%{+YYYY.MM.dd}" }
    }
    # Parse timestamps
    date {
      match => [ "timestamp", "ISO8601" ]
      target => "@timestamp"
    }
    # Extract fields
    if [message] =~ /error/ {
      mutate {
        add_field => { "severity" => "error" }
      }
    }
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    # Index name comes from the metadata field set in the filter above.
    index => "%{[@metadata][index_name]}"
  }
}
Kibana Queries
from elasticsearch import Elasticsearch
class KibanaQueries:
    """Common Elasticsearch queries for log analysis (Kibana-style).

    The repeated "last N hours" range filter and the search call against
    the daily ``logs-*`` indices are factored into private helpers.
    """

    def __init__(self, es_host='localhost:9200'):
        self.es = Elasticsearch([es_host])

    @staticmethod
    def _time_range(hours):
        """Build the shared range filter on @timestamp for the last *hours* hours."""
        return {"range": {"@timestamp": {"gte": f"now-{hours}h"}}}

    def _search(self, query):
        """Run *query* against all daily log indices and return the raw response."""
        return self.es.search(index="logs-*", body=query)

    def get_error_logs(self, hours=1):
        """Return up to 100 ERROR-level log hits from the last *hours* hours."""
        query = {
            "query": {
                "bool": {
                    "must": [
                        {"match": {"level": "ERROR"}},
                        self._time_range(hours)
                    ]
                }
            },
            "size": 100
        }
        return self._search(query)['hits']['hits']

    def get_logs_by_user(self, user_id, hours=24):
        """Return log hits for *user_id* from the last *hours* hours."""
        query = {
            "query": {
                "bool": {
                    "must": [
                        {"match": {"user_id": user_id}},
                        self._time_range(hours)
                    ]
                }
            }
        }
        return self._search(query)['hits']['hits']

    def get_performance_stats(self, hours=1):
        """Aggregate response-time stats and the top-10 endpoints by doc count."""
        query = {
            "aggs": {
                "response_times": {
                    "stats": {"field": "response_time_ms"}
                },
                "errors_per_endpoint": {
                    "terms": {
                        "field": "endpoint",
                        "size": 10
                    }
                }
            },
            "query": self._time_range(hours)
        }
        return self._search(query)['aggregations']
# Usage: fetch ERROR-level hits from the last hour and report the count.
kibana = KibanaQueries()
errors = kibana.get_error_logs(hours=1)
print(f"Found {len(errors)} errors")
Splunk Integration
Sending Logs to Splunk
import logging
from splunk_http_event_collector import http_event_collector
class SplunkHandler(logging.Handler):
    """Send log records to Splunk via the HTTP Event Collector (HEC)."""

    def __init__(self, host, port, token, source='python_app'):
        super().__init__()
        self.hec = http_event_collector(
            token=token,
            host=host,
            port=port
        )
        self.source = source

    def emit(self, record):
        """Queue the formatted record as a batched HEC event."""
        try:
            log_entry = self.format(record)
            # batchEvent() buffers client-side; events are sent when the
            # batch fills or when flushBatch() runs (see close()).
            self.hec.batchEvent({
                'source': self.source,
                'sourcetype': '_json',
                'event': log_entry
            })
        except Exception:
            self.handleError(record)

    def close(self):
        """Flush buffered events before the handler shuts down.

        The original only queued events with batchEvent() and never
        flushed, so records still buffered at shutdown were silently
        dropped. NOTE(review): assumes the HEC client exposes
        flushBatch() as in splunk_http_event_collector — confirm.
        """
        try:
            self.hec.flushBatch()
        except Exception:
            pass  # best-effort: never raise during logging shutdown
        finally:
            super().close()
def setup_splunk_logging(host, port, token):
    """Return the 'splunk_logger' wired to a JSON-formatted SplunkHandler.

    Args:
        host: Splunk host name.
        port: HEC port (commonly 8088).
        token: HEC authentication token.
    """
    splunk_logger = logging.getLogger('splunk_logger')
    splunk_logger.setLevel(logging.DEBUG)
    handler = SplunkHandler(host, port, token)
    handler.setFormatter(jsonlogger.JsonFormatter())
    splunk_logger.addHandler(handler)
    return splunk_logger
# Usage
logger = setup_splunk_logging(
    host='splunk.example.com',
    port=8088,              # default Splunk HEC port
    token='your-hec-token'
)
logger.info("Application event")
Splunk Queries
from splunklib import client, results
class SplunkQueries:
    """Common Splunk search-job queries.

    Search-job creation, polling, and result reading are factored into
    ``_run_search``; the original duplicated that plumbing in all three
    query methods and polled with a CPU-pegging busy-wait.
    """

    def __init__(self, host, port, username, password):
        self.service = client.connect(
            host=host,
            port=port,
            username=username,
            password=password
        )

    def _run_search(self, query):
        """Create a search job, wait for completion, return a results reader.

        The original polled with ``while not job.is_done(): pass`` — a
        busy-loop that pins a core for the job's whole runtime; sleeping
        between polls fixes that without changing the result.
        """
        import time  # local import keeps this snippet self-contained
        job = self.service.jobs.create(query)
        while not job.is_done():
            time.sleep(0.2)
        return results.ResultsReader(job.results())

    def search_errors(self, hours=1):
        """Search ERROR-level events from the last *hours* hours."""
        return self._run_search(f'source=python_app level=ERROR earliest=-{hours}h')

    def get_top_errors(self, limit=10):
        """Return the *limit* most frequent error messages."""
        return self._run_search(
            f'source=python_app level=ERROR | stats count by message | sort - count | head {limit}'
        )

    def get_performance_metrics(self):
        """Return avg/max/p95 response time per endpoint."""
        return self._run_search(
            'source=python_app | stats avg(response_time_ms) as avg_time, '
            'max(response_time_ms) as max_time, pct95(response_time_ms) as p95_time by endpoint'
        )
# Usage
# splunk = SplunkQueries('localhost', 8089, 'admin', 'password')
# errors = splunk.search_errors(hours=1)
Advanced Logging Patterns
Context-Aware Logging
import contextvars
import logging
# Context variables propagated into every log record
request_id = contextvars.ContextVar('request_id', default=None)
user_id = contextvars.ContextVar('user_id', default=None)


class ContextFilter(logging.Filter):
    """Stamp the current request/user context onto each log record."""

    def filter(self, record):
        # Copy every context variable onto the record, then accept it.
        for attr, var in (('request_id', request_id), ('user_id', user_id)):
            setattr(record, attr, var.get())
        return True
def setup_context_logging():
    """Build the 'context_logger' whose format embeds request/user context.

    Returns:
        The configured ``logging.Logger``; its handler carries a
        ContextFilter so ``%(request_id)s`` / ``%(user_id)s`` resolve.

    Idempotent: the original attached a new handler on every call,
    duplicating output; an already-configured logger is returned as-is.
    """
    logger = logging.getLogger('context_logger')
    logger.setLevel(logging.DEBUG)
    if logger.handlers:
        return logger
    handler = logging.StreamHandler()
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - '
        '[%(request_id)s] [%(user_id)s] - %(message)s'
    )
    handler.setFormatter(formatter)
    # Filter injects request_id / user_id attributes the format string needs.
    handler.addFilter(ContextFilter())
    logger.addHandler(handler)
    return logger
# Usage
logger = setup_context_logging()
# Set context
request_id.set('req-123')
user_id.set('user-456')
# The ContextFilter injects the two values above into this record.
logger.info("Processing request")
Performance Logging
import time
import logging
from functools import wraps
logger = logging.getLogger(__name__)


def log_performance(func):
    """Decorator that logs *func*'s wall-clock duration and outcome.

    Emits one INFO record per call with ``function``, ``duration_ms``
    and ``status`` extra fields. Fixes in this revision:
    - ``status`` was hard-coded to ``'success'`` even when the wrapped
      call raised (the log ran in ``finally`` with a constant value);
      it now reports ``'error'`` and re-raises the original exception.
    - uses ``time.perf_counter()`` (monotonic) instead of ``time.time()``.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        status = 'success'
        try:
            return func(*args, **kwargs)
        except Exception:
            status = 'error'
            raise
        finally:
            duration_ms = (time.perf_counter() - start) * 1000
            logger.info(
                "Function executed",
                extra={
                    'function': func.__name__,
                    'duration_ms': duration_ms,
                    'status': status
                }
            )
    return wrapper
@log_performance
def slow_operation():
    """Simulated slow task used to demonstrate the performance decorator."""
    time.sleep(0.5)
    return "Done"


# Runs once; the decorator logs the ~500 ms duration.
slow_operation()
Audit Logging
import logging
from datetime import datetime
class AuditLogger:
    """Structured audit-trail logger for user, data, and security events.

    Each method emits one record on the 'audit' logger with the event
    details attached as ``extra`` fields and a UTC ISO timestamp.
    """

    def __init__(self):
        self.logger = logging.getLogger('audit')
        self.logger.setLevel(logging.INFO)

    def log_user_action(self, user_id, action, resource, details=None):
        """Record that *user_id* performed *action* on *resource*."""
        fields = {
            'user_id': user_id,
            'action': action,
            'resource': resource,
            'timestamp': datetime.utcnow().isoformat(),
            'details': details or {},
        }
        self.logger.info(f"User action: {action}", extra=fields)

    def log_data_access(self, user_id, data_type, access_type):
        """Record that *user_id* accessed *data_type* via *access_type*."""
        fields = {
            'user_id': user_id,
            'data_type': data_type,
            'access_type': access_type,
            'timestamp': datetime.utcnow().isoformat(),
        }
        self.logger.info(f"Data access: {access_type}", extra=fields)

    def log_security_event(self, event_type, severity, details):
        """Record a security-relevant event at WARNING level."""
        fields = {
            'event_type': event_type,
            'severity': severity,
            'details': details,
            'timestamp': datetime.utcnow().isoformat(),
        }
        self.logger.warning(f"Security event: {event_type}", extra=fields)
# Usage: one INFO record for the login, one WARNING for the security event.
audit = AuditLogger()
audit.log_user_action(user_id=123, action='login', resource='auth')
audit.log_security_event(
    event_type='failed_login_attempt',
    severity='medium',
    details={'ip': '192.168.1.1', 'attempts': 3}
)
Common Pitfalls and Best Practices
❌ Bad: Logging Sensitive Data
# DON'T: Log passwords or tokens
logger.info(f"User login with password: {password}")
✅ Good: Sanitize Sensitive Data
# DO: Sanitize sensitive information
def sanitize_log_data(data):
    """Return a copy of *data* with sensitive fields masked for logging.

    Args:
        data: Mapping of log fields; 'password' and 'token' values are
            replaced with '***' when present.

    Returns:
        A new dict — the caller's mapping is no longer mutated in place
        (the original clobbered the real password/token in the source dict,
        which a sanitizer for logging should never do).
    """
    sanitized = dict(data)
    for key in ('password', 'token'):
        if key in sanitized:
            sanitized[key] = '***'
    return sanitized
logger.info("User login", extra=sanitize_log_data(user_data))
❌ Bad: Unstructured Logs
# DON'T: Use unstructured log messages
logger.info(f"User {user_id} performed action {action} on {resource}")
✅ Good: Structured Logs
# DO: Use structured logging
logger.info("user_action", extra={
'user_id': user_id,
'action': action,
'resource': resource
})
❌ Bad: No Log Levels
# DON'T: Log everything at INFO level
logger.info("Debug info")
logger.info("Warning condition")
logger.info("Error occurred")
✅ Good: Use Appropriate Log Levels
# DO: Use appropriate log levels
logger.debug("Debug info")
logger.warning("Warning condition")
logger.error("Error occurred")
Summary
Effective centralized logging requires:
- Structured logging with JSON format
- ELK Stack for log aggregation and analysis
- Splunk as alternative for enterprise logging
- Context-aware logging for request tracking
- Performance logging for optimization
- Audit logging for compliance
- Sensitive data sanitization for security
- Appropriate log levels for filtering
These patterns ensure comprehensive, searchable, and secure logging in production systems.
Comments