Some checks failed
- Added @click.version_option decorator to main() in commands.py
- Imported __version__ from loglens package
- Resolves CI build failure: 'loglens --version' command not found
93 lines
3.1 KiB
Python
93 lines
3.1 KiB
Python
import re
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
|
|
from loglens.parsers.base import BaseParser, LogFormat, ParsedEntry
|
|
|
|
|
|
class SyslogParser(BaseParser):
    """Parser for syslog-formatted log lines.

    A line is tried against the RFC 5424 pattern first, then the RFC 3164
    pattern, then the BSD pattern; the first pattern that matches is used.
    """

    # RFC 5424: "<PRI>VERSION TIMESTAMP HOSTNAME APP-NAME PROCID MSGID MSG".
    # Everything after MSGID (structured data included) ends up in ``msg``.
    RFC5424_PATTERN = re.compile(
        r'^<(?P<prival>[0-9]+)>(?P<version>[0-9]) (?P<timestamp>[-:T\.0-9Z+]+) (?P<hostname>[^\s]+) (?P<appname>[^\s]+) (?P<procid>[^\s]+) (?P<msgid>[^\s]+) (?P<msg>.*)$'
    )

    # RFC 3164: "Mmm dd hh:mm:ss HOSTNAME TAG[PID]: MSG" with an optional PID.
    # ``appname`` is deliberately non-greedy: a greedy ``[^\s]+`` swallows the
    # "[PID]" suffix into the application name and leaves ``procid`` unset.
    RFC3164_PATTERN = re.compile(
        r'^(?P<timestamp>[A-Z][a-z]{2}\s+[0-9]{1,2}\s+[0-9]{2}:[0-9]{2}:[0-9]{2}) (?P<hostname>[^\s]+) (?P<appname>[^\s]+?)(?:\[(?P<procid>[0-9]+)\])?: (?P<msg>.*)$'
    )

    # BSD: "Mmm dd hh:mm:ss HOSTNAME TAG: MSG" (no PID group at all).
    # Kept as a public attribute and as a last-resort fallback in ``parse``.
    BSD_PATTERN = re.compile(
        r'^(?P<timestamp>[A-Z][a-z]{2}\s+[0-9]{1,2}\s+[0-9]{2}:[0-9]{2}:[0-9]{2}) (?P<hostname>[^\s]+) (?P<appname>[^\s]+): (?P<msg>.*)$'
    )

    # strptime formats tried only for RFC 5424 version "1" timestamps.
    _RFC5424_TIMESTAMP_FORMATS = (
        "%Y-%m-%dT%H:%M:%S.%f%z",
        "%Y-%m-%dT%H:%M:%S%z",
    )

    # strptime formats tried for every timestamp, in this order.
    # NOTE(review): the "%b %d ..." formats carry no year, so strptime falls
    # back to its default year (1900) and cannot parse "Feb 29". Left as-is to
    # avoid changing the emitted values; confirm whether callers want the
    # current year substituted instead.
    _FALLBACK_TIMESTAMP_FORMATS = (
        "%b %d %H:%M:%S",
        "%b %d %H:%M:%S%z",
        "%Y-%m-%d %H:%M:%S",
    )

    # Syslog severity code (RFC 5424, section 6.2.1) -> level label.  The
    # labels are restricted to the five names this parser already emitted.
    _SEVERITY_LEVELS = {
        0: "critical",  # Emergency
        1: "critical",  # Alert
        2: "critical",  # Critical
        3: "error",     # Error
        4: "warning",   # Warning
        5: "info",      # Notice
        6: "info",      # Informational
        7: "debug",     # Debug
    }

    def get_format(self) -> LogFormat:
        """Return the log format handled by this parser (``LogFormat.SYSLOG``)."""
        return LogFormat.SYSLOG

    def parse(self, line: str) -> Optional[ParsedEntry]:
        """Parse one syslog line.

        Args:
            line: The raw log line.

        Returns:
            A ``ParsedEntry`` whose metadata holds ``hostname``, ``appname``
            and ``procid`` (``None`` when the line has no process id), or
            ``None`` when the line matches none of the syslog patterns.
            ``level`` is only set when the line carries a ``<PRI>`` value.
        """
        # Match objects are always truthy, so ``or`` picks the first hit.
        match = (
            self.RFC5424_PATTERN.match(line)
            or self.RFC3164_PATTERN.match(line)
            or self.BSD_PATTERN.match(line)
        )
        if match is None:
            return None

        # groupdict() only contains the groups of the pattern that matched,
        # so .get() yields None for groups that pattern does not define.
        groups = match.groupdict()
        prival = groups.get("prival")

        return ParsedEntry(
            raw_line=line,
            format=self.get_format(),
            timestamp=self._parse_timestamp(groups["timestamp"], groups.get("version")),
            level=self._extract_severity(prival) if prival is not None else None,
            message=groups["msg"],
            metadata={
                "hostname": groups["hostname"],
                "appname": groups["appname"],
                "procid": groups.get("procid"),
            },
        )

    def _parse_timestamp(self, timestamp_str: str, version: Optional[str] = None) -> Optional[str]:
        """Normalize a syslog timestamp to ISO 8601 where possible.

        Args:
            timestamp_str: The timestamp text captured from the log line.
            version: The RFC 5424 version field, if the line had one. The
                RFC 5424 formats are only attempted when this is ``"1"``.

        Returns:
            ``datetime.isoformat()`` of the first format that parses, or
            ``timestamp_str`` unchanged when no known format applies.
        """
        formats = self._FALLBACK_TIMESTAMP_FORMATS
        if version == "1":
            formats = self._RFC5424_TIMESTAMP_FORMATS + formats

        for fmt in formats:
            try:
                return datetime.strptime(timestamp_str, fmt).isoformat()
            except ValueError:
                continue

        return timestamp_str

    def _extract_severity(self, prival: str) -> Optional[str]:
        """Map a syslog PRI value to a level label.

        The severity is the low three bits of PRI (PRI = facility * 8 +
        severity).

        Args:
            prival: The PRI value as captured from the line (decimal text).

        Returns:
            One of ``"critical"``, ``"error"``, ``"warning"``, ``"info"`` or
            ``"debug"``; ``None`` when ``prival`` is not convertible to int.
        """
        try:
            code = int(prival)
        except (ValueError, TypeError):
            return None
        return self._SEVERITY_LEVELS.get(code & 0x07)
|