"""Log analyzer orchestrator.

Ties together format detection, parsing, pattern matching, and severity
classification, and aggregates the results into an :class:`AnalysisResult`.
"""

import sys
from collections import Counter, defaultdict
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List, Optional

from loglens.parsers.base import LogFormat, ParsedLogEntry
from loglens.parsers.factory import ParserFactory
from loglens.analyzers.patterns import ErrorPattern, PatternLibrary
from loglens.analyzers.severity import SeverityClassifier, SeverityLevel


@dataclass
class AnalysisResult:
    """Aggregated outcome of analyzing a batch of log lines."""
    # Successfully parsed entries, in input order.
    entries: List[ParsedLogEntry] = field(default_factory=list)
    format_detected: LogFormat = LogFormat.UNKNOWN
    # Raw input line count (includes lines that failed to parse).
    total_lines: int = 0
    parsed_count: int = 0
    error_count: int = 0
    warning_count: int = 0
    critical_count: int = 0
    debug_count: int = 0
    # Pattern name -> number of entries matching it.
    pattern_matches: Dict[str, int] = field(default_factory=dict)
    # Severity label -> entry count (includes "unknown" for unclassified).
    severity_breakdown: Dict[str, int] = field(default_factory=dict)
    # Most frequent patterns as [{"pattern": name, "count": n}, ...].
    top_errors: List[Dict[str, Any]] = field(default_factory=list)
    host_breakdown: Dict[str, int] = field(default_factory=dict)
    # (earliest, latest) timestamps, or None when no entry had a timestamp.
    time_range: Optional[tuple] = None
    analysis_time: datetime = field(default_factory=datetime.now)
    suggestions: List[str] = field(default_factory=list)


