Files
loglens-cli/loglens/parsers/apache_parser.py
7000pctAUTO 01af6ad53f
Some checks failed
CI / test (3.11) (push) Has been cancelled
CI / test (3.12) (push) Has been cancelled
CI / test (3.9) (push) Has been cancelled
CI / test (3.10) (push) Has been cancelled
CI / lint (push) Has been cancelled
CI / build (push) Has been cancelled
Add JSON, syslog, and Apache parsers
2026-02-02 10:06:59 +00:00

234 lines
7.6 KiB
Python

"""Apache/Nginx log parser."""
import re
from datetime import datetime
from re import Match
from typing import Any, Optional
from loglens.parsers.base import LogParser, ParsedLogEntry
class ApacheParser(LogParser):
    """Parser for Apache and Nginx access/error logs.

    Four built-in line shapes are recognised (see the ``*_PATTERN`` class
    attributes): Apache common, Apache combined, an Nginx variant with a
    trailing quoted request time, and the bracketed Apache error-log layout.
    An optional custom access-log format can be supplied to the constructor;
    when it compiles, it is tried before the built-in patterns in ``parse``.
    """

    format_name = "apache"

    APACHE_COMMON_PATTERN = re.compile(
        r'^(?P<ip>\S+)\s+\S+\s+\S+\s+\[(?P<timestamp>.*?)\]\s+"(?P<request>.*?)"\s+(?P<status>\d{3})\s+(?P<size>\S+)'
    )
    APACHE_COMBINED_PATTERN = re.compile(
        r'^(?P<ip>\S+)\s+\S+\s+\S+\s+\[(?P<timestamp>.*?)\]\s+"(?P<request>.*?)"\s+(?P<status>\d{3})\s+(?P<size>\S+)\s+"(?P<referer>.*?)"\s+"(?P<user_agent>.*?)"'
    )
    NGINX_PATTERN = re.compile(
        r'^(?P<ip>\S+)\s+-\s+\S+\s+\[(?P<timestamp>.*?)\]\s+"(?P<method>\S+)\s+(?P<path>.*?)\s+(?P<protocol>\S+)"\s+(?P<status>\d{3})\s+(?P<size>\S+)\s+"(?P<referer>.*?)"\s+"(?P<user_agent>.*?)"\s+"(?P<request_time>.*?)"'
    )
    # Positional groups: 1 = timestamp, 2 = module, 3 = level, 4 = pid
    # (optional); the trailing free text is the named group ``message``.
    ERROR_PATTERN = re.compile(
        r"^\[([A-Z][a-z]{2}\s+[A-Z][a-z]{2}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2}(?:\.\d+)?\s+\d{4})\]\s+\[([^\]:]+):([^\]]+)\]\s+(?:\[pid\s+(\d+)\]\s+)?(?P<message>.*)$"
    )

    STATUS_CODES = {
        "1xx": "informational",
        "2xx": "success",
        "3xx": "redirection",
        "4xx": "client_error",
        "5xx": "server_error",
    }

    # Splits a quoted request line into its method and the remainder.
    _REQUEST_PATTERN = re.compile(r"(?P<method>\S+)\s+(?P<path>.*)")

    # Format directives understood in a custom format string, paired with the
    # regex fragment substituted for each. ``%>s`` is accepted as an alias of
    # ``%s`` because it is the form used in Apache's stock LogFormat strings.
    _DIRECTIVE_PATTERNS = (
        ("%h", r"(?P<ip>\S+)"),
        ("%l", r"\S+"),
        ("%u", r"\S+"),
        ("%t", r"\[(?P<timestamp>.*?)\]"),
        ("%r", r'"(?P<method>\S+)\s+(?P<path>.*?)\s+(?P<protocol>\S+)"'),
        ("%>s", r"(?P<status>\d{3})"),
        ("%s", r"(?P<status>\d{3})"),
        ("%b", r"(?P<size>\S+)"),
        ("%{Referer}i", r'"(?P<referer>.*?)"'),
        ("%{User-agent}i", r'"(?P<user_agent>.*?)"'),
    )

    # strptime formats tried in order by ``_parse_timestamp``. The last two
    # cover the error-log shape captured by ERROR_PATTERN group 1
    # (e.g. "Wed Oct 11 14:32:52.123456 2000"), with and without fractional
    # seconds.
    _TIMESTAMP_FORMATS = (
        "%d/%b/%Y:%H:%M:%S %z",
        "%d/%b/%Y:%H:%M:%S",
        "%Y-%m-%dT%H:%M:%S%z",
        "%Y-%m-%d %H:%M:%S",
        "%a %b %d %H:%M:%S.%f %Y",
        "%a %b %d %H:%M:%S %Y",
    )

    def __init__(self, custom_format: Optional[str] = None):
        """Create a parser, optionally with a custom access-log format.

        Args:
            custom_format: Format string built from the directives listed in
                ``_DIRECTIVE_PATTERNS``. ``None`` or an empty string disables
                the custom pattern.
        """
        self.custom_format = custom_format
        self._compile_custom_pattern(custom_format)

    def _compile_custom_pattern(self, format_str: Optional[str]) -> None:
        """Translate ``format_str`` into ``self.custom_pattern``.

        Every known directive is replaced by its regex fragment; all other
        text in the format string is passed to ``re.compile`` unchanged, so
        it is interpreted as regex syntax. ``self.custom_pattern`` is set to
        ``None`` when no format is given or the resulting expression does not
        compile (for example when a directive appears twice and produces a
        duplicate group name).
        """
        if not format_str:
            self.custom_pattern = None
            return

        pattern_str = format_str
        for directive, fragment in self._DIRECTIVE_PATTERNS:
            pattern_str = pattern_str.replace(directive, fragment)

        try:
            self.custom_pattern = re.compile("^" + pattern_str)
        except re.error:
            # Best effort: an unusable custom format falls back to the
            # built-in patterns instead of failing construction.
            self.custom_pattern = None

    def can_parse(self, line: str) -> bool:
        """Return True if the stripped line matches any known pattern."""
        line = line.strip()
        if not line:
            return False

        builtin_patterns = (
            self.APACHE_COMBINED_PATTERN,
            self.APACHE_COMMON_PATTERN,
            self.NGINX_PATTERN,
            self.ERROR_PATTERN,
        )
        if any(pattern.match(line) for pattern in builtin_patterns):
            return True
        return bool(self.custom_pattern and self.custom_pattern.match(line))

    def parse(self, line: str, line_number: int = 0) -> Optional[ParsedLogEntry]:
        """Parse one log line into a ``ParsedLogEntry``.

        Patterns are tried in this order: custom (if configured), Nginx,
        Apache combined, Apache common, error log. The first one that yields
        a non-empty result populates the entry.

        Args:
            line: Raw log line; surrounding whitespace is stripped.
            line_number: Line number recorded on the entry.

        Returns:
            ``None`` for a blank line. Otherwise an entry carrying the
            stripped line and line number; its timestamp, host, level,
            message and extra fields are only assigned when some pattern
            matched.
        """
        line = line.strip()
        if not line:
            return None

        entry = ParsedLogEntry(raw_line=line, line_number=line_number)

        candidates = []
        if self.custom_pattern:
            candidates.append((self.custom_pattern, self._extract_from_match))
        candidates.extend(
            [
                (self.NGINX_PATTERN, self._extract_from_match),
                (self.APACHE_COMBINED_PATTERN, self._extract_from_match),
                (self.APACHE_COMMON_PATTERN, self._extract_from_match),
                (self.ERROR_PATTERN, self._extract_error_from_match),
            ]
        )

        parsed = None
        for pattern, extractor in candidates:
            match = pattern.match(line)
            if not match:
                continue
            parsed = extractor(match, line)
            # An empty result (a pattern without usable named groups) does
            # not count as a hit; keep trying the remaining patterns.
            if parsed:
                break

        if parsed:
            entry.timestamp = parsed.get("timestamp")
            entry.host = parsed.get("ip")
            entry.level = parsed.get("level")
            entry.message = parsed.get("message", "")
            entry.extra = parsed.get("extra", {})
        return entry

    def _extract_from_match(self, match: Match, line: str) -> dict[str, Any]:
        """Build a result dict from an access-log regex match.

        Only named groups present in the matching pattern contribute. The
        message comes either from a whole ``request`` group (Apache
        patterns) or from separate ``method``/``path``/``protocol`` groups
        (Nginx and custom patterns).

        Args:
            match: Match produced by one of the access-log patterns.
            line: The stripped source line (unused here).

        Returns:
            Dict with any of the keys ``ip``, ``timestamp``, ``method``,
            ``path``, ``message``, ``level`` and ``extra``.
        """
        groups = match.groupdict()
        result: dict[str, Any] = {}
        # Collected locally so optional fields can be recorded even when the
        # pattern has no status group.
        extra: dict[str, Any] = {}

        if "ip" in groups:
            result["ip"] = groups["ip"]
        if "timestamp" in groups:
            result["timestamp"] = self._parse_timestamp(groups["timestamp"])

        if "request" in groups:
            request = groups["request"]
            request_match = self._REQUEST_PATTERN.match(request)
            if request_match:
                method = request_match.group("method")
                path = request_match.group("path")
                result["method"] = method
                result["path"] = path
                result["message"] = f"{method} {path}"
            else:
                # e.g. a bare "-" request; keep it verbatim.
                result["message"] = request
        elif "method" in groups and "path" in groups:
            method = groups["method"]
            path = groups["path"]
            result["method"] = method
            result["path"] = path
            # The "request" branch leaves the protocol inside the message, so
            # include it here as well for a uniform message shape.
            parts = (method, path, groups.get("protocol"))
            result["message"] = " ".join(part for part in parts if part)

        has_status = "status" in groups
        if has_status:
            result["level"] = self._infer_level_from_status(groups["status"])

        if "size" in groups and groups["size"] != "-":
            size = groups["size"]
            try:
                extra["response_size"] = int(size)
            except ValueError:
                extra["response_size"] = size
        if "referer" in groups and groups["referer"] != "-":
            extra["referer"] = groups["referer"]
        if "user_agent" in groups and groups["user_agent"] != "-":
            extra["user_agent"] = groups["user_agent"]
        if "request_time" in groups:
            extra["request_time"] = groups["request_time"]

        # A status group always yields an "extra" key (possibly empty);
        # otherwise the key is only added when there is something to report.
        if has_status or extra:
            result["extra"] = extra
        return result

    def _extract_error_from_match(self, match: Match, line: str) -> dict[str, Any]:
        """Build a result dict from an error-log regex match.

        Args:
            match: Match produced by ``ERROR_PATTERN``.
            line: The stripped source line (unused here).

        Returns:
            Dict with ``message`` and, when available, ``level`` (lower-cased
            group 3), ``extra`` (``{"module": <group 2>}``) and ``timestamp``.
        """
        groups = match.groupdict()
        group_count = len(match.groups())
        result: dict[str, Any] = {"message": groups.get("message", "")}

        if group_count >= 3:
            level = match.group(3)
            result["level"] = level.lower() if level else "info"
            module = match.group(2)
            if module:
                result["extra"] = {"module": module}

        # ERROR_PATTERN captures the timestamp as unnamed group 1; a named
        # "timestamp" group is honoured first in case the pattern provides one.
        raw_timestamp = groups.get("timestamp")
        if not raw_timestamp and group_count >= 1:
            raw_timestamp = match.group(1)
        if raw_timestamp:
            result["timestamp"] = self._parse_timestamp(raw_timestamp)
        return result

    def _parse_timestamp(self, ts: str) -> Optional[datetime]:
        """Parse a log timestamp string.

        Each format in ``_TIMESTAMP_FORMATS`` is tried in order.

        Returns:
            The parsed ``datetime`` (timezone-aware only when the matching
            format contains ``%z``), or ``None`` if no format fits.
        """
        ts = ts.strip()
        for fmt in self._TIMESTAMP_FORMATS:
            try:
                return datetime.strptime(ts, fmt)
            except ValueError:
                continue
        return None

    def _infer_level_from_status(self, status: str) -> Optional[str]:
        """Map an HTTP status code string to a log level.

        Returns:
            ``"info"`` for 100-399, ``"warning"`` for 400-499, ``"error"``
            for 500-599, and ``None`` for an empty, non-numeric or
            out-of-range status.
        """
        if not status:
            return None
        try:
            code = int(status)
        except ValueError:
            return None

        if 100 <= code < 400:
            return "info"
        if 400 <= code < 500:
            return "warning"
        if 500 <= code < 600:
            return "error"
        return None