"""Syslog parser for RFC 3164 and RFC 5424 formats.""" import re from datetime import datetime from typing import Any, Optional from dateutil import parser as date_parser from loglens.parsers.base import LogParser, ParsedLogEntry class SyslogParser(LogParser): """Parser for syslog format (RFC 3164 and RFC 5424).""" format_name = "syslog" SYSLOG_RFC3164_PATTERN = re.compile( r"^(?P[A-Z][a-z]{2})\s+(?P\d{1,2})\s+(?P\d{2}):(?P\d{2}):(?P\d{2})\s+(?P[\w.-]+)\s+(?P[\w\[\]]+):\s*(?P.*)$" ) SYSLOG_RFC5424_PATTERN = re.compile( r"^(?P<\d+>)?(?P\d+)\s+(?P\S+)\s+(?P\S+)\s+(?P\S+)\s+(?P\S+)\s+(?P\S+)?\s*(?P-)\s*(?P.*)$" ) PRIORITY_MAP = { 0: "emergency", 1: "alert", 2: "critical", 3: "error", 4: "warning", 5: "notice", 6: "info", 7: "debug", } FACILITY_MAP = { 0: "kernel", 1: "user", 2: "mail", 3: "daemon", 4: "auth", 5: "syslog", 6: "lpr", 7: "news", 8: "uucp", 9: "clock", 10: "authpriv", 11: "ftp", 12: "ntp", 13: "logaudit", 14: "logalert", 15: "cron", 16: "local0", 17: "local1", 18: "local2", 19: "local3", 20: "local4", 21: "local5", 22: "local6", 23: "local7", } def __init__(self): self.month_map = { "Jan": 1, "Feb": 2, "Mar": 3, "Apr": 4, "May": 5, "Jun": 6, "Jul": 7, "Aug": 8, "Sep": 9, "Oct": 10, "Nov": 11, "Dec": 12, } def can_parse(self, line: str) -> bool: """Check if line matches syslog format.""" line = line.strip() if not line: return False if line.startswith("<") and ">" in line: parts = line.split(">", 1) if parts[0][1:].isdigit(): return True match = self.SYSLOG_RFC3164_PATTERN.match(line) if match: month = match.group("month") if month in self.month_map: return True return False def parse(self, line: str, line_number: int = 0) -> Optional[ParsedLogEntry]: """Parse a syslog line.""" line = line.strip() if not line: return None entry = ParsedLogEntry(raw_line=line, line_number=line_number) if line.startswith("<"): parsed = self._parse_rfc5424(line) else: parsed = self._parse_rfc3164(line) if parsed: entry.timestamp = parsed.get("timestamp") entry.host = parsed.get("hostname") entry.level = parsed.get("level") entry.message = parsed.get("message", "") entry.facility = parsed.get("facility") entry.logger = parsed.get("process") return entry def _parse_rfc3164(self, line: str) -> Optional[dict[str, Any]]: """Parse RFC 3164 syslog format.""" match = self.SYSLOG_RFC3164_PATTERN.match(line) if not match: return None month = match.group("month") day = int(match.group("day")) hour = int(match.group("hour")) minute = int(match.group("minute")) second = int(match.group("second")) hostname = match.group("hostname") process = match.group("process") message = match.group("message") current_year = datetime.now().year timestamp = datetime(current_year, self.month_map[month], day, hour, minute, second) level = self._infer_level(message) return { "timestamp": timestamp, "hostname": hostname, "process": process, "message": message, "level": level, } def _parse_rfc5424(self, line: str) -> Optional[dict[str, Any]]: """Parse RFC 5424 syslog format.""" match = self.SYSLOG_RFC5424_PATTERN.match(line) if not match: return None raw_pri = match.group("pri") _ = match.group("version") timestamp_str = match.group("timestamp") hostname = match.group("hostname") process = match.group("process") pid = match.group("pid") _ = match.group("msgid") _ = match.group("struct_data") message = match.group("message") try: timestamp = date_parser.isoparse(timestamp_str) except ValueError: timestamp = datetime.now() priority = None facility = None if raw_pri: pri_num = int(raw_pri[1:-1]) priority = pri_num & 0x07 facility_num = pri_num >> 3 facility = self.FACILITY_MAP.get(facility_num) level = self.PRIORITY_MAP.get(priority) if not level: level = self._infer_level(message) return { "timestamp": timestamp, "hostname": hostname, "process": f"{process}[{pid}]" if pid else process, "message": message, "level": level, "facility": facility, } def _infer_level(self, message: str) -> Optional[str]: """Infer log level from message content.""" message_lower = message.lower() if any(kw in message_lower for kw in ["emerg", "panic", "critical system"]): return "emergency" elif any(kw in message_lower for kw in ["alert", "immediate action"]): return "alert" elif any(kw in message_lower for kw in ["critical", "fatal", "segfault"]): return "critical" elif any(kw in message_lower for kw in ["error", "exception", "failed", "failure"]): return "error" elif any(kw in message_lower for kw in ["warning", "warn", "deprecation"]): return "warning" elif any(kw in message_lower for kw in ["notice"]): return "notice" elif any(kw in message_lower for kw in ["info", "information"]): return "info" elif any(kw in message_lower for kw in ["debug", "trace"]): return "debug" return None