Advanced Threat Detection: SIEM, EDR, and ML-Based Anomaly Detection
Modern security requires multi-layered detection spanning logs, endpoints, and behavioral patterns to catch advanced threats before damage occurs.
SIEM Architecture
Log Aggregation Pipeline
┌──────────────┐
│ Firewalls    │
│ Proxies      ├───┐
│ Routers      │   │
└──────────────┘   │
                   │
┌──────────────┐   │
│ Web Servers  │   │
│ Databases    ├───┼──► Collector ──► Parser ──► Enrichment ──► SIEM DB
│ Apps         │   │
└──────────────┘   │
                   │
┌──────────────┐   │
│ Endpoints    │   │
│ Cloud APIs   ├───┘
└──────────────┘
SIEM Implementation
import json
from datetime import datetime
from typing import List, Dict
from elasticsearch import Elasticsearch
class SIEMCollector:
    """Collect security logs in Elasticsearch and run simple detections.

    Every method reads from or writes to the ``security-logs`` index
    through the client created in ``__init__``.
    """

    # Single index used for ingestion, detection and correlation.
    _INDEX = 'security-logs'

    def __init__(self, es_host: str):
        """Create the Elasticsearch client.

        Args:
            es_host: Host string passed to ``Elasticsearch`` as a
                one-element host list.
        """
        self.es = Elasticsearch([es_host])
        self.correlations = {}

    def ingest_log(self, source: str, log_entry: Dict) -> str:
        """Index one log entry and return the id Elasticsearch assigned.

        The entry is shallow-copied before it is annotated, so the caller's
        dict is not modified.

        Args:
            source: Name of the system that produced the entry; stored in
                the ``source`` field.
            log_entry: Event fields. A ``timestamp`` is added only when the
                entry does not already carry one.

        Returns:
            The ``_id`` of the indexed document.
        """
        # One clock reading so a defaulted timestamp equals ingested_at.
        now = datetime.utcnow().isoformat()

        document = dict(log_entry)
        document.setdefault('timestamp', now)
        document['source'] = source
        document['ingested_at'] = now

        response = self.es.index(index=self._INDEX, body=document)
        return response['_id']

    def detect_anomalies(self, failed_login_threshold: int = 5,
                         exfil_bytes_threshold: int = 1073741824) -> List[Dict]:
        """Run all detections and return their findings as one list.

        Args:
            failed_login_threshold: A user is reported when their failed
                logins in the last five minutes exceed this count.
            exfil_bytes_threshold: Minimum ``bytes_transferred`` for a file
                transfer to be reported (default 1 GiB).

        Returns:
            Brute-force findings followed by data-exfiltration findings.
        """
        findings = self._detect_brute_force(failed_login_threshold)
        findings.extend(self._detect_exfiltration(exfil_bytes_threshold))
        return findings

    def _detect_brute_force(self, threshold: int) -> List[Dict]:
        """Report users with more than ``threshold`` failed logins in 5 minutes."""
        response = self.es.search(
            index=self._INDEX,
            body={
                'query': {
                    'bool': {
                        'must': [
                            {'match': {'event_type': 'login_failed'}},
                            {'range': {'timestamp': {'gte': 'now-5m'}}},
                        ]
                    }
                },
                'aggs': {
                    'by_user': {
                        'terms': {'field': 'username', 'size': 100}
                    }
                },
            },
        )
        buckets = response['aggregations']['by_user']['buckets']
        return [
            {
                'type': 'brute_force',
                'user': bucket['key'],
                'failed_attempts': bucket['doc_count'],
                'severity': 'high',
            }
            for bucket in buckets
            # Strictly greater: exactly `threshold` failures is not reported.
            if bucket['doc_count'] > threshold
        ]

    def _detect_exfiltration(self, min_bytes: int) -> List[Dict]:
        """Report file transfers of at least ``min_bytes`` bytes.

        NOTE: the query has no time range, so a transfer already reported
        is returned again on every call for as long as it stays indexed.
        """
        response = self.es.search(
            index=self._INDEX,
            body={
                'query': {
                    'bool': {
                        'must': [
                            {'match': {'event_type': 'file_transfer'}},
                            {'range': {'bytes_transferred': {'gte': min_bytes}}},
                        ]
                    }
                }
            },
        )
        findings = []
        for hit in response['hits']['hits']:
            source = hit['_source']
            # .get(): one incomplete document must not abort the whole run.
            findings.append({
                'type': 'data_exfiltration',
                'user': source.get('username'),
                'bytes': source.get('bytes_transferred'),
                'destination': source.get('destination_ip'),
                'severity': 'critical',
            })
        return findings

    def correlate_events(self, event_ids: List[str]) -> Dict:
        """Fetch the given events and score how tightly they cluster.

        The score is ``(distinct users + distinct source IPs) / seconds
        spanned``. The divisor is floored at one second so that a single
        event, or events sharing a timestamp, do not divide by zero.

        Args:
            event_ids: Document ids in the ``security-logs`` index.

        Returns:
            A dict with the ids, the score, the distinct user and source
            counts, the real (unfloored) time span in seconds, and
            ``incident_probability`` (``'high'`` when the score exceeds
            0.5, otherwise ``'low'``).

        Raises:
            ValueError: If ``event_ids`` is empty or a timestamp is not
                ISO-8601.
            KeyError: If a fetched event has no ``timestamp`` field.
        """
        if not event_ids:
            raise ValueError("event_ids must contain at least one event id")

        events = [self.es.get(index=self._INDEX, id=eid)['_source']
                  for eid in event_ids]

        # Events without the field are skipped instead of counting as one
        # extra "None" user/source.
        users = {e['username'] for e in events
                 if e.get('username') is not None}
        sources = {e['source_ip'] for e in events
                   if e.get('source_ip') is not None}

        # Timestamps are stored as ISO strings; parse before subtracting.
        timestamps = [self._parse_timestamp(e['timestamp']) for e in events]
        span_seconds = (max(timestamps) - min(timestamps)).total_seconds()

        correlation_score = (len(users) + len(sources)) / max(span_seconds, 1.0)
        return {
            'event_ids': event_ids,
            'correlation_score': correlation_score,
            'users_involved': len(users),
            'sources_involved': len(sources),
            'time_span_seconds': span_seconds,
            'incident_probability': 'high' if correlation_score > 0.5 else 'low',
        }

    @staticmethod
    def _parse_timestamp(value) -> datetime:
        """Return ``value`` as a naive UTC ``datetime``.

        Accepts a ``datetime`` or an ISO-8601 string (a trailing ``Z`` is
        understood as UTC). Offset-aware values are shifted to UTC and made
        naive so they compare with the naive UTC values ``ingest_log``
        writes.
        """
        if isinstance(value, datetime):
            parsed = value
        else:
            text = str(value)
            if text.endswith('Z'):
                # Older fromisoformat() implementations reject the Z suffix.
                text = text[:-1] + '+00:00'
            parsed = datetime.fromisoformat(text)

        offset = parsed.utcoffset()
        if offset is not None:
            # Subtracting the offset moves the wall clock to UTC.
            parsed = (parsed - offset).replace(tzinfo=None)
        return parsed
# Usage: talk to an Elasticsearch node on the local machine
siem = SIEMCollector('localhost:9200')

# Ship one blocked-connection event as reported by a firewall
siem.ingest_log(
    'firewall',
    dict(
        timestamp=datetime.utcnow().isoformat(),
        event_type='connection_blocked',
        source_ip='192.168.1.100',
        destination_ip='10.0.0.50',
        port=22,
        reason='blocked_port',
    ),
)

# Run the detections and print one alert line per finding
anomalies = siem.detect_anomalies()
for anomaly in anomalies:
    alert_type = anomaly['type']
    alert_severity = anomaly['severity']
    print(f"Alert: {alert_type} - {alert_severity}")
EDR (Endpoint Detection and Response)
Behavior Monitoring
import hashlib
import psutil
from pathlib import Path
from typing import List, Dict
class EndpointDetectionResponse:
    """Inspect local processes, files and connections for threat indicators.

    Each ``monitor_*`` method takes one snapshot and returns a list of
    threat dicts; nothing is remembered between calls.
    """

    # Lower-case substrings that mark a path as a critical system location.
    _CRITICAL_PATH_MARKERS = ('system32', 'windows', 'drivers')
    # Shell metacharacters whose presence in a command line is reported.
    _SHELL_METACHARACTERS = (';', '&&', '|', '>', '<', '$(')
    # Remote ports checked for lateral movement (SMB 445/139, RDP 3389).
    _LATERAL_MOVEMENT_PORTS = frozenset({445, 139, 3389})
    # PIDs treated as system processes and exempt from the port check.
    _SYSTEM_PIDS = frozenset({0, 4, 8})

    def __init__(self):
        self.baseline = {}
        self.process_whitelist = self._load_whitelist()

    def _load_whitelist(self) -> set:
        """Return the lower-case names of processes considered known-good."""
        return {
            'python.exe', 'node.exe', 'java.exe',
            'chrome.exe', 'explorer.exe', 'svchost.exe',
        }

    def monitor_process_execution(self) -> List[Dict]:
        """Scan running processes and report suspicious ones.

        Three independent checks run per process: an unsigned executable
        outside the whitelist, an executable path containing ``temp`` or
        ``appdata``, and a command line containing shell metacharacters.

        Returns:
            One dict per finding, with ``type``, ``pid``, ``severity`` and
            a check-specific detail field.
        """
        threats = []
        for proc in psutil.process_iter(['pid', 'name', 'exe', 'cmdline']):
            try:
                info = proc.info
                pid = info['pid']
                # psutil may report None for attributes it could not read,
                # so every field is normalised before use.
                name = (info['name'] or '').lower()
                exe = info['exe'] or ''
                cmdline = ' '.join(info.get('cmdline') or [])
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                # Process exited or is off-limits; nothing to inspect.
                continue

            if name not in self.process_whitelist and not self._is_signed(exe):
                threats.append({
                    'type': 'unsigned_executable',
                    'pid': pid,
                    'executable': exe,
                    'severity': 'medium',
                })

            exe_lower = exe.lower()
            if 'temp' in exe_lower or 'appdata' in exe_lower:
                threats.append({
                    'type': 'suspicious_location',
                    'pid': pid,
                    'path': exe,
                    'severity': 'high',
                })

            if any(token in cmdline for token in self._SHELL_METACHARACTERS):
                threats.append({
                    'type': 'command_injection_attempt',
                    'pid': pid,
                    'command': cmdline,
                    'severity': 'high',
                })
        return threats

    def monitor_file_modifications(self, watch_dirs: List[str]) -> List[Dict]:
        """Report non-whitelisted files located under critical system paths.

        A file is critical when its full path, compared case-insensitively,
        contains one of ``system32``, ``windows`` or ``drivers``.

        Args:
            watch_dirs: Directories to walk recursively.

        Returns:
            One ``system_file_modification`` dict per reported file.
            ``modified`` is the file's mtime, or ``None`` when the file
            could not be stat'ed.
        """
        threats = []
        for watch_dir in watch_dirs:
            for filepath in Path(watch_dir).rglob('*'):
                if not filepath.is_file():
                    continue

                path_text = str(filepath)
                # Lower-case first: real paths are spelled "Windows" and
                # "System32", which a case-sensitive test never matches.
                lowered = path_text.lower()
                if not any(marker in lowered
                           for marker in self._CRITICAL_PATH_MARKERS):
                    continue
                if self._is_file_whitelisted(path_text):
                    continue

                try:
                    modified = filepath.stat().st_mtime
                except OSError:
                    # File vanished or is unreadable: still report it, but
                    # do not let one file abort the whole scan.
                    modified = None

                threats.append({
                    'type': 'system_file_modification',
                    'path': path_text,
                    'modified': modified,
                    'severity': 'critical',
                })
        return threats

    def monitor_network_connections(self) -> List[Dict]:
        """Report connections to known-bad IPs or to SMB/RDP ports.

        Returns:
            ``c2_communication`` dicts for remote IPs present in the threat
            intelligence mapping, and ``lateral_movement_attempt`` dicts
            for connections to ports 445, 139 or 3389 whose PID is not
            0, 4 or 8.
        """
        threats = []
        known_bad_ips = self._load_threat_intelligence()
        for conn in psutil.net_connections():
            remote = conn.raddr
            if not remote:
                # Listening or unconnected socket: no remote end to check.
                continue

            if remote.ip in known_bad_ips:
                threats.append({
                    'type': 'c2_communication',
                    'pid': conn.pid,
                    'remote_address': f"{remote.ip}:{remote.port}",
                    'malware': known_bad_ips[remote.ip],
                    'severity': 'critical',
                })

            if (remote.port in self._LATERAL_MOVEMENT_PORTS
                    and conn.pid not in self._SYSTEM_PIDS):
                threats.append({
                    'type': 'lateral_movement_attempt',
                    'pid': conn.pid,
                    'target': f"{remote.ip}:{remote.port}",
                    'severity': 'high',
                })
        return threats

    def _is_signed(self, exe_path: str) -> bool:
        """Placeholder signature check; currently accepts every executable."""
        # Implementation would check digital signature
        return True

    def _is_file_whitelisted(self, filepath: str) -> bool:
        """Placeholder whitelist check; currently rejects every file."""
        # Implementation would check file hash against known-good list
        return False

    def _load_threat_intelligence(self) -> Dict[str, str]:
        """Placeholder feed mapping bad IP to malware name; currently empty."""
        # Load from threat intel feed (AlienVault OTX, etc)
        return {}
# Usage: take one snapshot of processes, then one of the Windows directory
edr = EndpointDetectionResponse()

print("=== Process Monitoring ===")
process_threats = edr.monitor_process_execution()
for threat in process_threats:
    threat_type, threat_pid = threat['type'], threat['pid']
    print(f"Threat: {threat_type} (PID: {threat_pid})")

print("\n=== File Monitoring ===")
file_threats = edr.monitor_file_modifications(['C:\\Windows'])
for threat in file_threats:
    threat_type, threat_path = threat['type'], threat['path']
    print(f"Threat: {threat_type} on {threat_path}")
ML-Based Anomaly Detection
Behavioral Baseline Learning
import joblib
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
class MLAnomalyDetector:
    """Isolation-Forest detector for anomalous user activity.

    Activity dicts become a fixed-order feature vector, are standardised,
    and are scored by an ``IsolationForest`` fitted on historical data.
    """

    # Keys read from every activity dict, in feature-vector order.
    FEATURE_NAMES = (
        'login_count',            # Logins per day
        'failed_login_count',     # Failed logins
        'files_accessed',         # Files accessed
        'bytes_downloaded',       # Data downloaded
        'bytes_uploaded',         # Data uploaded
        'privileged_operations',  # Sudo/admin actions
        'after_hours_activity',   # After hours percentage
        'unique_hosts',           # Different hosts accessed
        'vpn_connections',        # VPN logins
        'api_calls',              # API calls made
    )

    def __init__(self, contamination: float = 0.05, random_state=None):
        """Create an untrained detector.

        Args:
            contamination: Passed to ``IsolationForest``.
            random_state: Passed to ``IsolationForest``; set it to make
                training reproducible.
        """
        self.model = IsolationForest(contamination=contamination,
                                     random_state=random_state)
        self.scaler = StandardScaler()
        self.is_trained = False

    def extract_features(self, user_activity: Dict) -> np.ndarray:
        """Return the activity as a ``(1, 10)`` array in FEATURE_NAMES order.

        Raises:
            KeyError: If ``user_activity`` lacks one of the feature keys.
        """
        values = [user_activity[name] for name in self.FEATURE_NAMES]
        return np.array(values).reshape(1, -1)

    def train_baseline(self, historical_activities: List[Dict]):
        """Fit the scaler and the model on activity considered normal.

        Args:
            historical_activities: Non-empty list of activity dicts.

        Raises:
            ValueError: If ``historical_activities`` is empty.
        """
        if not historical_activities:
            raise ValueError("historical_activities must not be empty")

        X = np.vstack([self.extract_features(activity)
                       for activity in historical_activities])
        X_scaled = self.scaler.fit_transform(X)

        self.model.fit(X_scaled)
        self.is_trained = True
        print(f"Trained on {len(historical_activities)} records")

    def detect_anomaly(self, user_activity: Dict) -> Dict:
        """Score one activity dict against the trained baseline.

        Returns:
            ``is_anomaly`` (plain ``bool``), ``anomaly_score`` (plain
            ``float``; the negated ``score_samples`` value, so larger means
            more anomalous) and ``severity``.

        Raises:
            ValueError: If the model has not been trained or loaded.
        """
        if not self.is_trained:
            raise ValueError("Model not trained yet")

        X_scaled = self.scaler.transform(self.extract_features(user_activity))

        # predict(): -1 = anomaly, 1 = normal
        prediction = self.model.predict(X_scaled)[0]
        anomaly_score = float(-self.model.score_samples(X_scaled)[0])

        return {
            # bool(): the comparison yields numpy.bool_, which json.dumps
            # refuses to serialise.
            'is_anomaly': bool(prediction == -1),
            'anomaly_score': anomaly_score,
            'severity': self._score_to_severity(anomaly_score),
        }

    def _score_to_severity(self, score: float) -> str:
        """Map an anomaly score to a severity using strict ``>`` thresholds."""
        if score > 0.8:
            return 'critical'
        if score > 0.5:
            return 'high'
        if score > 0.3:
            return 'medium'
        return 'low'

    def save_model(self, path: str):
        """Persist the model, the scaler and the trained flag to ``path``."""
        joblib.dump({
            'model': self.model,
            'scaler': self.scaler,
            # Stored so an untrained detector does not load as trained.
            'is_trained': self.is_trained,
        }, path)

    def load_model(self, path: str):
        """Restore a detector written by ``save_model``.

        SECURITY: ``joblib.load`` unpickles the file and can execute
        arbitrary code; only load files from a trusted source.
        """
        data = joblib.load(path)
        self.model = data['model']
        self.scaler = data['scaler']
        # Files written before the flag was stored are assumed trained.
        self.is_trained = data.get('is_trained', True)
# Usage
detector = MLAnomalyDetector()

# Baseline: one thousand copies of the same ordinary working day
typical_day = dict(
    login_count=1,
    failed_login_count=0,
    files_accessed=25,
    bytes_downloaded=102400,
    bytes_uploaded=51200,
    privileged_operations=0,
    after_hours_activity=5,
    unique_hosts=1,
    vpn_connections=0,
    api_calls=50,
)
normal_users = [dict(typical_day) for _ in range(1000)]
detector.train_baseline(normal_users)

# A profile that departs from the baseline on almost every feature
suspicious_activity = dict(
    login_count=50,                # Multiple logins
    failed_login_count=20,         # Many failures
    files_accessed=5000,           # Huge file access
    bytes_downloaded=10737418240,  # 10GB downloaded
    bytes_uploaded=0,
    privileged_operations=100,     # Many sudo commands
    after_hours_activity=95,       # Almost all after hours
    unique_hosts=50,               # Many different hosts
    vpn_connections=30,
    api_calls=10000,
)

result = detector.detect_anomaly(suspicious_activity)
flagged = result['is_anomaly']
score = result['anomaly_score']
level = result['severity']
print(f"Anomaly: {flagged}, Score: {score:.2f}, Severity: {level}")
Incident Response Automation
Automated Response Playbooks
from typing import List, Dict
from enum import Enum
from datetime import datetime
class IncidentSeverity(Enum):
    """Severity levels an incident can carry, most severe first.

    Each member's value is the lower-case string used in incident dicts,
    so ``IncidentSeverity(incident['severity'])`` looks a member up.
    """

    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
class IncidentResponse:
    """Dispatch incidents to automated response playbooks.

    The action helpers at the bottom of the class only print what they
    would do; they are the integration points for real tooling.
    """

    def __init__(self):
        self.playbooks = self._load_playbooks()

    def _load_playbooks(self) -> Dict:
        """Return the mapping from incident type to playbook method."""
        return {
            'ransomware': self._ransomware_response,
            'data_exfiltration': self._exfiltration_response,
            'credential_compromise': self._credential_response,
            'c2_communication': self._c2_response,
        }

    def execute_response(self, incident: Dict):
        """Run the playbook for ``incident``, or escalate unknown types.

        Args:
            incident: Must contain ``type`` and ``severity``, plus the
                fields the matching playbook reads.

        Raises:
            KeyError: If ``type``, ``severity`` or a field required by the
                playbook is missing.
            ValueError: If ``severity`` is not an ``IncidentSeverity`` value.
        """
        incident_type = incident['type']
        severity = IncidentSeverity(incident['severity'])
        print(f"[{datetime.now()}] Executing response for {incident_type} ({severity.value})")

        playbook = self.playbooks.get(incident_type)
        if playbook is None:
            self._escalate_to_soc(incident)
        else:
            playbook(incident, severity)

    def _ransomware_response(self, incident: Dict, severity: IncidentSeverity):
        """Isolate the host, kill listed processes, preserve evidence, open a ticket.

        Reads ``hostname`` (required) and ``process_ids`` (optional).
        """
        affected_host = incident['hostname']
        # Optional: a missing list must not stop the response after the
        # host has already been isolated.
        process_ids = incident.get('process_ids', [])

        print(f"1. Isolating {affected_host} from network...")
        self._isolate_host(affected_host)
        print("2. Killing suspicious processes...")
        for pid in process_ids:
            self._kill_process(pid)
        print("3. Disabling network shares...")
        self._disable_shares(affected_host)
        print("4. Preserving logs for forensics...")
        self._preserve_logs(affected_host)
        print("5. Alerting incident response team...")
        self._create_incident_ticket({
            'type': 'ransomware',
            'severity': severity.value,
            'host': affected_host,
            'timestamp': datetime.now().isoformat(),
        })

    def _exfiltration_response(self, incident: Dict, severity: IncidentSeverity):
        """Cut off the user and the destination, then check data integrity.

        Reads ``user`` and the destination address. The address is taken
        from ``destination_ip``, falling back to ``destination``, which is
        the key the SIEM ``data_exfiltration`` anomaly uses.

        Raises:
            KeyError: If ``user`` or both destination keys are missing;
                raised before any action is taken.
        """
        username = incident['user']
        destination = incident.get('destination_ip', incident.get('destination'))
        if destination is None:
            raise KeyError('destination_ip')

        print(f"1. Terminating user {username} sessions...")
        self._terminate_user_sessions(username)
        print(f"2. Blocking destination {destination}...")
        self._block_ip(destination)
        print("3. Revoking API tokens...")
        self._revoke_api_tokens(username)
        print("4. Forcing password reset...")
        self._force_password_reset(username)
        print("5. Querying backup for data integrity...")
        integrity_check = self._verify_data_integrity()
        print(f" Data integrity: {'OK' if integrity_check else 'COMPROMISED'}")

    def _credential_response(self, incident: Dict, severity: IncidentSeverity):
        """Revoke tokens, force MFA re-enrollment and audit access.

        Reads ``credential_type`` and ``affected_users``.
        """
        credential_type = incident['credential_type']
        affected_users = incident['affected_users']

        print(f"1. Revoking all {credential_type} across {len(affected_users)} users...")
        for user in affected_users:
            self._revoke_all_tokens(user)
        print("2. Forcing MFA re-enrollment...")
        self._force_mfa_reenroll(affected_users)
        print("3. Checking for unauthorized access...")
        self._check_unauthorized_access(affected_users)

    def _c2_response(self, incident: Dict, severity: IncidentSeverity):
        """Block the C2 server, isolate infected hosts and trigger a scan.

        Reads ``c2_ip`` and ``infected_hosts``.
        """
        c2_ip = incident['c2_ip']
        infected_hosts = incident['infected_hosts']

        print(f"1. Blocking C2 server {c2_ip} globally...")
        self._block_ip(c2_ip)
        print(f"2. Isolating {len(infected_hosts)} infected hosts...")
        for host in infected_hosts:
            self._isolate_host(host)
        print("3. Scanning network for malware...")
        self._trigger_antivirus_scan()

    # --- Action stubs: each prints the action it stands in for ---

    def _isolate_host(self, hostname: str):
        """Remove host from network"""
        print(f" - Isolating {hostname}")

    def _kill_process(self, pid: int):
        print(f" - Killing process {pid}")

    def _block_ip(self, ip: str):
        print(f" - Blocking {ip} in firewall")

    def _revoke_api_tokens(self, username: str):
        print(f" - Revoking tokens for {username}")

    def _terminate_user_sessions(self, username: str):
        print(f" - Terminating {username} sessions")

    def _force_password_reset(self, username: str):
        print(f" - Forcing password reset for {username}")

    def _verify_data_integrity(self) -> bool:
        # Stub: always reports the data as intact.
        return True

    def _revoke_all_tokens(self, user: str):
        print(f" - Revoking tokens for {user}")

    def _force_mfa_reenroll(self, users: List[str]):
        print(f" - Forcing MFA for {len(users)} users")

    def _check_unauthorized_access(self, users: List[str]):
        print(f" - Checking access logs for {len(users)} users")

    def _trigger_antivirus_scan(self):
        print(" - Triggering network antivirus scan")

    def _disable_shares(self, host: str):
        print(f" - Disabling shares on {host}")

    def _preserve_logs(self, host: str):
        print(f" - Preserving logs from {host}")

    def _create_incident_ticket(self, incident: Dict):
        print(f" - Created ticket: {incident}")

    def _escalate_to_soc(self, incident: Dict):
        print(f" - Escalating to SOC: {incident}")
# Usage
ir = IncidentResponse()

# Ransomware detected on one workstation, with two offending processes
ir.execute_response(
    dict(
        type='ransomware',
        severity='critical',
        hostname='workstation-42',
        process_ids=[1234, 5678],
    )
)
Glossary
- SIEM: Security Information & Event Management - central log analysis
- EDR: Endpoint Detection & Response - endpoint-level threat hunting
- C2: Command & Control - attacker communication with compromised host
- False Positive: Alert triggered by legitimate activity
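- Zero-day: A vulnerability that is unknown to the vendor and has no patch available when it is first exploited
- APT: Advanced Persistent Threat - a sophisticated, well-resourced attacker that keeps long-term, covert access to a target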
Comments