This commit is contained in:
130
src/depcheck/analyzers/cve.py
Normal file
130
src/depcheck/analyzers/cve.py
Normal file
@@ -0,0 +1,130 @@
|
|||||||
|
"""CVE analysis module with bundled vulnerability database."""
|
||||||
|
|
||||||
|
import re
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
import yaml
|
||||||
|
|
||||||
|
from depcheck.models import Dependency, ScanResult, Severity, Vulnerability
|
||||||
|
|
||||||
|
|
||||||
|
def load_cve_database() -> dict:
|
||||||
|
"""Load the bundled CVE database."""
|
||||||
|
db_path = Path(__file__).parent.parent / "data" / "cve_database.yaml"
|
||||||
|
if db_path.exists():
|
||||||
|
with open(db_path, "r") as f:
|
||||||
|
return yaml.safe_load(f) or {}
|
||||||
|
return {"cves": [], "version_warnings": []}
|
||||||
|
|
||||||
|
|
||||||
|
class CVEAnalyzer:
|
||||||
|
"""Analyze dependencies for known vulnerabilities using bundled database."""
|
||||||
|
|
||||||
|
def __init__(self, db: Optional[dict] = None):
|
||||||
|
self.db = db or load_cve_database()
|
||||||
|
self._build_index()
|
||||||
|
|
||||||
|
def _build_index(self) -> None:
|
||||||
|
"""Build index for faster lookups."""
|
||||||
|
self._cve_index: dict[str, list] = {}
|
||||||
|
self._warning_index: dict[str, dict] = {}
|
||||||
|
|
||||||
|
for cve in self.db.get("cves", []):
|
||||||
|
package = cve.get("package", "").lower()
|
||||||
|
if package not in self._cve_index:
|
||||||
|
self._cve_index[package] = []
|
||||||
|
self._cve_index[package].append(cve)
|
||||||
|
|
||||||
|
for warning in self.db.get("version_warnings", []):
|
||||||
|
package = warning.get("package", "").lower()
|
||||||
|
self._warning_index[package] = warning
|
||||||
|
|
||||||
|
def analyze(self, dependency: Dependency) -> list[Vulnerability]:
|
||||||
|
"""Check a dependency against the CVE database."""
|
||||||
|
vulnerabilities = []
|
||||||
|
package_name = dependency.name.lower()
|
||||||
|
|
||||||
|
cves = self._cve_index.get(package_name, [])
|
||||||
|
for cve_data in cves:
|
||||||
|
if self._is_version_affected(
|
||||||
|
dependency.current_version,
|
||||||
|
cve_data.get("affected_versions", ""),
|
||||||
|
):
|
||||||
|
vulnerability = Vulnerability(
|
||||||
|
cve_id=cve_data.get("cve_id", "UNKNOWN"),
|
||||||
|
severity=Severity(cve_data.get("severity", "medium")),
|
||||||
|
description=cve_data.get("description", ""),
|
||||||
|
affected_versions=cve_data.get("affected_versions", ""),
|
||||||
|
fixed_version=cve_data.get("fixed_version"),
|
||||||
|
references=cve_data.get("references", []),
|
||||||
|
)
|
||||||
|
vulnerabilities.append(vulnerability)
|
||||||
|
|
||||||
|
return vulnerabilities
|
||||||
|
|
||||||
|
def _is_version_affected(self, current: str, affected_range: str) -> bool:
|
||||||
|
"""Check if current version is within affected range."""
|
||||||
|
if not current or not affected_range:
|
||||||
|
return False
|
||||||
|
|
||||||
|
current = current.strip()
|
||||||
|
affected_range = affected_range.strip()
|
||||||
|
|
||||||
|
if affected_range.startswith("<"):
|
||||||
|
version_part = affected_range[1:].strip()
|
||||||
|
return self._compare_versions(current, version_part) < 0
|
||||||
|
elif affected_range.startswith("<="):
|
||||||
|
version_part = affected_range[2:].strip()
|
||||||
|
return self._compare_versions(current, version_part) <= 0
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
def _compare_versions(self, v1: str, v2: str) -> int:
|
||||||
|
"""Compare two version strings. Returns -1 if v1 < v2, 0 if equal, 1 if v1 > v2."""
|
||||||
|
try:
|
||||||
|
from packaging.version import parse as parse_version
|
||||||
|
|
||||||
|
parsed_v1 = parse_version(v1)
|
||||||
|
parsed_v2 = parse_version(v2)
|
||||||
|
|
||||||
|
if parsed_v1 < parsed_v2:
|
||||||
|
return -1
|
||||||
|
elif parsed_v1 > parsed_v2:
|
||||||
|
return 1
|
||||||
|
return 0
|
||||||
|
except Exception:
|
||||||
|
parts1 = re.split(r"[.\-_]", v1)
|
||||||
|
parts2 = re.split(r"[.\-_]", v2)
|
||||||
|
|
||||||
|
for i in range(max(len(parts1), len(parts2))):
|
||||||
|
p1 = parts1[i] if i < len(parts1) else "0"
|
||||||
|
p2 = parts2[i] if i < len(parts2) else "0"
|
||||||
|
|
||||||
|
try:
|
||||||
|
cmp = int(p1) - int(p2)
|
||||||
|
except ValueError:
|
||||||
|
cmp = (p1 > p2) - (p1 < p2)
|
||||||
|
|
||||||
|
if cmp != 0:
|
||||||
|
return 1 if cmp > 0 else -1
|
||||||
|
|
||||||
|
return 0
|
||||||
|
|
||||||
|
def check_version_warning(self, dependency: Dependency) -> Optional[str]:
|
||||||
|
"""Check if there's a version warning for this dependency."""
|
||||||
|
package_name = dependency.name.lower()
|
||||||
|
warning = self._warning_index.get(package_name)
|
||||||
|
if warning:
|
||||||
|
min_safe = warning.get("min_safe", "")
|
||||||
|
if self._compare_versions(dependency.current_version, min_safe) < 0:
|
||||||
|
return warning.get("message")
|
||||||
|
return None
|
||||||
|
|
||||||
|
def analyze_scan_result(self, result: ScanResult) -> ScanResult:
|
||||||
|
"""Analyze all dependencies in a scan result for vulnerabilities."""
|
||||||
|
for dep in result.dependencies:
|
||||||
|
vulnerabilities = self.analyze(dep)
|
||||||
|
for vuln in vulnerabilities:
|
||||||
|
result.vulnerabilities.append((dep, vuln))
|
||||||
|
return result
|
||||||
Reference in New Issue
Block a user