From 08a9f1415a8f9817bab9f8d68be39e494ce33acc Mon Sep 17 00:00:00 2001 From: 7000pctAUTO Date: Mon, 2 Feb 2026 21:34:46 +0000 Subject: [PATCH] Add checks modules (vulnerabilities, outdated) --- depaudit/checks/outdated.py | 200 ++++++++++++++++++++++++++++++++++++ 1 file changed, 200 insertions(+) create mode 100644 depaudit/checks/outdated.py diff --git a/depaudit/checks/outdated.py b/depaudit/checks/outdated.py new file mode 100644 index 0000000..7f130a8 --- /dev/null +++ b/depaudit/checks/outdated.py @@ -0,0 +1,200 @@ +from __future__ import annotations + +import json +import re +from dataclasses import dataclass +from typing import Any + +import requests + +from depaudit.checks import OutdatedPackage + + +@dataclass +class RegistryClient: + base_url: str + timeout: int = 30 + + def get_latest_version(self, package_name: str) -> str | None: + raise NotImplementedError + + def parse_version(self, version_data: Any) -> str: + raise NotImplementedError + + +class NPMClient(RegistryClient): + def __init__(self, timeout: int = 30): + super().__init__("https://registry.npmjs.org", timeout) + + def get_latest_version(self, package_name: str) -> str | None: + try: + url = f"{self.base_url}/{package_name}/latest" + response = requests.get(url, timeout=self.timeout) + if response.status_code == 200: + data = response.json() + return data.get("version") + return None + except Exception: + return None + + def parse_version(self, version_data: Any) -> str: + if isinstance(version_data, dict): + return version_data.get("version", "") + return str(version_data) + + +class PyPIClient(RegistryClient): + def __init__(self, timeout: int = 30): + super().__init__("https://pypi.org/pypi", timeout) + + def get_latest_version(self, package_name: str) -> str | None: + try: + url = f"{self.base_url}/{package_name}/json" + response = requests.get(url, timeout=self.timeout) + if response.status_code == 200: + data = response.json() + return data.get("info", {}).get("version") + return None + except Exception: + return None + + def parse_version(self, version_data: Any) -> str: + return str(version_data) + + +class CratesClient(RegistryClient): + def __init__(self, timeout: int = 30): + super().__init__("https://crates.io/api/v1", timeout) + + def get_latest_version(self, package_name: str) -> str | None: + try: + url = f"{self.base_url}/crates/{package_name}" + response = requests.get(url, timeout=self.timeout) + if response.status_code == 200: + data = response.json() + versions = data.get("versions", []) + if versions: + return versions[0].get("num") + return None + except Exception: + return None + + def parse_version(self, version_data: Any) -> str: + return str(version_data) + + +class GoClient(RegistryClient): + def __init__(self, timeout: int = 30): + super().__init__("https://pkg.go.dev", timeout) + + def get_latest_version(self, package_name: str) -> str | None: + try: + url = f"{self.base_url}/info?module={package_name}" + response = requests.get(url, timeout=self.timeout) + if response.status_code == 200: + data = response.json() + version = data.get("Version") + if version: + match = re.match(r"v?([\d.]+)", version) + if match: + return match.group(0) + return None + except Exception: + return None + + def parse_version(self, version_data: Any) -> str: + return str(version_data) + + +class MavenClient(RegistryClient): + def __init__(self, timeout: int = 30): + super().__init__("https://repo1.maven.org/maven2", timeout) + + def get_latest_version(self, group_id: str, artifact_id: str) -> str | None: + try: + group_path = group_id.replace(".", "/") + metadata_url = f"{self.base_url}/{group_path}/{artifact_id}/maven-metadata.xml" + response = requests.get(metadata_url, timeout=self.timeout) + if response.status_code == 200: + content = response.text + match = re.search(r"([^<]+)", content) + if match: + return match.group(1) + return None + except Exception: + return None + + def parse_version(self, version_data: Any) -> str: + return str(version_data) + + +CLIENTS = { + "javascript": NPMClient, + "python": PyPIClient, + "rust": CratesClient, + "go": GoClient, + "java": MavenClient, +} + + +def get_client(language: str, timeout: int = 30) -> RegistryClient | None: + client_class = CLIENTS.get(language) + if client_class: + return client_class(timeout) + return None + + +def check_outdated( + package_name: str, + current_version: str, + language: str, + timeout: int = 30, +) -> OutdatedPackage | None: + client = get_client(language, timeout) + if client is None: + return None + + if language == "java": + if ":" in package_name: + group_id, artifact_id = package_name.split(":", 1) + latest_version = client.get_latest_version(group_id, artifact_id) + else: + return None + else: + latest_version = client.get_latest_version(package_name) + + if latest_version is None: + return None + + from depaudit.utils import compare_versions + + comparison = compare_versions(current_version, latest_version) + major_available = False + minor_available = False + patch_available = False + + if comparison < 0: + curr_parts = current_version.split(".") + latest_parts = latest_version.split(".") + + while len(curr_parts) < 3: + curr_parts.append("0") + while len(latest_parts) < 3: + latest_parts.append("0") + + if latest_parts[0] != curr_parts[0]: + major_available = True + elif latest_parts[1] != curr_parts[1]: + minor_available = True + else: + patch_available = True + + return OutdatedPackage( + package_name=package_name, + current_version=current_version, + latest_version=latest_version, + language=language, + minor_available=minor_available, + patch_available=patch_available, + major_available=major_available, + )