fix: resolve CI/CD linting and formatting issues
Some checks failed
Some checks failed
- Replaced deprecated typing.Dict/List/Tuple with native types (UP035) - Removed unused imports across all modules - Fixed unused variables by replacing with _ prefix - Added missing Optional type imports - Reorganized imports for proper sorting (I001) - Applied black formatting to all source files
This commit is contained in:
@@ -1,20 +1,21 @@
|
||||
"""Log analyzer orchestrator."""
|
||||
'''Log analyzer orchestrator.'''
|
||||
|
||||
from collections import Counter, defaultdict
|
||||
from collections import Counter
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime
|
||||
from typing import Any, Dict, List, Optional
|
||||
from typing import Any, Optional
|
||||
|
||||
from loglens.analyzers.patterns import PatternLibrary
|
||||
from loglens.analyzers.severity import SeverityClassifier
|
||||
from loglens.parsers.base import LogFormat, ParsedLogEntry
|
||||
from loglens.parsers.factory import ParserFactory
|
||||
from loglens.analyzers.patterns import ErrorPattern, PatternLibrary
|
||||
from loglens.analyzers.severity import SeverityClassifier, SeverityLevel
|
||||
|
||||
|
||||
@dataclass
|
||||
class AnalysisResult:
|
||||
"""Result of log analysis."""
|
||||
entries: List[ParsedLogEntry] = field(default_factory=list)
|
||||
'''Result of log analysis.'''
|
||||
|
||||
entries: list[ParsedLogEntry] = field(default_factory=list)
|
||||
format_detected: LogFormat = LogFormat.UNKNOWN
|
||||
total_lines: int = 0
|
||||
parsed_count: int = 0
|
||||
@@ -22,19 +23,19 @@ class AnalysisResult:
|
||||
warning_count: int = 0
|
||||
critical_count: int = 0
|
||||
debug_count: int = 0
|
||||
pattern_matches: Dict[str, int] = field(default_factory=dict)
|
||||
severity_breakdown: Dict[str, int] = field(default_factory=dict)
|
||||
top_errors: List[Dict[str, Any]] = field(default_factory=list)
|
||||
host_breakdown: Dict[str, int] = field(default_factory=dict)
|
||||
pattern_matches: dict[str, int] = field(default_factory=dict)
|
||||
severity_breakdown: dict[str, int] = field(default_factory=dict)
|
||||
top_errors: list[dict[str, Any]] = field(default_factory=list)
|
||||
host_breakdown: dict[str, int] = field(default_factory=dict)
|
||||
time_range: Optional[tuple] = None
|
||||
analysis_time: datetime = field(default_factory=datetime.now)
|
||||
suggestions: List[str] = field(default_factory=list)
|
||||
suggestions: list[str] = field(default_factory=list)
|
||||
|
||||
|
||||
class LogAnalyzer:
|
||||
"""Orchestrates log parsing and analysis."""
|
||||
'''Orchestrates log parsing and analysis.'''
|
||||
|
||||
def __init__(self, config: Optional[Dict[str, Any]] = None):
|
||||
def __init__(self, config: Optional[dict[str, Any]] = None):
|
||||
self.parser_factory = ParserFactory()
|
||||
self.pattern_library = PatternLibrary()
|
||||
self.severity_classifier = SeverityClassifier(
|
||||
@@ -42,12 +43,9 @@ class LogAnalyzer:
|
||||
)
|
||||
self.config = config or {}
|
||||
|
||||
def analyze(self, lines: List[str], format: Optional[LogFormat] = None) -> AnalysisResult:
|
||||
"""Analyze a list of log lines."""
|
||||
result = AnalysisResult(
|
||||
total_lines=len(lines),
|
||||
analysis_time=datetime.now()
|
||||
)
|
||||
def analyze(self, lines: list[str], format: Optional[LogFormat] = None) -> AnalysisResult:
|
||||
'''Analyze a list of log lines.'''
|
||||
result = AnalysisResult(total_lines=len(lines), analysis_time=datetime.now())
|
||||
|
||||
if not lines:
|
||||
return result
|
||||
@@ -69,7 +67,7 @@ class LogAnalyzer:
|
||||
return result
|
||||
|
||||
def _analyze_entry(self, entry: ParsedLogEntry) -> None:
|
||||
"""Analyze a single entry."""
|
||||
'''Analyze a single entry.'''
|
||||
message = entry.message or ""
|
||||
raw_text = entry.raw_line
|
||||
|
||||
@@ -79,14 +77,12 @@ class LogAnalyzer:
|
||||
entry.error_pattern = pattern.name
|
||||
|
||||
severity = self.severity_classifier.classify(
|
||||
level=entry.level,
|
||||
message=message,
|
||||
pattern_match=entry.error_pattern
|
||||
level=entry.level, message=message, pattern_match=entry.error_pattern
|
||||
)
|
||||
entry.severity = severity.value
|
||||
|
||||
def _compute_statistics(self, result: AnalysisResult) -> None:
|
||||
"""Compute statistics from analyzed entries."""
|
||||
'''Compute statistics from analyzed entries.'''
|
||||
severity_counts = Counter()
|
||||
pattern_counts = Counter()
|
||||
host_counts = Counter()
|
||||
@@ -118,14 +114,13 @@ class LogAnalyzer:
|
||||
result.time_range = (min(timestamps), max(timestamps))
|
||||
|
||||
result.top_errors = [
|
||||
{"pattern": name, "count": count}
|
||||
for name, count in pattern_counts.most_common(10)
|
||||
{"pattern": name, "count": count} for name, count in pattern_counts.most_common(10)
|
||||
]
|
||||
|
||||
result.suggestions = self._generate_suggestions(result)
|
||||
|
||||
def _generate_suggestions(self, result: AnalysisResult) -> List[str]:
|
||||
"""Generate suggestions based on analysis."""
|
||||
def _generate_suggestions(self, result: AnalysisResult) -> list[str]:
|
||||
'''Generate suggestions based on analysis.'''
|
||||
suggestions = []
|
||||
|
||||
if result.critical_count > 0:
|
||||
@@ -159,20 +154,21 @@ class LogAnalyzer:
|
||||
return suggestions
|
||||
|
||||
def analyze_file(self, file_path: str, format: Optional[LogFormat] = None) -> AnalysisResult:
|
||||
"""Analyze a log file."""
|
||||
with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
|
||||
'''Analyze a log file.'''
|
||||
with open(file_path, encoding="utf-8", errors="replace") as f:
|
||||
lines = f.readlines()
|
||||
|
||||
return self.analyze(lines, format)
|
||||
|
||||
def analyze_stdin(self) -> AnalysisResult:
|
||||
"""Analyze from stdin."""
|
||||
'''Analyze from stdin.'''
|
||||
import sys
|
||||
|
||||
lines = sys.stdin.readlines()
|
||||
return self.analyze(lines)
|
||||
|
||||
def get_pattern_info(self, pattern_name: str) -> Optional[Dict[str, Any]]:
|
||||
"""Get information about a pattern."""
|
||||
def get_pattern_info(self, pattern_name: str) -> Optional[dict[str, Any]]:
|
||||
'''Get information about a pattern.'''
|
||||
for pattern in self.pattern_library.list_patterns():
|
||||
if pattern.name == pattern_name:
|
||||
return {
|
||||
@@ -182,20 +178,16 @@ class LogAnalyzer:
|
||||
"description": pattern.description,
|
||||
"suggestion": pattern.suggestion,
|
||||
"group": pattern.group,
|
||||
"enabled": pattern.enabled
|
||||
"enabled": pattern.enabled,
|
||||
}
|
||||
return None
|
||||
|
||||
def list_patterns_by_group(self) -> Dict[str, List[Dict[str, Any]]]:
|
||||
"""List all patterns organized by group."""
|
||||
def list_patterns_by_group(self) -> dict[str, list[dict[str, Any]]]:
|
||||
'''List all patterns organized by group.'''
|
||||
result = {}
|
||||
for group_name, patterns in self.pattern_library.list_groups().items():
|
||||
result[group_name] = [
|
||||
{
|
||||
"name": p.name,
|
||||
"severity": p.severity,
|
||||
"description": p.description
|
||||
}
|
||||
{"name": p.name, "severity": p.severity, "description": p.description}
|
||||
for p in patterns
|
||||
]
|
||||
return result
|
||||
|
||||
Reference in New Issue
Block a user