Some checks failed
- Replaced deprecated typing.Dict/List/Tuple with native types (UP035) - Removed unused imports across all modules - Fixed unused variables by replacing with _ prefix - Added missing Optional type imports - Reorganized imports for proper sorting (I001) - Applied black formatting to all source files
175 lines
6.0 KiB
Python
175 lines
6.0 KiB
Python
'''Table formatter using Rich library.'''
|
|
|
|
from typing import Any, Optional
|
|
|
|
from rich import box
|
|
from rich.console import Console
|
|
from rich.style import Style
|
|
from rich.table import Table
|
|
from rich.text import Text
|
|
|
|
from loglens.analyzers.analyzer import AnalysisResult
|
|
from loglens.formatters.base import OutputFormatter
|
|
from loglens.parsers.base import ParsedLogEntry
|
|
|
|
|
|
class TableFormatter(OutputFormatter):
|
|
'''Formats output as rich tables.'''
|
|
|
|
SEVERITY_STYLES = {
|
|
"critical": Style(color="red", bold=True),
|
|
"error": Style(color="red"),
|
|
"warning": Style(color="yellow"),
|
|
"info": Style(color="blue"),
|
|
"debug": Style(color="bright_black"),
|
|
"unknown": Style(color="white"),
|
|
}
|
|
|
|
def __init__(
|
|
self,
|
|
console: Optional[Console] = None,
|
|
show_timestamps: bool = True,
|
|
max_entries: int = 100,
|
|
):
|
|
super().__init__()
|
|
self.console = console or Console()
|
|
self.show_timestamps = show_timestamps
|
|
self.max_entries = max_entries
|
|
|
|
def format(self, data: Any) -> str:
|
|
'''Format data as table.'''
|
|
if isinstance(data, AnalysisResult):
|
|
return self._format_analysis_result(data)
|
|
elif isinstance(data, list):
|
|
return self._format_entries(data)
|
|
else:
|
|
return str(data)
|
|
|
|
def _format_analysis_result(self, result: AnalysisResult) -> str:
|
|
'''Format analysis result as summary table.'''
|
|
summary_table = Table(title="Log Analysis Summary", box=box.ROUNDED)
|
|
summary_table.add_column("Metric", style="cyan")
|
|
summary_table.add_column("Value", style="magenta")
|
|
|
|
summary_table.add_row("Total Lines", str(result.total_lines))
|
|
summary_table.add_row("Parsed Entries", str(result.parsed_count))
|
|
summary_table.add_row("Format Detected", result.format_detected.value.upper())
|
|
|
|
if result.time_range:
|
|
start, end = result.time_range
|
|
summary_table.add_row("Time Range", f"{start} to {end}")
|
|
|
|
self.console.print(summary_table)
|
|
|
|
severity_table = Table(title="Severity Breakdown", box=box.ROUNDED)
|
|
severity_table.add_column("Severity", style="bold")
|
|
severity_table.add_column("Count", justify="right")
|
|
severity_table.add_column("Percentage", justify="right")
|
|
|
|
total = result.parsed_count or 1
|
|
for level in ["critical", "error", "warning", "info", "debug"]:
|
|
count = getattr(result, f"{level}_count", 0)
|
|
pct = (count / total) * 100
|
|
severity_table.add_row(level.upper(), str(count), f"{pct:.1f}%")
|
|
|
|
self.console.print(severity_table)
|
|
|
|
if result.top_errors:
|
|
errors_table = Table(title="Top Error Patterns", box=box.ROUNDED)
|
|
errors_table.add_column("Pattern", style="red")
|
|
errors_table.add_column("Count", justify="right")
|
|
|
|
for item in result.top_errors[:10]:
|
|
errors_table.add_row(item["pattern"], str(item["count"]))
|
|
|
|
self.console.print(errors_table)
|
|
|
|
if result.suggestions:
|
|
suggestions_table = Table(title="Suggestions", box=box.ROUNDED)
|
|
suggestions_table.add_column("Recommendation", style="yellow")
|
|
|
|
for suggestion in result.suggestions:
|
|
suggestions_table.add_row(suggestion)
|
|
|
|
self.console.print(suggestions_table)
|
|
|
|
return ""
|
|
|
|
def _format_entries(self, entries: list[ParsedLogEntry]) -> str:
|
|
'''Format log entries as table.'''
|
|
table = Table(title="Log Entries", box=box.ROUNDED)
|
|
table.add_column("#", justify="right", style="dim")
|
|
if self.show_timestamps:
|
|
table.add_column("Timestamp", style="cyan")
|
|
table.add_column("Severity", style="bold")
|
|
table.add_column("Message", overflow="fold")
|
|
|
|
displayed = entries[: self.max_entries]
|
|
for entry in displayed:
|
|
row = [str(entry.line_number)]
|
|
|
|
if self.show_timestamps:
|
|
ts = entry.timestamp.strftime("%Y-%m-%d %H:%M:%S") if entry.timestamp else "-"
|
|
row.append(ts)
|
|
|
|
severity = entry.severity or "unknown"
|
|
style = self.SEVERITY_STYLES.get(severity, self.SEVERITY_STYLES["unknown"])
|
|
row.append(Text(severity.upper(), style=style))
|
|
|
|
message = entry.message or entry.raw_line[:100]
|
|
row.append(message)
|
|
|
|
table.add_row(*row)
|
|
|
|
if len(entries) > self.max_entries:
|
|
table.add_row(f"... and {len(entries) - self.max_entries} more", "", "", "")
|
|
|
|
self.console.print(table)
|
|
return ""
|
|
|
|
def format_entries_detailed(self, entries: list[ParsedLogEntry]) -> str:
|
|
'''Format entries with full details.'''
|
|
for entry in entries[: self.max_entries]:
|
|
self._print_entry_detailed(entry)
|
|
|
|
return ""
|
|
|
|
def _print_entry_detailed(self, entry: ParsedLogEntry) -> None:
|
|
'''Print a single entry with full details.'''
|
|
from rich.panel import Panel
|
|
|
|
severity = entry.severity or "unknown"
|
|
style = self.SEVERITY_STYLES.get(severity, self.SEVERITY_STYLES["unknown"])
|
|
|
|
content = []
|
|
|
|
if entry.timestamp:
|
|
content.append(f"[bold]Timestamp:[/] {entry.timestamp.isoformat()}")
|
|
|
|
content.append(f"[bold]Line:[/] {entry.line_number}")
|
|
|
|
if entry.level:
|
|
content.append(f"[bold]Level:[/] {entry.level}")
|
|
|
|
if entry.host:
|
|
content.append(f"[bold]Host:[/] {entry.host}")
|
|
|
|
if entry.error_pattern:
|
|
content.append(f"[bold]Pattern:[/] [red]{entry.error_pattern}[/]")
|
|
|
|
content.append("")
|
|
content.append(entry.message or entry.raw_line)
|
|
|
|
if entry.extra:
|
|
content.append("")
|
|
content.append("[bold]Extra Data:[/]")
|
|
for key, value in entry.extra.items():
|
|
content.append(f" {key}: {value}")
|
|
|
|
panel = Panel(
|
|
"\n".join(content), title=f"Entry #{entry.line_number}", style=style, box=box.SIMPLE
|
|
)
|
|
|
|
self.console.print(panel)
|
|
self.console.print()
|