"""Click CLI commands for LogLens."""

import sys
import time
import logging
from typing import Optional

import click
from colorlog import ColoredFormatter

from loglens.parsers.base import LogFormat
from loglens.analyzers.analyzer import LogAnalyzer
from loglens.formatters.table_formatter import TableFormatter
from loglens.formatters.json_formatter import JSONFormatter
from loglens.formatters.text_formatter import TextFormatter

# Tuning knobs for ``analyze --follow``.
_FOLLOW_BATCH_SIZE = 100  # flush after this many buffered lines
_FOLLOW_POLL_SECONDS = 0.5  # sleep between polls once the file is idle


def setup_logging(verbosity: int = 0) -> None:
    """Configure the ``loglens`` logger with coloured console output.

    Args:
        verbosity: Number of ``-v`` flags given. 0 maps to ERROR, 1 to
            WARNING, 2 to INFO and 3 or more to DEBUG. Negative values are
            treated as 0.
    """
    log_levels = ["ERROR", "WARNING", "INFO", "DEBUG"]
    # Clamp on both sides: a negative index would silently wrap to DEBUG.
    level_idx = max(0, min(verbosity, len(log_levels) - 1))
    level = log_levels[level_idx]

    formatter = ColoredFormatter(
        "%(log_color)s%(levelname)-8s%(reset)s %(message)s",
        log_colors={
            "DEBUG": "cyan",
            "INFO": "green",
            "WARNING": "yellow",
            "ERROR": "red",
            "CRITICAL": "red,bg_white",
        },
    )
    # A Formatter is not a Handler: it has no ``level``/``handle`` and would
    # raise AttributeError on the first emitted record if installed directly
    # in ``logger.handlers``. Wrap it in a real stream handler instead.
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)

    logger = logging.getLogger("loglens")
    logger.setLevel(level)
    # Replace (not append) so repeated calls do not stack duplicate handlers.
    logger.handlers = [handler]


def _resolve_format(format_str: str) -> Optional[LogFormat]:
    """Map a ``--format`` choice to a ``LogFormat``; ``"auto"`` yields None."""
    return None if format_str == "auto" else LogFormat(format_str)


@click.group()
@click.option("--verbosity", "-v", count=True, help="Increase output verbosity")
@click.option("--config", type=click.Path(exists=True), help="Path to config file")
@click.pass_context
def main(ctx: click.Context, verbosity: int, config: Optional[str]) -> None:
    """LogLens - Parse, analyze, and summarize log files."""
    setup_logging(verbosity)
    ctx.ensure_object(dict)
    # Stored on the context object so subcommands can read it; None when
    # --config was not given.
    ctx.obj["config"] = config


@main.command("analyze")
@click.argument("files", type=click.Path(exists=True), nargs=-1)
@click.option("--format", type=click.Choice(["json", "syslog", "apache", "auto"]),
              default="auto", help="Log format (auto-detect by default)")
@click.option("--output", type=click.Choice(["table", "json", "text"]),
              default="table", help="Output format")
@click.option("--follow/--no-follow", default=False, help="Follow file changes")
@click.option("--max-entries", type=int, default=100, help="Maximum entries to display")
@click.option("--json/--no-json", default=False,
              help="Output as JSON (shorthand for --output json)")
@click.pass_context
def analyze(
    ctx: click.Context,
    files: tuple,
    format: str,
    output: str,
    follow: bool,
    max_entries: int,
    json: bool
) -> None:
    """Analyze log files and display summary."""
    # --json is a shorthand that overrides whatever --output was given.
    if json:
        output = "json"

    if not files and not sys.stdin.isatty():
        # No FILE arguments but stdin is piped: analyze the piped data.
        lines = sys.stdin.readlines()
        _analyze_lines(lines, format, output, max_entries)
    elif not files:
        click.echo("Error: No log files specified. Use FILE or pipe data from stdin.")
        click.echo("Example: cat logfile.txt | loglens analyze")
        click.echo(" loglens analyze /var/log/syslog")
        ctx.exit(1)
    else:
        for file_path in files:
            _analyze_file(file_path, format, output, max_entries, follow)


def _analyze_lines(lines: list, format_str: str, output: str, max_entries: int) -> None:
    """Analyze lines from stdin.

    Args:
        lines: Raw log lines to analyze.
        format_str: ``--format`` choice; ``"auto"`` lets the analyzer detect.
        output: Output style (``"table"``, ``"json"`` or ``"text"``).
        max_entries: Entry cap forwarded to the table formatter.
    """
    format_enum = _resolve_format(format_str)
    analyzer = LogAnalyzer()
    # With auto-detection the format argument is omitted entirely rather than
    # passed as None, so the analyzer's own default applies.
    if format_enum is None:
        result = analyzer.analyze(lines)
    else:
        result = analyzer.analyze(lines, format_enum)
    _display_result(result, output, max_entries)


