diff --git a/loglens/analyzers/severity.py b/loglens/analyzers/severity.py index fdfd609..0f65f2c 100644 --- a/loglens/analyzers/severity.py +++ b/loglens/analyzers/severity.py @@ -1,217 +1,64 @@ -'''Severity classification for log entries.''' - -from dataclasses import dataclass -from enum import Enum -from typing import Any, Optional - - -class SeverityLevel(Enum): - '''Severity levels for log entries.''' - - CRITICAL = "critical" - ERROR = "error" - WARNING = "warning" - INFO = "info" - DEBUG = "debug" - UNKNOWN = "unknown" - - @classmethod - def from_string(cls, level: str) -> "SeverityLevel": - '''Convert string to SeverityLevel.''' - level_lower = level.lower() - for member in cls: - if member.value == level_lower: - return member - return cls.UNKNOWN - - def score(self) -> int: - '''Get numeric score for severity.''' - scores = { - SeverityLevel.CRITICAL: 5, - SeverityLevel.ERROR: 4, - SeverityLevel.WARNING: 3, - SeverityLevel.INFO: 2, - SeverityLevel.DEBUG: 1, - SeverityLevel.UNKNOWN: 0, - } - return scores.get(self, 0) - - def __lt__(self, other: "SeverityLevel") -> bool: - '''Compare severity levels.''' - return self.score() < other.score() - - def __le__(self, other: "SeverityLevel") -> bool: - return self.score() <= other.score() - - def __gt__(self, other: "SeverityLevel") -> bool: - return self.score() > other.score() - - def __ge__(self, other: "SeverityLevel") -> bool: - return self.score() >= other.score() - - -@dataclass -class SeverityRule: - '''Rule for severity classification.''' - - name: str - patterns: list[str] - severity: SeverityLevel - weight: int = 1 - description: str = "" +from typing import Optional class SeverityClassifier: - '''Classifies log entries by severity.''' - - DEFAULT_RULES = [ - SeverityRule( - name="critical_keywords", - patterns=["fatal", "segfault", "panic", "core dumped", "critical system failure"], - severity=SeverityLevel.CRITICAL, - weight=10, - description="Critical system failures", - ), - SeverityRule( - name="error_keywords", - patterns=["error", "exception", "failed", "failure", "timeout", "cannot", "unable"], - severity=SeverityLevel.ERROR, - weight=5, - description="General errors", - ), - SeverityRule( - name="warning_keywords", - patterns=["warning", "warn", "deprecated", "deprecation"], - severity=SeverityLevel.WARNING, - weight=3, - description="Warnings and deprecations", - ), - SeverityRule( - name="info_keywords", - patterns=["info", "notice", "started", "stopped", "loaded"], - severity=SeverityLevel.INFO, - weight=1, - description="Informational messages", - ), - SeverityRule( - name="debug_keywords", - patterns=["debug", "trace", "verbose"], - severity=SeverityLevel.DEBUG, - weight=0, - description="Debug and trace messages", - ), - ] - - def __init__(self, custom_rules: Optional[list[dict[str, Any]]] = None): - self.rules: list[SeverityRule] = self.DEFAULT_RULES.copy() - if custom_rules: - self._load_custom_rules(custom_rules) - - def _load_custom_rules(self, rules: list[dict[str, Any]]) -> None: - '''Load custom severity rules.''' - for rule_data in rules: - rule = SeverityRule( - name=rule_data.get("name", "custom"), - patterns=rule_data.get("patterns", []), - severity=SeverityLevel.from_string(rule_data.get("severity", "info")), - weight=rule_data.get("weight", 1), - description=rule_data.get("description", ""), - ) - self.rules.append(rule) - - def classify( - self, level: Optional[str], message: str = "", pattern_match: Optional[str] = None - ) -> SeverityLevel: - '''Classify severity based on level, message, and pattern.''' - score = 0 - matched_severity = SeverityLevel.UNKNOWN + """Classify log entry severity based on content and level.""" + def classify(self, message: str, level: Optional[str]) -> str: + """Classify the severity of a log entry.""" if level: level_lower = level.lower() - if level_lower in ["trace", "verbose"]: - return SeverityLevel.DEBUG - if level_lower in ["critical", "fatal", "emerg", "emergency"]: - return SeverityLevel.CRITICAL - if level_lower in ["err", "er", "e"]: - return SeverityLevel.ERROR - if level_lower in ["warn", "w"]: - return SeverityLevel.WARNING - if level_lower in ["notice"]: - return SeverityLevel.INFO + if level_lower in ("critical", "fatal", "emerg"): + return "critical" + elif level_lower in ("error", "err", "erro"): + return "error" + elif level_lower in ("warning", "warn"): + return "warning" + elif level_lower in ("info", "informational"): + return "info" + elif level_lower in ("debug", "dbg", "trace"): + return "debug" - inferred = SeverityLevel.from_string(level_lower) - if inferred != SeverityLevel.UNKNOWN: - return inferred + message_lower = message.lower() - text = f"{message} {pattern_match or ''}".lower() + critical_keywords = [ + "fatal", + "panic", + "crash", + "died", + "terminated", + "segfault", + "out of memory", + "no space left", + ] + error_keywords = [ + "error", + "fail", + "exception", + "invalid", + "cannot", + "unable", + "timeout", + "refused", + "rejected", + ] + warning_keywords = ["warning", "warn", "deprecated", "slow", "high latency"] + debug_keywords = ["debug", "trace", "verbose", "dump", "print"] - for rule in self.rules: - for pattern in rule.patterns: - if pattern.lower() in text: - if rule.weight > score: - score = rule.weight - matched_severity = rule.severity + for keyword in critical_keywords: + if keyword in message_lower: + return "critical" - if matched_severity != SeverityLevel.UNKNOWN: - return matched_severity + for keyword in error_keywords: + if keyword in message_lower: + return "error" - if not text.strip(): - return SeverityLevel.UNKNOWN + for keyword in warning_keywords: + if keyword in message_lower: + return "warning" - if pattern_match: - return SeverityLevel.ERROR + for keyword in debug_keywords: + if keyword in message_lower: + return "debug" - return SeverityLevel.INFO - - def classify_with_details( - self, level: Optional[str], message: str = "", pattern_match: Optional[str] = None - ) -> dict[str, Any]: - '''Classify severity with detailed information.''' - severity = self.classify(level, message, pattern_match) - - text = f"{message} {pattern_match or ''}".lower() - matched_rules = [] - - for rule in self.rules: - for pattern in rule.patterns: - if pattern.lower() in text: - matched_rules.append( - { - "rule": rule.name, - "pattern": pattern, - "severity": rule.severity.value, - "weight": rule.weight, - } - ) - - return { - "severity": severity, - "matched_rules": matched_rules, - "confidence": min(1.0, len(matched_rules) * 0.3) if matched_rules else 0.5, - } - - def get_severity_order(self) -> list[SeverityLevel]: - '''Get severity levels in order from highest to lowest.''' - return sorted( - [ - SeverityLevel.CRITICAL, - SeverityLevel.ERROR, - SeverityLevel.WARNING, - SeverityLevel.INFO, - SeverityLevel.DEBUG, - SeverityLevel.UNKNOWN, - ], - reverse=True, - ) - - def add_rule(self, rule: SeverityRule) -> None: - '''Add a custom rule.''' - self.rules.append(rule) - - def remove_rule(self, name: str) -> bool: - '''Remove a rule by name.''' - for i, rule in enumerate(self.rules): - if rule.name == name: - self.rules.pop(i) - return True - return False + return "info"