diff --git a/loglens/cli/commands.py b/loglens/cli/commands.py
new file mode 100644
index 0000000..4f2856c
--- /dev/null
+++ b/loglens/cli/commands.py
@@ -0,0 +1,317 @@
+"""Click CLI commands for LogLens."""
+
+import sys
+import time
+import logging
+from typing import Optional
+
+import click
+from colorlog import ColoredFormatter
+
+from loglens.parsers.base import LogFormat
+from loglens.analyzers.analyzer import LogAnalyzer
+from loglens.formatters.table_formatter import TableFormatter
+from loglens.formatters.json_formatter import JSONFormatter
+from loglens.formatters.text_formatter import TextFormatter
+
+
+def setup_logging(verbosity: int = 0) -> None:
+    """Set up logging configuration."""
+    log_levels = ["ERROR", "WARNING", "INFO", "DEBUG"]
+    level_idx = min(verbosity, len(log_levels) - 1)
+    level = log_levels[level_idx]
+
+    formatter = ColoredFormatter(
+        "%(log_color)s%(levelname)-8s%(reset)s %(message)s",
+        log_colors={
+            "DEBUG": "cyan",
+            "INFO": "green",
+            "WARNING": "yellow",
+            "ERROR": "red",
+            "CRITICAL": "red,bg_white",
+        }
+    )
+
+    # ColoredFormatter is a formatter, not a handler; attach it to a StreamHandler.
+    handler = logging.StreamHandler()
+    handler.setFormatter(formatter)
+
+    logger = logging.getLogger("loglens")
+    logger.setLevel(level)
+    logger.handlers = [handler]
+
+
+@click.group()
+@click.option("--verbosity", "-v", count=True, help="Increase output verbosity")
+@click.option("--config", type=click.Path(exists=True), help="Path to config file")
+@click.pass_context
+def main(ctx: click.Context, verbosity: int, config: str) -> None:
+    """LogLens - Parse, analyze, and summarize log files."""
+    setup_logging(verbosity)
+    ctx.ensure_object(dict)
+    ctx.obj["config"] = config
+
+
+@main.command("analyze")
+@click.argument("files", type=click.Path(exists=True), nargs=-1)
+@click.option("--format", type=click.Choice(["json", "syslog", "apache", "auto"]),
+              default="auto", help="Log format (auto-detect by default)")
+@click.option("--output", type=click.Choice(["table", "json", "text"]), default="table",
+              help="Output format")
+@click.option("--follow/--no-follow", default=False, help="Follow file changes")
+@click.option("--max-entries", type=int, default=100, help="Maximum entries to display")
+@click.option("--json/--no-json", default=False, help="Output as JSON (shorthand for --output json)")
+@click.pass_context
+def analyze(
+    ctx: click.Context,
+    files: tuple,
+    format: str,
+    output: str,
+    follow: bool,
+    max_entries: int,
+    json: bool
+) -> None:
+    """Analyze log files and display a summary."""
+    if json:
+        output = "json"
+
+    if not files and not sys.stdin.isatty():
+        lines = sys.stdin.readlines()
+        _analyze_lines(lines, format, output, max_entries)
+    elif not files:
+        click.echo("Error: No log files specified. Use FILE or pipe data from stdin.")
+        click.echo("Example: cat logfile.txt | loglens analyze")
+        click.echo("         loglens analyze /var/log/syslog")
+        ctx.exit(1)
+    else:
+        for file_path in files:
+            _analyze_file(file_path, format, output, max_entries, follow)
+
+
+def _analyze_lines(lines: list, format_str: str, output: str, max_entries: int) -> None:
+    """Analyze lines from stdin."""
+    format_enum = None if format_str == "auto" else LogFormat(format_str)
+    analyzer = LogAnalyzer()
+
+    if format_enum is None:
+        result = analyzer.analyze(lines)
+    else:
+        result = analyzer.analyze(lines, format_enum)
+
+    _display_result(result, output, max_entries)
+
+
+def _analyze_file(file_path: str, format_str: str, output: str, max_entries: int, follow: bool) -> None:
+    """Analyze a single file."""
+    format_enum = None if format_str == "auto" else LogFormat(format_str)
+    analyzer = LogAnalyzer()
+
+    if follow:
+        _follow_file(file_path, analyzer, format_enum, output, max_entries)
+    else:
+        result = analyzer.analyze_file(file_path, format_enum)
+        _display_result(result, output, max_entries)
+
+
+def _follow_file(file_path: str, analyzer: LogAnalyzer, format: Optional[LogFormat],
+                 output: str, max_entries: int) -> None:
+    """Follow a file and analyze it in real time."""
+    with open(file_path, "r") as f:
+        # Seek to the end of the file so only newly appended lines are analyzed.
+        f.seek(0, 2)
+        buffer = []
+
+        click.echo(f"Following {file_path}... (Ctrl+C to stop)")
+
+        try:
+            while True:
+                line = f.readline()
+                if line:
+                    buffer.append(line)
+                    # Analyze in batches of 100 lines.
+                    if len(buffer) >= 100:
+                        result = analyzer.analyze(buffer, format)
+                        _display_result(result, output, max_entries)
+                        buffer = []
+                else:
+                    time.sleep(0.5)
+        except KeyboardInterrupt:
+            # Flush whatever is left in the buffer before exiting.
+            if buffer:
+                result = analyzer.analyze(buffer, format)
+                _display_result(result, output, max_entries)
+
+
+def _display_result(result, output: str, max_entries: int) -> None:
+    """Display analysis result."""
+    if output == "json":
+        formatter = JSONFormatter()
+        click.echo(formatter.format(result))
+    elif output == "text":
+        formatter = TextFormatter()
+        click.echo(formatter.format(result))
+    else:
+        formatter = TableFormatter(max_entries=max_entries)
+        formatter.format(result)
+
+
+@main.command("watch")
+@click.argument("files", type=click.Path(exists=True), nargs=-1)
+@click.option("--format", type=click.Choice(["json", "syslog", "apache", "auto"]),
+              default="auto", help="Log format")
+@click.option("--interval", type=float, default=1.0, help="Refresh interval in seconds")
+@click.option("--max-entries", type=int, default=50, help="Maximum entries per update")
+@click.pass_context
+def watch(
+    ctx: click.Context,
+    files: tuple,
+    format: str,
+    interval: float,
+    max_entries: int
+) -> None:
+    """Watch log files and display live updates."""
+    if not files:
+        click.echo("Error: No files specified for watching.")
+        ctx.exit(1)
+
+    format_enum = None if format == "auto" else LogFormat(format)
+    analyzer = LogAnalyzer()
+
+    click.echo(f"Watching {len(files)} file(s). Press Ctrl+C to stop.")
+
+    try:
+        while True:
+            for file_path in files:
+                result = analyzer.analyze_file(file_path, format_enum)
+
+                click.clear()
+                click.echo(f"=== {file_path} ===")
+                formatter = TableFormatter(max_entries=max_entries)
+                # TableFormatter.format takes the full analysis result (see _display_result).
+                formatter.format(result)
+
+            time.sleep(interval)
+    except KeyboardInterrupt:
+        click.echo("\nStopped watching.")
+
+
+@main.command("report")
+@click.argument("files", type=click.Path(exists=True), nargs=-1)
+@click.option("--format", type=click.Choice(["json", "syslog", "apache", "auto"]),
+              default="auto", help="Log format")
+@click.option("--output", type=click.Path(), help="Output file path (default: stdout)")
+@click.option("--json/--no-json", default=False, help="Output as JSON")
+@click.pass_context
+def report(
+    ctx: click.Context,
+    files: tuple,
+    format: str,
+    output: Optional[str],
+    json: bool
+) -> None:
+    """Generate a detailed analysis report."""
+    if not files:
+        click.echo("Error: No log files specified.")
+        ctx.exit(1)
+
+    format_enum = None if format == "auto" else LogFormat(format)
+    analyzer = LogAnalyzer()
+
+    all_results = []
+    for file_path in files:
+        result = analyzer.analyze_file(file_path, format_enum)
+        all_results.append((file_path, result))
+
+    if json:
+        formatter = JSONFormatter()
+        report_data = {
+            "files_analyzed": len(files),
+            "results": [
+                {"file": path, "analysis": result}
+                for path, result in all_results
+            ]
+        }
+        report_text = formatter.format(report_data)
+    else:
+        lines = []
+        lines.append("=" * 60)
+        lines.append("LOG ANALYSIS REPORT")
+        lines.append("=" * 60)
+        lines.append(f"Files Analyzed: {len(files)}")
+        lines.append("")
+
+        for file_path, result in all_results:
+            lines.append(f"=== {file_path} ===")
+            lines.append(f"Total Lines: {result.total_lines}")
+            lines.append(f"Format: {result.format_detected.value}")
+            lines.append(f"Critical: {result.critical_count} | Error: {result.error_count} | "
+                         f"Warning: {result.warning_count} | Info: {result.info_count}")
+            lines.append("")
+
+            if result.suggestions:
+                lines.append("Suggestions:")
+                for i, suggestion in enumerate(result.suggestions, 1):
+                    lines.append(f"  {i}. {suggestion}")
+                lines.append("")
{suggestion}") + lines.append("") + + report_text = "\n".join(lines) + + if output: + with open(output, "w") as f: + f.write(report_text) + click.echo(f"Report written to {output}") + else: + click.echo(report_text) + + +@main.command("patterns") +@click.option("--group", help="Filter by pattern group") +@click.option("--severity", type=click.Choice(["critical", "error", "warning", "info", "debug"]), + help="Filter by severity") +@click.pass_context +def patterns(ctx: click.Context, group: str, severity: str) -> None: + """List available error detection patterns.""" + analyzer = LogAnalyzer() + patterns_by_group = analyzer.list_patterns_by_group() + + if group: + if group in patterns_by_group: + patterns_to_show = {group: patterns_by_group[group]} + else: + click.echo(f"Unknown group: {group}") + ctx.exit(1) + else: + patterns_to_show = patterns_by_group + + formatter = TableFormatter() + formatter.console.print("[bold]Available Error Patterns[/]") + + for group_name, patterns in patterns_to_show.items(): + formatter.console.print(f"\n[bold cyan]{group_name.upper()}[/]") + for pattern in patterns: + severity_color = { + "critical": "red", + "error": "red", + "warning": "yellow", + "info": "blue", + "debug": "grey" + }.get(pattern["severity"], "white") + formatter.console.print( + f" [bold]{pattern['name']}[/] " + f"[{severity_color}]({pattern['severity']})[/]" + ) + if pattern["description"]: + formatter.console.print(f" {pattern['description']}") + + +@main.command("info") +@click.pass_context +def info(ctx: click.Context) -> None: + """Display LogLens information.""" + from loglens import __version__ + + click.echo(f"LogLens CLI v{__version__}") + click.echo("") + click.echo("Supported log formats:") + click.echo(" - JSON (JSON Lines, arrays)") + click.echo(" - Syslog (RFC 3164, RFC 5424)") + click.echo(" - Apache/Nginx (Common, Combined, Custom)") + click.echo("") + click.echo("Commands:") + click.echo(" analyze - Analyze log files") + click.echo(" watch - Watch files in real-time") + click.echo(" report - Generate detailed report") + click.echo(" patterns - List error patterns") + click.echo(" info - Show this information")