diff --git a/loglens/formatters/table_formatter.py b/loglens/formatters/table_formatter.py new file mode 100644 index 0000000..2ac4cab --- /dev/null +++ b/loglens/formatters/table_formatter.py @@ -0,0 +1,184 @@ +"""Table formatter using Rich library.""" + +from datetime import datetime +from typing import Any, Dict, List, Optional +from rich.console import Console +from rich.table import Table +from rich.text import Text +from rich import box +from rich.style import Style + +from loglens.analyzers.analyzer import AnalysisResult +from loglens.analyzers.severity import SeverityLevel +from loglens.parsers.base import ParsedLogEntry +from loglens.formatters.base import OutputFormatter + + +class TableFormatter(OutputFormatter): + """Formats output as rich tables.""" + + SEVERITY_STYLES = { + "critical": Style(color="red", bold=True), + "error": Style(color="red"), + "warning": Style(color="yellow"), + "info": Style(color="blue"), + "debug": Style(color="bright_black"), + "unknown": Style(color="white"), + } + + def __init__(self, console: Console = None, show_timestamps: bool = True, + max_entries: int = 100): + super().__init__() + self.console = console or Console() + self.show_timestamps = show_timestamps + self.max_entries = max_entries + + def format(self, data: Any) -> str: + """Format data as table.""" + if isinstance(data, AnalysisResult): + return self._format_analysis_result(data) + elif isinstance(data, list): + return self._format_entries(data) + else: + return str(data) + + def _format_analysis_result(self, result: AnalysisResult) -> str: + """Format analysis result as summary table.""" + output = [] + + summary_table = Table(title="Log Analysis Summary", box=box.ROUNDED) + summary_table.add_column("Metric", style="cyan") + summary_table.add_column("Value", style="magenta") + + summary_table.add_row("Total Lines", str(result.total_lines)) + summary_table.add_row("Parsed Entries", str(result.parsed_count)) + summary_table.add_row("Format Detected", result.format_detected.value.upper()) + + if result.time_range: + start, end = result.time_range + summary_table.add_row("Time Range", f"{start} to {end}") + + self.console.print(summary_table) + + severity_table = Table(title="Severity Breakdown", box=box.ROUNDED) + severity_table.add_column("Severity", style="bold") + severity_table.add_column("Count", justify="right") + severity_table.add_column("Percentage", justify="right") + + total = result.parsed_count or 1 + for level in ["critical", "error", "warning", "info", "debug"]: + count = getattr(result, f"{level}_count", 0) + pct = (count / total) * 100 + severity_table.add_row( + level.upper(), + str(count), + f"{pct:.1f}%" + ) + + self.console.print(severity_table) + + if result.top_errors: + errors_table = Table(title="Top Error Patterns", box=box.ROUNDED) + errors_table.add_column("Pattern", style="red") + errors_table.add_column("Count", justify="right") + + for item in result.top_errors[:10]: + errors_table.add_row(item["pattern"], str(item["count"])) + + self.console.print(errors_table) + + if result.suggestions: + suggestions_table = Table(title="Suggestions", box=box.ROUNDED) + suggestions_table.add_column("Recommendation", style="yellow") + + for suggestion in result.suggestions: + suggestions_table.add_row(suggestion) + + self.console.print(suggestions_table) + + return "" + + def _format_entries(self, entries: List[ParsedLogEntry]) -> str: + """Format log entries as table.""" + table = Table(title="Log Entries", box=box.ROUNDED) + table.add_column("#", justify="right", style="dim") + if self.show_timestamps: + table.add_column("Timestamp", style="cyan") + table.add_column("Severity", style="bold") + table.add_column("Message", overflow="fold") + + displayed = entries[:self.max_entries] + for entry in displayed: + row = [str(entry.line_number)] + + if self.show_timestamps: + ts = entry.timestamp.strftime("%Y-%m-%d %H:%M:%S") if entry.timestamp else "-" + row.append(ts) + + severity = entry.severity or "unknown" + style = self.SEVERITY_STYLES.get(severity, self.SEVERITY_STYLES["unknown"]) + row.append(Text(severity.upper(), style=style)) + + message = entry.message or entry.raw_line[:100] + row.append(message) + + table.add_row(*row) + + if len(entries) > self.max_entries: + table.add_row( + f"... and {len(entries) - self.max_entries} more", + "", "", "" + ) + + self.console.print(table) + return "" + + def format_entries_detailed(self, entries: List[ParsedLogEntry]) -> str: + """Format entries with full details.""" + for entry in entries[:self.max_entries]: + self._print_entry_detailed(entry) + + return "" + + def _print_entry_detailed(self, entry: ParsedLogEntry) -> None: + """Print a single entry with full details.""" + from rich.panel import Panel + from rich.columns import Columns + + severity = entry.severity or "unknown" + style = self.SEVERITY_STYLES.get(severity, self.SEVERITY_STYLES["unknown"]) + + content = [] + + if entry.timestamp: + content.append(f"[bold]Timestamp:[/] {entry.timestamp.isoformat()}") + + content.append(f"[bold]Line:[/] {entry.line_number}") + + if entry.level: + content.append(f"[bold]Level:[/] {entry.level}") + + if entry.host: + content.append(f"[bold]Host:[/] {entry.host}") + + if entry.error_pattern: + content.append(f"[bold]Pattern:[/] [red]{entry.error_pattern}[/]") + + content.append("") + content.append(entry.message or entry.raw_line) + + if entry.extra: + content.append("") + content.append("[bold]Extra Data:[/]") + for key, value in entry.extra.items(): + content.append(f" {key}: {value}") + + panel = Panel( + "\n".join(content), + title=f"Entry #{entry.line_number}", + style=style, + box=box.SIMPLE + ) + + self.console.print(panel) + self.console.print()