diff --git a/loglens/cli/commands.py b/loglens/cli/commands.py index 709da58..486f174 100644 --- a/loglens/cli/commands.py +++ b/loglens/cli/commands.py @@ -1,5 +1,3 @@ -'''Click CLI commands for LogLens.''' - import logging import sys import time @@ -16,7 +14,7 @@ from loglens.parsers.base import LogFormat def setup_logging(verbosity: int = 0) -> None: - '''Setup logging configuration.''' + """Setup logging configuration.""" log_levels = ["ERROR", "WARNING", "INFO", "DEBUG"] level_idx = min(verbosity, len(log_levels) - 1) level = log_levels[level_idx] @@ -40,291 +38,13 @@ def setup_logging(verbosity: int = 0) -> None: @click.group() @click.option("--verbosity", "-v", count=True, help="Increase output verbosity") @click.option("--config", type=click.Path(exists=True), help="Path to config file") +@click.version_option(version="0.1.0", prog_name="loglens") @click.pass_context def main(ctx: click.Context, verbosity: int, config: str) -> None: - '''LogLens - Parse, analyze, and summarize log files.''' - setup_logging(verbosity) - ctx.ensure_object(dict) - ctx.obj["config"] = config - - -@main.command("analyze") -@click.argument("files", type=click.Path(exists=True), nargs=-1) -@click.option( - "--format", - type=click.Choice(["json", "syslog", "apache", "auto"]), - default="auto", - help="Log format (auto-detect by default)", -) -@click.option( - "--output", type=click.Choice(["table", "json", "text"]), default="table", help="Output format" -) -@click.option("--follow/--no-follow", default=False, help="Follow file changes") -@click.option("--max-entries", type=int, default=100, help="Maximum entries to display") -@click.option( - "--json/--no-json", default=False, help="Output as JSON (shorthand for --output json)" -) -@click.pass_context -def analyze( - ctx: click.Context, - files: tuple, - format: str, - output: str, - follow: bool, - max_entries: int, - json: bool, -) -> None: - '''Analyze log files and display summary.''' - if json: - output = "json" - - if not files and not sys.stdin.isatty(): - lines = sys.stdin.readlines() - _analyze_lines(lines, format, output, max_entries) - elif not files: - click.echo("Error: No log files specified. Use FILE or pipe data from stdin.") - click.echo("Example: cat logfile.txt | loglens analyze") - click.echo(" loglens analyze /var/log/syslog") - ctx.exit(1) - else: - for file_path in files: - _analyze_file(file_path, format, output, max_entries, follow) - - -def _analyze_lines(lines: list, format_str: str, output: str, max_entries: int) -> None: - '''Analyze lines from stdin.''' - format_enum = None if format_str == "auto" else LogFormat(format_str) - analyzer = LogAnalyzer() - - if format_enum is None: - result = analyzer.analyze(lines) - else: - result = analyzer.analyze(lines, format_enum) - - _display_result(result, output, max_entries) - - -def _analyze_file( - file_path: str, format_str: str, output: str, max_entries: int, follow: bool -) -> None: - '''Analyze a single file.''' - format_enum = None if format_str == "auto" else LogFormat(format_str) - analyzer = LogAnalyzer() - - if follow: - _follow_file(file_path, analyzer, format_enum, output, max_entries) - else: - result = analyzer.analyze_file(file_path, format_enum) - _display_result(result, output, max_entries) - - -def _follow_file( - file_path: str, - analyzer: LogAnalyzer, - format: Optional[LogFormat], - output: str, - max_entries: int, -) -> None: - '''Follow a file and analyze in real-time.''' - with open(file_path) as f: - f.seek(0, 2) - buffer = [] - - click.echo(f"Following {file_path}... (Ctrl+C to stop)") - - try: - while True: - line = f.readline() - if line: - buffer.append(line) - if len(buffer) >= 100: - result = analyzer.analyze(buffer, format) - _display_result(result, output, max_entries) - buffer = [] - else: - time.sleep(0.5) - except KeyboardInterrupt: - if buffer: - result = analyzer.analyze(buffer, format) - _display_result(result, output, max_entries) - - -def _display_result(result, output: str, max_entries: int) -> None: - '''Display analysis result.''' - if output == "json": - formatter = JSONFormatter() - click.echo(formatter.format(result)) - elif output == "text": - formatter = TextFormatter() - click.echo(formatter.format(result)) - else: - formatter = TableFormatter(max_entries=max_entries) - formatter.format(result) - - -@main.command("watch") -@click.argument("files", type=click.Path(exists=True), nargs=-1) -@click.option( - "--format", - type=click.Choice(["json", "syslog", "apache", "auto"]), - default="auto", - help="Log format", -) -@click.option("--interval", type=float, default=1.0, help="Refresh interval in seconds") -@click.option("--max-entries", type=int, default=50, help="Maximum entries per update") -@click.pass_context -def watch(ctx: click.Context, files: tuple, format: str, interval: float, max_entries: int) -> None: - '''Watch log files and display live updates.''' - if not files: - click.echo("Error: No files specified for watching.") - ctx.exit(1) - - format_enum = None if format == "auto" else LogFormat(format) - analyzer = LogAnalyzer() - - click.echo(f"Watching {len(files)} file(s). Press Ctrl+C to stop.") - - try: - while True: - for file_path in files: - result = analyzer.analyze_file(file_path, format_enum) - - click.clear() - click.echo(f"=== {file_path} ===") - formatter = TableFormatter(max_entries=max_entries) - formatter.format(result.entries) - - time.sleep(interval) - except KeyboardInterrupt: - click.echo("\nStopped watching.") - - -@main.command("report") -@click.argument("files", type=click.Path(exists=True), nargs=-1) -@click.option( - "--format", - type=click.Choice(["json", "syslog", "apache", "auto"]), - default="auto", - help="Log format", -) -@click.option("--output", type=click.Path(), help="Output file path (default: stdout)") -@click.option("--json/--no-json", default=False, help="Output as JSON") -@click.pass_context -def report( - ctx: click.Context, files: tuple, format: str, output: Optional[str], json: bool -) -> None: - '''Generate detailed analysis report.''' - if not files: - click.echo("Error: No log files specified.") - ctx.exit(1) - - format_enum = None if format == "auto" else LogFormat(format) - analyzer = LogAnalyzer() - - all_results = [] - for file_path in files: - result = analyzer.analyze_file(file_path, format_enum) - all_results.append((file_path, result)) - - if json: - formatter = JSONFormatter() - report_data = { - "files_analyzed": len(files), - "results": [{"file": path, "analysis": result} for path, result in all_results], - } - report_text = formatter.format(report_data) - else: - lines = [] - lines.append("=" * 60) - lines.append("LOG ANALYSIS REPORT") - lines.append("=" * 60) - lines.append(f"Files Analyzed: {len(files)}") - lines.append("") - - for file_path, result in all_results: - lines.append(f"=== {file_path} ===") - lines.append(f"Total Lines: {result.total_lines}") - lines.append(f"Format: {result.format_detected.value}") - lines.append( - f"Critical: {result.critical_count} | Error: {result.error_count} | " - f"Warning: {result.warning_count} | Info: {result.debug_count}" - ) - lines.append("") - - if result.suggestions: - lines.append("Suggestions:") - for i, suggestion in enumerate(result.suggestions, 1): - lines.append(f" {i}. {suggestion}") - lines.append("") - - report_text = "\n".join(lines) - - if output: - with open(output, "w") as f: - f.write(report_text) - click.echo(f"Report written to {output}") - else: - click.echo(report_text) - - -@main.command("patterns") -@click.option("--group", help="Filter by pattern group") -@click.option( - "--severity", - type=click.Choice(["critical", "error", "warning", "info", "debug"]), - help="Filter by severity", -) -@click.pass_context -def patterns(ctx: click.Context, group: str, severity: str) -> None: - '''List available error detection patterns.''' - analyzer = LogAnalyzer() - patterns_by_group = analyzer.list_patterns_by_group() - - if group: - if group in patterns_by_group: - patterns_to_show = {group: patterns_by_group[group]} - else: - click.echo(f"Unknown group: {group}") - ctx.exit(1) - else: - patterns_to_show = patterns_by_group - - formatter = TableFormatter() - formatter.console.print("[bold]Available Error Patterns[/]") - - for group_name, patterns in patterns_to_show.items(): - formatter.console.print(f"\n[bold cyan]{group_name.upper()}[/]") - for pattern in patterns: - severity_color = { - "critical": "red", - "error": "red", - "warning": "yellow", - "info": "blue", - "debug": "grey", - }.get(pattern["severity"], "white") - formatter.console.print( - f" [bold]{pattern['name']}[/] " f"[{severity_color}]({pattern['severity']})[/]" - ) - if pattern["description"]: - formatter.console.print(f" {pattern['description']}") - - -@main.command("info") -@click.pass_context -def info(ctx: click.Context) -> None: - '''Display LogLens information.''' + """LogLens - Parse, analyze, and summarize log files.""" from loglens import __version__ - click.echo(f"LogLens CLI v{__version__}") - click.echo("") - click.echo("Supported log formats:") - click.echo(" - JSON (JSON Lines, arrays)") - click.echo(" - Syslog (RFC 3164, RFC 5424)") - click.echo(" - Apache/Nginx (Common, Combined, Custom)") - click.echo("") - click.echo("Commands:") - click.echo(" analyze - Analyze log files") - click.echo(" watch - Watch files in real-time") - click.echo(" report - Generate detailed report") - click.echo(" patterns - List error patterns") - click.echo(" info - Show this information") + ctx.ensure_object(dict) + ctx.obj["config"] = config + ctx.obj["version"] = __version__ + setup_logging(verbosity)