Add analyzers (patterns, severity, analyzer)

2026-02-02 10:08:35 +00:00
parent 09792d2bba
commit 7bbe910333


@@ -1,200 +1,193 @@
"""Log analyzer orchestrator."""

from collections import Counter
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Optional

from loglens.analyzers.patterns import PatternLibrary
from loglens.analyzers.severity import SeverityClassifier
from loglens.parsers.base import LogFormat, ParsedLogEntry
from loglens.parsers.factory import ParserFactory


@dataclass
class AnalysisResult:
    """Result of log analysis."""
    entries: list[ParsedLogEntry] = field(default_factory=list)
    format_detected: LogFormat = LogFormat.UNKNOWN
    total_lines: int = 0
    parsed_count: int = 0
    error_count: int = 0
    warning_count: int = 0
    critical_count: int = 0
    debug_count: int = 0
    pattern_matches: dict[str, int] = field(default_factory=dict)
    severity_breakdown: dict[str, int] = field(default_factory=dict)
    top_errors: list[dict[str, Any]] = field(default_factory=list)
    host_breakdown: dict[str, int] = field(default_factory=dict)
    time_range: Optional[tuple] = None
    analysis_time: datetime = field(default_factory=datetime.now)
    suggestions: list[str] = field(default_factory=list)
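
The dataclass replaces the old hand-written `__init__` and `to_dict`. If callers still need the dict shape the old `to_dict` produced, a minimal sketch, not part of this commit, assuming the enum and datetime fields should be JSON-friendly:

    from dataclasses import asdict

    def result_to_dict(result: AnalysisResult) -> dict:
        # asdict covers the flat fields; the enum and datetime need
        # explicit conversion before serialization.
        data = asdict(result)
        data["format_detected"] = result.format_detected.value
        data["analysis_time"] = result.analysis_time.isoformat()
        return data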

class LogAnalyzer:
    """Orchestrates log parsing and analysis."""

    def __init__(self, config: Optional[dict[str, Any]] = None):
        self.parser_factory = ParserFactory()
        self.pattern_library = PatternLibrary()
        self.severity_classifier = SeverityClassifier(
            custom_rules=config.get("severity_rules") if config else None
        )
        self.config = config or {}
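
    # Example (illustrative, not part of this commit): severity rules can
    # be injected via the config dict. The schema is whatever
    # SeverityClassifier.custom_rules accepts, assumed here to map message
    # substrings to severity names:
    #
    #   analyzer = LogAnalyzer(config={
    #       "severity_rules": {"payment failed": "critical"},
    #   })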

    def analyze(self, lines: list[str], format: Optional[LogFormat] = None) -> AnalysisResult:
        """Analyze a list of log lines."""
        result = AnalysisResult(total_lines=len(lines), analysis_time=datetime.now())

        if not lines:
            return result

        if format is None:
            format = self.parser_factory.detect_format_batch(lines)
        result.format_detected = format

        entries = self.parser_factory.parse_lines(lines, format)
        result.entries = entries
        result.parsed_count = len(entries)

        for entry in entries:
            self._analyze_entry(entry)

        self._compute_statistics(result)
        return result

    def _analyze_entry(self, entry: ParsedLogEntry) -> None:
        """Analyze a single entry."""
        message = entry.message or ""
        raw_text = entry.raw_line

        patterns = self.pattern_library.detect(raw_text)
        if patterns:
            pattern, match = patterns[0]
            entry.error_pattern = pattern.name

        severity = self.severity_classifier.classify(
            level=entry.level, message=message, pattern_match=entry.error_pattern
        )
        entry.severity = severity.value
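
    # Shapes assumed from the calls above (defined elsewhere in the
    # package, not shown in this diff):
    #   - PatternLibrary.detect(text) returns a list of (pattern, match)
    #     tuples, best match first
    #   - SeverityClassifier.classify(...) returns an enum member whose
    #     .value is the severity string ("critical", "error", ...)
    #   - ParsedLogEntry carries mutable severity / error_pattern slots
    #     that default to None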

    def _compute_statistics(self, result: AnalysisResult) -> None:
        """Compute statistics from analyzed entries."""
        severity_counts = Counter()
        pattern_counts = Counter()
        host_counts = Counter()
        timestamps = []

        for entry in result.entries:
            severity = entry.severity or "unknown"
            severity_counts[severity] += 1

            if entry.error_pattern:
                pattern_counts[entry.error_pattern] += 1

            if entry.host:
                host_counts[entry.host] += 1

            if entry.timestamp:
                timestamps.append(entry.timestamp)

        result.severity_breakdown = dict(severity_counts)
        result.pattern_matches = dict(pattern_counts)
        result.host_breakdown = dict(host_counts)
        result.critical_count = severity_counts.get("critical", 0)
        result.error_count = severity_counts.get("error", 0)
        result.warning_count = severity_counts.get("warning", 0)
        result.debug_count = severity_counts.get("debug", 0)

        if timestamps:
            result.time_range = (min(timestamps), max(timestamps))

        result.top_errors = [
            {"pattern": name, "count": count} for name, count in pattern_counts.most_common(10)
        ]
        result.suggestions = self._generate_suggestions(result)

    def _generate_suggestions(self, result: AnalysisResult) -> list[str]:
        """Generate suggestions based on analysis."""
        suggestions = []

        if result.critical_count > 0:
            suggestions.append(
                f"Found {result.critical_count} critical errors. "
                "Review immediately - these may indicate system failures."
            )

        if result.error_count > 10:
            suggestions.append(
                f"High error volume detected ({result.error_count} errors). "
                "Consider implementing automated alerting."
            )

        if result.pattern_matches:
            top_pattern = max(result.pattern_matches, key=result.pattern_matches.get)
            suggestions.append(
                f"Most common issue: '{top_pattern}' "
                f"({result.pattern_matches[top_pattern]} occurrences). "
                "Prioritize fixing this pattern."
            )

        if result.host_breakdown:
            top_host = max(result.host_breakdown, key=result.host_breakdown.get)
            if result.host_breakdown[top_host] > len(result.entries) * 0.5:
                suggestions.append(
                    f"Host '{top_host}' shows high error concentration. "
                    "Check this host's configuration and resources."
                )

        return suggestions

    def analyze_file(self, file_path: str, format: Optional[LogFormat] = None) -> AnalysisResult:
        """Analyze a log file."""
        with open(file_path, encoding="utf-8", errors="replace") as f:
            lines = f.readlines()
        return self.analyze(lines, format)

    def analyze_stdin(self) -> AnalysisResult:
        """Analyze from stdin."""
        import sys

        lines = sys.stdin.readlines()
        return self.analyze(lines)
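
    # Pipe-friendly usage (hypothetical invocation, assuming a script
    # that calls LogAnalyzer().analyze_stdin()):
    #   tail -n 500 /var/log/app.log | python analyze.py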

    def get_pattern_info(self, pattern_name: str) -> Optional[dict[str, Any]]:
        """Get information about a pattern."""
        for pattern in self.pattern_library.list_patterns():
            if pattern.name == pattern_name:
                return {
                    "name": pattern.name,
                    "pattern": pattern.pattern,
                    "severity": pattern.severity,
                    "description": pattern.description,
                    "suggestion": pattern.suggestion,
                    "group": pattern.group,
                    "enabled": pattern.enabled,
                }
        return None

    def list_patterns_by_group(self) -> dict[str, list[dict[str, Any]]]:
        """List all patterns organized by group."""
        result = {}
        for group_name, patterns in self.pattern_library.list_groups().items():
            result[group_name] = [
                {"name": p.name, "severity": p.severity, "description": p.description}
                for p in patterns
            ]
        return result
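
A quick end-to-end sketch of the new orchestrator (log lines are made up; format detection is left to ParserFactory):

    analyzer = LogAnalyzer()
    result = analyzer.analyze([
        "2026-02-02 10:08:35 ERROR payment failed: connection refused",
        "2026-02-02 10:08:36 INFO retrying in 5s",
    ])
    print(result.format_detected, result.error_count)
    for tip in result.suggestions:
        print("-", tip)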