Fix CI/CD: Add Gitea Actions workflow and fix linting issues

This commit is contained in:
Developer
2026-02-05 09:02:49 +00:00
commit d8325c4be2
111 changed files with 19657 additions and 0 deletions

View File

@@ -0,0 +1,14 @@
from . import cli, config, core, formatters, git, hooks, llm, utils
__version__ = "0.1.0"
__all__ = [
"cli",
"config",
"core",
"formatters",
"git",
"hooks",
"llm",
"utils",
]

View File

@@ -0,0 +1,3 @@
from .cli import cli, main
__all__ = ["cli", "main"]

View File

@@ -0,0 +1,337 @@
import os
import sys
from pathlib import Path
from typing import Any, Union
import click
from rich import print as rprint
from ..config import Config, get_config
from ..core import ReviewEngine, ReviewResult
from ..formatters import get_formatter
from ..git import FileChange, GitRepo, get_staged_changes
from ..git import install_hook as git_install_hook
from ..llm import OllamaProvider
@click.group()
@click.option("--config", "-c", type=click.Path(exists=True), help="Path to config file")
@click.option("--endpoint", help="LLM endpoint URL", default=None)
@click.option("--model", "-m", help="Model name to use", default=None)
@click.pass_context
def cli(ctx: click.Context, config: str | None, endpoint: str | None, model: str | None):
ctx.ensure_object(dict)
cfg_path = config or os.environ.get("AICR_CONFIG_PATH")
cfg = get_config(cfg_path)
if endpoint:
cfg.llm.endpoint = endpoint
if model:
cfg.llm.model = model
ctx.obj["config"] = cfg
ctx.obj["repo_path"] = Path.cwd()
@cli.command()
@click.option("--strictness", "-s", type=click.Choice(["permissive", "balanced", "strict"]), default=None)
@click.option("--output", "-o", type=click.Choice(["terminal", "json", "markdown"]), default="terminal")
@click.option("--commit", "-C", help="Review a specific commit SHA", default=None)
@click.option("--hook", is_flag=True, help="Run in hook mode (exit non-zero on critical)")
@click.option("--file", "-f", multiple=True, help="Files to review (default: all staged)")
@click.pass_context
def review( # noqa: PLR0913
ctx: click.Context,
strictness: str | None,
output: str,
commit: str | None,
hook: bool,
file: tuple
):
cfg: Config = ctx.obj["config"]
if strictness is None:
strictness = cfg.review.strictness
try:
engine = ReviewEngine(config=cfg)
engine.set_repo(ctx.obj["repo_path"])
if commit:
result = engine.review_commit(commit, strictness=strictness)
else:
files = _get_files_to_review(ctx.obj["repo_path"], file)
if not files:
rprint("[yellow]No staged changes found. Stage files with 'git add <files>' first.[/yellow]")
if hook:
sys.exit(0)
return
result = engine.review_staged_changes(files, strictness=strictness)
formatter = get_formatter(output)
output_text = formatter.format(result)
rprint(output_text)
if output == "json":
ctx.obj["result_json"] = result.to_json()
elif output == "markdown":
ctx.obj["result_markdown"] = result.to_markdown()
_handle_hook_exit(result, hook, cfg)
except Exception as e:
rprint(f"[red]Error during review: {e}[/red]")
if hook:
sys.exit(1)
raise
def _get_files_to_review(repo_path: Path, file: tuple) -> list[FileChange]:
if file:
changes = []
for filename in file:
repo = GitRepo(repo_path)
diff = repo.get_staged_diff(filename)
if diff:
changes.append(FileChange(
filename=filename,
status="M",
diff=diff
))
return changes
return get_staged_changes(repo_path)
def _handle_hook_exit(result: ReviewResult, hook: bool, cfg: Config) -> None:
if not hook:
return
if result.has_critical_issues() and cfg.hooks.fail_on_critical:
rprint("\n[red]Critical issues found. Commit blocked.[/red]")
sys.exit(1)
if not result.has_issues():
rprint("[green]No issues found. Proceeding with commit.[/green]")
sys.exit(0)
if not cfg.hooks.fail_on_critical:
rprint("\n[yellow]Issues found but not blocking commit (fail_on_critical=false).[/yellow]")
sys.exit(0)
@cli.command()
@click.option("--local", is_flag=True, help="Install hook locally (in current repo)")
@click.option("--global", "global_", is_flag=True, help="Install hook globally")
@click.option("--force", is_flag=True, help="Overwrite existing hook")
@click.pass_context
def hook(ctx: click.Context, local: bool, global_: bool, force: bool):
ctx.ensure_object(dict)
if not local and not global_:
local = True
if global_:
home = Path.home()
git_template = home / ".git-template" / "hooks"
if not git_template.exists():
rprint("[yellow]Git template directory not found. Creating...[/yellow]")
git_template.mkdir(parents=True, exist_ok=True)
(git_template / "pre-commit").write_text(_get_hook_script())
rprint(f"[green]Global hook template created at {git_template}[/green]")
rprint("[yellow]Note: New repos will use this template. Existing repos need local install.[/yellow]")
else:
rprint("[green]Global hook template already exists.[/green]")
else:
repo_path = ctx.obj["repo_path"]
git_hooks = repo_path / ".git" / "hooks"
hook_path = git_hooks / "pre-commit"
if hook_path.exists() and not force:
rprint(f"[yellow]Hook already exists at {hook_path}. Use --force to overwrite.[/yellow]")
return
if git_install_hook(repo_path, "pre-commit", _get_hook_script()):
rprint(f"[green]Pre-commit hook installed at {hook_path}[/green]")
else:
rprint("[red]Failed to install hook.[/red]")
sys.exit(1)
def _get_hook_script() -> str:
return """#!/bin/bash
# Local AI Commit Reviewer - Pre-commit Hook
# Automatically reviews staged changes before committing
set -e
# Allow bypass with --no-verify
if [ "$1" = "--no-verify" ]; then
exit 0
fi
# Get the directory where this script is located
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# Run the AI commit reviewer
cd "$SCRIPT_DIR/../.."
python -m aicr review --hook --strictness balanced || exit 1
"""
@cli.command()
@click.option("--set", "set_opt", nargs=2, multiple=True, help="Set config option (key value)")
@click.option("--get", help="Get config option value", default=None)
@click.option("--list", is_flag=True, help="List all config options")
@click.option("--path", is_flag=True, help="Show config file path")
@click.pass_context
def config(ctx: click.Context, set_opt: tuple, get: str | None, list_: bool, path: bool):
cfg: Config = ctx.obj["config"]
if path:
config_path = os.environ.get("AICR_CONFIG_PATH") or str(Path.cwd() / ".aicr.yaml")
rprint(f"Config path: {config_path}")
return
if get:
value = _get_nested_attr(cfg, get)
if value is not None:
rprint(f"{get}: {value}")
else:
rprint(f"[red]Unknown config option: {get}[/red]")
return
if list_:
for section in ["llm", "review", "languages", "hooks", "output", "logging"]:
section_obj = getattr(cfg, section, None)
if section_obj:
rprint(f"[bold]{section.upper()}[/bold]")
for key, value in section_obj.model_dump().items():
rprint(f" {key}: {value}")
return
if set_opt:
for key, value in set_opt:
_set_nested_attr(cfg, key, value)
rprint("[green]Configuration updated.[/green]")
return
rprint("[bold]Local AI Commit Reviewer Configuration[/bold]")
rprint(f"LLM Endpoint: {cfg.llm.endpoint}")
rprint(f"Model: {cfg.llm.model}")
rprint(f"Strictness: {cfg.review.strictness}")
def _get_nested_attr(obj, attr_path: str):
parts = attr_path.split(".")
current = obj
for part in parts:
if hasattr(current, part):
current = getattr(current, part)
else:
return None
return current
def _set_nested_attr(obj, attr_path: str, value: Any) -> None:
parts = attr_path.split(".")
current: Any = obj
for part in parts[:-1]:
if hasattr(current, part):
current = getattr(current, part)
final_attr = parts[-1]
if hasattr(current, final_attr):
attr = getattr(type(current), final_attr, None)
if attr is not None and hasattr(attr, "annotation"):
type_hint = attr.annotation # type: ignore[attr-defined]
if getattr(type_hint, "__origin__", None) is Union:
type_hint = type_hint.__args__[0]
if hasattr(type_hint, "__name__"):
if type_hint.__name__ == "int" and isinstance(value, str):
value = int(value)
elif type_hint.__name__ == "float" and isinstance(value, str):
value = float(value)
elif type_hint.__name__ == "bool" and isinstance(value, str):
value = value.lower() in ("true", "1", "yes")
setattr(current, final_attr, value)
@cli.command()
@click.pass_context
def models(ctx: click.Context):
cfg: Config = ctx.obj["config"]
try:
provider = OllamaProvider(
endpoint=cfg.llm.endpoint,
model=cfg.llm.model
)
if not provider.is_available():
rprint("[red]Ollama is not available. Make sure it's running.[/red]")
rprint("Start Ollama with: ollama serve")
sys.exit(1)
models = provider.list_models()
if not models:
rprint("[yellow]No models found. Pull a model first.[/yellow]")
rprint("Example: ollama pull codellama")
return
rprint("[bold]Available Models[/bold]\n")
for model in models:
rprint(f" {model.name}")
rprint(f" Size: {model.size}")
rprint(f" Modified: {model.modified}\n")
except Exception as e:
rprint(f"[red]Error listing models: {e}[/red]")
raise
@cli.command()
@click.pass_context
def status(ctx: click.Context):
cfg: Config = ctx.obj["config"]
rprint("[bold]Local AI Commit Reviewer Status[/bold]\n")
rprint("[bold]Configuration:[/bold]")
rprint(f" LLM Endpoint: {cfg.llm.endpoint}")
rprint(f" Model: {cfg.llm.model}")
rprint(f" Strictness: {cfg.review.strictness}\n")
try:
provider = OllamaProvider(
endpoint=cfg.llm.endpoint,
model=cfg.llm.model
)
if provider.is_available():
rprint("[green]✓ Ollama is running[/green]")
models = provider.list_models()
rprint(f" {len(models)} model(s) available")
else:
rprint("[red]✗ Ollama is not running[/red]")
rprint(" Start with: ollama serve")
except Exception as e:
rprint(f"[red]✗ Error checking Ollama: {e}[/red]")
repo = GitRepo(ctx.obj["repo_path"])
if repo.is_valid():
rprint("\n[green]✓ Valid Git repository[/green]")
branch = repo.get_current_branch()
rprint(f" Branch: {branch}")
staged = repo.get_staged_files()
rprint(f" Staged files: {len(staged)}")
else:
rprint("\n[yellow]⚠ Not a Git repository[/yellow]")
def main():
cli(obj={})
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,3 @@
from .config import Config, ConfigLoader, Languages, StrictnessProfile, get_config
__all__ = ["Config", "ConfigLoader", "Languages", "StrictnessProfile", "get_config"]

