"""Click CLI commands for LogLens.""" import logging import sys import time from typing import Optional import click from colorlog import ColoredFormatter from loglens.analyzers.analyzer import LogAnalyzer from loglens.formatters.json_formatter import JSONFormatter from loglens.formatters.table_formatter import TableFormatter from loglens.formatters.text_formatter import TextFormatter from loglens.parsers.base import LogFormat def setup_logging(verbosity: int = 0) -> None: """Setup logging configuration.""" log_levels = ["ERROR", "WARNING", "INFO", "DEBUG"] level_idx = min(verbosity, len(log_levels) - 1) level = log_levels[level_idx] handler = ColoredFormatter( "%(log_color)s%(levelname)-8s%(reset)s %(message)s", log_colors={ "DEBUG": "cyan", "INFO": "green", "WARNING": "yellow", "ERROR": "red", "CRITICAL": "red,bg_white", }, ) logger = logging.getLogger("loglens") logger.setLevel(level) logger.handlers = [handler] @click.group() @click.option("--verbosity", "-v", count=True, help="Increase output verbosity") @click.option("--config", type=click.Path(exists=True), help="Path to config file") @click.version_option(version="0.1.0", prog_name="loglens") @click.pass_context def main(ctx: click.Context, verbosity: int, config: str) -> None: """LogLens - Parse, analyze, and summarize log files.""" from loglens import __version__ ctx.ensure_object(dict) ctx.obj["config"] = config ctx.obj["version"] = __version__ setup_logging(verbosity) @main.command("analyze") @click.argument("files", type=click.Path(exists=True), nargs=-1) @click.option( "--format", type=click.Choice(["json", "syslog", "apache", "auto"]), default="auto", help="Log format (auto-detect by default)", ) @click.option( "--output", type=click.Choice(["table", "json", "text"]), default="table", help="Output format" ) @click.option("--follow/--no-follow", default=False, help="Follow file changes") @click.option("--max-entries", type=int, default=100, help="Maximum entries to display") @click.option( "--json/--no-json", default=False, help="Output as JSON (shorthand for --output json)" ) @click.pass_context def analyze( ctx: click.Context, files: tuple, format: str, output: str, follow: bool, max_entries: int, json: bool, ) -> None: """Analyze log files and display summary.""" if json: output = "json" if not files and not sys.stdin.isatty(): lines = sys.stdin.readlines() _analyze_lines(lines, format, output, max_entries) elif not files: click.echo("Error: No log files specified. Use FILE or pipe data from stdin.") click.echo("Example: cat logfile.txt | loglens analyze") click.echo(" loglens analyze /var/log/syslog") ctx.exit(1) else: for file_path in files: _analyze_file(file_path, format, output, max_entries, follow) def _analyze_lines(lines: list, format_str: str, output: str, max_entries: int) -> None: """Analyze lines from stdin.""" format_enum = None if format_str == "auto" else LogFormat(format_str) analyzer = LogAnalyzer() if format_enum is None: result = analyzer.analyze(lines) else: result = analyzer.analyze(lines, format_enum) _display_result(result, output, max_entries) def _analyze_file( file_path: str, format_str: str, output: str, max_entries: int, follow: bool ) -> None: """Analyze a single file.""" format_enum = None if format_str == "auto" else LogFormat(format_str) analyzer = LogAnalyzer() if follow: _follow_file(file_path, analyzer, format_enum, output, max_entries) else: result = analyzer.analyze_file(file_path, format_enum) _display_result(result, output, max_entries) def _follow_file( file_path: str, analyzer: LogAnalyzer, format: Optional[LogFormat], output: str, max_entries: int, ) -> None: """Follow a file and analyze in real-time.""" with open(file_path) as f: f.seek(0, 2) buffer = [] click.echo(f"Following {file_path}... (Ctrl+C to stop)") try: while True: line = f.readline() if line: buffer.append(line) if len(buffer) >= 100: result = analyzer.analyze(buffer, format) _display_result(result, output, max_entries) buffer = [] else: time.sleep(0.5) except KeyboardInterrupt: if buffer: result = analyzer.analyze(buffer, format) _display_result(result, output, max_entries) def _display_result(result, output: str, max_entries: int) -> None: """Display analysis result.""" if output == "json": formatter = JSONFormatter() click.echo(formatter.format(result)) elif output == "text": formatter = TextFormatter() click.echo(formatter.format(result)) else: formatter = TableFormatter(max_entries=max_entries) formatter.format(result) @main.command("watch") @click.argument("files", type=click.Path(exists=True), nargs=-1) @click.option( "--format", type=click.Choice(["json", "syslog", "apache", "auto"]), default="auto", help="Log format", ) @click.option("--interval", type=float, default=1.0, help="Refresh interval in seconds") @click.option("--max-entries", type=int, default=50, help="Maximum entries per update") @click.pass_context def watch(ctx: click.Context, files: tuple, format: str, interval: float, max_entries: int) -> None: """Watch log files and display live updates.""" if not files: click.echo("Error: No files specified for watching.") ctx.exit(1) format_enum = None if format == "auto" else LogFormat(format) analyzer = LogAnalyzer() click.echo(f"Watching {len(files)} file(s). Press Ctrl+C to stop.") try: while True: for file_path in files: result = analyzer.analyze_file(file_path, format_enum) click.clear() click.echo(f"=== {file_path} ===") formatter = TableFormatter(max_entries=max_entries) formatter.format(result.entries) time.sleep(interval) except KeyboardInterrupt: click.echo("\nStopped watching.") @main.command("report") @click.argument("files", type=click.Path(exists=True), nargs=-1) @click.option( "--format", type=click.Choice(["json", "syslog", "apache", "auto"]), default="auto", help="Log format", ) @click.option("--output", type=click.Path(), help="Output file path (default: stdout)") @click.option("--json/--no-json", default=False, help="Output as JSON") @click.pass_context def report( ctx: click.Context, files: tuple, format: str, output: Optional[str], json: bool ) -> None: """Generate detailed analysis report.""" if not files: click.echo("Error: No log files specified.") ctx.exit(1) format_enum = None if format == "auto" else LogFormat(format) analyzer = LogAnalyzer() all_results = [] for file_path in files: result = analyzer.analyze_file(file_path, format_enum) all_results.append((file_path, result)) if json: formatter = JSONFormatter() report_data = { "files_analyzed": len(files), "results": [{"file": path, "analysis": result} for path, result in all_results], } report_text = formatter.format(report_data) else: lines = [] lines.append("=" * 60) lines.append("LOG ANALYSIS REPORT") lines.append("=" * 60) lines.append(f"Files Analyzed: {len(files)}") lines.append("") for file_path, result in all_results: lines.append(f"=== {file_path} ===") lines.append(f"Total Lines: {result.total_lines}") lines.append(f"Format: {result.format_detected.value}") lines.append( f"Critical: {result.critical_count} | Error: {result.error_count} | " f"Warning: {result.warning_count} | Info: {result.debug_count}" ) lines.append("") if result.suggestions: lines.append("Suggestions:") for i, suggestion in enumerate(result.suggestions, 1): lines.append(f" {i}. {suggestion}") lines.append("") report_text = "\n".join(lines) if output: with open(output, "w") as f: f.write(report_text) click.echo(f"Report written to {output}") else: click.echo(report_text) @main.command("patterns") @click.option("--group", help="Filter by pattern group") @click.option( "--severity", type=click.Choice(["critical", "error", "warning", "info", "debug"]), help="Filter by severity", ) @click.pass_context def patterns(ctx: click.Context, group: str, severity: str) -> None: """List available error detection patterns.""" analyzer = LogAnalyzer() patterns_by_group = analyzer.list_patterns_by_group() if group: if group in patterns_by_group: patterns_to_show = {group: patterns_by_group[group]} else: click.echo(f"Unknown group: {group}") ctx.exit(1) else: patterns_to_show = patterns_by_group formatter = TableFormatter() formatter.console.print("[bold]Available Error Patterns[/]") for group_name, patterns in patterns_to_show.items(): formatter.console.print(f"\n[bold cyan]{group_name.upper()}[/]") for pattern in patterns: severity_color = { "critical": "red", "error": "red", "warning": "yellow", "info": "blue", "debug": "grey", }.get(pattern["severity"], "white") formatter.console.print( f" [bold]{pattern['name']}[/] " f"[{severity_color}]({pattern['severity']})[/]" ) if pattern["description"]: formatter.console.print(f" {pattern['description']}") @main.command("info") @click.pass_context def info(ctx: click.Context) -> None: """Display LogLens information.""" from loglens import __version__ click.echo(f"LogLens CLI v{__version__}") click.echo("") click.echo("Supported log formats:") click.echo(" - JSON (JSON Lines, arrays)") click.echo(" - Syslog (RFC 3164, RFC 5424)") click.echo(" - Apache/Nginx (Common, Combined, Custom)") click.echo("") click.echo("Commands:") click.echo(" analyze - Analyze log files") click.echo(" watch - Watch files in real-time") click.echo(" report - Generate detailed report") click.echo(" patterns - List error patterns") click.echo(" info - Show this information")