"""Log analyzer orchestrator.""" from collections import Counter, defaultdict from dataclasses import dataclass, field from datetime import datetime from typing import Any, Dict, List, Optional from loglens.parsers.base import LogFormat, ParsedLogEntry from loglens.parsers.factory import ParserFactory from loglens.analyzers.patterns import ErrorPattern, PatternLibrary from loglens.analyzers.severity import SeverityClassifier, SeverityLevel @dataclass class AnalysisResult: """Result of log analysis.""" entries: List[ParsedLogEntry] = field(default_factory=list) format_detected: LogFormat = LogFormat.UNKNOWN total_lines: int = 0 parsed_count: int = 0 error_count: int = 0 warning_count: int = 0 critical_count: int = 0 debug_count: int = 0 pattern_matches: Dict[str, int] = field(default_factory=dict) severity_breakdown: Dict[str, int] = field(default_factory=dict) top_errors: List[Dict[str, Any]] = field(default_factory=list) host_breakdown: Dict[str, int] = field(default_factory=dict) time_range: Optional[tuple] = None analysis_time: datetime = field(default_factory=datetime.now) suggestions: List[str] = field(default_factory=list) class LogAnalyzer: """Orchestrates log parsing and analysis.""" def __init__(self, config: Optional[Dict[str, Any]] = None): self.parser_factory = ParserFactory() self.pattern_library = PatternLibrary() self.severity_classifier = SeverityClassifier( custom_rules=config.get("severity_rules") if config else None ) self.config = config or {} def analyze(self, lines: List[str], format: Optional[LogFormat] = None) -> AnalysisResult: """Analyze a list of log lines.""" result = AnalysisResult( total_lines=len(lines), analysis_time=datetime.now() ) if not lines: return result if format is None: format = self.parser_factory.detect_format_batch(lines) result.format_detected = format entries = self.parser_factory.parse_lines(lines, format) result.entries = entries result.parsed_count = len(entries) for entry in entries: self._analyze_entry(entry) self._compute_statistics(result) return result def _analyze_entry(self, entry: ParsedLogEntry) -> None: """Analyze a single entry.""" message = entry.message or "" raw_text = entry.raw_line patterns = self.pattern_library.detect(raw_text) if patterns: pattern, match = patterns[0] entry.error_pattern = pattern.name severity = self.severity_classifier.classify( level=entry.level, message=message, pattern_match=entry.error_pattern ) entry.severity = severity.value def _compute_statistics(self, result: AnalysisResult) -> None: """Compute statistics from analyzed entries.""" severity_counts = Counter() pattern_counts = Counter() host_counts = Counter() timestamps = [] for entry in result.entries: severity = entry.severity or "unknown" severity_counts[severity] += 1 if entry.error_pattern: pattern_counts[entry.error_pattern] += 1 if entry.host: host_counts[entry.host] += 1 if entry.timestamp: timestamps.append(entry.timestamp) result.severity_breakdown = dict(severity_counts) result.pattern_matches = dict(pattern_counts) result.host_breakdown = dict(host_counts) result.critical_count = severity_counts.get("critical", 0) result.error_count = severity_counts.get("error", 0) result.warning_count = severity_counts.get("warning", 0) result.debug_count = severity_counts.get("debug", 0) if timestamps: result.time_range = (min(timestamps), max(timestamps)) result.top_errors = [ {"pattern": name, "count": count} for name, count in pattern_counts.most_common(10) ] result.suggestions = self._generate_suggestions(result) def _generate_suggestions(self, result: AnalysisResult) -> List[str]: """Generate suggestions based on analysis.""" suggestions = [] if result.critical_count > 0: suggestions.append( f"Found {result.critical_count} critical errors. " "Review immediately - these may indicate system failures." ) if result.error_count > 10: suggestions.append( f"High error volume detected ({result.error_count} errors). " "Consider implementing automated alerting." ) if result.pattern_matches: top_pattern = max(result.pattern_matches, key=result.pattern_matches.get) suggestions.append( f"Most common issue: '{top_pattern}' " f"({result.pattern_matches[top_pattern]} occurrences). " "Prioritize fixing this pattern." ) if result.host_breakdown: top_host = max(result.host_breakdown, key=result.host_breakdown.get) if result.host_breakdown[top_host] > len(result.entries) * 0.5: suggestions.append( f"Host '{top_host}' shows high error concentration. " "Check this host's configuration and resources." ) return suggestions def analyze_file(self, file_path: str, format: Optional[LogFormat] = None) -> AnalysisResult: """Analyze a log file.""" with open(file_path, 'r', encoding='utf-8', errors='replace') as f: lines = f.readlines() return self.analyze(lines, format) def analyze_stdin(self) -> AnalysisResult: """Analyze from stdin.""" import sys lines = sys.stdin.readlines() return self.analyze(lines) def get_pattern_info(self, pattern_name: str) -> Optional[Dict[str, Any]]: """Get information about a pattern.""" for pattern in self.pattern_library.list_patterns(): if pattern.name == pattern_name: return { "name": pattern.name, "pattern": pattern.pattern, "severity": pattern.severity, "description": pattern.description, "suggestion": pattern.suggestion, "group": pattern.group, "enabled": pattern.enabled } return None def list_patterns_by_group(self) -> Dict[str, List[Dict[str, Any]]]: """List all patterns organized by group.""" result = {} for group_name, patterns in self.pattern_library.list_groups().items(): result[group_name] = [ { "name": p.name, "severity": p.severity, "description": p.description } for p in patterns ] return result