fix: resolve CI/CD linting and formatting issues
Some checks failed
Some checks failed
- Replaced deprecated typing.Dict/List/Tuple with native types (UP035) - Removed unused imports across all modules - Fixed unused variables by replacing with _ prefix - Added missing Optional type imports - Reorganized imports for proper sorting (I001) - Applied black formatting to all source files
This commit is contained in:
@@ -1,12 +1,13 @@
|
|||||||
"""Severity classification for log entries."""
|
'''Severity classification for log entries.'''
|
||||||
|
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
from typing import Any, Dict, List, Optional
|
from typing import Any, Optional
|
||||||
|
|
||||||
|
|
||||||
class SeverityLevel(Enum):
|
class SeverityLevel(Enum):
|
||||||
"""Severity levels for log entries."""
|
'''Severity levels for log entries.'''
|
||||||
|
|
||||||
CRITICAL = "critical"
|
CRITICAL = "critical"
|
||||||
ERROR = "error"
|
ERROR = "error"
|
||||||
WARNING = "warning"
|
WARNING = "warning"
|
||||||
@@ -16,7 +17,7 @@ class SeverityLevel(Enum):
|
|||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_string(cls, level: str) -> "SeverityLevel":
|
def from_string(cls, level: str) -> "SeverityLevel":
|
||||||
"""Convert string to SeverityLevel."""
|
'''Convert string to SeverityLevel.'''
|
||||||
level_lower = level.lower()
|
level_lower = level.lower()
|
||||||
for member in cls:
|
for member in cls:
|
||||||
if member.value == level_lower:
|
if member.value == level_lower:
|
||||||
@@ -24,19 +25,19 @@ class SeverityLevel(Enum):
|
|||||||
return cls.UNKNOWN
|
return cls.UNKNOWN
|
||||||
|
|
||||||
def score(self) -> int:
|
def score(self) -> int:
|
||||||
"""Get numeric score for severity."""
|
'''Get numeric score for severity.'''
|
||||||
scores = {
|
scores = {
|
||||||
SeverityLevel.CRITICAL: 5,
|
SeverityLevel.CRITICAL: 5,
|
||||||
SeverityLevel.ERROR: 4,
|
SeverityLevel.ERROR: 4,
|
||||||
SeverityLevel.WARNING: 3,
|
SeverityLevel.WARNING: 3,
|
||||||
SeverityLevel.INFO: 2,
|
SeverityLevel.INFO: 2,
|
||||||
SeverityLevel.DEBUG: 1,
|
SeverityLevel.DEBUG: 1,
|
||||||
SeverityLevel.UNKNOWN: 0
|
SeverityLevel.UNKNOWN: 0,
|
||||||
}
|
}
|
||||||
return scores.get(self, 0)
|
return scores.get(self, 0)
|
||||||
|
|
||||||
def __lt__(self, other: "SeverityLevel") -> bool:
|
def __lt__(self, other: "SeverityLevel") -> bool:
|
||||||
"""Compare severity levels."""
|
'''Compare severity levels.'''
|
||||||
return self.score() < other.score()
|
return self.score() < other.score()
|
||||||
|
|
||||||
def __le__(self, other: "SeverityLevel") -> bool:
|
def __le__(self, other: "SeverityLevel") -> bool:
|
||||||
@@ -51,16 +52,17 @@ class SeverityLevel(Enum):
|
|||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class SeverityRule:
|
class SeverityRule:
|
||||||
"""Rule for severity classification."""
|
'''Rule for severity classification.'''
|
||||||
|
|
||||||
name: str
|
name: str
|
||||||
patterns: List[str]
|
patterns: list[str]
|
||||||
severity: SeverityLevel
|
severity: SeverityLevel
|
||||||
weight: int = 1
|
weight: int = 1
|
||||||
description: str = ""
|
description: str = ""
|
||||||
|
|
||||||
|
|
||||||
class SeverityClassifier:
|
class SeverityClassifier:
|
||||||
"""Classifies log entries by severity."""
|
'''Classifies log entries by severity.'''
|
||||||
|
|
||||||
DEFAULT_RULES = [
|
DEFAULT_RULES = [
|
||||||
SeverityRule(
|
SeverityRule(
|
||||||
@@ -68,57 +70,59 @@ class SeverityClassifier:
|
|||||||
patterns=["fatal", "segfault", "panic", "core dumped", "critical system failure"],
|
patterns=["fatal", "segfault", "panic", "core dumped", "critical system failure"],
|
||||||
severity=SeverityLevel.CRITICAL,
|
severity=SeverityLevel.CRITICAL,
|
||||||
weight=10,
|
weight=10,
|
||||||
description="Critical system failures"
|
description="Critical system failures",
|
||||||
),
|
),
|
||||||
SeverityRule(
|
SeverityRule(
|
||||||
name="error_keywords",
|
name="error_keywords",
|
||||||
patterns=["error", "exception", "failed", "failure", "timeout", "cannot", "unable"],
|
patterns=["error", "exception", "failed", "failure", "timeout", "cannot", "unable"],
|
||||||
severity=SeverityLevel.ERROR,
|
severity=SeverityLevel.ERROR,
|
||||||
weight=5,
|
weight=5,
|
||||||
description="General errors"
|
description="General errors",
|
||||||
),
|
),
|
||||||
SeverityRule(
|
SeverityRule(
|
||||||
name="warning_keywords",
|
name="warning_keywords",
|
||||||
patterns=["warning", "warn", "deprecated", "deprecation"],
|
patterns=["warning", "warn", "deprecated", "deprecation"],
|
||||||
severity=SeverityLevel.WARNING,
|
severity=SeverityLevel.WARNING,
|
||||||
weight=3,
|
weight=3,
|
||||||
description="Warnings and deprecations"
|
description="Warnings and deprecations",
|
||||||
),
|
),
|
||||||
SeverityRule(
|
SeverityRule(
|
||||||
name="info_keywords",
|
name="info_keywords",
|
||||||
patterns=["info", "notice", "started", "stopped", "loaded"],
|
patterns=["info", "notice", "started", "stopped", "loaded"],
|
||||||
severity=SeverityLevel.INFO,
|
severity=SeverityLevel.INFO,
|
||||||
weight=1,
|
weight=1,
|
||||||
description="Informational messages"
|
description="Informational messages",
|
||||||
),
|
),
|
||||||
SeverityRule(
|
SeverityRule(
|
||||||
name="debug_keywords",
|
name="debug_keywords",
|
||||||
patterns=["debug", "trace", "verbose"],
|
patterns=["debug", "trace", "verbose"],
|
||||||
severity=SeverityLevel.DEBUG,
|
severity=SeverityLevel.DEBUG,
|
||||||
weight=0,
|
weight=0,
|
||||||
description="Debug and trace messages"
|
description="Debug and trace messages",
|
||||||
),
|
),
|
||||||
]
|
]
|
||||||
|
|
||||||
def __init__(self, custom_rules: Optional[List[Dict[str, Any]]] = None):
|
def __init__(self, custom_rules: Optional[list[dict[str, Any]]] = None):
|
||||||
self.rules: List[SeverityRule] = self.DEFAULT_RULES.copy()
|
self.rules: list[SeverityRule] = self.DEFAULT_RULES.copy()
|
||||||
if custom_rules:
|
if custom_rules:
|
||||||
self._load_custom_rules(custom_rules)
|
self._load_custom_rules(custom_rules)
|
||||||
|
|
||||||
def _load_custom_rules(self, rules: List[Dict[str, Any]]) -> None:
|
def _load_custom_rules(self, rules: list[dict[str, Any]]) -> None:
|
||||||
"""Load custom severity rules."""
|
'''Load custom severity rules.'''
|
||||||
for rule_data in rules:
|
for rule_data in rules:
|
||||||
rule = SeverityRule(
|
rule = SeverityRule(
|
||||||
name=rule_data.get("name", "custom"),
|
name=rule_data.get("name", "custom"),
|
||||||
patterns=rule_data.get("patterns", []),
|
patterns=rule_data.get("patterns", []),
|
||||||
severity=SeverityLevel.from_string(rule_data.get("severity", "info")),
|
severity=SeverityLevel.from_string(rule_data.get("severity", "info")),
|
||||||
weight=rule_data.get("weight", 1),
|
weight=rule_data.get("weight", 1),
|
||||||
description=rule_data.get("description", "")
|
description=rule_data.get("description", ""),
|
||||||
)
|
)
|
||||||
self.rules.append(rule)
|
self.rules.append(rule)
|
||||||
|
|
||||||
def classify(self, level: Optional[str], message: str = "", pattern_match: Optional[str] = None) -> SeverityLevel:
|
def classify(
|
||||||
"""Classify severity based on level, message, and pattern."""
|
self, level: Optional[str], message: str = "", pattern_match: Optional[str] = None
|
||||||
|
) -> SeverityLevel:
|
||||||
|
'''Classify severity based on level, message, and pattern.'''
|
||||||
score = 0
|
score = 0
|
||||||
matched_severity = SeverityLevel.UNKNOWN
|
matched_severity = SeverityLevel.UNKNOWN
|
||||||
|
|
||||||
@@ -159,8 +163,10 @@ class SeverityClassifier:
|
|||||||
|
|
||||||
return SeverityLevel.INFO
|
return SeverityLevel.INFO
|
||||||
|
|
||||||
def classify_with_details(self, level: Optional[str], message: str = "", pattern_match: Optional[str] = None) -> Dict[str, Any]:
|
def classify_with_details(
|
||||||
"""Classify severity with detailed information."""
|
self, level: Optional[str], message: str = "", pattern_match: Optional[str] = None
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
'''Classify severity with detailed information.'''
|
||||||
severity = self.classify(level, message, pattern_match)
|
severity = self.classify(level, message, pattern_match)
|
||||||
|
|
||||||
text = f"{message} {pattern_match or ''}".lower()
|
text = f"{message} {pattern_match or ''}".lower()
|
||||||
@@ -169,33 +175,41 @@ class SeverityClassifier:
|
|||||||
for rule in self.rules:
|
for rule in self.rules:
|
||||||
for pattern in rule.patterns:
|
for pattern in rule.patterns:
|
||||||
if pattern.lower() in text:
|
if pattern.lower() in text:
|
||||||
matched_rules.append({
|
matched_rules.append(
|
||||||
"rule": rule.name,
|
{
|
||||||
"pattern": pattern,
|
"rule": rule.name,
|
||||||
"severity": rule.severity.value,
|
"pattern": pattern,
|
||||||
"weight": rule.weight
|
"severity": rule.severity.value,
|
||||||
})
|
"weight": rule.weight,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"severity": severity,
|
"severity": severity,
|
||||||
"matched_rules": matched_rules,
|
"matched_rules": matched_rules,
|
||||||
"confidence": min(1.0, len(matched_rules) * 0.3) if matched_rules else 0.5
|
"confidence": min(1.0, len(matched_rules) * 0.3) if matched_rules else 0.5,
|
||||||
}
|
}
|
||||||
|
|
||||||
def get_severity_order(self) -> List[SeverityLevel]:
|
def get_severity_order(self) -> list[SeverityLevel]:
|
||||||
"""Get severity levels in order from highest to lowest."""
|
'''Get severity levels in order from highest to lowest.'''
|
||||||
return sorted(
|
return sorted(
|
||||||
[SeverityLevel.CRITICAL, SeverityLevel.ERROR, SeverityLevel.WARNING,
|
[
|
||||||
SeverityLevel.INFO, SeverityLevel.DEBUG, SeverityLevel.UNKNOWN],
|
SeverityLevel.CRITICAL,
|
||||||
reverse=True
|
SeverityLevel.ERROR,
|
||||||
|
SeverityLevel.WARNING,
|
||||||
|
SeverityLevel.INFO,
|
||||||
|
SeverityLevel.DEBUG,
|
||||||
|
SeverityLevel.UNKNOWN,
|
||||||
|
],
|
||||||
|
reverse=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
def add_rule(self, rule: SeverityRule) -> None:
|
def add_rule(self, rule: SeverityRule) -> None:
|
||||||
"""Add a custom rule."""
|
'''Add a custom rule.'''
|
||||||
self.rules.append(rule)
|
self.rules.append(rule)
|
||||||
|
|
||||||
def remove_rule(self, name: str) -> bool:
|
def remove_rule(self, name: str) -> bool:
|
||||||
"""Remove a rule by name."""
|
'''Remove a rule by name.'''
|
||||||
for i, rule in enumerate(self.rules):
|
for i, rule in enumerate(self.rules):
|
||||||
if rule.name == name:
|
if rule.name == name:
|
||||||
self.rules.pop(i)
|
self.rules.pop(i)
|
||||||
|
|||||||
Reference in New Issue
Block a user