class LogAnalyzer:
    """Orchestrates log parsing and analysis.

    Recognized ``config`` keys (all optional):
        severity_rules: custom rules forwarded to :class:`SeverityClassifier`.
        top_errors_limit: how many patterns to report in ``top_errors``
            (default 10).
        error_volume_threshold: error count above which an alerting
            suggestion is emitted (default 10).
        host_concentration_ratio: fraction of entries from a single host
            that triggers a host-concentration suggestion (default 0.5).
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        # Normalize config first so every subsequent lookup goes through
        # one dict (the original read `config` before normalizing it).
        self.config = config or {}
        self.parser_factory = ParserFactory()
        self.pattern_library = PatternLibrary()
        self.severity_classifier = SeverityClassifier(
            custom_rules=self.config.get("severity_rules")
        )
        # Tunable reporting thresholds; the defaults reproduce the
        # previously hard-coded values exactly.
        self.top_errors_limit = int(self.config.get("top_errors_limit", 10))
        self.error_volume_threshold = int(
            self.config.get("error_volume_threshold", 10)
        )
        self.host_concentration_ratio = float(
            self.config.get("host_concentration_ratio", 0.5)
        )

    def analyze(self, lines: List[str], format: Optional[LogFormat] = None) -> AnalysisResult:
        """Analyze a list of log lines.

        Args:
            lines: raw log lines (with or without trailing newlines).
            format: known log format; auto-detected when None.

        Returns:
            A fully populated :class:`AnalysisResult`.
        """
        result = AnalysisResult(
            total_lines=len(lines),
            analysis_time=datetime.now()
        )

        # Nothing to parse: return the empty result without touching parsers.
        if not lines:
            return result

        if format is None:
            format = self.parser_factory.detect_format_batch(lines)

        result.format_detected = format

        entries = self.parser_factory.parse_lines(lines, format)
        result.entries = entries
        result.parsed_count = len(entries)

        # First pass: annotate each entry (pattern + severity) in place.
        for entry in entries:
            self._analyze_entry(entry)

        # Second pass: aggregate counts, time range, and suggestions.
        self._compute_statistics(result)

        return result

    def _analyze_entry(self, entry: ParsedLogEntry) -> None:
        """Annotate one entry with its matched pattern and severity."""
        message = entry.message or ""
        raw_text = entry.raw_line

        # Pattern matching runs on the raw line so prefixes/metadata that the
        # parser stripped out of `message` can still match.
        patterns = self.pattern_library.detect(raw_text)
        if patterns:
            # Only the first (highest-priority) match is recorded.
            pattern, match = patterns[0]
            entry.error_pattern = pattern.name

        severity = self.severity_classifier.classify(
            level=entry.level,
            message=message,
            pattern_match=entry.error_pattern
        )
        entry.severity = severity.value

    def _compute_statistics(self, result: AnalysisResult) -> None:
        """Aggregate per-entry annotations into result-level statistics."""
        severity_counts = Counter()
        pattern_counts = Counter()
        host_counts = Counter()
        timestamps = []

        for entry in result.entries:
            # Entries the classifier could not label are bucketed as "unknown".
            severity = entry.severity or "unknown"
            severity_counts[severity] += 1

            if entry.error_pattern:
                pattern_counts[entry.error_pattern] += 1

            if entry.host:
                host_counts[entry.host] += 1

            if entry.timestamp:
                timestamps.append(entry.timestamp)

        result.severity_breakdown = dict(severity_counts)
        result.pattern_matches = dict(pattern_counts)
        result.host_breakdown = dict(host_counts)

        result.critical_count = severity_counts.get("critical", 0)
        result.error_count = severity_counts.get("error", 0)
        result.warning_count = severity_counts.get("warning", 0)
        result.debug_count = severity_counts.get("debug", 0)

        if timestamps:
            result.time_range = (min(timestamps), max(timestamps))

        result.top_errors = [
            {"pattern": name, "count": count}
            for name, count in pattern_counts.most_common(self.top_errors_limit)
        ]

        result.suggestions = self._generate_suggestions(result)

    def _generate_suggestions(self, result: AnalysisResult) -> List[str]:
        """Generate human-readable follow-up suggestions from the statistics."""
        suggestions = []

        if result.critical_count > 0:
            suggestions.append(
                f"Found {result.critical_count} critical errors. "
                "Review immediately - these may indicate system failures."
            )

        if result.error_count > self.error_volume_threshold:
            suggestions.append(
                f"High error volume detected ({result.error_count} errors). "
                "Consider implementing automated alerting."
            )

        if result.pattern_matches:
            top_pattern = max(result.pattern_matches, key=result.pattern_matches.get)
            suggestions.append(
                f"Most common issue: '{top_pattern}' "
                f"({result.pattern_matches[top_pattern]} occurrences). "
                "Prioritize fixing this pattern."
            )

        if result.host_breakdown:
            top_host = max(result.host_breakdown, key=result.host_breakdown.get)
            # Flag a host only when it dominates the parsed entries.
            if result.host_breakdown[top_host] > len(result.entries) * self.host_concentration_ratio:
                suggestions.append(
                    f"Host '{top_host}' shows high error concentration. "
                    "Check this host's configuration and resources."
                )

        return suggestions

    def analyze_file(self, file_path: str, format: Optional[LogFormat] = None) -> AnalysisResult:
        """Analyze a log file.

        Undecodable bytes are replaced rather than raising, so partially
        corrupt logs still get analyzed.
        """
        with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
            lines = f.readlines()

        return self.analyze(lines, format)

    def analyze_stdin(self) -> AnalysisResult:
        """Analyze log lines read from stdin (blocks until EOF)."""
        lines = sys.stdin.readlines()
        return self.analyze(lines)

    def get_pattern_info(self, pattern_name: str) -> Optional[Dict[str, Any]]:
        """Return a dict describing the named pattern, or None if unknown."""
        for pattern in self.pattern_library.list_patterns():
            if pattern.name == pattern_name:
                return {
                    "name": pattern.name,
                    "pattern": pattern.pattern,
                    "severity": pattern.severity,
                    "description": pattern.description,
                    "suggestion": pattern.suggestion,
                    "group": pattern.group,
                    "enabled": pattern.enabled
                }
        return None

    def list_patterns_by_group(self) -> Dict[str, List[Dict[str, Any]]]:
        """List all patterns organized by group name."""
        result = {}
        for group_name, patterns in self.pattern_library.list_groups().items():
            result[group_name] = [
                {
                    "name": p.name,
                    "severity": p.severity,
                    "description": p.description
                }
                for p in patterns
            ]
        return result