Add analyzers: patterns, severity, and main analyzer
loglens/analyzers/analyzer.py (new file, 201 lines)
@@ -0,0 +1,201 @@
"""Log analyzer orchestrator."""

import sys
from collections import Counter
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

from loglens.parsers.base import LogFormat, ParsedLogEntry
from loglens.parsers.factory import ParserFactory
from loglens.analyzers.patterns import PatternLibrary
from loglens.analyzers.severity import SeverityClassifier

@dataclass
class AnalysisResult:
    """Result of log analysis."""

    entries: List[ParsedLogEntry] = field(default_factory=list)
    format_detected: LogFormat = LogFormat.UNKNOWN
    total_lines: int = 0
    parsed_count: int = 0
    error_count: int = 0
    warning_count: int = 0
    critical_count: int = 0
    debug_count: int = 0
    pattern_matches: Dict[str, int] = field(default_factory=dict)
    severity_breakdown: Dict[str, int] = field(default_factory=dict)
    top_errors: List[Dict[str, Any]] = field(default_factory=list)
    host_breakdown: Dict[str, int] = field(default_factory=dict)
    time_range: Optional[Tuple[datetime, datetime]] = None
    analysis_time: datetime = field(default_factory=datetime.now)
    suggestions: List[str] = field(default_factory=list)

class LogAnalyzer:
    """Orchestrates log parsing and analysis."""

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        self.parser_factory = ParserFactory()
        self.pattern_library = PatternLibrary()
        # Custom severity rules may be supplied under config["severity_rules"].
        self.severity_classifier = SeverityClassifier(
            custom_rules=config.get("severity_rules") if config else None
        )
        self.config = config or {}
    def analyze(self, lines: List[str], format: Optional[LogFormat] = None) -> AnalysisResult:
        """Analyze a list of log lines."""
        result = AnalysisResult(
            total_lines=len(lines),
            analysis_time=datetime.now(),
        )

        if not lines:
            return result

        # Auto-detect the log format unless the caller specified one.
        if format is None:
            format = self.parser_factory.detect_format_batch(lines)
        result.format_detected = format

        entries = self.parser_factory.parse_lines(lines, format)
        result.entries = entries
        result.parsed_count = len(entries)

        # Annotate each entry with pattern and severity, then aggregate.
        for entry in entries:
            self._analyze_entry(entry)
        self._compute_statistics(result)

        return result
    def _analyze_entry(self, entry: ParsedLogEntry) -> None:
        """Annotate a single entry with its error pattern and severity."""
        message = entry.message or ""
        raw_text = entry.raw_line

        # Record only the first matching error pattern for the entry.
        patterns = self.pattern_library.detect(raw_text)
        if patterns:
            pattern, _match = patterns[0]
            entry.error_pattern = pattern.name

        severity = self.severity_classifier.classify(
            level=entry.level,
            message=message,
            pattern_match=entry.error_pattern,
        )
        entry.severity = severity.value
    def _compute_statistics(self, result: AnalysisResult) -> None:
        """Compute statistics from analyzed entries."""
        severity_counts = Counter()
        pattern_counts = Counter()
        host_counts = Counter()
        timestamps = []

        for entry in result.entries:
            severity_counts[entry.severity or "unknown"] += 1
            if entry.error_pattern:
                pattern_counts[entry.error_pattern] += 1
            if entry.host:
                host_counts[entry.host] += 1
            if entry.timestamp:
                timestamps.append(entry.timestamp)

        result.severity_breakdown = dict(severity_counts)
        result.pattern_matches = dict(pattern_counts)
        result.host_breakdown = dict(host_counts)

        result.critical_count = severity_counts.get("critical", 0)
        result.error_count = severity_counts.get("error", 0)
        result.warning_count = severity_counts.get("warning", 0)
        result.debug_count = severity_counts.get("debug", 0)

        if timestamps:
            result.time_range = (min(timestamps), max(timestamps))

        result.top_errors = [
            {"pattern": name, "count": count}
            for name, count in pattern_counts.most_common(10)
        ]

        result.suggestions = self._generate_suggestions(result)
    def _generate_suggestions(self, result: AnalysisResult) -> List[str]:
        """Generate suggestions based on analysis."""
        suggestions = []

        if result.critical_count > 0:
            suggestions.append(
                f"Found {result.critical_count} critical errors. "
                "Review immediately - these may indicate system failures."
            )

        if result.error_count > 10:
            suggestions.append(
                f"High error volume detected ({result.error_count} errors). "
                "Consider implementing automated alerting."
            )

        if result.pattern_matches:
            top_pattern = max(result.pattern_matches, key=result.pattern_matches.get)
            suggestions.append(
                f"Most common issue: '{top_pattern}' "
                f"({result.pattern_matches[top_pattern]} occurrences). "
                "Prioritize fixing this pattern."
            )

        # host_breakdown tallies all entries per host, so only flag a host
        # when it accounts for more than half of the parsed log volume.
        if result.host_breakdown:
            top_host = max(result.host_breakdown, key=result.host_breakdown.get)
            if result.host_breakdown[top_host] > len(result.entries) * 0.5:
                suggestions.append(
                    f"Host '{top_host}' accounts for over half of all log entries. "
                    "Check this host's configuration and resources."
                )

        return suggestions
    def analyze_file(self, file_path: str, format: Optional[LogFormat] = None) -> AnalysisResult:
        """Analyze a log file."""
        # errors='replace' keeps analysis going past invalid byte sequences.
        with open(file_path, "r", encoding="utf-8", errors="replace") as f:
            lines = f.readlines()
        return self.analyze(lines, format)

    def analyze_stdin(self) -> AnalysisResult:
        """Analyze log lines read from stdin."""
        return self.analyze(sys.stdin.readlines())
    def get_pattern_info(self, pattern_name: str) -> Optional[Dict[str, Any]]:
        """Get information about a pattern, or None if the name is unknown."""
        for pattern in self.pattern_library.list_patterns():
            if pattern.name == pattern_name:
                return {
                    "name": pattern.name,
                    "pattern": pattern.pattern,
                    "severity": pattern.severity,
                    "description": pattern.description,
                    "suggestion": pattern.suggestion,
                    "group": pattern.group,
                    "enabled": pattern.enabled,
                }
        return None

    def list_patterns_by_group(self) -> Dict[str, List[Dict[str, Any]]]:
        """List all patterns organized by group."""
        result = {}
        for group_name, patterns in self.pattern_library.list_groups().items():
            result[group_name] = [
                {
                    "name": p.name,
                    "severity": p.severity,
                    "description": p.description,
                }
                for p in patterns
            ]
        return result
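
A minimal usage sketch for the analyzer added in this commit. The sample log lines are invented for illustration, and the sketch assumes the loglens parser and pattern modules referenced by the imports are available; the import path follows the file path above, and the fields read from the result (format_detected, severity_breakdown, suggestions) come from the AnalysisResult dataclass in this file.

from loglens.analyzers.analyzer import LogAnalyzer

# Hypothetical sample lines; actual output depends on the parser and
# pattern modules, which are not part of this file.
sample_lines = [
    "2024-05-01T12:00:00Z web-1 ERROR Connection refused to db:5432",
    "2024-05-01T12:00:01Z web-1 INFO Request completed in 42ms",
]

analyzer = LogAnalyzer()
result = analyzer.analyze(sample_lines)

print(result.format_detected)      # format chosen by detect_format_batch
print(result.severity_breakdown)   # e.g. {"error": 1, "info": 1}
for tip in result.suggestions:
    print("-", tip)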