Add core CLI and configuration modules
This commit is contained in:
168
depaudit/fix.py
Normal file
168
depaudit/fix.py
Normal file
@@ -0,0 +1,168 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import subprocess
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
from depaudit.checks import Vulnerability, OutdatedPackage
|
||||
|
||||
|
||||
@dataclass
|
||||
class FixSuggestion:
|
||||
package_name: str
|
||||
current_version: str
|
||||
suggested_version: str
|
||||
command: str
|
||||
description: str
|
||||
risk_level: str
|
||||
|
||||
|
||||
class FixStrategy:
|
||||
def generate_fix(self, issue) -> FixSuggestion:
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class NPMFixStrategy(FixStrategy):
|
||||
def generate_fix(self, issue: OutdatedPackage) -> FixSuggestion:
|
||||
command = f"npm install {issue.package_name}@{issue.latest_version}"
|
||||
if issue.major_available:
|
||||
risk = "high"
|
||||
elif issue.minor_available:
|
||||
risk = "medium"
|
||||
else:
|
||||
risk = "low"
|
||||
|
||||
return FixSuggestion(
|
||||
package_name=issue.package_name,
|
||||
current_version=issue.current_version,
|
||||
suggested_version=issue.latest_version,
|
||||
command=command,
|
||||
description=f"Update {issue.package_name} from {issue.current_version} to {issue.latest_version}",
|
||||
risk_level=risk,
|
||||
)
|
||||
|
||||
def apply_fix(self, suggestion: FixSuggestion, preview: bool = False) -> bool:
|
||||
if preview:
|
||||
print(f"[PREVIEW] Would run: {suggestion.command}")
|
||||
return True
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
suggestion.command.split(),
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=120,
|
||||
)
|
||||
return result.returncode == 0
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
class PipFixStrategy(FixStrategy):
|
||||
def generate_fix(self, issue: OutdatedPackage) -> FixSuggestion:
|
||||
command = f"pip install --upgrade {issue.package_name}=={issue.latest_version}"
|
||||
risk = "medium"
|
||||
|
||||
return FixSuggestion(
|
||||
package_name=issue.package_name,
|
||||
current_version=issue.current_version,
|
||||
suggested_version=issue.latest_version,
|
||||
command=command,
|
||||
description=f"Update {issue.package_name} from {issue.current_version} to {issue.latest_version}",
|
||||
risk_level=risk,
|
||||
)
|
||||
|
||||
def apply_fix(self, suggestion: FixSuggestion, preview: bool = False) -> bool:
|
||||
if preview:
|
||||
print(f"[PREVIEW] Would run: {suggestion.command}")
|
||||
return True
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["pip", "install", "--upgrade", f"{suggestion.package_name}=={suggestion.suggested_version}"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=120,
|
||||
)
|
||||
return result.returncode == 0
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
class CargoFixStrategy(FixStrategy):
|
||||
def generate_fix(self, issue: OutdatedPackage) -> FixSuggestion:
|
||||
command = f"cargo update -p {issue.package_name}"
|
||||
risk = "medium"
|
||||
|
||||
return FixSuggestion(
|
||||
package_name=issue.package_name,
|
||||
current_version=issue.current_version,
|
||||
suggested_version=issue.latest_version,
|
||||
command=command,
|
||||
description=f"Update {issue.package_name} from {issue.current_version} to {issue.latest_version}",
|
||||
risk_level=risk,
|
||||
)
|
||||
|
||||
def apply_fix(self, suggestion: FixSuggestion, preview: bool = False) -> bool:
|
||||
if preview:
|
||||
print(f"[PREVIEW] Would run: {suggestion.command}")
|
||||
return True
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["cargo", "update", "-p", suggestion.package_name],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=120,
|
||||
)
|
||||
return result.returncode == 0
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
class GoFixStrategy(FixStrategy):
|
||||
def generate_fix(self, issue: OutdatedPackage) -> FixSuggestion:
|
||||
command = f"go get {issue.package_name}@{issue.latest_version}"
|
||||
risk = "high"
|
||||
|
||||
return FixSuggestion(
|
||||
package_name=issue.package_name,
|
||||
current_version=issue.current_version,
|
||||
suggested_version=issue.latest_version,
|
||||
command=command,
|
||||
description=f"Update {issue.package_name} from {issue.current_version} to {issue.latest_version}",
|
||||
risk_level=risk,
|
||||
)
|
||||
|
||||
def apply_fix(self, suggestion: FixSuggestion, preview: bool = False) -> bool:
|
||||
if preview:
|
||||
print(f"[PREVIEW] Would run: {suggestion.command}")
|
||||
return True
|
||||
|
||||
try:
|
||||
os.chdir(Path.cwd())
|
||||
result = subprocess.run(
|
||||
["go", "get", f"{suggestion.package_name}@{suggestion.suggested_version}"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=120,
|
||||
)
|
||||
return result.returncode == 0
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
STRATEGIES = {
|
||||
"javascript": NPMFixStrategy,
|
||||
"python": PipFixStrategy,
|
||||
"rust": CargoFixStrategy,
|
||||
"go": GoFixStrategy,
|
||||
}
|
||||
|
||||
|
||||
def get_strategy(language: str) -> FixStrategy | None:
|
||||
strategy_class = STRATEGIES.get(language)
|
||||
if strategy_class:
|
||||
return strategy_class()
|
||||
return None
|
||||
Reference in New Issue
Block a user