Add JSON, syslog, and Apache parsers
Some checks failed
CI / test (3.11) (push) Has been cancelled
CI / test (3.12) (push) Has been cancelled
CI / test (3.9) (push) Has been cancelled
CI / test (3.10) (push) Has been cancelled
CI / lint (push) Has been cancelled
CI / build (push) Has been cancelled

This commit is contained in:
2026-02-02 10:06:59 +00:00
parent 4af2c953f7
commit 01af6ad53f

View File

@@ -1,63 +1,233 @@
"""Apache/Nginx log parser."""
import re
from datetime import datetime
from typing import Optional
from re import Match
from typing import Any, Optional
from loglens.parsers.base import BaseParser, LogFormat, ParsedEntry
from loglens.parsers.base import LogParser, ParsedLogEntry
class ApacheParser(BaseParser):
"""Parser for Apache/Nginx log formats."""
class ApacheParser(LogParser):
"""Parser for Apache and Nginx access/error logs."""
def __init__(self):
self.combined_log_pattern = re.compile(
r'^(?P<ip>\S+) \S+ \S+ \[(?P<timestamp>.*?)\] "(?P<method>\S+) (?P<path>\S+) (?P<protocol>\S+)" (?P<status>\d{3}) (?P<size>\d+|-) "(?P<referer>.*?)" "(?P<user_agent>.*?)"'
)
self.common_log_pattern = re.compile(
r'^(?P<ip>\S+) \S+ \S+ \[(?P<timestamp>.*?)\] "(?P<method>\S+) (?P<path>\S+) (?P<protocol>\S+)" (?P<status>\d{3}) (?P<size>\d+|-)'
format_name = "apache"
APACHE_COMMON_PATTERN = re.compile(
r'^(?P<ip>\S+)\s+\S+\s+\S+\s+\[(?P<timestamp>.*?)\]\s+"(?P<request>.*?)"\s+(?P<status>\d{3})\s+(?P<size>\S+)'
)
def get_format(self) -> LogFormat:
return LogFormat.APACHE
APACHE_COMBINED_PATTERN = re.compile(
r'^(?P<ip>\S+)\s+\S+\s+\S+\s+\[(?P<timestamp>.*?)\]\s+"(?P<request>.*?)"\s+(?P<status>\d{3})\s+(?P<size>\S+)\s+"(?P<referer>.*?)"\s+"(?P<user_agent>.*?)"'
)
def parse(self, line: str) -> Optional[ParsedEntry]:
match = self.combined_log_pattern.match(line)
if not match:
match = self.common_log_pattern.match(line)
NGINX_PATTERN = re.compile(
r'^(?P<ip>\S+)\s+-\s+\S+\s+\[(?P<timestamp>.*?)\]\s+"(?P<method>\S+)\s+(?P<path>.*?)\s+(?P<protocol>\S+)"\s+(?P<status>\d{3})\s+(?P<size>\S+)\s+"(?P<referer>.*?)"\s+"(?P<user_agent>.*?)"\s+"(?P<request_time>.*?)"'
)
if not match:
ERROR_PATTERN = re.compile(
r"^\[([A-Z][a-z]{2}\s+[A-Z][a-z]{2}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2}(?:\.\d+)?\s+\d{4})\]\s+\[([^\]:]+):([^\]]+)\]\s+(?:\[pid\s+(\d+)\]\s+)?(?P<message>.*)$"
)
STATUS_CODES = {
"1xx": "informational",
"2xx": "success",
"3xx": "redirection",
"4xx": "client_error",
"5xx": "server_error",
}
def __init__(self, custom_format: Optional[str] = None):
self.custom_format = custom_format
self._compile_custom_pattern(custom_format)
def _compile_custom_pattern(self, format_str: Optional[str]) -> None:
"""Compile custom log format pattern."""
if not format_str:
self.custom_pattern = None
return
pattern_str = (
format_str.replace("%h", r"(?P<ip>\S+)")
.replace("%l", r"\S+")
.replace("%u", r"\S+")
.replace("%t", r"\[(?P<timestamp>.*?)\]")
.replace("%r", r'"(?P<method>\S+)\s+(?P<path>.*?)\s+(?P<protocol>\S+)"')
.replace("%s", r"(?P<status>\d{3})")
.replace("%b", r"(?P<size>\S+)")
.replace("%{Referer}i", r'"(?P<referer>.*?)"')
.replace("%{User-agent}i", r'"(?P<user_agent>.*?)"')
)
try:
self.custom_pattern = re.compile("^" + pattern_str)
except re.error:
self.custom_pattern = None
def can_parse(self, line: str) -> bool:
"""Check if line matches Apache/Nginx format."""
line = line.strip()
if not line:
return False
if self.APACHE_COMBINED_PATTERN.match(line):
return True
if self.APACHE_COMMON_PATTERN.match(line):
return True
if self.NGINX_PATTERN.match(line):
return True
if self.ERROR_PATTERN.match(line):
return True
if self.custom_pattern and self.custom_pattern.match(line):
return True
return False
def parse(self, line: str, line_number: int = 0) -> Optional[ParsedLogEntry]:
    """Parse one Apache/Nginx log line into a ``ParsedLogEntry``.

    Patterns are tried in order: custom (if configured), Nginx, Apache
    combined, Apache common, then the error-log pattern. The first one
    that yields a non-empty field dict wins.

    Args:
        line: Raw log line; surrounding whitespace is stripped.
        line_number: Line number recorded on the returned entry.

    Returns:
        ``None`` for a blank line. Otherwise an entry carrying the
        stripped line and line number; timestamp, host, level, message
        and extra are filled in only when some pattern produced fields.
    """
    text = line.strip()
    if not text:
        return None

    entry = ParsedLogEntry(raw_line=text, line_number=line_number)

    # (pattern, extractor) pairs in priority order.
    attempts = []
    if self.custom_pattern:
        attempts.append((self.custom_pattern, self._extract_from_match))
    attempts.extend(
        [
            (self.NGINX_PATTERN, self._extract_from_match),
            (self.APACHE_COMBINED_PATTERN, self._extract_from_match),
            (self.APACHE_COMMON_PATTERN, self._extract_from_match),
            (self.ERROR_PATTERN, self._extract_error_from_match),
        ]
    )

    fields = None
    for pattern, extractor in attempts:
        hit = pattern.match(text)
        if not hit:
            continue
        fields = extractor(hit, text)
        if fields:
            break

    if fields:
        entry.timestamp = fields.get("timestamp")
        entry.host = fields.get("ip")
        entry.level = fields.get("level")
        entry.message = fields.get("message", "")
        entry.extra = fields.get("extra", {})

    return entry
def _extract_from_match(self, match: Match, line: str) -> dict[str, Any]:
"""Extract data from regex match."""
result = {}
groups = match.groupdict()
if "ip" in groups:
result["ip"] = groups["ip"]
if "timestamp" in groups:
ts = groups["timestamp"]
result["timestamp"] = self._parse_timestamp(ts)
if "request" in groups:
request = groups["request"]
request_match = re.match(r"(?P<method>\S+)\s+(?P<path>.*)", request)
if request_match:
result["method"] = request_match.group("method")
result["path"] = request_match.group("path")
result["message"] = f"{request_match.group('method')} {request_match.group('path')}"
else:
result["message"] = request
if "status" in groups:
status = groups["status"]
result["level"] = self._infer_level_from_status(status)
result["extra"] = {}
if "size" in groups and groups["size"] != "-":
result["extra"] = result.get("extra", {})
try:
result["extra"]["response_size"] = int(groups["size"])
except ValueError:
result["extra"]["response_size"] = groups["size"]
if "referer" in groups and groups["referer"] != "-":
result["extra"]["referer"] = groups["referer"]
if "user_agent" in groups and groups["user_agent"] != "-":
result["extra"]["user_agent"] = groups["user_agent"]
if "request_time" in groups:
result["extra"]["request_time"] = groups["request_time"]
return result
def _extract_error_from_match(self, match: Match, line: str) -> dict[str, Any]:
"""Extract data from error log match."""
groups = match.groupdict()
result = {
"message": groups.get("message", ""),
}
if len(match.groups()) >= 3:
result["level"] = match.group(3).lower() if match.group(3) else "info"
if match.group(2):
result["extra"] = {"module": match.group(2)}
if groups.get("timestamp"):
result["timestamp"] = self._parse_timestamp(groups["timestamp"])
return result
def _parse_timestamp(self, ts: str) -> Optional[datetime]:
"""Parse Apache/Nginx timestamp format."""
ts = ts.strip()
formats = [
"%d/%b/%Y:%H:%M:%S %z",
"%d/%b/%Y:%H:%M:%S",
"%Y-%m-%dT%H:%M:%S%z",
"%Y-%m-%d %H:%M:%S",
]
for fmt in formats:
try:
return datetime.strptime(ts, fmt)
except ValueError:
continue
return None
def _infer_level_from_status(self, status: str) -> Optional[str]:
"""Infer log level from HTTP status code."""
if not status:
return None
try:
timestamp = datetime.strptime(match.group("timestamp"), "%d/%b/%Y:%H:%M:%S %z")
except ValueError:
timestamp = datetime.strptime(match.group("timestamp"), "%d/%b/%Y:%H:%M:%S")
return ParsedEntry(
raw_line=line,
format=self.get_format(),
timestamp=timestamp.isoformat(),
level=self._extract_level(int(match.group("status"))),
message=f"{match.group('method')} {match.group('path')} {match.group('protocol')} - Status: {match.group('status')}",
metadata={
"ip": match.group("ip"),
"method": match.group("method"),
"path": match.group("path"),
"protocol": match.group("protocol"),
"status_code": int(match.group("status")),
"size": match.group("size"),
"referer": match.group("referer"),
"user_agent": match.group("user_agent"),
},
)
def _extract_level(self, status_code: int) -> str:
if status_code >= 500:
return "error"
elif status_code >= 400:
code = int(status)
if 100 <= code < 200:
return "info"
elif 200 <= code < 300:
return "info"
elif 300 <= code < 400:
return "info"
elif 400 <= code < 500:
return "warning"
elif status_code >= 300:
return "info"
elif status_code >= 200:
return "info"
else:
return "debug"
elif 500 <= code < 600:
return "error"
except ValueError:
pass
return None