def _analyze_file(file_path: str, format_str: str, output: str,
                  max_entries: int, follow: bool) -> None:
    """Analyze a single file, either once or continuously when *follow* is set.

    Args:
        file_path: Path of the log file.
        format_str: ``--format`` choice; ``"auto"`` lets the analyzer detect.
        output: Output style (``"table"``, ``"json"`` or ``"text"``).
        max_entries: Entry cap forwarded to the table formatter.
        follow: When true, tail the file instead of analyzing it once.
    """
    format_enum = _resolve_format(format_str)
    analyzer = LogAnalyzer()

    if follow:
        _follow_file(file_path, analyzer, format_enum, output, max_entries)
    else:
        result = analyzer.analyze_file(file_path, format_enum)
        _display_result(result, output, max_entries)


def _follow_file(file_path: str, analyzer: LogAnalyzer, format: Optional[LogFormat],
                 output: str, max_entries: int) -> None:
    """Follow a file and analyze newly appended lines until Ctrl+C.

    Only lines written after the call starts are considered. Buffered lines
    are analyzed when the batch size is reached, when the file goes idle, or
    on Ctrl+C.

    Args:
        file_path: Path of the log file to tail.
        analyzer: Analyzer used for every batch.
        format: Explicit log format, or None for auto-detection.
        output: Output style (``"table"``, ``"json"`` or ``"text"``).
        max_entries: Entry cap forwarded to the table formatter.
    """

    def flush(pending: list) -> None:
        """Analyze and display *pending* if it holds any lines."""
        if pending:
            result = analyzer.analyze(pending, format)
            _display_result(result, output, max_entries)

    with open(file_path, "r") as f:
        f.seek(0, 2)  # start at end of file: report new lines only
        buffer = []
        click.echo(f"Following {file_path}... (Ctrl+C to stop)")

        try:
            while True:
                line = f.readline()
                if line:
                    buffer.append(line)
                    if len(buffer) < _FOLLOW_BATCH_SIZE:
                        continue
                # Reached on a full batch or when no new data is available.
                # Flushing on idle keeps slow logs visible instead of waiting
                # for a full batch. Swap the buffer out *before* flushing so
                # an interrupt mid-flush cannot display the same lines twice.
                pending, buffer = buffer, []
                flush(pending)
                if not line:
                    time.sleep(_FOLLOW_POLL_SECONDS)
        except KeyboardInterrupt:
            flush(buffer)


def _display_result(result, output: str, max_entries: int) -> None:
    """Render an analysis result in the requested output style.

    Args:
        result: Analysis result as returned by ``LogAnalyzer``.
        output: ``"json"`` or ``"text"`` echo the formatter's return value;
            any other value uses the table formatter.
        max_entries: Entry cap, used by the table formatter only.
    """
    if output == "json":
        click.echo(JSONFormatter().format(result))
    elif output == "text":
        click.echo(TextFormatter().format(result))
    else:
        # The table formatter's return value is intentionally not echoed;
        # presumably it prints to its own console - TODO confirm.
        TableFormatter(max_entries=max_entries).format(result)


@main.command("watch")
@click.argument("files", type=click.Path(exists=True), nargs=-1)
@click.option("--format", type=click.Choice(["json", "syslog", "apache", "auto"]),
              default="auto", help="Log format")
@click.option("--interval", type=float, default=1.0, help="Refresh interval in seconds")
@click.option("--max-entries", type=int, default=50, help="Maximum entries per update")
@click.pass_context
def watch(
    ctx: click.Context,
    files: tuple,
    format: str,
    interval: float,
    max_entries: int
) -> None:
    """Watch log files and display live updates."""
    if not files:
        click.echo("Error: No files specified for watching.")
        ctx.exit(1)

    format_enum = _resolve_format(format)
    analyzer = LogAnalyzer()

    click.echo(f"Watching {len(files)} file(s). Press Ctrl+C to stop.")

    try:
        while True:
            # Analyze everything first, then clear once per refresh cycle.
            # Clearing inside the per-file loop erased every file but the
            # last, and clearing before analysis would leave a blank screen
            # while the files are being read.
            results = [
                (file_path, analyzer.analyze_file(file_path, format_enum))
                for file_path in files
            ]
            click.clear()
            for file_path, result in results:
                click.echo(f"=== {file_path} ===")
                formatter = TableFormatter(max_entries=max_entries)
                # NOTE(review): this passes ``result.entries`` while
                # _display_result passes ``result`` to the same formatter -
                # confirm which one TableFormatter.format expects.
                formatter.format(result.entries)
            time.sleep(interval)
    except KeyboardInterrupt:
        click.echo("\nStopped watching.")