View File

@@ -0,0 +1,164 @@
import os
from pathlib import Path
from typing import Any
import yaml # type: ignore[import-untyped]
from pydantic import BaseModel, Field
class LLMConfig(BaseModel):
endpoint: str = "http://localhost:11434"
model: str = "codellama"
timeout: int = 120
max_tokens: int = 2048
temperature: float = 0.3
class ReviewSettings(BaseModel):
strictness: str = "balanced"
max_issues_per_file: int = 20
syntax_highlighting: bool = True
show_line_numbers: bool = True
class LanguageConfig(BaseModel):
enabled: bool = True
review_rules: list[str] = Field(default_factory=list)
max_line_length: int = 100
class Languages(BaseModel):
python: LanguageConfig = Field(default_factory=lambda: LanguageConfig(review_rules=["pep8", "type-hints", "docstrings"]))
javascript: LanguageConfig = Field(default_factory=lambda: LanguageConfig(review_rules=["airbnb"]))
typescript: LanguageConfig = Field(default_factory=lambda: LanguageConfig(review_rules=["airbnb"]))
go: LanguageConfig = Field(default_factory=lambda: LanguageConfig(review_rules=["golint", "staticcheck"]))
rust: LanguageConfig = Field(default_factory=lambda: LanguageConfig(review_rules=["clippy"]))
java: LanguageConfig = Field(default_factory=lambda: LanguageConfig(review_rules=["google-java"]))
c: LanguageConfig = Field(default_factory=lambda: LanguageConfig(review_rules=["cppcheck"]))
cpp: LanguageConfig = Field(default_factory=lambda: LanguageConfig(review_rules=["cppcheck"]))
def get_language_config(self, language: str) -> LanguageConfig | None:
return getattr(self, language.lower(), None)
class StrictnessProfile(BaseModel):
description: str = ""
check_security: bool = True
check_bugs: bool = True
check_style: bool = True
check_performance: bool = False
check_documentation: bool = False
min_severity: str = "info"
class StrictnessProfiles(BaseModel):
permissive: StrictnessProfile = Field(default_factory=lambda: StrictnessProfile(
description="Focus on critical issues only",
check_security=True,
check_bugs=True,
check_style=False,
check_performance=False,
check_documentation=False,
min_severity="warning"
))
balanced: StrictnessProfile = Field(default_factory=lambda: StrictnessProfile(
description="Balanced review of common issues",
check_security=True,
check_bugs=True,
check_style=True,
check_performance=False,
check_documentation=False,
min_severity="info"
))
strict: StrictnessProfile = Field(default_factory=lambda: StrictnessProfile(
description="Comprehensive review of all issues",
check_security=True,
check_bugs=True,
check_style=True,
check_performance=True,
check_documentation=True,
min_severity="info"
))
def get_profile(self, name: str) -> StrictnessProfile:
return getattr(self, name.lower(), self.balanced)
class HooksConfig(BaseModel):
enabled: bool = True
fail_on_critical: bool = True
allow_bypass: bool = True
class OutputConfig(BaseModel):
format: str = "terminal"
theme: str = "auto"
show_suggestions: bool = True
class LoggingConfig(BaseModel):
level: str = "info"
log_file: str = ""
structured: bool = False
class Config(BaseModel):
llm: LLMConfig = Field(default_factory=LLMConfig)
review: ReviewSettings = Field(default_factory=ReviewSettings)
languages: Languages = Field(default_factory=Languages)
strictness_profiles: StrictnessProfiles = Field(default_factory=StrictnessProfiles)
hooks: HooksConfig = Field(default_factory=HooksConfig)
output: OutputConfig = Field(default_factory=OutputConfig)
logging: LoggingConfig = Field(default_factory=LoggingConfig)
class ConfigLoader:
def __init__(self, config_path: str | None = None):
self.config_path = config_path
self.global_config: Path | None = None
self.project_config: Path | None = None
def find_config_files(self) -> tuple[Path | None, Path | None]:
env_config_path = os.environ.get("AICR_CONFIG_PATH")
if env_config_path:
env_path = Path(env_config_path)
if env_path.exists():
return env_path, None
self.global_config = Path.home() / ".aicr.yaml"
self.project_config = Path.cwd() / ".aicr.yaml"
if self.project_config.exists():
return self.project_config, self.global_config
if self.global_config.exists():
return self.global_config, None
return None, None
def load(self) -> Config:
config_path, global_path = self.find_config_files()
config_data: dict[str, Any] = {}
if global_path and global_path.exists():
with open(global_path) as f:
global_data = yaml.safe_load(f) or {}
config_data.update(global_data)
if config_path and config_path.exists():
with open(config_path) as f:
project_data = yaml.safe_load(f) or {}
config_data.update(project_data)
return Config(**config_data)
def save(self, config: Config, path: Path) -> None:
with open(path, "w") as f:
yaml.dump(config.model_dump(), f, default_flow_style=False)
def get_config(config_path: str | None = None) -> Config:
loader = ConfigLoader(config_path)
return loader.load()

