from __future__ import annotations import re from dataclasses import dataclass from typing import Any import requests from depaudit.checks import OutdatedPackage @dataclass class RegistryClient: base_url: str timeout: int = 30 def get_latest_version(self, package_name: str) -> str | None: raise NotImplementedError def parse_version(self, version_data: Any) -> str: raise NotImplementedError class NPMClient(RegistryClient): def __init__(self, timeout: int = 30): super().__init__("https://registry.npmjs.org", timeout) def get_latest_version(self, package_name: str) -> str | None: try: url = f"{self.base_url}/{package_name}/latest" response = requests.get(url, timeout=self.timeout) if response.status_code == 200: data = response.json() return data.get("version") return None except Exception: return None def parse_version(self, version_data: Any) -> str: if isinstance(version_data, dict): return version_data.get("version", "") return str(version_data) class PyPIClient(RegistryClient): def __init__(self, timeout: int = 30): super().__init__("https://pypi.org/pypi", timeout) def get_latest_version(self, package_name: str) -> str | None: try: url = f"{self.base_url}/{package_name}/json" response = requests.get(url, timeout=self.timeout) if response.status_code == 200: data = response.json() return data.get("info", {}).get("version") return None except Exception: return None def parse_version(self, version_data: Any) -> str: return str(version_data) class CratesClient(RegistryClient): def __init__(self, timeout: int = 30): super().__init__("https://crates.io/api/v1", timeout) def get_latest_version(self, package_name: str) -> str | None: try: url = f"{self.base_url}/crates/{package_name}" response = requests.get(url, timeout=self.timeout) if response.status_code == 200: data = response.json() versions = data.get("versions", []) if versions: return versions[0].get("num") return None except Exception: return None def parse_version(self, version_data: Any) -> str: return str(version_data) class GoClient(RegistryClient): def __init__(self, timeout: int = 30): super().__init__("https://pkg.go.dev", timeout) def get_latest_version(self, package_name: str) -> str | None: try: url = f"{self.base_url}/info?module={package_name}" response = requests.get(url, timeout=self.timeout) if response.status_code == 200: data = response.json() version = data.get("Version") if version: match = re.match(r"v?([\d.]+)", version) if match: return match.group(0) return None except Exception: return None def parse_version(self, version_data: Any) -> str: return str(version_data) class MavenClient(RegistryClient): def __init__(self, timeout: int = 30): super().__init__("https://repo1.maven.org/maven2", timeout) def get_latest_version(self, group_id: str, artifact_id: str) -> str | None: try: group_path = group_id.replace(".", "/") metadata_url = f"{self.base_url}/{group_path}/{artifact_id}/maven-metadata.xml" response = requests.get(metadata_url, timeout=self.timeout) if response.status_code == 200: content = response.text match = re.search(r"([^<]+)", content) if match: return match.group(1) return None except Exception: return None def parse_version(self, version_data: Any) -> str: return str(version_data) CLIENTS = { "javascript": NPMClient, "python": PyPIClient, "rust": CratesClient, "go": GoClient, "java": MavenClient, } def get_client(language: str, timeout: int = 30) -> RegistryClient | None: client_class = CLIENTS.get(language) if client_class: return client_class(timeout) return None def check_outdated( package_name: str, current_version: str, language: str, timeout: int = 30, ) -> OutdatedPackage | None: client = get_client(language, timeout) if client is None: return None if language == "java": if ":" in package_name: group_id, artifact_id = package_name.split(":", 1) latest_version = client.get_latest_version(group_id, artifact_id) else: return None else: latest_version = client.get_latest_version(package_name) if latest_version is None: return None from depaudit.utils import compare_versions comparison = compare_versions(current_version, latest_version) major_available = False minor_available = False patch_available = False if comparison < 0: curr_parts = current_version.split(".") latest_parts = latest_version.split(".") while len(curr_parts) < 3: curr_parts.append("0") while len(latest_parts) < 3: latest_parts.append("0") if latest_parts[0] != curr_parts[0]: major_available = True elif latest_parts[1] != curr_parts[1]: minor_available = True else: patch_available = True return OutdatedPackage( package_name=package_name, current_version=current_version, latest_version=latest_version, language=language, minor_available=minor_available, patch_available=patch_available, major_available=major_available, )