Files
loglens-cli/loglens/cli/commands.py
7000pctAUTO 977f849282
Some checks failed
CI / test (3.11) (push) Has been cancelled
CI / test (3.12) (push) Has been cancelled
CI / test (3.9) (push) Has been cancelled
CI / lint (push) Has been cancelled
CI / build (push) Has been cancelled
CI / test (3.10) (push) Has been cancelled
Add CLI commands and main entry point
2026-02-02 08:01:17 +00:00

318 lines
11 KiB
Python

"""Click CLI commands for LogLens."""
import sys
import time
import logging
from typing import Optional
import click
from colorlog import ColoredFormatter
from loglens.parsers.base import LogFormat
from loglens.analyzers.analyzer import LogAnalyzer
from loglens.formatters.table_formatter import TableFormatter
from loglens.formatters.json_formatter import JSONFormatter
from loglens.formatters.text_formatter import TextFormatter
def setup_logging(verbosity: int = 0) -> None:
    """Configure the ``loglens`` logger with colored console output.

    Args:
        verbosity: How many times the verbosity flag was given. 0 selects
            ERROR, 1 WARNING, 2 INFO, and 3 or more DEBUG. Negative values
            are treated as 0.
    """
    log_levels = ["ERROR", "WARNING", "INFO", "DEBUG"]
    # Clamp on both sides: a large count saturates at DEBUG, and a negative
    # value must not wrap around to the end of the list.
    level_idx = max(0, min(verbosity, len(log_levels) - 1))
    level = log_levels[level_idx]
    formatter = ColoredFormatter(
        "%(log_color)s%(levelname)-8s%(reset)s %(message)s",
        log_colors={
            "DEBUG": "cyan",
            "INFO": "green",
            "WARNING": "yellow",
            "ERROR": "red",
            "CRITICAL": "red,bg_white",
        },
    )
    # A Formatter is not a Handler: it has to be attached to a real handler,
    # otherwise the logger fails as soon as it tries to emit a record.
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger = logging.getLogger("loglens")
    logger.setLevel(level)
    # Replace (rather than append) so repeated calls do not duplicate output.
    logger.handlers = [handler]
@click.group()
@click.option("--verbosity", "-v", count=True, help="Increase output verbosity")
@click.option("--config", type=click.Path(exists=True), help="Path to config file")
@click.pass_context
def main(ctx: click.Context, verbosity: int, config: str) -> None:
    """LogLens - Parse, analyze, and summarize log files."""
    # Root command group. The docstring above is what Click shows as the
    # group's help text, so it is intentionally kept short.
    #
    # Guarantee that ctx.obj is a dict before stashing shared state on it;
    # `config` is None when --config was not supplied.
    ctx.ensure_object(dict)
    ctx.obj.update({"config": config})
    # Logging is configured once here, before any subcommand body runs.
    setup_logging(verbosity)
@main.command("analyze")
@click.argument("files", type=click.Path(exists=True), nargs=-1)
@click.option("--format", type=click.Choice(["json", "syslog", "apache", "auto"]),
              default="auto", help="Log format (auto-detect by default)")
@click.option("--output", type=click.Choice(["table", "json", "text"]), default="table",
              help="Output format")
@click.option("--follow/--no-follow", default=False, help="Follow file changes")
@click.option("--max-entries", type=int, default=100, help="Maximum entries to display")
@click.option("--json/--no-json", default=False, help="Output as JSON (shorthand for --output json)")
@click.pass_context
def analyze(
    ctx: click.Context,
    files: tuple,
    format: str,
    output: str,
    follow: bool,
    max_entries: int,
    json: bool
) -> None:
    """Analyze log files and display summary."""
    # Input selection, in order of precedence:
    #   1. FILES given on the command line -> analyze each file in turn.
    #   2. No FILES but stdin is piped     -> analyze the piped lines.
    #   3. Neither                         -> print usage hint, exit status 1.
    #
    # The parameter names `format` and `json` are dictated by the option
    # names above; they shadow a builtin / a stdlib module name only inside
    # this function.

    # --json is a shorthand that overrides whatever --output was given.
    if json:
        output = "json"

    if files:
        # NOTE: with --follow, the first file blocks until interrupted, so
        # later files are only reached after Ctrl+C.
        for file_path in files:
            _analyze_file(file_path, format, output, max_entries, follow)
        return

    if not sys.stdin.isatty():
        # Data is being piped in; --follow has no meaning here and is ignored.
        _analyze_lines(sys.stdin.readlines(), format, output, max_entries)
        return

    # Diagnostics go to stderr so they never end up inside redirected or
    # piped analysis output (e.g. `loglens analyze --json > out.json`).
    click.echo("Error: No log files specified. Use FILE or pipe data from stdin.", err=True)
    click.echo("Example: cat logfile.txt | loglens analyze", err=True)
    click.echo(" loglens analyze /var/log/syslog", err=True)
    ctx.exit(1)
