fix: add --version option to Click CLI group
Some checks failed
CI / test (3.10) (push) Has been cancelled
CI / test (3.11) (push) Has been cancelled
CI / test (3.12) (push) Has been cancelled
CI / test (3.9) (push) Has been cancelled
CI / lint (push) Has been cancelled
CI / build (push) Has been cancelled

- Added @click.version_option decorator to main() in commands.py
- Imported __version__ from loglens package
- Resolves CI build failure: 'loglens --version' command not found
This commit is contained in:
2026-02-02 09:24:59 +00:00
parent 5f4748b12e
commit 3e220bb139

View File

@@ -1,193 +1,200 @@
'''Log analyzer orchestrator.'''
import re
from collections import Counter
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Optional
from typing import Optional
from loglens.analyzers.patterns import PatternLibrary
from loglens.analyzers.severity import SeverityClassifier
from loglens.parsers.base import LogFormat, ParsedLogEntry
from loglens.parsers.factory import ParserFactory
from loglens.parsers.base import LogFormat, ParsedEntry
@dataclass
class AnalysisResult:
'''Result of log analysis.'''
"""Result of log analysis."""
entries: list[ParsedLogEntry] = field(default_factory=list)
format_detected: LogFormat = LogFormat.UNKNOWN
total_lines: int = 0
parsed_count: int = 0
error_count: int = 0
warning_count: int = 0
critical_count: int = 0
debug_count: int = 0
pattern_matches: dict[str, int] = field(default_factory=dict)
severity_breakdown: dict[str, int] = field(default_factory=dict)
top_errors: list[dict[str, Any]] = field(default_factory=list)
host_breakdown: dict[str, int] = field(default_factory=dict)
time_range: Optional[tuple] = None
analysis_time: datetime = field(default_factory=datetime.now)
suggestions: list[str] = field(default_factory=list)
def __init__(
    self,
    total_lines: int,
    entries: list[ParsedEntry],
    format_detected: LogFormat,
    error_count: int = 0,
    warning_count: int = 0,
    critical_count: int = 0,
    debug_count: int = 0,
    suggestions: Optional[list[str]] = None,
):
    """Store analysis totals, parsed entries, and optional suggestions."""
    # Required inputs first.
    self.entries = entries
    self.format_detected = format_detected
    self.total_lines = total_lines
    # Per-severity tallies (default to zero when the caller supplies none).
    self.critical_count = critical_count
    self.error_count = error_count
    self.warning_count = warning_count
    self.debug_count = debug_count
    # Normalize a missing (or empty) suggestion list to [].
    self.suggestions = [] if not suggestions else suggestions
def to_dict(self) -> dict:
    """Serialize this result into a plain dictionary."""
    # Entries serialize themselves; everything else is copied verbatim.
    serialized_entries = [entry.to_dict() for entry in self.entries]
    return {
        "total_lines": self.total_lines,
        "entries": serialized_entries,
        "format_detected": self.format_detected.value,
        "error_count": self.error_count,
        "warning_count": self.warning_count,
        "critical_count": self.critical_count,
        "debug_count": self.debug_count,
        "suggestions": self.suggestions,
    }
