Add formatters (table, JSON, text)
Some checks failed
CI / test (3.11) (push) Has been cancelled
CI / test (3.12) (push) Has been cancelled
CI / test (3.10) (push) Has been cancelled
CI / test (3.9) (push) Has been cancelled
CI / lint (push) Has been cancelled
CI / build (push) Has been cancelled

This commit is contained in:
2026-02-02 10:09:25 +00:00
parent 72bec60e37
commit 9380411871

View File

@@ -1,75 +1,174 @@
from typing import Optional """Table formatter using Rich library."""
from typing import Any, Optional
from rich import box
from rich.console import Console from rich.console import Console
from rich.style import Style
from rich.table import Table from rich.table import Table
from rich.text import Text
from loglens.analyzers.analyzer import AnalysisResult, ParsedEntry from loglens.analyzers.analyzer import AnalysisResult
from loglens.formatters.base import OutputFormatter
from loglens.parsers.base import ParsedLogEntry
class TableFormatter: class TableFormatter(OutputFormatter):
"""Format analysis results as a table.""" """Formats output as rich tables."""
def __init__(self, max_entries: int = 100): SEVERITY_STYLES = {
"critical": Style(color="red", bold=True),
"error": Style(color="red"),
"warning": Style(color="yellow"),
"info": Style(color="blue"),
"debug": Style(color="bright_black"),
"unknown": Style(color="white"),
}
def __init__(
self,
console: Optional[Console] = None,
show_timestamps: bool = True,
max_entries: int = 100,
):
super().__init__()
self.console = console or Console()
self.show_timestamps = show_timestamps
self.max_entries = max_entries self.max_entries = max_entries
self.console = Console()
def format(self, result: AnalysisResult) -> None: def format(self, data: Any) -> str:
"""Format result as a table.""" """Format data as table."""
self._print_summary(result) if isinstance(data, AnalysisResult):
self._print_entries(result.entries) return self._format_analysis_result(data)
elif isinstance(data, list):
return self._format_entries(data)
else:
return str(data)
def _print_summary(self, result: AnalysisResult) -> None: def _format_analysis_result(self, result: AnalysisResult) -> str:
"""Print summary statistics.""" """Format analysis result as summary table."""
self.console.print("[bold]Log Analysis Summary[/]") summary_table = Table(title="Log Analysis Summary", box=box.ROUNDED)
self.console.print(f" Total Lines: {result.total_lines}") summary_table.add_column("Metric", style="cyan")
self.console.print(f" Format: {result.format_detected.value}") summary_table.add_column("Value", style="magenta")
severity_colors = { summary_table.add_row("Total Lines", str(result.total_lines))
"critical": "red", summary_table.add_row("Parsed Entries", str(result.parsed_count))
"error": "red", summary_table.add_row("Format Detected", result.format_detected.value.upper())
"warning": "yellow",
"info": "blue",
"debug": "grey",
}
for severity, count in [ if result.time_range:
("critical", result.critical_count), start, end = result.time_range
("error", result.error_count), summary_table.add_row("Time Range", f"{start} to {end}")
("warning", result.warning_count),
("info", result.info_count if hasattr(result, 'info_count') else 0), self.console.print(summary_table)
("debug", result.debug_count),
]: severity_table = Table(title="Severity Breakdown", box=box.ROUNDED)
color = severity_colors.get(severity, "white") severity_table.add_column("Severity", style="bold")
self.console.print(f" [{color}]{severity.upper()}: {count}[/]") severity_table.add_column("Count", justify="right")
severity_table.add_column("Percentage", justify="right")
total = result.parsed_count or 1
for level in ["critical", "error", "warning", "info", "debug"]:
count = getattr(result, f"{level}_count", 0)
pct = (count / total) * 100
severity_table.add_row(level.upper(), str(count), f"{pct:.1f}%")
self.console.print(severity_table)
if result.top_errors:
errors_table = Table(title="Top Error Patterns", box=box.ROUNDED)
errors_table.add_column("Pattern", style="red")
errors_table.add_column("Count", justify="right")
for item in result.top_errors[:10]:
errors_table.add_row(item["pattern"], str(item["count"]))
self.console.print(errors_table)
if result.suggestions: if result.suggestions:
self.console.print("\n[bold]Suggestions:[/]") suggestions_table = Table(title="Suggestions", box=box.ROUNDED)
for i, suggestion in enumerate(result.suggestions, 1): suggestions_table.add_column("Recommendation", style="yellow")
self.console.print(f" {i}. {suggestion}")
def _print_entries(self, entries: list[ParsedEntry]) -> None: for suggestion in result.suggestions:
"""Print log entries as a table.""" suggestions_table.add_row(suggestion)
if not entries:
return
table = Table(title="Log Entries") self.console.print(suggestions_table)
table.add_column("Timestamp", style="cyan", no_wrap=True)
table.add_column("Level", style="magenta")
table.add_column("Message", style="green")
for entry in entries[:self.max_entries]: return ""
severity_style = {
"critical": "bold red",
"error": "red",
"warning": "yellow",
"info": "blue",
"debug": "grey",
}.get(entry.severity, "white")
table.add_row( def _format_entries(self, entries: list[ParsedLogEntry]) -> str:
entry.timestamp or "N/A", """Format log entries as table."""
f"[{severity_style}]{entry.level or 'N/A'}[/]", table = Table(title="Log Entries", box=box.ROUNDED)
entry.message[:100] if entry.message else "N/A", table.add_column("#", justify="right", style="dim")
) if self.show_timestamps:
table.add_column("Timestamp", style="cyan")
table.add_column("Severity", style="bold")
table.add_column("Message", overflow="fold")
displayed = entries[: self.max_entries]
for entry in displayed:
row = [str(entry.line_number)]
if self.show_timestamps:
ts = entry.timestamp.strftime("%Y-%m-%d %H:%M:%S") if entry.timestamp else "-"
row.append(ts)
severity = entry.severity or "unknown"
style = self.SEVERITY_STYLES.get(severity, self.SEVERITY_STYLES["unknown"])
row.append(Text(severity.upper(), style=style))
message = entry.message or entry.raw_line[:100]
row.append(message)
table.add_row(*row)
if len(entries) > self.max_entries:
table.add_row(f"... and {len(entries) - self.max_entries} more", "", "", "")
self.console.print(table) self.console.print(table)
return ""
def format_entries_detailed(self, entries: list[ParsedLogEntry]) -> str:
"""Format entries with full details."""
for entry in entries[: self.max_entries]:
self._print_entry_detailed(entry)
return ""
def _print_entry_detailed(self, entry: ParsedLogEntry) -> None:
"""Print a single entry with full details."""
from rich.panel import Panel
severity = entry.severity or "unknown"
style = self.SEVERITY_STYLES.get(severity, self.SEVERITY_STYLES["unknown"])
content = []
if entry.timestamp:
content.append(f"[bold]Timestamp:[/] {entry.timestamp.isoformat()}")
content.append(f"[bold]Line:[/] {entry.line_number}")
if entry.level:
content.append(f"[bold]Level:[/] {entry.level}")
if entry.host:
content.append(f"[bold]Host:[/] {entry.host}")
if entry.error_pattern:
content.append(f"[bold]Pattern:[/] [red]{entry.error_pattern}[/]")
content.append("")
content.append(entry.message or entry.raw_line)
if entry.extra:
content.append("")
content.append("[bold]Extra Data:[/]")
for key, value in entry.extra.items():
content.append(f" {key}: {value}")
panel = Panel(
"\n".join(content), title=f"Entry #{entry.line_number}", style=style, box=box.SIMPLE
)
self.console.print(panel)
self.console.print()