Incident Response Automation: SOAR Platforms and Security Orchestration
TL;DR: This guide covers implementing incident response automation with SOAR platforms. Learn to build playbooks, automate triage, orchestrate tools, and reduce response times.
Introduction
Security Orchestration, Automation, and Response (SOAR) platforms help security teams:
- Automate repetitive tasks - Reduce manual effort
- Speed response times - Faster incident resolution
- Standardize processes - Ensure consistent responses
- Improve collaboration - Coordinate team activities
SOAR Architecture
Core Components
from dataclasses import dataclass
from typing import List, Dict
from enum import Enum
class Severity(Enum):
CRITICAL = 1
HIGH = 2
MEDIUM = 3
LOW = 4
@dataclass
class Incident:
id: str
title: str
severity: Severity
source: str
status: str
assignee: str = None
class SOARPlatform:
def __init__(self):
self.incidents = []
self.playbooks = {}
self.connectors = {}
def create_incident(self, incident: Incident):
"""Create new incident"""
self.incidents.append(incident)
def run_playbook(self, incident_id: str, playbook_name: str):
"""Execute playbook for incident"""
playbook = self.playbooks[playbook_name]
playbook.execute(incident_id)
Playbook Development
Simple Playbook
class Playbook:
def __init__(self, name: str, steps: List[dict]):
self.name = name
self.steps = steps
def execute(self, incident_id: str):
"""Execute playbook steps"""
for step in self.steps:
action = step['action']
params = step.get('params', {})
result = self._execute_action(action, params)
if not result['success'] and step.get('stop_on_failure', True):
break
def _execute_action(self, action: str, params: dict) -> dict:
"""Execute individual action"""
# Implementation varies by action type
pass
# Example: Phishing Investigation Playbook
phishing_playbook = Playbook(
name="Phishing Investigation",
steps=[
{
'action': 'get_email_headers',
'params': {'incident_id': '$INCIDENT_ID'},
'stop_on_failure': True
},
{
'action': 'check_sender_reputation',
'params': {'sender': '$SENDER_EMAIL'},
'stop_on_failure': False
},
{
'action': 'extract_iocs',
'params': {'email': '$EMAIL_CONTENT'},
'stop_on_failure': False
},
{
'action': 'block_sender',
'params': {'sender': '$SENDER_EMAIL'},
'stop_on_failure': False
},
{
'action': 'notify_security_team',
'params': {'message': 'Phishing investigation complete'},
'stop_on_failure': False
}
]
)
Automated Triage
Triage Engine
class TriageEngine:
def __init__(self, soar: SOARPlatform):
self.soar = soar
self.rules = []
def add_rule(self, condition: dict, action: str, priority: int = 1):
"""Add triage rule"""
self.rules.append({
'condition': condition,
'action': action,
'priority': priority
})
self.rules.sort(key=lambda x: x['priority'])
def process_incident(self, incident: Incident) -> str:
"""Process incident through triage"""
for rule in self.rules:
if self._evaluate_condition(rule['condition'], incident):
return self._execute_action(rule['action'], incident)
return 'manual_review'
def _evaluate_condition(self, condition: dict, incident: Incident) -> bool:
"""Evaluate if condition matches"""
# Simple condition evaluation
field = condition.get('field')
operator = condition.get('operator')
value = condition.get('value')
incident_value = getattr(incident, field, None)
if operator == 'equals':
return incident_value == value
elif operator == 'contains':
return value in incident_value
elif operator == 'greater_than':
return incident_value > value
return False
def _execute_action(self, action: str, incident: Incident) -> str:
"""Execute triage action"""
if action == 'auto_resolve':
incident.status = 'resolved'
return 'auto_resolved'
elif action == 'escalate':
incident.severity = Severity.CRITICAL
return 'escalated'
elif action.startswith('run_playbook:'):
playbook_name = action.split(':')[1]
self.soar.run_playbook(incident.id, playbook_name)
return 'playbook_triggered'
return 'manual_review'
Triage Rules Example
triage = TriageEngine(soar)
# Rule 1: Low severity noise
triage.add_rule(
condition={'field': 'source', 'operator': 'equals', 'value': 'IDS'},
action='auto_resolve',
priority=1
)
# Rule 2: Known malware indicator
triage.add_rule(
condition={'field': 'title', 'operator': 'contains', 'value': 'malware'},
action='run_playbook:malware_investigation',
priority=2
)
# Rule 3: Critical severity
triage.add_rule(
condition={'field': 'severity', 'operator': 'equals', 'value': Severity.CRITICAL},
action='escalate',
priority=3
)
Tool Orchestration
Integration Examples
SIEM Integration
class SIEMConnector:
def __init__(self, api_key: str, base_url: str):
self.api_key = api_key
self.base_url = base_url
def search_events(self, query: str, time_range: str) -> List[dict]:
"""Search SIEM for events"""
# Implementation using SIEM API
pass
def get_alert_details(self, alert_id: str) -> dict:
"""Get alert details from SIEM"""
pass
def enrich_incident(self, incident: Incident) -> dict:
"""Enrich incident with SIEM data"""
events = self.search_events(
f"src_ip={incident.source}",
time_range="24h"
)
return {
'related_events': events,
'event_count': len(events)
}
Firewall Integration
class FirewallConnector:
def __init__(self, host: str, api_key: str):
self.host = host
self.api_key = api_key
def block_ip(self, ip: str, duration: int = 3600):
"""Block IP address"""
# Implementation using firewall API
pass
def unblock_ip(self, ip: str):
"""Unblock IP address"""
pass
def check_block_status(self, ip: str) -> bool:
"""Check if IP is blocked"""
pass
Automated Response Playbooks
Brute Force Response
brute_force_playbook = Playbook(
name="Brute Force Response",
steps=[
# Step 1: Get authentication logs
{
'action': 'query_logs',
'params': {
'query': f"event_type=auth_failed AND src_ip={incident.source}",
'time_range': '1h'
}
},
# Step 2: Check failed login count
{
'action': 'analyze_auth_pattern',
'params': {'events': '$QUERY_RESULT'}
},
# Step 3: Block if threshold exceeded
{
'action': 'conditional_block',
'params': {
'condition': 'failed_logins > 10',
'then': {'action': 'block_ip', 'ip': incident.source},
'else': {'action': 'add_watchlist', 'ip': incident.source}
}
},
# Step 4: Notify user
{
'action': 'send_notification',
'params': {
'channel': 'email',
'recipient': incident.assignee,
'message': f'Brute force detected from {incident.source}'
}
}
]
)
Ransomware Detection Response
ransomware_playbook = Playbook(
name="Ransomware Response",
steps=[
# Step 1: Isolate affected system
{
'action': 'isolate_endpoint',
'params': {'hostname': '$AFFECTED_HOST'}
},
# Step 2: Collect forensics
{
'action': 'collect_forensics',
'params': {'hostname': '$AFFECTED_HOST'}
},
# Step 3: Check for encryption activity
{
'action': 'analyze_file_activity',
'params': {'hostname': '$AFFECTED_HOST'}
},
# Step 4: Block malicious files
{
'action': 'quarantine_files',
'params': {'hashes': '$MALICIOUS_HASHES'}
},
# Step 5: Notify security team
{
'action': 'create_ticket',
'params': {'severity': 'critical', 'title': 'Ransomware detected'}
}
]
)
Metrics and Reporting
Key Metrics
| Metric | Description | Target |
|---|---|---|
| Mean Time to Detect (MTTD) | Time from incident to detection | < 1 hour |
| Mean Time to Respond (MTTR) | Time from detection to resolution | < 4 hours |
| Automation Rate | % of incidents handled automatically | > 70% |
| False Positive Rate | % of false positive alerts | < 15% |
| Playbook Coverage | % of incident types with playbooks | > 90% |
Reporting Dashboard
class SOARReporting:
def __init__(self, soar: SOARPlatform):
self.soar = soar
def get_dashboard_metrics(self) -> dict:
"""Get metrics for dashboard"""
incidents = self.soar.incidents
resolved = [i for i in incidents if i.status == 'resolved']
automated = [i for i in incidents if i.status == 'auto_resolved']
return {
'total_incidents': len(incidents),
'resolved_count': len(resolved),
'resolution_rate': len(resolved) / len(incidents) if incidents else 0,
'automation_rate': len(automated) / len(incidents) if incidents else 0,
'avg_resolution_time': self._calculate_avg_time(resolved),
'open_incidents': len([i for i in incidents if i.status != 'resolved'])
}
def _calculate_avg_time(self, resolved_incidents: list) -> float:
"""Calculate average resolution time"""
if not resolved_incidents:
return 0.0
total_time = sum([
(i.resolved_at - i.created_at).total_seconds()
for i in resolved_incidents
])
return total_time / len(resolved_incidents) / 3600 # Hours
Conclusion
SOAR platforms transform incident response:
- Playbook automation - Streamline repetitive tasks
- Automated triage - Filter noise quickly
- Tool orchestration - Integrate security tools
- Faster response - Reduce MTTR significantly
- Metrics-driven - Measure and improve continuously
External Resources
- [Splunk SOAR](https://www.splunk.com/en_us/products/splunk- soar.html)
- Palo Alto XSOAR
- The Hive Project
Comments