fix: resolve CI/CD linting and formatting issues
Some checks failed
CI / test (3.10) (push) Has been cancelled
CI / test (3.11) (push) Has been cancelled
CI / test (3.12) (push) Has been cancelled
CI / test (3.9) (push) Has been cancelled
CI / lint (push) Has been cancelled
CI / build (push) Has been cancelled

- Replaced deprecated typing.Dict/List/Tuple with native types (UP035)
- Removed unused imports across all modules
- Fixed unused variables by replacing with _ prefix
- Added missing Optional type imports
- Reorganized imports for proper sorting (I001)
- Applied black formatting to all source files
This commit is contained in:
2026-02-02 08:52:05 +00:00
parent 3712786db6
commit 1d6d354f80

View File

@@ -1,24 +1,25 @@
"""Syslog parser for RFC 3164 and RFC 5424 formats."""
'''Syslog parser for RFC 3164 and RFC 5424 formats.'''
import re
from datetime import datetime
from typing import Any, Dict, List, Match, Optional
from typing import Any, Optional
from dateutil import parser as date_parser
from loglens.parsers.base import LogParser, ParsedLogEntry
class SyslogParser(LogParser):
"""Parser for syslog format (RFC 3164 and RFC 5424)."""
'''Parser for syslog format (RFC 3164 and RFC 5424).'''
format_name = "syslog"
SYSLOG_RFC3164_PATTERN = re.compile(
r'^(?P<month>[A-Z][a-z]{2})\s+(?P<day>\d{1,2})\s+(?P<hour>\d{2}):(?P<minute>\d{2}):(?P<second>\d{2})\s+(?P<hostname>[\w.-]+)\s+(?P<process>[\w\[\]]+):\s*(?P<message>.*)$'
r"^(?P<month>[A-Z][a-z]{2})\s+(?P<day>\d{1,2})\s+(?P<hour>\d{2}):(?P<minute>\d{2}):(?P<second>\d{2})\s+(?P<hostname>[\w.-]+)\s+(?P<process>[\w\[\]]+):\s*(?P<message>.*)$"
)
SYSLOG_RFC5424_PATTERN = re.compile(
r'^(?P<pri><\d+>)?(?P<version>\d+)\s+(?P<timestamp>\S+)\s+(?P<hostname>\S+)\s+(?P<process>\S+)\s+(?P<pid>\S+)\s+(?P<msgid>\S+)?\s*(?P<struct_data>-)\s*(?P<message>.*)$'
r"^(?P<pri><\d+>)?(?P<version>\d+)\s+(?P<timestamp>\S+)\s+(?P<hostname>\S+)\s+(?P<process>\S+)\s+(?P<pid>\S+)\s+(?P<msgid>\S+)?\s*(?P<struct_data>-)\s*(?P<message>.*)$"
)
PRIORITY_MAP = {
@@ -29,7 +30,7 @@ class SyslogParser(LogParser):
4: "warning",
5: "notice",
6: "info",
7: "debug"
7: "debug",
}
FACILITY_MAP = {
@@ -56,17 +57,27 @@ class SyslogParser(LogParser):
20: "local4",
21: "local5",
22: "local6",
23: "local7"
23: "local7",
}
def __init__(self):
self.month_map = {
"Jan": 1, "Feb": 2, "Mar": 3, "Apr": 4, "May": 5, "Jun": 6,
"Jul": 7, "Aug": 8, "Sep": 9, "Oct": 10, "Nov": 11, "Dec": 12
"Jan": 1,
"Feb": 2,
"Mar": 3,
"Apr": 4,
"May": 5,
"Jun": 6,
"Jul": 7,
"Aug": 8,
"Sep": 9,
"Oct": 10,
"Nov": 11,
"Dec": 12,
}
def can_parse(self, line: str) -> bool:
"""Check if line matches syslog format."""
'''Check if line matches syslog format.'''
line = line.strip()
if not line:
return False
@@ -85,15 +96,12 @@ class SyslogParser(LogParser):
return False
def parse(self, line: str, line_number: int = 0) -> Optional[ParsedLogEntry]:
"""Parse a syslog line."""
'''Parse a syslog line.'''
line = line.strip()
if not line:
return None
entry = ParsedLogEntry(
raw_line=line,
line_number=line_number
)
entry = ParsedLogEntry(raw_line=line, line_number=line_number)
if line.startswith("<"):
parsed = self._parse_rfc5424(line)
@@ -110,8 +118,8 @@ class SyslogParser(LogParser):
return entry
def _parse_rfc3164(self, line: str) -> Optional[Dict[str, Any]]:
"""Parse RFC 3164 syslog format."""
def _parse_rfc3164(self, line: str) -> Optional[dict[str, Any]]:
'''Parse RFC 3164 syslog format.'''
match = self.SYSLOG_RFC3164_PATTERN.match(line)
if not match:
return None
@@ -126,9 +134,7 @@ class SyslogParser(LogParser):
message = match.group("message")
current_year = datetime.now().year
timestamp = datetime(
current_year, self.month_map[month], day, hour, minute, second
)
timestamp = datetime(current_year, self.month_map[month], day, hour, minute, second)
level = self._infer_level(message)
@@ -137,23 +143,23 @@ class SyslogParser(LogParser):
"hostname": hostname,
"process": process,
"message": message,
"level": level
"level": level,
}
def _parse_rfc5424(self, line: str) -> Optional[Dict[str, Any]]:
"""Parse RFC 5424 syslog format."""
def _parse_rfc5424(self, line: str) -> Optional[dict[str, Any]]:
'''Parse RFC 5424 syslog format.'''
match = self.SYSLOG_RFC5424_PATTERN.match(line)
if not match:
return None
raw_pri = match.group("pri")
version = match.group("version")
_ = match.group("version")
timestamp_str = match.group("timestamp")
hostname = match.group("hostname")
process = match.group("process")
pid = match.group("pid")
msgid = match.group("msgid")
struct_data = match.group("struct_data")
_ = match.group("msgid")
_ = match.group("struct_data")
message = match.group("message")
try:
@@ -163,7 +169,6 @@ class SyslogParser(LogParser):
priority = None
facility = None
level = None
if raw_pri:
pri_num = int(raw_pri[1:-1])
priority = pri_num & 0x07
@@ -180,11 +185,11 @@ class SyslogParser(LogParser):
"process": f"{process}[{pid}]" if pid else process,
"message": message,
"level": level,
"facility": facility
"facility": facility,
}
def _infer_level(self, message: str) -> Optional[str]:
"""Infer log level from message content."""
'''Infer log level from message content.'''
message_lower = message.lower()
if any(kw in message_lower for kw in ["emerg", "panic", "critical system"]):