View File

@@ -0,0 +1,3 @@
from .review_engine import Issue, IssueCategory, IssueSeverity, ReviewEngine, ReviewResult
__all__ = ["Issue", "IssueCategory", "IssueSeverity", "ReviewEngine", "ReviewResult"]

View File

@@ -0,0 +1,423 @@
import json
import re
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from ..config import Config, StrictnessProfile
from ..git import FileChange, GitRepo
from ..llm import LLMProvider, OllamaProvider
from ..llm.templates import ReviewPromptTemplates
class IssueSeverity(str, Enum):
CRITICAL = "critical"
WARNING = "warning"
INFO = "info"
class IssueCategory(str, Enum):
BUG = "bug"
SECURITY = "security"
STYLE = "style"
PERFORMANCE = "performance"
DOCUMENTATION = "documentation"
@dataclass
class Issue:
file: str
line: int
severity: IssueSeverity
category: IssueCategory
message: str
suggestion: str | None = None
raw_line: str | None = None
def to_dict(self) -> dict:
return {
"file": self.file,
"line": self.line,
"severity": self.severity.value,
"category": self.category.value,
"message": self.message,
"suggestion": self.suggestion,
"raw_line": self.raw_line
}
@classmethod
def from_dict(cls, data: dict) -> "Issue":
return cls(
file=data["file"],
line=data["line"],
severity=IssueSeverity(data["severity"]),
category=IssueCategory(data["category"]),
message=data["message"],
suggestion=data.get("suggestion"),
raw_line=data.get("raw_line")
)
@dataclass
class ReviewSummary:
critical_count: int = 0
warning_count: int = 0
info_count: int = 0
files_reviewed: int = 0
lines_changed: int = 0
overall_assessment: str = ""
issues_by_category: dict = field(default_factory=dict)
issues_by_file: dict = field(default_factory=dict)
def to_dict(self) -> dict:
return {
"critical_count": self.critical_count,
"warning_count": self.warning_count,
"info_count": self.info_count,
"files_reviewed": self.files_reviewed,
"lines_changed": self.lines_changed,
"overall_assessment": self.overall_assessment,
"issues_by_category": self.issues_by_category,
"issues_by_file": self.issues_by_file
}
@dataclass
class ReviewResult:
issues: list[Issue] = field(default_factory=list)
summary: ReviewSummary = field(default_factory=ReviewSummary)
model_used: str = ""
tokens_used: int = 0
review_mode: str = ""
error: str | None = None
def has_critical_issues(self) -> bool:
return any(issue.severity == IssueSeverity.CRITICAL for issue in self.issues)
def has_issues(self) -> bool:
return len(self.issues) > 0
def get_issues_by_severity(self, severity: IssueSeverity) -> list[Issue]:
return [issue for issue in self.issues if issue.severity == severity]
def get_issues_by_file(self, filename: str) -> list[Issue]:
return [issue for issue in self.issues if issue.file == filename]
def get_issues_by_category(self, category: IssueCategory) -> list[Issue]:
return [issue for issue in self.issues if issue.category == category]
def to_json(self) -> str:
return json.dumps({
"issues": [issue.to_dict() for issue in self.issues],
"summary": self.summary.to_dict(),
"model_used": self.model_used,
"tokens_used": self.tokens_used,
"review_mode": self.review_mode
}, indent=2)
def to_markdown(self) -> str:
lines = ["# AI Commit Review Results\n"]
lines.append("## Summary\n")
lines.append(f"- **Files Reviewed**: {self.summary.files_reviewed}")
lines.append(f"- **Lines Changed**: {self.summary.lines_changed}")
lines.append(f"- **Critical Issues**: {self.summary.critical_count}")
lines.append(f"- **Warnings**: {self.summary.warning_count}")
lines.append(f"- **Info**: {self.summary.info_count}")
lines.append(f"- **Assessment**: {self.summary.overall_assessment}\n")
if self.issues:
lines.append("## Issues Found\n")
for severity in [IssueSeverity.CRITICAL, IssueSeverity.WARNING, IssueSeverity.INFO]:
severity_issues = self.get_issues_by_severity(severity)
if severity_issues:
lines.append(f"### {severity.value.upper()} ({len(severity_issues)})\n")
for issue in severity_issues:
lines.append(f"#### {issue.file}:{issue.line}")
lines.append(f"- **Category**: {issue.category.value}")
lines.append(f"- **Message**: {issue.message}")
if issue.suggestion:
lines.append(f"- **Suggestion**: {issue.suggestion}")
lines.append("")
return "\n".join(lines)
class ReviewEngine:
def __init__(
self,
config: Config | None = None,
llm_provider: LLMProvider | None = None
):
self.config = config or Config()
self.llm_provider = llm_provider or OllamaProvider(
endpoint=self.config.llm.endpoint,
model=self.config.llm.model,
timeout=self.config.llm.timeout
)
self.repo: GitRepo | None = None
def set_repo(self, path: Path) -> None:
self.repo = GitRepo(path)
def _parse_llm_response(self, response_text: str, files: list[FileChange]) -> ReviewResult:
result = ReviewResult()
try:
json_match = re.search(r'\{[\s\S]*\}', response_text)
if json_match:
json_str = json_match.group()
data = json.loads(json_str)
issues_data = data.get("issues", [])
for issue_data in issues_data:
try:
issue = Issue.from_dict(issue_data)
result.issues.append(issue)
except Exception:
pass
summary_data = data.get("summary", {})
result.summary.critical_count = summary_data.get("critical_count", 0)
result.summary.warning_count = summary_data.get("warning_count", 0)
result.summary.info_count = summary_data.get("info_count", 0)
result.summary.overall_assessment = summary_data.get("overall_assessment", "")
else:
text_issues = self._parse_text_response(response_text, files)
result.issues = text_issues
except json.JSONDecodeError:
result.issues = self._parse_text_response(response_text, files)
return result
def _parse_text_response(self, response_text: str, files: list[FileChange]) -> list[Issue]: # noqa: ARG002
issues = []
lines = response_text.split("\n")
current_file = ""
for line in lines:
file_match = re.match(r'^\*\*(.+?)\*\*:\s*(\d+)', line)
if file_match:
current_file = file_match.group(1)
line_num = int(file_match.group(2))
severity = IssueSeverity.WARNING
if "critical" in line.lower():
severity = IssueSeverity.CRITICAL
elif "security" in line.lower():
severity = IssueSeverity.CRITICAL
category = IssueCategory.SECURITY
else:
category = IssueCategory.BUG
message = line
suggestion = None
if "->" in line:
parts = line.split("->")
message = parts[0].strip()
suggestion = "->".join(parts[1:]).strip()
issues.append(Issue(
file=current_file,
line=line_num,
severity=severity,
category=category,
message=message,
suggestion=suggestion
))
return issues
def _get_strictness_profile(self) -> StrictnessProfile:
return self.config.strictness_profiles.get_profile(
self.config.review.strictness
)
def _filter_issues_by_strictness(self, issues: list[Issue]) -> list[Issue]:
profile = self._get_strictness_profile()
severity_order = {
IssueSeverity.CRITICAL: 0,
IssueSeverity.WARNING: 1,
IssueSeverity.INFO: 2
}
min_severity = profile.min_severity.lower()
min_level = 2
if min_severity == "critical":
min_level = 0
elif min_severity == "warning":
min_level = 1
filtered = []
for issue in issues:
level = severity_order.get(issue.severity, 2)
if level <= min_level:
if issue.category == IssueCategory.SECURITY and not profile.check_security:
continue
if issue.category == IssueCategory.BUG and not profile.check_bugs:
continue
if issue.category == IssueCategory.STYLE and not profile.check_style:
continue
if issue.category == IssueCategory.PERFORMANCE and not profile.check_performance:
continue
if issue.category == IssueCategory.DOCUMENTATION and not profile.check_documentation:
continue
filtered.append(issue)
return filtered
def _aggregate_summary(self, issues: list[Issue], files: list[FileChange]) -> ReviewSummary:
summary = ReviewSummary()
summary.files_reviewed = len(files)
summary.lines_changed = sum(
sum(1 for line in f.diff.split("\n") if line.startswith("+") and not line.startswith("+++"))
for f in files
)
for issue in issues:
if issue.severity == IssueSeverity.CRITICAL:
summary.critical_count += 1
elif issue.severity == IssueSeverity.WARNING:
summary.warning_count += 1
else:
summary.info_count += 1
if issue.category.value not in summary.issues_by_category:
summary.issues_by_category[issue.category.value] = []
summary.issues_by_category[issue.category.value].append(issue.file)
if issue.file not in summary.issues_by_file:
summary.issues_by_file[issue.file] = []
summary.issues_by_file[issue.file].append(issue.line)
if summary.critical_count > 0:
summary.overall_assessment = "Critical issues found. Review recommended before committing."
elif summary.warning_count > 0:
summary.overall_assessment = "Warnings found. Consider addressing before committing."
elif summary.info_count > 0:
summary.overall_assessment = "Minor issues found. Ready for commit with optional fixes."
else:
summary.overall_assessment = "No issues found. Code is ready for commit."
return summary
def review_staged_changes(
self,
files: list[FileChange] | None = None,
strictness: str | None = None,
language: str | None = None
) -> ReviewResult:
if files is None:
if self.repo is None:
self.repo = GitRepo(Path.cwd())
files = self.repo.get_all_staged_changes()
if not files:
return ReviewResult(error="No staged changes found")
result = ReviewResult()
result.review_mode = strictness or self.config.review.strictness
if strictness is None:
strictness = self.config.review.strictness
all_issues = []
for file_change in files:
if not file_change.diff.strip():
continue
file_language = language
if not file_language and self.repo is not None:
file_language = self.repo.get_file_language(file_change.filename)
prompt = ReviewPromptTemplates.get_prompt(
diff=file_change.diff,
strictness=strictness,
language=file_language or ""
)
try:
if self.llm_provider.is_available():
response = self.llm_provider.generate(
prompt,
max_tokens=self.config.llm.max_tokens,
temperature=self.config.llm.temperature
)
result.model_used = response.model
result.tokens_used += response.tokens_used
file_result = self._parse_llm_response(response.text, [file_change])
all_issues.extend(file_result.issues)
else:
result.error = "LLM provider is not available"
return result
except Exception as e:
result.error = f"Review failed: {e!s}"
return result
filtered_issues = self._filter_issues_by_strictness(all_issues)
max_issues = self.config.review.max_issues_per_file
limited_issues = filtered_issues[:max_issues * len(files)]
result.issues = limited_issues
result.summary = self._aggregate_summary(limited_issues, files)
return result
def review_commit(
self,
sha: str,
strictness: str | None = None
) -> ReviewResult:
if self.repo is None:
self.repo = GitRepo(Path.cwd())
commit_info = self.repo.get_commit_info(sha)
if commit_info is None:
return ReviewResult(error=f"Commit {sha} not found")
result = ReviewResult()
result.review_mode = strictness or self.config.review.strictness
if strictness is None:
strictness = self.config.review.strictness
all_issues = []
for file_change in commit_info.changes:
if not file_change.diff.strip():
continue
prompt = ReviewPromptTemplates.get_commit_review_prompt(
diff=file_change.diff,
commit_message=commit_info.message,
strictness=strictness
)
try:
if self.llm_provider.is_available():
response = self.llm_provider.generate(
prompt,
max_tokens=self.config.llm.max_tokens,
temperature=self.config.llm.temperature
)
result.model_used = response.model
result.tokens_used += response.tokens_used
file_result = self._parse_llm_response(response.text, [file_change])
all_issues.extend(file_result.issues)
else:
result.error = "LLM provider is not available"
return result
except Exception as e:
result.error = f"Review failed: {e!s}"
return result
filtered_issues = self._filter_issues_by_strictness(all_issues)
result.issues = filtered_issues
result.summary = self._aggregate_summary(filtered_issues, commit_info.changes)
return result

