Add source models
This commit is contained in:
197
src/cli.py
Normal file
197
src/cli.py
Normal file
@@ -0,0 +1,197 @@
|
|||||||
|
"""CLI interface for Shell Safe Validator."""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import sys
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import List, Optional
|
||||||
|
|
||||||
|
import click
|
||||||
|
|
||||||
|
from .config import load_rules
|
||||||
|
from .models import Severity
|
||||||
|
from .validators import PatternValidator, SecurityValidator, BestPracticesValidator
|
||||||
|
|
||||||
|
|
||||||
|
@click.group()
|
||||||
|
def main():
|
||||||
|
"""Shell Safe Validator - Validate shell scripts for safety issues."""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def validate_content(
|
||||||
|
content: str,
|
||||||
|
config_path: Optional[str] = None,
|
||||||
|
min_severity: str = "low",
|
||||||
|
output_format: str = "text",
|
||||||
|
) -> List[dict]:
|
||||||
|
"""Validate shell content and return findings."""
|
||||||
|
min_severity_level = Severity.from_string(min_severity)
|
||||||
|
lines = content.split("\n")
|
||||||
|
rules = load_rules(config_path)
|
||||||
|
pattern_validator = PatternValidator()
|
||||||
|
security_validator = SecurityValidator()
|
||||||
|
best_practices_validator = BestPracticesValidator(rules)
|
||||||
|
all_findings = []
|
||||||
|
for line_number, line in enumerate(lines, start=1):
|
||||||
|
for validator in [pattern_validator, security_validator, best_practices_validator]:
|
||||||
|
findings = validator.validate(line, line_number)
|
||||||
|
for finding in findings:
|
||||||
|
severity_level = Severity.from_string(finding.severity)
|
||||||
|
if severity_level >= min_severity_level:
|
||||||
|
all_findings.append(finding.to_dict())
|
||||||
|
return all_findings
|
||||||
|
|
||||||
|
|
||||||
|
def print_findings(findings: List[dict], output_format: str = "text") -> None:
|
||||||
|
"""Print findings in the specified format."""
|
||||||
|
if output_format == "json":
|
||||||
|
click.echo(json.dumps(findings, indent=2))
|
||||||
|
elif output_format == "compact":
|
||||||
|
for finding in findings:
|
||||||
|
severity = finding["severity"].upper()
|
||||||
|
line_info = f" Line {finding['line_number']}:" if finding["line_number"] else ""
|
||||||
|
click.echo(f"[{severity}]{line_info} {finding['message']}")
|
||||||
|
else:
|
||||||
|
for finding in findings:
|
||||||
|
severity = finding["severity"].upper()
|
||||||
|
line_info = f"Line {finding['line_number']}:" if finding["line_number"] else ""
|
||||||
|
symbol = "!" if finding["severity"] in ["critical", "high"] else "?"
|
||||||
|
click.echo(f"\n[{severity}] {symbol} {line_info}")
|
||||||
|
click.echo(f" Command: {finding.get('context', 'N/A')}")
|
||||||
|
click.echo(f" Matched: {finding['matched_text']}")
|
||||||
|
click.echo(f" {finding['message']}")
|
||||||
|
if finding["suggestion"]:
|
||||||
|
click.echo(f" Suggestion: {finding['suggestion']}")
|
||||||
|
|
||||||
|
|
||||||
|
@main.command()
|
||||||
|
@click.argument("file", type=click.Path(exists=True))
|
||||||
|
@click.option("--config", "-c", help="Path to custom rules YAML file")
|
||||||
|
@click.option(
|
||||||
|
"--severity",
|
||||||
|
"-s",
|
||||||
|
default="low",
|
||||||
|
help="Minimum severity level (low, medium, high, critical)",
|
||||||
|
)
|
||||||
|
@click.option(
|
||||||
|
"--output-format",
|
||||||
|
"-o",
|
||||||
|
type=click.Choice(["text", "json", "compact"]),
|
||||||
|
default="text",
|
||||||
|
help="Output format",
|
||||||
|
)
|
||||||
|
def validate(file: str, config: Optional[str], severity: str, output_format: str):
|
||||||
|
"""Validate a shell script file for safety issues."""
|
||||||
|
try:
|
||||||
|
file_path = Path(file)
|
||||||
|
with open(file_path, "r") as f:
|
||||||
|
content = f.read()
|
||||||
|
click.echo(f"Validating {file}...\n")
|
||||||
|
findings = validate_content(content, config, severity, output_format)
|
||||||
|
if findings:
|
||||||
|
severity_counts = {"critical": 0, "high": 0, "medium": 0, "low": 0}
|
||||||
|
for finding in findings:
|
||||||
|
severity_counts[finding["severity"]] += 1
|
||||||
|
print_findings(findings, output_format)
|
||||||
|
click.echo(
|
||||||
|
f"\nValidation complete: {len(findings)} issue(s) found "
|
||||||
|
f"({severity_counts['critical']} critical, "
|
||||||
|
f"{severity_counts['high']} high, "
|
||||||
|
f"{severity_counts['medium']} medium, "
|
||||||
|
f"{severity_counts['low']} low)"
|
||||||
|
)
|
||||||
|
sys.exit(1)
|
||||||
|
else:
|
||||||
|
click.echo("No issues found. Script looks safe!")
|
||||||
|
sys.exit(0)
|
||||||
|
except FileNotFoundError:
|
||||||
|
click.echo(f"Error: File not found: {file}", err=True)
|
||||||
|
sys.exit(1)
|
||||||
|
except PermissionError:
|
||||||
|
click.echo(f"Error: Permission denied: {file}", err=True)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
|
@main.command()
|
||||||
|
@click.argument("command", nargs=-1)
|
||||||
|
@click.option(
|
||||||
|
"--severity",
|
||||||
|
"-s",
|
||||||
|
default="low",
|
||||||
|
help="Minimum severity level (low, medium, high, critical)",
|
||||||
|
)
|
||||||
|
@click.option(
|
||||||
|
"--output-format",
|
||||||
|
"-o",
|
||||||
|
type=click.Choice(["text", "json", "compact"]),
|
||||||
|
default="text",
|
||||||
|
help="Output format",
|
||||||
|
)
|
||||||
|
def check(command: str, severity: str, output_format: str):
|
||||||
|
"""Check a single shell command for safety issues."""
|
||||||
|
if not command:
|
||||||
|
click.echo("Error: No command provided", err=True)
|
||||||
|
sys.exit(2)
|
||||||
|
command_str = " ".join(command)
|
||||||
|
findings = validate_content(command_str, None, severity, output_format)
|
||||||
|
if findings:
|
||||||
|
print_findings(findings, output_format)
|
||||||
|
click.echo(f"\nFound {len(findings)} issue(s)")
|
||||||
|
sys.exit(1)
|
||||||
|
else:
|
||||||
|
click.echo("No issues found. Command looks safe!")
|
||||||
|
sys.exit(0)
|
||||||
|
|
||||||
|
|
||||||
|
@main.command()
|
||||||
|
@click.argument("command", nargs=-1)
|
||||||
|
@click.option("--dry-run", is_flag=True, help="Show what would be executed without running")
|
||||||
|
@click.option("--force", is_flag=True, help="Skip confirmation prompt")
|
||||||
|
def safe_exec(command: str, dry_run: bool, force: bool):
|
||||||
|
"""Execute a command safely with validation and confirmation."""
|
||||||
|
if not command:
|
||||||
|
click.echo("Error: No command provided", err=True)
|
||||||
|
sys.exit(2)
|
||||||
|
command_str = " ".join(command)
|
||||||
|
findings = validate_content(command_str, None, "low", "text")
|
||||||
|
critical_high_findings = [
|
||||||
|
f for f in findings if f["severity"] in ["critical", "high"]
|
||||||
|
]
|
||||||
|
if dry_run:
|
||||||
|
click.echo(f"[DRY RUN] Would execute: {command_str}")
|
||||||
|
if findings:
|
||||||
|
click.echo(f" Issues found: {len(findings)}")
|
||||||
|
for f in findings[:5]:
|
||||||
|
click.echo(f" - [{f['severity'].upper()}] {f['message']}")
|
||||||
|
sys.exit(0)
|
||||||
|
if critical_high_findings:
|
||||||
|
click.echo("ERROR: Command contains critical/high severity issues:", err=True)
|
||||||
|
for f in critical_high_findings:
|
||||||
|
click.echo(f" [{f['severity'].upper()}] {f['message']}", err=True)
|
||||||
|
sys.exit(1)
|
||||||
|
if not force:
|
||||||
|
if findings:
|
||||||
|
click.echo(f"Warning: Command has {len(findings)} issue(s)")
|
||||||
|
for f in findings[:3]:
|
||||||
|
click.echo(f" [{f['severity'].upper()}] {f['message']}")
|
||||||
|
if len(findings) > 3:
|
||||||
|
click.echo(f" ... and {len(findings) - 3} more")
|
||||||
|
confirm = click.confirm(f"\nExecute: {command_str}")
|
||||||
|
if not confirm:
|
||||||
|
click.echo("Cancelled.")
|
||||||
|
sys.exit(0)
|
||||||
|
try:
|
||||||
|
import subprocess
|
||||||
|
result = subprocess.run(command_str, shell=True, capture_output=True, text=True)
|
||||||
|
if result.stdout:
|
||||||
|
click.echo(result.stdout)
|
||||||
|
if result.stderr:
|
||||||
|
click.echo(result.stderr, err=True)
|
||||||
|
sys.exit(result.returncode)
|
||||||
|
except Exception as e:
|
||||||
|
click.echo(f"Error executing command: {e}", err=True)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
Reference in New Issue
Block a user