Skip to main content
โšก Calmops

Logging: ELK Stack and Splunk for Python Applications

Logging: ELK Stack and Splunk for Python Applications

Centralized logging is essential for debugging, monitoring, and auditing production systems. The ELK Stack (Elasticsearch, Logstash, Kibana) and Splunk provide powerful platforms for log aggregation and analysis. This guide covers practical patterns for implementing centralized logging.

Structured Logging

JSON Logging

import logging
import json
from datetime import datetime

class JSONFormatter(logging.Formatter):
    """Format logs as JSON for easy parsing."""
    
    def format(self, record):
        log_data = {
            'timestamp': datetime.utcnow().isoformat(),
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage(),
            'module': record.module,
            'function': record.funcName,
            'line': record.lineno
        }
        
        # Add exception info if present
        if record.exc_info:
            log_data['exception'] = self.formatException(record.exc_info)
        
        # Add extra fields
        if hasattr(record, 'extra_fields'):
            log_data.update(record.extra_fields)
        
        return json.dumps(log_data)

def setup_json_logging(name, log_file=None):
    """Setup JSON logging."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)
    
    # Console handler
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(JSONFormatter())
    logger.addHandler(console_handler)
    
    # File handler
    if log_file:
        file_handler = logging.FileHandler(log_file)
        file_handler.setFormatter(JSONFormatter())
        logger.addHandler(file_handler)
    
    return logger

# Usage
logger = setup_json_logging('myapp', 'app.log')
logger.info("Application started")

# With extra fields
record = logging.LogRecord(
    name='myapp',
    level=logging.INFO,
    pathname='',
    lineno=0,
    msg='User login',
    args=(),
    exc_info=None
)
record.extra_fields = {'user_id': 123, 'ip': '192.168.1.1'}
logger.handle(record)

Structured Logging with structlog

import structlog

# Configure structlog
structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.JSONRenderer()
    ],
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
    cache_logger_on_first_use=True,
)

logger = structlog.get_logger()

# Usage
logger.info("user_login", user_id=123, ip="192.168.1.1")
logger.error("payment_failed", user_id=456, amount=99.99, reason="insufficient_funds")

ELK Stack Integration

Sending Logs to Elasticsearch

import logging
from pythonjsonlogger import jsonlogger
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

class ElasticsearchHandler(logging.Handler):
    """Send logs to Elasticsearch."""
    
    def __init__(self, hosts, index_name='logs'):
        super().__init__()
        self.es = Elasticsearch(hosts)
        self.index_name = index_name
    
    def emit(self, record):
        try:
            log_entry = self.format(record)
            
            # Send to Elasticsearch
            self.es.index(
                index=f"{self.index_name}-{record.created}",
                body=log_entry
            )
        except Exception:
            self.handleError(record)

def setup_elasticsearch_logging(hosts=['localhost:9200']):
    """Setup logging to Elasticsearch."""
    logger = logging.getLogger('elasticsearch_logger')
    logger.setLevel(logging.DEBUG)
    
    # JSON formatter
    formatter = jsonlogger.JsonFormatter()
    
    # Elasticsearch handler
    es_handler = ElasticsearchHandler(hosts)
    es_handler.setFormatter(formatter)
    logger.addHandler(es_handler)
    
    return logger

# Usage
logger = setup_elasticsearch_logging()
logger.info("Application event", extra={'user_id': 123})

Logstash Configuration

# logstash.conf
input {
  file {
    path => "/var/log/myapp/*.log"
    start_position => "beginning"
    codec => json
  }
}

filter {
  if [type] == "python_app" {
    mutate {
      add_field => { "[@metadata][index_name]" => "python-app-%{+YYYY.MM.dd}" }
    }
    
    # Parse timestamps
    date {
      match => [ "timestamp", "ISO8601" ]
      target => "@timestamp"
    }
    
    # Extract fields
    if [message] =~ /error/ {
      mutate {
        add_field => { "severity" => "error" }
      }
    }
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "%{[@metadata][index_name]}"
  }
}

Kibana Queries

from elasticsearch import Elasticsearch

class KibanaQueries:
    """Common Kibana queries."""
    
    def __init__(self, es_host='localhost:9200'):
        self.es = Elasticsearch([es_host])
    
    def get_error_logs(self, hours=1):
        """Get error logs from last N hours."""
        query = {
            "query": {
                "bool": {
                    "must": [
                        {"match": {"level": "ERROR"}},
                        {"range": {"@timestamp": {"gte": f"now-{hours}h"}}}
                    ]
                }
            },
            "size": 100
        }
        
        results = self.es.search(index="logs-*", body=query)
        return results['hits']['hits']
    
    def get_logs_by_user(self, user_id, hours=24):
        """Get logs for specific user."""
        query = {
            "query": {
                "bool": {
                    "must": [
                        {"match": {"user_id": user_id}},
                        {"range": {"@timestamp": {"gte": f"now-{hours}h"}}}
                    ]
                }
            }
        }
        
        results = self.es.search(index="logs-*", body=query)
        return results['hits']['hits']
    
    def get_performance_stats(self, hours=1):
        """Get performance statistics."""
        query = {
            "aggs": {
                "response_times": {
                    "stats": {"field": "response_time_ms"}
                },
                "errors_per_endpoint": {
                    "terms": {
                        "field": "endpoint",
                        "size": 10
                    }
                }
            },
            "query": {
                "range": {"@timestamp": {"gte": f"now-{hours}h"}}
            }
        }
        
        results = self.es.search(index="logs-*", body=query)
        return results['aggregations']

# Usage
kibana = KibanaQueries()
errors = kibana.get_error_logs(hours=1)
print(f"Found {len(errors)} errors")

Splunk Integration

Sending Logs to Splunk

import logging
from splunk_http_event_collector import http_event_collector

class SplunkHandler(logging.Handler):
    """Send logs to Splunk."""
    
    def __init__(self, host, port, token, source='python_app'):
        super().__init__()
        self.hec = http_event_collector(
            token=token,
            host=host,
            port=port
        )
        self.source = source
    
    def emit(self, record):
        try:
            log_entry = self.format(record)
            
            self.hec.batchEvent({
                'source': self.source,
                'sourcetype': '_json',
                'event': log_entry
            })
        except Exception:
            self.handleError(record)

def setup_splunk_logging(host, port, token):
    """Setup logging to Splunk."""
    logger = logging.getLogger('splunk_logger')
    logger.setLevel(logging.DEBUG)
    
    formatter = jsonlogger.JsonFormatter()
    
    splunk_handler = SplunkHandler(host, port, token)
    splunk_handler.setFormatter(formatter)
    logger.addHandler(splunk_handler)
    
    return logger

# Usage
logger = setup_splunk_logging(
    host='splunk.example.com',
    port=8088,
    token='your-hec-token'
)
logger.info("Application event")

Splunk Queries

from splunk_sdk import client, results

class SplunkQueries:
    """Common Splunk queries."""
    
    def __init__(self, host, port, username, password):
        self.service = client.connect(
            host=host,
            port=port,
            username=username,
            password=password
        )
    
    def search_errors(self, hours=1):
        """Search for errors."""
        query = f'source=python_app level=ERROR earliest=-{hours}h'
        
        job = self.service.jobs.create(query)
        
        while not job.is_done():
            pass
        
        return results.ResultsReader(job.results())
    
    def get_top_errors(self, limit=10):
        """Get top errors."""
        query = f'source=python_app level=ERROR | stats count by message | sort - count | head {limit}'
        
        job = self.service.jobs.create(query)
        
        while not job.is_done():
            pass
        
        return results.ResultsReader(job.results())
    
    def get_performance_metrics(self):
        """Get performance metrics."""
        query = 'source=python_app | stats avg(response_time_ms) as avg_time, max(response_time_ms) as max_time, pct95(response_time_ms) as p95_time by endpoint'
        
        job = self.service.jobs.create(query)
        
        while not job.is_done():
            pass
        
        return results.ResultsReader(job.results())

# Usage
# splunk = SplunkQueries('localhost', 8089, 'admin', 'password')
# errors = splunk.search_errors(hours=1)

Advanced Logging Patterns

Context-Aware Logging

import contextvars
import logging

# Context variables
request_id = contextvars.ContextVar('request_id', default=None)
user_id = contextvars.ContextVar('user_id', default=None)

class ContextFilter(logging.Filter):
    """Add context to log records."""
    
    def filter(self, record):
        record.request_id = request_id.get()
        record.user_id = user_id.get()
        return True

def setup_context_logging():
    """Setup logging with context."""
    logger = logging.getLogger('context_logger')
    logger.setLevel(logging.DEBUG)
    
    handler = logging.StreamHandler()
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - '
        '[%(request_id)s] [%(user_id)s] - %(message)s'
    )
    handler.setFormatter(formatter)
    
    context_filter = ContextFilter()
    handler.addFilter(context_filter)
    
    logger.addHandler(handler)
    return logger

# Usage
logger = setup_context_logging()

# Set context
request_id.set('req-123')
user_id.set('user-456')

logger.info("Processing request")

Performance Logging

import time
import logging
from functools import wraps

logger = logging.getLogger(__name__)

def log_performance(func):
    """Decorator to log function performance."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        
        try:
            result = func(*args, **kwargs)
            return result
        finally:
            duration = time.time() - start
            
            logger.info(
                f"Function executed",
                extra={
                    'function': func.__name__,
                    'duration_ms': duration * 1000,
                    'status': 'success'
                }
            )
    
    return wrapper

