Add analyzers: patterns, severity, and main analyzer
Some checks failed
Some checks failed
This commit is contained in:
203
loglens/analyzers/severity.py
Normal file
203
loglens/analyzers/severity.py
Normal file
@@ -0,0 +1,203 @@
|
||||
"""Severity classification for log entries."""
|
||||
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
|
||||
class SeverityLevel(Enum):
|
||||
"""Severity levels for log entries."""
|
||||
CRITICAL = "critical"
|
||||
ERROR = "error"
|
||||
WARNING = "warning"
|
||||
INFO = "info"
|
||||
DEBUG = "debug"
|
||||
UNKNOWN = "unknown"
|
||||
|
||||
@classmethod
|
||||
def from_string(cls, level: str) -> "SeverityLevel":
|
||||
"""Convert string to SeverityLevel."""
|
||||
level_lower = level.lower()
|
||||
for member in cls:
|
||||
if member.value == level_lower:
|
||||
return member
|
||||
return cls.UNKNOWN
|
||||
|
||||
def score(self) -> int:
|
||||
"""Get numeric score for severity."""
|
||||
scores = {
|
||||
SeverityLevel.CRITICAL: 5,
|
||||
SeverityLevel.ERROR: 4,
|
||||
SeverityLevel.WARNING: 3,
|
||||
SeverityLevel.INFO: 2,
|
||||
SeverityLevel.DEBUG: 1,
|
||||
SeverityLevel.UNKNOWN: 0
|
||||
}
|
||||
return scores.get(self, 0)
|
||||
|
||||
def __lt__(self, other: "SeverityLevel") -> bool:
|
||||
"""Compare severity levels."""
|
||||
return self.score() < other.score()
|
||||
|
||||
def __le__(self, other: "SeverityLevel") -> bool:
|
||||
return self.score() <= other.score()
|
||||
|
||||
def __gt__(self, other: "SeverityLevel") -> bool:
|
||||
return self.score() > other.score()
|
||||
|
||||
def __ge__(self, other: "SeverityLevel") -> bool:
|
||||
return self.score() >= other.score()
|
||||
|
||||
|
||||
@dataclass
|
||||
class SeverityRule:
|
||||
"""Rule for severity classification."""
|
||||
name: str
|
||||
patterns: List[str]
|
||||
severity: SeverityLevel
|
||||
weight: int = 1
|
||||
description: str = ""
|
||||
|
||||
|
||||
class SeverityClassifier:
|
||||
"""Classifies log entries by severity."""
|
||||
|
||||
DEFAULT_RULES = [
|
||||
SeverityRule(
|
||||
name="critical_keywords",
|
||||
patterns=["fatal", "segfault", "panic", "core dumped", "critical system failure"],
|
||||
severity=SeverityLevel.CRITICAL,
|
||||
weight=10,
|
||||
description="Critical system failures"
|
||||
),
|
||||
SeverityRule(
|
||||
name="error_keywords",
|
||||
patterns=["error", "exception", "failed", "failure", "timeout", "cannot", "unable"],
|
||||
severity=SeverityLevel.ERROR,
|
||||
weight=5,
|
||||
description="General errors"
|
||||
),
|
||||
SeverityRule(
|
||||
name="warning_keywords",
|
||||
patterns=["warning", "warn", "deprecated", "deprecation"],
|
||||
severity=SeverityLevel.WARNING,
|
||||
weight=3,
|
||||
description="Warnings and deprecations"
|
||||
),
|
||||
SeverityRule(
|
||||
name="info_keywords",
|
||||
patterns=["info", "notice", "started", "stopped", "loaded"],
|
||||
severity=SeverityLevel.INFO,
|
||||
weight=1,
|
||||
description="Informational messages"
|
||||
),
|
||||
SeverityRule(
|
||||
name="debug_keywords",
|
||||
patterns=["debug", "trace", "verbose"],
|
||||
severity=SeverityLevel.DEBUG,
|
||||
weight=0,
|
||||
description="Debug and trace messages"
|
||||
),
|
||||
]
|
||||
|
||||
def __init__(self, custom_rules: Optional[List[Dict[str, Any]]] = None):
|
||||
self.rules: List[SeverityRule] = self.DEFAULT_RULES.copy()
|
||||
if custom_rules:
|
||||
self._load_custom_rules(custom_rules)
|
||||
|
||||
def _load_custom_rules(self, rules: List[Dict[str, Any]]) -> None:
|
||||
"""Load custom severity rules."""
|
||||
for rule_data in rules:
|
||||
rule = SeverityRule(
|
||||
name=rule_data.get("name", "custom"),
|
||||
patterns=rule_data.get("patterns", []),
|
||||
severity=SeverityLevel.from_string(rule_data.get("severity", "info")),
|
||||
weight=rule_data.get("weight", 1),
|
||||
description=rule_data.get("description", "")
|
||||
)
|
||||
self.rules.append(rule)
|
||||
|
||||
def classify(self, level: Optional[str], message: str = "", pattern_match: Optional[str] = None) -> SeverityLevel:
|
||||
"""Classify severity based on level, message, and pattern."""
|
||||
score = 0
|
||||
matched_severity = SeverityLevel.UNKNOWN
|
||||
|
||||
if level:
|
||||
level_lower = level.lower()
|
||||
if level_lower in ["trace", "verbose"]:
|
||||
return SeverityLevel.DEBUG
|
||||
if level_lower in ["critical", "fatal", "emerg", "emergency"]:
|
||||
return SeverityLevel.CRITICAL
|
||||
if level_lower in ["err", "er", "e"]:
|
||||
return SeverityLevel.ERROR
|
||||
if level_lower in ["warn", "w"]:
|
||||
return SeverityLevel.WARNING
|
||||
if level_lower in ["notice"]:
|
||||
return SeverityLevel.INFO
|
||||
|
||||
inferred = SeverityLevel.from_string(level_lower)
|
||||
if inferred != SeverityLevel.UNKNOWN:
|
||||
return inferred
|
||||
|
||||
text = f"{message} {pattern_match or ''}".lower()
|
||||
|
||||
for rule in self.rules:
|
||||
for pattern in rule.patterns:
|
||||
if pattern.lower() in text:
|
||||
if rule.weight > score:
|
||||
score = rule.weight
|
||||
matched_severity = rule.severity
|
||||
|
||||
if matched_severity != SeverityLevel.UNKNOWN:
|
||||
return matched_severity
|
||||
|
||||
if not text.strip():
|
||||
return SeverityLevel.UNKNOWN
|
||||
|
||||
if pattern_match:
|
||||
return SeverityLevel.ERROR
|
||||
|
||||
return SeverityLevel.INFO
|
||||
|
||||
def classify_with_details(self, level: Optional[str], message: str = "", pattern_match: Optional[str] = None) -> Dict[str, Any]:
|
||||
"""Classify severity with detailed information."""
|
||||
severity = self.classify(level, message, pattern_match)
|
||||
|
||||
text = f"{message} {pattern_match or ''}".lower()
|
||||
matched_rules = []
|
||||
|
||||
for rule in self.rules:
|
||||
for pattern in rule.patterns:
|
||||
if pattern.lower() in text:
|
||||
matched_rules.append({
|
||||
"rule": rule.name,
|
||||
"pattern": pattern,
|
||||
"severity": rule.severity.value,
|
||||
"weight": rule.weight
|
||||
})
|
||||
|
||||
return {
|
||||
"severity": severity,
|
||||
"matched_rules": matched_rules,
|
||||
"confidence": min(1.0, len(matched_rules) * 0.3) if matched_rules else 0.5
|
||||
}
|
||||
|
||||
def get_severity_order(self) -> List[SeverityLevel]:
|
||||
"""Get severity levels in order from highest to lowest."""
|
||||
return sorted(
|
||||
[SeverityLevel.CRITICAL, SeverityLevel.ERROR, SeverityLevel.WARNING,
|
||||
SeverityLevel.INFO, SeverityLevel.DEBUG, SeverityLevel.UNKNOWN],
|
||||
reverse=True
|
||||
)
|
||||
|
||||
def add_rule(self, rule: SeverityRule) -> None:
|
||||
"""Add a custom rule."""
|
||||
self.rules.append(rule)
|
||||
|
||||
def remove_rule(self, name: str) -> bool:
|
||||
"""Remove a rule by name."""
|
||||
for i, rule in enumerate(self.rules):
|
||||
if rule.name == name:
|
||||
self.rules.pop(i)
|
||||
return True
|
||||
return False
|
||||
Reference in New Issue
Block a user