Add core CLI and configuration modules
This commit is contained in:
299
depaudit/cli.py
Normal file
299
depaudit/cli.py
Normal file
@@ -0,0 +1,299 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import click
|
||||
import requests
|
||||
|
||||
from depaudit import __version__
|
||||
from depaudit.checks.outdated import OutdatedPackage
|
||||
from depaudit.checks.licenses import LicenseInfo
|
||||
from depaudit.checks.unused import UnusedDependency
|
||||
from depaudit.checks.vulnerabilities import Vulnerability
|
||||
from depaudit.checks.outdated import check_outdated
|
||||
from depaudit.checks.licenses import check_license, validate_license_compliance
|
||||
from depaudit.checks.unused import check_unused_dependencies
|
||||
from depaudit.config import config
|
||||
from depaudit.output import AuditResult
|
||||
from depaudit.output.factory import FormatterFactory
|
||||
from depaudit.parsers import ParsedManifest
|
||||
from depaudit.parsers.factory import ParserFactory
|
||||
|
||||
|
||||
@click.group()
|
||||
@click.version_option(version=__version__)
|
||||
@click.option(
|
||||
"--config",
|
||||
type=click.Path(exists=True),
|
||||
help="Path to configuration file",
|
||||
)
|
||||
@click.option(
|
||||
"--no-color",
|
||||
is_flag=True,
|
||||
default=False,
|
||||
help="Disable colored output",
|
||||
)
|
||||
@click.option(
|
||||
"--verbose",
|
||||
is_flag=True,
|
||||
default=False,
|
||||
help="Enable verbose output",
|
||||
)
|
||||
@click.pass_context
|
||||
def main(ctx: click.Context, config: str | None, no_color: bool, verbose: bool) -> None:
|
||||
ctx.ensure_object(dict)
|
||||
if config:
|
||||
from depaudit.config import Config
|
||||
ctx.obj["config"] = Config(Path(config))
|
||||
else:
|
||||
ctx.obj["config"] = config
|
||||
ctx.obj["no_color"] = no_color
|
||||
ctx.obj["verbose"] = verbose
|
||||
|
||||
|
||||
@main.command()
|
||||
@click.argument(
|
||||
"path",
|
||||
type=click.Path(exists=True, file_okay=True, dir_okay=True),
|
||||
default=".",
|
||||
)
|
||||
@click.option(
|
||||
"--format",
|
||||
type=click.Choice(["table", "json", "quiet"]),
|
||||
default=None,
|
||||
help="Output format",
|
||||
)
|
||||
@click.option(
|
||||
"--output",
|
||||
type=click.Path(),
|
||||
help="Output file path",
|
||||
)
|
||||
@click.option(
|
||||
"--no-cache",
|
||||
is_flag=True,
|
||||
default=False,
|
||||
help="Disable caching",
|
||||
)
|
||||
@click.option(
|
||||
"--severity",
|
||||
type=click.Choice(["critical", "high", "medium", "low", "all"]),
|
||||
default="all",
|
||||
help="Minimum severity level to report",
|
||||
)
|
||||
@click.option(
|
||||
"--skip-vulnerabilities",
|
||||
is_flag=True,
|
||||
default=False,
|
||||
help="Skip vulnerability scanning",
|
||||
)
|
||||
@click.option(
|
||||
"--skip-outdated",
|
||||
is_flag=True,
|
||||
default=False,
|
||||
help="Skip outdated package checking",
|
||||
)
|
||||
@click.option(
|
||||
"--skip-licenses",
|
||||
is_flag=True,
|
||||
default=False,
|
||||
help="Skip license compliance checking",
|
||||
)
|
||||
@click.option(
|
||||
"--skip-unused",
|
||||
is_flag=True,
|
||||
default=False,
|
||||
help="Skip unused dependency checking",
|
||||
)
|
||||
@click.pass_context
|
||||
def audit(
|
||||
ctx: click.Context,
|
||||
path: str,
|
||||
format: str,
|
||||
output: str | None,
|
||||
no_cache: bool,
|
||||
severity: str,
|
||||
skip_vulnerabilities: bool,
|
||||
skip_outdated: bool,
|
||||
skip_licenses: bool,
|
||||
skip_unused: bool,
|
||||
) -> None:
|
||||
start_time = time.time()
|
||||
|
||||
ctx_obj = ctx.obj if ctx.obj else {}
|
||||
cfg = ctx_obj.get("config") if ctx_obj else None
|
||||
if cfg is None:
|
||||
cfg = config
|
||||
use_color = not ctx_obj.get("no_color", False) and sys.stdout.isatty()
|
||||
verbosity = "debug" if ctx_obj.get("verbose", False) else "info"
|
||||
|
||||
target_path = Path(path)
|
||||
if target_path.is_file():
|
||||
paths_to_scan = [target_path]
|
||||
else:
|
||||
paths_to_scan = list(target_path.rglob("*"))
|
||||
paths_to_scan = [p for p in paths_to_scan if p.is_file()]
|
||||
|
||||
output_format = format or cfg.output_format
|
||||
|
||||
result = AuditResult()
|
||||
|
||||
for file_path in paths_to_scan:
|
||||
manifest = ParserFactory.parse(file_path)
|
||||
if manifest is None:
|
||||
continue
|
||||
|
||||
result.scanned_files.append(str(file_path))
|
||||
result.scanned_count += 1
|
||||
|
||||
if not skip_vulnerabilities and cfg.vulnerabilities_enabled:
|
||||
for dep in manifest.dependencies:
|
||||
vuln = check_vulnerability(dep, severity)
|
||||
if vuln:
|
||||
result.vulnerabilities.append(vuln.to_dict())
|
||||
|
||||
if not skip_outdated and cfg.outdated_enabled:
|
||||
for dep in manifest.dependencies:
|
||||
outdated = check_outdated(
|
||||
dep.name,
|
||||
dep.version,
|
||||
dep.language,
|
||||
cfg.network_timeout,
|
||||
)
|
||||
if outdated:
|
||||
result.outdated.append(outdated.to_dict())
|
||||
|
||||
if not skip_licenses and cfg.licenses_enabled:
|
||||
for dep in manifest.dependencies:
|
||||
license_info = check_license(dep.name, dep.license, source=str(file_path))
|
||||
is_compliant, message = validate_license_compliance(
|
||||
license_info,
|
||||
cfg.license_allowlist,
|
||||
cfg.license_blocklist,
|
||||
)
|
||||
if not is_compliant:
|
||||
result.license_issues.append({
|
||||
"package_name": dep.name,
|
||||
"license": license_info.license_type,
|
||||
"status": message,
|
||||
"file": str(file_path),
|
||||
})
|
||||
|
||||
if not skip_unused and cfg.unused_enabled:
|
||||
for file_path in paths_to_scan:
|
||||
manifest = ParserFactory.parse(file_path)
|
||||
if manifest:
|
||||
deps = [(d.name, d.version) for d in manifest.dependencies]
|
||||
unused = check_unused_dependencies(
|
||||
deps,
|
||||
file_path.parent,
|
||||
manifest.language,
|
||||
)
|
||||
for u in unused:
|
||||
result.unused.append(u.to_dict())
|
||||
|
||||
result.scan_duration = time.time() - start_time
|
||||
|
||||
formatter = FormatterFactory.get_formatter(
|
||||
output_format,
|
||||
use_color=use_color,
|
||||
verbosity=verbosity,
|
||||
)
|
||||
|
||||
output_text = formatter.format(result)
|
||||
|
||||
if output:
|
||||
with open(output, "w") as f:
|
||||
f.write(output_text)
|
||||
else:
|
||||
click.echo(output_text)
|
||||
|
||||
exit_code = calculate_exit_code(result, cfg)
|
||||
sys.exit(exit_code)
|
||||
|
||||
|
||||
def check_vulnerability(dep, severity_filter: str) -> Vulnerability | None:
|
||||
try:
|
||||
url = f"https://api.osv.dev/v1/query"
|
||||
payload = {
|
||||
"package": {"name": dep.name},
|
||||
"version": dep.version,
|
||||
}
|
||||
response = requests.post(url, json=payload, timeout=30)
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
vulnerabilities = data.get("vulns", [])
|
||||
if vulnerabilities:
|
||||
vuln_data = vulnerabilities[0]
|
||||
vuln = Vulnerability.from_osv(vuln_data, dep.name, dep.version)
|
||||
if severity_filter == "all" or severity_order(vuln.severity) >= severity_order(severity_filter):
|
||||
return vuln
|
||||
except Exception:
|
||||
pass
|
||||
return None
|
||||
|
||||
|
||||
def severity_order(severity: str) -> int:
|
||||
order = {"critical": 4, "high": 3, "medium": 2, "low": 1, "unknown": 0}
|
||||
return order.get(severity, 0)
|
||||
|
||||
|
||||
def calculate_exit_code(result: AuditResult, cfg) -> int:
|
||||
severity_filter = cfg.cicd_fail_on
|
||||
for vuln in result.vulnerabilities:
|
||||
if vuln.get("severity") in severity_filter:
|
||||
return 1
|
||||
return 0
|
||||
|
||||
|
||||
@main.command()
|
||||
@click.argument("provider", type=click.Choice(["github", "gitlab"]))
|
||||
@click.argument("output_dir", type=click.Path(exists=True), default=".")
|
||||
@click.option("--schedule", help="Cron schedule for scans", default="0 0 * * 0")
|
||||
@click.option("--fail-critical/--no-fail-critical", default=True)
|
||||
@click.option("--fail-high/--no-fail-high", default=True)
|
||||
@click.pass_context
|
||||
def generate_cicd(
|
||||
ctx: click.Context,
|
||||
provider: str,
|
||||
output_dir: str,
|
||||
schedule: str,
|
||||
fail_critical: bool,
|
||||
fail_high: bool,
|
||||
) -> None:
|
||||
from depaudit.cicd import generate_cicd_config, CICDConfig
|
||||
|
||||
cfg = CICDConfig(
|
||||
provider=provider,
|
||||
schedule=schedule,
|
||||
fail_on_critical=fail_critical,
|
||||
fail_on_high=fail_high,
|
||||
)
|
||||
|
||||
output_path = generate_cicd_config(provider, Path(output_dir), cfg)
|
||||
click.echo(f"Generated CI/CD configuration at: {output_path}")
|
||||
|
||||
|
||||
@main.command()
|
||||
@click.argument("file_path", type=click.Path(exists=True))
|
||||
@click.pass_context
|
||||
def parse(ctx: click.Context, file_path: str) -> None:
|
||||
manifest = ParserFactory.parse(Path(file_path))
|
||||
if manifest is None:
|
||||
click.echo("No parser found for this file type")
|
||||
return
|
||||
|
||||
click.echo(f"Language: {manifest.language}")
|
||||
click.echo(f"Project: {manifest.project_name or 'Unknown'}")
|
||||
click.echo(f"Version: {manifest.project_version or 'Unknown'}")
|
||||
click.echo(f"Dependencies: {len(manifest.dependencies)}")
|
||||
|
||||
for dep in manifest.dependencies:
|
||||
click.echo(f" - {dep.name}@{dep.version}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user