import re
from collections import Counter
from typing import Optional

from loglens.analyzers.patterns import PatternLibrary
from loglens.analyzers.severity import SeverityClassifier
from loglens.parsers.base import LogFormat, ParsedEntry


class AnalysisResult:
    """Aggregated result of analyzing a batch of log lines.

    Holds the parsed entries, the detected log format, per-severity
    counters, and human-readable suggestions produced by the analyzer.
    """

    def __init__(
        self,
        total_lines: int,
        entries: list[ParsedEntry],
        format_detected: LogFormat,
        error_count: int = 0,
        warning_count: int = 0,
        critical_count: int = 0,
        debug_count: int = 0,
        suggestions: Optional[list[str]] = None,
    ):
        self.total_lines = total_lines
        self.entries = entries
        self.format_detected = format_detected
        self.error_count = error_count
        self.warning_count = warning_count
        self.critical_count = critical_count
        self.debug_count = debug_count
        # Normalize None to an empty list so callers can always iterate.
        self.suggestions = suggestions or []

    def to_dict(self) -> dict:
        """Convert to a JSON-serializable dictionary."""
        return {
            "total_lines": self.total_lines,
            "entries": [e.to_dict() for e in self.entries],
            "format_detected": self.format_detected.value,
            "error_count": self.error_count,
            "warning_count": self.warning_count,
            "critical_count": self.critical_count,
            "debug_count": self.debug_count,
            "suggestions": self.suggestions,
        }


class LogAnalyzer:
    """Main analyzer for log files."""

    def __init__(self):
        self.patterns = PatternLibrary()
        self.severity_classifier = SeverityClassifier()

    def analyze(
        self, lines: list[str], format_enum: Optional[LogFormat] = None
    ) -> AnalysisResult:
        """Analyze a list of log lines.

        Blank lines are skipped. Each remaining line is parsed, classified
        by severity (stored on ``entry.severity``), and counted. When no
        format is forced via ``format_enum``, the format of the first
        parsed entry is reported; if nothing parses, RAW is reported.

        Args:
            lines: Raw log lines (trailing newlines are tolerated).
            format_enum: Optional format to force for every line.

        Returns:
            An AnalysisResult with entries, counters, and suggestions.
        """
        entries = []
        error_count = 0
        warning_count = 0
        critical_count = 0
        debug_count = 0
        detected_format = format_enum

        for line in lines:
            if not line.strip():
                continue
            entry = self._parse_line(line, format_enum)
            if entry:
                entries.append(entry)
                severity = self._classify_entry(entry)
                entry.severity = severity
                if severity == "critical":
                    critical_count += 1
                elif severity == "error":
                    error_count += 1
                elif severity == "warning":
                    warning_count += 1
                elif severity == "debug":
                    debug_count += 1
                # Lock onto the first successfully parsed entry's format.
                if detected_format is None:
                    detected_format = entry.format

        suggestions = self._generate_suggestions(entries)
        return AnalysisResult(
            total_lines=len(lines),
            entries=entries,
            format_detected=detected_format or LogFormat.RAW,
            error_count=error_count,
            warning_count=warning_count,
            critical_count=critical_count,
            debug_count=debug_count,
            suggestions=suggestions,
        )

    def analyze_file(
        self, file_path: str, format_enum: Optional[LogFormat] = None
    ) -> AnalysisResult:
        """Analyze a log file.

        Args:
            file_path: Path to the log file.
            format_enum: Optional format to force for every line.

        Returns:
            The AnalysisResult for the file's lines.
        """
        # Pin the encoding instead of relying on the platform locale;
        # replace undecodable bytes so one bad byte can't abort analysis.
        with open(file_path, "r", encoding="utf-8", errors="replace") as f:
            lines = f.readlines()
        return self.analyze(lines, format_enum)

    def _parse_line(
        self, line: str, format_enum: Optional[LogFormat] = None
    ) -> Optional[ParsedEntry]:
        """Parse a single log line.

        Tries the forced format first (if given), then autodetects by
        attempting every known format except RAW, and finally falls back
        to a RAW entry so no line is ever dropped.
        """
        # Imported here to avoid a circular import at module load time
        # (factory depends on parser modules that may depend on this one).
        from loglens.parsers.factory import ParserFactory

        if format_enum:
            parser = ParserFactory.get_parser(format_enum)
            entry = parser.parse(line)
            if entry:
                return entry

        for fmt in LogFormat:
            if fmt == LogFormat.RAW:
                continue
            parser = ParserFactory.get_parser(fmt)
            entry = parser.parse(line)
            if entry:
                return entry

        # Nothing matched: wrap the stripped line as a RAW entry.
        return ParsedEntry(
            raw_line=line.strip(),
            format=LogFormat.RAW,
            timestamp=None,
            level=None,
            message=line.strip(),
            metadata={},
        )

    def _classify_entry(self, entry: ParsedEntry) -> str:
        """Classify severity of an entry.

        Pattern-library matches take precedence (in severity order
        critical > error > warning > debug); otherwise classification is
        delegated to the severity classifier using the message and the
        parsed level.
        """
        content = entry.message
        patterns_by_severity = self.patterns.get_patterns_for_content(content)
        if patterns_by_severity:
            severities = [p.severity for p in patterns_by_severity]
            if "critical" in severities:
                return "critical"
            elif "error" in severities:
                return "error"
            elif "warning" in severities:
                return "warning"
            elif "debug" in severities:
                return "debug"
        return self.severity_classifier.classify(content, entry.level)

    def _generate_suggestions(self, entries: list[ParsedEntry]) -> list[str]:
        """Generate suggestions based on analysis.

        Looks at error/critical entries for repeated messages, stack
        traces, and connection/timeout clusters. Returns a positive note
        when there are no errors at all.
        """
        suggestions = []
        error_entries = [e for e in entries if e.severity in ("error", "critical")]

        if not error_entries:
            return ["No errors detected. Keep up the good work!"]

        error_messages = [e.message for e in error_entries]
        error_counter = Counter(error_messages)
        common_errors = error_counter.most_common(5)

        if len(common_errors) > 3:
            suggestions.append(
                f"Found {len(error_entries)} errors across {len(common_errors)} unique error messages."
            )

        for error_msg, count in common_errors[:3]:
            if count > 1:
                # Only append an ellipsis when the message was actually
                # truncated (previously short messages got a spurious "...").
                display = error_msg if len(error_msg) <= 50 else error_msg[:50] + "..."
                suggestions.append(f"'{display}' occurred {count} times")

        stack_trace_entries = [
            e
            for e in error_entries
            if "Traceback" in e.message or "stack" in e.message.lower()
        ]
        # "Multiple" requires more than one; the old check fired on a single trace.
        if len(stack_trace_entries) > 1:
            suggestions.append(
                "Multiple stack traces detected. Consider checking the exception types and their root causes."
            )

        connection_errors = [
            e
            for e in error_entries
            if "connection" in e.message.lower() or "timeout" in e.message.lower()
        ]
        # Flag only when connection/timeout issues dominate (>30% of errors).
        if len(connection_errors) > len(error_entries) * 0.3:
            suggestions.append(
                "High proportion of connection/timeout errors. Check network connectivity and service availability."
            )

        return suggestions

    def list_patterns_by_group(self) -> dict[str, list[dict]]:
        """List all patterns grouped by category."""
        return self.patterns.get_all_patterns()