Skip to main content
⚡ Calmops

Advanced Threat Detection: SIEM, EDR, and ML-Based Anomaly Detection

Advanced Threat Detection: SIEM, EDR, and ML-Based Anomaly Detection

Modern security requires multi-layered detection spanning logs, endpoints, and behavioral patterns to catch advanced threats before damage occurs.


SIEM Architecture

Log Aggregation Pipeline

    ┌──────────────┐
    │  Firewalls   │
    │  Proxies     │───┐
    │  Routers     │   │
    └──────────────┘   │
                       │
    ┌──────────────┐   │
    │ Web Servers  │   │
    │ Databases    │───┤──► Collector ──► Parser ──► Enrichment ──► SIEM DB
    │ Apps         │   │
    └──────────────┘   │
                       │
    ┌──────────────┐   │
    │ Endpoints    │   │
    │ Cloud APIs   │───┘
    └──────────────┘

SIEM Implementation

import json
from datetime import datetime
from typing import List, Dict
from elasticsearch import Elasticsearch

class SIEMCollector:
    """Collect security logs in Elasticsearch and run simple detections.

    Every document is written to, and queried from, the index named by
    ``INDEX``.
    """

    INDEX = 'security-logs'
    # A user is flagged once they exceed this many failed logins in 5 minutes.
    FAILED_LOGIN_THRESHOLD = 5
    # Transfers of at least this many bytes (1 GiB) are flagged as exfiltration.
    EXFIL_BYTES_THRESHOLD = 1073741824

    def __init__(self, es_host: str):
        """Create a collector bound to one Elasticsearch node.

        Args:
            es_host: Node address, passed unchanged to the ``Elasticsearch``
                client.
        """
        self.es = Elasticsearch([es_host])
        self.correlations = {}

    def ingest_log(self, source: str, log_entry: Dict) -> str:
        """Index one log entry and return the id Elasticsearch assigned to it.

        The caller's ``log_entry`` is left untouched; a copy is enriched with
        ``source``, ``ingested_at`` and, if absent, ``timestamp``.

        Args:
            source: Name of the system that produced the log.
            log_entry: Arbitrary log fields.

        Returns:
            The ``_id`` of the indexed document.
        """
        # One clock reading so `timestamp` and `ingested_at` agree when both
        # are filled in here.
        now = datetime.utcnow().isoformat()

        document = dict(log_entry)
        document.setdefault('timestamp', now)
        document['source'] = source
        document['ingested_at'] = now

        response = self.es.index(index=self.INDEX, body=document)
        return response['_id']

    def detect_anomalies(self) -> List[Dict]:
        """Run the built-in detections and return one dict per finding.

        Two patterns are checked: users with more than
        ``FAILED_LOGIN_THRESHOLD`` failed logins in the last five minutes
        (``brute_force``) and file transfers of at least
        ``EXFIL_BYTES_THRESHOLD`` bytes (``data_exfiltration``).

        Returns:
            A list of anomaly dicts, each with at least ``type`` and
            ``severity``.
        """
        anomalies = []

        # Pattern 1: multiple failed logins per user.
        # NOTE(review): a terms aggregation needs an aggregatable field; if
        # `username` is mapped as text this may have to be `username.keyword`.
        failed_logins = self.es.search(
            index=self.INDEX,
            body={
                'size': 0,  # only the aggregation is read, skip the hits
                'query': {
                    'bool': {
                        'must': [
                            {'match': {'event_type': 'login_failed'}},
                            {'range': {'timestamp': {'gte': 'now-5m'}}},
                        ]
                    }
                },
                'aggs': {
                    'by_user': {
                        'terms': {'field': 'username', 'size': 100}
                    }
                },
            },
        )

        for bucket in failed_logins['aggregations']['by_user']['buckets']:
            if bucket['doc_count'] > self.FAILED_LOGIN_THRESHOLD:
                anomalies.append({
                    'type': 'brute_force',
                    'user': bucket['key'],
                    'failed_attempts': bucket['doc_count'],
                    'severity': 'high',
                })

        # Pattern 2: data exfiltration (single large transfer).
        # NOTE(review): this query has no time range and no explicit size, so
        # it re-reports old transfers and only sees the default page of hits.
        exfil = self.es.search(
            index=self.INDEX,
            body={
                'query': {
                    'bool': {
                        'must': [
                            {'match': {'event_type': 'file_transfer'}},
                            {'range': {'bytes_transferred': {
                                'gte': self.EXFIL_BYTES_THRESHOLD}}},
                        ]
                    }
                }
            },
        )

        for hit in exfil['hits']['hits']:
            record = hit['_source']
            # .get(): a transfer log without a username or destination must
            # still raise an alert instead of aborting the whole scan.
            anomalies.append({
                'type': 'data_exfiltration',
                'user': record.get('username'),
                'bytes': record.get('bytes_transferred'),
                'destination': record.get('destination_ip'),
                'severity': 'critical',
            })

        return anomalies

    @staticmethod
    def _parse_timestamp(value) -> datetime:
        """Turn a stored timestamp into a timezone-aware ``datetime``.

        Accepts ``datetime`` objects and ISO-8601 strings (including a
        trailing ``Z``). Values without an offset are taken to be UTC, which
        is what ``ingest_log`` writes.
        """
        # Local import: the file only imports `datetime` from the module.
        from datetime import timezone

        if isinstance(value, datetime):
            parsed = value
        else:
            text = str(value)
            if text.endswith('Z'):
                # fromisoformat() only understands 'Z' on newer Pythons.
                text = text[:-1] + '+00:00'
            parsed = datetime.fromisoformat(text)

        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    def correlate_events(self, event_ids: List[str]) -> Dict:
        """Correlate multiple events into one incident summary.

        The score is the number of distinct users plus distinct source IPs
        divided by the time span in seconds; events lacking a username or
        source IP do not add an entity. Spans shorter than one second are
        treated as one second so simultaneous events do not divide by zero.

        Args:
            event_ids: Document ids in the security log index.

        Returns:
            A dict with ``event_ids``, ``correlation_score``,
            ``users_involved``, ``sources_involved``, ``time_span_seconds``
            and ``incident_probability`` (``'high'`` or ``'low'``).

        Raises:
            KeyError: If a fetched event has no ``timestamp``.
            ValueError: If a timestamp is not valid ISO-8601.
        """
        if not event_ids:
            return {
                'event_ids': event_ids,
                'correlation_score': 0.0,
                'users_involved': 0,
                'sources_involved': 0,
                'time_span_seconds': 0.0,
                'incident_probability': 'low',
            }

        events = [self.es.get(index=self.INDEX, id=eid)['_source']
                  for eid in event_ids]

        users = {e.get('username') for e in events} - {None}
        sources = {e.get('source_ip') for e in events} - {None}

        # Timestamps are stored as ISO strings, so parse before subtracting.
        stamps = [self._parse_timestamp(e['timestamp']) for e in events]
        span_seconds = (max(stamps) - min(stamps)).total_seconds()

        correlation_score = (len(users) + len(sources)) / max(span_seconds, 1.0)

        return {
            'event_ids': event_ids,
            'correlation_score': correlation_score,
            'users_involved': len(users),
            'sources_involved': len(sources),
            'time_span_seconds': span_seconds,
            'incident_probability': 'high' if correlation_score > 0.5 else 'low',
        }