View File

@@ -0,0 +1,3 @@
from .formatters import JSONFormatter, MarkdownFormatter, TerminalFormatter, get_formatter
__all__ = ["JSONFormatter", "MarkdownFormatter", "TerminalFormatter", "get_formatter"]

View File

@@ -0,0 +1,141 @@
from abc import ABC, abstractmethod
from rich.console import Console
from rich.panel import Panel
from rich.style import Style
from rich.table import Table
from rich.text import Text
from ..core import Issue, IssueCategory, IssueSeverity, ReviewResult
class BaseFormatter(ABC):
@abstractmethod
def format(self, result: ReviewResult) -> str:
pass
class TerminalFormatter(BaseFormatter):
def __init__(self, theme: str = "auto", show_line_numbers: bool = True):
self.console = Console()
self.show_line_numbers = show_line_numbers
self.use_colors = theme != "dark" if theme == "auto" else theme == "dark"
def _get_severity_style(self, severity: IssueSeverity) -> Style:
styles = {
IssueSeverity.CRITICAL: Style(color="red", bold=True),
IssueSeverity.WARNING: Style(color="yellow"),
IssueSeverity.INFO: Style(color="blue"),
}
return styles.get(severity, Style())
def _get_category_icon(self, category: IssueCategory) -> str:
icons = {
IssueCategory.BUG: "[BUG]",
IssueCategory.SECURITY: "[SECURITY]",
IssueCategory.STYLE: "[STYLE]",
IssueCategory.PERFORMANCE: "[PERF]",
IssueCategory.DOCUMENTATION: "[DOC]",
}
return icons.get(category, "")
def _format_issue(self, issue: Issue) -> Text:
text = Text()
text.append(f"{issue.file}:{issue.line} ", style="dim")
text.append(f"[{issue.severity.value.upper()}] ", self._get_severity_style(issue.severity))
text.append(f"{self._get_category_icon(issue.category)} ")
text.append(issue.message)
if issue.suggestion:
text.append("\n Suggestion: ", style="dim")
text.append(issue.suggestion)
return text
def format(self, result: ReviewResult) -> str:
output: list[Panel | Table | str] = []
if result.error:
output.append(Panel(
f"[red]Error: {result.error}[/red]",
title="Review Failed",
expand=False
))
return "\n".join(str(p) for p in output)
summary = result.summary
summary_panel = Panel(
f"[bold]Files Reviewed:[/bold] {summary.files_reviewed}\n"
f"[bold]Lines Changed:[/bold] {summary.lines_changed}\n\n"
f"[red]Critical:[/red] {summary.critical_count} "
f"[yellow]Warnings:[/yellow] {summary.warning_count} "
f"[blue]Info:[/blue] {summary.info_count}\n\n"
f"[bold]Assessment:[/bold] {summary.overall_assessment}",
title="Review Summary",
expand=False
)
output.append(summary_panel)
if result.issues:
issues_table = Table(title="Issues Found", show_header=True)
issues_table.add_column("File", style="dim")
issues_table.add_column("Line", justify="right", style="dim")
issues_table.add_column("Severity", width=10)
issues_table.add_column("Category", width=12)
issues_table.add_column("Message")
for issue in result.issues:
issues_table.add_row(
issue.file,
str(issue.line),
f"[{issue.severity.value.upper()}]",
f"[{issue.category.value.upper()}]",
issue.message,
style=self._get_severity_style(issue.severity)
)
output.append(issues_table)
suggestions_panel = Panel(
"\n".join(
f"[bold]{issue.file}:{issue.line}[/bold]\n"
f" {issue.message}\n"
+ (f" [green]→ {issue.suggestion}[/green]\n" if issue.suggestion else "")
for issue in result.issues if issue.suggestion
),
title="Suggestions",
expand=False
)
output.append(suggestions_panel)
model_info = Panel(
f"[bold]Model:[/bold] {result.model_used}\n"
f"[bold]Tokens Used:[/bold] {result.tokens_used}\n"
f"[bold]Mode:[/bold] {result.review_mode}",
title="Review Info",
expand=False
)
output.append(model_info)
return "\n".join(str(o) for o in output)
class JSONFormatter(BaseFormatter):
def format(self, result: ReviewResult) -> str:
return result.to_json()
class MarkdownFormatter(BaseFormatter):
def format(self, result: ReviewResult) -> str:
return result.to_markdown()
def get_formatter(format_type: str = "terminal", **kwargs) -> BaseFormatter:
formatters: dict[str, type[BaseFormatter]] = {
"terminal": TerminalFormatter,
"json": JSONFormatter,
"markdown": MarkdownFormatter,
}
formatter_class = formatters.get(format_type, TerminalFormatter)
return formatter_class(**kwargs) # type: ignore[arg-type]

