Skip to main content
⚡ Calmops

Incident Response Automation: SOAR Platforms and Security Orchestration

Incident Response Automation: SOAR Platforms and Security Orchestration

TL;DR: This guide covers implementing incident response automation with SOAR platforms. Learn to build playbooks, automate triage, orchestrate tools, and reduce response times.


Introduction

Security Orchestration, Automation, and Response (SOAR) platforms help security teams:

  • Automate repetitive tasks - Reduce manual effort
  • Speed response times - Faster incident resolution
  • Standardize processes - Ensure consistent responses
  • Improve collaboration - Coordinate team activities

SOAR Architecture

Core Components

from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Dict, List, Optional

class Severity(Enum):
    """Incident severity levels.

    Values run from 1 (CRITICAL) to 4 (LOW), so a smaller number denotes a
    more severe level. Members of a plain ``Enum`` support equality checks
    but not ordering comparisons such as ``<`` or ``>``.
    """
    CRITICAL = 1
    HIGH = 2
    MEDIUM = 3
    LOW = 4

@dataclass
class Incident:
    """A single security incident tracked by the SOAR platform.

    The three trailing fields are optional and default to ``None`` so that
    existing positional construction keeps working.
    """
    id: str
    title: str
    severity: Severity
    # Origin of the incident; the triage examples in this file compare it
    # against detector names such as 'IDS'.
    source: str
    # Free-form lifecycle state; 'resolved' is the value the triage and
    # reporting code in this file check for.
    status: str
    # Person the incident is assigned to; None while unassigned.
    assignee: Optional[str] = None
    # Timestamps read by the reporting code to compute resolution time.
    # Both stay None until the caller records them.
    created_at: Optional[datetime] = None
    resolved_at: Optional[datetime] = None
    
class SOARPlatform:
    """In-memory registry of incidents, playbooks and tool connectors."""

    def __init__(self):
        # incidents: list in arrival order; playbooks/connectors: keyed by name
        self.incidents, self.playbooks, self.connectors = [], {}, {}

    def create_incident(self, incident: Incident):
        """Record a new incident on the platform."""
        self.incidents.append(incident)

    def run_playbook(self, incident_id: str, playbook_name: str):
        """Look up the named playbook and execute it for the given incident.

        Raises:
            KeyError: if no playbook is registered under ``playbook_name``.
        """
        self.playbooks[playbook_name].execute(incident_id)

Playbook Development

Simple Playbook

class Playbook:
    """An ordered list of automated response steps.

    Each step is a dict with an ``'action'`` name, optional ``'params'`` and
    an optional ``'stop_on_failure'`` flag (treated as ``True`` when omitted).
    """

    def __init__(self, name: str, steps: List[dict]):
        self.name = name
        self.steps = steps

    def execute(self, incident_id: str):
        """Run the steps in order, halting at the first blocking failure.

        A step fails when its handler result is not a dict or does not carry
        a truthy ``'success'`` entry. A failed step ends the run only when
        its ``'stop_on_failure'`` flag is true.

        Args:
            incident_id: Identifier of the incident being handled. It is
                accepted here but not yet forwarded to the step handlers.
        """
        for step in self.steps:
            action = step['action']
            params = step.get('params', {})

            result = self._execute_action(action, params)

            # Handlers that return nothing or omit 'success' count as a
            # failed step instead of crashing the whole run.
            succeeded = isinstance(result, dict) and bool(result.get('success'))

            if not succeeded and step.get('stop_on_failure', True):
                break

    def _execute_action(self, action: str, params: dict) -> dict:
        """Execute one action and report its outcome.

        This base implementation is a placeholder that always reports
        failure; override it to dispatch to real integrations.

        Returns:
            A dict with at least a boolean ``'success'`` entry.
        """
        # Implementation varies by action type
        return {
            'success': False,
            'error': f'no handler implemented for action {action!r}',
        }

