Add checks modules (vulnerabilities, outdated)
This commit is contained in:
200
depaudit/checks/outdated.py
Normal file
200
depaudit/checks/outdated.py
Normal file
@@ -0,0 +1,200 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import re
|
||||
from dataclasses import dataclass
|
||||
from typing import Any
|
||||
|
||||
import requests
|
||||
|
||||
from depaudit.checks import OutdatedPackage
|
||||
|
||||
|
||||
@dataclass
|
||||
class RegistryClient:
|
||||
base_url: str
|
||||
timeout: int = 30
|
||||
|
||||
def get_latest_version(self, package_name: str) -> str | None:
|
||||
raise NotImplementedError
|
||||
|
||||
def parse_version(self, version_data: Any) -> str:
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class NPMClient(RegistryClient):
|
||||
def __init__(self, timeout: int = 30):
|
||||
super().__init__("https://registry.npmjs.org", timeout)
|
||||
|
||||
def get_latest_version(self, package_name: str) -> str | None:
|
||||
try:
|
||||
url = f"{self.base_url}/{package_name}/latest"
|
||||
response = requests.get(url, timeout=self.timeout)
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
return data.get("version")
|
||||
return None
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
def parse_version(self, version_data: Any) -> str:
|
||||
if isinstance(version_data, dict):
|
||||
return version_data.get("version", "")
|
||||
return str(version_data)
|
||||
|
||||
|
||||
class PyPIClient(RegistryClient):
|
||||
def __init__(self, timeout: int = 30):
|
||||
super().__init__("https://pypi.org/pypi", timeout)
|
||||
|
||||
def get_latest_version(self, package_name: str) -> str | None:
|
||||
try:
|
||||
url = f"{self.base_url}/{package_name}/json"
|
||||
response = requests.get(url, timeout=self.timeout)
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
return data.get("info", {}).get("version")
|
||||
return None
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
def parse_version(self, version_data: Any) -> str:
|
||||
return str(version_data)
|
||||
|
||||
|
||||
class CratesClient(RegistryClient):
|
||||
def __init__(self, timeout: int = 30):
|
||||
super().__init__("https://crates.io/api/v1", timeout)
|
||||
|
||||
def get_latest_version(self, package_name: str) -> str | None:
|
||||
try:
|
||||
url = f"{self.base_url}/crates/{package_name}"
|
||||
response = requests.get(url, timeout=self.timeout)
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
versions = data.get("versions", [])
|
||||
if versions:
|
||||
return versions[0].get("num")
|
||||
return None
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
def parse_version(self, version_data: Any) -> str:
|
||||
return str(version_data)
|
||||
|
||||
|
||||
class GoClient(RegistryClient):
|
||||
def __init__(self, timeout: int = 30):
|
||||
super().__init__("https://pkg.go.dev", timeout)
|
||||
|
||||
def get_latest_version(self, package_name: str) -> str | None:
|
||||
try:
|
||||
url = f"{self.base_url}/info?module={package_name}"
|
||||
response = requests.get(url, timeout=self.timeout)
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
version = data.get("Version")
|
||||
if version:
|
||||
match = re.match(r"v?([\d.]+)", version)
|
||||
if match:
|
||||
return match.group(0)
|
||||
return None
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
def parse_version(self, version_data: Any) -> str:
|
||||
return str(version_data)
|
||||
|
||||
|
||||
class MavenClient(RegistryClient):
|
||||
def __init__(self, timeout: int = 30):
|
||||
super().__init__("https://repo1.maven.org/maven2", timeout)
|
||||
|
||||
def get_latest_version(self, group_id: str, artifact_id: str) -> str | None:
|
||||
try:
|
||||
group_path = group_id.replace(".", "/")
|
||||
metadata_url = f"{self.base_url}/{group_path}/{artifact_id}/maven-metadata.xml"
|
||||
response = requests.get(metadata_url, timeout=self.timeout)
|
||||
if response.status_code == 200:
|
||||
content = response.text
|
||||
match = re.search(r"<release>([^<]+)</release>", content)
|
||||
if match:
|
||||
return match.group(1)
|
||||
return None
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
def parse_version(self, version_data: Any) -> str:
|
||||
return str(version_data)
|
||||
|
||||
|
||||
CLIENTS = {
|
||||
"javascript": NPMClient,
|
||||
"python": PyPIClient,
|
||||
"rust": CratesClient,
|
||||
"go": GoClient,
|
||||
"java": MavenClient,
|
||||
}
|
||||
|
||||
|
||||
def get_client(language: str, timeout: int = 30) -> RegistryClient | None:
|
||||
client_class = CLIENTS.get(language)
|
||||
if client_class:
|
||||
return client_class(timeout)
|
||||
return None
|
||||
|
||||
|
||||
def check_outdated(
|
||||
package_name: str,
|
||||
current_version: str,
|
||||
language: str,
|
||||
timeout: int = 30,
|
||||
) -> OutdatedPackage | None:
|
||||
client = get_client(language, timeout)
|
||||
if client is None:
|
||||
return None
|
||||
|
||||
if language == "java":
|
||||
if ":" in package_name:
|
||||
group_id, artifact_id = package_name.split(":", 1)
|
||||
latest_version = client.get_latest_version(group_id, artifact_id)
|
||||
else:
|
||||
return None
|
||||
else:
|
||||
latest_version = client.get_latest_version(package_name)
|
||||
|
||||
if latest_version is None:
|
||||
return None
|
||||
|
||||
from depaudit.utils import compare_versions
|
||||
|
||||
comparison = compare_versions(current_version, latest_version)
|
||||
major_available = False
|
||||
minor_available = False
|
||||
patch_available = False
|
||||
|
||||
if comparison < 0:
|
||||
curr_parts = current_version.split(".")
|
||||
latest_parts = latest_version.split(".")
|
||||
|
||||
while len(curr_parts) < 3:
|
||||
curr_parts.append("0")
|
||||
while len(latest_parts) < 3:
|
||||
latest_parts.append("0")
|
||||
|
||||
if latest_parts[0] != curr_parts[0]:
|
||||
major_available = True
|
||||
elif latest_parts[1] != curr_parts[1]:
|
||||
minor_available = True
|
||||
else:
|
||||
patch_available = True
|
||||
|
||||
return OutdatedPackage(
|
||||
package_name=package_name,
|
||||
current_version=current_version,
|
||||
latest_version=latest_version,
|
||||
language=language,
|
||||
minor_available=minor_available,
|
||||
patch_available=patch_available,
|
||||
major_available=major_available,
|
||||
)
|
||||
Reference in New Issue
Block a user