# Log Aggregation: ELK Stack, Loki, and Structured Logging

**TL;DR:** This guide covers implementing log aggregation with the ELK Stack and Loki: collecting and parsing logs, emitting structured logs, and building a searchable log system.
## Introduction

Centralized logging enables:

- **Troubleshooting** - find errors across services
- **Auditing** - track system access and changes
- **Security** - detect suspicious patterns
- **Compliance** - meet regulatory requirements
## ELK Stack Architecture

### Components
| Component | Purpose |
|---|---|
| Elasticsearch | Search and analytics engine |
| Logstash | Log processing pipeline |
| Kibana | Visualization UI |
| Beats | Lightweight log shippers |
### Installation

```yaml
# Docker Compose for ELK Stack
version: '3'
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false  # local experimentation only; enable security in production
    ports:
      - "9200:9200"
  kibana:
    image: docker.elastic.co/kibana/kibana:8.11.0
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    ports:
      - "5601:5601"
    depends_on:
      - elasticsearch
  logstash:
    image: docker.elastic.co/logstash/logstash:8.11.0
    ports:
      - "5044:5044"
    depends_on:
      - elasticsearch
```
## Structured Logging

### JSON Logging in Go
```go
package main

import (
	"errors"
	"os"

	"github.com/rs/zerolog"
)

// LogEntry illustrates the JSON shape each log line follows.
type LogEntry struct {
	Timestamp string                 `json:"timestamp"`
	Level     string                 `json:"level"`
	Message   string                 `json:"message"`
	Service   string                 `json:"service"`
	Fields    map[string]interface{} `json:"fields,omitempty"`
}

func main() {
	logger := zerolog.New(os.Stdout).
		With().
		Str("service", "my-app").
		Timestamp().
		Logger()

	// Structured logging
	logger.Info().
		Str("user_id", "123").
		Str("action", "login").
		Msg("User logged in")

	// Error logging
	err := errors.New("upstream timeout")
	logger.Error().
		Err(err).
		Str("request_id", "abc123").
		Msg("Request failed")
}
```
### JSON Logging in Python

```python
import json
import logging
from datetime import datetime, timezone

class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_data = {
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'level': record.levelname,
            'message': record.getMessage(),
            'service': 'my-app',
            'fields': {
                'user_id': getattr(record, 'user_id', None),
                'request_id': getattr(record, 'request_id', None),
            }
        }
        if record.exc_info:
            log_data['exception'] = self.formatException(record.exc_info)
        return json.dumps(log_data)

# Usage
logger = logging.getLogger('my-app')
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)
```
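With a formatter like this installed, per-request context can be attached through logging's standard `extra` mechanism, which sets attributes on the `LogRecord` that the formatter then picks up with `getattr()`. A self-contained sketch (using a trimmed-down formatter and an in-memory stream so the output can be inspected):

```python
import io
import json
import logging

# Trimmed-down stand-in for the JSONFormatter shown above
class JSONFormatter(logging.Formatter):
    def format(self, record):
        return json.dumps({
            'level': record.levelname,
            'message': record.getMessage(),
            'fields': {'user_id': getattr(record, 'user_id', None)},
        })

buf = io.StringIO()
logger = logging.getLogger('my-app-demo')
logger.setLevel(logging.INFO)
handler = logging.StreamHandler(buf)
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)

# Keys passed via `extra` become LogRecord attributes,
# which the formatter reads with getattr()
logger.info("User logged in", extra={'user_id': '123'})

line = json.loads(buf.getvalue())
print(line['fields']['user_id'])  # → 123
```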
## Filebeat Configuration

```yaml
# filebeat.yml
filebeat.inputs:
  - type: filestream          # replaces the deprecated `log` input in Filebeat 8.x
    paths:
      - /var/log/*.log
    fields:
      service: my-application
    fields_under_root: true
  - type: container           # replaces the deprecated `docker` input
    paths:
      - /var/lib/docker/containers/*/*.log

processors:
  - add_docker_metadata:
      host: "unix:///var/run/docker.sock"

output.logstash:
  hosts: ["localhost:5044"]

logging.json: true
```
## Logstash Pipeline

```conf
# pipeline.conf
input {
  beats {
    port => 5044
  }
}

filter {
  # Parse JSON fields
  json {
    source => "message"
    target => "parsed"
  }

  # Add timestamp
  date {
    match => ["timestamp", "ISO8601"]
    target => "@timestamp"
  }

  # Parse application logs
  if [service] == "my-app" {
    grok {
      match => {
        "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{DATA:logger} - %{GREEDYDATA:message}"
      }
    }
  }

  # Add geoip for IP addresses
  geoip {
    source => "client_ip"
  }

  # User agent parsing
  useragent {
    source => "user_agent"
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "my-app-%{+YYYY.MM.dd}"
  }
}
```
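The grok pattern in the pipeline can be prototyped outside Logstash before deploying. A rough Python equivalent, where the character classes only approximate grok's `TIMESTAMP_ISO8601` and `LOGLEVEL` patterns and the sample log line is invented for illustration:

```python
import re

# Approximate equivalent of:
# %{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{DATA:logger} - %{GREEDYDATA:message}
LINE_RE = re.compile(
    r'(?P<timestamp>\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}(?:\.\d+)?(?:Z|[+-]\d{2}:?\d{2})?) '
    r'(?P<level>TRACE|DEBUG|INFO|WARN|WARNING|ERROR|FATAL) '
    r'(?P<logger>.*?) - '      # DATA is non-greedy, stops at " - "
    r'(?P<message>.*)'         # GREEDYDATA takes the rest
)

sample = "2024-01-15T10:32:01.123Z ERROR auth.service - token expired for user 123"
m = LINE_RE.match(sample)
print(m.group('level'))    # → ERROR
print(m.group('logger'))   # → auth.service
```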
## Loki Setup

### Docker Compose

```yaml
# loki.yml
version: '3'
services:
  loki:
    image: grafana/loki:2.9.0
    ports:
      - "3100:3100"
    command: -config.file=/etc/loki/local-config.yaml
  promtail:
    image: grafana/promtail:2.9.0
    volumes:
      - /var/log:/var/log
      - ./promtail.yml:/etc/promtail/config.yml
    command: -config.file=/etc/promtail/config.yml
  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
```
### Promtail Configuration

```yaml
# promtail.yml
server:
  http_listen_port: 9080
  grpc_listen_port: 0

positions:
  filename: /tmp/positions.yaml

clients:
  - url: http://loki:3100/loki/api/v1/push

scrape_configs:
  - job_name: system
    static_configs:
      - targets:
          - localhost
        labels:
          job: system-logs
          __path__: /var/log/*.log
  - job_name: application
    static_configs:
      - targets:
          - localhost
        labels:
          job: application-logs
          __path__: /var/log/myapp/*.json
```
## Log Queries

### Kibana Queries

```
# Find all errors
level:error

# Find errors in specific service
level:error AND service:my-app

# Find errors in time range
@timestamp:[now-1h TO now] AND level:error

# Find specific user
user_id:12345

# Complex query
service:api AND status:500 AND NOT user_id:*
```
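The same Lucene-syntax queries can be sent straight to Elasticsearch through its URI search API (`GET /<index>/_search?q=...`). A minimal sketch of building such a request, assuming the local cluster and daily index naming from the Logstash output above:

```python
from urllib.parse import quote

ES_URL = "http://localhost:9200"
INDEX = "my-app-*"   # matches the daily indices written by Logstash

def search_url(lucene_query, size=20):
    """Build a URI-search request for a Lucene query string."""
    return f"{ES_URL}/{INDEX}/_search?q={quote(lucene_query)}&size={size}"

url = search_url('level:error AND service:my-app')
print(url)
```

Fetching `url` with any HTTP client returns the matching documents as JSON; the query string must be percent-encoded, which `quote()` handles here.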
### Loki LogQL

```logql
# Find all error logs
{job="application"} |= "error"

# Parse JSON log
{job="application"} | json | level="error"

# Extract and filter
{job="application"} | json | duration_ms > 1000

# Aggregate
sum(rate({job="application"}[5m])) by (level)
```
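These queries can also be issued programmatically against Loki's HTTP API (`/loki/api/v1/query_range`, which takes nanosecond Unix timestamps). A minimal sketch of building such a request, assuming Loki on its default port from the compose file above:

```python
from urllib.parse import urlencode

# Loki base URL from the docker-compose setup above
LOKI_URL = "http://localhost:3100"

def build_query_range_url(logql, start_ns, end_ns, limit=100):
    """Build a Loki /query_range request URL for a LogQL expression."""
    params = urlencode({
        'query': logql,
        'start': start_ns,   # nanosecond Unix timestamps
        'end': end_ns,
        'limit': limit,
    })
    return f"{LOKI_URL}/loki/api/v1/query_range?{params}"

url = build_query_range_url('{job="application"} | json | level="error"',
                            1700000000000000000, 1700003600000000000)
print(url)
```

Fetching the resulting URL returns matching log streams as JSON, which is how Grafana itself talks to Loki.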
## Log-Based Metrics

### Elasticsearch Aggregation

```json
{
  "size": 0,
  "query": {
    "range": {
      "@timestamp": {
        "gte": "now-1h"
      }
    }
  },
  "aggs": {
    "errors_by_service": {
      "terms": {
        "field": "service.keyword",
        "size": 10
      }
    },
    "error_rate": {
      "filter": {
        "term": {
          "level": "error"
        }
      }
    }
  }
}
```
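Turning the aggregation response into a metric is a small post-processing step: the `error_rate` filter aggregation counts error documents, and `hits.total.value` gives the denominator. A sketch with a hand-written `response` dict mimicking the shape Elasticsearch returns for that query (the counts are invented):

```python
# Mimics the response shape for the aggregation query above
response = {
    'hits': {'total': {'value': 2000}},
    'aggregations': {
        'errors_by_service': {'buckets': [
            {'key': 'api', 'doc_count': 1200},
            {'key': 'worker', 'doc_count': 800},
        ]},
        'error_rate': {'doc_count': 50},
    },
}

total = response['hits']['total']['value']
errors = response['aggregations']['error_rate']['doc_count']
error_pct = 100 * errors / total if total else 0.0
print(f"{error_pct:.1f}% errors")  # → 2.5% errors
```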
### Loki Metrics

```logql
# Error rate by service
sum(rate({job="application"} | json | level="error" [5m])) by (service)

# Request rate by status (the json parser extracts the status label)
sum(rate({job="application"} | json [5m])) by (status)

# Average response size (unwrap turns a JSON field into sample values)
avg_over_time({job="application"} | json | unwrap response_size [5m]) by (endpoint)
```
## Conclusion

Log aggregation provides:

- **Centralized search** - find logs across all services
- **Structured data** - parse and query efficiently
- **Metrics derivation** - create metrics from logs
- **Compliance** - meet audit requirements
- **Troubleshooting** - debug issues quickly