
Log Aggregation: ELK Stack, Loki, Splunk

Created: February 18, 2026 · 4 min read

Introduction

Centralized logging is foundational for observability. From debugging production issues to security analytics, having a robust log aggregation system is essential.

Key Statistics:

  • Roughly 70% of debugging time is spent reading logs
  • The ELK stack powers around 60% of centralized logging deployments
  • Loki can cut log storage costs by roughly 80% compared to full-text-indexed stores
  • Around 85% of security incidents are detected through log analysis

Architecture

┌────────────────────────────────────────────────────────────────┐
│                   Log Aggregation Pipeline                    │
├────────────────────────────────────────────────────────────────┤
│                                                                │
│  ┌─────────┐    ┌─────────┐    ┌─────────┐    ┌─────────┐      │
│  │ Sources │───▶│ Collect │───▶│ Process │───▶│  Store  │      │
│  │ (App,   │    │ (File,  │    │ (Parse, │    │ (ES,    │      │
│  │  K8s)   │    │  API)   │    │  Filter)│    │  Loki)  │      │
│  └─────────┘    └─────────┘    └─────────┘    └────┬────┘      │
│                                                    │           │
│                                                    ▼           │
│                                           ┌─────────────┐      │
│                                           │  Visualize  │      │
│                                           │ (Kibana,    │      │
│                                           │  Grafana)   │      │
│                                           └─────────────┘      │
│                                                                │
└────────────────────────────────────────────────────────────────┘
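
The left-hand side of the pipeline starts in the application. Emitting structured JSON logs makes the collect and process stages much simpler: Filebeat or Promtail can ship each line as-is, and Logstash can parse it without grok. A minimal sketch using only the Python standard library (the field names are illustrative):

import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each log record as a single JSON line."""

    def format(self, record):
        return json.dumps({
            "timestamp": self.formatTime(record, "%Y-%m-%dT%H:%M:%S"),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        })


handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logging.basicConfig(level=logging.INFO, handlers=[handler])

logging.getLogger("checkout").info("order created")
# prints one JSON object per log line, ready for any shipper to forward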

ELK Stack

Elasticsearch

# elasticsearch.yml: cluster settings
cluster.name: production-logs
node.name: log-es-01
network.host: 0.0.0.0
discovery.seed_hosts: ["log-es-01", "log-es-02", "log-es-03"]

# Memory and search limits
indices.memory.index_buffer_size: 20%
search.max_buckets: 10000

# Security
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
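
Beyond elasticsearch.yml, index settings are usually managed over the REST API. The sketch below creates an index template so that daily logs-* indices pick up consistent mappings; the node address, credentials, and shard counts are assumptions, not part of the config above:

import requests

ES_URL = "http://log-es-01:9200"          # assumed node address
AUTH = ("elastic", "<password>")          # security is enabled above, so authenticate

template = {
    "index_patterns": ["logs-*"],
    "template": {
        "settings": {"number_of_shards": 1, "number_of_replicas": 1},
        "mappings": {
            "properties": {
                "@timestamp": {"type": "date"},
                "level": {"type": "keyword"},
                "message": {"type": "text"},
            }
        },
    },
}

resp = requests.put(f"{ES_URL}/_index_template/logs", json=template, auth=AUTH)
resp.raise_for_status()                   # expect {"acknowledged": true}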

Logstash Configuration

# logstash pipeline
input {
  beats {
    port => 5044
  }
  
  kafka {
    bootstrap_servers => "kafka:9092"
    topics => ["app-logs", "access-logs"]
    codec => json
  }
}

filter {
  # Parse JSON logs
  if [message] =~ /^{.*}$/ {
    json {
      source => "message"
    }
  }
  
  # Add timestamp
  date {
    match => ["timestamp", "ISO8601", "MMM dd HH:mm:ss"]
    target => "@timestamp"
  }
  
  # Parse nginx access logs
  if [type] == "nginx" {
    grok {
      match => { 
        "message" => '%{IPORHOST:client_ip} - %{DATA:user} \[%{HTTPDATE:timestamp}\] "%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:http_version}" %{NUMBER:status:int} %{NUMBER:bytes:int} "%{DATA:referrer}" "%{DATA:agent}"'
      }
    }
  }
  
  # Add GeoIP data when a client IP is present
  if [client_ip] {
    geoip {
      source => "client_ip"
      target => "geoip"
    }
  }
  
  # Redact sensitive fields before indexing
  mutate {
    gsub => [
      "password", ".+", "[REDACTED]",
      "credit_card", "[0-9]", "*"
    ]
  }
}

output {
  elasticsearch {
    hosts => ["es-node-1:9200", "es-node-2:9200"]
    index => "logs-%{[@metadata][beat]}-%{+YYYY.MM.dd}"
  }
}

Kibana Dashboard

{
  "attributes": {
    "title": "Application Logs Dashboard",
    "description": "Centralized logging dashboard",
    "panels": [
      {
        "id": "log-count",
        "type": "visualization",
        "visualization": {
          "title": "Logs per Minute",
          "type": "line",
          "aggs": [
            {"type": "count", "id": "1"}
          ],
          "axis_formats": {
            "left": "number"
          }
        }
      },
      {
        "id": "error-rate",
        "type": "visualization",
        "visualization": {
          "title": "Error Rate",
          "type": "line",
          "aggs": [
            {"type": "avg", "field": "status", "id": "1"}
          ]
        }
      }
    ]
  }
}
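
The data behind these panels can also be pulled programmatically. Below is a sketch against the search API, assuming the logs-* index pattern and the integer status field extracted by the grok filter above, that counts 5xx responses per minute:

import requests

ES_URL = "http://es-node-1:9200"   # matches the Logstash output hosts above

query = {
    "size": 0,
    "query": {"range": {"status": {"gte": 500}}},
    "aggs": {
        "errors_per_minute": {
            "date_histogram": {"field": "@timestamp", "fixed_interval": "1m"}
        }
    },
}

resp = requests.post(f"{ES_URL}/logs-*/_search", json=query)
resp.raise_for_status()
for bucket in resp.json()["aggregations"]["errors_per_minute"]["buckets"]:
    print(bucket["key_as_string"], bucket["doc_count"])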

Loki

Configuration

# loki-config.yaml
auth_enabled: false

server:
  http_listen_port: 3100
  grpc_listen_port: 9096

common:
  path_prefix: /loki
  storage:
    filesystem:
      chunks_directory: /loki/chunks
      rules_directory: /loki/rules
  replication_factor: 1
  ring:
    instance_addr: 127.0.0.1
    kvstore:
      store: inmemory

schema_config:
  configs:
    - from: 2026-01-01
      store: boltdb-shipper
      object_store: filesystem
      schema: v11
      index:
        prefix: index_
        period: 24h

limits_config:
  reject_old_samples: true
  reject_old_samples_max_age: 168h
  ingestion_rate_mb: 50
  ingestion_burst_size_mb: 100

# Note: log collection (scrape_configs) belongs in the Promtail config below, not in Loki itself.

Promtail

# promtail-config.yaml
server:
  http_listen_port: 9080
  grpc_listen_port: 9081

positions:
  filename: /var/lib/promtail/positions.yaml

clients:
  - url: http://loki:3100/loki/api/v1/push

scrape_configs:
  - job_name: system
    static_configs:
      - targets:
          - localhost
        labels:
          job: systemlogs
          host: localhost
          __path__: /var/log/*.log
  
  - job_name: kubernetes
    kubernetes_sd_configs:
      - role: pod
    pipeline_stages:
      # Parse CRI-formatted container logs (time, stream, flags, message)
      - regex:
          expression: '^(?P<time>.*?) (?P<stream>stdout|stderr) (?P<flags>.) (?P<message>.*)$'
      - labels:
          stream:
          flags:
    relabel_configs:
      - source_labels: [__meta_kubernetes_pod_name]
        action: replace
        target_label: pod
      - source_labels: [__meta_kubernetes_namespace]
        action: replace
        target_label: namespace
      # Tell Promtail where to find the pod log files on the node
      - source_labels: [__meta_kubernetes_pod_uid, __meta_kubernetes_pod_container_name]
        separator: /
        replacement: /var/log/pods/*$1/*.log
        target_label: __path__
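
With Promtail shipping logs, they can be queried with LogQL through Grafana or directly against Loki's HTTP API. A minimal sketch follows; the namespace label comes from the relabel config above, and the label value and time range are illustrative:

import time

import requests

LOKI_URL = "http://loki:3100"   # matches the Promtail client URL above

now_ns = int(time.time() * 1e9)
params = {
    "query": '{namespace="production"} |= "error"',   # LogQL: lines containing "error"
    "start": now_ns - 3600 * 10**9,                   # last hour, in nanoseconds
    "end": now_ns,
    "limit": 100,
}

resp = requests.get(f"{LOKI_URL}/loki/api/v1/query_range", params=params)
resp.raise_for_status()
for stream in resp.json()["data"]["result"]:
    for ts, line in stream["values"]:
        print(ts, line)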

Splunk

Python SDK

#!/usr/bin/env python3
"""Splunk SDK usage."""

import time

from splunklib import client, results


class SplunkLogger:
    """Send logs to and search logs in Splunk."""

    def __init__(self, host, port=8089, username=None, password=None):
        self.service = client.connect(
            host=host,
            port=port,
            username=username,
            password=password
        )

    def send_event(self, event, index='main', sourcetype='json'):
        """Submit a log event to a Splunk index."""
        # Submit directly to the target index via the SDK
        target_index = self.service.indexes[index]
        return target_index.submit(
            event,
            sourcetype=sourcetype,
            source=__file__
        )

    def search(self, query, earliest='-24h', latest='now'):
        """Run a blocking search and return the result rows."""
        # Splunk searches must start with a generating command
        if not query.strip().startswith('search'):
            query = 'search ' + query

        job = self.service.jobs.create(
            query,
            earliest_time=earliest,
            latest_time=latest
        )

        # Poll until the search job completes
        while not job.is_done():
            time.sleep(0.2)

        return [row for row in results.ResultsReader(job.results())
                if isinstance(row, dict)]
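
Usage might look like the following; the host and credentials are placeholders:

splunk = SplunkLogger("splunk.example.com", username="admin", password="changeme")
splunk.send_event('{"level": "ERROR", "message": "payment failed"}')

for row in splunk.search('index=main level=ERROR', earliest='-1h'):
    print(row)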

Best Practices

# Recommended log levels by environment
log_levels:
  development: DEBUG
  staging: INFO  
  production: INFO
  critical: WARN

# Retention policies
retention:
  hot:
    duration: 7d
    storage: SSD
  warm:
    duration: 30d
    storage: HDD
  cold:
    duration: 90d
    storage: Object storage (S3/GCS)
  archive:
    duration: 1y
    storage: Glacier

# Index patterns
index_patterns:
  - pattern: "logs-app-*"
    retention: 30d
  - pattern: "logs-access-*"
    retention: 90d
  - pattern: "logs-audit-*"
    retention: 1y
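
In Elasticsearch, the hot/warm/cold/delete tiers above translate into an ILM (index lifecycle management) policy attached to the logs-* indices. A hedged sketch over the REST API; the policy name, cluster address, and rollover thresholds are assumptions:

import requests

ES_URL = "http://log-es-01:9200"   # assumed node address

policy = {
    "policy": {
        "phases": {
            "hot": {
                "min_age": "0ms",
                "actions": {"rollover": {"max_age": "1d", "max_primary_shard_size": "50gb"}},
            },
            "warm": {
                "min_age": "7d",
                "actions": {"shrink": {"number_of_shards": 1}},
            },
            "cold": {
                "min_age": "30d",
                "actions": {"readonly": {}},
            },
            "delete": {
                "min_age": "90d",
                "actions": {"delete": {}},
            },
        }
    }
}

resp = requests.put(f"{ES_URL}/_ilm/policy/logs-app", json=policy)
resp.raise_for_status()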
