Add parsers: JSON, Syslog, Apache, and factory
Some checks failed
CI / test (3.11) (push) Has been cancelled
CI / test (3.12) (push) Has been cancelled
CI / test (3.9) (push) Has been cancelled
CI / lint (push) Has been cancelled
CI / build (push) Has been cancelled
CI / test (3.10) (push) Has been cancelled

This commit is contained in:
2026-02-02 08:03:24 +00:00
parent e2f3baf47f
commit c83ede0700

View File

@@ -0,0 +1,234 @@
"""Apache/Nginx log parser."""
import re
from datetime import datetime
from typing import Any, Dict, List, Match, Optional
from urllib.parse import parse_qs, unquote
from loglens.parsers.base import LogParser, ParsedLogEntry
class ApacheParser(LogParser):
    """Parser for Apache and Nginx access/error logs.

    Access-log lines are tried against, in order: the optional custom
    pattern, ``NGINX_PATTERN``, ``APACHE_COMBINED_PATTERN`` and
    ``APACHE_COMMON_PATTERN``.  Lines that match none of them are tried
    against ``ERROR_PATTERN``.
    """

    format_name = "apache"

    APACHE_COMMON_PATTERN = re.compile(
        r'^(?P<ip>\S+)\s+\S+\s+\S+\s+\[(?P<timestamp>.*?)\]\s+"(?P<request>.*?)"\s+(?P<status>\d{3})\s+(?P<size>\S+)'
    )
    APACHE_COMBINED_PATTERN = re.compile(
        r'^(?P<ip>\S+)\s+\S+\s+\S+\s+\[(?P<timestamp>.*?)\]\s+"(?P<request>.*?)"\s+(?P<status>\d{3})\s+(?P<size>\S+)\s+"(?P<referer>.*?)"\s+"(?P<user_agent>.*?)"'
    )
    NGINX_PATTERN = re.compile(
        r'^(?P<ip>\S+)\s+-\s+\S+\s+\[(?P<timestamp>.*?)\]\s+"(?P<method>\S+)\s+(?P<path>.*?)\s+(?P<protocol>\S+)"\s+(?P<status>\d{3})\s+(?P<size>\S+)\s+"(?P<referer>.*?)"\s+"(?P<user_agent>.*?)"\s+"(?P<request_time>.*?)"'
    )
    # Group order is significant: 1 = timestamp, 2 = module, 3 = level,
    # 4 = pid (optional), 5 = message.  The timestamp group was previously
    # missing its opening parenthesis, which made the pattern uncompilable.
    ERROR_PATTERN = re.compile(
        r'^\[(?P<timestamp>[A-Z][a-z]{2}\s+[A-Z][a-z]{2}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2}(?:\.\d+)?\s+\d{4})\]\s+\[(?P<module>[^\]:]+):(?P<level>[^\]]+)\]\s+(?:\[pid\s+(\d+)\]\s+)?(?P<message>.*)$'
    )

    STATUS_CODES = {
        "1xx": "informational",
        "2xx": "success",
        "3xx": "redirection",
        "4xx": "client_error",
        "5xx": "server_error"
    }

    # Splits the quoted request of the Apache patterns into the method and
    # everything that follows it (target plus protocol).
    _REQUEST_PATTERN = re.compile(r'(?P<method>\S+)\s+(?P<path>.*)')

    # Format directive -> regex fragment, applied in this order.  Note that
    # the fragments for %r and the two header directives already contain the
    # surrounding double quotes.
    _FORMAT_DIRECTIVES = (
        ("%h", r"(?P<ip>\S+)"),
        ("%l", r"\S+"),
        ("%u", r"\S+"),
        ("%t", r"\[(?P<timestamp>.*?)\]"),
        ("%r", r'"(?P<method>\S+)\s+(?P<path>.*?)\s+(?P<protocol>\S+)"'),
        ("%s", r"(?P<status>\d{3})"),
        ("%b", r"(?P<size>\S+)"),
        ("%{Referer}i", r'"(?P<referer>.*?)"'),
        ("%{User-agent}i", r'"(?P<user_agent>.*?)"'),
    )

    # strptime formats tried in order by _parse_timestamp.  The last two
    # cover the error-log timestamp captured by ERROR_PATTERN, with and
    # without fractional seconds (e.g. "Wed Oct 11 14:32:52.123456 2000").
    _TIMESTAMP_FORMATS = (
        "%d/%b/%Y:%H:%M:%S %z",
        "%d/%b/%Y:%H:%M:%S",
        "%Y-%m-%dT%H:%M:%S%z",
        "%Y-%m-%d %H:%M:%S",
        "%a %b %d %H:%M:%S.%f %Y",
        "%a %b %d %H:%M:%S %Y",
    )

    def __init__(self, custom_format: Optional[str] = None):
        """Create a parser, optionally with a custom access-log format.

        Args:
            custom_format: Format string made of the directives listed in
                ``_FORMAT_DIRECTIVES``; ``None`` or ``""`` disables the
                custom pattern.
        """
        self.custom_format = custom_format
        self._compile_custom_pattern(custom_format)

    def _compile_custom_pattern(self, format_str: Optional[str]) -> None:
        """Compile custom log format pattern into ``self.custom_pattern``.

        Each known directive is replaced by its regex fragment; all other
        text in ``format_str`` is passed to the regex engine unchanged
        (it is not escaped).  ``self.custom_pattern`` is set to ``None``
        when no format is given or when the resulting regex is invalid.
        """
        if not format_str:
            self.custom_pattern = None
            return

        pattern_str = format_str
        for directive, fragment in self._FORMAT_DIRECTIVES:
            pattern_str = pattern_str.replace(directive, fragment)

        try:
            self.custom_pattern = re.compile("^" + pattern_str)
        except re.error:
            # Best effort: an unusable custom format simply disables the
            # custom pattern; the built-in patterns still apply.
            self.custom_pattern = None

    def can_parse(self, line: str) -> bool:
        """Check if line matches Apache/Nginx format.

        Returns ``True`` when the stripped line matches any built-in
        pattern or the custom pattern, ``False`` otherwise (including for
        blank lines).
        """
        line = line.strip()
        if not line:
            return False

        candidates = (
            self.APACHE_COMBINED_PATTERN,
            self.APACHE_COMMON_PATTERN,
            self.NGINX_PATTERN,
            self.ERROR_PATTERN,
            self.custom_pattern,
        )
        return any(
            pattern is not None and pattern.match(line)
            for pattern in candidates
        )

    def parse(self, line: str, line_number: int = 0) -> Optional[ParsedLogEntry]:
        """Parse an Apache/Nginx log line.

        Args:
            line: Raw log line; surrounding whitespace is stripped.
            line_number: Line number recorded on the returned entry.

        Returns:
            ``None`` for a blank line.  Otherwise a ``ParsedLogEntry``
            carrying the stripped line and line number; timestamp, host,
            level, message and extra are filled in only when one of the
            patterns matched.
        """
        line = line.strip()
        if not line:
            return None

        entry = ParsedLogEntry(
            raw_line=line,
            line_number=line_number
        )

        parsed = None
        access_patterns = (
            self.custom_pattern,
            self.NGINX_PATTERN,
            self.APACHE_COMBINED_PATTERN,
            self.APACHE_COMMON_PATTERN,
        )
        for pattern in access_patterns:
            if pattern is None:
                continue
            match = pattern.match(line)
            if match:
                parsed = self._extract_from_match(match, line)
                # An empty extraction (pattern without named groups) falls
                # through to the next pattern.
                if parsed:
                    break

        if not parsed:
            match = self.ERROR_PATTERN.match(line)
            if match:
                parsed = self._extract_error_from_match(match, line)

        if parsed:
            entry.timestamp = parsed.get("timestamp")
            entry.host = parsed.get("ip")
            entry.level = parsed.get("level")
            entry.message = parsed.get("message", "")
            entry.extra = parsed.get("extra", {})

        return entry

    def _extract_from_match(self, match: Match, line: str) -> Dict[str, Any]:
        """Extract data from an access-log regex match.

        Only keys whose named group exists in the matching pattern are
        produced.  Possible keys: ``ip``, ``timestamp``, ``method``,
        ``path``, ``message``, ``level`` and ``extra`` (a dict that may
        hold ``response_size``, ``referer``, ``user_agent`` and
        ``request_time``).
        """
        groups = match.groupdict()
        result: Dict[str, Any] = {}
        # Collected locally so that referer/user_agent/request_time can be
        # stored even when the pattern has neither a status nor a size group.
        extra: Dict[str, Any] = {}

        if "ip" in groups:
            result["ip"] = groups["ip"]

        if "timestamp" in groups:
            result["timestamp"] = self._parse_timestamp(groups["timestamp"])

        if "request" in groups:
            request = groups["request"]
            request_match = self._REQUEST_PATTERN.match(request)
            if request_match:
                method = request_match.group("method")
                path = request_match.group("path")
                result["method"] = method
                result["path"] = path
                result["message"] = f"{method} {path}"
            else:
                result["message"] = request
        elif "method" in groups and "path" in groups:
            # NGINX_PATTERN and the custom %r directive capture the request
            # already split; rebuild the same "METHOD target PROTOCOL"
            # message the request-based patterns produce.
            method = groups["method"]
            path = groups["path"]
            result["method"] = method
            result["path"] = path
            parts = (method, path, groups.get("protocol"))
            result["message"] = " ".join(part for part in parts if part)

        has_status = "status" in groups
        if has_status:
            result["level"] = self._infer_level_from_status(groups["status"])

        if "size" in groups and groups["size"] != "-":
            try:
                extra["response_size"] = int(groups["size"])
            except ValueError:
                # Keep the raw text when the size is not a plain integer.
                extra["response_size"] = groups["size"]

        if "referer" in groups and groups["referer"] != "-":
            extra["referer"] = groups["referer"]

        if "user_agent" in groups and groups["user_agent"] != "-":
            extra["user_agent"] = groups["user_agent"]

        if "request_time" in groups:
            extra["request_time"] = groups["request_time"]

        if has_status or extra:
            result["extra"] = extra

        return result

    def _extract_error_from_match(self, match: Match, line: str) -> Dict[str, Any]:
        """Extract data from an ``ERROR_PATTERN`` match.

        Returns a dict with ``message`` and ``level`` (lower-cased, ``"info"``
        when empty), plus ``extra`` (``{"module": ...}``) and ``timestamp``
        when the corresponding groups are non-empty.
        """
        groups = match.groupdict()
        result: Dict[str, Any] = {
            "message": groups.get("message", ""),
        }

        level = groups.get("level")
        result["level"] = level.lower() if level else "info"

        module = groups.get("module")
        if module:
            result["extra"] = {"module": module}

        timestamp = groups.get("timestamp")
        if timestamp:
            result["timestamp"] = self._parse_timestamp(timestamp)

        return result

    def _parse_timestamp(self, ts: str) -> Optional[datetime]:
        """Parse Apache/Nginx timestamp format.

        Tries each entry of ``_TIMESTAMP_FORMATS`` in order and returns the
        first successful result, or ``None`` when no format fits.  The
        result is timezone-aware only for the formats that include ``%z``.
        """
        ts = ts.strip()
        for fmt in self._TIMESTAMP_FORMATS:
            try:
                return datetime.strptime(ts, fmt)
            except ValueError:
                continue
        return None

    def _infer_level_from_status(self, status: str) -> Optional[str]:
        """Infer log level from HTTP status code.

        1xx-3xx map to ``"info"``, 4xx to ``"warning"``, 5xx to ``"error"``.
        Returns ``None`` for an empty, non-numeric or out-of-range status.
        """
        if not status:
            return None
        try:
            code = int(status)
        except ValueError:
            return None

        if 100 <= code < 400:
            return "info"
        if 400 <= code < 500:
            return "warning"
        if 500 <= code < 600:
            return "error"
        return None