View File

@@ -0,0 +1,3 @@
from .git import FileChange, GitRepo, get_commit_context, get_staged_changes, install_hook
__all__ = ["FileChange", "GitRepo", "get_commit_context", "get_staged_changes", "install_hook"]

View File

@@ -0,0 +1,278 @@
import subprocess
from dataclasses import dataclass
from pathlib import Path
@dataclass
class FileChange:
filename: str
status: str
diff: str
old_content: str | None = None
new_content: str | None = None
@dataclass
class CommitInfo:
sha: str
message: str
author: str
date: str
changes: list[FileChange]
class GitRepo:
def __init__(self, path: Path | None = None):
self.path = path or Path.cwd()
self.repo = self._get_repo()
def _get_repo(self) -> Path | None:
try:
result = subprocess.run(
["git", "rev-parse", "--show-toplevel"],
cwd=self.path,
capture_output=True,
text=True,
timeout=5,
check=False
)
if result.returncode == 0:
return Path(result.stdout.strip())
except subprocess.TimeoutExpired:
pass
return None
def is_valid(self) -> bool:
return self.repo is not None and (self.repo / ".git").exists()
def get_staged_files(self) -> list[str]:
try:
result = subprocess.run(
["git", "diff", "--cached", "--name-only"],
cwd=self.repo,
capture_output=True,
text=True,
timeout=10,
check=False
)
if result.returncode == 0:
return result.stdout.strip().split("\n") if result.stdout.strip() else []
except subprocess.TimeoutExpired:
pass
return []
def get_staged_diff(self, filename: str) -> str:
try:
result = subprocess.run(
["git", "diff", "--cached", "--", filename],
cwd=self.repo,
capture_output=True,
text=True,
timeout=10,
check=False
)
return result.stdout if result.returncode == 0 else ""
except subprocess.TimeoutExpired:
return ""
def get_all_staged_changes(self) -> list[FileChange]:
files = self.get_staged_files()
changes = []
for filename in files:
diff = self.get_staged_diff(filename)
status = self._get_file_status(filename)
changes.append(FileChange(
filename=filename,
status=status,
diff=diff
))
return changes
def _get_file_status(self, filename: str) -> str:
try:
result = subprocess.run(
["git", "diff", "--cached", "--name-status", "--", filename],
cwd=self.repo,
capture_output=True,
text=True,
timeout=10,
check=False
)
if result.returncode == 0 and result.stdout.strip():
return result.stdout.strip().split()[0]
except subprocess.TimeoutExpired:
pass
return "M"
def get_commit_info(self, sha: str) -> CommitInfo | None:
try:
message_result = subprocess.run(
["git", "log", "-1", "--format=%B", sha],
cwd=self.repo,
capture_output=True,
text=True,
timeout=10,
check=False
)
author_result = subprocess.run(
["git", "log", "-1", "--format=%an|%ae|%ad", "--date=iso", sha],
cwd=self.repo,
capture_output=True,
text=True,
timeout=10,
check=False
)
if message_result.returncode == 0 and author_result.returncode == 0:
message = message_result.stdout.strip()
author_parts = author_result.stdout.strip().split("|")
author = author_parts[0] if author_parts else "Unknown"
date = author_parts[2] if len(author_parts) > 2 else "" # noqa: PLR2004
changes = self._get_commit_changes(sha)
return CommitInfo(
sha=sha,
message=message,
author=author,
date=date,
changes=changes
)
except subprocess.TimeoutExpired:
pass
return None
def _get_commit_changes(self, sha: str) -> list[FileChange]:
try:
result = subprocess.run(
["git", "show", "--stat", sha],
cwd=self.repo,
capture_output=True,
text=True,
timeout=10,
check=False
)
files = []
if result.returncode == 0:
for line in result.stdout.split("\n"):
if line.startswith(" ") and "|" in line:
filename = line.split("|")[0].strip()
diff = self._get_commit_file_diff(sha, filename)
files.append(FileChange(
filename=filename,
status="M",
diff=diff
))
return files
except subprocess.TimeoutExpired:
return []
return []
def _get_commit_file_diff(self, sha: str, filename: str) -> str:
try:
result = subprocess.run(
["git", "show", f"{sha} -- {filename}"],
cwd=self.repo,
capture_output=True,
text=True,
timeout=10,
check=False
)
return result.stdout if result.returncode == 0 else ""
except subprocess.TimeoutExpired:
return ""
def get_current_branch(self) -> str:
try:
result = subprocess.run(
["git", "rev-parse", "--abbrev-ref", "HEAD"],
cwd=self.repo,
capture_output=True,
text=True,
timeout=5,
check=False
)
return result.stdout.strip() if result.returncode == 0 else "unknown"
except subprocess.TimeoutExpired:
return "unknown"
def get_file_language(self, filename: str) -> str:
ext_map = {
".py": "python",
".js": "javascript",
".ts": "typescript",
".go": "go",
".rs": "rust",
".java": "java",
".c": "c",
".cpp": "cpp",
".h": "c",
".hpp": "cpp",
".jsx": "javascript",
".tsx": "typescript",
}
ext = Path(filename).suffix.lower()
return ext_map.get(ext, "unknown")
def get_diff_stats(self, diff: str) -> tuple[int, int]:
additions = 0
deletions = 0
for line in diff.split("\n"):
if line.startswith("+") and not line.startswith("+++"):
additions += 1
elif line.startswith("-") and not line.startswith("---"):
deletions += 1
return additions, deletions
def get_staged_changes(path: Path | None = None) -> list[FileChange]:
repo = GitRepo(path)
return repo.get_all_staged_changes()
def get_commit_context(sha: str, path: Path | None = None) -> CommitInfo | None:
repo = GitRepo(path)
return repo.get_commit_info(sha)
def install_hook(repo_path: Path, hook_type: str = "pre-commit", content: str | None = None) -> bool:
hooks_dir = repo_path / ".git" / "hooks"
hooks_dir.mkdir(parents=True, exist_ok=True)
hook_path = hooks_dir / hook_type
if hook_path.exists():
backup_path = hooks_dir / f"{hook_type}.backup"
hook_path.rename(backup_path)
if content is None:
content = _get_default_hook_script()
try:
hook_path.write_text(content)
hook_path.chmod(0o755)
return True
except OSError:
return False
def _get_default_hook_script() -> str:
return """#!/bin/bash
# Local AI Commit Reviewer - Pre-commit Hook
# This hook runs code review on staged changes before committing
set -e
# Check if running with --no-verify
if [ "$1" = "--no-verify" ]; then
exit 0
fi
# Get the directory where this script is located
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# Run the AI commit reviewer
cd "$SCRIPT_DIR/../.."
python -m aicr review --hook || exit 1
"""