# Usage
# Give the node address with an explicit scheme: the 8.x Python client rejects
# a bare 'host:port', while older clients accept the full URL as well.
siem = SIEMCollector('http://localhost:9200')

# Ingest logs from firewall
siem.ingest_log('firewall', {
    'timestamp': datetime.utcnow().isoformat(),
    'event_type': 'connection_blocked',
    'source_ip': '192.168.1.100',
    'destination_ip': '10.0.0.50',
    'port': 22,
    'reason': 'blocked_port'
})

# Find anomalies
anomalies = siem.detect_anomalies()
for anomaly in anomalies:
    print(f"Alert: {anomaly['type']} - {anomaly['severity']}")

EDR (Endpoint Detection and Response)

Behavior Monitoring

import hashlib
import psutil
from pathlib import Path
from typing import List, Dict

class EndpointDetectionResponse:
    """Monitor endpoint behavior for threats.

    Each ``monitor_*`` method takes one snapshot (processes, files or network
    connections) and returns a list of threat dicts carrying at least ``type``
    and ``severity``.
    """

    # Path fragments that mark a critical system location; compared lower-case.
    CRITICAL_PATH_MARKERS = ('system32', 'windows', 'drivers')
    # Shell metacharacters whose presence in a command line is reported.
    SHELL_METACHARACTERS = (';', '&&', '|', '>', '<', '$(')
    # Remote ports associated with SMB / NetBIOS / RDP.
    LATERAL_MOVEMENT_PORTS = (445, 139, 3389)
    # PIDs treated as system processes and exempt from the port check.
    SYSTEM_PIDS = (0, 4, 8)

    def __init__(self):
        self.baseline = {}
        self.process_whitelist = self._load_whitelist()

    def _load_whitelist(self) -> set:
        """Load known-good process names (lower-case)."""
        return {
            'python.exe', 'node.exe', 'java.exe',
            'chrome.exe', 'explorer.exe', 'svchost.exe'
        }

    def monitor_process_execution(self) -> List[Dict]:
        """Inspect every running process and return the threats found.

        Returns:
            Threat dicts of type ``unsigned_executable``,
            ``suspicious_location`` or ``command_injection_attempt``.
        """
        threats = []

        for proc in psutil.process_iter(['pid', 'name', 'exe', 'cmdline']):
            try:
                threats.extend(self._assess_process(proc.info))
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                # The process exited or is off limits to us; nothing to report.
                continue

        return threats

    def _assess_process(self, info: Dict) -> List[Dict]:
        """Evaluate one process-info dict and return the threats it triggers.

        Args:
            info: Mapping with ``pid``, ``name``, ``exe`` and ``cmdline``.
                Any value may be ``None``, which is how psutil reports a
                field it could not read.
        """
        findings = []
        pid = info.get('pid')
        name = (info.get('name') or '').lower()
        exe = info.get('exe') or ''

        # Red flags only apply to processes outside the whitelist.
        if name not in self.process_whitelist:
            if not self._is_signed(exe):
                findings.append({
                    'type': 'unsigned_executable',
                    'pid': pid,
                    'executable': exe,
                    'severity': 'medium'
                })

            # Executable lives under a temp / AppData style directory.
            exe_lower = exe.lower()
            if 'temp' in exe_lower or 'appdata' in exe_lower:
                findings.append({
                    'type': 'suspicious_location',
                    'pid': pid,
                    'path': exe,
                    'severity': 'high'
                })

        # `or []`: the key exists but holds None when the command line could
        # not be read, and ' '.join(None) would raise.
        cmdline = ' '.join(info.get('cmdline') or [])
        if any(token in cmdline for token in self.SHELL_METACHARACTERS):
            findings.append({
                'type': 'command_injection_attempt',
                'pid': pid,
                'command': cmdline,
                'severity': 'high'
            })

        return findings

    def monitor_file_modifications(self, watch_dirs: List[str]) -> List[Dict]:
        """Report non-whitelisted files located under critical system paths.

        Args:
            watch_dirs: Directories to walk recursively.

        Returns:
            Threat dicts of type ``system_file_modification`` with the file's
            path and modification time.
        """
        threats = []

        for watch_dir in watch_dirs:
            for filepath in Path(watch_dir).rglob('*'):
                if not filepath.is_file():
                    continue

                path_text = str(filepath)
                # Compare case-insensitively: the real directories are spelled
                # 'Windows' / 'System32', which a lower-case marker would
                # otherwise never match.
                lowered = path_text.lower()
                if not any(marker in lowered
                           for marker in self.CRITICAL_PATH_MARKERS):
                    continue

                if self._is_file_whitelisted(path_text):
                    continue

                try:
                    modified = filepath.stat().st_mtime
                except OSError:
                    # File vanished or became unreadable after it was listed.
                    continue

                threats.append({
                    'type': 'system_file_modification',
                    'path': path_text,
                    'modified': modified,
                    'severity': 'critical'
                })

        return threats

    def monitor_network_connections(self) -> List[Dict]:
        """Detect C2 traffic and possible lateral movement.

        Returns:
            Threat dicts of type ``c2_communication`` (remote IP is in the
            threat-intelligence map) or ``lateral_movement_attempt`` (remote
            port is in ``LATERAL_MOVEMENT_PORTS`` and the owning PID is not a
            system PID). One connection can produce both.
        """
        threats = []
        known_bad_ips = self._load_threat_intelligence()

        for conn in psutil.net_connections():
            remote = conn.raddr
            if not remote:
                # Listening / unconnected socket: no remote end to judge.
                continue

            # Check against threat intelligence
            if remote.ip in known_bad_ips:
                threats.append({
                    'type': 'c2_communication',
                    'pid': conn.pid,
                    'remote_address': f"{remote.ip}:{remote.port}",
                    'malware': known_bad_ips[remote.ip],
                    'severity': 'critical'
                })

            # Unusual port usage by a non-system process
            if (remote.port in self.LATERAL_MOVEMENT_PORTS
                    and conn.pid not in self.SYSTEM_PIDS):
                threats.append({
                    'type': 'lateral_movement_attempt',
                    'pid': conn.pid,
                    'target': f"{remote.ip}:{remote.port}",
                    'severity': 'high'
                })

        return threats

    def _is_signed(self, exe_path: str) -> bool:
        """Check if executable is signed.

        Placeholder: always returns True, so ``unsigned_executable`` is never
        reported until a real signature check is implemented.
        """
        return True

    def _is_file_whitelisted(self, filepath: str) -> bool:
        """Placeholder for a known-good hash lookup; always returns False."""
        return False

    def _load_threat_intelligence(self) -> Dict[str, str]:
        """Return a map of known-bad IP -> malware name.

        Placeholder for a threat-intel feed; currently empty.
        """
        return {}

