From b886ea10e050a1b5346302b5f8449987cf17af6f Mon Sep 17 00:00:00 2001 From: 7000pctAUTO Date: Mon, 2 Feb 2026 10:05:24 +0000 Subject: [PATCH] Add loglens package files (parsers, cli, config) --- loglens/cli/commands.py | 284 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 284 insertions(+) diff --git a/loglens/cli/commands.py b/loglens/cli/commands.py index 486f174..ece8b58 100644 --- a/loglens/cli/commands.py +++ b/loglens/cli/commands.py @@ -1,3 +1,5 @@ +"""Click CLI commands for LogLens.""" + import logging import sys import time @@ -48,3 +50,285 @@ def main(ctx: click.Context, verbosity: int, config: str) -> None: ctx.obj["config"] = config ctx.obj["version"] = __version__ setup_logging(verbosity) + + +@main.command("analyze") +@click.argument("files", type=click.Path(exists=True), nargs=-1) +@click.option( + "--format", + type=click.Choice(["json", "syslog", "apache", "auto"]), + default="auto", + help="Log format (auto-detect by default)", +) +@click.option( + "--output", type=click.Choice(["table", "json", "text"]), default="table", help="Output format" +) +@click.option("--follow/--no-follow", default=False, help="Follow file changes") +@click.option("--max-entries", type=int, default=100, help="Maximum entries to display") +@click.option( + "--json/--no-json", default=False, help="Output as JSON (shorthand for --output json)" +) +@click.pass_context +def analyze( + ctx: click.Context, + files: tuple, + format: str, + output: str, + follow: bool, + max_entries: int, + json: bool, +) -> None: + """Analyze log files and display summary.""" + if json: + output = "json" + + if not files and not sys.stdin.isatty(): + lines = sys.stdin.readlines() + _analyze_lines(lines, format, output, max_entries) + elif not files: + click.echo("Error: No log files specified. Use FILE or pipe data from stdin.") + click.echo("Example: cat logfile.txt | loglens analyze") + click.echo(" loglens analyze /var/log/syslog") + ctx.exit(1) + else: + for file_path in files: + _analyze_file(file_path, format, output, max_entries, follow) + + +def _analyze_lines(lines: list, format_str: str, output: str, max_entries: int) -> None: + """Analyze lines from stdin.""" + format_enum = None if format_str == "auto" else LogFormat(format_str) + analyzer = LogAnalyzer() + + if format_enum is None: + result = analyzer.analyze(lines) + else: + result = analyzer.analyze(lines, format_enum) + + _display_result(result, output, max_entries) + + +def _analyze_file( + file_path: str, format_str: str, output: str, max_entries: int, follow: bool +) -> None: + """Analyze a single file.""" + format_enum = None if format_str == "auto" else LogFormat(format_str) + analyzer = LogAnalyzer() + + if follow: + _follow_file(file_path, analyzer, format_enum, output, max_entries) + else: + result = analyzer.analyze_file(file_path, format_enum) + _display_result(result, output, max_entries) + + +def _follow_file( + file_path: str, + analyzer: LogAnalyzer, + format: Optional[LogFormat], + output: str, + max_entries: int, +) -> None: + """Follow a file and analyze in real-time.""" + with open(file_path) as f: + f.seek(0, 2) + buffer = [] + + click.echo(f"Following {file_path}... (Ctrl+C to stop)") + + try: + while True: + line = f.readline() + if line: + buffer.append(line) + if len(buffer) >= 100: + result = analyzer.analyze(buffer, format) + _display_result(result, output, max_entries) + buffer = [] + else: + time.sleep(0.5) + except KeyboardInterrupt: + if buffer: + result = analyzer.analyze(buffer, format) + _display_result(result, output, max_entries) + + +def _display_result(result, output: str, max_entries: int) -> None: + """Display analysis result.""" + if output == "json": + formatter = JSONFormatter() + click.echo(formatter.format(result)) + elif output == "text": + formatter = TextFormatter() + click.echo(formatter.format(result)) + else: + formatter = TableFormatter(max_entries=max_entries) + formatter.format(result) + + +@main.command("watch") +@click.argument("files", type=click.Path(exists=True), nargs=-1) +@click.option( + "--format", + type=click.Choice(["json", "syslog", "apache", "auto"]), + default="auto", + help="Log format", +) +@click.option("--interval", type=float, default=1.0, help="Refresh interval in seconds") +@click.option("--max-entries", type=int, default=50, help="Maximum entries per update") +@click.pass_context +def watch(ctx: click.Context, files: tuple, format: str, interval: float, max_entries: int) -> None: + """Watch log files and display live updates.""" + if not files: + click.echo("Error: No files specified for watching.") + ctx.exit(1) + + format_enum = None if format == "auto" else LogFormat(format) + analyzer = LogAnalyzer() + + click.echo(f"Watching {len(files)} file(s). Press Ctrl+C to stop.") + + try: + while True: + for file_path in files: + result = analyzer.analyze_file(file_path, format_enum) + + click.clear() + click.echo(f"=== {file_path} ===") + formatter = TableFormatter(max_entries=max_entries) + formatter.format(result.entries) + + time.sleep(interval) + except KeyboardInterrupt: + click.echo("\nStopped watching.") + + +@main.command("report") +@click.argument("files", type=click.Path(exists=True), nargs=-1) +@click.option( + "--format", + type=click.Choice(["json", "syslog", "apache", "auto"]), + default="auto", + help="Log format", +) +@click.option("--output", type=click.Path(), help="Output file path (default: stdout)") +@click.option("--json/--no-json", default=False, help="Output as JSON") +@click.pass_context +def report( + ctx: click.Context, files: tuple, format: str, output: Optional[str], json: bool +) -> None: + """Generate detailed analysis report.""" + if not files: + click.echo("Error: No log files specified.") + ctx.exit(1) + + format_enum = None if format == "auto" else LogFormat(format) + analyzer = LogAnalyzer() + + all_results = [] + for file_path in files: + result = analyzer.analyze_file(file_path, format_enum) + all_results.append((file_path, result)) + + if json: + formatter = JSONFormatter() + report_data = { + "files_analyzed": len(files), + "results": [{"file": path, "analysis": result} for path, result in all_results], + } + report_text = formatter.format(report_data) + else: + lines = [] + lines.append("=" * 60) + lines.append("LOG ANALYSIS REPORT") + lines.append("=" * 60) + lines.append(f"Files Analyzed: {len(files)}") + lines.append("") + + for file_path, result in all_results: + lines.append(f"=== {file_path} ===") + lines.append(f"Total Lines: {result.total_lines}") + lines.append(f"Format: {result.format_detected.value}") + lines.append( + f"Critical: {result.critical_count} | Error: {result.error_count} | " + f"Warning: {result.warning_count} | Info: {result.debug_count}" + ) + lines.append("") + + if result.suggestions: + lines.append("Suggestions:") + for i, suggestion in enumerate(result.suggestions, 1): + lines.append(f" {i}. {suggestion}") + lines.append("") + + report_text = "\n".join(lines) + + if output: + with open(output, "w") as f: + f.write(report_text) + click.echo(f"Report written to {output}") + else: + click.echo(report_text) + + +@main.command("patterns") +@click.option("--group", help="Filter by pattern group") +@click.option( + "--severity", + type=click.Choice(["critical", "error", "warning", "info", "debug"]), + help="Filter by severity", +) +@click.pass_context +def patterns(ctx: click.Context, group: str, severity: str) -> None: + """List available error detection patterns.""" + analyzer = LogAnalyzer() + patterns_by_group = analyzer.list_patterns_by_group() + + if group: + if group in patterns_by_group: + patterns_to_show = {group: patterns_by_group[group]} + else: + click.echo(f"Unknown group: {group}") + ctx.exit(1) + else: + patterns_to_show = patterns_by_group + + formatter = TableFormatter() + formatter.console.print("[bold]Available Error Patterns[/]") + + for group_name, patterns in patterns_to_show.items(): + formatter.console.print(f"\n[bold cyan]{group_name.upper()}[/]") + for pattern in patterns: + severity_color = { + "critical": "red", + "error": "red", + "warning": "yellow", + "info": "blue", + "debug": "grey", + }.get(pattern["severity"], "white") + formatter.console.print( + f" [bold]{pattern['name']}[/] " f"[{severity_color}]({pattern['severity']})[/]" + ) + if pattern["description"]: + formatter.console.print(f" {pattern['description']}") + + +@main.command("info") +@click.pass_context +def info(ctx: click.Context) -> None: + """Display LogLens information.""" + from loglens import __version__ + + click.echo(f"LogLens CLI v{__version__}") + click.echo("") + click.echo("Supported log formats:") + click.echo(" - JSON (JSON Lines, arrays)") + click.echo(" - Syslog (RFC 3164, RFC 5424)") + click.echo(" - Apache/Nginx (Common, Combined, Custom)") + click.echo("") + click.echo("Commands:") + click.echo(" analyze - Analyze log files") + click.echo(" watch - Watch files in real-time") + click.echo(" report - Generate detailed report") + click.echo(" patterns - List error patterns") + click.echo(" info - Show this information")