
Log Aggregation: ELK Stack, Loki, and Structured Logging


TL;DR: This guide covers implementing log aggregation using ELK Stack and Loki. Learn log collection, parsing, structured logging, and building searchable log systems.


Introduction

Centralized logging enables:

  • Troubleshooting - Find errors across services
  • Audit - Track system access and changes
  • Security - Detect suspicious patterns
  • Compliance - Meet regulatory requirements

ELK Stack Architecture

Components

Component      Purpose
Elasticsearch  Search and analytics engine
Logstash       Log processing pipeline
Kibana         Visualization UI
Beats          Lightweight log shippers

Installation

# Docker Compose for ELK Stack (single-node demo; security disabled for local use)
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false   # 8.x enables TLS/auth by default
    ports:
      - "9200:9200"

  kibana:
    image: docker.elastic.co/kibana/kibana:8.11.0
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    ports:
      - "5601:5601"
    depends_on:
      - elasticsearch

  logstash:
    image: docker.elastic.co/logstash/logstash:8.11.0
    ports:
      - "5044:5044"
    depends_on:
      - elasticsearch

Structured Logging

JSON Logging in Go

package main

import (
    "errors"
    "os"

    "github.com/rs/zerolog"
)

// LogEntry illustrates the shape of a structured log document.
type LogEntry struct {
    Timestamp string                 `json:"timestamp"`
    Level     string                 `json:"level"`
    Message   string                 `json:"message"`
    Service   string                 `json:"service"`
    Fields    map[string]interface{} `json:"fields,omitempty"`
}

func main() {
    logger := zerolog.New(os.Stdout).
        With().
        Str("service", "my-app").
        Timestamp().
        Logger()

    // Structured logging: each call emits one JSON line
    logger.Info().
        Str("user_id", "123").
        Str("action", "login").
        Msg("User logged in")

    // Error logging with an attached error value
    err := errors.New("upstream timeout")
    logger.Error().
        Err(err).
        Str("request_id", "abc123").
        Msg("Request failed")
}

JSON Logging in Python

import json
import logging
from datetime import datetime, timezone

class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_data = {
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'level': record.levelname,
            'message': record.getMessage(),
            'service': 'my-app',
            'fields': {
                'user_id': getattr(record, 'user_id', None),
                'request_id': getattr(record, 'request_id', None),
            }
        }

        if record.exc_info:
            log_data['exception'] = self.formatException(record.exc_info)

        return json.dumps(log_data)

# Usage
logger = logging.getLogger('my-app')
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)
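A quick way to verify the formatter's output is to attach it to an in-memory stream and parse the result back. The sketch below re-declares a trimmed-down formatter and uses a hypothetical `demo-app` logger so it runs standalone:

```python
import io
import json
import logging

# Trimmed-down version of the JSONFormatter above, for a self-contained demo
class JSONFormatter(logging.Formatter):
    def format(self, record):
        return json.dumps({
            'level': record.levelname,
            'message': record.getMessage(),
            'user_id': getattr(record, 'user_id', None),
        })

buffer = io.StringIO()
logger = logging.getLogger('demo-app')
logger.setLevel(logging.INFO)
handler = logging.StreamHandler(buffer)
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)

# Context fields are attached via `extra` and land in the JSON document
logger.info('User logged in', extra={'user_id': '123'})

entry = json.loads(buffer.getvalue())
print(entry['level'], entry['user_id'])  # INFO 123
```

Because each log line is a complete JSON document, downstream shippers (Filebeat, Promtail) can forward it without any parsing configuration.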

Filebeat Configuration

# filebeat.yml
filebeat.inputs:
  # "filestream" replaces the deprecated "log" input type in Filebeat 8.x
  - type: filestream
    paths:
      - /var/log/*.log
    fields:
      service: my-application
    fields_under_root: true

  # "container" replaces the deprecated "docker" input type
  - type: container
    paths:
      - /var/lib/docker/containers/*/*.log
    processors:
      - add_docker_metadata:
          host: "unix:///var/run/docker.sock"

output.logstash:
  hosts: ["localhost:5044"]

Logstash Pipeline

# pipeline.conf
input {
  beats {
    port => 5044
  }
}

filter {
  # Parse JSON log lines into the "parsed" field
  json {
    source => "message"
    target => "parsed"
  }

  # Use the application's own timestamp as the event timestamp
  # (the json filter above nested it under "parsed")
  date {
    match => ["[parsed][timestamp]", "ISO8601"]
    target => "@timestamp"
  }

  # Parse plain-text application logs
  if [service] == "my-app" {
    grok {
      match => {
        "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{DATA:logger} - %{GREEDYDATA:message}"
      }
    }
  }

  # Add GeoIP data for client IP addresses
  geoip {
    source => "client_ip"
  }

  # User agent parsing
  useragent {
    source => "user_agent"
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "my-app-%{+YYYY.MM.dd}"
  }
}
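The grok pattern above is dense; a rough Python-regex equivalent shows how a plain-text log line decomposes into fields. The sample line and the per-field regexes are illustrative approximations of the grok patterns, not exact ports:

```python
import re

# Rough Python analogue of:
#   %{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{DATA:logger} - %{GREEDYDATA:message}
GROK_LIKE = re.compile(
    r'(?P<timestamp>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\S*)\s+'  # TIMESTAMP_ISO8601
    r'(?P<level>[A-Za-z]+)\s+'                                   # LOGLEVEL
    r'(?P<logger>.*?) - '                                        # DATA up to " - "
    r'(?P<message>.*)'                                           # GREEDYDATA
)

# Hypothetical log line in the expected format
line = '2024-01-15T10:23:45Z ERROR payment.service - card declined'
m = GROK_LIKE.match(line)
print(m.groupdict())
```

Lines that fail to match would get Logstash's `_grokparsefailure` tag rather than the extracted fields, which is worth monitoring when rolling out a new pattern.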

Loki Setup

Docker Compose

# loki.yml

services:
  loki:
    image: grafana/loki:2.9.0
    ports:
      - "3100:3100"
    command: -config.file=/etc/loki/local-config.yaml
    
  promtail:
    image: grafana/promtail:2.9.0
    volumes:
      - /var/log:/var/log
      - ./promtail.yml:/etc/promtail/config.yml
    command: -config.file=/etc/promtail/config.yml
    
  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"

Promtail Configuration

# promtail.yml
server:
  http_listen_port: 9080
  grpc_listen_port: 0

positions:
  filename: /tmp/positions.yaml

clients:
  - url: http://loki:3100/loki/api/v1/push

scrape_configs:
  - job_name: system
    static_configs:
      - targets:
          - localhost
        labels:
          job: system-logs
          __path__: /var/log/*.log
          
  - job_name: application
    static_configs:
      - targets:
          - localhost
        labels:
          job: application-logs
          __path__: /var/log/myapp/*.json
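Promtail handles shipping automatically, but logs can also be sent straight to Loki's push endpoint (`/loki/api/v1/push`). A minimal sketch of the request body it expects: streams keyed by their label set, with one `[nanosecond-epoch, line]` pair per entry. The helper name is illustrative:

```python
import json
import time

def loki_push_payload(labels, lines):
    """Build the JSON body for Loki's push API.
    Timestamps are nanosecond-epoch strings, one [ts, line] pair per log line."""
    ts = str(time.time_ns())
    return {
        "streams": [{
            "stream": labels,                            # label set for this stream
            "values": [[ts, line] for line in lines],    # [timestamp, raw line] pairs
        }]
    }

payload = loki_push_payload({"job": "application"}, ['{"level":"info","msg":"up"}'])
print(json.dumps(payload))
```

POSTing this body with `Content-Type: application/json` to the Loki URL configured under `clients:` above would ingest the line into the `{job="application"}` stream.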

Log Queries

Kibana Queries

# Find all errors
level:error

# Find errors in specific service
level:error AND service:my-app

# Find errors in time range
@timestamp:[now-1h TO now] AND level:error

# Find specific user
user_id:12345

# Complex query
service:api AND status:500 AND NOT user_id:*

Loki LogQL

# Find all error logs
{job="application"} |= "error"

# Parse JSON log
{job="application"} | json | level="error"

# Extract and filter
{job="application"} | json | duration_ms > 1000

# Aggregate: log volume per level (the json stage extracts the level label)
sum(rate({job="application"} | json [5m])) by (level)
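The `| json` stage can be approximated in plain Python to make its behavior concrete: parse each line as JSON, silently drop lines that fail, and keep entries whose extracted labels match the filter. The sample lines and function name are illustrative:

```python
import json

# Hypothetical raw log lines, including one that is not valid JSON
lines = [
    '{"level":"error","msg":"boom"}',
    '{"level":"info","msg":"ok"}',
    'not json at all',
]

# Rough analogue of `{job="application"} | json | level="error"`
def logql_json_filter(lines, **wanted):
    out = []
    for line in lines:
        try:
            doc = json.loads(line)
        except json.JSONDecodeError:
            continue  # LogQL drops unparseable lines from the pipeline
        if all(doc.get(k) == v for k, v in wanted.items()):
            out.append(doc)
    return out

print(logql_json_filter(lines, level="error"))  # [{'level': 'error', 'msg': 'boom'}]
```

Unlike Elasticsearch, Loki does this parsing at query time, which keeps ingestion cheap at the cost of slower searches over large ranges.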

Log-Based Metrics

Elasticsearch Aggregation

{
  "size": 0,
  "query": {
    "range": {
      "@timestamp": {
        "gte": "now-1h"
      }
    }
  },
  "aggs": {
    "errors_by_service": {
      "terms": {
        "field": "service.keyword",
        "size": 10
      }
    },
    "error_rate": {
      "filter": {
        "term": {
          "level": "error"
        }
      }
    }
  }
}
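To make the semantics concrete, here is a small Python sketch of what the two aggregations compute, run client-side over hypothetical in-memory documents standing in for the last hour of logs:

```python
from collections import Counter

# Hypothetical documents matching the range query's time window
docs = [
    {"service": "api", "level": "error"},
    {"service": "api", "level": "info"},
    {"service": "worker", "level": "error"},
]

# "terms" aggregation: bucket document counts per service
by_service = Counter(d["service"] for d in docs)

# "filter" aggregation: count documents with level == "error"
error_count = sum(1 for d in docs if d["level"] == "error")

print(dict(by_service), error_count)  # {'api': 2, 'worker': 1} 2
```

Elasticsearch returns the same shape of result (buckets with `doc_count` values) without moving any documents to the client, which is why `"size": 0` is set in the query above.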

Loki Metrics

# Error rate by service
sum(rate({job="application"} | json | level="error" [5m])) by (service)

# Request rate by status (the json stage extracts the status label)
sum(rate({job="application"} | json [5m])) by (status)

# Average response size (unwrap extracts the numeric field for aggregation)
avg_over_time({job="application"} | json | unwrap response_size [5m]) by (endpoint)

Conclusion

Log aggregation provides:

  1. Centralized search - Find logs across all services
  2. Structured data - Parse and query efficiently
  3. Metrics derivation - Create metrics from logs
  4. Compliance - Meet audit requirements
  5. Troubleshooting - Debug issues quickly