View File

@@ -0,0 +1,3 @@
from .hooks import check_hook_installed, install_pre_commit_hook
__all__ = ["check_hook_installed", "install_pre_commit_hook"]

View File

@@ -0,0 +1,69 @@
from pathlib import Path
def get_hook_script() -> str:
return """#!/bin/bash
# Local AI Commit Reviewer - Pre-commit Hook
# Automatically reviews staged changes before committing
set -e
# Allow bypass with --no-verify
if [ "$1" = "--no-verify" ]; then
exit 0
fi
# Get the directory where this script is located
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# Change to repository root
cd "$SCRIPT_DIR/../.."
# Run the AI commit reviewer
python -m aicr review --hook --strictness balanced || exit 1
"""
def install_pre_commit_hook(
repo_path: Path,
content: str | None = None,
force: bool = False
) -> bool:
hooks_dir = repo_path / ".git" / "hooks"
hooks_dir.mkdir(parents=True, exist_ok=True)
hook_path = hooks_dir / "pre-commit"
if hook_path.exists() and not force:
return False
if content is None:
content = get_hook_script()
try:
hook_path.write_text(content)
hook_path.chmod(0o755)
return True
except OSError:
return False
def check_hook_installed(repo_path: Path) -> bool:
hook_path = repo_path / ".git" / "hooks" / "pre-commit"
if not hook_path.exists():
return False
content = hook_path.read_text()
return "aicr" in content or "local-ai-commit-reviewer" in content
def uninstall_hook(repo_path: Path) -> bool:
hook_path = repo_path / ".git" / "hooks" / "pre-commit"
if not hook_path.exists():
return True
try:
hook_path.unlink()
return True
except OSError:
return False

