From c41232b3f64d6267654c37ab3c4a04dcb7739342 Mon Sep 17 00:00:00 2001 From: 7000pctAUTO Date: Mon, 2 Feb 2026 10:08:36 +0000 Subject: [PATCH] Add analyzers (patterns, severity, analyzer) --- loglens/analyzers/severity.py | 255 +++++++++++++++++++++++++++------- 1 file changed, 204 insertions(+), 51 deletions(-) diff --git a/loglens/analyzers/severity.py b/loglens/analyzers/severity.py index 0f65f2c..10df83f 100644 --- a/loglens/analyzers/severity.py +++ b/loglens/analyzers/severity.py @@ -1,64 +1,217 @@ -from typing import Optional +"""Severity classification for log entries.""" + +from dataclasses import dataclass +from enum import Enum +from typing import Any, Optional + + +class SeverityLevel(Enum): + """Severity levels for log entries.""" + + CRITICAL = "critical" + ERROR = "error" + WARNING = "warning" + INFO = "info" + DEBUG = "debug" + UNKNOWN = "unknown" + + @classmethod + def from_string(cls, level: str) -> "SeverityLevel": + """Convert string to SeverityLevel.""" + level_lower = level.lower() + for member in cls: + if member.value == level_lower: + return member + return cls.UNKNOWN + + def score(self) -> int: + """Get numeric score for severity.""" + scores = { + SeverityLevel.CRITICAL: 5, + SeverityLevel.ERROR: 4, + SeverityLevel.WARNING: 3, + SeverityLevel.INFO: 2, + SeverityLevel.DEBUG: 1, + SeverityLevel.UNKNOWN: 0, + } + return scores.get(self, 0) + + def __lt__(self, other: "SeverityLevel") -> bool: + """Compare severity levels.""" + return self.score() < other.score() + + def __le__(self, other: "SeverityLevel") -> bool: + return self.score() <= other.score() + + def __gt__(self, other: "SeverityLevel") -> bool: + return self.score() > other.score() + + def __ge__(self, other: "SeverityLevel") -> bool: + return self.score() >= other.score() + + +@dataclass +class SeverityRule: + """Rule for severity classification.""" + + name: str + patterns: list[str] + severity: SeverityLevel + weight: int = 1 + description: str = "" class SeverityClassifier: - """Classify log entry severity based on content and level.""" + """Classifies log entries by severity.""" + + DEFAULT_RULES = [ + SeverityRule( + name="critical_keywords", + patterns=["fatal", "segfault", "panic", "core dumped", "critical system failure"], + severity=SeverityLevel.CRITICAL, + weight=10, + description="Critical system failures", + ), + SeverityRule( + name="error_keywords", + patterns=["error", "exception", "failed", "failure", "timeout", "cannot", "unable"], + severity=SeverityLevel.ERROR, + weight=5, + description="General errors", + ), + SeverityRule( + name="warning_keywords", + patterns=["warning", "warn", "deprecated", "deprecation"], + severity=SeverityLevel.WARNING, + weight=3, + description="Warnings and deprecations", + ), + SeverityRule( + name="info_keywords", + patterns=["info", "notice", "started", "stopped", "loaded"], + severity=SeverityLevel.INFO, + weight=1, + description="Informational messages", + ), + SeverityRule( + name="debug_keywords", + patterns=["debug", "trace", "verbose"], + severity=SeverityLevel.DEBUG, + weight=0, + description="Debug and trace messages", + ), + ] + + def __init__(self, custom_rules: Optional[list[dict[str, Any]]] = None): + self.rules: list[SeverityRule] = self.DEFAULT_RULES.copy() + if custom_rules: + self._load_custom_rules(custom_rules) + + def _load_custom_rules(self, rules: list[dict[str, Any]]) -> None: + """Load custom severity rules.""" + for rule_data in rules: + rule = SeverityRule( + name=rule_data.get("name", "custom"), + patterns=rule_data.get("patterns", []), + severity=SeverityLevel.from_string(rule_data.get("severity", "info")), + weight=rule_data.get("weight", 1), + description=rule_data.get("description", ""), + ) + self.rules.append(rule) + + def classify( + self, level: Optional[str], message: str = "", pattern_match: Optional[str] = None + ) -> SeverityLevel: + """Classify severity based on level, message, and pattern.""" + score = 0 + matched_severity = SeverityLevel.UNKNOWN - def classify(self, message: str, level: Optional[str]) -> str: - """Classify the severity of a log entry.""" if level: level_lower = level.lower() - if level_lower in ("critical", "fatal", "emerg"): - return "critical" - elif level_lower in ("error", "err", "erro"): - return "error" - elif level_lower in ("warning", "warn"): - return "warning" - elif level_lower in ("info", "informational"): - return "info" - elif level_lower in ("debug", "dbg", "trace"): - return "debug" + if level_lower in ["trace", "verbose"]: + return SeverityLevel.DEBUG + if level_lower in ["critical", "fatal", "emerg", "emergency"]: + return SeverityLevel.CRITICAL + if level_lower in ["err", "er", "e"]: + return SeverityLevel.ERROR + if level_lower in ["warn", "w"]: + return SeverityLevel.WARNING + if level_lower in ["notice"]: + return SeverityLevel.INFO - message_lower = message.lower() + inferred = SeverityLevel.from_string(level_lower) + if inferred != SeverityLevel.UNKNOWN: + return inferred - critical_keywords = [ - "fatal", - "panic", - "crash", - "died", - "terminated", - "segfault", - "out of memory", - "no space left", - ] - error_keywords = [ - "error", - "fail", - "exception", - "invalid", - "cannot", - "unable", - "timeout", - "refused", - "rejected", - ] - warning_keywords = ["warning", "warn", "deprecated", "slow", "high latency"] - debug_keywords = ["debug", "trace", "verbose", "dump", "print"] + text = f"{message} {pattern_match or ''}".lower() - for keyword in critical_keywords: - if keyword in message_lower: - return "critical" + for rule in self.rules: + for pattern in rule.patterns: + if pattern.lower() in text: + if rule.weight > score: + score = rule.weight + matched_severity = rule.severity - for keyword in error_keywords: - if keyword in message_lower: - return "error" + if matched_severity != SeverityLevel.UNKNOWN: + return matched_severity - for keyword in warning_keywords: - if keyword in message_lower: - return "warning" + if not text.strip(): + return SeverityLevel.UNKNOWN - for keyword in debug_keywords: - if keyword in message_lower: - return "debug" + if pattern_match: + return SeverityLevel.ERROR - return "info" + return SeverityLevel.INFO + + def classify_with_details( + self, level: Optional[str], message: str = "", pattern_match: Optional[str] = None + ) -> dict[str, Any]: + """Classify severity with detailed information.""" + severity = self.classify(level, message, pattern_match) + + text = f"{message} {pattern_match or ''}".lower() + matched_rules = [] + + for rule in self.rules: + for pattern in rule.patterns: + if pattern.lower() in text: + matched_rules.append( + { + "rule": rule.name, + "pattern": pattern, + "severity": rule.severity.value, + "weight": rule.weight, + } + ) + + return { + "severity": severity, + "matched_rules": matched_rules, + "confidence": min(1.0, len(matched_rules) * 0.3) if matched_rules else 0.5, + } + + def get_severity_order(self) -> list[SeverityLevel]: + """Get severity levels in order from highest to lowest.""" + return sorted( + [ + SeverityLevel.CRITICAL, + SeverityLevel.ERROR, + SeverityLevel.WARNING, + SeverityLevel.INFO, + SeverityLevel.DEBUG, + SeverityLevel.UNKNOWN, + ], + reverse=True, + ) + + def add_rule(self, rule: SeverityRule) -> None: + """Add a custom rule.""" + self.rules.append(rule) + + def remove_rule(self, name: str) -> bool: + """Remove a rule by name.""" + for i, rule in enumerate(self.rules): + if rule.name == name: + self.rules.pop(i) + return True + return False