# Usage
edr = EndpointDetectionResponse()

# Running processes first ...
print("=== Process Monitoring ===")
process_findings = edr.monitor_process_execution()
for finding in process_findings:
    print(f"Threat: {finding['type']} (PID: {finding['pid']})")

# ... then files under the Windows directory.
print("\n=== File Monitoring ===")
file_findings = edr.monitor_file_modifications(['C:\\Windows'])
for finding in file_findings:
    print(f"Threat: {finding['type']} on {finding['path']}")

ML-Based Anomaly Detection

Behavioral Baseline Learning

from typing import Dict, List

import joblib
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler

class MLAnomalyDetector:
    """Machine learning model for behavioral anomalies.

    Wraps a ``StandardScaler`` and an ``IsolationForest`` trained on
    per-user activity summaries.
    """

    # Keys read from each activity dict, in the column order fed to the model.
    FEATURE_NAMES = (
        'login_count',            # Logins per day
        'failed_login_count',     # Failed logins
        'files_accessed',         # Files accessed
        'bytes_downloaded',       # Data downloaded
        'bytes_uploaded',         # Data uploaded
        'privileged_operations',  # Sudo/admin actions
        'after_hours_activity',   # After hours percentage
        'unique_hosts',           # Different hosts accessed
        'vpn_connections',        # VPN logins
        'api_calls',              # API calls made
    )

    def __init__(self, contamination: float = 0.05, random_state=None):
        """Create an untrained detector.

        Args:
            contamination: Expected share of outliers, forwarded to
                ``IsolationForest``.
            random_state: Seed forwarded to ``IsolationForest``; set it to
                make training reproducible.
        """
        self.model = IsolationForest(contamination=contamination,
                                     random_state=random_state)
        self.scaler = StandardScaler()
        self.is_trained = False

    def extract_features(self, user_activity: Dict) -> np.ndarray:
        """Build the model input row for one activity record.

        Args:
            user_activity: Mapping holding every key in ``FEATURE_NAMES``.

        Returns:
            Float array of shape ``(1, len(FEATURE_NAMES))``.

        Raises:
            KeyError: If a feature key is missing.
        """
        values = [user_activity[name] for name in self.FEATURE_NAMES]
        return np.array(values, dtype=float).reshape(1, -1)

    def train_baseline(self, historical_activities: List[Dict]):
        """Fit the scaler and the forest on records of normal behavior.

        Args:
            historical_activities: Non-empty list of activity dicts.

        Raises:
            ValueError: If ``historical_activities`` is empty.
        """
        if not historical_activities:
            raise ValueError(
                "historical_activities must contain at least one record")

        rows = [self.extract_features(activity)
                for activity in historical_activities]

        X = np.vstack(rows)
        X_scaled = self.scaler.fit_transform(X)

        # Train Isolation Forest
        self.model.fit(X_scaled)
        self.is_trained = True

        print(f"Trained on {len(historical_activities)} records")

    def detect_anomaly(self, user_activity: Dict) -> Dict:
        """Score one activity record against the trained baseline.

        Returns:
            Dict with ``is_anomaly`` (bool), ``anomaly_score`` (float, higher
            means more anomalous) and ``severity``.

        Raises:
            ValueError: If the model has not been trained or loaded.
        """
        if not self.is_trained:
            raise ValueError("Model not trained yet")

        X = self.extract_features(user_activity)
        X_scaled = self.scaler.transform(X)

        # predict(): -1 = anomaly, 1 = normal
        prediction = self.model.predict(X_scaled)[0]
        # score_samples() is lower for more abnormal points, so negate it to
        # get a "higher is worse" score.
        anomaly_score = float(-self.model.score_samples(X_scaled)[0])

        return {
            # Plain bool (not numpy.bool_) so the result can be serialized.
            'is_anomaly': bool(prediction == -1),
            'anomaly_score': anomaly_score,
            'severity': self._score_to_severity(anomaly_score)
        }

    def _score_to_severity(self, score: float) -> str:
        """Convert anomaly score to severity"""
        if score > 0.8:
            return 'critical'
        elif score > 0.5:
            return 'high'
        elif score > 0.3:
            return 'medium'
        else:
            return 'low'

    def save_model(self, path: str):
        """Save the fitted model and scaler to ``path`` with joblib."""
        joblib.dump({
            'model': self.model,
            'scaler': self.scaler
        }, path)

    def load_model(self, path: str):
        """Load a model and scaler previously written by ``save_model``.

        SECURITY: joblib files are pickle-based and can execute arbitrary
        code when loaded. Only load files from a trusted source.
        """
        data = joblib.load(path)
        self.model = data['model']
        self.scaler = data['scaler']
        self.is_trained = True

