Add CLI, utils, and fixers modules
This commit is contained in:
300
src/cli/__init__.py
Normal file
300
src/cli/__init__.py
Normal file
@@ -0,0 +1,300 @@
|
|||||||
|
"""CLI interface for the AI Code Refactor tool."""
|
||||||
|
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Optional
|
||||||
|
import json
|
||||||
|
|
||||||
|
import click
|
||||||
|
from rich.console import Console
|
||||||
|
from rich.table import Table
|
||||||
|
from rich.theme import Theme
|
||||||
|
|
||||||
|
from src.analyzers.base import AnalysisResult, SeverityLevel
|
||||||
|
from src.analyzers import ParserFactory
|
||||||
|
from src.rules import load_config
|
||||||
|
from src.rules.security import SQLInjectionAnalyzer, EvalUsageAnalyzer, PathTraversalAnalyzer
|
||||||
|
from src.rules.antipatterns import (
|
||||||
|
ExceptionSwallowAnalyzer,
|
||||||
|
MagicNumberAnalyzer,
|
||||||
|
DeepNestingAnalyzer,
|
||||||
|
LongFunctionAnalyzer,
|
||||||
|
)
|
||||||
|
from src.rules.secrets import HardcodedSecretAnalyzer
|
||||||
|
from src.rules.performance import InefficientLoopAnalyzer, RedundantOperationAnalyzer, UnnecessaryCopyAnalyzer
|
||||||
|
from src.fixers import FixerRegistry
|
||||||
|
from src.utils import (
|
||||||
|
get_file_language,
|
||||||
|
is_supported_file,
|
||||||
|
read_source_file,
|
||||||
|
write_source_file,
|
||||||
|
create_backup,
|
||||||
|
count_lines_of_code,
|
||||||
|
)
|
||||||
|
|
||||||
|
custom_theme = Theme({"severity.critical": "bold red", "severity.high": "red", "severity.medium": "yellow", "severity.low": "blue"})
|
||||||
|
console = Console(theme=custom_theme)
|
||||||
|
|
||||||
|
|
||||||
|
def get_analyzers():
|
||||||
|
"""Get list of all analyzers."""
|
||||||
|
return [
|
||||||
|
SQLInjectionAnalyzer(),
|
||||||
|
EvalUsageAnalyzer(),
|
||||||
|
PathTraversalAnalyzer(),
|
||||||
|
ExceptionSwallowAnalyzer(),
|
||||||
|
MagicNumberAnalyzer(),
|
||||||
|
DeepNestingAnalyzer(),
|
||||||
|
LongFunctionAnalyzer(),
|
||||||
|
HardcodedSecretAnalyzer(),
|
||||||
|
InefficientLoopAnalyzer(),
|
||||||
|
RedundantOperationAnalyzer(),
|
||||||
|
UnnecessaryCopyAnalyzer(),
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def analyze_file(file_path: Path, config: Optional[dict] = None) -> AnalysisResult:
|
||||||
|
"""Analyze a single file."""
|
||||||
|
source_code = read_source_file(file_path)
|
||||||
|
if source_code is None:
|
||||||
|
return AnalysisResult(file_path=file_path, error="Could not read file")
|
||||||
|
|
||||||
|
language = get_file_language(file_path)
|
||||||
|
if language is None:
|
||||||
|
return AnalysisResult(file_path=file_path, error="Unsupported file type")
|
||||||
|
|
||||||
|
parser = ParserFactory.get_parser(language)
|
||||||
|
if parser is None:
|
||||||
|
return AnalysisResult(file_path=file_path, error=f"Parser not available for {language}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
tree = parser.parse(source_code)
|
||||||
|
except Exception as e:
|
||||||
|
return AnalysisResult(file_path=file_path, error=f"Parse error: {str(e)}")
|
||||||
|
|
||||||
|
findings = []
|
||||||
|
analyzers = get_analyzers()
|
||||||
|
|
||||||
|
for analyzer in analyzers:
|
||||||
|
if config and not _is_rule_enabled(analyzer.rule_id(), config):
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
analyzer_findings = analyzer.analyze(source_code, file_path, tree)
|
||||||
|
findings.extend(analyzer_findings)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
lines_of_code = count_lines_of_code(source_code)
|
||||||
|
return AnalysisResult(file_path=file_path, findings=findings, lines_of_code=lines_of_code)
|
||||||
|
|
||||||
|
|
||||||
|
def _is_rule_enabled(rule_id: str, config: dict) -> bool:
|
||||||
|
if "rules" not in config:
|
||||||
|
return True
|
||||||
|
rule_config = config["rules"].get(rule_id, {})
|
||||||
|
if "enabled" in rule_config:
|
||||||
|
return rule_config["enabled"]
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def analyze_path(path: Path, config: Optional[dict] = None) -> list[AnalysisResult]:
|
||||||
|
"""Analyze a file or directory."""
|
||||||
|
results = []
|
||||||
|
|
||||||
|
if path.is_file():
|
||||||
|
if is_supported_file(path):
|
||||||
|
results.append(analyze_file(path, config))
|
||||||
|
else:
|
||||||
|
for file_path in path.rglob("*"):
|
||||||
|
if file_path.is_file() and is_supported_file(file_path):
|
||||||
|
results.append(analyze_file(file_path, config))
|
||||||
|
|
||||||
|
return results
|
||||||
|
|
||||||
|
|
||||||
|
def print_results(results: list[AnalysisResult], json_output: bool = False):
|
||||||
|
"""Print analysis results."""
|
||||||
|
if json_output:
|
||||||
|
output = {
|
||||||
|
"files_analyzed": len(results),
|
||||||
|
"files_with_issues": sum(1 for r in results if r.has_issues()),
|
||||||
|
"results": [format_result_json(r) for r in results],
|
||||||
|
}
|
||||||
|
console.print(json.dumps(output, indent=2))
|
||||||
|
else:
|
||||||
|
for result in results:
|
||||||
|
if result.error:
|
||||||
|
console.print(f"[yellow]Warning: {result.file_path} - {result.error}[/]")
|
||||||
|
elif result.findings:
|
||||||
|
print_result_table(result)
|
||||||
|
|
||||||
|
|
||||||
|
def format_result_json(result: AnalysisResult) -> dict:
|
||||||
|
return {
|
||||||
|
"file": str(result.file_path),
|
||||||
|
"lines_of_code": result.lines_of_code,
|
||||||
|
"findings_count": len(result.findings),
|
||||||
|
"summary": result.summary(),
|
||||||
|
"findings": [f.to_dict() for f in result.findings],
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def print_result_table(result: AnalysisResult):
|
||||||
|
"""Print formatted result table."""
|
||||||
|
table = Table(title=str(result.file_path))
|
||||||
|
table.add_column("Severity", width=10)
|
||||||
|
table.add_column("Line", width=6)
|
||||||
|
table.add_column("Rule", width=25)
|
||||||
|
table.add_column("Message", width=50)
|
||||||
|
|
||||||
|
severity_colors = {
|
||||||
|
SeverityLevel.CRITICAL: "severity.critical",
|
||||||
|
SeverityLevel.HIGH: "severity.high",
|
||||||
|
SeverityLevel.MEDIUM: "severity.medium",
|
||||||
|
SeverityLevel.LOW: "severity.low",
|
||||||
|
}
|
||||||
|
|
||||||
|
for finding in result.findings:
|
||||||
|
color = severity_colors.get(finding.severity, "white")
|
||||||
|
table.add_row(
|
||||||
|
f"[{color}]{finding.severity.value}[/]",
|
||||||
|
str(finding.line_number),
|
||||||
|
finding.rule_id.split(".")[-1],
|
||||||
|
finding.message,
|
||||||
|
)
|
||||||
|
|
||||||
|
console.print(table)
|
||||||
|
|
||||||
|
|
||||||
|
def print_summary(results: list[AnalysisResult]):
|
||||||
|
"""Print analysis summary."""
|
||||||
|
total_issues = sum(len(r.findings) for r in results)
|
||||||
|
critical = sum(r.critical_count() for r in results)
|
||||||
|
high = sum(r.high_count() for r in results)
|
||||||
|
medium = sum(r.medium_count() for r in results)
|
||||||
|
low = sum(r.low_count() for r in results)
|
||||||
|
files_with_issues = sum(1 for r in results if r.has_issues())
|
||||||
|
|
||||||
|
console.print(f"\n[bold]Analysis Summary[/]")
|
||||||
|
console.print(f" Files analyzed: {len(results)}")
|
||||||
|
console.print(f" Files with issues: {files_with_issues}")
|
||||||
|
console.print(f" Total issues: {total_issues}")
|
||||||
|
console.print(f" [red]Critical: {critical}[/]")
|
||||||
|
console.print(f" [red]High: {high}[/]")
|
||||||
|
console.print(f" [yellow]Medium: {medium}[/]")
|
||||||
|
console.print(f" [blue]Low: {low}[/]")
|
||||||
|
|
||||||
|
|
||||||
|
@click.group()
|
||||||
|
def cli():
|
||||||
|
"""AI Code Refactor CLI - Analyze and fix code issues."""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
@cli.command()
|
||||||
|
@click.argument("path", type=click.Path(exists=True, path_type=Path))
|
||||||
|
@click.option("--json", "json_output", is_flag=True, help="Output results as JSON")
|
||||||
|
@click.option("--config", type=click.Path(exists=True, path_type=Path), help="Path to config file")
|
||||||
|
@click.option("--fix", is_flag=True, help="Automatically fix detected issues")
|
||||||
|
def analyze(path: Path, json_output: bool, config: Path | None, fix: bool):
|
||||||
|
"""Analyze code for security issues, anti-patterns, and performance problems."""
|
||||||
|
cfg = load_config(config) if config else None
|
||||||
|
results = analyze_path(path, cfg)
|
||||||
|
|
||||||
|
if fix:
|
||||||
|
fixer_registry = FixerRegistry()
|
||||||
|
for result in results:
|
||||||
|
if result.findings:
|
||||||
|
source_code = read_source_file(result.file_path)
|
||||||
|
if source_code:
|
||||||
|
for finding in result.findings:
|
||||||
|
if fixer_registry.can_fix(finding):
|
||||||
|
tree = ParserFactory.get_parser(
|
||||||
|
get_file_language(result.file_path)
|
||||||
|
).parse(source_code)
|
||||||
|
source_code = fixer_registry.fix(source_code, finding, tree)
|
||||||
|
|
||||||
|
backup = create_backup(result.file_path)
|
||||||
|
if write_source_file(result.file_path, source_code):
|
||||||
|
console.print(f"[green]Fixed: {result.file_path}[/]")
|
||||||
|
if backup:
|
||||||
|
console.print(f" Backup created: {backup}")
|
||||||
|
else:
|
||||||
|
console.print(f"[red]Failed to write: {result.file_path}[/]")
|
||||||
|
|
||||||
|
print_results(results, json_output)
|
||||||
|
print_summary(results)
|
||||||
|
|
||||||
|
|
||||||
|
@cli.command()
|
||||||
|
@click.argument("path", type=click.Path(exists=True, path_type=Path))
|
||||||
|
@click.option("--config", type=click.Path(exists=True, path_type=Path), help="Path to config file")
|
||||||
|
def fix(path: Path, config: Path | None):
|
||||||
|
"""Automatically fix detected issues in code."""
|
||||||
|
cfg = load_config(config) if config else None
|
||||||
|
results = analyze_path(path, cfg)
|
||||||
|
|
||||||
|
fixer_registry = FixerRegistry()
|
||||||
|
fixed_count = 0
|
||||||
|
|
||||||
|
for result in results:
|
||||||
|
if result.findings:
|
||||||
|
source_code = read_source_file(result.file_path)
|
||||||
|
if source_code:
|
||||||
|
tree = ParserFactory.get_parser(
|
||||||
|
get_file_language(result.file_path)
|
||||||
|
).parse(source_code) if get_file_language(result.file_path) else None
|
||||||
|
|
||||||
|
for finding in result.findings:
|
||||||
|
if fixer_registry.can_fix(finding) and tree:
|
||||||
|
source_code = fixer_registry.fix(source_code, finding, tree)
|
||||||
|
fixed_count += 1
|
||||||
|
|
||||||
|
if fixed_count > 0:
|
||||||
|
backup = create_backup(result.file_path)
|
||||||
|
if write_source_file(result.file_path, source_code):
|
||||||
|
console.print(f"[green]Fixed {fixed_count} issues in {result.file_path}[/]")
|
||||||
|
if backup:
|
||||||
|
console.print(f" Backup created: {backup}")
|
||||||
|
else:
|
||||||
|
console.print(f"[red]Failed to write: {result.file_path}[/]")
|
||||||
|
|
||||||
|
if fixed_count == 0:
|
||||||
|
console.print("[yellow]No fixable issues found[/]")
|
||||||
|
|
||||||
|
|
||||||
|
@cli.command()
|
||||||
|
def rules():
|
||||||
|
"""List all available rules."""
|
||||||
|
table = Table(title="Available Rules")
|
||||||
|
table.add_column("Rule ID", width=30)
|
||||||
|
table.add_column("Category", width=15)
|
||||||
|
table.add_column("Severity", width=10)
|
||||||
|
|
||||||
|
for analyzer in get_analyzers():
|
||||||
|
severity_colors = {
|
||||||
|
SeverityLevel.CRITICAL: "severity.critical",
|
||||||
|
SeverityLevel.HIGH: "severity.high",
|
||||||
|
SeverityLevel.MEDIUM: "severity.medium",
|
||||||
|
SeverityLevel.LOW: "severity.low",
|
||||||
|
}
|
||||||
|
color = severity_colors.get(analyzer.severity(), "white")
|
||||||
|
table.add_row(
|
||||||
|
analyzer.rule_id(),
|
||||||
|
analyzer.category().value,
|
||||||
|
f"[{color}]{analyzer.severity().value}[/]",
|
||||||
|
)
|
||||||
|
|
||||||
|
console.print(table)
|
||||||
|
|
||||||
|
|
||||||
|
@cli.command()
|
||||||
|
def languages():
|
||||||
|
"""List supported languages."""
|
||||||
|
languages = ParserFactory.supported_languages()
|
||||||
|
console.print("[bold]Supported Languages[/]")
|
||||||
|
for lang in languages:
|
||||||
|
console.print(f" - {lang}")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
cli()
|
||||||
Reference in New Issue
Block a user