From 7e55e0edd249841f9099e4d8eadafd553c878cc1 Mon Sep 17 00:00:00 2001 From: 7000pctAUTO Date: Mon, 2 Feb 2026 08:05:04 +0000 Subject: [PATCH] Add analyzers: patterns, severity, and main analyzer --- loglens/analyzers/severity.py | 203 ++++++++++++++++++++++++++++++++++ 1 file changed, 203 insertions(+) create mode 100644 loglens/analyzers/severity.py diff --git a/loglens/analyzers/severity.py b/loglens/analyzers/severity.py new file mode 100644 index 0000000..4dda3a7 --- /dev/null +++ b/loglens/analyzers/severity.py @@ -0,0 +1,203 @@ +"""Severity classification for log entries.""" + +from dataclasses import dataclass +from enum import Enum +from typing import Any, Dict, List, Optional + + +class SeverityLevel(Enum): + """Severity levels for log entries.""" + CRITICAL = "critical" + ERROR = "error" + WARNING = "warning" + INFO = "info" + DEBUG = "debug" + UNKNOWN = "unknown" + + @classmethod + def from_string(cls, level: str) -> "SeverityLevel": + """Convert string to SeverityLevel.""" + level_lower = level.lower() + for member in cls: + if member.value == level_lower: + return member + return cls.UNKNOWN + + def score(self) -> int: + """Get numeric score for severity.""" + scores = { + SeverityLevel.CRITICAL: 5, + SeverityLevel.ERROR: 4, + SeverityLevel.WARNING: 3, + SeverityLevel.INFO: 2, + SeverityLevel.DEBUG: 1, + SeverityLevel.UNKNOWN: 0 + } + return scores.get(self, 0) + + def __lt__(self, other: "SeverityLevel") -> bool: + """Compare severity levels.""" + return self.score() < other.score() + + def __le__(self, other: "SeverityLevel") -> bool: + return self.score() <= other.score() + + def __gt__(self, other: "SeverityLevel") -> bool: + return self.score() > other.score() + + def __ge__(self, other: "SeverityLevel") -> bool: + return self.score() >= other.score() + + +@dataclass +class SeverityRule: + """Rule for severity classification.""" + name: str + patterns: List[str] + severity: SeverityLevel + weight: int = 1 + description: str = "" + + +class SeverityClassifier: + """Classifies log entries by severity.""" + + DEFAULT_RULES = [ + SeverityRule( + name="critical_keywords", + patterns=["fatal", "segfault", "panic", "core dumped", "critical system failure"], + severity=SeverityLevel.CRITICAL, + weight=10, + description="Critical system failures" + ), + SeverityRule( + name="error_keywords", + patterns=["error", "exception", "failed", "failure", "timeout", "cannot", "unable"], + severity=SeverityLevel.ERROR, + weight=5, + description="General errors" + ), + SeverityRule( + name="warning_keywords", + patterns=["warning", "warn", "deprecated", "deprecation"], + severity=SeverityLevel.WARNING, + weight=3, + description="Warnings and deprecations" + ), + SeverityRule( + name="info_keywords", + patterns=["info", "notice", "started", "stopped", "loaded"], + severity=SeverityLevel.INFO, + weight=1, + description="Informational messages" + ), + SeverityRule( + name="debug_keywords", + patterns=["debug", "trace", "verbose"], + severity=SeverityLevel.DEBUG, + weight=0, + description="Debug and trace messages" + ), + ] + + def __init__(self, custom_rules: Optional[List[Dict[str, Any]]] = None): + self.rules: List[SeverityRule] = self.DEFAULT_RULES.copy() + if custom_rules: + self._load_custom_rules(custom_rules) + + def _load_custom_rules(self, rules: List[Dict[str, Any]]) -> None: + """Load custom severity rules.""" + for rule_data in rules: + rule = SeverityRule( + name=rule_data.get("name", "custom"), + patterns=rule_data.get("patterns", []), + severity=SeverityLevel.from_string(rule_data.get("severity", "info")), + weight=rule_data.get("weight", 1), + description=rule_data.get("description", "") + ) + self.rules.append(rule) + + def classify(self, level: Optional[str], message: str = "", pattern_match: Optional[str] = None) -> SeverityLevel: + """Classify severity based on level, message, and pattern.""" + score = 0 + matched_severity = SeverityLevel.UNKNOWN + + if level: + level_lower = level.lower() + if level_lower in ["trace", "verbose"]: + return SeverityLevel.DEBUG + if level_lower in ["critical", "fatal", "emerg", "emergency"]: + return SeverityLevel.CRITICAL + if level_lower in ["err", "er", "e"]: + return SeverityLevel.ERROR + if level_lower in ["warn", "w"]: + return SeverityLevel.WARNING + if level_lower in ["notice"]: + return SeverityLevel.INFO + + inferred = SeverityLevel.from_string(level_lower) + if inferred != SeverityLevel.UNKNOWN: + return inferred + + text = f"{message} {pattern_match or ''}".lower() + + for rule in self.rules: + for pattern in rule.patterns: + if pattern.lower() in text: + if rule.weight > score: + score = rule.weight + matched_severity = rule.severity + + if matched_severity != SeverityLevel.UNKNOWN: + return matched_severity + + if not text.strip(): + return SeverityLevel.UNKNOWN + + if pattern_match: + return SeverityLevel.ERROR + + return SeverityLevel.INFO + + def classify_with_details(self, level: Optional[str], message: str = "", pattern_match: Optional[str] = None) -> Dict[str, Any]: + """Classify severity with detailed information.""" + severity = self.classify(level, message, pattern_match) + + text = f"{message} {pattern_match or ''}".lower() + matched_rules = [] + + for rule in self.rules: + for pattern in rule.patterns: + if pattern.lower() in text: + matched_rules.append({ + "rule": rule.name, + "pattern": pattern, + "severity": rule.severity.value, + "weight": rule.weight + }) + + return { + "severity": severity, + "matched_rules": matched_rules, + "confidence": min(1.0, len(matched_rules) * 0.3) if matched_rules else 0.5 + } + + def get_severity_order(self) -> List[SeverityLevel]: + """Get severity levels in order from highest to lowest.""" + return sorted( + [SeverityLevel.CRITICAL, SeverityLevel.ERROR, SeverityLevel.WARNING, + SeverityLevel.INFO, SeverityLevel.DEBUG, SeverityLevel.UNKNOWN], + reverse=True + ) + + def add_rule(self, rule: SeverityRule) -> None: + """Add a custom rule.""" + self.rules.append(rule) + + def remove_rule(self, name: str) -> bool: + """Remove a rule by name.""" + for i, rule in enumerate(self.rules): + if rule.name == name: + self.rules.pop(i) + return True + return False