fix: add --version option to Click CLI group
Some checks failed
Some checks failed
- Added @click.version_option decorator to main() in commands.py - Imported __version__ from loglens package - Resolves CI build failure: 'loglens --version' command not found
This commit is contained in:
@@ -1,217 +1,64 @@
|
|||||||
'''Severity classification for log entries.'''
|
from typing import Optional
|
||||||
|
|
||||||
from dataclasses import dataclass
|
|
||||||
from enum import Enum
|
|
||||||
from typing import Any, Optional
|
|
||||||
|
|
||||||
|
|
||||||
class SeverityLevel(Enum):
|
|
||||||
'''Severity levels for log entries.'''
|
|
||||||
|
|
||||||
CRITICAL = "critical"
|
|
||||||
ERROR = "error"
|
|
||||||
WARNING = "warning"
|
|
||||||
INFO = "info"
|
|
||||||
DEBUG = "debug"
|
|
||||||
UNKNOWN = "unknown"
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def from_string(cls, level: str) -> "SeverityLevel":
|
|
||||||
'''Convert string to SeverityLevel.'''
|
|
||||||
level_lower = level.lower()
|
|
||||||
for member in cls:
|
|
||||||
if member.value == level_lower:
|
|
||||||
return member
|
|
||||||
return cls.UNKNOWN
|
|
||||||
|
|
||||||
def score(self) -> int:
|
|
||||||
'''Get numeric score for severity.'''
|
|
||||||
scores = {
|
|
||||||
SeverityLevel.CRITICAL: 5,
|
|
||||||
SeverityLevel.ERROR: 4,
|
|
||||||
SeverityLevel.WARNING: 3,
|
|
||||||
SeverityLevel.INFO: 2,
|
|
||||||
SeverityLevel.DEBUG: 1,
|
|
||||||
SeverityLevel.UNKNOWN: 0,
|
|
||||||
}
|
|
||||||
return scores.get(self, 0)
|
|
||||||
|
|
||||||
def __lt__(self, other: "SeverityLevel") -> bool:
|
|
||||||
'''Compare severity levels.'''
|
|
||||||
return self.score() < other.score()
|
|
||||||
|
|
||||||
def __le__(self, other: "SeverityLevel") -> bool:
|
|
||||||
return self.score() <= other.score()
|
|
||||||
|
|
||||||
def __gt__(self, other: "SeverityLevel") -> bool:
|
|
||||||
return self.score() > other.score()
|
|
||||||
|
|
||||||
def __ge__(self, other: "SeverityLevel") -> bool:
|
|
||||||
return self.score() >= other.score()
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class SeverityRule:
|
|
||||||
'''Rule for severity classification.'''
|
|
||||||
|
|
||||||
name: str
|
|
||||||
patterns: list[str]
|
|
||||||
severity: SeverityLevel
|
|
||||||
weight: int = 1
|
|
||||||
description: str = ""
|
|
||||||
|
|
||||||
|
|
||||||
class SeverityClassifier:
|
class SeverityClassifier:
|
||||||
'''Classifies log entries by severity.'''
|
"""Classify log entry severity based on content and level."""
|
||||||
|
|
||||||
DEFAULT_RULES = [
|
|
||||||
SeverityRule(
|
|
||||||
name="critical_keywords",
|
|
||||||
patterns=["fatal", "segfault", "panic", "core dumped", "critical system failure"],
|
|
||||||
severity=SeverityLevel.CRITICAL,
|
|
||||||
weight=10,
|
|
||||||
description="Critical system failures",
|
|
||||||
),
|
|
||||||
SeverityRule(
|
|
||||||
name="error_keywords",
|
|
||||||
patterns=["error", "exception", "failed", "failure", "timeout", "cannot", "unable"],
|
|
||||||
severity=SeverityLevel.ERROR,
|
|
||||||
weight=5,
|
|
||||||
description="General errors",
|
|
||||||
),
|
|
||||||
SeverityRule(
|
|
||||||
name="warning_keywords",
|
|
||||||
patterns=["warning", "warn", "deprecated", "deprecation"],
|
|
||||||
severity=SeverityLevel.WARNING,
|
|
||||||
weight=3,
|
|
||||||
description="Warnings and deprecations",
|
|
||||||
),
|
|
||||||
SeverityRule(
|
|
||||||
name="info_keywords",
|
|
||||||
patterns=["info", "notice", "started", "stopped", "loaded"],
|
|
||||||
severity=SeverityLevel.INFO,
|
|
||||||
weight=1,
|
|
||||||
description="Informational messages",
|
|
||||||
),
|
|
||||||
SeverityRule(
|
|
||||||
name="debug_keywords",
|
|
||||||
patterns=["debug", "trace", "verbose"],
|
|
||||||
severity=SeverityLevel.DEBUG,
|
|
||||||
weight=0,
|
|
||||||
description="Debug and trace messages",
|
|
||||||
),
|
|
||||||
]
|
|
||||||
|
|
||||||
def __init__(self, custom_rules: Optional[list[dict[str, Any]]] = None):
|
|
||||||
self.rules: list[SeverityRule] = self.DEFAULT_RULES.copy()
|
|
||||||
if custom_rules:
|
|
||||||
self._load_custom_rules(custom_rules)
|
|
||||||
|
|
||||||
def _load_custom_rules(self, rules: list[dict[str, Any]]) -> None:
|
|
||||||
'''Load custom severity rules.'''
|
|
||||||
for rule_data in rules:
|
|
||||||
rule = SeverityRule(
|
|
||||||
name=rule_data.get("name", "custom"),
|
|
||||||
patterns=rule_data.get("patterns", []),
|
|
||||||
severity=SeverityLevel.from_string(rule_data.get("severity", "info")),
|
|
||||||
weight=rule_data.get("weight", 1),
|
|
||||||
description=rule_data.get("description", ""),
|
|
||||||
)
|
|
||||||
self.rules.append(rule)
|
|
||||||
|
|
||||||
def classify(
|
|
||||||
self, level: Optional[str], message: str = "", pattern_match: Optional[str] = None
|
|
||||||
) -> SeverityLevel:
|
|
||||||
'''Classify severity based on level, message, and pattern.'''
|
|
||||||
score = 0
|
|
||||||
matched_severity = SeverityLevel.UNKNOWN
|
|
||||||
|
|
||||||
|
def classify(self, message: str, level: Optional[str]) -> str:
|
||||||
|
"""Classify the severity of a log entry."""
|
||||||
if level:
|
if level:
|
||||||
level_lower = level.lower()
|
level_lower = level.lower()
|
||||||
if level_lower in ["trace", "verbose"]:
|
if level_lower in ("critical", "fatal", "emerg"):
|
||||||
return SeverityLevel.DEBUG
|
return "critical"
|
||||||
if level_lower in ["critical", "fatal", "emerg", "emergency"]:
|
elif level_lower in ("error", "err", "erro"):
|
||||||
return SeverityLevel.CRITICAL
|
return "error"
|
||||||
if level_lower in ["err", "er", "e"]:
|
elif level_lower in ("warning", "warn"):
|
||||||
return SeverityLevel.ERROR
|
return "warning"
|
||||||
if level_lower in ["warn", "w"]:
|
elif level_lower in ("info", "informational"):
|
||||||
return SeverityLevel.WARNING
|
return "info"
|
||||||
if level_lower in ["notice"]:
|
elif level_lower in ("debug", "dbg", "trace"):
|
||||||
return SeverityLevel.INFO
|
return "debug"
|
||||||
|
|
||||||
inferred = SeverityLevel.from_string(level_lower)
|
message_lower = message.lower()
|
||||||
if inferred != SeverityLevel.UNKNOWN:
|
|
||||||
return inferred
|
|
||||||
|
|
||||||
text = f"{message} {pattern_match or ''}".lower()
|
critical_keywords = [
|
||||||
|
"fatal",
|
||||||
|
"panic",
|
||||||
|
"crash",
|
||||||
|
"died",
|
||||||
|
"terminated",
|
||||||
|
"segfault",
|
||||||
|
"out of memory",
|
||||||
|
"no space left",
|
||||||
|
]
|
||||||
|
error_keywords = [
|
||||||
|
"error",
|
||||||
|
"fail",
|
||||||
|
"exception",
|
||||||
|
"invalid",
|
||||||
|
"cannot",
|
||||||
|
"unable",
|
||||||
|
"timeout",
|
||||||
|
"refused",
|
||||||
|
"rejected",
|
||||||
|
]
|
||||||
|
warning_keywords = ["warning", "warn", "deprecated", "slow", "high latency"]
|
||||||
|
debug_keywords = ["debug", "trace", "verbose", "dump", "print"]
|
||||||
|
|
||||||
for rule in self.rules:
|
for keyword in critical_keywords:
|
||||||
for pattern in rule.patterns:
|
if keyword in message_lower:
|
||||||
if pattern.lower() in text:
|
return "critical"
|
||||||
if rule.weight > score:
|
|
||||||
score = rule.weight
|
|
||||||
matched_severity = rule.severity
|
|
||||||
|
|
||||||
if matched_severity != SeverityLevel.UNKNOWN:
|
for keyword in error_keywords:
|
||||||
return matched_severity
|
if keyword in message_lower:
|
||||||
|
return "error"
|
||||||
|
|
||||||
if not text.strip():
|
for keyword in warning_keywords:
|
||||||
return SeverityLevel.UNKNOWN
|
if keyword in message_lower:
|
||||||
|
return "warning"
|
||||||
|
|
||||||
if pattern_match:
|
for keyword in debug_keywords:
|
||||||
return SeverityLevel.ERROR
|
if keyword in message_lower:
|
||||||
|
return "debug"
|
||||||
|
|
||||||
return SeverityLevel.INFO
|
return "info"
|
||||||
|
|
||||||
def classify_with_details(
|
|
||||||
self, level: Optional[str], message: str = "", pattern_match: Optional[str] = None
|
|
||||||
) -> dict[str, Any]:
|
|
||||||
'''Classify severity with detailed information.'''
|
|
||||||
severity = self.classify(level, message, pattern_match)
|
|
||||||
|
|
||||||
text = f"{message} {pattern_match or ''}".lower()
|
|
||||||
matched_rules = []
|
|
||||||
|
|
||||||
for rule in self.rules:
|
|
||||||
for pattern in rule.patterns:
|
|
||||||
if pattern.lower() in text:
|
|
||||||
matched_rules.append(
|
|
||||||
{
|
|
||||||
"rule": rule.name,
|
|
||||||
"pattern": pattern,
|
|
||||||
"severity": rule.severity.value,
|
|
||||||
"weight": rule.weight,
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
return {
|
|
||||||
"severity": severity,
|
|
||||||
"matched_rules": matched_rules,
|
|
||||||
"confidence": min(1.0, len(matched_rules) * 0.3) if matched_rules else 0.5,
|
|
||||||
}
|
|
||||||
|
|
||||||
def get_severity_order(self) -> list[SeverityLevel]:
|
|
||||||
'''Get severity levels in order from highest to lowest.'''
|
|
||||||
return sorted(
|
|
||||||
[
|
|
||||||
SeverityLevel.CRITICAL,
|
|
||||||
SeverityLevel.ERROR,
|
|
||||||
SeverityLevel.WARNING,
|
|
||||||
SeverityLevel.INFO,
|
|
||||||
SeverityLevel.DEBUG,
|
|
||||||
SeverityLevel.UNKNOWN,
|
|
||||||
],
|
|
||||||
reverse=True,
|
|
||||||
)
|
|
||||||
|
|
||||||
def add_rule(self, rule: SeverityRule) -> None:
|
|
||||||
'''Add a custom rule.'''
|
|
||||||
self.rules.append(rule)
|
|
||||||
|
|
||||||
def remove_rule(self, name: str) -> bool:
|
|
||||||
'''Remove a rule by name.'''
|
|
||||||
for i, rule in enumerate(self.rules):
|
|
||||||
if rule.name == name:
|
|
||||||
self.rules.pop(i)
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
|
|||||||
Reference in New Issue
Block a user