@log_performance
def slow_operation():
    time.sleep(0.5)
    return "Done"

slow_operation()

Audit Logging

import logging
from datetime import datetime

class AuditLogger:
    """Log audit events."""
    
    def __init__(self):
        self.logger = logging.getLogger('audit')
        self.logger.setLevel(logging.INFO)
    
    def log_user_action(self, user_id, action, resource, details=None):
        """Log user action."""
        self.logger.info(
            f"User action: {action}",
            extra={
                'user_id': user_id,
                'action': action,
                'resource': resource,
                'timestamp': datetime.utcnow().isoformat(),
                'details': details or {}
            }
        )
    
    def log_data_access(self, user_id, data_type, access_type):
        """Log data access."""
        self.logger.info(
            f"Data access: {access_type}",
            extra={
                'user_id': user_id,
                'data_type': data_type,
                'access_type': access_type,
                'timestamp': datetime.utcnow().isoformat()
            }
        )
    
    def log_security_event(self, event_type, severity, details):
        """Log security event."""
        self.logger.warning(
            f"Security event: {event_type}",
            extra={
                'event_type': event_type,
                'severity': severity,
                'details': details,
                'timestamp': datetime.utcnow().isoformat()
            }
        )

# Usage
audit = AuditLogger()
audit.log_user_action(user_id=123, action='login', resource='auth')
audit.log_security_event(
    event_type='failed_login_attempt',
    severity='medium',
    details={'ip': '192.168.1.1', 'attempts': 3}
)

