From 7bbe910333162804e6e0f36ff3a72dc18200e38f Mon Sep 17 00:00:00 2001
From: 7000pctAUTO
Date: Mon, 2 Feb 2026 10:08:35 +0000
Subject: [PATCH] Add analyzers (patterns, severity, analyzer)

---
 loglens/analyzers/analyzer.py | 299 +++++++++++++++++-----------------
 1 file changed, 146 insertions(+), 153 deletions(-)

diff --git a/loglens/analyzers/analyzer.py b/loglens/analyzers/analyzer.py
index b84d08a..714fc05 100644
--- a/loglens/analyzers/analyzer.py
+++ b/loglens/analyzers/analyzer.py
@@ -1,200 +1,193 @@
-import re
+"""Log analyzer orchestrator."""
+
 from collections import Counter
-from typing import Optional
+from dataclasses import dataclass, field
+from datetime import datetime
+from typing import Any, Optional
 
 from loglens.analyzers.patterns import PatternLibrary
 from loglens.analyzers.severity import SeverityClassifier
-from loglens.parsers.base import LogFormat, ParsedEntry
+from loglens.parsers.base import LogFormat, ParsedLogEntry
+from loglens.parsers.factory import ParserFactory
 
 
+@dataclass
 class AnalysisResult:
     """Result of log analysis."""
 
-    def __init__(
-        self,
-        total_lines: int,
-        entries: list[ParsedEntry],
-        format_detected: LogFormat,
-        error_count: int = 0,
-        warning_count: int = 0,
-        critical_count: int = 0,
-        debug_count: int = 0,
-        suggestions: Optional[list[str]] = None,
-    ):
-        self.total_lines = total_lines
-        self.entries = entries
-        self.format_detected = format_detected
-        self.error_count = error_count
-        self.warning_count = warning_count
-        self.critical_count = critical_count
-        self.debug_count = debug_count
-        self.suggestions = suggestions or []
-
-    def to_dict(self) -> dict:
-        """Convert to dictionary."""
-        return {
-            "total_lines": self.total_lines,
-            "entries": [e.to_dict() for e in self.entries],
-            "format_detected": self.format_detected.value,
-            "error_count": self.error_count,
-            "warning_count": self.warning_count,
-            "critical_count": self.critical_count,
-            "debug_count": self.debug_count,
-            "suggestions": self.suggestions,
-        }
+    entries: list[ParsedLogEntry] = field(default_factory=list)
+    format_detected: LogFormat = LogFormat.UNKNOWN
+    total_lines: int = 0
+    parsed_count: int = 0
+    error_count: int = 0
+    warning_count: int = 0
+    critical_count: int = 0
+    debug_count: int = 0
+    pattern_matches: dict[str, int] = field(default_factory=dict)
+    severity_breakdown: dict[str, int] = field(default_factory=dict)
+    top_errors: list[dict[str, Any]] = field(default_factory=list)
+    host_breakdown: dict[str, int] = field(default_factory=dict)
+    time_range: Optional[tuple] = None
+    analysis_time: datetime = field(default_factory=datetime.now)
+    suggestions: list[str] = field(default_factory=list)
 
 
 class LogAnalyzer:
-    """Main analyzer for log files."""
+    """Orchestrates log parsing and analysis."""
 
-    def __init__(self):
-        self.patterns = PatternLibrary()
-        self.severity_classifier = SeverityClassifier()
+    def __init__(self, config: Optional[dict[str, Any]] = None):
+        self.parser_factory = ParserFactory()
+        self.pattern_library = PatternLibrary()
+        self.severity_classifier = SeverityClassifier(
+            custom_rules=config.get("severity_rules") if config else None
+        )
+        self.config = config or {}
 
-    def analyze(
-        self, lines: list[str], format_enum: Optional[LogFormat] = None
-    ) -> AnalysisResult:
+    def analyze(self, lines: list[str], format: Optional[LogFormat] = None) -> AnalysisResult:
         """Analyze a list of log lines."""
-        entries = []
-        error_count = 0
-        warning_count = 0
-        critical_count = 0
-        debug_count = 0
+        result = AnalysisResult(total_lines=len(lines), analysis_time=datetime.now())
 
-        detected_format = format_enum
+        if not lines:
+            return result
 
-        for line in lines:
-            if not line.strip():
-                continue
+        if format is None:
+            format = self.parser_factory.detect_format_batch(lines)
 
-            entry = self._parse_line(line, format_enum)
-            if entry:
-                entries.append(entry)
-                severity = self._classify_entry(entry)
-                entry.severity = severity
+        result.format_detected = format
 
-                if severity == "critical":
-                    critical_count += 1
-                elif severity == "error":
-                    error_count += 1
-                elif severity == "warning":
-                    warning_count += 1
-                elif severity == "debug":
-                    debug_count += 1
+        entries = self.parser_factory.parse_lines(lines, format)
+        result.entries = entries
+        result.parsed_count = len(entries)
 
-                if detected_format is None:
-                    detected_format = entry.format
+        for entry in entries:
+            self._analyze_entry(entry)
 
-        suggestions = self._generate_suggestions(entries)
+        self._compute_statistics(result)
 
-        return AnalysisResult(
-            total_lines=len(lines),
-            entries=entries,
-            format_detected=detected_format or LogFormat.RAW,
-            error_count=error_count,
-            warning_count=warning_count,
-            critical_count=critical_count,
-            debug_count=debug_count,
-            suggestions=suggestions,
-        )
+        return result
 
-    def analyze_file(
-        self, file_path: str, format_enum: Optional[LogFormat] = None
-    ) -> AnalysisResult:
-        """Analyze a log file."""
-        with open(file_path, "r") as f:
-            lines = f.readlines()
-        return self.analyze(lines, format_enum)
+    def _analyze_entry(self, entry: ParsedLogEntry) -> None:
+        """Analyze a single entry."""
+        message = entry.message or ""
+        raw_text = entry.raw_line
+
+        patterns = self.pattern_library.detect(raw_text)
+        if patterns:
+            pattern, match = patterns[0]
+            entry.error_pattern = pattern.name
+
+        severity = self.severity_classifier.classify(
+            level=entry.level, message=message, pattern_match=entry.error_pattern
+        )
+        entry.severity = severity.value
 
-    def _parse_line(
-        self, line: str, format_enum: Optional[LogFormat] = None
-    ) -> Optional[ParsedEntry]:
-        """Parse a single log line."""
-        from loglens.parsers.factory import ParserFactory
+    def _compute_statistics(self, result: AnalysisResult) -> None:
+        """Compute statistics from analyzed entries."""
+        severity_counts = Counter()
+        pattern_counts = Counter()
+        host_counts = Counter()
+        timestamps = []
 
-        if format_enum:
-            parser = ParserFactory.get_parser(format_enum)
-            entry = parser.parse(line)
-            if entry:
-                return entry
+        for entry in result.entries:
+            severity = entry.severity or "unknown"
+            severity_counts[severity] += 1
 
-        for fmt in LogFormat:
-            if fmt == LogFormat.RAW:
-                continue
-            parser = ParserFactory.get_parser(fmt)
-            entry = parser.parse(line)
-            if entry:
-                return entry
+            if entry.error_pattern:
+                pattern_counts[entry.error_pattern] += 1
 
-        return ParsedEntry(
-            raw_line=line.strip(),
-            format=LogFormat.RAW,
-            timestamp=None,
-            level=None,
-            message=line.strip(),
-            metadata={},
-        )
+            if entry.host:
+                host_counts[entry.host] += 1
+
+            if entry.timestamp:
+                timestamps.append(entry.timestamp)
 
-    def _classify_entry(self, entry: ParsedEntry) -> str:
-        """Classify severity of an entry."""
-        content = entry.message
+        result.severity_breakdown = dict(severity_counts)
+        result.pattern_matches = dict(pattern_counts)
+        result.host_breakdown = dict(host_counts)
 
-        patterns_by_severity = self.patterns.get_patterns_for_content(content)
+        result.critical_count = severity_counts.get("critical", 0)
+        result.error_count = severity_counts.get("error", 0)
+        result.warning_count = severity_counts.get("warning", 0)
+        result.debug_count = severity_counts.get("debug", 0)
 
-        if patterns_by_severity:
-            severities = [p.severity for p in patterns_by_severity]
-            if "critical" in severities:
-                return "critical"
-            elif "error" in severities:
-                return "error"
-            elif "warning" in severities:
-                return "warning"
-            elif "debug" in severities:
-                return "debug"
+        if timestamps:
+            result.time_range = (min(timestamps), max(timestamps))
 
-        return self.severity_classifier.classify(content, entry.level)
+        result.top_errors = [
+            {"pattern": name, "count": count} for name, count in pattern_counts.most_common(10)
+        ]
 
-    def _generate_suggestions(self, entries: list[ParsedEntry]) -> list[str]:
+        result.suggestions = self._generate_suggestions(result)
+
+    def _generate_suggestions(self, result: AnalysisResult) -> list[str]:
         """Generate suggestions based on analysis."""
         suggestions = []
 
-        error_entries = [e for e in entries if e.severity in ("error", "critical")]
-
-        if not error_entries:
-            return ["No errors detected. Keep up the good work!"]
-
-        error_messages = [e.message for e in error_entries]
-        error_counter = Counter(error_messages)
-
-        common_errors = error_counter.most_common(5)
-
-        if len(common_errors) > 3:
+        if result.critical_count > 0:
             suggestions.append(
-                f"Found {len(error_entries)} errors across {len(common_errors)} unique error messages."
+                f"Found {result.critical_count} critical errors. "
+                "Review immediately - these may indicate system failures."
             )
 
-        for error_msg, count in common_errors[:3]:
-            if count > 1:
-                suggestions.append(f"'{error_msg[:50]}...' occurred {count} times")
-
-        stack_trace_entries = [
-            e for e in error_entries if "Traceback" in e.message or "stack" in e.message.lower()
-        ]
-        if stack_trace_entries:
+        if result.error_count > 10:
             suggestions.append(
-                "Multiple stack traces detected. Consider checking the exception types and their root causes."
+                f"High error volume detected ({result.error_count} errors). "
+                "Consider implementing automated alerting."
             )
 
-        connection_errors = [
-            e for e in error_entries if "connection" in e.message.lower() or "timeout" in e.message.lower()
-        ]
-        if len(connection_errors) > len(error_entries) * 0.3:
+        if result.pattern_matches:
+            top_pattern = max(result.pattern_matches, key=result.pattern_matches.get)
             suggestions.append(
-                "High proportion of connection/timeout errors. Check network connectivity and service availability."
+                f"Most common issue: '{top_pattern}' "
+                f"({result.pattern_matches[top_pattern]} occurrences). "
+                "Prioritize fixing this pattern."
            )
 
+        if result.host_breakdown:
+            top_host = max(result.host_breakdown, key=result.host_breakdown.get)
+            if result.host_breakdown[top_host] > len(result.entries) * 0.5:
+                suggestions.append(
+                    f"Host '{top_host}' shows high error concentration. "
+                    "Check this host's configuration and resources."
+                )
+
         return suggestions
 
-    def list_patterns_by_group(self) -> dict[str, list[dict]]:
-        """List all patterns grouped by category."""
-        return self.patterns.get_all_patterns()
+    def analyze_file(self, file_path: str, format: Optional[LogFormat] = None) -> AnalysisResult:
+        """Analyze a log file."""
+        with open(file_path, encoding="utf-8", errors="replace") as f:
+            lines = f.readlines()
+
+        return self.analyze(lines, format)
+
+    def analyze_stdin(self) -> AnalysisResult:
+        """Analyze from stdin."""
+        import sys
+
+        lines = sys.stdin.readlines()
+        return self.analyze(lines)
+
+    def get_pattern_info(self, pattern_name: str) -> Optional[dict[str, Any]]:
+        """Get information about a pattern."""
+        for pattern in self.pattern_library.list_patterns():
+            if pattern.name == pattern_name:
+                return {
+                    "name": pattern.name,
+                    "pattern": pattern.pattern,
+                    "severity": pattern.severity,
+                    "description": pattern.description,
+                    "suggestion": pattern.suggestion,
+                    "group": pattern.group,
+                    "enabled": pattern.enabled,
+                }
+        return None
+
+    def list_patterns_by_group(self) -> dict[str, list[dict[str, Any]]]:
+        """List all patterns organized by group."""
+        result = {}
+        for group_name, patterns in self.pattern_library.list_groups().items():
+            result[group_name] = [
+                {"name": p.name, "severity": p.severity, "description": p.description}
+                for p in patterns
+            ]
+        return result
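Usage sketch, for review: one plausible way to exercise the new LogAnalyzer
API end to end. The sample log lines and the "severity_rules" config key shown
in the comment are illustrative assumptions; the constructor, analyze(), and
the result fields are taken from the diff above, and the sketch assumes the
loglens package is importable and that ParserFactory can detect the sample
format.

    from loglens.analyzers.analyzer import LogAnalyzer

    # Hypothetical raw lines; any format ParserFactory recognizes will do.
    lines = [
        "2026-02-02 10:08:35 ERROR db: connection refused",
        "2026-02-02 10:08:36 INFO app: request served",
    ]

    analyzer = LogAnalyzer()  # or LogAnalyzer(config={"severity_rules": ...})
    result = analyzer.analyze(lines)  # format auto-detected when not given

    print(result.format_detected, result.parsed_count, result.error_count)
    for tip in result.suggestions:
        print("-", tip)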