Threat Intelligence: OSINT, Threat Feeds, Incident Response

Introduction

Threat intelligence transforms security from reactive to proactive. Organizations with mature threat intelligence programs detect threats 65% faster and reduce breach costs by 50%. This guide covers building a threat intelligence capability from scratch.

Key Statistics:

  • 68% of breaches involve external actors
  • Average dwell time before detection: 197 days
  • Threat intelligence reduces false positives by 45%
  • 80% of successful attacks exploit known vulnerabilities

Threat Intelligence Lifecycle

┌──────────────────────────────────────────────────────────────────┐
│                  Threat Intelligence Lifecycle                   │
├──────────────────────────────────────────────────────────────────┤
│                                                                  │
│    ┌─────────┐    ┌─────────┐    ┌─────────┐    ┌─────────┐      │
│    │  Plan   │───▶│ Collect │───▶│ Process │───▶│ Analyze │      │
│    └─────────┘    └─────────┘    └─────────┘    └─────────┘      │
│         ▲                                            │           │
│         │                                            ▼           │
│         │                       ┌─────────┐    ┌───────────┐     │
│         └───────────────────────│  Share  │◀───│Disseminate│     │
│                                 └─────────┘    └───────────┘     │
│                                                                  │
└──────────────────────────────────────────────────────────────────┘
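
The cycle in the diagram can be modelled as an ordered list of stages in which the last stage feeds back into planning. The sketch below is illustrative only: the `Stage` enum and the `next_stage` helper are names invented here, and the stage labels follow the diagram, with the dissemination stage spelled out in full.

```python
from enum import Enum


class Stage(Enum):
    """Lifecycle stages, in the order shown in the diagram."""
    PLAN = "Plan"
    COLLECT = "Collect"
    PROCESS = "Process"
    ANALYZE = "Analyze"
    DISSEMINATE = "Disseminate"
    SHARE = "Share"


def next_stage(stage: Stage) -> Stage:
    """Return the stage that follows `stage`, wrapping Share back to Plan."""
    order = list(Stage)
    return order[(order.index(stage) + 1) % len(order)]


if __name__ == "__main__":
    # Walk one full loop, ending back at the planning stage.
    stage = Stage.PLAN
    for _ in range(len(Stage) + 1):
        print(stage.value)
        stage = next_stage(stage)
```

Treating the lifecycle as a loop rather than a pipeline makes the feedback step explicit: what is learned while sharing intelligence is meant to reshape the next round of planning.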

OSINT Techniques

Domain Reconnaissance

#!/usr/bin/env python3
"""OSINT domain reconnaissance."""

import socket
from datetime import datetime

import requests
import whois  # provided by the python-whois package

def analyze_domain(domain):
    """Comprehensive domain analysis."""
    
    results = {
        'domain': domain,
        'timestamp': datetime.now().isoformat(),
        'whois': {},
        'dns': {},
        'web_technologies': [],
        'subdomains': []
    }
    
    # WHOIS lookup
    try:
        w = whois.whois(domain)
        results['whois'] = {
            'registrar': w.registrar,
            'creation_date': str(w.creation_date),
            'expiration_date': str(w.expiration_date),
            'name_servers': w.name_servers,
            'registrant': {
                'name': w.name,
                'country': w.country
            }
        }
    except Exception as e:
        results['whois']['error'] = str(e)
    
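    # DNS records (the socket module can only resolve addresses)
    try:
        results['dns']['a'] = socket.gethostbyname(domain)
    except OSError:
        pass

    # Not a true MX lookup: this resolves the conventional mail.<domain> host.
    # Querying MX records requires a DNS library such as dnspython.
    try:
        results['dns']['mail_host'] = socket.gethostbyname_ex(f'mail.{domain}')
    except OSError:
        pass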
    
    # Technology detection
    try:
        response = requests.get(f'https://{domain}', timeout=10)
        server = response.headers.get('Server', 'Unknown')
        results['web_technologies'].append(f'Server: {server}')
        
        # Check for common technologies
        html = response.text
        if 'wp-content' in html:
            results['web_technologies'].append('WordPress')
        if 'react' in html.lower():
            results['web_technologies'].append('React')
        if 'vue' in html.lower():
            results['web_technologies'].append('Vue.js')
    except Exception as e:
        results['web_technologies'].append(f'Error: {str(e)}')
    
    return results

if __name__ == '__main__':
    import json
    result = analyze_domain('example.com')
    print(json.dumps(result, indent=2))
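
WHOIS creation dates are often used as a triage signal, because very young domains tend to deserve a closer look. The sketch below assumes that `creation_date` may arrive as a single `datetime`, a list of them, or `None`, which is how python-whois commonly reports it. The `domain_age_days` and `is_newly_registered` helpers and the 30-day cutoff are illustrative choices, not part of the script above.

```python
from datetime import datetime


def domain_age_days(creation_date, now=None):
    """Return the domain age in days, or None if no usable date is given.

    Accepts a datetime, a list of datetimes (the earliest wins), or None.
    Timezone info is dropped, which is accurate enough at day granularity.
    """
    candidates = creation_date if isinstance(creation_date, list) else [creation_date]
    dates = [d.replace(tzinfo=None) for d in candidates if isinstance(d, datetime)]
    if not dates:
        return None
    now = (now or datetime.now()).replace(tzinfo=None)
    return (now - min(dates)).days


def is_newly_registered(creation_date, threshold_days=30, now=None):
    """Flag domains younger than `threshold_days` (a hypothetical cutoff)."""
    age = domain_age_days(creation_date, now=now)
    return age is not None and age < threshold_days
```

With the script above, this could be called as `is_newly_registered(w.creation_date)` before the date is converted to a string.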

Subdomain Enumeration

# Using amass
amass enum -d example.com -o amass.txt

# Using assetfinder
assetfinder --subs-only example.com > assetfinder.txt

# Using findomain (-u writes results to the named file)
findomain -t example.com -u findomain.txt

# Combining results (each tool writes its own file so nothing is overwritten)
sort -u amass.txt assetfinder.txt findomain.txt > combined_subdomains.txt
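
Tool output usually needs normalising before it is merged: hostnames differ in case, may carry a trailing dot or a wildcard prefix, and occasionally fall outside the target domain. A minimal sketch of that clean-up step follows, assuming one hostname per line. The `merge_subdomains` name is illustrative.

```python
def merge_subdomains(lines, root_domain):
    """Normalise, filter, and de-duplicate hostnames for `root_domain`."""
    root = root_domain.lower().strip(".")
    found = set()
    for line in lines:
        host = line.strip().lower().rstrip(".")
        if host.startswith("*."):
            host = host[2:]  # drop a wildcard prefix
        # Keep the root itself and anything ending in ".<root>".
        if host == root or host.endswith("." + root):
            found.add(host)
    return sorted(found)
```

Matching on `"." + root` rather than on the bare root keeps look-alike names such as `evil-example.com` out of the results.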

Compromised Credential Checks

#!/usr/bin/env python3
"""Check for compromised credentials."""

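import os
from urllib.parse import quote

import requests

def check_breach(email):
    """Check if email appears in known breaches."""
    
    # Using Have I Been Pwned API (v3 requires an API key for this endpoint)
    url = f"https://haveibeenpwned.com/api/v3/breachedaccount/{quote(email, safe='')}"
    headers = {
        # HIBP_API_KEY is an assumed environment variable name
        'hibp-api-key': os.environ['HIBP_API_KEY'],
        'User-Agent': 'ThreatIntelTool/1.0'
    }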
    
    try:
        response = requests.get(url, headers=headers, timeout=10)
        
        if response.status_code == 200:
            breaches = response.json()
            return {
                'email': email,
                'breached': True,
                'breach_count': len(breaches),
                'breaches': [b['Name'] for b in breaches]
            }
        elif response.status_code == 404:
            return {'email': email, 'breached': False}
        else:
            return {'error': f'Status {response.status_code}'}
    except Exception as e:
        return {'error': str(e)}

def check_password_compromised(password_hash):
    """Check if a SHA-1 password hash (hex) appears in Pwned Passwords."""
    
    # k-Anonymity model: only the first 5 hex characters leave the machine
    password_hash = password_hash.upper()
    prefix = password_hash[:5]
    suffix = password_hash[5:]
    
    url = f"https://api.pwnedpasswords.com/range/{prefix}"
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    
    # Each response line has the form "<hash suffix>:<count>"
    for line in response.text.splitlines():
        hash_suffix, _, count = line.partition(':')
        if hash_suffix.upper() == suffix:
            return {'compromised': True, 'count': int(count)}
    
    return {'compromised': False}
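
`check_password_compromised` expects a SHA-1 hex digest rather than the password itself. The sketch below shows how that digest and its 5-character prefix could be derived, and how a range response is matched locally without any network call. The helper names are illustrative, and any response text passed in is supplied by the caller. Only the `SUFFIX:COUNT` line format is taken from the Pwned Passwords API.

```python
import hashlib


def sha1_prefix_suffix(password):
    """Split the uppercase SHA-1 hex digest into (5-char prefix, 35-char suffix)."""
    digest = hashlib.sha1(password.encode("utf-8")).hexdigest().upper()
    return digest[:5], digest[5:]


def count_in_range_response(suffix, response_text):
    """Return the breach count listed for `suffix`, or 0 if it is absent."""
    for line in response_text.splitlines():
        candidate, _, count = line.partition(":")
        if candidate.strip().upper() == suffix:
            return int(count)
    return 0


if __name__ == "__main__":
    prefix, suffix = sha1_prefix_suffix("password")
    print(prefix)  # only this part would be sent to the API
```

Because the suffix comparison happens on the client, the service never learns which of the returned hashes, if any, belongs to the password being checked.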

Threat Feeds Integration

Open Source Feeds

# Threat Intelligence Platform Configuration
threat_intel:
  sources:
    # AlienVault OTX
    alienvault:
      enabled: true
      api_key: ${ALIENVAULT_API_KEY}
      pulse_types:
        - malware
        - vulnerability
        - threat
        
    # Abuse.ch Feeds
    abusech:
      enabled: true
      feeds:
        - url: "https://urlhaus.abuse.ch/downloads/json/"
          type: malware_urls
        - url: "https://feodotracker.abuse.ch/downloads/ipblocklist.json"
          type: c2_indicators

    # Spamhaus DROP lists
    spamhaus:
      enabled: true
      feeds:
        - url: "https://spamhaus.org/drop/asn.txt"
          type: asn_blocklist

    # ThreatFox
    threatfox:
      enabled: true
      url: "https://threatfox.abuse.ch/downloads/json/"
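
A loader for a configuration like this has two jobs: skip disabled sources, and substitute `${VAR}` placeholders from the environment so that keys never live in the file. The sketch below works on a plain dict that mirrors the YAML above (parsing the file itself is left out). `resolve_secret` and `iter_enabled_feeds` are hypothetical names, and the schema is this article's own rather than any particular product's.

```python
import os
from string import Template


def resolve_secret(value, env=None):
    """Expand ${VAR} placeholders from `env` (defaults to os.environ).

    Raises KeyError if a referenced variable is not set.
    """
    env = os.environ if env is None else env
    return Template(value).substitute(env)


def iter_enabled_feeds(config):
    """Yield (source, url, feed_type) for each feed URL of an enabled source."""
    for name, source in config["threat_intel"]["sources"].items():
        if not source.get("enabled", False):
            continue
        # A source either lists several feeds or has one top-level url.
        feeds = source.get("feeds") or [{"url": source.get("url"), "type": name}]
        for feed in feeds:
            if feed.get("url"):
                yield name, feed["url"], feed.get("type", name)
```

Failing loudly on a missing variable is a deliberate choice here: a feed that silently runs with an empty API key is harder to diagnose than one that refuses to start.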

MISP Integration

# MISP (Malware Information Sharing Platform)
misp:
  url: "https://misp.example.com"
  api_key: ${MISP_API_KEY}
  
  # Auto-import indicators
  auto_import:
    enabled: true
    tags:
      - "tlp:amber"
      - "osint:verified"
    types:
      - domain
      - ip
      - url
      - malware
    filters:
      malware_category: "banker"
      
  # Export for SIEM
  export:
    format: "stix"
    schedule: "hourly"
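
The `auto_import` rules amount to a filter over incoming attributes. A minimal sketch of that filter follows. The attribute shape (`type`, `value`, `tags`) is a simplified stand-in invented for this example rather than MISP's actual JSON schema, and it assumes an attribute qualifies when it carries at least one of the configured tags.

```python
AUTO_IMPORT = {
    "tags": {"tlp:amber", "osint:verified"},
    "types": {"domain", "ip", "url", "malware"},
}


def should_import(attribute, rules=AUTO_IMPORT):
    """Return True if the attribute has an allowed type and an allowed tag."""
    if attribute.get("type") not in rules["types"]:
        return False
    return bool(rules["tags"] & set(attribute.get("tags", [])))


def select_for_import(attributes, rules=AUTO_IMPORT):
    """Keep attributes that pass `should_import`, de-duplicated by type and value."""
    seen = set()
    selected = []
    for attr in attributes:
        key = (attr.get("type"), attr.get("value"))
        if should_import(attr, rules) and key not in seen:
            seen.add(key)
            selected.append(attr)
    return selected
```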

MITRE ATT&CK Framework

Coverage Mapping

#!/usr/bin/env python3
"""Map security events to MITRE ATT&CK framework."""

MITRE_TACTICS = {
    'TA0001': 'Initial Access',
    'TA0002': 'Execution',
    'TA0003': 'Persistence',
    'TA0004': 'Privilege Escalation',
    'TA0005': 'Defense Evasion',
    'TA0006': 'Credential Access',
    'TA0007': 'Discovery',
    'TA0008': 'Lateral Movement',
    'TA0009': 'Collection',
    'TA0011': 'Command and Control',
    'TA0010': 'Exfiltration',
    'TA0040': 'Impact'
}

MITRE_TECHNIQUES = {
    'T1566': {
        'name': 'Phishing',
        'tactics': ['TA0001'],
        'detection': 'Check for suspicious email attachments/links',
        'mitigation': 'Email filtering, user training'
    },
    'T1059': {
        'name': 'Command and Scripting Interpreter',
        'tactics': ['TA0002'],
        'detection': 'Monitor process execution with PowerShell/WMI',
        'mitigation': 'Restrict script execution, application whitelisting'
    },
    'T1547': {
        'name': 'Boot or Logon Autostart Execution',
        'tactics': ['TA0003', 'TA0004'],
        'detection': 'Monitor registry keys and startup folders',
        'mitigation': 'Restrict write access to startup locations'
    },
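    'T1082': {
        'name': 'System Information Discovery',
        'tactics': ['TA0007'],
        'detection': 'Monitor for systeminfo, hostname commands',
        'mitigation': 'No direct preventive control; rely on detection'
    },
    'T1033': {
        'name': 'System Owner/User Discovery',
        'tactics': ['TA0007'],
        'detection': 'Monitor for whoami and similar commands',
        'mitigation': 'No direct preventive control; rely on detection'
    },
    'T1087': {
        'name': 'Account Discovery',
        'tactics': ['TA0007'],
        'detection': 'Monitor for net user and other account enumeration',
        'mitigation': 'Restrict account enumeration via OS configuration'
    }
}

def map_to_mitre(event_data):
    """Map security event to MITRE ATT&CK technique."""
    
    # Example mapping logic (every ID used here is defined in MITRE_TECHNIQUES)
    technique_mappings = {
        'powershell.exe -encodedcommand': 'T1059',
        'reg add HKCU\\Software\\Microsoft\\Windows\\CurrentVersion\\Run': 'T1547',
        'systeminfo': 'T1082',
        'whoami': 'T1033',
        'net user': 'T1087'
    }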
    
    detected_techniques = []
    for pattern, technique_id in technique_mappings.items():
        if pattern.lower() in event_data.get('command', '').lower():
            technique = MITRE_TECHNIQUES.get(technique_id, {})
            detected_techniques.append({
                'technique_id': technique_id,
                'name': technique.get('name', 'Unknown'),
                'tactics': [MITRE_TACTICS.get(t, t) for t in technique.get('tactics', [])],
                'mitigation': technique.get('mitigation', 'N/A')
            })
    
    return detected_techniques
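
Coverage mapping comes down to counting which tactics your detections touch and which they miss. The sketch below assumes detections shaped like the dictionaries returned by `map_to_mitre`, each with a `tactics` list of tactic names. The `tactic_coverage` helper and the shortened tactic list are illustrative.

```python
from collections import Counter

# Subset of tactic names, kept short for the example.
ALL_TACTICS = ["Initial Access", "Execution", "Persistence", "Discovery"]


def tactic_coverage(detections, all_tactics=ALL_TACTICS):
    """Count detections per tactic and list the tactics with none."""
    counts = Counter(t for d in detections for t in d.get("tactics", []))
    covered = {t: counts[t] for t in all_tactics if counts[t]}
    gaps = [t for t in all_tactics if not counts[t]]
    return covered, gaps
```

The `gaps` list is the more actionable half of the result, since it names the tactics for which no detection fired at all.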

SIEM Integration

Splunk Threat Intelligence

# Splunk threat intelligence configuration
threat_intel:
  # Local threat intelligence lookup
  local_intel:
    path: "/opt/splunk/etc/apps/SA-UTimeIntelligenceLookups"
    
    # IOC fields to lookup
    ioc_fields:
      - src_ip
      - dest_ip
      - domain
      - file_hash
      - url
      
    # Automatic correlation
    correlations:
      - name: "Malware C2 Communication"
        search: "sourcetype=proxy action=allow | lookup mal_ip_list ip as dest_ip OUTPUT action"
        alert:
          threshold: 1
          action: notable
          
      - name: "Phishing Domain Access"
        search: "sourcetype=proxy url=* | lookup phishing_domains domain OUTPUT verdict"
        alert:
          threshold: 1
          action: email

# Threat Intelligence Dashboard
dashboards:
  - name: "Threat Intelligence Overview"
    panels:
      - "Top 10 Attacker IPs"
      - "Malware Family Distribution"
      - "Attack Timeline"
      - "Coverage by MITRE Tactic"
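
Conceptually, each correlation search joins events against a lookup table and alerts once the number of matches reaches the threshold. The Python sketch below imitates that join outside Splunk to make the logic explicit. The event fields follow the `ioc_fields` listed above, while the `correlate` function and its `threat_label` output field are invented for illustration and are not Splunk APIs.

```python
def correlate(events, blocklist, field="dest_ip", threshold=1):
    """Return (matches, alert) for events whose `field` is in `blocklist`.

    `blocklist` maps an indicator value to a label such as a malware family.
    """
    matches = []
    for event in events:
        value = event.get(field)
        if value in blocklist:
            # Enrich the event, much as a lookup's OUTPUT clause would.
            matches.append({**event, "threat_label": blocklist[value]})
    return matches, len(matches) >= threshold
```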

QRadar Integration

#!/usr/bin/env python3
"""QRadar threat intelligence plugin."""

def process_offense(qradar_event):
    """Process QRadar offense with threat intelligence."""
    
    offense = qradar_event['offense']
    
    # Get related indicators
    indicators = {
        'source_ip': offense.get('source_ip'),
        'destination_ip': offense.get('destination_ip'),
        'username': offense.get('username'),
        'domain': offense.get('domain')
    }
    
    # Lookup in threat feeds
    threat_intel = {}
    
    for ioc_type, ioc_value in indicators.items():
        if ioc_value:
            # Query threat feeds
            result = query_threat_feeds(ioc_type, ioc_value)
            if result:
                threat_intel[ioc_value] = result
    
    # Calculate threat score
    threat_score = calculate_threat_score(threat_intel)
    
    # Generate recommendations
    recommendations = []
    if threat_score > 70:
        recommendations.append("Immediate investigation required")
        recommendations.append("Block identified IPs/domains")
    
    return {
        'offense_id': offense['id'],
        'threat_score': threat_score,
        'threat_intel': threat_intel,
        'recommendations': recommendations
    }
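
`process_offense` relies on two helpers, `query_threat_feeds` and `calculate_threat_score`, that the snippet does not define. One possible shape for them is sketched below, assuming an in-memory indicator store and a 0-100 score built from the highest-confidence hit. The `FEED_DATA` contents and the scoring rule are placeholders, not QRadar functionality.

```python
# Hypothetical in-memory store: (ioc_type, value) -> feed hit.
FEED_DATA = {
    ("source_ip", "203.0.113.5"): {"source": "example-feed", "confidence": 85},
    ("domain", "bad.example"): {"source": "example-feed", "confidence": 60},
}


def query_threat_feeds(ioc_type, ioc_value, feed_data=FEED_DATA):
    """Return the feed hit for an indicator, or None if it is unknown."""
    return feed_data.get((ioc_type, ioc_value))


def calculate_threat_score(threat_intel):
    """Score 0-100: the highest confidence among hits, plus 5 per extra hit."""
    if not threat_intel:
        return 0
    confidences = [hit.get("confidence", 0) for hit in threat_intel.values()]
    return min(100, max(confidences) + 5 * (len(confidences) - 1))
```

In a real deployment `query_threat_feeds` would call out to the configured feeds or a TIP, so caching results per indicator is worth considering to keep offense processing fast.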

Incident Response Integration

Automated IOC Enrichment

# Automated incident response with threat intelligence
incident_response:
  automation:
    - name: "IOC Enrichment"
      trigger:
        severity: high
      actions:
        - lookup_ioc:
            indicators:
              - source_ip
              - dest_ip
              - file_hash
        - update_incident:
            fields:
              threat_actors: "${enriched.threat_actors}"
              malware_families: "${enriched.malware_families}"
              attack_techniques: "${enriched.mitre_techniques}"
              
    - name: "Auto-Block Malicious IPs"
      trigger:
        condition: "threat_confidence > 90"
      actions:
        - block_ip:
            target: "firewall"
            duration: "24h"
        - update_blocklist:
            source: "threat_intel"
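
The auto-block rule combines a confidence gate with a time-limited action. A sketch of that decision follows, assuming confidence is a number from 0 to 100 and durations use an `<n>h` suffix as in the YAML. `parse_duration`, `plan_block`, and the returned dictionary layout are hypothetical, and no firewall is actually contacted.

```python
from datetime import datetime, timedelta, timezone


def parse_duration(text):
    """Convert a duration such as '24h' into a timedelta (hours only)."""
    if not text.endswith("h") or not text[:-1].isdigit():
        raise ValueError(f"unsupported duration: {text!r}")
    return timedelta(hours=int(text[:-1]))


def plan_block(ip, threat_confidence, duration="24h", threshold=90, now=None):
    """Return a block action if confidence exceeds the threshold, else None."""
    if threat_confidence <= threshold:
        return None
    now = now or datetime.now(timezone.utc)
    return {
        "action": "block_ip",
        "target": "firewall",
        "ip": ip,
        "expires_at": (now + parse_duration(duration)).isoformat(),
    }
```

Recording an explicit expiry alongside the block keeps stale entries from accumulating, which matters because IP-based indicators lose value quickly.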

Threat Intelligence Platforms

Platform          Type          Key Features                    Cost
MISP              Open Source   IOC sharing, TAXII              Free
OpenCTI           Open Source   Knowledge graph, MITRE ATT&CK   Free
Anomali           Commercial    ThreatStream, VM                $$$
Recorded Future   Commercial    Real-time intelligence          $$$$
Mandiant          Commercial    Expert-led intelligence         $$$$
CrowdStrike       Commercial    Falcon Intelligence             $$$
