Fix mypy type errors in source code
Some checks failed
CI/CD / lint-and-test (push) Has been cancelled
Some checks failed
CI/CD / lint-and-test (push) Has been cancelled
This commit is contained in:
337
app/local-ai-commit-reviewer/src/cli/cli.py
Normal file
337
app/local-ai-commit-reviewer/src/cli/cli.py
Normal file
@@ -0,0 +1,337 @@
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Any, Union
|
||||
|
||||
import click
|
||||
from rich import print as rprint
|
||||
|
||||
from ..config import Config, get_config
|
||||
from ..core import ReviewEngine, ReviewResult
|
||||
from ..formatters import get_formatter
|
||||
from ..git import FileChange, GitRepo, get_staged_changes
|
||||
from ..git import install_hook as git_install_hook
|
||||
from ..llm import OllamaProvider
|
||||
|
||||
|
||||
@click.group()
|
||||
@click.option("--config", "-c", type=click.Path(exists=True), help="Path to config file")
|
||||
@click.option("--endpoint", help="LLM endpoint URL", default=None)
|
||||
@click.option("--model", "-m", help="Model name to use", default=None)
|
||||
@click.pass_context
|
||||
def cli(ctx: click.Context, config: str | None, endpoint: str | None, model: str | None):
|
||||
ctx.ensure_object(dict)
|
||||
cfg_path = config or os.environ.get("AICR_CONFIG_PATH")
|
||||
cfg = get_config(cfg_path)
|
||||
|
||||
if endpoint:
|
||||
cfg.llm.endpoint = endpoint
|
||||
if model:
|
||||
cfg.llm.model = model
|
||||
|
||||
ctx.obj["config"] = cfg
|
||||
ctx.obj["repo_path"] = Path.cwd()
|
||||
|
||||
|
||||
@cli.command()
|
||||
@click.option("--strictness", "-s", type=click.Choice(["permissive", "balanced", "strict"]), default=None)
|
||||
@click.option("--output", "-o", type=click.Choice(["terminal", "json", "markdown"]), default="terminal")
|
||||
@click.option("--commit", "-C", help="Review a specific commit SHA", default=None)
|
||||
@click.option("--hook", is_flag=True, help="Run in hook mode (exit non-zero on critical)")
|
||||
@click.option("--file", "-f", multiple=True, help="Files to review (default: all staged)")
|
||||
@click.pass_context
|
||||
def review( # noqa: PLR0913
|
||||
ctx: click.Context,
|
||||
strictness: str | None,
|
||||
output: str,
|
||||
commit: str | None,
|
||||
hook: bool,
|
||||
file: tuple
|
||||
):
|
||||
cfg: Config = ctx.obj["config"]
|
||||
|
||||
if strictness is None:
|
||||
strictness = cfg.review.strictness
|
||||
|
||||
try:
|
||||
engine = ReviewEngine(config=cfg)
|
||||
engine.set_repo(ctx.obj["repo_path"])
|
||||
|
||||
if commit:
|
||||
result = engine.review_commit(commit, strictness=strictness)
|
||||
else:
|
||||
files = _get_files_to_review(ctx.obj["repo_path"], file)
|
||||
|
||||
if not files:
|
||||
rprint("[yellow]No staged changes found. Stage files with 'git add <files>' first.[/yellow]")
|
||||
if hook:
|
||||
sys.exit(0)
|
||||
return
|
||||
|
||||
result = engine.review_staged_changes(files, strictness=strictness)
|
||||
|
||||
formatter = get_formatter(output)
|
||||
output_text = formatter.format(result)
|
||||
rprint(output_text)
|
||||
|
||||
if output == "json":
|
||||
ctx.obj["result_json"] = result.to_json()
|
||||
elif output == "markdown":
|
||||
ctx.obj["result_markdown"] = result.to_markdown()
|
||||
|
||||
_handle_hook_exit(result, hook, cfg)
|
||||
|
||||
except Exception as e:
|
||||
rprint(f"[red]Error during review: {e}[/red]")
|
||||
if hook:
|
||||
sys.exit(1)
|
||||
raise
|
||||
|
||||
|
||||
def _get_files_to_review(repo_path: Path, file: tuple) -> list[FileChange]:
|
||||
if file:
|
||||
changes = []
|
||||
for filename in file:
|
||||
repo = GitRepo(repo_path)
|
||||
diff = repo.get_staged_diff(filename)
|
||||
if diff:
|
||||
changes.append(FileChange(
|
||||
filename=filename,
|
||||
status="M",
|
||||
diff=diff
|
||||
))
|
||||
return changes
|
||||
return get_staged_changes(repo_path)
|
||||
|
||||
|
||||
def _handle_hook_exit(result: ReviewResult, hook: bool, cfg: Config) -> None:
|
||||
if not hook:
|
||||
return
|
||||
if result.has_critical_issues() and cfg.hooks.fail_on_critical:
|
||||
rprint("\n[red]Critical issues found. Commit blocked.[/red]")
|
||||
sys.exit(1)
|
||||
if not result.has_issues():
|
||||
rprint("[green]No issues found. Proceeding with commit.[/green]")
|
||||
sys.exit(0)
|
||||
if not cfg.hooks.fail_on_critical:
|
||||
rprint("\n[yellow]Issues found but not blocking commit (fail_on_critical=false).[/yellow]")
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
@cli.command()
|
||||
@click.option("--local", is_flag=True, help="Install hook locally (in current repo)")
|
||||
@click.option("--global", "global_", is_flag=True, help="Install hook globally")
|
||||
@click.option("--force", is_flag=True, help="Overwrite existing hook")
|
||||
@click.pass_context
|
||||
def hook(ctx: click.Context, local: bool, global_: bool, force: bool):
|
||||
ctx.ensure_object(dict)
|
||||
|
||||
if not local and not global_:
|
||||
local = True
|
||||
|
||||
if global_:
|
||||
home = Path.home()
|
||||
git_template = home / ".git-template" / "hooks"
|
||||
if not git_template.exists():
|
||||
rprint("[yellow]Git template directory not found. Creating...[/yellow]")
|
||||
git_template.mkdir(parents=True, exist_ok=True)
|
||||
(git_template / "pre-commit").write_text(_get_hook_script())
|
||||
rprint(f"[green]Global hook template created at {git_template}[/green]")
|
||||
rprint("[yellow]Note: New repos will use this template. Existing repos need local install.[/yellow]")
|
||||
else:
|
||||
rprint("[green]Global hook template already exists.[/green]")
|
||||
else:
|
||||
repo_path = ctx.obj["repo_path"]
|
||||
git_hooks = repo_path / ".git" / "hooks"
|
||||
hook_path = git_hooks / "pre-commit"
|
||||
|
||||
if hook_path.exists() and not force:
|
||||
rprint(f"[yellow]Hook already exists at {hook_path}. Use --force to overwrite.[/yellow]")
|
||||
return
|
||||
|
||||
if git_install_hook(repo_path, "pre-commit", _get_hook_script()):
|
||||
rprint(f"[green]Pre-commit hook installed at {hook_path}[/green]")
|
||||
else:
|
||||
rprint("[red]Failed to install hook.[/red]")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def _get_hook_script() -> str:
|
||||
return """#!/bin/bash
|
||||
# Local AI Commit Reviewer - Pre-commit Hook
|
||||
# Automatically reviews staged changes before committing
|
||||
|
||||
set -e
|
||||
|
||||
# Allow bypass with --no-verify
|
||||
if [ "$1" = "--no-verify" ]; then
|
||||
exit 0
|
||||
fi
|
||||
|
||||
# Get the directory where this script is located
|
||||
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
|
||||
|
||||
# Run the AI commit reviewer
|
||||
cd "$SCRIPT_DIR/../.."
|
||||
python -m aicr review --hook --strictness balanced || exit 1
|
||||
"""
|
||||
|
||||
|
||||
@cli.command()
|
||||
@click.option("--set", "set_opt", nargs=2, multiple=True, help="Set config option (key value)")
|
||||
@click.option("--get", help="Get config option value", default=None)
|
||||
@click.option("--list", is_flag=True, help="List all config options")
|
||||
@click.option("--path", is_flag=True, help="Show config file path")
|
||||
@click.pass_context
|
||||
def config(ctx: click.Context, set_opt: tuple, get: str | None, list_: bool, path: bool):
|
||||
cfg: Config = ctx.obj["config"]
|
||||
|
||||
if path:
|
||||
config_path = os.environ.get("AICR_CONFIG_PATH") or str(Path.cwd() / ".aicr.yaml")
|
||||
rprint(f"Config path: {config_path}")
|
||||
return
|
||||
|
||||
if get:
|
||||
value = _get_nested_attr(cfg, get)
|
||||
if value is not None:
|
||||
rprint(f"{get}: {value}")
|
||||
else:
|
||||
rprint(f"[red]Unknown config option: {get}[/red]")
|
||||
return
|
||||
|
||||
if list_:
|
||||
for section in ["llm", "review", "languages", "hooks", "output", "logging"]:
|
||||
section_obj = getattr(cfg, section, None)
|
||||
if section_obj:
|
||||
rprint(f"[bold]{section.upper()}[/bold]")
|
||||
for key, value in section_obj.model_dump().items():
|
||||
rprint(f" {key}: {value}")
|
||||
return
|
||||
|
||||
if set_opt:
|
||||
for key, value in set_opt:
|
||||
_set_nested_attr(cfg, key, value)
|
||||
rprint("[green]Configuration updated.[/green]")
|
||||
return
|
||||
|
||||
rprint("[bold]Local AI Commit Reviewer Configuration[/bold]")
|
||||
rprint(f"LLM Endpoint: {cfg.llm.endpoint}")
|
||||
rprint(f"Model: {cfg.llm.model}")
|
||||
rprint(f"Strictness: {cfg.review.strictness}")
|
||||
|
||||
|
||||
def _get_nested_attr(obj, attr_path: str):
|
||||
parts = attr_path.split(".")
|
||||
current = obj
|
||||
for part in parts:
|
||||
if hasattr(current, part):
|
||||
current = getattr(current, part)
|
||||
else:
|
||||
return None
|
||||
return current
|
||||
|
||||
|
||||
def _set_nested_attr(obj, attr_path: str, value: Any) -> None:
|
||||
parts = attr_path.split(".")
|
||||
current: Any = obj
|
||||
for part in parts[:-1]:
|
||||
if hasattr(current, part):
|
||||
current = getattr(current, part)
|
||||
|
||||
final_attr = parts[-1]
|
||||
if hasattr(current, final_attr):
|
||||
attr = getattr(type(current), final_attr, None)
|
||||
if attr is not None and hasattr(attr, "annotation"):
|
||||
type_hint = getattr(attr, "annotation")
|
||||
if getattr(type_hint, "__origin__", None) is Union:
|
||||
type_hint = type_hint.__args__[0]
|
||||
if hasattr(type_hint, "__name__"):
|
||||
if type_hint.__name__ == "int" and isinstance(value, str):
|
||||
value = int(value)
|
||||
elif type_hint.__name__ == "float" and isinstance(value, str):
|
||||
value = float(value)
|
||||
elif type_hint.__name__ == "bool" and isinstance(value, str):
|
||||
value = value.lower() in ("true", "1", "yes")
|
||||
setattr(current, final_attr, value)
|
||||
|
||||
|
||||
@cli.command()
|
||||
@click.pass_context
|
||||
def models(ctx: click.Context):
|
||||
cfg: Config = ctx.obj["config"]
|
||||
|
||||
try:
|
||||
provider = OllamaProvider(
|
||||
endpoint=cfg.llm.endpoint,
|
||||
model=cfg.llm.model
|
||||
)
|
||||
|
||||
if not provider.is_available():
|
||||
rprint("[red]Ollama is not available. Make sure it's running.[/red]")
|
||||
rprint("Start Ollama with: ollama serve")
|
||||
sys.exit(1)
|
||||
|
||||
models = provider.list_models()
|
||||
|
||||
if not models:
|
||||
rprint("[yellow]No models found. Pull a model first.[/yellow]")
|
||||
rprint("Example: ollama pull codellama")
|
||||
return
|
||||
|
||||
rprint("[bold]Available Models[/bold]\n")
|
||||
for model in models:
|
||||
rprint(f" {model.name}")
|
||||
rprint(f" Size: {model.size}")
|
||||
rprint(f" Modified: {model.modified}\n")
|
||||
|
||||
except Exception as e:
|
||||
rprint(f"[red]Error listing models: {e}[/red]")
|
||||
raise
|
||||
|
||||
|
||||
@cli.command()
|
||||
@click.pass_context
|
||||
def status(ctx: click.Context):
|
||||
cfg: Config = ctx.obj["config"]
|
||||
|
||||
rprint("[bold]Local AI Commit Reviewer Status[/bold]\n")
|
||||
|
||||
rprint("[bold]Configuration:[/bold]")
|
||||
rprint(f" LLM Endpoint: {cfg.llm.endpoint}")
|
||||
rprint(f" Model: {cfg.llm.model}")
|
||||
rprint(f" Strictness: {cfg.review.strictness}\n")
|
||||
|
||||
try:
|
||||
provider = OllamaProvider(
|
||||
endpoint=cfg.llm.endpoint,
|
||||
model=cfg.llm.model
|
||||
)
|
||||
|
||||
if provider.is_available():
|
||||
rprint("[green]✓ Ollama is running[/green]")
|
||||
models = provider.list_models()
|
||||
rprint(f" {len(models)} model(s) available")
|
||||
else:
|
||||
rprint("[red]✗ Ollama is not running[/red]")
|
||||
rprint(" Start with: ollama serve")
|
||||
except Exception as e:
|
||||
rprint(f"[red]✗ Error checking Ollama: {e}[/red]")
|
||||
|
||||
repo = GitRepo(ctx.obj["repo_path"])
|
||||
if repo.is_valid():
|
||||
rprint("\n[green]✓ Valid Git repository[/green]")
|
||||
branch = repo.get_current_branch()
|
||||
rprint(f" Branch: {branch}")
|
||||
|
||||
staged = repo.get_staged_files()
|
||||
rprint(f" Staged files: {len(staged)}")
|
||||
else:
|
||||
rprint("\n[yellow]⚠ Not a Git repository[/yellow]")
|
||||
|
||||
|
||||
def main():
|
||||
cli(obj={})
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user