View File

@@ -0,0 +1,4 @@
from .ollama import OllamaProvider
from .provider import LLMProvider
__all__ = ["LLMProvider", "OllamaProvider"]

View File

@@ -0,0 +1,143 @@
import asyncio
from collections.abc import AsyncIterator
from datetime import datetime
import ollama
from .provider import LLMProvider, LLMResponse, ModelInfo
class OllamaProvider(LLMProvider):
def __init__(
self,
endpoint: str = "http://localhost:11434",
model: str = "codellama",
timeout: int = 120
):
self.endpoint = endpoint
self.model = model
self.timeout = timeout
self._client: ollama.Client | None = None
@property
def client(self) -> ollama.Client:
if self._client is None:
self._client = ollama.Client(host=self.endpoint)
return self._client
def is_available(self) -> bool:
try:
self.health_check()
return True
except Exception:
return False
def health_check(self) -> bool:
try:
response = self.client.ps()
return response is not None
except Exception as e:
raise ConnectionError(f"Ollama health check failed: {e}") from None
def generate(self, prompt: str, **kwargs) -> LLMResponse:
try:
max_tokens = kwargs.get("max_tokens", 2048)
temperature = kwargs.get("temperature", 0.3)
response = self.client.chat(
model=self.model,
messages=[
{"role": "system", "content": "You are a helpful code review assistant. Provide concise, constructive feedback on code changes."},
{"role": "user", "content": prompt}
],
options={
"num_predict": max_tokens,
"temperature": temperature,
},
stream=False
)
return LLMResponse(
text=response["message"]["content"],
model=self.model,
tokens_used=response.get("eval_count", 0),
finish_reason=response.get("done_reason", "stop")
)
except Exception as e:
raise RuntimeError(f"Ollama generation failed: {e}") from None
async def agenerate(self, prompt: str, **kwargs) -> LLMResponse:
try:
max_tokens = kwargs.get("max_tokens", 2048)
temperature = kwargs.get("temperature", 0.3)
response = await asyncio.to_thread(
self.client.chat,
model=self.model,
messages=[
{"role": "system", "content": "You are a helpful code review assistant. Provide concise, constructive feedback on code changes."},
{"role": "user", "content": prompt}
],
options={
"num_predict": max_tokens,
"temperature": temperature,
},
stream=False
)
return LLMResponse(
text=response["message"]["content"],
model=self.model,
tokens_used=response.get("eval_count", 0),
finish_reason=response.get("done_reason", "stop")
)
except Exception as e:
raise RuntimeError(f"Ollama async generation failed: {e}") from None
async def stream_generate(self, prompt: str, **kwargs) -> AsyncIterator[str]: # type: ignore[misc]
try:
max_tokens = kwargs.get("max_tokens", 2048)
temperature = kwargs.get("temperature", 0.3)
response = self.client.chat(
model=self.model,
messages=[
{"role": "system", "content": "You are a helpful code review assistant. Provide concise, constructive feedback on code changes."},
{"role": "user", "content": prompt}
],
options={
"num_predict": max_tokens,
"temperature": temperature,
},
stream=True
)
for chunk in response:
if "message" in chunk and "content" in chunk["message"]:
yield chunk["message"]["content"]
except Exception as e:
raise RuntimeError(f"Ollama streaming failed: {e}") from None
def list_models(self) -> list[ModelInfo]:
try:
response = self.client.ps()
models = []
if response and "models" in response:
for model in response["models"]:
models.append(ModelInfo(
name=model.get("name", "unknown"),
size=model.get("size", "unknown"),
modified=model.get("modified", datetime.now().isoformat()),
digest=model.get("digest", "")
))
return models
except Exception:
return []
def pull_model(self, model_name: str) -> bool:
try:
for _ in self.client.pull(model_name, stream=True):
pass
return True
except Exception:
return False