@main.command("report")
@click.argument("files", type=click.Path(exists=True), nargs=-1)
@click.option("--format", type=click.Choice(["json", "syslog", "apache", "auto"]),
              default="auto", help="Log format")
@click.option("--output", type=click.Path(), help="Output file path (default: stdout)")
@click.option("--json/--no-json", default=False, help="Output as JSON")
@click.pass_context
def report(
    ctx: click.Context,
    files: tuple,
    format: str,
    output: Optional[str],
    json: bool
) -> None:
    """Generate detailed analysis report."""
    if not files:
        click.echo("Error: No log files specified.")
        ctx.exit(1)

    format_enum = _resolve_format(format)
    analyzer = LogAnalyzer()

    all_results = [
        (file_path, analyzer.analyze_file(file_path, format_enum))
        for file_path in files
    ]

    if json:
        formatter = JSONFormatter()
        report_data = {
            "files_analyzed": len(files),
            "results": [
                {"file": path, "analysis": result}
                for path, result in all_results
            ]
        }
        report_text = formatter.format(report_data)
    else:
        lines = []
        lines.append("=" * 60)
        lines.append("LOG ANALYSIS REPORT")
        lines.append("=" * 60)
        lines.append(f"Files Analyzed: {len(files)}")
        lines.append("")

        for file_path, result in all_results:
            lines.append(f"=== {file_path} ===")
            lines.append(f"Total Lines: {result.total_lines}")
            lines.append(f"Format: {result.format_detected.value}")
            # The last figure is debug_count, so it is labelled "Debug"; it
            # was previously printed under a misleading "Info" label.
            lines.append(f"Critical: {result.critical_count} | Error: {result.error_count} | "
                         f"Warning: {result.warning_count} | Debug: {result.debug_count}")
            lines.append("")

            if result.suggestions:
                lines.append("Suggestions:")
                for i, suggestion in enumerate(result.suggestions, 1):
                    lines.append(f" {i}. {suggestion}")
                lines.append("")

        report_text = "\n".join(lines)

    if output:
        # Explicit UTF-8 so non-ASCII log content does not fail on platforms
        # whose default encoding is narrower.
        with open(output, "w", encoding="utf-8") as f:
            f.write(report_text)
        click.echo(f"Report written to {output}")
    else:
        click.echo(report_text)


@main.command("patterns")
@click.option("--group", help="Filter by pattern group")
@click.option("--severity",
              type=click.Choice(["critical", "error", "warning", "info", "debug"]),
              help="Filter by severity")
@click.pass_context
def patterns(ctx: click.Context, group: Optional[str], severity: Optional[str]) -> None:
    """List available error detection patterns."""
    analyzer = LogAnalyzer()
    patterns_by_group = analyzer.list_patterns_by_group()

    if group:
        if group not in patterns_by_group:
            click.echo(f"Unknown group: {group}")
            ctx.exit(1)
        patterns_to_show = {group: patterns_by_group[group]}
    else:
        patterns_to_show = patterns_by_group

    if severity:
        # Apply the --severity filter (previously accepted but ignored) and
        # drop groups that end up with no matching pattern.
        filtered = {}
        for group_name, group_patterns in patterns_to_show.items():
            matching = [p for p in group_patterns if p["severity"] == severity]
            if matching:
                filtered[group_name] = matching
        patterns_to_show = filtered

    severity_colors = {
        "critical": "red",
        "error": "red",
        "warning": "yellow",
        "info": "blue",
        "debug": "grey",
    }

    formatter = TableFormatter()
    formatter.console.print("[bold]Available Error Patterns[/]")

    for group_name, group_patterns in patterns_to_show.items():
        formatter.console.print(f"\n[bold cyan]{group_name.upper()}[/]")
        for pattern in group_patterns:
            severity_color = severity_colors.get(pattern["severity"], "white")
            formatter.console.print(
                f" [bold]{pattern['name']}[/] "
                f"[{severity_color}]({pattern['severity']})[/]"
            )
            if pattern["description"]:
                formatter.console.print(f" {pattern['description']}")


@main.command("info")
@click.pass_context
def info(ctx: click.Context) -> None:
    """Display LogLens information."""
    # Imported here rather than at module top, as in the original layout.
    from loglens import __version__

    click.echo(f"LogLens CLI v{__version__}")
    click.echo("")
    click.echo("Supported log formats:")
    click.echo(" - JSON (JSON Lines, arrays)")
    click.echo(" - Syslog (RFC 3164, RFC 5424)")
    click.echo(" - Apache/Nginx (Common, Combined, Custom)")
    click.echo("")
    click.echo("Commands:")
    click.echo(" analyze - Analyze log files")
    click.echo(" watch - Watch files in real-time")
    click.echo(" report - Generate detailed report")
    click.echo(" patterns - List error patterns")
    click.echo(" info - Show this information")