def _analyze_lines(lines: list, format_str: str, output: str, max_entries: int) -> None:
    """Analyze log lines that were already read into memory and display the result.

    Args:
        lines: Raw log lines (the ``analyze`` command passes what it read from stdin).
        format_str: ``"auto"`` or a value accepted by ``LogFormat``.
        output: Output mode forwarded to ``_display_result``.
        max_entries: Entry limit forwarded to ``_display_result``.
    """
    explicit_format = LogFormat(format_str) if format_str != "auto" else None
    analyzer = LogAnalyzer()
    # With "auto" the format argument is omitted entirely instead of being
    # passed as None, so the analyzer's own default applies.
    call_args = (lines,) if explicit_format is None else (lines, explicit_format)
    _display_result(analyzer.analyze(*call_args), output, max_entries)
def _analyze_file(file_path: str, format_str: str, output: str, max_entries: int, follow: bool) -> None:
    """Analyze one log file, either once or continuously.

    Args:
        file_path: Path of the log file.
        format_str: ``"auto"`` or a value accepted by ``LogFormat``.
        output: Output mode forwarded to ``_display_result``.
        max_entries: Entry limit forwarded to ``_display_result``.
        follow: When true, hand over to ``_follow_file`` instead of doing a
            single analysis pass.
    """
    explicit_format = LogFormat(format_str) if format_str != "auto" else None
    analyzer = LogAnalyzer()
    if not follow:
        # One-shot mode: analyze the whole file and show the result.
        _display_result(analyzer.analyze_file(file_path, explicit_format), output, max_entries)
        return
    _follow_file(file_path, analyzer, explicit_format, output, max_entries)
def _follow_file(file_path: str, analyzer: LogAnalyzer, format: Optional[LogFormat],
                 output: str, max_entries: int, batch_size: int = 100,
                 poll_interval: float = 0.5) -> None:
    """Follow a file and analyze newly appended lines as they arrive.

    Reading starts at the current end of the file, so existing content is
    skipped. New lines are collected and analyzed either when ``batch_size``
    lines have accumulated or as soon as the reader has caught up with the
    writer, whichever happens first. The loop runs until interrupted with
    Ctrl+C, at which point any pending lines are analyzed before returning.

    Args:
        file_path: Path of the file to follow.
        analyzer: Analyzer used for every batch.
        format: Explicit log format, or ``None`` (passed through to
            ``analyzer.analyze`` unchanged).
        output: Output mode forwarded to ``_display_result``.
        max_entries: Entry limit forwarded to ``_display_result``.
        batch_size: Maximum number of lines analyzed in one batch.
        poll_interval: Seconds to sleep when no new data is available.
    """

    def _flush(pending: list) -> None:
        """Analyze the pending lines and display the result."""
        _display_result(analyzer.analyze(pending, format), output, max_entries)

    # errors="replace": a single undecodable byte in a log must not abort a
    # long-running follow session.
    with open(file_path, "r", errors="replace") as f:
        f.seek(0, 2)  # whence=2 -> end of file: only lines written from now on
        buffer: list = []
        partial = ""  # start of a line whose terminating newline has not arrived yet
        click.echo(f"Following {file_path}... (Ctrl+C to stop)")
        try:
            while True:
                chunk = f.readline()
                if chunk:
                    partial += chunk
                    # readline() returns text without a newline when the
                    # writer is mid-line; keep it until the rest shows up.
                    if partial.endswith("\n"):
                        buffer.append(partial)
                        partial = ""
                        if len(buffer) >= batch_size:
                            _flush(buffer)
                            buffer = []
                    continue
                # Caught up with the writer: show what we have instead of
                # withholding it until a full batch accumulates.
                if buffer:
                    _flush(buffer)
                    buffer = []
                time.sleep(poll_interval)
        except KeyboardInterrupt:
            # Do not lose an unterminated last line or a partially filled batch.
            if partial:
                buffer.append(partial)
            if buffer:
                _flush(buffer)