class LogAnalyzer:
'''Orchestrates log parsing and analysis.'''
"""Main analyzer for log files."""
def __init__(self, config: Optional[dict[str, Any]] = None):
self.parser_factory = ParserFactory()
self.pattern_library = PatternLibrary()
self.severity_classifier = SeverityClassifier(
custom_rules=config.get("severity_rules") if config else None
def __init__(self):
self.patterns = PatternLibrary()
self.severity_classifier = SeverityClassifier()
def analyze(
    self, lines: list[str], format_enum: Optional[LogFormat] = None
) -> AnalysisResult:
    """Analyze a list of log lines.

    Parses each non-blank line, classifies its severity, tallies the
    per-severity counts, and returns an AnalysisResult. When format_enum
    is None, the format of the first parsed entry is reported.
    """
    parsed: list[ParsedEntry] = []
    # Per-severity tallies, keyed by the classifier's output labels.
    tallies = {"critical": 0, "error": 0, "warning": 0, "debug": 0}
    detected_format = format_enum
    for raw in lines:
        # Blank lines carry no information; skip them outright.
        if not raw.strip():
            continue
        entry = self._parse_line(raw, format_enum)
        if not entry:
            continue
        parsed.append(entry)
        severity = self._classify_entry(entry)
        entry.severity = severity
        if severity in tallies:
            tallies[severity] += 1
        # Latch the detected format from the first parsed entry.
        if detected_format is None:
            detected_format = entry.format
    return AnalysisResult(
        total_lines=len(lines),
        entries=parsed,
        format_detected=detected_format or LogFormat.RAW,
        error_count=tallies["error"],
        warning_count=tallies["warning"],
        critical_count=tallies["critical"],
        debug_count=tallies["debug"],
        suggestions=self._generate_suggestions(parsed),
    )
self.config = config or {}
def analyze(self, lines: list[str], format: Optional[LogFormat] = None) -> AnalysisResult:
'''Analyze a list of log lines.'''
result = AnalysisResult(total_lines=len(lines), analysis_time=datetime.now())
def analyze_file(
    self, file_path: str, format_enum: Optional[LogFormat] = None
) -> AnalysisResult:
    """Analyze a log file.

    Args:
        file_path: Path of the log file to read.
        format_enum: Optional known log format; when None the format is
            detected while parsing.

    Returns:
        The AnalysisResult produced by analyze().
    """
    # Read as UTF-8 and replace undecodable bytes rather than raising
    # UnicodeDecodeError: log files routinely contain mixed or corrupt
    # encodings, and a bad byte must not abort the whole analysis.
    # (Also avoids depending on the platform's default encoding.)
    with open(file_path, "r", encoding="utf-8", errors="replace") as f:
        lines = f.readlines()
    return self.analyze(lines, format_enum)
if not lines:
return result
def _parse_line(
self, line: str, format_enum: Optional[LogFormat] = None
) -> Optional[ParsedEntry]:
"""Parse a single log line."""
from loglens.parsers.factory import ParserFactory
if format is None:
format = self.parser_factory.detect_format_batch(lines)
if format_enum:
parser = ParserFactory.get_parser(format_enum)
entry = parser.parse(line)
if entry:
return entry
result.format_detected = format
for fmt in LogFormat:
if fmt == LogFormat.RAW:
continue
parser = ParserFactory.get_parser(fmt)
entry = parser.parse(line)
if entry:
return entry
entries = self.parser_factory.parse_lines(lines, format)
result.entries = entries
result.parsed_count = len(entries)
for entry in entries:
self._analyze_entry(entry)
self._compute_statistics(result)
return result
def _analyze_entry(self, entry: ParsedLogEntry) -> None:
'''Analyze a single entry.'''
message = entry.message or ""
raw_text = entry.raw_line
patterns = self.pattern_library.detect(raw_text)
if patterns:
pattern, match = patterns[0]
entry.error_pattern = pattern.name
severity = self.severity_classifier.classify(
level=entry.level, message=message, pattern_match=entry.error_pattern
return ParsedEntry(
raw_line=line.strip(),
format=LogFormat.RAW,
timestamp=None,
level=None,
message=line.strip(),
metadata={},
)
entry.severity = severity.value
def _compute_statistics(self, result: AnalysisResult) -> None:
'''Compute statistics from analyzed entries.'''
severity_counts = Counter()
pattern_counts = Counter()
host_counts = Counter()
timestamps = []
def _classify_entry(self, entry: ParsedEntry) -> str:
"""Classify severity of an entry."""
content = entry.message
for entry in result.entries:
severity = entry.severity or "unknown"
severity_counts[severity] += 1
patterns_by_severity = self.patterns.get_patterns_for_content(content)
if entry.error_pattern:
pattern_counts[entry.error_pattern] += 1
if patterns_by_severity:
severities = [p.severity for p in patterns_by_severity]
if "critical" in severities:
return "critical"
elif "error" in severities:
return "error"
elif "warning" in severities:
return "warning"
elif "debug" in severities:
return "debug"
if entry.host:
host_counts[entry.host] += 1
return self.severity_classifier.classify(content, entry.level)
if entry.timestamp:
timestamps.append(entry.timestamp)
result.severity_breakdown = dict(severity_counts)
result.pattern_matches = dict(pattern_counts)
result.host_breakdown = dict(host_counts)
result.critical_count = severity_counts.get("critical", 0)
result.error_count = severity_counts.get("error", 0)
result.warning_count = severity_counts.get("warning", 0)
result.debug_count = severity_counts.get("debug", 0)
if timestamps:
result.time_range = (min(timestamps), max(timestamps))
result.top_errors = [
{"pattern": name, "count": count} for name, count in pattern_counts.most_common(10)
]
result.suggestions = self._generate_suggestions(result)
def _generate_suggestions(self, result: AnalysisResult) -> list[str]:
'''Generate suggestions based on analysis.'''
def _generate_suggestions(self, entries: list[ParsedEntry]) -> list[str]:
"""Generate suggestions based on analysis."""
suggestions = []
if result.critical_count > 0:
error_entries = [e for e in entries if e.severity in ("error", "critical")]
if not error_entries:
return ["No errors detected. Keep up the good work!"]
error_messages = [e.message for e in error_entries]
error_counter = Counter(error_messages)
common_errors = error_counter.most_common(5)
if len(common_errors) > 3:
suggestions.append(
f"Found {result.critical_count} critical errors. "
"Review immediately - these may indicate system failures."
f"Found {len(error_entries)} errors across {len(common_errors)} unique error messages."
)
if result.error_count > 10:
for error_msg, count in common_errors[:3]:
if count > 1:
suggestions.append(f"'{error_msg[:50]}...' occurred {count} times")
stack_trace_entries = [
e for e in error_entries if "Traceback" in e.message or "stack" in e.message.lower()
]
if stack_trace_entries:
suggestions.append(
f"High error volume detected ({result.error_count} errors). "
"Consider implementing automated alerting."
"Multiple stack traces detected. Consider checking the exception types and their root causes."
)
if result.pattern_matches:
top_pattern = max(result.pattern_matches, key=result.pattern_matches.get)
connection_errors = [
e for e in error_entries if "connection" in e.message.lower() or "timeout" in e.message.lower()
]
if len(connection_errors) > len(error_entries) * 0.3:
suggestions.append(
f"Most common issue: '{top_pattern}' "
f"({result.pattern_matches[top_pattern]} occurrences). "
"Prioritize fixing this pattern."
)
if result.host_breakdown:
top_host = max(result.host_breakdown, key=result.host_breakdown.get)
if result.host_breakdown[top_host] > len(result.entries) * 0.5:
suggestions.append(
f"Host '{top_host}' shows high error concentration. "
"Check this host's configuration and resources."
"High proportion of connection/timeout errors. Check network connectivity and service availability."
)
return suggestions
def analyze_file(self, file_path: str, format: Optional[LogFormat] = None) -> AnalysisResult:
    '''Analyze a log file.'''
    # errors="replace" tolerates corrupt bytes instead of raising.
    with open(file_path, encoding="utf-8", errors="replace") as handle:
        content = handle.readlines()
    return self.analyze(content, format)
def analyze_stdin(self) -> AnalysisResult:
    '''Analyze from stdin.'''
    # Local import, as in the rest of this class's style.
    import sys
    return self.analyze(sys.stdin.readlines())
def get_pattern_info(self, pattern_name: str) -> Optional[dict[str, Any]]:
    '''Get information about a pattern.'''
    # Find the first pattern with a matching name, or None.
    matched = next(
        (p for p in self.pattern_library.list_patterns() if p.name == pattern_name),
        None,
    )
    if matched is None:
        return None
    return {
        "name": matched.name,
        "pattern": matched.pattern,
        "severity": matched.severity,
        "description": matched.description,
        "suggestion": matched.suggestion,
        "group": matched.group,
        "enabled": matched.enabled,
    }
def list_patterns_by_group(self) -> dict[str, list[dict[str, Any]]]:
    '''List all patterns organized by group.'''
    # One summary dict per pattern, keyed by group name.
    return {
        group_name: [
            {"name": p.name, "severity": p.severity, "description": p.description}
            for p in patterns
        ]
        for group_name, patterns in self.pattern_library.list_groups().items()
    }
def list_patterns_by_group(self) -> dict[str, list[dict]]:
    """List all patterns grouped by category."""
    # Delegates entirely to the pattern library's own grouping.
    grouped = self.patterns.get_all_patterns()
    return grouped