Files
loglens-cli/loglens/parsers/syslog_parser.py
7000pctAUTO 572eedf7a0
Some checks failed
CI / test (3.11) (push) Has been cancelled
CI / test (3.12) (push) Has been cancelled
CI / test (3.9) (push) Has been cancelled
CI / lint (push) Has been cancelled
CI / build (push) Has been cancelled
CI / test (3.10) (push) Has been cancelled
fix: add --version option to Click CLI group
- Added @click.version_option decorator to main() in commands.py
- Imported __version__ from loglens package
- Resolves CI build failure: 'loglens --version' command not found
2026-02-02 09:25:15 +00:00

93 lines
3.1 KiB
Python

import re
from datetime import datetime
from typing import Optional
from loglens.parsers.base import BaseParser, LogFormat, ParsedEntry
class SyslogParser(BaseParser):
"""Parser for syslog formats."""
RFC5424_PATTERN = re.compile(
r'^<(?P<prival>[0-9]+)>(?P<version>[0-9]) (?P<timestamp>[-:T\.0-9Z+]+) (?P<hostname>[^\s]+) (?P<appname>[^\s]+) (?P<procid>[^\s]+) (?P<msgid>[^\s]+) (?P<msg>.*)$'
)
RFC3164_PATTERN = re.compile(
r'^(?P<timestamp>[A-Z][a-z]{2}\s+[0-9]{1,2}\s+[0-9]{2}:[0-9]{2}:[0-9]{2}) (?P<hostname>[^\s]+) (?P<appname>[^\s]+)(?:\[(?P<procid>[0-9]+)\])?: (?P<msg>.*)$'
)
BSD_PATTERN = re.compile(
r'^(?P<timestamp>[A-Z][a-z]{2}\s+[0-9]{1,2}\s+[0-9]{2}:[0-9]{2}:[0-9]{2}) (?P<hostname>[^\s]+) (?P<appname>[^\s]+): (?P<msg>.*)$'
)
def get_format(self) -> LogFormat:
return LogFormat.SYSLOG
def parse(self, line: str) -> Optional[ParsedEntry]:
match = self.RFC5424_PATTERN.match(line)
if not match:
match = self.RFC3164_PATTERN.match(line)
if not match:
match = self.BSD_PATTERN.match(line)
if not match:
return None
timestamp = self._parse_timestamp(match.group("timestamp"), match.group("version") if "version" in match.groupdict() else None)
prival = match.group("prival") if "prival" in match.groupdict() else None
level = self._extract_severity(prival) if prival else None
return ParsedEntry(
raw_line=line,
format=self.get_format(),
timestamp=timestamp,
level=level,
message=match.group("msg"),
metadata={
"hostname": match.group("hostname"),
"appname": match.group("appname"),
"procid": match.group("procid") if "procid" in match.groupdict() else None,
},
)
def _parse_timestamp(self, timestamp_str: str, version: Optional[str] = None) -> Optional[str]:
if version == "1":
try:
dt = datetime.strptime(timestamp_str, "%Y-%m-%dT%H:%M:%S.%f%z")
return dt.isoformat()
except ValueError:
pass
try:
dt = datetime.strptime(timestamp_str, "%Y-%m-%dT%H:%M:%S%z")
return dt.isoformat()
except ValueError:
pass
for fmt in ["%b %d %H:%M:%S", "%b %d %H:%M:%S%z", "%Y-%m-%d %H:%M:%S"]:
try:
dt = datetime.strptime(timestamp_str, fmt)
return dt.isoformat()
except ValueError:
continue
return timestamp_str
def _extract_severity(self, prival: str) -> Optional[str]:
try:
code = int(prival)
severity = (code & 0x07)
severity_map = {
0: "critical",
1: "error",
2: "warning",
3: "info",
4: "debug",
5: "debug",
6: "debug",
7: "debug",
}
return severity_map.get(severity)
except (ValueError, TypeError):
return None