def _display_result(result, output: str, max_entries: int) -> None:
    """Render an analysis result in the requested output mode.

    Args:
        result: Analysis result as returned by the analyzer.
        output: ``"json"`` or ``"text"`` select the matching formatter; any
            other value falls back to the table formatter.
        max_entries: Entry limit; only used by the table formatter.
    """
    if output in ("json", "text"):
        # These two formatters return a string that is echoed here.
        formatter = JSONFormatter() if output == "json" else TextFormatter()
        click.echo(formatter.format(result))
        return
    # The table formatter's return value is not echoed; presumably it writes
    # to its own console - confirm against TableFormatter.
    TableFormatter(max_entries=max_entries).format(result)
@main.command("watch")
@click.argument("files", type=click.Path(exists=True), nargs=-1)
@click.option("--format", type=click.Choice(["json", "syslog", "apache", "auto"]),
              default="auto", help="Log format")
@click.option("--interval", type=float, default=1.0, help="Refresh interval in seconds")
@click.option("--max-entries", type=int, default=50, help="Maximum entries per update")
@click.pass_context
def watch(
    ctx: click.Context,
    files: tuple,
    format: str,
    interval: float,
    max_entries: int
) -> None:
    """Watch log files and display live updates."""
    # Re-analyzes every file each `interval` seconds and redraws the screen
    # until interrupted with Ctrl+C. Exits with status 1 when no file is given.
    if not files:
        # stderr keeps the diagnostic out of any captured standard output.
        click.echo("Error: No files specified for watching.", err=True)
        ctx.exit(1)
    format_enum = None if format == "auto" else LogFormat(format)
    analyzer = LogAnalyzer()
    click.echo(f"Watching {len(files)} file(s). Press Ctrl+C to stop.")
    try:
        while True:
            # Analyze everything first so the screen is not left blank while
            # the analysis is running.
            results = [
                (file_path, analyzer.analyze_file(file_path, format_enum))
                for file_path in files
            ]
            # Clear once per refresh cycle. Clearing per file would wipe the
            # output of every file except the last one.
            click.clear()
            for file_path, result in results:
                click.echo(f"=== {file_path} ===")
                formatter = TableFormatter(max_entries=max_entries)
                # NOTE(review): other call sites in this module pass the whole
                # result to TableFormatter.format(); confirm that passing
                # `result.entries` here is intended.
                formatter.format(result.entries)
            time.sleep(interval)
    except KeyboardInterrupt:
        click.echo("\nStopped watching.")
@main.command("report")
@click.argument("files", type=click.Path(exists=True), nargs=-1)
@click.option("--format", type=click.Choice(["json", "syslog", "apache", "auto"]),
              default="auto", help="Log format")