# Example: Phishing Investigation Playbook
# Each row is (action, params, stop_on_failure); only the header lookup is
# allowed to abort the run, every later step is best-effort.
phishing_playbook = Playbook(
    name="Phishing Investigation",
    steps=[
        {'action': step_action, 'params': step_params, 'stop_on_failure': halt}
        for step_action, step_params, halt in (
            ('get_email_headers', {'incident_id': '$INCIDENT_ID'}, True),
            ('check_sender_reputation', {'sender': '$SENDER_EMAIL'}, False),
            ('extract_iocs', {'email': '$EMAIL_CONTENT'}, False),
            ('block_sender', {'sender': '$SENDER_EMAIL'}, False),
            (
                'notify_security_team',
                {'message': 'Phishing investigation complete'},
                False,
            ),
        )
    ],
)

Automated Triage

Triage Engine

class TriageEngine:
    """Rule-based triage that maps an incident to an automated outcome.

    Rules are kept sorted by ascending ``priority`` and evaluated in that
    order; the first rule whose condition matches decides the outcome.
    """

    # Prefix marking an action that delegates to a named SOAR playbook.
    _PLAYBOOK_PREFIX = 'run_playbook:'

    def __init__(self, soar: SOARPlatform):
        self.soar = soar
        self.rules = []

    def add_rule(self, condition: dict, action: str, priority: int = 1):
        """Register a triage rule.

        Args:
            condition: Dict with ``'field'``, ``'operator'`` (``'equals'``,
                ``'contains'`` or ``'greater_than'``) and ``'value'``.
            action: ``'auto_resolve'``, ``'escalate'`` or
                ``'run_playbook:<playbook name>'``.
            priority: Evaluation order; lower numbers are checked first.
        """
        self.rules.append({
            'condition': condition,
            'action': action,
            'priority': priority
        })
        # The sort is stable, so rules of equal priority keep insertion order.
        self.rules.sort(key=lambda x: x['priority'])

    def process_incident(self, incident: Incident) -> str:
        """Apply the first matching rule and return its outcome label.

        Returns:
            ``'auto_resolved'``, ``'escalated'``, ``'playbook_triggered'``,
            or ``'manual_review'`` when no rule matches or the matching
            rule's action is not recognised.
        """
        for rule in self.rules:
            if self._evaluate_condition(rule['condition'], incident):
                return self._execute_action(rule['action'], incident)

        return 'manual_review'

    def _evaluate_condition(self, condition: dict, incident: Incident) -> bool:
        """Return True when the incident satisfies the condition.

        A condition that cannot be evaluated (no field name, attribute
        absent or ``None``, operands of incompatible types, unknown
        operator) counts as "no match" rather than raising.
        """
        field = condition.get('field')
        operator = condition.get('operator')
        value = condition.get('value')

        # Without a usable attribute name there is nothing to compare.
        if not isinstance(field, str):
            return False

        incident_value = getattr(incident, field, None)

        if operator == 'equals':
            return incident_value == value

        # The remaining operators need a real value on the incident side.
        if incident_value is None:
            return False

        try:
            if operator == 'contains':
                return bool(value in incident_value)
            if operator == 'greater_than':
                return bool(incident_value > value)
        except TypeError:
            # e.g. substring test against a non-container, or ordering two
            # values that do not define it (plain Enum members).
            return False

        return False

    def _execute_action(self, action: str, incident: Incident) -> str:
        """Carry out a triage action and return its outcome label."""
        if action == 'auto_resolve':
            incident.status = 'resolved'
            return 'auto_resolved'
        if action == 'escalate':
            incident.severity = Severity.CRITICAL
            return 'escalated'
        if action.startswith(self._PLAYBOOK_PREFIX):
            # Keep everything after the prefix so playbook names that
            # themselves contain ':' are passed through intact.
            playbook_name = action[len(self._PLAYBOOK_PREFIX):]
            self.soar.run_playbook(incident.id, playbook_name)
            return 'playbook_triggered'

        return 'manual_review'

Triage Rules Example

# The engine needs a platform to dispatch playbooks to; nothing earlier in
# this file creates one, so build it here.
soar = SOARPlatform()
triage = TriageEngine(soar)

