diff --git a/loglens/parsers/syslog_parser.py b/loglens/parsers/syslog_parser.py index 39f4b7f..7443518 100644 --- a/loglens/parsers/syslog_parser.py +++ b/loglens/parsers/syslog_parser.py @@ -1,24 +1,25 @@ -"""Syslog parser for RFC 3164 and RFC 5424 formats.""" +'''Syslog parser for RFC 3164 and RFC 5424 formats.''' import re from datetime import datetime -from typing import Any, Dict, List, Match, Optional +from typing import Any, Optional + from dateutil import parser as date_parser from loglens.parsers.base import LogParser, ParsedLogEntry class SyslogParser(LogParser): - """Parser for syslog format (RFC 3164 and RFC 5424).""" + '''Parser for syslog format (RFC 3164 and RFC 5424).''' format_name = "syslog" SYSLOG_RFC3164_PATTERN = re.compile( - r'^(?P[A-Z][a-z]{2})\s+(?P\d{1,2})\s+(?P\d{2}):(?P\d{2}):(?P\d{2})\s+(?P[\w.-]+)\s+(?P[\w\[\]]+):\s*(?P.*)$' + r"^(?P[A-Z][a-z]{2})\s+(?P\d{1,2})\s+(?P\d{2}):(?P\d{2}):(?P\d{2})\s+(?P[\w.-]+)\s+(?P[\w\[\]]+):\s*(?P.*)$" ) SYSLOG_RFC5424_PATTERN = re.compile( - r'^(?P<\d+>)?(?P\d+)\s+(?P\S+)\s+(?P\S+)\s+(?P\S+)\s+(?P\S+)\s+(?P\S+)?\s*(?P-)\s*(?P.*)$' + r"^(?P<\d+>)?(?P\d+)\s+(?P\S+)\s+(?P\S+)\s+(?P\S+)\s+(?P\S+)\s+(?P\S+)?\s*(?P-)\s*(?P.*)$" ) PRIORITY_MAP = { @@ -29,7 +30,7 @@ class SyslogParser(LogParser): 4: "warning", 5: "notice", 6: "info", - 7: "debug" + 7: "debug", } FACILITY_MAP = { @@ -56,17 +57,27 @@ class SyslogParser(LogParser): 20: "local4", 21: "local5", 22: "local6", - 23: "local7" + 23: "local7", } def __init__(self): self.month_map = { - "Jan": 1, "Feb": 2, "Mar": 3, "Apr": 4, "May": 5, "Jun": 6, - "Jul": 7, "Aug": 8, "Sep": 9, "Oct": 10, "Nov": 11, "Dec": 12 + "Jan": 1, + "Feb": 2, + "Mar": 3, + "Apr": 4, + "May": 5, + "Jun": 6, + "Jul": 7, + "Aug": 8, + "Sep": 9, + "Oct": 10, + "Nov": 11, + "Dec": 12, } def can_parse(self, line: str) -> bool: - """Check if line matches syslog format.""" + '''Check if line matches syslog format.''' line = line.strip() if not line: return False @@ -85,15 +96,12 @@ class SyslogParser(LogParser): return False def parse(self, line: str, line_number: int = 0) -> Optional[ParsedLogEntry]: - """Parse a syslog line.""" + '''Parse a syslog line.''' line = line.strip() if not line: return None - entry = ParsedLogEntry( - raw_line=line, - line_number=line_number - ) + entry = ParsedLogEntry(raw_line=line, line_number=line_number) if line.startswith("<"): parsed = self._parse_rfc5424(line) @@ -110,8 +118,8 @@ class SyslogParser(LogParser): return entry - def _parse_rfc3164(self, line: str) -> Optional[Dict[str, Any]]: - """Parse RFC 3164 syslog format.""" + def _parse_rfc3164(self, line: str) -> Optional[dict[str, Any]]: + '''Parse RFC 3164 syslog format.''' match = self.SYSLOG_RFC3164_PATTERN.match(line) if not match: return None @@ -126,9 +134,7 @@ class SyslogParser(LogParser): message = match.group("message") current_year = datetime.now().year - timestamp = datetime( - current_year, self.month_map[month], day, hour, minute, second - ) + timestamp = datetime(current_year, self.month_map[month], day, hour, minute, second) level = self._infer_level(message) @@ -137,23 +143,23 @@ class SyslogParser(LogParser): "hostname": hostname, "process": process, "message": message, - "level": level + "level": level, } - def _parse_rfc5424(self, line: str) -> Optional[Dict[str, Any]]: - """Parse RFC 5424 syslog format.""" + def _parse_rfc5424(self, line: str) -> Optional[dict[str, Any]]: + '''Parse RFC 5424 syslog format.''' match = self.SYSLOG_RFC5424_PATTERN.match(line) if not match: return None raw_pri = match.group("pri") - version = match.group("version") + _ = match.group("version") timestamp_str = match.group("timestamp") hostname = match.group("hostname") process = match.group("process") pid = match.group("pid") - msgid = match.group("msgid") - struct_data = match.group("struct_data") + _ = match.group("msgid") + _ = match.group("struct_data") message = match.group("message") try: @@ -163,7 +169,6 @@ class SyslogParser(LogParser): priority = None facility = None - level = None if raw_pri: pri_num = int(raw_pri[1:-1]) priority = pri_num & 0x07 @@ -180,11 +185,11 @@ class SyslogParser(LogParser): "process": f"{process}[{pid}]" if pid else process, "message": message, "level": level, - "facility": facility + "facility": facility, } def _infer_level(self, message: str) -> Optional[str]: - """Infer log level from message content.""" + '''Infer log level from message content.''' message_lower = message.lower() if any(kw in message_lower for kw in ["emerg", "panic", "critical system"]):