diff --git a/loglens/analyzers/analyzer.py b/loglens/analyzers/analyzer.py
index d40dac1..b84d08a 100644
--- a/loglens/analyzers/analyzer.py
+++ b/loglens/analyzers/analyzer.py
@@ -1,193 +1,203 @@
-'''Log analyzer orchestrator.'''
-
+import re
 from collections import Counter
-from dataclasses import dataclass, field
-from datetime import datetime
-from typing import Any, Optional
+from typing import Optional
 
 from loglens.analyzers.patterns import PatternLibrary
 from loglens.analyzers.severity import SeverityClassifier
-from loglens.parsers.base import LogFormat, ParsedLogEntry
-from loglens.parsers.factory import ParserFactory
+from loglens.parsers.base import LogFormat, ParsedEntry
 
 
-@dataclass
 class AnalysisResult:
-    '''Result of log analysis.'''
+    """Result of log analysis."""
 
-    entries: list[ParsedLogEntry] = field(default_factory=list)
-    format_detected: LogFormat = LogFormat.UNKNOWN
-    total_lines: int = 0
-    parsed_count: int = 0
-    error_count: int = 0
-    warning_count: int = 0
-    critical_count: int = 0
-    debug_count: int = 0
-    pattern_matches: dict[str, int] = field(default_factory=dict)
-    severity_breakdown: dict[str, int] = field(default_factory=dict)
-    top_errors: list[dict[str, Any]] = field(default_factory=list)
-    host_breakdown: dict[str, int] = field(default_factory=dict)
-    time_range: Optional[tuple] = None
-    analysis_time: datetime = field(default_factory=datetime.now)
-    suggestions: list[str] = field(default_factory=list)
+    def __init__(
+        self,
+        total_lines: int,
+        entries: list[ParsedEntry],
+        format_detected: LogFormat,
+        error_count: int = 0,
+        warning_count: int = 0,
+        critical_count: int = 0,
+        debug_count: int = 0,
+        suggestions: Optional[list[str]] = None,
+    ):
+        self.total_lines = total_lines
+        self.entries = entries
+        self.format_detected = format_detected
+        self.error_count = error_count
+        self.warning_count = warning_count
+        self.critical_count = critical_count
+        self.debug_count = debug_count
+        self.suggestions = suggestions or []
+
+    def to_dict(self) -> dict:
+        """Convert to dictionary."""
+        return {
+            "total_lines": self.total_lines,
+            "entries": [e.to_dict() for e in self.entries],
+            "format_detected": self.format_detected.value,
+            "error_count": self.error_count,
+            "warning_count": self.warning_count,
+            "critical_count": self.critical_count,
+            "debug_count": self.debug_count,
+            "suggestions": self.suggestions,
+        }
 
 
 class LogAnalyzer:
-    '''Orchestrates log parsing and analysis.'''
+    """Main analyzer for log files."""
 
-    def __init__(self, config: Optional[dict[str, Any]] = None):
-        self.parser_factory = ParserFactory()
-        self.pattern_library = PatternLibrary()
-        self.severity_classifier = SeverityClassifier(
-            custom_rules=config.get("severity_rules") if config else None
+    def __init__(self):
+        self.patterns = PatternLibrary()
+        self.severity_classifier = SeverityClassifier()
+
+    def analyze(
+        self, lines: list[str], format_enum: Optional[LogFormat] = None
+    ) -> AnalysisResult:
+        """Analyze a list of log lines."""
+        entries = []
+        error_count = 0
+        warning_count = 0
+        critical_count = 0
+        debug_count = 0
+
+        detected_format = format_enum
+
+        for line in lines:
+            if not line.strip():
+                continue
+
+            entry = self._parse_line(line, format_enum)
+            if entry:
+                entries.append(entry)
+                severity = self._classify_entry(entry)
+                entry.severity = severity
+
+                if severity == "critical":
+                    critical_count += 1
+                elif severity == "error":
+                    error_count += 1
+                elif severity == "warning":
+                    warning_count += 1
+                elif severity == "debug":
+                    debug_count += 1
+
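+                # Remember the format of the first successfully parsed entry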
+                if detected_format is None:
+                    detected_format = entry.format
+
+        suggestions = self._generate_suggestions(entries)
+
+        return AnalysisResult(
+            total_lines=len(lines),
+            entries=entries,
+            format_detected=detected_format or LogFormat.RAW,
+            error_count=error_count,
+            warning_count=warning_count,
+            critical_count=critical_count,
+            debug_count=debug_count,
+            suggestions=suggestions,
         )
-        self.config = config or {}
 
-    def analyze(self, lines: list[str], format: Optional[LogFormat] = None) -> AnalysisResult:
-        '''Analyze a list of log lines.'''
-        result = AnalysisResult(total_lines=len(lines), analysis_time=datetime.now())
+    def analyze_file(
+        self, file_path: str, format_enum: Optional[LogFormat] = None
+    ) -> AnalysisResult:
+        """Analyze a log file."""
+        with open(file_path, "r", encoding="utf-8", errors="replace") as f:
+            lines = f.readlines()
+        return self.analyze(lines, format_enum)
 
-        if not lines:
-            return result
+    def _parse_line(
+        self, line: str, format_enum: Optional[LogFormat] = None
+    ) -> Optional[ParsedEntry]:
+        """Parse a single log line."""
+        from loglens.parsers.factory import ParserFactory
 
-        if format is None:
-            format = self.parser_factory.detect_format_batch(lines)
+        if format_enum:
+            parser = ParserFactory.get_parser(format_enum)
+            entry = parser.parse(line)
+            if entry:
+                return entry
 
-        result.format_detected = format
+        for fmt in LogFormat:
+            if fmt == LogFormat.RAW:
+                continue
+            parser = ParserFactory.get_parser(fmt)
+            entry = parser.parse(line)
+            if entry:
+                return entry
 
-        entries = self.parser_factory.parse_lines(lines, format)
-        result.entries = entries
-        result.parsed_count = len(entries)
-
-        for entry in entries:
-            self._analyze_entry(entry)
-
-        self._compute_statistics(result)
-
-        return result
-
-    def _analyze_entry(self, entry: ParsedLogEntry) -> None:
-        '''Analyze a single entry.'''
-        message = entry.message or ""
-        raw_text = entry.raw_line
-
-        patterns = self.pattern_library.detect(raw_text)
-        if patterns:
-            pattern, match = patterns[0]
-            entry.error_pattern = pattern.name
-
-        severity = self.severity_classifier.classify(
-            level=entry.level, message=message, pattern_match=entry.error_pattern
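+        # No parser matched: wrap the line as a raw, unparsed entry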
+        return ParsedEntry(
+            raw_line=line.strip(),
+            format=LogFormat.RAW,
+            timestamp=None,
+            level=None,
+            message=line.strip(),
+            metadata={},
         )
-        entry.severity = severity.value
 
-    def _compute_statistics(self, result: AnalysisResult) -> None:
-        '''Compute statistics from analyzed entries.'''
-        severity_counts = Counter()
-        pattern_counts = Counter()
-        host_counts = Counter()
-        timestamps = []
+    def _classify_entry(self, entry: ParsedEntry) -> str:
+        """Classify severity of an entry."""
+        content = entry.message
 
-        for entry in result.entries:
-            severity = entry.severity or "unknown"
-            severity_counts[severity] += 1
+        patterns_by_severity = self.patterns.get_patterns_for_content(content)
 
-            if entry.error_pattern:
-                pattern_counts[entry.error_pattern] += 1
+        if patterns_by_severity:
+            severities = [p.severity for p in patterns_by_severity]
+            if "critical" in severities:
+                return "critical"
+            elif "error" in severities:
+                return "error"
+            elif "warning" in severities:
+                return "warning"
+            elif "debug" in severities:
+                return "debug"
 
-            if entry.host:
-                host_counts[entry.host] += 1
+        return self.severity_classifier.classify(content, entry.level)
 
-            if entry.timestamp:
-                timestamps.append(entry.timestamp)
-
-        result.severity_breakdown = dict(severity_counts)
-        result.pattern_matches = dict(pattern_counts)
-        result.host_breakdown = dict(host_counts)
-
-        result.critical_count = severity_counts.get("critical", 0)
-        result.error_count = severity_counts.get("error", 0)
-        result.warning_count = severity_counts.get("warning", 0)
-        result.debug_count = severity_counts.get("debug", 0)
-
-        if timestamps:
-            result.time_range = (min(timestamps), max(timestamps))
-
-        result.top_errors = [
-            {"pattern": name, "count": count} for name, count in pattern_counts.most_common(10)
-        ]
-
-        result.suggestions = self._generate_suggestions(result)
-
-    def _generate_suggestions(self, result: AnalysisResult) -> list[str]:
-        '''Generate suggestions based on analysis.'''
+    def _generate_suggestions(self, entries: list[ParsedEntry]) -> list[str]:
+        """Generate suggestions based on analysis."""
         suggestions = []
 
-        if result.critical_count > 0:
+        error_entries = [e for e in entries if e.severity in ("error", "critical")]
+
+        if not error_entries:
+            return ["No errors detected. Keep up the good work!"]
+
+        error_messages = [e.message for e in error_entries]
+        error_counter = Counter(error_messages)
+
+        common_errors = error_counter.most_common(5)
+
+        if len(error_counter) > 3:
             suggestions.append(
-                f"Found {result.critical_count} critical errors. "
-                "Review immediately - these may indicate system failures."
+                f"Found {len(error_entries)} errors across {len(error_counter)} unique error messages."
             )
 
-        if result.error_count > 10:
+        for error_msg, count in common_errors[:3]:
+            if count > 1:
+                suggestions.append(f"'{error_msg[:50]}...' occurred {count} times")
+
+        stack_trace_entries = [
+            e for e in error_entries if "Traceback" in e.message or "stack" in e.message.lower()
+        ]
+        if stack_trace_entries:
             suggestions.append(
-                f"High error volume detected ({result.error_count} errors). "
-                "Consider implementing automated alerting."
+                "Stack traces detected. Consider checking the exception types and their root causes."
             )
 
-        if result.pattern_matches:
-            top_pattern = max(result.pattern_matches, key=result.pattern_matches.get)
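+        # Flag when connection/timeout errors make up more than 30% of all errors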
+        connection_errors = [
+            e for e in error_entries if "connection" in e.message.lower() or "timeout" in e.message.lower()
+        ]
+        if len(connection_errors) > len(error_entries) * 0.3:
             suggestions.append(
-                f"Most common issue: '{top_pattern}' "
-                f"({result.pattern_matches[top_pattern]} occurrences). "
-                "Prioritize fixing this pattern."
+                "High proportion of connection/timeout errors. Check network connectivity and service availability."
             )
 
-        if result.host_breakdown:
-            top_host = max(result.host_breakdown, key=result.host_breakdown.get)
-            if result.host_breakdown[top_host] > len(result.entries) * 0.5:
-                suggestions.append(
-                    f"Host '{top_host}' shows high error concentration. "
-                    "Check this host's configuration and resources."
-                )
-
         return suggestions
 
-    def analyze_file(self, file_path: str, format: Optional[LogFormat] = None) -> AnalysisResult:
-        '''Analyze a log file.'''
-        with open(file_path, encoding="utf-8", errors="replace") as f:
-            lines = f.readlines()
-
-        return self.analyze(lines, format)
-
-    def analyze_stdin(self) -> AnalysisResult:
-        '''Analyze from stdin.'''
-        import sys
-
-        lines = sys.stdin.readlines()
-        return self.analyze(lines)
-
-    def get_pattern_info(self, pattern_name: str) -> Optional[dict[str, Any]]:
-        '''Get information about a pattern.'''
-        for pattern in self.pattern_library.list_patterns():
-            if pattern.name == pattern_name:
-                return {
-                    "name": pattern.name,
-                    "pattern": pattern.pattern,
-                    "severity": pattern.severity,
-                    "description": pattern.description,
-                    "suggestion": pattern.suggestion,
-                    "group": pattern.group,
-                    "enabled": pattern.enabled,
-                }
-        return None
-
-    def list_patterns_by_group(self) -> dict[str, list[dict[str, Any]]]:
-        '''List all patterns organized by group.'''
-        result = {}
-        for group_name, patterns in self.pattern_library.list_groups().items():
-            result[group_name] = [
-                {"name": p.name, "severity": p.severity, "description": p.description}
-                for p in patterns
-            ]
-        return result
+    def list_patterns_by_group(self) -> dict[str, list[dict]]:
+        """List all patterns grouped by category."""
+        return self.patterns.get_all_patterns()
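
Usage sketch for the refactored API above (the log path is hypothetical; actual output depends on the installed parsers and pattern library):

    from loglens.analyzers.analyzer import LogAnalyzer

    analyzer = LogAnalyzer()
    result = analyzer.analyze_file("app.log")

    print(result.format_detected.value, result.error_count)
    for tip in result.suggestions:
        print("-", tip)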