Add formatters (table, JSON, text)
Some checks failed
CI / test (3.11) (push) Has been cancelled
CI / test (3.12) (push) Has been cancelled
CI / test (3.9) (push) Has been cancelled
CI / lint (push) Has been cancelled
CI / build (push) Has been cancelled
CI / test (3.10) (push) Has been cancelled

This commit is contained in:
2026-02-02 10:09:30 +00:00
parent 0caa7c9585
commit cf5b0a02b0

View File

@@ -1,34 +1,93 @@
"""Text output formatter."""
from typing import Any from typing import Any
from loglens.analyzers.analyzer import AnalysisResult from loglens.analyzers.analyzer import AnalysisResult
from loglens.formatters.base import OutputFormatter
from loglens.parsers.base import ParsedLogEntry
class TextFormatter: class TextFormatter(OutputFormatter):
"""Format analysis results as plain text.""" """Simple text output formatter."""
def format(self, result: Any) -> str: def format(self, data: Any) -> str:
"""Format result as text.""" """Format data as text."""
if not hasattr(result, "to_dict"): if isinstance(data, AnalysisResult):
return str(result) return self._format_analysis_result(data)
elif isinstance(data, list):
return self._format_entries(data)
else:
return str(data)
data = result.to_dict() def _format_analysis_result(self, result: AnalysisResult) -> str:
"""Format analysis result as text summary."""
lines = [] lines = []
lines.append("=" * 60) lines.append("=" * 60)
lines.append("LOG ANALYSIS") lines.append("LOG ANALYSIS SUMMARY")
lines.append("=" * 60) lines.append("=" * 60)
lines.append(f"Total Lines: {data['total_lines']}") lines.append(f"Total Lines: {result.total_lines}")
lines.append(f"Format: {data['format_detected']}") lines.append(f"Parsed Entries: {result.parsed_count}")
lines.append("") lines.append(f"Format Detected: {result.format_detected.value.upper()}")
lines.append("Severity Counts:") lines.append(f"Analysis Time: {result.analysis_time.isoformat()}")
lines.append(f" Critical: {data['critical_count']}")
lines.append(f" Error: {data['error_count']}")
lines.append(f" Warning: {data['warning_count']}")
lines.append(f" Debug: {data['debug_count']}")
if data.get("suggestions"): if result.time_range:
lines.append(f"Time Range: {result.time_range[0]} to {result.time_range[1]}")
lines.append("")
lines.append("SEVERITY BREAKDOWN:")
lines.append("-" * 40)
for level in ["critical", "error", "warning", "info", "debug"]:
count = getattr(result, f"{level}_count", 0)
lines.append(f" {level.upper():10} : {count}")
lines.append("")
lines.append("TOP ERROR PATTERNS:")
lines.append("-" * 40)
for item in result.top_errors[:10]:
lines.append(f" {item['pattern']:30} : {item['count']}")
if result.suggestions:
lines.append("") lines.append("")
lines.append("Suggestions:") lines.append("SUGGESTIONS:")
for suggestion in data["suggestions"]: lines.append("-" * 40)
lines.append(f" - {suggestion}") for i, suggestion in enumerate(result.suggestions, 1):
lines.append(f" {i}. {suggestion}")
lines.append("=" * 60)
return "\n".join(lines)
def _format_entries(self, entries: list[ParsedLogEntry]) -> str:
"""Format log entries as text lines."""
lines = []
for entry in entries:
parts = []
if entry.timestamp:
parts.append(entry.timestamp.strftime("%Y-%m-%d %H:%M:%S"))
severity = (entry.severity or "UNKNOWN").upper()
parts.append(f"[{severity:8}]")
parts.append(entry.message or entry.raw_line[:100])
if entry.error_pattern:
parts.append(f" <{entry.error_pattern}>")
lines.append(" ".join(parts))
return "\n".join(lines)
def format_entries_compact(self, entries: list[ParsedLogEntry], max_lines: int = 100) -> str:
"""Format entries compactly."""
lines = []
for entry in entries[:max_lines]:
severity = (entry.severity or "?").upper()[0]
ts = entry.timestamp.strftime("%H:%M:%S") if entry.timestamp else "??:??:??"
message = entry.message or entry.raw_line[:80]
lines.append(f"{ts} {severity} {message}")
if len(entries) > max_lines:
lines.append(f"... and {len(entries) - max_lines} more entries")
return "\n".join(lines) return "\n".join(lines)