Initial upload: Local AI Commit Reviewer CLI with CI/CD workflow
This commit is contained in:
356
src/cli/cli.py
Normal file
356
src/cli/cli.py
Normal file
@@ -0,0 +1,356 @@
|
|||||||
|
import os
|
||||||
|
import sys
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Union
|
||||||
|
|
||||||
|
import click
|
||||||
|
from rich import print as rprint
|
||||||
|
|
||||||
|
from ..config import Config, get_config
|
||||||
|
from ..core import ReviewEngine, ReviewResult
|
||||||
|
from ..formatters import get_formatter
|
||||||
|
from ..git import FileChange, GitRepo, get_staged_changes
|
||||||
|
from ..git import install_hook as git_install_hook
|
||||||
|
from ..llm import OllamaProvider
|
||||||
|
|
||||||
|
|
||||||
|
@click.group()
|
||||||
|
@click.option("--config", "-c", type=click.Path(exists=True), help="Path to config file")
|
||||||
|
@click.option("--endpoint", help="LLM endpoint URL", default=None)
|
||||||
|
@click.option("--model", "-m", help="Model name to use", default=None)
|
||||||
|
@click.pass_context
|
||||||
|
def cli(ctx: click.Context, config: str | None, endpoint: str | None, model: str | None):
|
||||||
|
ctx.ensure_object(dict)
|
||||||
|
cfg_path = config or os.environ.get("AICR_CONFIG_PATH")
|
||||||
|
cfg = get_config(cfg_path)
|
||||||
|
|
||||||
|
if endpoint:
|
||||||
|
cfg.llm.endpoint = endpoint
|
||||||
|
if model:
|
||||||
|
cfg.llm.model = model
|
||||||
|
|
||||||
|
ctx.obj["config"] = cfg
|
||||||
|
ctx.obj["repo_path"] = Path.cwd()
|
||||||
|
|
||||||
|
|
||||||
|
@click.group()
|
||||||
|
@click.option("--config", "-c", type=click.Path(exists=True), help="Path to config file")
|
||||||
|
@click.option("--endpoint", help="LLM endpoint URL", default=None)
|
||||||
|
@click.option("--model", "-m", help="Model name to use", default=None)
|
||||||
|
@click.pass_context
|
||||||
|
def cli(ctx: click.Context, config: str | None, endpoint: str | None, model: str | None):
|
||||||
|
ctx.ensure_object(dict)
|
||||||
|
cfg_path = config or os.environ.get("AICR_CONFIG_PATH")
|
||||||
|
cfg = get_config(cfg_path)
|
||||||
|
|
||||||
|
if endpoint:
|
||||||
|
cfg.llm.endpoint = endpoint
|
||||||
|
if model:
|
||||||
|
cfg.llm.model = model
|
||||||
|
|
||||||
|
ctx.obj["config"] = cfg
|
||||||
|
ctx.obj["repo_path"] = Path.cwd()
|
||||||
|
|
||||||
|
|
||||||
|
@cli.command()
|
||||||
|
@click.option("--strictness", "-s", type=click.Choice(["permissive", "balanced", "strict"]), default=None)
|
||||||
|
@click.option("--output", "-o", type=click.Choice(["terminal", "json", "markdown"]), default="terminal")
|
||||||
|
@click.option("--commit", "-C", help="Review a specific commit SHA", default=None)
|
||||||
|
@click.option("--hook", is_flag=True, help="Run in hook mode (exit non-zero on critical)")
|
||||||
|
@click.option("--file", "-f", multiple=True, help="Files to review (default: all staged)")
|
||||||
|
@click.pass_context
|
||||||
|
def review(
|
||||||
|
ctx: click.Context,
|
||||||
|
strictness: str | None,
|
||||||
|
output: str,
|
||||||
|
commit: str | None,
|
||||||
|
hook: bool,
|
||||||
|
file: tuple
|
||||||
|
):
|
||||||
|
cfg: Config = ctx.obj["config"]
|
||||||
|
|
||||||
|
if strictness is None:
|
||||||
|
strictness = cfg.review.strictness
|
||||||
|
|
||||||
|
try:
|
||||||
|
engine = ReviewEngine(config=cfg)
|
||||||
|
engine.set_repo(ctx.obj["repo_path"])
|
||||||
|
|
||||||
|
if commit:
|
||||||
|
result = engine.review_commit(commit, strictness=strictness)
|
||||||
|
else:
|
||||||
|
files = _get_files_to_review(ctx.obj["repo_path"], file)
|
||||||
|
|
||||||
|
if not files:
|
||||||
|
rprint("[yellow]No staged changes found. Stage files with 'git add <files>' first.[/yellow]")
|
||||||
|
if hook:
|
||||||
|
sys.exit(0)
|
||||||
|
return
|
||||||
|
|
||||||
|
result = engine.review_staged_changes(files, strictness=strictness)
|
||||||
|
|
||||||
|
formatter = get_formatter(output)
|
||||||
|
output_text = formatter.format(result)
|
||||||
|
rprint(output_text)
|
||||||
|
|
||||||
|
if output == "json":
|
||||||
|
ctx.obj["result_json"] = result.to_json()
|
||||||
|
elif output == "markdown":
|
||||||
|
ctx.obj["result_markdown"] = result.to_markdown()
|
||||||
|
|
||||||
|
_handle_hook_exit(result, hook, cfg)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
rprint(f"[red]Error during review: {e}[/red]")
|
||||||
|
if hook:
|
||||||
|
sys.exit(1)
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
|
def _get_files_to_review(repo_path: Path, file: tuple) -> list[FileChange]:
|
||||||
|
if file:
|
||||||
|
changes = []
|
||||||
|
for filename in file:
|
||||||
|
repo = GitRepo(repo_path)
|
||||||
|
diff = repo.get_staged_diff(filename)
|
||||||
|
if diff:
|
||||||
|
changes.append(FileChange(
|
||||||
|
filename=filename,
|
||||||
|
status="M",
|
||||||
|
diff=diff
|
||||||
|
))
|
||||||
|
return changes
|
||||||
|
return get_staged_changes(repo_path)
|
||||||
|
|
||||||
|
|
||||||
|
def _handle_hook_exit(result: ReviewResult, hook: bool, cfg: Config) -> None:
|
||||||
|
if not hook:
|
||||||
|
return
|
||||||
|
if result.has_critical_issues() and cfg.hooks.fail_on_critical:
|
||||||
|
rprint("\n[red]Critical issues found. Commit blocked.[/red]")
|
||||||
|
sys.exit(1)
|
||||||
|
if not result.has_issues():
|
||||||
|
rprint("[green]No issues found. Proceeding with commit.[/green]")
|
||||||
|
sys.exit(0)
|
||||||
|
if not cfg.hooks.fail_on_critical:
|
||||||
|
rprint("\n[yellow]Issues found but not blocking commit (fail_on_critical=false).[/yellow]")
|
||||||
|
sys.exit(0)
|
||||||
|
|
||||||
|
|
||||||
|
@cli.command()
|
||||||
|
@click.option("--local", is_flag=True, help="Install hook locally (in current repo)")
|
||||||
|
@click.option("--global", "global_", is_flag=True, help="Install hook globally")
|
||||||
|
@click.option("--force", is_flag=True, help="Overwrite existing hook")
|
||||||
|
@click.pass_context
|
||||||
|
def hook(ctx: click.Context, local: bool, global_: bool, force: bool):
|
||||||
|
ctx.ensure_object(dict)
|
||||||
|
|
||||||
|
if not local and not global_:
|
||||||
|
local = True
|
||||||
|
|
||||||
|
if global_:
|
||||||
|
home = Path.home()
|
||||||
|
git_template = home / ".git-template" / "hooks"
|
||||||
|
if not git_template.exists():
|
||||||
|
rprint("[yellow]Git template directory not found. Creating...[/yellow]")
|
||||||
|
git_template.mkdir(parents=True, exist_ok=True)
|
||||||
|
(git_template / "pre-commit").write_text(_get_hook_script())
|
||||||
|
rprint(f"[green]Global hook template created at {git_template}[/green]")
|
||||||
|
rprint("[yellow]Note: New repos will use this template. Existing repos need local install.[/yellow]")
|
||||||
|
else:
|
||||||
|
rprint("[green]Global hook template already exists.[/green]")
|
||||||
|
else:
|
||||||
|
repo_path = ctx.obj["repo_path"]
|
||||||
|
git_hooks = repo_path / ".git" / "hooks"
|
||||||
|
hook_path = git_hooks / "pre-commit"
|
||||||
|
|
||||||
|
if hook_path.exists() and not force:
|
||||||
|
rprint(f"[yellow]Hook already exists at {hook_path}. Use --force to overwrite.[/yellow]")
|
||||||
|
return
|
||||||
|
|
||||||
|
if git_install_hook(repo_path, "pre-commit", _get_hook_script()):
|
||||||
|
rprint(f"[green]Pre-commit hook installed at {hook_path}[/green]")
|
||||||
|
else:
|
||||||
|
rprint("[red]Failed to install hook.[/red]")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
|
def _get_hook_script() -> str:
|
||||||
|
return """#!/bin/bash
|
||||||
|
# Local AI Commit Reviewer - Pre-commit Hook
|
||||||
|
# Automatically reviews staged changes before committing
|
||||||
|
|
||||||
|
set -e
|
||||||
|
|
||||||
|
# Allow bypass with --no-verify
|
||||||
|
if [ "$1" = "--no-verify" ]; then
|
||||||
|
exit 0
|
||||||
|
fi
|
||||||
|
|
||||||
|
# Get the directory where this script is located
|
||||||
|
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
|
||||||
|
|
||||||
|
# Run the AI commit reviewer
|
||||||
|
cd "$SCRIPT_DIR/../.."
|
||||||
|
python -m aicr review --hook --strictness balanced || exit 1
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
@cli.command()
|
||||||
|
@click.option("--set", "set_opt", nargs=2, multiple=True, help="Set config option (key value)")
|
||||||
|
@click.option("--get", help="Get config option value", default=None)
|
||||||
|
@click.option("--list", is_flag=True, help="List all config options")
|
||||||
|
@click.option("--path", is_flag=True, help="Show config file path")
|
||||||
|
@click.pass_context
|
||||||
|
def config(ctx: click.Context, set_opt: tuple, get: str | None, list_: bool, path: bool):
|
||||||
|
cfg: Config = ctx.obj["config"]
|
||||||
|
|
||||||
|
if path:
|
||||||
|
config_path = os.environ.get("AICR_CONFIG_PATH") or str(Path.cwd() / ".aicr.yaml")
|
||||||
|
rprint(f"Config path: {config_path}")
|
||||||
|
return
|
||||||
|
|
||||||
|
if get:
|
||||||
|
value = _get_nested_attr(cfg, get)
|
||||||
|
if value is not None:
|
||||||
|
rprint(f"{get}: {value}")
|
||||||
|
else:
|
||||||
|
rprint(f"[red]Unknown config option: {get}[/red]")
|
||||||
|
return
|
||||||
|
|
||||||
|
if list_:
|
||||||
|
for section in ["llm", "review", "languages", "hooks", "output", "logging"]:
|
||||||
|
section_obj = getattr(cfg, section, None)
|
||||||
|
if section_obj:
|
||||||
|
rprint(f"[bold]{section.upper()}[/bold]")
|
||||||
|
for key, value in section_obj.model_dump().items():
|
||||||
|
rprint(f" {key}: {value}")
|
||||||
|
return
|
||||||
|
|
||||||
|
if set_opt:
|
||||||
|
for key, value in set_opt:
|
||||||
|
_set_nested_attr(cfg, key, value)
|
||||||
|
rprint("[green]Configuration updated.[/green]")
|
||||||
|
return
|
||||||
|
|
||||||
|
rprint("[bold]Local AI Commit Reviewer Configuration[/bold]")
|
||||||
|
rprint(f"LLM Endpoint: {cfg.llm.endpoint}")
|
||||||
|
rprint(f"Model: {cfg.llm.model}")
|
||||||
|
rprint(f"Strictness: {cfg.review.strictness}")
|
||||||
|
|
||||||
|
|
||||||
|
def _get_nested_attr(obj, attr_path: str):
|
||||||
|
parts = attr_path.split(".")
|
||||||
|
current = obj
|
||||||
|
for part in parts:
|
||||||
|
if hasattr(current, part):
|
||||||
|
current = getattr(current, part)
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
return current
|
||||||
|
|
||||||
|
|
||||||
|
def _set_nested_attr(obj, attr_path: str, value: str):
|
||||||
|
parts = attr_path.split(".")
|
||||||
|
current = obj
|
||||||
|
for part in parts[:-1]:
|
||||||
|
if hasattr(current, part):
|
||||||
|
current = getattr(current, part)
|
||||||
|
|
||||||
|
final_attr = parts[-1]
|
||||||
|
if hasattr(current, final_attr):
|
||||||
|
attr = getattr(type(current), final_attr)
|
||||||
|
if hasattr(attr, "annotation"):
|
||||||
|
type_hint = attr.annotation
|
||||||
|
if getattr(type_hint, "__origin__", None) is Union:
|
||||||
|
type_hint = type_hint.__args__[0]
|
||||||
|
if hasattr(type_hint, "__name__"):
|
||||||
|
if type_hint.__name__ == "int":
|
||||||
|
value = int(value)
|
||||||
|
elif type_hint.__name__ == "float":
|
||||||
|
value = float(value)
|
||||||
|
elif type_hint.__name__ == "bool":
|
||||||
|
value = value.lower() in ("true", "1", "yes")
|
||||||
|
setattr(current, final_attr, value)
|
||||||
|
|
||||||
|
|
||||||
|
@cli.command()
|
||||||
|
@click.pass_context
|
||||||
|
def models(ctx: click.Context):
|
||||||
|
cfg: Config = ctx.obj["config"]
|
||||||
|
|
||||||
|
try:
|
||||||
|
provider = OllamaProvider(
|
||||||
|
endpoint=cfg.llm.endpoint,
|
||||||
|
model=cfg.llm.model
|
||||||
|
)
|
||||||
|
|
||||||
|
if not provider.is_available():
|
||||||
|
rprint("[red]Ollama is not available. Make sure it's running.[/red]")
|
||||||
|
rprint("Start Ollama with: ollama serve")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
models = provider.list_models()
|
||||||
|
|
||||||
|
if not models:
|
||||||
|
rprint("[yellow]No models found. Pull a model first.[/yellow]")
|
||||||
|
rprint("Example: ollama pull codellama")
|
||||||
|
return
|
||||||
|
|
||||||
|
rprint("[bold]Available Models[/bold]\n")
|
||||||
|
for model in models:
|
||||||
|
rprint(f" {model.name}")
|
||||||
|
rprint(f" Size: {model.size}")
|
||||||
|
rprint(f" Modified: {model.modified}\n")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
rprint(f"[red]Error listing models: {e}[/red]")
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
|
@cli.command()
|
||||||
|
@click.pass_context
|
||||||
|
def status(ctx: click.Context):
|
||||||
|
cfg: Config = ctx.obj["config"]
|
||||||
|
|
||||||
|
rprint("[bold]Local AI Commit Reviewer Status[/bold]\n")
|
||||||
|
|
||||||
|
rprint("[bold]Configuration:[/bold]")
|
||||||
|
rprint(f" LLM Endpoint: {cfg.llm.endpoint}")
|
||||||
|
rprint(f" Model: {cfg.llm.model}")
|
||||||
|
rprint(f" Strictness: {cfg.review.strictness}\n")
|
||||||
|
|
||||||
|
try:
|
||||||
|
provider = OllamaProvider(
|
||||||
|
endpoint=cfg.llm.endpoint,
|
||||||
|
model=cfg.llm.model
|
||||||
|
)
|
||||||
|
|
||||||
|
if provider.is_available():
|
||||||
|
rprint("[green]✓ Ollama is running[/green]")
|
||||||
|
models = provider.list_models()
|
||||||
|
rprint(f" {len(models)} model(s) available")
|
||||||
|
else:
|
||||||
|
rprint("[red]✗ Ollama is not running[/red]")
|
||||||
|
rprint(" Start with: ollama serve")
|
||||||
|
except Exception as e:
|
||||||
|
rprint(f"[red]✗ Error checking Ollama: {e}[/red]")
|
||||||
|
|
||||||
|
repo = GitRepo(ctx.obj["repo_path"])
|
||||||
|
if repo.is_valid():
|
||||||
|
rprint("\n[green]✓ Valid Git repository[/green]")
|
||||||
|
branch = repo.get_current_branch()
|
||||||
|
rprint(f" Branch: {branch}")
|
||||||
|
|
||||||
|
staged = repo.get_staged_files()
|
||||||
|
rprint(f" Staged files: {len(staged)}")
|
||||||
|
else:
|
||||||
|
rprint("\n[yellow]⚠ Not a Git repository[/yellow]")
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
cli(obj={})
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
Reference in New Issue
Block a user