SIEM (Security Information and Event Management) is the backbone of security operations. This guide covers SIEM components, log management, common platforms, detection rules, and alerting.
Understanding SIEM
SIEM Components
siem_components:
  - name: "Log Collection"
    description: "Gather logs from all sources"
  - name: "Log Aggregation"
    description: "Normalize and store logs"
  - name: "Correlation"
    description: "Detect attack patterns"
  - name: "Alerting"
    description: "Notify on threats"
  - name: "Dashboard"
    description: "Visualize security state"
  - name: "Forensics"
    description: "Investigate incidents"
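To make the division of labor concrete, here is a minimal sketch of the first four components as plain Python functions. The function names, the `key=value` input format, and the threshold of three failures are illustrative assumptions, not part of any SIEM product.

```python
from collections import Counter

def collect(raw_lines):
    """Log collection: gather non-empty raw lines from a source."""
    return [line.strip() for line in raw_lines if line.strip()]

def normalize(line):
    """Log aggregation: turn 'key=value key=value' text into a dict."""
    return dict(pair.split("=", 1) for pair in line.split())

def correlate(events, threshold=3):
    """Correlation: find users with at least `threshold` failed logins."""
    failures = Counter(e["user"] for e in events if e.get("action") == "failure")
    return [user for user, count in failures.items() if count >= threshold]

def alert(users):
    """Alerting: build one notification message per suspicious user."""
    return [f"Possible brute force against user '{u}'" for u in users]

raw = [
    "user=alice action=failure",
    "user=alice action=failure",
    "",
    "user=bob action=success",
    "user=alice action=failure",
]
events = [normalize(line) for line in collect(raw)]
alerts = alert(correlate(events))
print(alerts)
```

A real deployment spreads these stages across agents, a message pipeline, and a search backend, but the data flow stays the same.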
Log Sources
critical_logs:
  network:
    - "Firewall logs"
    - "IDS/IPS alerts"
    - "VPN connections"
  system:
    - "Authentication logs"
    - "Process execution"
    - "File access"
  application:
    - "HTTP access/error logs"
    - "API requests"
    - "Database queries"
  security:
    - "WAF alerts"
    - "AV detections"
    - "DDoS alerts"
Log Management
Centralized Logging
# Python centralized logging
import logging
import socket
from datetime import datetime, timezone

import requests  # third-party: pip install requests


class SecureLogHandler(logging.Handler):
    """Send logs to a central system over HTTPS"""

    def __init__(self, endpoint, api_key):
        super().__init__()
        self.endpoint = endpoint
        self.api_key = api_key

    def emit(self, record):
        log_entry = {
            # Use the time the record was created, in UTC
            'timestamp': datetime.fromtimestamp(record.created, timezone.utc).isoformat(),
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage(),
            'host': socket.gethostname(),
            'service': 'myservice',
        }

        # Include the formatted traceback, if any
        if record.exc_info:
            log_entry['exception'] = logging.Formatter().formatException(record.exc_info)

        # Send to log aggregator; a logging failure must not crash the application
        try:
            requests.post(
                self.endpoint,
                json=log_entry,
                headers={'Authorization': f'Bearer {self.api_key}'},
                timeout=5,
            )
        except requests.RequestException:
            self.handleError(record)


# Usage
logger = logging.getLogger('security')
logger.addHandler(SecureLogHandler('https://logs.example.com/api', 'key'))
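A handler that makes an HTTP request inside `emit` blocks the calling thread for every log line. One way to avoid this is the standard library's `QueueHandler` and `QueueListener`, which move the slow work to a background thread. The sketch below is a minimal illustration; `ListHandler` is a hypothetical stand-in for the network handler so that the example runs without a log server.

```python
import logging
import logging.handlers
import queue

class ListHandler(logging.Handler):
    """Stand-in for a slow network handler; stores messages in memory."""
    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())

log_queue = queue.Queue()
slow_handler = ListHandler()

# The listener drains the queue on a background thread and calls the slow handler.
listener = logging.handlers.QueueListener(log_queue, slow_handler)
listener.start()

logger = logging.getLogger("security.async_demo")
logger.setLevel(logging.INFO)
logger.propagate = False
# The application thread only pays for putting the record on the queue.
logger.addHandler(logging.handlers.QueueHandler(log_queue))

logger.warning("Failed login for user %s", "alice")

# stop() processes everything still queued before returning.
listener.stop()
print(slow_handler.messages)
```

In practice the stand-in would be replaced by the HTTP handler, and `listener.stop()` would be called at application shutdown so that queued records are flushed.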
Log Format (JSON)
{
  "timestamp": "2025-01-15T10:30:00Z",
  "level": "INFO",
  "host": "web-server-1",
  "service": "nginx",
  "event_type": "access_log",
  "remote_ip": "192.168.1.1",
  "method": "GET",
  "path": "/api/users",
  "status": 200,
  "response_time": 45
}
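A structured format is only useful if every event actually follows it. As a rough sketch, a consumer could check incoming lines before indexing them. The set of required fields below is an assumption made for illustration; the right set depends on your own schema.

```python
import json
from datetime import datetime

# Assumed minimum every event should carry (illustrative, not a standard)
REQUIRED_FIELDS = {"timestamp", "level", "host", "service", "event_type"}

def parse_log_line(line):
    """Parse one JSON log line and check the fields every event should carry."""
    event = json.loads(line)
    missing = REQUIRED_FIELDS - event.keys()
    if missing:
        raise ValueError(f"missing fields: {sorted(missing)}")
    # Replace the trailing 'Z' so older Python versions can parse it too
    event["timestamp"] = datetime.fromisoformat(
        event["timestamp"].replace("Z", "+00:00")
    )
    return event

line = (
    '{"timestamp": "2025-01-15T10:30:00Z", "level": "INFO", '
    '"host": "web-server-1", "service": "nginx", '
    '"event_type": "access_log", "status": 200}'
)
event = parse_log_line(line)
print(event["timestamp"].isoformat(), event["status"])
```

Rejecting malformed events at ingestion keeps later searches and correlation rules from silently missing data.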
SIEM Platforms
Elasticsearch + Kibana + Logstash (ELK Stack)
# ELK Stack components
components:
  - name: "Beats"
    role: "Lightweight shippers"
  - name: "Logstash"
    role: "Parse and transform logs"
  - name: "Elasticsearch"
    role: "Search and analytics"
  - name: "Kibana"
    role: "Visualization"
# Logstash pipeline
input {
  beats {
    port => 5044
  }
}

filter {
  # Parse JSON logs
  json {
    source => "message"
  }

  # Add GeoIP data; the source must name the field that holds the client
  # address ("remote_ip" in the JSON log format shown earlier)
  geoip {
    source => "remote_ip"
    target => "geoip"
  }

  # Use the event's own timestamp as the event time
  date {
    match => ["timestamp", "ISO8601"]
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "security-logs-%{+YYYY.MM.dd}"
  }
}
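To see what the pipeline does to a single event, the sketch below mimics the `json` filter, the `date` filter, and the daily index name in Python. It is an approximation for illustration only: `process_event` is a hypothetical helper, GeoIP enrichment is left out, and Logstash itself handles many more edge cases.

```python
import json
from datetime import datetime, timezone

def process_event(raw_message):
    """Mimic the filter and output stages for a single event."""
    event = {"message": raw_message}
    # json filter: merge the parsed fields into the event
    event.update(json.loads(raw_message))
    # date filter: use the event's own timestamp as the event time
    ts = datetime.fromisoformat(event["timestamp"].replace("Z", "+00:00"))
    event["@timestamp"] = ts.astimezone(timezone.utc)
    # elasticsearch output: daily index named from the event time
    index = "security-logs-" + event["@timestamp"].strftime("%Y.%m.%d")
    return index, event

index, event = process_event('{"timestamp": "2025-01-15T10:30:00Z", "status": 200}')
print(index)
```

Daily indices like this make retention simple, because old data can be removed by deleting whole indices.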
Splunk
# Splunk configuration

# inputs.conf
[monitor:///var/log]
disabled = false
index = main
sourcetype = syslog

# props.conf
[syslog]
# The setting name is TRANSFORMS-<class>; "drop-null" must exist as a stanza in transforms.conf
TRANSFORMS-rmdrops = drop-null
TIME_PREFIX = ^
MAX_TIMESTAMP_LOOKAHEAD = 32
Detection Rules
Sigma Rules
# Sigma rule example - Failed SSH login
title: Failed SSH Login
id: 1a2b3c4d-5e6f-7a8b-9c0d-1e2f3a4b5c6d
status: stable
description: Detects failed SSH login attempts in sshd logs
logsource:
  product: linux
  service: sshd
detection:
  # Match typical sshd messages for a rejected login
  keywords:
    - 'Failed password for'
    - 'Invalid user'
  condition: keywords
falsepositives:
  - Legitimate users mistyping a password
level: medium
tags:
  - attack.credential_access
  - attack.t1110
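A Sigma rule is converted into a backend query before it runs. As a rough illustration of what a keyword-based detection for failed SSH logins does once it reaches the log data, the sketch below matches typical sshd messages with a regular expression. The sample lines follow the usual OpenSSH message format but are made up, and `match_failed_login` is a hypothetical helper.

```python
import re

# Matches e.g. "Failed password for invalid user admin from 203.0.113.5 port 52314"
FAILED_LOGIN = re.compile(
    r"Failed password for (?:invalid user )?(?P<user>\S+) from (?P<ip>\S+) port \d+"
)

def match_failed_login(line):
    """Return user and source IP for a failed sshd login line, else None."""
    m = FAILED_LOGIN.search(line)
    return m.groupdict() if m else None

lines = [
    "Jan 15 10:30:01 web-server-1 sshd[1234]: Failed password for invalid user admin from 203.0.113.5 port 52314 ssh2",
    "Jan 15 10:30:09 web-server-1 sshd[1240]: Accepted password for alice from 198.51.100.7 port 50022 ssh2",
    "Jan 15 10:30:15 web-server-1 sshd[1251]: Failed password for root from 203.0.113.5 port 52320 ssh2",
]
hits = [h for h in map(match_failed_login, lines) if h]
print(hits)
```

Extracting the user and source address at match time gives later correlation steps something to group by.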
Correlation Rules
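# Correlation rule - multiple failed logins followed by a success
# (generic pseudo-config; the searches use Splunk-style syntax)
title: Brute Force Detection
description: Detect a possible brute force attack
correlation:
  # Step 1 (trigger): more than 5 failures per user within 5 minutes
  - search: "index=auth action=failure | stats count by user"
    condition: count > 5
    window: 5m
  # Step 2: a success within 1 minute after the trigger
  - search: "index=auth action=success | stats count by user"
    condition: count > 0
    window: 1m
    after: trigger
action:
  - create_alert
  - send_to_siem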
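The two-step logic of this rule, a burst of failures followed shortly by a success, can be sketched in Python with a sliding window per user. This is one possible reading of the rule under stated assumptions: events arrive sorted by time, both steps are matched on the same user, and `detect_brute_force` is a hypothetical helper.

```python
from collections import defaultdict, deque
from datetime import datetime, timedelta

FAILURE_WINDOW = timedelta(minutes=5)
SUCCESS_WINDOW = timedelta(minutes=1)
FAILURE_THRESHOLD = 5  # an alert needs more than this many failures

def detect_brute_force(events):
    """Yield (user, time) when a success follows a burst of failures.

    `events` must be sorted by time; each item is (timestamp, user, action).
    """
    failures = defaultdict(deque)
    for ts, user, action in events:
        recent = failures[user]
        if action == "failure":
            recent.append(ts)
            # Keep only failures within 5 minutes of the latest failure
            while recent and ts - recent[0] > FAILURE_WINDOW:
                recent.popleft()
        elif action == "success":
            burst = len(recent) > FAILURE_THRESHOLD
            # The success must come within 1 minute of the last failure
            if burst and ts - recent[-1] <= SUCCESS_WINDOW:
                yield user, ts
            recent.clear()

start = datetime(2025, 1, 15, 10, 30)
events = [(start + timedelta(seconds=10 * i), "alice", "failure") for i in range(6)]
events.append((start + timedelta(seconds=70), "alice", "success"))
events.append((start + timedelta(seconds=80), "bob", "success"))
alerts = list(detect_brute_force(events))
print(alerts)
```

A success right after many failures is the more serious signal, because it suggests the guessing may have worked.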
Alerting
Alert Configuration
# Alert components
alert_elements:
  - name: "Trigger"
    description: "When to fire"
  - name: "Severity"
    description: "Critical, High, Medium, Low"
  - name: "Action"
    description: "Email, Slack, PagerDuty"
  - name: "Detail"
    description: "What to include in alert"
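# Alert rule example
class SlackAction:
    """Placeholder action (hypothetical); a real one would post to Slack."""

    def __init__(self, channel):
        self.channel = channel

    def trigger(self, title, severity, details):
        print(f"[{self.channel}] {severity}: {title} ({len(details)} events)")


class AlertRule:
    def __init__(self, name, query, threshold, severity, action):
        self.name = name
        self.query = query
        self.threshold = threshold
        self.severity = severity
        self.action = action

    def evaluate(self, results):
        """Fire the action when the number of matching events reaches the threshold."""
        if len(results) >= self.threshold:
            self.action.trigger(
                title=f"Alert: {self.name}",
                severity=self.severity,
                details=results
            )


# Example: SSH brute force
alert = AlertRule(
    name="SSH Brute Force",
    query="index=linux_logs process=sshd action=failure",
    threshold=10,
    severity="HIGH",
    action=SlackAction(channel="#security-alerts")
)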
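Severity is most useful when it decides where an alert goes. The sketch below routes an alert to notification channels by severity. The mapping itself is an assumption for illustration (your escalation policy will differ), and `route_alert` is a hypothetical helper that only builds payloads without sending anything.

```python
# Assumed routing policy: which channels receive each severity
SEVERITY_ROUTES = {
    "CRITICAL": ["pagerduty", "slack"],
    "HIGH": ["slack"],
    "MEDIUM": ["email"],
    "LOW": [],  # recorded only, nobody is notified
}

def route_alert(title, severity, details):
    """Return one payload per channel configured for the alert's severity."""
    level = severity.upper()
    if level not in SEVERITY_ROUTES:
        raise ValueError(f"unknown severity: {severity}")
    return [
        {"channel": ch, "title": title, "severity": level, "details": details}
        for ch in SEVERITY_ROUTES[level]
    ]

payloads = route_alert("SSH Brute Force", "high", {"failed_attempts": 12})
print(payloads)
```

Keeping the policy in one table makes it easy to review who gets paged for what.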
Best Practices
# SIEM best practices
collection:
  - "Collect from all critical systems"
  - "Use standard formats (JSON, CEF)"
  - "Ensure log integrity"
  - "Balance detail vs volume"
retention:
  - "Hot: 30 days"
  - "Warm: 90 days"
  - "Cold: 1+ years"
  - "Comply with regulations"
tuning:
  - "Start with known rules"
  - "Tune false positives"
  - "Prioritize critical alerts"
  - "Regular rule reviews"
response:
  - "Have runbooks for alerts"
  - "Test response regularly"
  - "Document everything"
  - "Post-incident analysis"
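The retention tiers can be expressed as a simple function of event age. The sketch below assumes that the limits are cumulative ages (hot up to 30 days old, warm up to 90 days old, cold after that), which is one reading of the list above; `storage_tier` is a hypothetical helper.

```python
from datetime import date

def storage_tier(event_date, today):
    """Pick a storage tier from the age of an event in days."""
    age_days = (today - event_date).days
    if age_days < 0:
        raise ValueError("event date is in the future")
    if age_days <= 30:
        return "hot"
    if age_days <= 90:
        return "warm"
    return "cold"

today = date(2025, 4, 30)
for d in (date(2025, 4, 20), date(2025, 3, 1), date(2024, 6, 1)):
    print(d, storage_tier(d, today))
```

Most platforms implement this with built-in lifecycle policies rather than custom code, but the decision they make is the same.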
Conclusion
SIEM is essential for security monitoring:
- Centralize: Aggregate all logs in one place
- Normalize: Use standard formats
- Detect: Write correlation rules
- Respond: Have alert response plans
To get started, choose an open-source stack (ELK) or a commercial or managed platform (Splunk, Microsoft Sentinel).