Some checks failed
- Replaced deprecated typing.Dict/List/Tuple with native types (UP035) - Removed unused imports across all modules - Fixed unused variables by replacing with _ prefix - Added missing Optional type imports - Reorganized imports for proper sorting (I001) - Applied black formatting to all source files
234 lines
7.6 KiB
Python
234 lines
7.6 KiB
Python
'''Apache/Nginx log parser.'''
|
|
|
|
import re
|
|
from datetime import datetime
|
|
from re import Match
|
|
from typing import Any, Optional
|
|
|
|
from loglens.parsers.base import LogParser, ParsedLogEntry
|
|
|
|
|
|
class ApacheParser(LogParser):
    """Parser for Apache and Nginx access/error logs.

    Access-log lines are matched against, in order, an optional
    user-supplied format, ``NGINX_PATTERN``, ``APACHE_COMBINED_PATTERN``
    and ``APACHE_COMMON_PATTERN``; lines that match none of those are
    tried against ``ERROR_PATTERN``.
    """

    format_name = "apache"

    APACHE_COMMON_PATTERN = re.compile(
        r'^(?P<ip>\S+)\s+\S+\s+\S+\s+\[(?P<timestamp>.*?)\]\s+"(?P<request>.*?)"\s+(?P<status>\d{3})\s+(?P<size>\S+)'
    )

    APACHE_COMBINED_PATTERN = re.compile(
        r'^(?P<ip>\S+)\s+\S+\s+\S+\s+\[(?P<timestamp>.*?)\]\s+"(?P<request>.*?)"\s+(?P<status>\d{3})\s+(?P<size>\S+)\s+"(?P<referer>.*?)"\s+"(?P<user_agent>.*?)"'
    )

    NGINX_PATTERN = re.compile(
        r'^(?P<ip>\S+)\s+-\s+\S+\s+\[(?P<timestamp>.*?)\]\s+"(?P<method>\S+)\s+(?P<path>.*?)\s+(?P<protocol>\S+)"\s+(?P<status>\d{3})\s+(?P<size>\S+)\s+"(?P<referer>.*?)"\s+"(?P<user_agent>.*?)"\s+"(?P<request_time>.*?)"'
    )

    # Positional groups: 1 = timestamp, 2 = module, 3 = level, 4 = pid.
    # Only the trailing free text is captured by name ("message").
    ERROR_PATTERN = re.compile(
        r"^\[([A-Z][a-z]{2}\s+[A-Z][a-z]{2}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2}(?:\.\d+)?\s+\d{4})\]\s+\[([^\]:]+):([^\]]+)\]\s+(?:\[pid\s+(\d+)\]\s+)?(?P<message>.*)$"
    )

    STATUS_CODES = {
        "1xx": "informational",
        "2xx": "success",
        "3xx": "redirection",
        "4xx": "client_error",
        "5xx": "server_error",
    }

    # (directive, regex fragment) pairs substituted, in this order, into a
    # custom format string by _compile_custom_pattern.
    _FORMAT_DIRECTIVES = (
        ("%h", r"(?P<ip>\S+)"),
        ("%l", r"\S+"),
        ("%u", r"\S+"),
        ("%t", r"\[(?P<timestamp>.*?)\]"),
        ("%r", r'"(?P<method>\S+)\s+(?P<path>.*?)\s+(?P<protocol>\S+)"'),
        ("%s", r"(?P<status>\d{3})"),
        ("%b", r"(?P<size>\S+)"),
        ("%{Referer}i", r'"(?P<referer>.*?)"'),
        ("%{User-agent}i", r'"(?P<user_agent>.*?)"'),
    )

    # Splits a quoted request line into its first token and the remainder.
    _REQUEST_LINE_PATTERN = re.compile(r"(?P<method>\S+)\s+(?P<path>.*)")

    # strptime formats tried in order by _parse_timestamp.
    _TIMESTAMP_FORMATS = (
        "%d/%b/%Y:%H:%M:%S %z",
        "%d/%b/%Y:%H:%M:%S",
        "%Y-%m-%dT%H:%M:%S%z",
        "%Y-%m-%d %H:%M:%S",
        # Shapes accepted by ERROR_PATTERN group 1, with and without
        # fractional seconds (e.g. "Wed Oct 11 14:32:52.123456 2023").
        "%a %b %d %H:%M:%S.%f %Y",
        "%a %b %d %H:%M:%S %Y",
    )

    def __init__(self, custom_format: Optional[str] = None):
        """Create a parser, optionally with a custom access-log format.

        Args:
            custom_format: Format string using the directives listed in
                ``_FORMAT_DIRECTIVES``; ``None`` or empty disables it.
        """
        # NOTE(review): the base-class initializer is not invoked here;
        # confirm LogParser needs no initialization.
        self.custom_format = custom_format
        self._compile_custom_pattern(custom_format)

    def _compile_custom_pattern(self, format_str: Optional[str]) -> None:
        """Compile ``format_str`` into ``self.custom_pattern``.

        Each known directive is replaced by its regex fragment; all other
        text is passed to ``re.compile`` unchanged (it is NOT escaped).
        ``self.custom_pattern`` is set to ``None`` when no format is given
        or when the resulting expression does not compile.
        """
        if not format_str:
            self.custom_pattern = None
            return

        pattern_str = format_str
        for directive, fragment in self._FORMAT_DIRECTIVES:
            pattern_str = pattern_str.replace(directive, fragment)

        try:
            self.custom_pattern = re.compile("^" + pattern_str)
        except re.error:
            # Best effort: an unusable custom format falls back to the
            # built-in patterns instead of failing construction.
            self.custom_pattern = None

    def can_parse(self, line: str) -> bool:
        """Return True if ``line`` matches any supported pattern.

        Blank or whitespace-only lines are never parseable.
        """
        line = line.strip()
        if not line:
            return False

        builtin_patterns = (
            self.APACHE_COMBINED_PATTERN,
            self.APACHE_COMMON_PATTERN,
            self.NGINX_PATTERN,
            self.ERROR_PATTERN,
        )
        if any(pattern.match(line) for pattern in builtin_patterns):
            return True

        return bool(self.custom_pattern and self.custom_pattern.match(line))

    def parse(self, line: str, line_number: int = 0) -> Optional[ParsedLogEntry]:
        """Parse one log line into a ``ParsedLogEntry``.

        Args:
            line: Raw log line; surrounding whitespace is stripped.
            line_number: Stored on the returned entry.

        Returns:
            ``None`` for a blank line. Otherwise an entry carrying the
            stripped line and line number; timestamp, host, level, message
            and extra are filled in only when a pattern matched.
        """
        line = line.strip()
        if not line:
            return None

        entry = ParsedLogEntry(raw_line=line, line_number=line_number)

        parsed = self._parse_fields(line)
        if parsed:
            entry.timestamp = parsed.get("timestamp")
            entry.host = parsed.get("ip")
            entry.level = parsed.get("level")
            entry.message = parsed.get("message", "")
            entry.extra = parsed.get("extra", {})

        return entry

    def _parse_fields(self, line: str) -> Optional[dict[str, Any]]:
        """Return the fields of the first pattern that yields any, else None."""
        access_patterns = (
            self.custom_pattern,
            self.NGINX_PATTERN,
            self.APACHE_COMBINED_PATTERN,
            self.APACHE_COMMON_PATTERN,
        )
        for pattern in access_patterns:
            if pattern is None:
                continue
            match = pattern.match(line)
            if match:
                parsed = self._extract_from_match(match, line)
                # An empty result (e.g. a custom pattern without named
                # groups) falls through to the next pattern.
                if parsed:
                    return parsed

        match = self.ERROR_PATTERN.match(line)
        if match:
            return self._extract_error_from_match(match, line)
        return None

    def _extract_from_match(self, match: Match, line: str) -> dict[str, Any]:
        """Extract access-log fields from a regex match.

        Args:
            match: Match produced by one of the access-log patterns.
            line: The matched line (unused; kept for interface stability).

        Returns:
            Dict with whichever of ``ip``, ``timestamp``, ``method``,
            ``path``, ``protocol``, ``message``, ``level`` and ``extra``
            the pattern's named groups provide.
        """
        groups = match.groupdict()
        result: dict[str, Any] = {}
        # Collected locally so optional fields never index a missing
        # result["extra"] (previously a KeyError for custom formats
        # containing neither %s nor %b).
        extra: dict[str, Any] = {}

        if "ip" in groups:
            result["ip"] = groups["ip"]

        if "timestamp" in groups:
            raw_ts = groups["timestamp"]
            result["timestamp"] = (
                self._parse_timestamp(raw_ts) if raw_ts is not None else None
            )

        request = groups.get("request")
        method = groups.get("method")
        path = groups.get("path")
        if request is not None:
            request_match = self._REQUEST_LINE_PATTERN.match(request)
            if request_match:
                # "path" is everything after the method, so it still
                # includes a trailing protocol token when one is present.
                result["method"] = request_match.group("method")
                result["path"] = request_match.group("path")
                result["message"] = f"{request_match.group('method')} {request_match.group('path')}"
            else:
                result["message"] = request
        elif method is not None and path is not None:
            # NGINX_PATTERN and the %r directive capture the request line
            # as separate groups; rebuild the same message the "request"
            # branch produces for an identical request line.
            result["method"] = method
            result["path"] = path
            parts = [method, path]
            protocol = groups.get("protocol")
            if protocol is not None:
                result["protocol"] = protocol
                parts.append(protocol)
            result["message"] = " ".join(parts)

        if "status" in groups:
            result["level"] = self._infer_level_from_status(groups["status"])

        size = groups.get("size")
        if size is not None and size != "-":
            try:
                extra["response_size"] = int(size)
            except ValueError:
                # Keep the raw token when it is not an integer.
                extra["response_size"] = size

        referer = groups.get("referer")
        if referer is not None and referer != "-":
            extra["referer"] = referer

        user_agent = groups.get("user_agent")
        if user_agent is not None and user_agent != "-":
            extra["user_agent"] = user_agent

        request_time = groups.get("request_time")
        if request_time is not None:
            extra["request_time"] = request_time

        # A status group always yields an "extra" key, even when empty.
        if extra or "status" in groups:
            result["extra"] = extra

        return result

    def _extract_error_from_match(self, match: Match, line: str) -> dict[str, Any]:
        """Extract error-log fields from an ``ERROR_PATTERN`` match.

        Args:
            match: Match produced by ``ERROR_PATTERN``.
            line: The matched line (unused; kept for interface stability).

        Returns:
            Dict with ``message`` and, when available, ``level``
            (lower-cased, defaulting to ``"info"``), ``extra`` holding the
            module name, and ``timestamp``.
        """
        groups = match.groupdict()
        result: dict[str, Any] = {"message": groups.get("message", "")}

        if len(match.groups()) >= 3:
            level = match.group(3)
            result["level"] = level.lower() if level else "info"
            module = match.group(2)
            if module:
                result["extra"] = {"module": module}

        # ERROR_PATTERN captures the timestamp as unnamed group 1, so a
        # lookup by name alone never finds it; fall back to the position.
        raw_ts = groups.get("timestamp")
        if raw_ts is None and match.re.groups >= 1:
            raw_ts = match.group(1)
        if raw_ts:
            result["timestamp"] = self._parse_timestamp(raw_ts)

        return result

    def _parse_timestamp(self, ts: str) -> Optional[datetime]:
        """Parse a log timestamp using the first matching known format.

        Returns:
            The parsed ``datetime`` (timezone-aware only when the matching
            format contains ``%z``), or ``None`` if no format matches.
        """
        if not ts:
            return None
        ts = ts.strip()

        for fmt in self._TIMESTAMP_FORMATS:
            try:
                return datetime.strptime(ts, fmt)
            except ValueError:
                continue

        return None

    def _infer_level_from_status(self, status: str) -> Optional[str]:
        """Map an HTTP status code string to a log level.

        Returns:
            ``"info"`` for 1xx-3xx, ``"warning"`` for 4xx, ``"error"`` for
            5xx, and ``None`` for empty, non-numeric or out-of-range input.
        """
        if not status:
            return None

        try:
            code = int(status)
        except ValueError:
            return None

        if 100 <= code < 400:
            return "info"
        if 400 <= code < 500:
            return "warning"
        if 500 <= code < 600:
            return "error"
        return None
|