"""Apache/Nginx log parser."""

import re
from datetime import datetime
from re import Match
from typing import Any, Optional

from loglens.parsers.base import LogParser, ParsedLogEntry


class ApacheParser(LogParser):
    """Parser for Apache and Nginx access/error logs.

    A line is tried against, in order: the optional custom pattern, the
    Nginx access pattern, the Apache combined pattern, the Apache common
    pattern and finally the Apache error-log pattern.
    """

    format_name = "apache"

    # host ident user [timestamp] "request" status size
    APACHE_COMMON_PATTERN = re.compile(
        r'^(?P<ip>\S+)\s+\S+\s+\S+\s+\[(?P<timestamp>.*?)\]\s+'
        r'"(?P<request>.*?)"\s+(?P<status>\d{3})\s+(?P<size>\S+)'
    )

    # Common format followed by quoted referer and user agent.
    APACHE_COMBINED_PATTERN = re.compile(
        r'^(?P<ip>\S+)\s+\S+\s+\S+\s+\[(?P<timestamp>.*?)\]\s+'
        r'"(?P<request>.*?)"\s+(?P<status>\d{3})\s+(?P<size>\S+)\s+'
        r'"(?P<referer>.*?)"\s+"(?P<user_agent>.*?)"'
    )

    # Combined-like layout with a literal "-" ident, a pre-split request
    # line and one extra trailing quoted field.
    NGINX_PATTERN = re.compile(
        r'^(?P<ip>\S+)\s+-\s+\S+\s+\[(?P<timestamp>.*?)\]\s+'
        r'"(?P<method>\S+)\s+(?P<path>.*?)\s+(?P<protocol>\S+)"\s+'
        r'(?P<status>\d{3})\s+(?P<size>\S+)\s+'
        r'"(?P<referer>.*?)"\s+"(?P<user_agent>.*?)"\s+"(?P<request_time>.*?)"'
    )

    # [Day Mon DD HH:MM:SS[.ffffff] YYYY] [module:level] [pid N] message
    # Group order (timestamp, module, level, pid, message) is unchanged, so
    # positional access keeps working; the names let groupdict() expose them.
    ERROR_PATTERN = re.compile(
        r"^\[(?P<timestamp>[A-Z][a-z]{2}\s+[A-Z][a-z]{2}\s+\d{1,2}\s+"
        r"\d{2}:\d{2}:\d{2}(?:\.\d+)?\s+\d{4})\]\s+"
        r"\[(?P<module>[^\]:]+):(?P<level>[^\]]+)\]\s+"
        r"(?:\[pid\s+(?P<pid>\d+)\]\s+)?(?P<message>.*)$"
    )

    STATUS_CODES = {
        "1xx": "informational",
        "2xx": "success",
        "3xx": "redirection",
        "4xx": "client_error",
        "5xx": "server_error",
    }

    # LogFormat directive -> regex fragment used by _compile_custom_pattern.
    # None of the fragments contains "%", so substitution order is irrelevant.
    _FORMAT_DIRECTIVES = (
        ("%h", r"(?P<ip>\S+)"),
        ("%l", r"\S+"),
        ("%u", r"\S+"),
        ("%t", r"\[(?P<timestamp>.*?)\]"),
        ("%r", r'"(?P<method>\S+)\s+(?P<path>.*?)\s+(?P<protocol>\S+)"'),
        ("%>s", r"(?P<status>\d{3})"),
        ("%s", r"(?P<status>\d{3})"),
        ("%b", r"(?P<size>\S+)"),
        ("%{Referer}i", r'"(?P<referer>.*?)"'),
        ("%{User-agent}i", r'"(?P<user_agent>.*?)"'),
    )

    # strptime layouts tried in order by _parse_timestamp.
    _TIMESTAMP_FORMATS = (
        "%d/%b/%Y:%H:%M:%S %z",
        "%d/%b/%Y:%H:%M:%S",
        "%Y-%m-%dT%H:%M:%S%z",
        "%Y-%m-%d %H:%M:%S",
        # Layouts captured by ERROR_PATTERN (with / without fractional seconds).
        "%a %b %d %H:%M:%S.%f %Y",
        "%a %b %d %H:%M:%S %Y",
    )

    def __init__(self, custom_format: Optional[str] = None):
        """Create a parser, optionally with a custom LogFormat-style string.

        Args:
            custom_format: Format string using the directives listed in
                ``_FORMAT_DIRECTIVES``; ``None`` or empty disables it.
        """
        self.custom_format = custom_format
        self._compile_custom_pattern(custom_format)

    def _compile_custom_pattern(self, format_str: Optional[str]) -> None:
        """Compile custom log format pattern into ``self.custom_pattern``.

        Known directives are substituted with regex fragments; every other
        character of ``format_str`` is passed to the regex engine unchanged.
        ``self.custom_pattern`` is ``None`` when no format is given or the
        resulting expression does not compile.
        """
        if not format_str:
            self.custom_pattern = None
            return

        pattern_str = format_str
        for directive, fragment in self._FORMAT_DIRECTIVES:
            pattern_str = pattern_str.replace(directive, fragment)

        try:
            self.custom_pattern = re.compile("^" + pattern_str)
        except re.error:
            # Best effort: an unusable custom format disables custom matching
            # instead of failing construction (e.g. a repeated directive
            # produces a duplicate group name).
            self.custom_pattern = None

    def can_parse(self, line: str) -> bool:
        """Check if line matches Apache/Nginx format.

        Returns:
            True when the stripped line matches any built-in pattern or the
            custom pattern; False for blank lines and everything else.
        """
        line = line.strip()
        if not line:
            return False

        candidates = (
            self.APACHE_COMBINED_PATTERN,
            self.APACHE_COMMON_PATTERN,
            self.NGINX_PATTERN,
            self.ERROR_PATTERN,
            self.custom_pattern,
        )
        return any(
            pattern is not None and pattern.match(line) is not None
            for pattern in candidates
        )

    def parse(self, line: str, line_number: int = 0) -> Optional[ParsedLogEntry]:
        """Parse an Apache/Nginx log line.

        Args:
            line: Raw log line; surrounding whitespace is stripped.
            line_number: Position of the line in its source, stored on the entry.

        Returns:
            ``None`` for a blank line. Otherwise a ``ParsedLogEntry`` whose
            timestamp, host, level, message and extra are filled in when a
            pattern matched; a non-matching line yields an entry carrying
            only ``raw_line`` and ``line_number``.
        """
        line = line.strip()
        if not line:
            return None

        entry = ParsedLogEntry(raw_line=line, line_number=line_number)

        parsed = None
        access_patterns = (
            self.custom_pattern,
            self.NGINX_PATTERN,
            self.APACHE_COMBINED_PATTERN,
            self.APACHE_COMMON_PATTERN,
        )
        for pattern in access_patterns:
            if pattern is None:
                continue
            match = pattern.match(line)
            if match:
                parsed = self._extract_from_match(match, line)
                # An empty extraction falls through to the next pattern.
                if parsed:
                    break

        if not parsed:
            match = self.ERROR_PATTERN.match(line)
            if match:
                parsed = self._extract_error_from_match(match, line)

        if parsed:
            entry.timestamp = parsed.get("timestamp")
            entry.host = parsed.get("ip")
            entry.level = parsed.get("level")
            entry.message = parsed.get("message", "")
            entry.extra = parsed.get("extra", {})

        return entry

    def _extract_from_match(self, match: Match, line: str) -> dict[str, Any]:
        """Extract data from an access-log regex match.

        Only groups defined by the matching pattern are consulted, so the
        same routine serves the built-in and the custom patterns.

        Returns:
            Dict with any of ``ip``, ``timestamp``, ``method``, ``path``,
            ``message``, ``level`` and ``extra``.
        """
        groups = match.groupdict()
        result: dict[str, Any] = {}
        # Collected locally so optional fields never index a missing dict.
        extra: dict[str, Any] = {}

        if "ip" in groups:
            result["ip"] = groups["ip"]

        if "timestamp" in groups:
            result["timestamp"] = self._parse_timestamp(groups["timestamp"])

        if "request" in groups:
            request = groups["request"]
            request_match = re.match(r"(?P<method>\S+)\s+(?P<path>.*)", request)
            if request_match:
                method = request_match.group("method")
                path = request_match.group("path")
                result["method"] = method
                result["path"] = path
                result["message"] = f"{method} {path}"
            else:
                result["message"] = request
        elif groups.get("method") is not None and groups.get("path") is not None:
            # Patterns that pre-split the request line (Nginx, custom "%r").
            # The protocol is appended so the message has the same shape as
            # the one built from an unsplit request above.
            result["method"] = groups["method"]
            result["path"] = groups["path"]
            parts = (groups["method"], groups["path"], groups.get("protocol"))
            result["message"] = " ".join(part for part in parts if part)

        if "status" in groups:
            result["level"] = self._infer_level_from_status(groups["status"])

        size = groups.get("size")
        if size is not None and size != "-":
            try:
                extra["response_size"] = int(size)
            except ValueError:
                # Keep the raw token when it is not a plain integer.
                extra["response_size"] = size

        if "referer" in groups and groups["referer"] != "-":
            extra["referer"] = groups["referer"]

        if "user_agent" in groups and groups["user_agent"] != "-":
            extra["user_agent"] = groups["user_agent"]

        if "request_time" in groups:
            extra["request_time"] = groups["request_time"]

        # "extra" is present (possibly empty) whenever a status was captured,
        # and otherwise only when something was collected.
        if "status" in groups or extra:
            result["extra"] = extra

        return result

    def _extract_error_from_match(self, match: Match, line: str) -> dict[str, Any]:
        """Extract data from an error-log match (see ``ERROR_PATTERN``).

        Returns:
            Dict with ``message`` and ``level`` (lower-cased, ``"info"`` when
            empty), plus ``extra["module"]`` and ``timestamp`` when captured.
        """
        groups = match.groupdict()

        result: dict[str, Any] = {
            "message": groups.get("message", ""),
        }

        level = groups.get("level")
        result["level"] = level.lower() if level else "info"

        module = groups.get("module")
        if module:
            result["extra"] = {"module": module}

        if groups.get("timestamp"):
            result["timestamp"] = self._parse_timestamp(groups["timestamp"])

        return result

    def _parse_timestamp(self, ts: str) -> Optional[datetime]:
        """Parse Apache/Nginx timestamp format.

        Returns:
            The ``datetime`` from the first layout in ``_TIMESTAMP_FORMATS``
            that fits, or ``None`` when none does.
        """
        ts = ts.strip()

        for fmt in self._TIMESTAMP_FORMATS:
            try:
                return datetime.strptime(ts, fmt)
            except ValueError:
                continue

        return None

    def _infer_level_from_status(self, status: str) -> Optional[str]:
        """Infer log level from HTTP status code.

        Returns:
            ``"info"`` for 1xx-3xx, ``"warning"`` for 4xx, ``"error"`` for
            5xx, and ``None`` for empty, non-numeric or out-of-range input.
        """
        if not status:
            return None

        try:
            code = int(status)
        except ValueError:
            return None

        if 100 <= code < 400:
            return "info"
        if 400 <= code < 500:
            return "warning"
        if 500 <= code < 600:
            return "error"
        return None