# Rule 1: Treat anything reported by the IDS as noise and close it
# NOTE(review): rules are evaluated in ascending priority and the first match
# wins, so a critical incident whose source is 'IDS' is auto-resolved before
# rule 3 can escalate it — confirm this ordering is intended.
triage.add_rule(
    condition={'field': 'source', 'operator': 'equals', 'value': 'IDS'},
    action='auto_resolve',
    priority=1
)

# Rule 2: Known malware indicator
# NOTE(review): a playbook named 'malware_investigation' must be registered
# in soar.playbooks before this rule fires; none is registered in this file.
triage.add_rule(
    condition={'field': 'title', 'operator': 'contains', 'value': 'malware'},
    action='run_playbook:malware_investigation',
    priority=2
)

# Rule 3: Critical severity
triage.add_rule(
    condition={'field': 'severity', 'operator': 'equals', 'value': Severity.CRITICAL},
    action='escalate',
    priority=3
)

Tool Orchestration

Integration Examples

SIEM Integration

class SIEMConnector:
    """Client-side wrapper for a SIEM's search and alert API.

    ``search_events`` and ``get_alert_details`` are placeholders that
    currently return ``None``; replace them with real API calls.
    """

    def __init__(self, api_key: str, base_url: str):
        self.api_key = api_key
        self.base_url = base_url

    def search_events(self, query: str, time_range: str) -> List[dict]:
        """Search SIEM for events (placeholder, returns nothing yet)."""
        # Implementation using SIEM API
        pass

    def get_alert_details(self, alert_id: str) -> dict:
        """Get alert details from SIEM (placeholder, returns nothing yet)."""
        pass

    def enrich_incident(self, incident: Incident) -> dict:
        """Collect SIEM events from the last 24h related to the incident.

        Returns:
            Dict with ``'related_events'`` (list of event dicts, empty when
            the search yields nothing) and ``'event_count'``.
        """
        # NOTE(review): the query treats incident.source as an IP address,
        # while the triage examples in this file store a detector name such
        # as 'IDS' there — confirm which is intended.
        events = self.search_events(
            f"src_ip={incident.source}",
            time_range="24h"
        )

        # A search that returns None (as the placeholder does) is
        # normalised to "no events" so len() below cannot fail.
        if events is None:
            events = []

        return {
            'related_events': events,
            'event_count': len(events)
        }

Firewall Integration

class FirewallConnector:
    """Placeholder client for a firewall's management API.

    Only the connection settings are stored; every operation is a stub
    that returns ``None`` until a real implementation is supplied.
    """

    def __init__(self, host: str, api_key: str):
        self.api_key = api_key
        self.host = host

    def block_ip(self, ip: str, duration: int = 3600):
        """Block an IP address (stub).

        NOTE(review): ``duration`` is presumably in seconds (3600 = 1 hour);
        confirm against the firewall API.
        """
        # Implementation using firewall API
        return None

    def unblock_ip(self, ip: str):
        """Remove the block on an IP address (stub)."""
        return None

    def check_block_status(self, ip: str) -> bool:
        """Report whether an IP is blocked (stub, currently returns None)."""
        return None

Automated Response Playbooks

Brute Force Response

# Incident-specific values are written as '$PLACEHOLDER' tokens, the same
# convention the other playbooks in this file use. This definition is
# evaluated at import time, when no `incident` object exists to read from.
# NOTE(review): placeholder substitution is not implemented by the Playbook
# class shown in this file; the executing handlers must resolve the tokens.
brute_force_playbook = Playbook(
    name="Brute Force Response",
    steps=[
        # Step 1: Get authentication logs
        {
            'action': 'query_logs',
            'params': {
                'query': 'event_type=auth_failed AND src_ip=$INCIDENT_SOURCE',
                'time_range': '1h'
            }
        },
        # Step 2: Check failed login count
        {
            'action': 'analyze_auth_pattern',
            'params': {'events': '$QUERY_RESULT'}
        },
        # Step 3: Block if threshold exceeded
        {
            'action': 'conditional_block',
            'params': {
                'condition': 'failed_logins > 10',
                'then': {'action': 'block_ip', 'ip': '$INCIDENT_SOURCE'},
                'else': {'action': 'add_watchlist', 'ip': '$INCIDENT_SOURCE'}
            }
        },
        # Step 4: Notify user
        {
            'action': 'send_notification',
            'params': {
                'channel': 'email',
                'recipient': '$INCIDENT_ASSIGNEE',
                'message': 'Brute force detected from $INCIDENT_SOURCE'
            }
        }
    ]
)

