import re from datetime import datetime from typing import Optional from loglens.parsers.base import BaseParser, LogFormat, ParsedEntry class SyslogParser(BaseParser): """Parser for syslog formats.""" RFC5424_PATTERN = re.compile( r'^<(?P[0-9]+)>(?P[0-9]) (?P[-:T\.0-9Z+]+) (?P[^\s]+) (?P[^\s]+) (?P[^\s]+) (?P[^\s]+) (?P.*)$' ) RFC3164_PATTERN = re.compile( r'^(?P[A-Z][a-z]{2}\s+[0-9]{1,2}\s+[0-9]{2}:[0-9]{2}:[0-9]{2}) (?P[^\s]+) (?P[^\s]+)(?:\[(?P[0-9]+)\])?: (?P.*)$' ) BSD_PATTERN = re.compile( r'^(?P[A-Z][a-z]{2}\s+[0-9]{1,2}\s+[0-9]{2}:[0-9]{2}:[0-9]{2}) (?P[^\s]+) (?P[^\s]+): (?P.*)$' ) def get_format(self) -> LogFormat: return LogFormat.SYSLOG def parse(self, line: str) -> Optional[ParsedEntry]: match = self.RFC5424_PATTERN.match(line) if not match: match = self.RFC3164_PATTERN.match(line) if not match: match = self.BSD_PATTERN.match(line) if not match: return None timestamp = self._parse_timestamp(match.group("timestamp"), match.group("version") if "version" in match.groupdict() else None) prival = match.group("prival") if "prival" in match.groupdict() else None level = self._extract_severity(prival) if prival else None return ParsedEntry( raw_line=line, format=self.get_format(), timestamp=timestamp, level=level, message=match.group("msg"), metadata={ "hostname": match.group("hostname"), "appname": match.group("appname"), "procid": match.group("procid") if "procid" in match.groupdict() else None, }, ) def _parse_timestamp(self, timestamp_str: str, version: Optional[str] = None) -> Optional[str]: if version == "1": try: dt = datetime.strptime(timestamp_str, "%Y-%m-%dT%H:%M:%S.%f%z") return dt.isoformat() except ValueError: pass try: dt = datetime.strptime(timestamp_str, "%Y-%m-%dT%H:%M:%S%z") return dt.isoformat() except ValueError: pass for fmt in ["%b %d %H:%M:%S", "%b %d %H:%M:%S%z", "%Y-%m-%d %H:%M:%S"]: try: dt = datetime.strptime(timestamp_str, fmt) return dt.isoformat() except ValueError: continue return timestamp_str def _extract_severity(self, prival: str) -> Optional[str]: try: code = int(prival) severity = (code & 0x07) severity_map = { 0: "critical", 1: "error", 2: "warning", 3: "info", 4: "debug", 5: "debug", 6: "debug", 7: "debug", } return severity_map.get(severity) except (ValueError, TypeError): return None