from collections import Counter
from typing import Optional

from loglens.analyzers.patterns import PatternLibrary
from loglens.analyzers.severity import SeverityClassifier
from loglens.parsers.base import LogFormat, ParsedEntry


class AnalysisResult:
    """Result of log analysis."""

    def __init__(
        self,
        total_lines: int,
        entries: list[ParsedEntry],
        format_detected: LogFormat,
        error_count: int = 0,
        warning_count: int = 0,
        critical_count: int = 0,
        debug_count: int = 0,
        suggestions: Optional[list[str]] = None,
    ):
        self.total_lines = total_lines
        self.entries = entries
        self.format_detected = format_detected
        self.error_count = error_count
        self.warning_count = warning_count
        self.critical_count = critical_count
        self.debug_count = debug_count
        self.suggestions = suggestions or []

    def to_dict(self) -> dict:
        """Convert to a dictionary."""
        return {
            "total_lines": self.total_lines,
            "entries": [e.to_dict() for e in self.entries],
            "format_detected": self.format_detected.value,
            "error_count": self.error_count,
            "warning_count": self.warning_count,
            "critical_count": self.critical_count,
            "debug_count": self.debug_count,
            "suggestions": self.suggestions,
        }


class LogAnalyzer:
    """Main analyzer for log files."""

    def __init__(self):
        self.patterns = PatternLibrary()
        self.severity_classifier = SeverityClassifier()

    def analyze(
        self, lines: list[str], format_enum: Optional[LogFormat] = None
    ) -> AnalysisResult:
        """Analyze a list of log lines."""
        entries = []
        error_count = 0
        warning_count = 0
        critical_count = 0
        debug_count = 0

        detected_format = format_enum

        for line in lines:
            if not line.strip():
                continue

            entry = self._parse_line(line, format_enum)
            if entry:
                entries.append(entry)
                severity = self._classify_entry(entry)
                entry.severity = severity

                if severity == "critical":
                    critical_count += 1
                elif severity == "error":
                    error_count += 1
                elif severity == "warning":
                    warning_count += 1
                elif severity == "debug":
                    debug_count += 1

                # When no explicit format was requested, adopt the format of
                # the first successfully parsed entry.
                if detected_format is None:
                    detected_format = entry.format

        suggestions = self._generate_suggestions(entries)

        return AnalysisResult(
            total_lines=len(lines),
            entries=entries,
            format_detected=detected_format or LogFormat.RAW,
            error_count=error_count,
            warning_count=warning_count,
            critical_count=critical_count,
            debug_count=debug_count,
            suggestions=suggestions,
        )

    def analyze_file(
        self, file_path: str, format_enum: Optional[LogFormat] = None
    ) -> AnalysisResult:
        """Analyze a log file."""
        with open(file_path, "r") as f:
            lines = f.readlines()
        return self.analyze(lines, format_enum)

    def _parse_line(
        self, line: str, format_enum: Optional[LogFormat] = None
    ) -> Optional[ParsedEntry]:
        """Parse a single log line."""
        # Deferred import so the parser factory is only loaded when parsing.
        from loglens.parsers.factory import ParserFactory

        if format_enum:
            parser = ParserFactory.get_parser(format_enum)
            entry = parser.parse(line)
            if entry:
                return entry

        # No format given, or the requested parser failed: try every known
        # format before falling back to a RAW entry.
        for fmt in LogFormat:
            if fmt == LogFormat.RAW:
                continue
            parser = ParserFactory.get_parser(fmt)
            entry = parser.parse(line)
            if entry:
                return entry

        return ParsedEntry(
            raw_line=line.strip(),
            format=LogFormat.RAW,
            timestamp=None,
            level=None,
            message=line.strip(),
            metadata={},
        )

    def _classify_entry(self, entry: ParsedEntry) -> str:
        """Classify severity of an entry."""
        content = entry.message

        patterns_by_severity = self.patterns.get_patterns_for_content(content)

        # The most severe matching pattern wins; if nothing matches, defer to
        # the generic severity classifier.
        if patterns_by_severity:
            severities = [p.severity for p in patterns_by_severity]
            if "critical" in severities:
                return "critical"
            elif "error" in severities:
                return "error"
            elif "warning" in severities:
                return "warning"
            elif "debug" in severities:
                return "debug"

        return self.severity_classifier.classify(content, entry.level)

    def _generate_suggestions(self, entries: list[ParsedEntry]) -> list[str]:
        """Generate suggestions based on analysis."""
        suggestions = []

        error_entries = [e for e in entries if e.severity in ("error", "critical")]

        if not error_entries:
            return ["No errors detected. Keep up the good work!"]

        error_messages = [e.message for e in error_entries]
        error_counter = Counter(error_messages)

        common_errors = error_counter.most_common(5)

        if len(error_counter) > 3:
            suggestions.append(
                f"Found {len(error_entries)} errors across {len(error_counter)} unique error messages."
            )

        for error_msg, count in common_errors[:3]:
            if count > 1:
                # Truncate long messages; only add an ellipsis when something
                # was actually cut off.
                truncated = error_msg[:50] + ("..." if len(error_msg) > 50 else "")
                suggestions.append(f"'{truncated}' occurred {count} times")

        stack_trace_entries = [
            e
            for e in error_entries
            if "Traceback" in e.message or "stack" in e.message.lower()
        ]
        if stack_trace_entries:
            suggestions.append(
                "Stack traces detected. Consider checking the exception types and their root causes."
            )

        connection_errors = [
            e
            for e in error_entries
            if "connection" in e.message.lower() or "timeout" in e.message.lower()
        ]
        if len(connection_errors) > len(error_entries) * 0.3:
            suggestions.append(
                "High proportion of connection/timeout errors. Check network connectivity and service availability."
            )

        return suggestions

    def list_patterns_by_group(self) -> dict[str, list[dict]]:
        """List all patterns grouped by category."""
        return self.patterns.get_all_patterns()
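

# Minimal usage sketch (illustrative only; not part of the module's public
# surface). Feeds a few sample lines through LogAnalyzer and prints a summary.
# The exact counts depend on how SeverityClassifier and the parsers treat
# these lines; anything unrecognized falls back to RAW via _parse_line above.
if __name__ == "__main__":
    sample_lines = [
        "2024-01-15 10:32:01 ERROR Connection refused to db:5432",
        "2024-01-15 10:32:02 INFO Retrying connection",
        "2024-01-15 10:32:05 ERROR Connection refused to db:5432",
    ]
    result = LogAnalyzer().analyze(sample_lines)
    print(f"Format detected: {result.format_detected.value}")
    print(f"{result.error_count} errors in {result.total_lines} lines")
    for suggestion in result.suggestions:
        print(f"- {suggestion}")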