Ransomware Detection Response

# Ransomware response, expressed as (action, params) rows in run order.
ransomware_playbook = Playbook(
    name="Ransomware Response",
    steps=[
        {'action': step_action, 'params': step_params}
        for step_action, step_params in (
            # Step 1: Isolate affected system
            ('isolate_endpoint', {'hostname': '$AFFECTED_HOST'}),
            # Step 2: Collect forensics
            ('collect_forensics', {'hostname': '$AFFECTED_HOST'}),
            # Step 3: Check for encryption activity
            ('analyze_file_activity', {'hostname': '$AFFECTED_HOST'}),
            # Step 4: Block malicious files
            ('quarantine_files', {'hashes': '$MALICIOUS_HASHES'}),
            # Step 5: Notify security team
            (
                'create_ticket',
                {'severity': 'critical', 'title': 'Ransomware detected'},
            ),
        )
    ],
)

Metrics and Reporting

Key Metrics

| Metric | Description | Target |
| --- | --- | --- |
| Mean Time to Detect (MTTD) | Time from incident to detection | < 1 hour |
| Mean Time to Respond (MTTR) | Time from detection to resolution | < 4 hours |
| Automation Rate | % of incidents handled automatically | > 70% |
| False Positive Rate | % of false positive alerts | < 15% |
| Playbook Coverage | % of incident types with playbooks | > 90% |

Reporting Dashboard

class SOARReporting:
    """Computes dashboard metrics from the incidents held by a SOAR platform."""

    # Status that marks an incident as closed without analyst involvement.
    _AUTOMATED_STATUS = 'auto_resolved'
    # Every status that counts as closed. Automated closures are included so
    # they are not simultaneously reported as automated and still open.
    _CLOSED_STATUSES = ('resolved', 'auto_resolved')

    def __init__(self, soar: SOARPlatform):
        self.soar = soar

    def get_dashboard_metrics(self) -> dict:
        """Get metrics for dashboard.

        Returns:
            Dict with ``total_incidents``, ``resolved_count``,
            ``resolution_rate`` and ``automation_rate`` (fractions of the
            total, 0 when there are no incidents), ``avg_resolution_time``
            (hours) and ``open_incidents``.
        """
        incidents = self.soar.incidents
        total = len(incidents)

        resolved = [i for i in incidents if i.status in self._CLOSED_STATUSES]
        # NOTE(review): only incidents whose status field is 'auto_resolved'
        # count as automated; the triage code in this file sets status to
        # 'resolved' on auto-resolve, so confirm how automation is recorded.
        automated = [i for i in incidents if i.status == self._AUTOMATED_STATUS]

        return {
            'total_incidents': total,
            'resolved_count': len(resolved),
            'resolution_rate': len(resolved) / total if total else 0,
            'automation_rate': len(automated) / total if total else 0,
            'avg_resolution_time': self._calculate_avg_time(resolved),
            'open_incidents': total - len(resolved)
        }

    def _calculate_avg_time(self, resolved_incidents: list) -> float:
        """Calculate the average resolution time in hours.

        Incidents without both ``created_at`` and ``resolved_at`` set are
        left out of the average; returns 0.0 when none can be measured.
        """
        durations = []
        for incident in resolved_incidents:
            created = getattr(incident, 'created_at', None)
            finished = getattr(incident, 'resolved_at', None)
            # Timestamps are optional, so skip incidents that lack either.
            if created is None or finished is None:
                continue
            durations.append((finished - created).total_seconds())

        if not durations:
            return 0.0

        return sum(durations) / len(durations) / 3600  # Hours

Conclusion

SOAR platforms transform incident response:

  1. Playbook automation - Streamline repetitive tasks
  2. Automated triage - Filter noise quickly
  3. Tool orchestration - Integrate security tools
  4. Faster response - Reduce MTTR significantly
  5. Metrics-driven - Measure and improve continuously

External Resources


Comments