Add JSON, syslog, and Apache parsers
Some checks failed
CI / test (3.11) (push) Has been cancelled
CI / test (3.12) (push) Has been cancelled
CI / test (3.9) (push) Has been cancelled
CI / lint (push) Has been cancelled
CI / build (push) Has been cancelled
CI / test (3.10) (push) Has been cancelled

This commit is contained in:
2026-02-02 10:06:58 +00:00
parent f2ca3181ee
commit 4af2c953f7

View File

@@ -1,92 +1,212 @@
"""Syslog parser for RFC 3164 and RFC 5424 formats."""
import re
from datetime import datetime
from typing import Optional
from typing import Any, Optional
from loglens.parsers.base import BaseParser, LogFormat, ParsedEntry
from dateutil import parser as date_parser
from loglens.parsers.base import LogParser, ParsedLogEntry
class SyslogParser(BaseParser):
"""Parser for syslog formats."""
class SyslogParser(LogParser):
"""Parser for syslog format (RFC 3164 and RFC 5424)."""
RFC5424_PATTERN = re.compile(
r'^<(?P<prival>[0-9]+)>(?P<version>[0-9]) (?P<timestamp>[-:T\.0-9Z+]+) (?P<hostname>[^\s]+) (?P<appname>[^\s]+) (?P<procid>[^\s]+) (?P<msgid>[^\s]+) (?P<msg>.*)$'
format_name = "syslog"
SYSLOG_RFC3164_PATTERN = re.compile(
r"^(?P<month>[A-Z][a-z]{2})\s+(?P<day>\d{1,2})\s+(?P<hour>\d{2}):(?P<minute>\d{2}):(?P<second>\d{2})\s+(?P<hostname>[\w.-]+)\s+(?P<process>[\w\[\]]+):\s*(?P<message>.*)$"
)
RFC3164_PATTERN = re.compile(
r'^(?P<timestamp>[A-Z][a-z]{2}\s+[0-9]{1,2}\s+[0-9]{2}:[0-9]{2}:[0-9]{2}) (?P<hostname>[^\s]+) (?P<appname>[^\s]+)(?:\[(?P<procid>[0-9]+)\])?: (?P<msg>.*)$'
SYSLOG_RFC5424_PATTERN = re.compile(
r"^(?P<pri><\d+>)?(?P<version>\d+)\s+(?P<timestamp>\S+)\s+(?P<hostname>\S+)\s+(?P<process>\S+)\s+(?P<pid>\S+)\s+(?P<msgid>\S+)?\s*(?P<struct_data>-)\s*(?P<message>.*)$"
)
BSD_PATTERN = re.compile(
r'^(?P<timestamp>[A-Z][a-z]{2}\s+[0-9]{1,2}\s+[0-9]{2}:[0-9]{2}:[0-9]{2}) (?P<hostname>[^\s]+) (?P<appname>[^\s]+): (?P<msg>.*)$'
)
def get_format(self) -> LogFormat:
return LogFormat.SYSLOG
def parse(self, line: str) -> Optional[ParsedEntry]:
match = self.RFC5424_PATTERN.match(line)
if not match:
match = self.RFC3164_PATTERN.match(line)
if not match:
match = self.BSD_PATTERN.match(line)
if not match:
return None
timestamp = self._parse_timestamp(match.group("timestamp"), match.group("version") if "version" in match.groupdict() else None)
prival = match.group("prival") if "prival" in match.groupdict() else None
level = self._extract_severity(prival) if prival else None
return ParsedEntry(
raw_line=line,
format=self.get_format(),
timestamp=timestamp,
level=level,
message=match.group("msg"),
metadata={
"hostname": match.group("hostname"),
"appname": match.group("appname"),
"procid": match.group("procid") if "procid" in match.groupdict() else None,
},
)
def _parse_timestamp(self, timestamp_str: str, version: Optional[str] = None) -> Optional[str]:
if version == "1":
try:
dt = datetime.strptime(timestamp_str, "%Y-%m-%dT%H:%M:%S.%f%z")
return dt.isoformat()
except ValueError:
pass
try:
dt = datetime.strptime(timestamp_str, "%Y-%m-%dT%H:%M:%S%z")
return dt.isoformat()
except ValueError:
pass
for fmt in ["%b %d %H:%M:%S", "%b %d %H:%M:%S%z", "%Y-%m-%d %H:%M:%S"]:
try:
dt = datetime.strptime(timestamp_str, fmt)
return dt.isoformat()
except ValueError:
continue
return timestamp_str
def _extract_severity(self, prival: str) -> Optional[str]:
try:
code = int(prival)
severity = (code & 0x07)
severity_map = {
0: "critical",
1: "error",
2: "warning",
3: "info",
4: "debug",
5: "debug",
6: "debug",
PRIORITY_MAP = {
0: "emergency",
1: "alert",
2: "critical",
3: "error",
4: "warning",
5: "notice",
6: "info",
7: "debug",
}
return severity_map.get(severity)
except (ValueError, TypeError):
# Syslog facility number -> facility name.
# _parse_rfc5424 looks up ``PRI >> 3`` in this table with dict.get(), so a
# facility number that is not listed here resolves to None.
FACILITY_MAP = {
0: "kernel",
1: "user",
2: "mail",
3: "daemon",
4: "auth",
5: "syslog",
6: "lpr",
7: "news",
8: "uucp",
9: "clock",
10: "authpriv",
11: "ftp",
12: "ntp",
13: "logaudit",
14: "logalert",
15: "cron",
16: "local0",
17: "local1",
18: "local2",
19: "local3",
20: "local4",
21: "local5",
22: "local6",
23: "local7",
}
def __init__(self):
self.month_map = {
"Jan": 1,
"Feb": 2,
"Mar": 3,
"Apr": 4,
"May": 5,
"Jun": 6,
"Jul": 7,
"Aug": 8,
"Sep": 9,
"Oct": 10,
"Nov": 11,
"Dec": 12,
}
def can_parse(self, line: str) -> bool:
"""Check if line matches syslog format."""
line = line.strip()
if not line:
return False
if line.startswith("<") and ">" in line:
parts = line.split(">", 1)
if parts[0][1:].isdigit():
return True
match = self.SYSLOG_RFC3164_PATTERN.match(line)
if match:
month = match.group("month")
if month in self.month_map:
return True
return False
def parse(self, line: str, line_number: int = 0) -> Optional[ParsedLogEntry]:
    """Parse a single syslog line into a ``ParsedLogEntry``.

    Args:
        line: Raw log line; surrounding whitespace is ignored.
        line_number: Position of the line in its source, stored on the entry.

    Returns:
        ``None`` for a blank line. Otherwise an entry carrying the stripped
        line and ``line_number``; the remaining fields are filled in only
        when the line matches one of the supported syslog layouts.
    """
    line = line.strip()
    if not line:
        return None

    entry = ParsedLogEntry(raw_line=line, line_number=line_number)

    if line.startswith("<"):
        parsed = self._parse_rfc5424(line)
        if parsed is None:
            # RFC 3164 messages also start with "<PRI>" on the wire. Without
            # this fallback such lines (which can_parse accepts) would come
            # back with every field empty.
            parsed = self._parse_pri_prefixed_rfc3164(line)
    else:
        parsed = self._parse_rfc3164(line)

    if parsed:
        entry.timestamp = parsed.get("timestamp")
        entry.host = parsed.get("hostname")
        entry.level = parsed.get("level")
        entry.message = parsed.get("message", "")
        entry.facility = parsed.get("facility")
        entry.logger = parsed.get("process")

    return entry

def _parse_pri_prefixed_rfc3164(self, line: str) -> Optional[dict[str, Any]]:
    """Parse ``<PRI>`` followed by an RFC 3164 record; ``None`` if it is not one."""
    pri_text, closer, remainder = line[1:].partition(">")
    if not closer or not (pri_text.isascii() and pri_text.isdigit()):
        return None

    try:
        parsed = self._parse_rfc3164(remainder)
    except (KeyError, ValueError):
        # An unknown month name or an impossible date/time in the remainder
        # must not turn a line that used to be tolerated into an exception.
        return None
    if parsed is None:
        return None

    pri_num = int(pri_text)
    # PRI = facility * 8 + severity. An explicit severity takes precedence
    # over the level guessed from the message text.
    parsed["level"] = self.PRIORITY_MAP.get(pri_num & 0x07) or parsed.get("level")
    parsed["facility"] = self.FACILITY_MAP.get(pri_num >> 3)
    return parsed
def _parse_rfc3164(self, line: str) -> Optional[dict[str, Any]]:
"""Parse RFC 3164 syslog format."""
match = self.SYSLOG_RFC3164_PATTERN.match(line)
if not match:
return None
month = match.group("month")
day = int(match.group("day"))
hour = int(match.group("hour"))
minute = int(match.group("minute"))
second = int(match.group("second"))
hostname = match.group("hostname")
process = match.group("process")
message = match.group("message")
current_year = datetime.now().year
timestamp = datetime(current_year, self.month_map[month], day, hour, minute, second)
level = self._infer_level(message)
return {
"timestamp": timestamp,
"hostname": hostname,
"process": process,
"message": message,
"level": level,
}
def _parse_rfc5424(self, line: str) -> Optional[dict[str, Any]]:
    """Parse an RFC 5424 syslog line.

    Args:
        line: Candidate line, normally starting with a ``<PRI>`` prefix.

    Returns:
        A dict with the keys ``timestamp``, ``hostname``, ``process``
        (``"app[procid]"`` when a process id is present, otherwise just the
        application name), ``message``, ``level`` and ``facility``. Returns
        ``None`` when the line does not match the RFC 5424 pattern.
    """
    match = self.SYSLOG_RFC5424_PATTERN.match(line)
    if not match:
        return None

    process = match.group("process")
    pid = match.group("pid")
    message = match.group("message")

    try:
        timestamp = date_parser.isoparse(match.group("timestamp"))
    except ValueError:
        # Timestamps that cannot be parsed fall back to the current time.
        timestamp = datetime.now()

    level = None
    facility = None
    raw_pri = match.group("pri")
    if raw_pri:
        # PRI = facility * 8 + severity, written between "<" and ">".
        pri_num = int(raw_pri[1:-1])
        level = self.PRIORITY_MAP.get(pri_num & 0x07)
        facility = self.FACILITY_MAP.get(pri_num >> 3)

    if not level:
        level = self._infer_level(message)

    # RFC 5424 writes an absent PROCID as the NILVALUE "-"; appending it
    # would yield a bogus "app[-]" process name.
    has_pid = bool(pid) and pid != "-"

    return {
        "timestamp": timestamp,
        "hostname": match.group("hostname"),
        "process": f"{process}[{pid}]" if has_pid else process,
        "message": message,
        "level": level,
        "facility": facility,
    }
def _infer_level(self, message: str) -> Optional[str]:
"""Infer log level from message content."""
message_lower = message.lower()
if any(kw in message_lower for kw in ["emerg", "panic", "critical system"]):
return "emergency"
elif any(kw in message_lower for kw in ["alert", "immediate action"]):
return "alert"
elif any(kw in message_lower for kw in ["critical", "fatal", "segfault"]):
return "critical"
elif any(kw in message_lower for kw in ["error", "exception", "failed", "failure"]):
return "error"
elif any(kw in message_lower for kw in ["warning", "warn", "deprecation"]):
return "warning"
elif any(kw in message_lower for kw in ["notice"]):
return "notice"
elif any(kw in message_lower for kw in ["info", "information"]):
return "info"
elif any(kw in message_lower for kw in ["debug", "trace"]):
return "debug"
return None