Vulnerability Management: Scanning, Prioritization, and Remediation

TL;DR: This guide covers building a comprehensive vulnerability management program. Learn vulnerability scanning techniques, risk prioritization frameworks, remediation workflows, and continuous monitoring strategies.


Introduction

Vulnerability management is a continuous process of identifying, evaluating, prioritizing, and remediating security vulnerabilities. A mature vulnerability management program helps organizations:

  • Reduce attack surface - Find and fix weaknesses before attackers exploit them
  • Prioritize effectively - Focus on highest-risk vulnerabilities
  • Track progress - Measure security improvements over time
  • Meet compliance - Satisfy regulatory requirements

Vulnerability Assessment

Types of Scanning

| Type | Description | Tools |
|------|-------------|-------|
| Network Scanning | Find open ports and services | Nmap, Nessus |
| Credentialed Scanning | Authenticated vulnerability checks | Qualys, Tenable |
| Web Application Scanning | Find web vulnerabilities | OWASP ZAP, Burp Suite |
| Container Scanning | Scan container images | Trivy, Clair |
| Code Scanning | Find vulnerabilities in source | SonarQube, Snyk |

Network Vulnerability Scanner

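import nmap  # python-nmap; requires the nmap binary on PATH

class NetworkVulnerabilityScanner:
    def __init__(self):
        self.scanner = nmap.PortScanner()

    def scan_network(self, network: str) -> dict:
        """Scan a network with version detection and the NSE 'vuln' scripts"""
        # Only scan networks you are authorized to test.
        self.scanner.scan(
            hosts=network,
            arguments='-sV --script vuln'
        )

        results = {}
        for host in self.scanner.all_hosts():
            results[host] = {
                'state': self.scanner[host].state(),
                'services': self._extract_services(host),
                'vulnerabilities': self._extract_vulnerabilities(host)
            }

        return results

    def _extract_services(self, host: str) -> dict:
        """Map each scanned port to its detected service, product, and version"""
        services = {}
        for proto in self.scanner[host].all_protocols():
            for port, info in self.scanner[host][proto].items():
                services[f'{port}/{proto}'] = {
                    'name': info.get('name'),
                    'product': info.get('product'),
                    'version': info.get('version')
                }
        return services

    def _extract_vulnerabilities(self, host: str) -> list:
        """Collect NSE script output for every port that produced any"""
        vulns = []
        for proto in self.scanner[host].all_protocols():
            for port, service in self.scanner[host][proto].items():
                # 'script' is only present when an NSE script reported something
                if 'script' in service:
                    vulns.append({
                        'port': port,
                        'service': service.get('name'),
                        'vulns': service.get('script', {})
                    })
        return vulns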
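
The 'vulns' field returned by _extract_vulnerabilities holds raw script output, so a follow-up step usually has to pull identifiers out of free text. The sketch below shows one way to do that with a regular expression. The function name, the script names, and the CVE IDs in the sample are hypothetical placeholders, and real NSE output varies by script.

```python
import re

# CVE IDs have the form CVE-<4-digit year>-<4 or more digits>
CVE_PATTERN = re.compile(r'CVE-\d{4}-\d{4,}', re.IGNORECASE)

def extract_cve_ids(script_output: dict) -> list:
    """Return the sorted, de-duplicated CVE IDs mentioned in NSE script output.

    `script_output` maps script name -> raw text, i.e. the 'vulns' value
    produced by _extract_vulnerabilities.
    """
    found = set()
    for text in script_output.values():
        found.update(match.upper() for match in CVE_PATTERN.findall(str(text)))
    return sorted(found)

# Hypothetical script output, shaped like the dict python-nmap returns
sample = {
    'example-script': 'VULNERABLE:\n  IDs:  CVE:CVE-2099-0001\n  State: VULNERABLE',
    'another-script': 'References: cve-2099-0001, CVE-2099-12345',
}
print(extract_cve_ids(sample))  # ['CVE-2099-0001', 'CVE-2099-12345']
```

Normalizing to upper case before de-duplicating keeps the same CVE from being counted twice when scripts report it in different letter cases.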

Web Application Scanning

OWASP ZAP Integration

import time

from zapv2 import ZAPv2

class WebVulnerabilityScanner:
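    def __init__(self, target_url: str, api_key: str = None):
        self.target_url = target_url
        # Assumes a ZAP daemon is listening on localhost:8080. Pass its API key
        # unless the key has been disabled in ZAP. Both proxy URLs need a scheme.
        self.zap = ZAPv2(
            apikey=api_key,
            proxies={'http': 'http://localhost:8080', 'https': 'http://localhost:8080'}
        )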
        
    def spider_scan(self):
        """Spider the application"""
        self.zap.urlopen(self.target_url)
        scan_id = self.zap.spider.scan(self.target_url)
        
        while int(self.zap.spider.status(scan_id)) < 100:
            time.sleep(5)
        
        return self.zap.spider.results(scan_id)
    
    def active_scan(self):
        """Run active vulnerability scan"""
        scan_id = self.zap.ascan.scan(self.target_url)
        
        while int(self.zap.ascan.status(scan_id)) < 100:
            time.sleep(5)
        
        return self.get_findings()
    
    def get_findings(self) -> list:
        """Get all findings"""
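        # Limit alerts to the target; without baseurl ZAP returns alerts for every site it has seen
        alerts = self.zap.core.alerts(baseurl=self.target_url)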
        return [
            {
                'name': alert['name'],
                'risk': alert['risk'],
                'url': alert['url'],
                'description': alert['description'],
                'solution': alert['solution']
            }
            for alert in alerts
        ]
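
Raw alert lists get long quickly, so it can help to summarize them before opening tickets. The following sketch counts findings per risk level and filters by a minimum level. It assumes the 'risk' strings ZAP reports (High, Medium, Low, Informational) and the finding dicts returned by get_findings; the helper names and sample URLs are illustrative only.

```python
from collections import Counter

# Risk labels as reported by ZAP, highest first
RISK_ORDER = ['High', 'Medium', 'Low', 'Informational']

def summarize_findings(findings: list) -> dict:
    """Count findings per risk level, always listing every level."""
    counts = Counter(f['risk'] for f in findings)
    return {risk: counts.get(risk, 0) for risk in RISK_ORDER}

def filter_by_min_risk(findings: list, min_risk: str = 'Medium') -> list:
    """Keep findings whose risk is at or above `min_risk`."""
    allowed = set(RISK_ORDER[:RISK_ORDER.index(min_risk) + 1])
    return [f for f in findings if f['risk'] in allowed]

# Hypothetical findings in the shape produced by get_findings
findings = [
    {'name': 'SQL Injection', 'risk': 'High', 'url': 'https://app.example/login'},
    {'name': 'Missing Header', 'risk': 'Low', 'url': 'https://app.example/'},
    {'name': 'Cross Site Scripting', 'risk': 'Medium', 'url': 'https://app.example/search'},
]
print(summarize_findings(findings))
print([f['name'] for f in filter_by_min_risk(findings)])
```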

Container Vulnerability Scanning

Trivy Integration

# Scan an image (assumes Trivy is already installed)
trivy image myapp:latest

# Scan with specific severity
trivy image --severity HIGH,CRITICAL myapp:latest

# Output to JSON
trivy image --format json --output results.json myapp:latest

Container Scan Script

import subprocess
import json

class ContainerVulnerabilityScanner:
    def __init__(self, image_name: str):
        self.image_name = image_name
        
    def scan(self) -> dict:
        """Scan container image for vulnerabilities"""
        result = subprocess.run(
            ['trivy', 'image', '--format', 'json', '--severity', 'HIGH,CRITICAL', 
             self.image_name],
            capture_output=True,
            text=True,
            check=True  # raise CalledProcessError if Trivy fails, instead of parsing empty output
        )
        
        return json.loads(result.stdout)
    
    def parse_results(self, scan_results: dict) -> list:
        """Parse and categorize vulnerabilities"""
        vulnerabilities = []
        
        for result in scan_results.get('Results') or []:
            # 'Vulnerabilities' can be missing or null when a target has no findings
            for vuln in result.get('Vulnerabilities') or []:
                vulnerabilities.append({
                    'id': vuln['VulnerabilityID'],
                    'severity': vuln['Severity'],
                    'package': vuln['PkgName'],
                    'version': vuln['InstalledVersion'],
                    'fixed_version': vuln.get('FixedVersion', 'N/A'),
                    'description': vuln.get('Description', '')
                })
        
        return vulnerabilities
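
A common use of the parsed list is a release gate in CI. The sketch below is one possible policy, not a recommendation: it blocks only on CRITICAL findings that already have a published fix. The function name, the default policy, and the sample CVE IDs are assumptions made for illustration.

```python
def blocking_vulnerabilities(vulnerabilities: list,
                             blocking_severities=('CRITICAL',),
                             require_fix: bool = True) -> list:
    """Return the findings that should block a release.

    `vulnerabilities` is the list produced by parse_results: dicts with
    'severity' and 'fixed_version' keys ('N/A' when no fix is published).
    """
    blocking = []
    for vuln in vulnerabilities:
        if vuln['severity'] not in blocking_severities:
            continue
        if require_fix and vuln.get('fixed_version', 'N/A') in ('', 'N/A'):
            continue  # nothing to upgrade to yet; track it instead of blocking
        blocking.append(vuln)
    return blocking

# Hypothetical parsed results
sample = [
    {'id': 'CVE-2099-0001', 'severity': 'CRITICAL', 'fixed_version': '2.0.1'},
    {'id': 'CVE-2099-0002', 'severity': 'CRITICAL', 'fixed_version': 'N/A'},
    {'id': 'CVE-2099-0003', 'severity': 'HIGH', 'fixed_version': '1.4.0'},
]
to_fix = blocking_vulnerabilities(sample)
print([v['id'] for v in to_fix])  # ['CVE-2099-0001']
```

In a pipeline, a non-empty result would typically translate into a non-zero exit code so the build stops.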

Risk Prioritization

CVSS Scoring

from dataclasses import dataclass
from enum import Enum

class Severity(Enum):
    CRITICAL = 9.0
    HIGH = 7.0
    MEDIUM = 4.0
    LOW = 0.1
    INFO = 0.0

@dataclass
class Vulnerability:
    cve_id: str
    cvss_score: float
    exploit_available: bool
    affected_systems: int
    remediation_complexity: str
    
    @property
    def priority_score(self) -> float:
        """Calculate priority score"""
        base_score = self.cvss_score
        
        # Boost for available exploits
        if self.exploit_available:
            base_score *= 1.5
            
        # Boost for affected systems
        base_score *= (1 + min(self.affected_systems / 100, 0.5))
        
        # Deprioritize hard-to-fix issues; boost quick wins
        if self.remediation_complexity == 'High':
            base_score *= 0.8
        elif self.remediation_complexity == 'Low':
            base_score *= 1.2
            
        return min(base_score, 10.0)
    
    @property
    def severity(self) -> Severity:
        """Get severity level"""
        if self.cvss_score >= 9.0:
            return Severity.CRITICAL
        elif self.cvss_score >= 7.0:
            return Severity.HIGH
        elif self.cvss_score >= 4.0:
            return Severity.MEDIUM
        elif self.cvss_score > 0:
            return Severity.LOW
        return Severity.INFO

Prioritization Matrix

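| CVSS Score | Exploitable | Priority | SLA |
|------------|-------------|----------|-----|
| 9.0-10.0 | Yes | P1 - Critical | 24 hours |
| 9.0-10.0 | No | P2 - High | 7 days |
| 7.0-8.9 | Yes | P2 - High | 7 days |
| 7.0-8.9 | No | P3 - Medium | 30 days |
| 4.0-6.9 | Any | P3 - Medium | 30 days |
| 0.1-3.9 | Any | P4 - Low | 90 days |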
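
The matrix can be encoded directly as a lookup so that tickets get a priority and due date without manual triage. This is a minimal sketch of that mapping; the function names are hypothetical, and a score of 0.0 returns no priority because the matrix does not cover it.

```python
from datetime import datetime, timedelta
from typing import Optional, Tuple

def prioritize(cvss_score: float, exploitable: bool) -> Tuple[Optional[str], Optional[timedelta]]:
    """Map a CVSS score and exploitability flag to (priority, SLA) per the matrix."""
    if cvss_score >= 9.0:
        if exploitable:
            return 'P1 - Critical', timedelta(hours=24)
        return 'P2 - High', timedelta(days=7)
    if cvss_score >= 7.0:
        if exploitable:
            return 'P2 - High', timedelta(days=7)
        return 'P3 - Medium', timedelta(days=30)
    if cvss_score >= 4.0:
        return 'P3 - Medium', timedelta(days=30)
    if cvss_score > 0:
        return 'P4 - Low', timedelta(days=90)
    return None, None  # 0.0 is outside the matrix

def due_date(detected_at: datetime, cvss_score: float, exploitable: bool) -> Optional[datetime]:
    """Return the remediation deadline, or None when the matrix assigns no SLA."""
    _, sla = prioritize(cvss_score, exploitable)
    return detected_at + sla if sla is not None else None

print(prioritize(9.8, True))   # ('P1 - Critical', datetime.timedelta(days=1))
print(due_date(datetime(2024, 1, 1, 9, 0), 7.5, False))  # 2024-01-31 09:00:00
```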

Remediation Workflow

Ticketing Integration

class RemediationWorkflow:
    def __init__(self, jira_client):
        self.jira = jira_client
        
    def create_ticket(self, vulnerability: Vulnerability) -> str:
        """Create remediation ticket"""
        priority = self._map_priority(vulnerability.severity)
        
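        # Assumes jira_client is a jira-python JIRA instance, which expects
        # issuetype and priority as {'name': ...} dicts.
        # cve_id is assumed to already include the 'CVE-' prefix (CVE-YYYY-NNNN).
        ticket = self.jira.create_issue(
            project='SEC',
            summary=f'{vulnerability.cve_id}: Security Vulnerability',
            description=self._generate_description(vulnerability),
            issuetype={'name': 'Bug'},
            priority={'name': priority}
        )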
        
        return ticket.key
    
    def _map_priority(self, severity: Severity) -> str:
        """Map severity to Jira priority"""
        mapping = {
            Severity.CRITICAL: 'Highest',
            Severity.HIGH: 'High',
            Severity.MEDIUM: 'Medium',
            Severity.LOW: 'Low',
            Severity.INFO: 'Lowest'
        }
        return mapping.get(severity, 'Medium')
    
    def _generate_description(self, vuln: Vulnerability) -> str:
        """Generate ticket description"""
        return f"""
## Vulnerability Details

- **CVE ID**: {vuln.cve_id}
- **CVSS Score**: {vuln.cvss_score}
- **Severity**: {vuln.severity.name}
- **Affected Systems**: {vuln.affected_systems}

## Remediation

Please remediate this vulnerability and update the ticket once complete.
"""

Continuous Monitoring

Vulnerability Feed Integration

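from datetime import datetime, timedelta, timezone

import requests

class VulnerabilityFeed:
    def __init__(self):
        self.nvd_api = 'https://services.nvd.nist.gov/rest/json/cves/2.0'
        
    def get_recent_cves(self, days: int = 7) -> list:
        """Get CVEs published in the last `days` days from NVD (first page only)"""
        # NVD caps a publication date range at 120 days and paginates results;
        # this sketch does not follow pagination or send an API key.
        end = datetime.now(timezone.utc)
        start = end - timedelta(days=days)
        fmt = '%Y-%m-%dT%H:%M:%S.000'
        response = requests.get(
            self.nvd_api,
            params={'pubStartDate': start.strftime(fmt), 'pubEndDate': end.strftime(fmt)},
            timeout=30
        )
        response.raise_for_status()
        return [item['cve'] for item in response.json().get('vulnerabilities', [])]
    
    def check_applicable_cves(self, software_list: list) -> list:
        """Check which CVEs affect your software"""
        recent = self.get_recent_cves()
        # any() stops at the first match, so each CVE is listed at most once
        return [
            cve for cve in recent
            if any(self._is_affected(cve, software) for software in software_list)
        ]
    
    def _is_affected(self, cve: dict, software: str) -> bool:
        """Check if CVE affects software"""
        # Assumes `software` is a CPE 2.3 prefix such as 'cpe:2.3:a:vendor:product'.
        # Version ranges (versionStartIncluding, etc.) are not evaluated here.
        for config in cve.get('configurations', []):
            for node in config.get('nodes', []):
                for match in node.get('cpeMatch', []):
                    if match.get('vulnerable') and match.get('criteria', '').startswith(software):
                        return True
        return False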
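
Matching a CVE against an inventory comes down to comparing CPE names. As a rough illustration, the sketch below splits a CPE 2.3 formatted string into its leading fields and compares part, vendor, product, and version. It is deliberately simplified: it assumes no escaped colons inside fields, treats only `*` as a wildcard, and ignores version ranges. The vendor and product names are made up.

```python
from typing import NamedTuple, Optional

class CpeName(NamedTuple):
    part: str      # 'a' application, 'o' operating system, 'h' hardware
    vendor: str
    product: str
    version: str

def parse_cpe(cpe: str) -> Optional[CpeName]:
    """Parse the first four attributes of a CPE 2.3 formatted string.

    Returns None for anything that is not a 13-field 'cpe:2.3:...' string.
    Simplification: fields containing escaped colons are not supported.
    """
    fields = cpe.split(':')
    if len(fields) != 13 or fields[0] != 'cpe' or fields[1] != '2.3':
        return None
    return CpeName(*fields[2:6])

def cpe_matches(criteria: str, installed: str) -> bool:
    """True when `criteria` (from a CVE) covers the `installed` CPE."""
    wanted, actual = parse_cpe(criteria), parse_cpe(installed)
    if wanted is None or actual is None:
        return False
    same_product = (wanted.part, wanted.vendor, wanted.product) == \
                   (actual.part, actual.vendor, actual.product)
    return same_product and wanted.version in ('*', actual.version)

# Hypothetical CPE names
installed = 'cpe:2.3:a:examplevendor:exampleapp:1.2.3:*:*:*:*:*:*:*'
print(cpe_matches('cpe:2.3:a:examplevendor:exampleapp:*:*:*:*:*:*:*:*', installed))
print(cpe_matches('cpe:2.3:a:examplevendor:exampleapp:2.0.0:*:*:*:*:*:*:*', installed))
```

A production matcher would also need to evaluate the version range attributes that NVD attaches to many match entries.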

Metrics and Reporting

KPI Dashboard

| Metric | Description | Target |
|--------|-------------|--------|
| Mean Time to Remediate (MTTR) | Average time to fix vulnerabilities | < 30 days |
| Vulnerability Density | Vulns per 1000 lines of code | < 5 |
| Patch Compliance | % of systems patched within SLA | > 95% |
| False Positive Rate | % of confirmed false positives | < 10% |
| Coverage | % of assets scanned | 100% |
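
Two of these metrics can be computed from ticket data alone. The sketch below assumes each finding is a dict with 'opened', 'fixed' (None while open), and 'sla_days' keys; that record shape is an assumption made for illustration. Compliance is measured per remediated finding rather than per system, which is a simplification of the table's definition.

```python
from datetime import date
from statistics import mean
from typing import Optional

def mean_time_to_remediate(records: list) -> Optional[float]:
    """Average days from open to fix, over remediated findings only."""
    durations = [(r['fixed'] - r['opened']).days for r in records if r['fixed']]
    return mean(durations) if durations else None

def patch_compliance(records: list) -> Optional[float]:
    """Percentage of remediated findings that were fixed within their SLA."""
    fixed = [r for r in records if r['fixed']]
    if not fixed:
        return None
    on_time = sum(1 for r in fixed if (r['fixed'] - r['opened']).days <= r['sla_days'])
    return 100.0 * on_time / len(fixed)

# Hypothetical remediation records
records = [
    {'opened': date(2024, 1, 1), 'fixed': date(2024, 1, 11), 'sla_days': 30},  # 10 days
    {'opened': date(2024, 1, 1), 'fixed': date(2024, 2, 20), 'sla_days': 30},  # 50 days
    {'opened': date(2024, 1, 5), 'fixed': None, 'sla_days': 7},                # still open
]
print(mean_time_to_remediate(records))  # 30
print(patch_compliance(records))        # 50.0
```

Open findings are excluded from both numbers here, so a growing backlog would not show up; tracking the count of open findings past their SLA alongside these metrics closes that gap.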

Conclusion

Effective vulnerability management requires:

  1. Comprehensive scanning - Network, web, container, and code scanning
  2. Risk-based prioritization - Focus on highest-impact vulnerabilities
  3. Automated remediation - Streamline fix processes
  4. Continuous monitoring - Stay current with new vulnerabilities
  5. Metrics-driven - Track and improve over time