# Usage
detector = MLAnomalyDetector()

# Train on normal behavior.
# The baseline must contain some spread: with 1000 identical records every
# feature is constant, the forest has nothing to split on, and the suspicious
# record below cannot be told apart from the baseline. The index-based offsets
# keep the data varied and reproducible without a random source.
normal_users = [
    {
        'login_count': 1 + i % 3,
        'failed_login_count': i % 2,
        'files_accessed': 15 + i % 21,
        'bytes_downloaded': 102400 + 1024 * (i % 50),
        'bytes_uploaded': 51200 + 512 * (i % 40),
        'privileged_operations': 0 if i % 10 else 1,
        'after_hours_activity': i % 11,
        'unique_hosts': 1 + i % 2,
        'vpn_connections': i % 2,
        'api_calls': 40 + i % 23
    } for i in range(1000)
]

detector.train_baseline(normal_users)

# Detect anomaly
suspicious_activity = {
    'login_count': 50,           # Multiple logins
    'failed_login_count': 20,    # Many failures
    'files_accessed': 5000,      # Huge file access
    'bytes_downloaded': 10737418240,  # 10GB downloaded
    'bytes_uploaded': 0,
    'privileged_operations': 100,     # Many sudo commands
    'after_hours_activity': 95,       # All after hours
    'unique_hosts': 50,               # Many different IPs
    'vpn_connections': 30,
    'api_calls': 10000
}

