201 lines
5.9 KiB
Python
201 lines
5.9 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import re
|
|
from dataclasses import dataclass
|
|
from typing import Any
|
|
|
|
import requests
|
|
|
|
from depaudit.checks import OutdatedPackage
|
|
|
|
|
|
@dataclass
|
|
class RegistryClient:
|
|
base_url: str
|
|
timeout: int = 30
|
|
|
|
def get_latest_version(self, package_name: str) -> str | None:
|
|
raise NotImplementedError
|
|
|
|
def parse_version(self, version_data: Any) -> str:
|
|
raise NotImplementedError
|
|
|
|
|
|
class NPMClient(RegistryClient):
|
|
def __init__(self, timeout: int = 30):
|
|
super().__init__("https://registry.npmjs.org", timeout)
|
|
|
|
def get_latest_version(self, package_name: str) -> str | None:
|
|
try:
|
|
url = f"{self.base_url}/{package_name}/latest"
|
|
response = requests.get(url, timeout=self.timeout)
|
|
if response.status_code == 200:
|
|
data = response.json()
|
|
return data.get("version")
|
|
return None
|
|
except Exception:
|
|
return None
|
|
|
|
def parse_version(self, version_data: Any) -> str:
|
|
if isinstance(version_data, dict):
|
|
return version_data.get("version", "")
|
|
return str(version_data)
|
|
|
|
|
|
class PyPIClient(RegistryClient):
|
|
def __init__(self, timeout: int = 30):
|
|
super().__init__("https://pypi.org/pypi", timeout)
|
|
|
|
def get_latest_version(self, package_name: str) -> str | None:
|
|
try:
|
|
url = f"{self.base_url}/{package_name}/json"
|
|
response = requests.get(url, timeout=self.timeout)
|
|
if response.status_code == 200:
|
|
data = response.json()
|
|
return data.get("info", {}).get("version")
|
|
return None
|
|
except Exception:
|
|
return None
|
|
|
|
def parse_version(self, version_data: Any) -> str:
|
|
return str(version_data)
|
|
|
|
|
|
class CratesClient(RegistryClient):
|
|
def __init__(self, timeout: int = 30):
|
|
super().__init__("https://crates.io/api/v1", timeout)
|
|
|
|
def get_latest_version(self, package_name: str) -> str | None:
|
|
try:
|
|
url = f"{self.base_url}/crates/{package_name}"
|
|
response = requests.get(url, timeout=self.timeout)
|
|
if response.status_code == 200:
|
|
data = response.json()
|
|
versions = data.get("versions", [])
|
|
if versions:
|
|
return versions[0].get("num")
|
|
return None
|
|
except Exception:
|
|
return None
|
|
|
|
def parse_version(self, version_data: Any) -> str:
|
|
return str(version_data)
|
|
|
|
|
|
class GoClient(RegistryClient):
|
|
def __init__(self, timeout: int = 30):
|
|
super().__init__("https://pkg.go.dev", timeout)
|
|
|
|
def get_latest_version(self, package_name: str) -> str | None:
|
|
try:
|
|
url = f"{self.base_url}/info?module={package_name}"
|
|
response = requests.get(url, timeout=self.timeout)
|
|
if response.status_code == 200:
|
|
data = response.json()
|
|
version = data.get("Version")
|
|
if version:
|
|
match = re.match(r"v?([\d.]+)", version)
|
|
if match:
|
|
return match.group(0)
|
|
return None
|
|
except Exception:
|
|
return None
|
|
|
|
def parse_version(self, version_data: Any) -> str:
|
|
return str(version_data)
|
|
|
|
|
|
class MavenClient(RegistryClient):
|
|
def __init__(self, timeout: int = 30):
|
|
super().__init__("https://repo1.maven.org/maven2", timeout)
|
|
|
|
def get_latest_version(self, group_id: str, artifact_id: str) -> str | None:
|
|
try:
|
|
group_path = group_id.replace(".", "/")
|
|
metadata_url = f"{self.base_url}/{group_path}/{artifact_id}/maven-metadata.xml"
|
|
response = requests.get(metadata_url, timeout=self.timeout)
|
|
if response.status_code == 200:
|
|
content = response.text
|
|
match = re.search(r"<release>([^<]+)</release>", content)
|
|
if match:
|
|
return match.group(1)
|
|
return None
|
|
except Exception:
|
|
return None
|
|
|
|
def parse_version(self, version_data: Any) -> str:
|
|
return str(version_data)
|
|
|
|
|
|
CLIENTS = {
|
|
"javascript": NPMClient,
|
|
"python": PyPIClient,
|
|
"rust": CratesClient,
|
|
"go": GoClient,
|
|
"java": MavenClient,
|
|
}
|
|
|
|
|
|
def get_client(language: str, timeout: int = 30) -> RegistryClient | None:
|
|
client_class = CLIENTS.get(language)
|
|
if client_class:
|
|
return client_class(timeout)
|
|
return None
|
|
|
|
|
|
def check_outdated(
|
|
package_name: str,
|
|
current_version: str,
|
|
language: str,
|
|
timeout: int = 30,
|
|
) -> OutdatedPackage | None:
|
|
client = get_client(language, timeout)
|
|
if client is None:
|
|
return None
|
|
|
|
if language == "java":
|
|
if ":" in package_name:
|
|
group_id, artifact_id = package_name.split(":", 1)
|
|
latest_version = client.get_latest_version(group_id, artifact_id)
|
|
else:
|
|
return None
|
|
else:
|
|
latest_version = client.get_latest_version(package_name)
|
|
|
|
if latest_version is None:
|
|
return None
|
|
|
|
from depaudit.utils import compare_versions
|
|
|
|
comparison = compare_versions(current_version, latest_version)
|
|
major_available = False
|
|
minor_available = False
|
|
patch_available = False
|
|
|
|
if comparison < 0:
|
|
curr_parts = current_version.split(".")
|
|
latest_parts = latest_version.split(".")
|
|
|
|
while len(curr_parts) < 3:
|
|
curr_parts.append("0")
|
|
while len(latest_parts) < 3:
|
|
latest_parts.append("0")
|
|
|
|
if latest_parts[0] != curr_parts[0]:
|
|
major_available = True
|
|
elif latest_parts[1] != curr_parts[1]:
|
|
minor_available = True
|
|
else:
|
|
patch_available = True
|
|
|
|
return OutdatedPackage(
|
|
package_name=package_name,
|
|
current_version=current_version,
|
|
latest_version=latest_version,
|
|
language=language,
|
|
minor_available=minor_available,
|
|
patch_available=patch_available,
|
|
major_available=major_available,
|
|
)
|