@click.option("--output", type=click.Path(), help="Output file path (default: stdout)")
@click.option("--json/--no-json", default=False, help="Output as JSON")
@click.pass_context
def report(
    ctx: click.Context,
    files: tuple,
    format: str,
    output: Optional[str],
    json: bool
) -> None:
    """Generate detailed analysis report."""
    # Analyzes every given file, builds one combined report (JSON or plain
    # text) and writes it to --output, or to stdout when no path is given.
    # Exits with status 1 when no file is given.
    if not files:
        # stderr keeps the diagnostic out of a redirected report.
        click.echo("Error: No log files specified.", err=True)
        ctx.exit(1)
    format_enum = None if format == "auto" else LogFormat(format)
    analyzer = LogAnalyzer()
    all_results = [
        (file_path, analyzer.analyze_file(file_path, format_enum))
        for file_path in files
    ]
    if json:
        formatter = JSONFormatter()
        report_data = {
            "files_analyzed": len(files),
            "results": [
                {"file": path, "analysis": result}
                for path, result in all_results
            ]
        }
        # NOTE(review): elsewhere in this module JSONFormatter.format() is
        # given a single analysis result; confirm it also accepts this
        # wrapper dict containing result objects.
        report_text = formatter.format(report_data)
    else:
        lines = [
            "=" * 60,
            "LOG ANALYSIS REPORT",
            "=" * 60,
            f"Files Analyzed: {len(files)}",
            "",
        ]
        for file_path, result in all_results:
            lines.append(f"=== {file_path} ===")
            lines.append(f"Total Lines: {result.total_lines}")
            lines.append(f"Format: {result.format_detected.value}")
            # The last counter shown is debug_count, so it is labelled
            # "Debug" (it was previously mislabelled "Info").
            lines.append(f"Critical: {result.critical_count} | Error: {result.error_count} | "
                         f"Warning: {result.warning_count} | Debug: {result.debug_count}")
            lines.append("")
            if result.suggestions:
                lines.append("Suggestions:")
                lines.extend(
                    f" {i}. {suggestion}"
                    for i, suggestion in enumerate(result.suggestions, 1)
                )
                lines.append("")
        report_text = "\n".join(lines)
    if output:
        # Explicit UTF-8 so the report does not depend on the platform's
        # default encoding (which may be unable to encode some characters).
        with open(output, "w", encoding="utf-8") as f:
            f.write(report_text)
        click.echo(f"Report written to {output}")
    else:
        click.echo(report_text)
@main.command("patterns")
@click.option("--group", help="Filter by pattern group")
@click.option("--severity", type=click.Choice(["critical", "error", "warning", "info", "debug"]),
              help="Filter by severity")
@click.pass_context
def patterns(ctx: click.Context, group: str, severity: str) -> None:
    """List available error detection patterns."""
    # Prints the patterns reported by LogAnalyzer.list_patterns_by_group(),
    # optionally restricted to one group and/or one severity. Exits with
    # status 1 when --group names a group that does not exist.
    analyzer = LogAnalyzer()
    patterns_by_group = analyzer.list_patterns_by_group()
    if group:
        if group not in patterns_by_group:
            # stderr keeps the diagnostic out of captured listing output.
            click.echo(f"Unknown group: {group}", err=True)
            ctx.exit(1)
        patterns_to_show = {group: patterns_by_group[group]}
    else:
        patterns_to_show = patterns_by_group

    # Markup color used for each severity label; unknown severities use "white".
    severity_colors = {
        "critical": "red",
        "error": "red",
        "warning": "yellow",
        "info": "blue",
        "debug": "grey"
    }
    formatter = TableFormatter()
    formatter.console.print("[bold]Available Error Patterns[/]")
    # `group_patterns` rather than `patterns`: avoids shadowing this function.
    for group_name, group_patterns in patterns_to_show.items():
        if severity:
            # Apply the --severity filter (assumes each pattern's "severity"
            # is one of the lowercase choice strings - confirm against the
            # analyzer's pattern records).
            group_patterns = [p for p in group_patterns if p["severity"] == severity]
            if not group_patterns:
                # Nothing left in this group; skip its heading as well.
                continue
        formatter.console.print(f"\n[bold cyan]{group_name.upper()}[/]")
        for pattern in group_patterns:
            severity_color = severity_colors.get(pattern["severity"], "white")
            formatter.console.print(
                f" [bold]{pattern['name']}[/] "
                f"[{severity_color}]({pattern['severity']})[/]"
            )
            if pattern["description"]:
                formatter.console.print(f" {pattern['description']}")
@main.command("info")
@click.pass_context
def info(ctx: click.Context) -> None:
    """Display LogLens information."""
    # Imported here rather than at module level, as in the original code.
    from loglens import __version__

    # Static overview: version banner, supported formats, available commands.
    overview = (
        f"LogLens CLI v{__version__}",
        "",
        "Supported log formats:",
        " - JSON (JSON Lines, arrays)",
        " - Syslog (RFC 3164, RFC 5424)",
        " - Apache/Nginx (Common, Combined, Custom)",
        "",
        "Commands:",
        " analyze - Analyze log files",
        " watch - Watch files in real-time",
        " report - Generate detailed report",
        " patterns - List error patterns",
        " info - Show this information",
    )
    for text in overview:
        click.echo(text)