Common Pitfalls and Best Practices

โŒ Bad: Logging Sensitive Data

# DON'T: Log passwords or tokens
logger.info(f"User login with password: {password}")

โœ… Good: Sanitize Sensitive Data

# DO: Sanitize sensitive information
def sanitize_log_data(data):
    if 'password' in data:
        data['password'] = '***'
    if 'token' in data:
        data['token'] = '***'
    return data

logger.info("User login", extra=sanitize_log_data(user_data))

โŒ Bad: Unstructured Logs

# DON'T: Use unstructured log messages
logger.info(f"User {user_id} performed action {action} on {resource}")

โœ… Good: Structured Logs

# DO: Use structured logging
logger.info("user_action", extra={
    'user_id': user_id,
    'action': action,
    'resource': resource
})

โŒ Bad: No Log Levels

# DON'T: Log everything at INFO level
logger.info("Debug info")
logger.info("Warning condition")
logger.info("Error occurred")

โœ… Good: Use Appropriate Log Levels

# DO: Use appropriate log levels
logger.debug("Debug info")
logger.warning("Warning condition")
logger.error("Error occurred")

Summary

Effective centralized logging requires:

  1. Structured logging with JSON format
  2. ELK Stack for log aggregation and analysis
  3. Splunk as alternative for enterprise logging
  4. Context-aware logging for request tracking
  5. Performance logging for optimization
  6. Audit logging for compliance
  7. Sensitive data sanitization for security
  8. Appropriate log levels for filtering

These patterns ensure comprehensive, searchable, and secure logging in production systems.

Comments