View File

@@ -0,0 +1,45 @@
from abc import ABC, abstractmethod
from collections.abc import AsyncIterator
from dataclasses import dataclass
@dataclass
class LLMResponse:
text: str
model: str
tokens_used: int
finish_reason: str
@dataclass
class ModelInfo:
name: str
size: str
modified: str
digest: str
class LLMProvider(ABC):
@abstractmethod
def is_available(self) -> bool:
pass
@abstractmethod
def generate(self, prompt: str, **kwargs) -> LLMResponse:
pass
@abstractmethod
async def agenerate(self, prompt: str, **kwargs) -> LLMResponse:
pass
@abstractmethod
def stream_generate(self, prompt: str, **kwargs) -> AsyncIterator[str]:
pass
@abstractmethod
def list_models(self) -> list[ModelInfo]:
pass
@abstractmethod
def health_check(self) -> bool:
pass

View File

@@ -0,0 +1,133 @@
class ReviewPromptTemplates:
base_prompt: str = """You are an expert code reviewer analyzing staged changes in a Git repository.
Review the following code changes and provide detailed feedback on:
1. Potential bugs and security vulnerabilities
2. Code style and best practices violations
3. Performance concerns
4. Documentation issues
5. Suggestions for improvement
Respond in the following JSON format:
{{
"issues": [
{{
"file": "filename",
"line": line_number,
"severity": "critical|warning|info",
"category": "bug|security|style|performance|documentation",
"message": "description of the issue",
"suggestion": "suggested fix (if applicable)"
}}
],
"summary": {{
"critical_count": number,
"warning_count": number,
"info_count": number,
"overall_assessment": "brief summary"
}}
}}
Only include issues that match the strictness level: {strictness}
{strictness_settings}
Review the following diff:
```
{diff}
```
"""
permissive_settings: str = """Strictness: PERMISSIVE
- Only report critical security issues
- Only report definite bugs (not potential issues)
- Ignore style and formatting issues
- Ignore performance concerns
- Ignore documentation issues
"""
balanced_settings: str = """Strictness: BALANCED
- Report all security issues
- Report all definite bugs and potential bugs
- Report major style violations
- Ignore minor performance concerns
- Ignore documentation issues unless critical
"""
strict_settings: str = """Strictness: STRICT
- Report all security issues (even minor)
- Report all bugs (definite and potential)
- Report all style violations
- Report performance concerns
- Report documentation issues
- Suggest specific improvements
"""
@classmethod
def get_prompt(cls, diff: str, strictness: str = "balanced", language: str = "unknown") -> str:
settings_map = {
"permissive": cls.permissive_settings,
"balanced": cls.balanced_settings,
"strict": cls.strict_settings
}
settings = settings_map.get(strictness.lower(), cls.balanced_settings)
base = cls.base_prompt.format(
strictness=strictness.upper(),
strictness_settings=settings,
diff=diff
)
if language != "unknown":
base += f"\n\nNote: This code is in {language}. Apply {language}-specific best practices."
return base
@classmethod
def get_commit_review_prompt(cls, diff: str, commit_message: str, strictness: str = "balanced") -> str:
prompt = f"""Review the following commit with message: "{commit_message}"
Analyze whether the changes align with the commit message and provide feedback.
"""
prompt += cls.get_prompt(diff, strictness)
return prompt
@classmethod
def get_security_review_prompt(cls, diff: str) -> str:
template = """You are a security expert reviewing code changes for vulnerabilities.
Focus specifically on:
1. Injection vulnerabilities (SQL, command, code injection)
2. Authentication and authorization issues
3. Sensitive data exposure
4. Cryptographic weaknesses
5. Path traversal and file inclusion
6. Dependency security issues
Provide findings in JSON format:
```
{{
"vulnerabilities": [
{{
"file": "filename",
"line": line_number,
"severity": "critical|high|medium|low",
"type": "vulnerability type",
"description": "detailed description",
"exploit_scenario": "how it could be exploited",
"fix": "recommended fix"
}}
],
"secure_patterns": ["list of good security practices observed"],
"concerns": ["list of potential security concerns"]
}}
```
Review the following diff:
```
{diff}
```
"""
return template.format(diff=diff)

View File

@@ -0,0 +1,3 @@
from .utils import get_file_language, sanitize_output, setup_logging
__all__ = ["get_file_language", "sanitize_output", "setup_logging"]

View File

@@ -0,0 +1,66 @@
import logging
import sys
from pathlib import Path
def setup_logging(level: str = "info", log_file: str | None = None) -> logging.Logger:
logger = logging.getLogger("aicr")
logger.setLevel(getattr(logging, level.upper(), logging.INFO))
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setFormatter(formatter)
logger.addHandler(stream_handler)
if log_file:
file_handler = logging.FileHandler(log_file)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
return logger
def get_file_language(filename: str) -> str:
ext_map = {
".py": "python",
".js": "javascript",
".ts": "typescript",
".go": "go",
".rs": "rust",
".java": "java",
".c": "c",
".cpp": "cpp",
".h": "c",
".hpp": "cpp",
".jsx": "javascript",
".tsx": "typescript",
".rb": "ruby",
".php": "php",
".swift": "swift",
".kt": "kotlin",
".scala": "scala",
}
ext = Path(filename).suffix.lower()
return ext_map.get(ext, "unknown")
def sanitize_output(text: str) -> str:
return text.strip()
def truncate_text(text: str, max_length: int = 2000, suffix: str = "...") -> str:
if len(text) <= max_length:
return text
return text[:max_length - len(suffix)] + suffix
def format_file_size(size: float) -> str:
KB_SIZE = 1024
for unit in ["B", "KB", "MB", "GB"]:
if size < KB_SIZE:
return f"{size:.1f}{unit}"
size /= 1024
return f"{size:.1f}TB"