result = detector.detect_anomaly(suspicious_activity)
print(f"Anomaly: {result['is_anomaly']}, Score: {result['anomaly_score']:.2f}, Severity: {result['severity']}")

Incident Response Automation

Automated Response Playbooks

from typing import List, Dict
from enum import Enum
from datetime import datetime

class IncidentSeverity(Enum):
    """Severity levels an incident dict may carry in its ``severity`` field."""

    CRITICAL = 'critical'
    HIGH = 'high'
    MEDIUM = 'medium'
    LOW = 'low'


class IncidentResponse:
    """Automated incident response execution.

    Maps an incident ``type`` to a playbook method. The action helpers at the
    bottom of the class only print what they would do.
    """

    def __init__(self):
        self.playbooks = self._load_playbooks()

    def _load_playbooks(self) -> Dict:
        """Return the incident-type -> playbook-method dispatch table."""
        return {
            'ransomware': self._ransomware_response,
            'data_exfiltration': self._exfiltration_response,
            'credential_compromise': self._credential_response,
            'c2_communication': self._c2_response
        }

    def execute_response(self, incident: Dict):
        """Run the playbook for ``incident``, or escalate if none matches.

        Args:
            incident: Dict with at least ``type`` and ``severity`` plus the
                fields the selected playbook needs.

        Raises:
            KeyError: If ``type``, ``severity`` or a field required by the
                playbook is missing.
            ValueError: If ``severity`` is not an ``IncidentSeverity`` value.
        """
        incident_type = incident['type']
        severity = IncidentSeverity(incident['severity'])

        print(f"[{datetime.now()}] Executing response for {incident_type} ({severity.value})")

        playbook = self.playbooks.get(incident_type)
        if playbook is None:
            self._escalate_to_soc(incident)
        else:
            playbook(incident, severity)

    def _ransomware_response(self, incident: Dict, severity: IncidentSeverity):
        """Ransomware playbook: isolate, kill, lock down, preserve, ticket.

        Requires ``hostname``; ``process_ids`` is optional so that a report
        without process details still gets the host isolated instead of
        failing halfway through.
        """
        affected_host = incident['hostname']
        process_ids = incident.get('process_ids') or []

        print(f"1. Isolating {affected_host} from network...")
        self._isolate_host(affected_host)

        print("2. Killing suspicious processes...")
        for pid in process_ids:
            self._kill_process(pid)

        print("3. Disabling network shares...")
        self._disable_shares(affected_host)

        print("4. Preserving logs for forensics...")
        self._preserve_logs(affected_host)

        print("5. Alerting incident response team...")
        self._create_incident_ticket({
            'type': 'ransomware',
            'severity': severity.value,
            'host': affected_host,
            'timestamp': datetime.now().isoformat()
        })

    def _exfiltration_response(self, incident: Dict, severity: IncidentSeverity):
        """Data exfiltration playbook.

        Requires ``user`` and a destination given as ``destination_ip`` or
        ``destination``; the latter is the key used by the anomalies that
        ``SIEMCollector.detect_anomalies`` emits.
        """
        username = incident['user']
        destination = incident.get('destination_ip', incident.get('destination'))
        if destination is None:
            # Fail before any action is taken, with the documented key name.
            raise KeyError('destination_ip')

        print(f"1. Terminating user {username} sessions...")
        self._terminate_user_sessions(username)

        print(f"2. Blocking destination {destination}...")
        self._block_ip(destination)

        print("3. Revoking API tokens...")
        self._revoke_api_tokens(username)

        print("4. Forcing password reset...")
        self._force_password_reset(username)

        print("5. Querying backup for data integrity...")
        integrity_check = self._verify_data_integrity()
        print(f"   Data integrity: {'OK' if integrity_check else 'COMPROMISED'}")

    def _credential_response(self, incident: Dict, severity: IncidentSeverity):
        """Compromised credential playbook.

        Requires ``credential_type`` and ``affected_users``.
        """
        credential_type = incident['credential_type']
        affected_users = incident['affected_users']

        print(f"1. Revoking all {credential_type} across {len(affected_users)} users...")
        for user in affected_users:
            self._revoke_all_tokens(user)

        print("2. Forcing MFA re-enrollment...")
        self._force_mfa_reenroll(affected_users)

        print("3. Checking for unauthorized access...")
        self._check_unauthorized_access(affected_users)

    def _c2_response(self, incident: Dict, severity: IncidentSeverity):
        """Command-and-control playbook.

        Requires ``c2_ip`` and ``infected_hosts``.
        """
        c2_ip = incident['c2_ip']
        infected_hosts = incident['infected_hosts']

        print(f"1. Blocking C2 server {c2_ip} globally...")
        self._block_ip(c2_ip)

        print(f"2. Isolating {len(infected_hosts)} infected hosts...")
        for host in infected_hosts:
            self._isolate_host(host)

        print("3. Scanning network for malware...")
        self._trigger_antivirus_scan()

    # --- Action stubs: each one only prints the action it stands for. ---

    def _isolate_host(self, hostname: str):
        """Remove host from network"""
        print(f"   - Isolating {hostname}")

    def _kill_process(self, pid: int):
        print(f"   - Killing process {pid}")

    def _block_ip(self, ip: str):
        print(f"   - Blocking {ip} in firewall")

    def _revoke_api_tokens(self, username: str):
        print(f"   - Revoking tokens for {username}")

    def _terminate_user_sessions(self, username: str):
        print(f"   - Terminating {username} sessions")

    def _force_password_reset(self, username: str):
        print(f"   - Forcing password reset for {username}")

    def _verify_data_integrity(self) -> bool:
        return True

    def _revoke_all_tokens(self, user: str):
        # Same action as _revoke_api_tokens; delegate instead of duplicating.
        self._revoke_api_tokens(user)

    def _force_mfa_reenroll(self, users: List[str]):
        print(f"   - Forcing MFA for {len(users)} users")

    def _check_unauthorized_access(self, users: List[str]):
        print(f"   - Checking access logs for {len(users)} users")

    def _trigger_antivirus_scan(self):
        print("   - Triggering network antivirus scan")

    def _disable_shares(self, host: str):
        print(f"   - Disabling shares on {host}")

    def _preserve_logs(self, host: str):
        print(f"   - Preserving logs from {host}")

    def _create_incident_ticket(self, incident: Dict):
        print(f"   - Created ticket: {incident}")

    def _escalate_to_soc(self, incident: Dict):
        print(f"   - Escalating to SOC: {incident}")

# Usage
ir = IncidentResponse()

# Ransomware detected: describe the incident, then hand it to the responder.
ransomware_incident = {
    'type': 'ransomware',
    'severity': 'critical',
    'hostname': 'workstation-42',
    'process_ids': [1234, 5678],
}
ir.execute_response(ransomware_incident)

Glossary

  • SIEM: Security Information & Event Management - central log analysis
  • EDR: Endpoint Detection & Response - endpoint-level threat hunting
  • C2: Command & Control - attacker communication with compromised host
  • False Positive: Alert triggered by legitimate activity
  • Zero-day: Previously unknown vulnerability
  • APT: Advanced Persistent Threat - sophisticated attackers

Resources

Comments