loglens-cli/loglens/analyzers/analyzer.py

fix: add --version option to Click CLI group
- Added @click.version_option decorator to main() in commands.py
- Imported __version__ from loglens package
- Resolves CI build failure: 'loglens --version' command not found
2026-02-02 09:24:59 +00:00
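
The fix follows the standard Click pattern. A minimal sketch, assuming the group entry point is main() in commands.py as the commit message states:

    import click

    from loglens import __version__


    @click.group()
    @click.version_option(version=__version__, prog_name="loglens")
    def main():
        """LogLens command-line interface."""

With this decorator in place, 'loglens --version' prints the program name and version and exits.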

from collections import Counter
from typing import Optional

from loglens.analyzers.patterns import PatternLibrary
from loglens.analyzers.severity import SeverityClassifier
from loglens.parsers.base import LogFormat, ParsedEntry


class AnalysisResult:
    """Result of log analysis."""

    def __init__(
        self,
        total_lines: int,
        entries: list[ParsedEntry],
        format_detected: LogFormat,
        error_count: int = 0,
        warning_count: int = 0,
        critical_count: int = 0,
        debug_count: int = 0,
        suggestions: Optional[list[str]] = None,
    ):
        self.total_lines = total_lines
        self.entries = entries
        self.format_detected = format_detected
        self.error_count = error_count
        self.warning_count = warning_count
        self.critical_count = critical_count
        self.debug_count = debug_count
        self.suggestions = suggestions or []

    def to_dict(self) -> dict:
        """Convert to dictionary."""
        return {
            "total_lines": self.total_lines,
            "entries": [e.to_dict() for e in self.entries],
            "format_detected": self.format_detected.value,
            "error_count": self.error_count,
            "warning_count": self.warning_count,
            "critical_count": self.critical_count,
            "debug_count": self.debug_count,
            "suggestions": self.suggestions,
        }


class LogAnalyzer:
    """Main analyzer for log files."""

    def __init__(self):
        self.patterns = PatternLibrary()
        self.severity_classifier = SeverityClassifier()

    def analyze(
        self, lines: list[str], format_enum: Optional[LogFormat] = None
    ) -> AnalysisResult:
        """Analyze a list of log lines."""
        entries = []
        error_count = 0
        warning_count = 0
        critical_count = 0
        debug_count = 0
        detected_format = format_enum
        for line in lines:
            if not line.strip():
                continue
            entry = self._parse_line(line, format_enum)
            if entry:
                entries.append(entry)
                severity = self._classify_entry(entry)
                entry.severity = severity
                if severity == "critical":
                    critical_count += 1
                elif severity == "error":
                    error_count += 1
                elif severity == "warning":
                    warning_count += 1
                elif severity == "debug":
                    debug_count += 1
                # Lock onto the first format that parses successfully.
                if detected_format is None:
                    detected_format = entry.format
        suggestions = self._generate_suggestions(entries)
        return AnalysisResult(
            total_lines=len(lines),
            entries=entries,
            format_detected=detected_format or LogFormat.RAW,
            error_count=error_count,
            warning_count=warning_count,
            critical_count=critical_count,
            debug_count=debug_count,
            suggestions=suggestions,
        )

    def analyze_file(
        self, file_path: str, format_enum: Optional[LogFormat] = None
    ) -> AnalysisResult:
        """Analyze a log file."""
        with open(file_path, "r") as f:
            lines = f.readlines()
        return self.analyze(lines, format_enum)
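
    # Example usage (illustrative):
    #
    #     analyzer = LogAnalyzer()
    #     result = analyzer.analyze_file("app.log")
    #     print(result.error_count, "errors,", result.warning_count, "warnings")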

    def _parse_line(
        self, line: str, format_enum: Optional[LogFormat] = None
    ) -> Optional[ParsedEntry]:
        """Parse a single log line."""
        from loglens.parsers.factory import ParserFactory

        # Try the caller-specified format first, if any.
        if format_enum:
            parser = ParserFactory.get_parser(format_enum)
            entry = parser.parse(line)
            if entry:
                return entry
        # Otherwise (or if that failed), try every known format except RAW.
        for fmt in LogFormat:
            if fmt == LogFormat.RAW:
                continue
            parser = ParserFactory.get_parser(fmt)
            entry = parser.parse(line)
            if entry:
                return entry
        # Fall back to an unstructured RAW entry so no line is dropped.
        return ParsedEntry(
            raw_line=line.strip(),
            format=LogFormat.RAW,
            timestamp=None,
            level=None,
            message=line.strip(),
            metadata={},
        )

    def _classify_entry(self, entry: ParsedEntry) -> str:
        """Classify severity of an entry."""
        content = entry.message
        patterns_by_severity = self.patterns.get_patterns_for_content(content)
        if patterns_by_severity:
            # When several patterns match, the most severe label wins.
            severities = [p.severity for p in patterns_by_severity]
            if "critical" in severities:
                return "critical"
            elif "error" in severities:
                return "error"
            elif "warning" in severities:
                return "warning"
            elif "debug" in severities:
                return "debug"
        return self.severity_classifier.classify(content, entry.level)
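
    # Example (illustrative): an entry whose message matches both a "warning"
    # and a "critical" pattern is classified "critical"; the level-based
    # fallback classifier runs only when no pattern match decides the severity.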

    def _generate_suggestions(self, entries: list[ParsedEntry]) -> list[str]:
        """Generate suggestions based on analysis."""
        suggestions = []
        error_entries = [e for e in entries if e.severity in ("error", "critical")]
        if not error_entries:
            return ["No errors detected. Keep up the good work!"]
        error_messages = [e.message for e in error_entries]
        error_counter = Counter(error_messages)
        common_errors = error_counter.most_common(5)
        if len(error_counter) > 3:
            suggestions.append(
                f"Found {len(error_entries)} errors across {len(error_counter)} unique error messages."
            )
        for error_msg, count in common_errors[:3]:
            if count > 1:
                # Only append an ellipsis when the message is actually truncated.
                shown = error_msg[:50] + "..." if len(error_msg) > 50 else error_msg
                suggestions.append(f"'{shown}' occurred {count} times")
        stack_trace_entries = [
            e for e in error_entries if "Traceback" in e.message or "stack" in e.message.lower()
        ]
        if stack_trace_entries:
            suggestions.append(
                "Stack traces detected. Consider checking the exception types and their root causes."
            )
        connection_errors = [
            e for e in error_entries
            if "connection" in e.message.lower() or "timeout" in e.message.lower()
        ]
        # Flag when more than 30% of the errors look network-related.
        if len(connection_errors) > len(error_entries) * 0.3:
            suggestions.append(
                "High proportion of connection/timeout errors. Check network connectivity and service availability."
            )
        return suggestions

    def list_patterns_by_group(self) -> dict[str, list[dict]]:
        """List all patterns grouped by category."""
        return self.patterns.get_all_patterns()
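

# End-to-end sketch (illustrative; the input lines are made up):
#
#     analyzer = LogAnalyzer()
#     result = analyzer.analyze([
#         "2026-02-02 09:24:59 ERROR db: connection timeout",
#         "2026-02-02 09:25:01 INFO request served",
#     ])
#     for suggestion in result.suggestions:
#         print(suggestion)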