diff --git a/src/depcheck/analyzers/cve.py b/src/depcheck/analyzers/cve.py new file mode 100644 index 0000000..096aa2a --- /dev/null +++ b/src/depcheck/analyzers/cve.py @@ -0,0 +1,130 @@ +"""CVE analysis module with bundled vulnerability database.""" + +import re +from pathlib import Path +from typing import Optional + +import yaml + +from depcheck.models import Dependency, ScanResult, Severity, Vulnerability + + +def load_cve_database() -> dict: + """Load the bundled CVE database.""" + db_path = Path(__file__).parent.parent / "data" / "cve_database.yaml" + if db_path.exists(): + with open(db_path, "r") as f: + return yaml.safe_load(f) or {} + return {"cves": [], "version_warnings": []} + + +class CVEAnalyzer: + """Analyze dependencies for known vulnerabilities using bundled database.""" + + def __init__(self, db: Optional[dict] = None): + self.db = db or load_cve_database() + self._build_index() + + def _build_index(self) -> None: + """Build index for faster lookups.""" + self._cve_index: dict[str, list] = {} + self._warning_index: dict[str, dict] = {} + + for cve in self.db.get("cves", []): + package = cve.get("package", "").lower() + if package not in self._cve_index: + self._cve_index[package] = [] + self._cve_index[package].append(cve) + + for warning in self.db.get("version_warnings", []): + package = warning.get("package", "").lower() + self._warning_index[package] = warning + + def analyze(self, dependency: Dependency) -> list[Vulnerability]: + """Check a dependency against the CVE database.""" + vulnerabilities = [] + package_name = dependency.name.lower() + + cves = self._cve_index.get(package_name, []) + for cve_data in cves: + if self._is_version_affected( + dependency.current_version, + cve_data.get("affected_versions", ""), + ): + vulnerability = Vulnerability( + cve_id=cve_data.get("cve_id", "UNKNOWN"), + severity=Severity(cve_data.get("severity", "medium")), + description=cve_data.get("description", ""), + affected_versions=cve_data.get("affected_versions", ""), + fixed_version=cve_data.get("fixed_version"), + references=cve_data.get("references", []), + ) + vulnerabilities.append(vulnerability) + + return vulnerabilities + + def _is_version_affected(self, current: str, affected_range: str) -> bool: + """Check if current version is within affected range.""" + if not current or not affected_range: + return False + + current = current.strip() + affected_range = affected_range.strip() + + if affected_range.startswith("<"): + version_part = affected_range[1:].strip() + return self._compare_versions(current, version_part) < 0 + elif affected_range.startswith("<="): + version_part = affected_range[2:].strip() + return self._compare_versions(current, version_part) <= 0 + + return False + + def _compare_versions(self, v1: str, v2: str) -> int: + """Compare two version strings. Returns -1 if v1 < v2, 0 if equal, 1 if v1 > v2.""" + try: + from packaging.version import parse as parse_version + + parsed_v1 = parse_version(v1) + parsed_v2 = parse_version(v2) + + if parsed_v1 < parsed_v2: + return -1 + elif parsed_v1 > parsed_v2: + return 1 + return 0 + except Exception: + parts1 = re.split(r"[.\-_]", v1) + parts2 = re.split(r"[.\-_]", v2) + + for i in range(max(len(parts1), len(parts2))): + p1 = parts1[i] if i < len(parts1) else "0" + p2 = parts2[i] if i < len(parts2) else "0" + + try: + cmp = int(p1) - int(p2) + except ValueError: + cmp = (p1 > p2) - (p1 < p2) + + if cmp != 0: + return 1 if cmp > 0 else -1 + + return 0 + + def check_version_warning(self, dependency: Dependency) -> Optional[str]: + """Check if there's a version warning for this dependency.""" + package_name = dependency.name.lower() + warning = self._warning_index.get(package_name) + if warning: + min_safe = warning.get("min_safe", "") + if self._compare_versions(dependency.current_version, min_safe) < 0: + return warning.get("message") + return None + + def analyze_scan_result(self, result: ScanResult) -> ScanResult: + """Analyze all dependencies in a scan result for vulnerabilities.""" + for dep in result.dependencies: + vulnerabilities = self.analyze(dep) + for vuln in vulnerabilities: + result.vulnerabilities.append((dep, vuln)) + return result