diff --git a/loglens/analyzers/analyzer.py b/loglens/analyzers/analyzer.py index 1a1368f..d40dac1 100644 --- a/loglens/analyzers/analyzer.py +++ b/loglens/analyzers/analyzer.py @@ -1,20 +1,21 @@ -"""Log analyzer orchestrator.""" +'''Log analyzer orchestrator.''' -from collections import Counter, defaultdict +from collections import Counter from dataclasses import dataclass, field from datetime import datetime -from typing import Any, Dict, List, Optional +from typing import Any, Optional +from loglens.analyzers.patterns import PatternLibrary +from loglens.analyzers.severity import SeverityClassifier from loglens.parsers.base import LogFormat, ParsedLogEntry from loglens.parsers.factory import ParserFactory -from loglens.analyzers.patterns import ErrorPattern, PatternLibrary -from loglens.analyzers.severity import SeverityClassifier, SeverityLevel @dataclass class AnalysisResult: - """Result of log analysis.""" - entries: List[ParsedLogEntry] = field(default_factory=list) + '''Result of log analysis.''' + + entries: list[ParsedLogEntry] = field(default_factory=list) format_detected: LogFormat = LogFormat.UNKNOWN total_lines: int = 0 parsed_count: int = 0 @@ -22,19 +23,19 @@ class AnalysisResult: warning_count: int = 0 critical_count: int = 0 debug_count: int = 0 - pattern_matches: Dict[str, int] = field(default_factory=dict) - severity_breakdown: Dict[str, int] = field(default_factory=dict) - top_errors: List[Dict[str, Any]] = field(default_factory=list) - host_breakdown: Dict[str, int] = field(default_factory=dict) + pattern_matches: dict[str, int] = field(default_factory=dict) + severity_breakdown: dict[str, int] = field(default_factory=dict) + top_errors: list[dict[str, Any]] = field(default_factory=list) + host_breakdown: dict[str, int] = field(default_factory=dict) time_range: Optional[tuple] = None analysis_time: datetime = field(default_factory=datetime.now) - suggestions: List[str] = field(default_factory=list) + suggestions: list[str] = field(default_factory=list) class LogAnalyzer: - """Orchestrates log parsing and analysis.""" + '''Orchestrates log parsing and analysis.''' - def __init__(self, config: Optional[Dict[str, Any]] = None): + def __init__(self, config: Optional[dict[str, Any]] = None): self.parser_factory = ParserFactory() self.pattern_library = PatternLibrary() self.severity_classifier = SeverityClassifier( @@ -42,12 +43,9 @@ class LogAnalyzer: ) self.config = config or {} - def analyze(self, lines: List[str], format: Optional[LogFormat] = None) -> AnalysisResult: - """Analyze a list of log lines.""" - result = AnalysisResult( - total_lines=len(lines), - analysis_time=datetime.now() - ) + def analyze(self, lines: list[str], format: Optional[LogFormat] = None) -> AnalysisResult: + '''Analyze a list of log lines.''' + result = AnalysisResult(total_lines=len(lines), analysis_time=datetime.now()) if not lines: return result @@ -69,7 +67,7 @@ class LogAnalyzer: return result def _analyze_entry(self, entry: ParsedLogEntry) -> None: - """Analyze a single entry.""" + '''Analyze a single entry.''' message = entry.message or "" raw_text = entry.raw_line @@ -79,14 +77,12 @@ class LogAnalyzer: entry.error_pattern = pattern.name severity = self.severity_classifier.classify( - level=entry.level, - message=message, - pattern_match=entry.error_pattern + level=entry.level, message=message, pattern_match=entry.error_pattern ) entry.severity = severity.value def _compute_statistics(self, result: AnalysisResult) -> None: - """Compute statistics from analyzed entries.""" + '''Compute statistics from analyzed entries.''' severity_counts = Counter() pattern_counts = Counter() host_counts = Counter() @@ -118,14 +114,13 @@ class LogAnalyzer: result.time_range = (min(timestamps), max(timestamps)) result.top_errors = [ - {"pattern": name, "count": count} - for name, count in pattern_counts.most_common(10) + {"pattern": name, "count": count} for name, count in pattern_counts.most_common(10) ] result.suggestions = self._generate_suggestions(result) - def _generate_suggestions(self, result: AnalysisResult) -> List[str]: - """Generate suggestions based on analysis.""" + def _generate_suggestions(self, result: AnalysisResult) -> list[str]: + '''Generate suggestions based on analysis.''' suggestions = [] if result.critical_count > 0: @@ -159,20 +154,21 @@ class LogAnalyzer: return suggestions def analyze_file(self, file_path: str, format: Optional[LogFormat] = None) -> AnalysisResult: - """Analyze a log file.""" - with open(file_path, 'r', encoding='utf-8', errors='replace') as f: + '''Analyze a log file.''' + with open(file_path, encoding="utf-8", errors="replace") as f: lines = f.readlines() return self.analyze(lines, format) def analyze_stdin(self) -> AnalysisResult: - """Analyze from stdin.""" + '''Analyze from stdin.''' import sys + lines = sys.stdin.readlines() return self.analyze(lines) - def get_pattern_info(self, pattern_name: str) -> Optional[Dict[str, Any]]: - """Get information about a pattern.""" + def get_pattern_info(self, pattern_name: str) -> Optional[dict[str, Any]]: + '''Get information about a pattern.''' for pattern in self.pattern_library.list_patterns(): if pattern.name == pattern_name: return { @@ -182,20 +178,16 @@ class LogAnalyzer: "description": pattern.description, "suggestion": pattern.suggestion, "group": pattern.group, - "enabled": pattern.enabled + "enabled": pattern.enabled, } return None - def list_patterns_by_group(self) -> Dict[str, List[Dict[str, Any]]]: - """List all patterns organized by group.""" + def list_patterns_by_group(self) -> dict[str, list[dict[str, Any]]]: + '''List all patterns organized by group.''' result = {} for group_name, patterns in self.pattern_library.list_groups().items(): result[group_name] = [ - { - "name": p.name, - "severity": p.severity, - "description": p.description - } + {"name": p.name, "severity": p.severity, "description": p.description} for p in patterns ] return result