"""Apache/Nginx log parser.""" import re from datetime import datetime from typing import Any, Dict, List, Match, Optional from urllib.parse import parse_qs, unquote from loglens.parsers.base import LogParser, ParsedLogEntry class ApacheParser(LogParser): """Parser for Apache and Nginx access/error logs.""" format_name = "apache" APACHE_COMMON_PATTERN = re.compile( r'^(?P\S+)\s+\S+\s+\S+\s+\[(?P.*?)\]\s+"(?P.*?)"\s+(?P\d{3})\s+(?P\S+)' ) APACHE_COMBINED_PATTERN = re.compile( r'^(?P\S+)\s+\S+\s+\S+\s+\[(?P.*?)\]\s+"(?P.*?)"\s+(?P\d{3})\s+(?P\S+)\s+"(?P.*?)"\s+"(?P.*?)"' ) NGINX_PATTERN = re.compile( r'^(?P\S+)\s+-\s+\S+\s+\[(?P.*?)\]\s+"(?P\S+)\s+(?P.*?)\s+(?P\S+)"\s+(?P\d{3})\s+(?P\S+)\s+"(?P.*?)"\s+"(?P.*?)"\s+"(?P.*?)"' ) ERROR_PATTERN = re.compile( r'^\[[A-Z][a-z]{2}\s+[A-Z][a-z]{2}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2}(?:\.\d+)?\s+\d{4})\]\s+\[([^\]:]+):([^\]]+)\]\s+(?:\[pid\s+(\d+)\]\s+)?(?P.*)$' ) STATUS_CODES = { "1xx": "informational", "2xx": "success", "3xx": "redirection", "4xx": "client_error", "5xx": "server_error" } def __init__(self, custom_format: Optional[str] = None): self.custom_format = custom_format self._compile_custom_pattern(custom_format) def _compile_custom_pattern(self, format_str: Optional[str]) -> None: """Compile custom log format pattern.""" if not format_str: self.custom_pattern = None return pattern_str = format_str.replace("%h", r"(?P\S+)") \ .replace("%l", r"\S+") \ .replace("%u", r"\S+") \ .replace("%t", r"\[(?P.*?)\]") \ .replace("%r", r'"(?P\S+)\s+(?P.*?)\s+(?P\S+)"') \ .replace("%s", r"(?P\d{3})") \ .replace("%b", r"(?P\S+)") \ .replace("%{Referer}i", r'"(?P.*?)"') \ .replace("%{User-agent}i", r'"(?P.*?)"') try: self.custom_pattern = re.compile("^" + pattern_str) except re.error: self.custom_pattern = None def can_parse(self, line: str) -> bool: """Check if line matches Apache/Nginx format.""" line = line.strip() if not line: return False if self.APACHE_COMBINED_PATTERN.match(line): return True if self.APACHE_COMMON_PATTERN.match(line): return True if self.NGINX_PATTERN.match(line): return True if self.ERROR_PATTERN.match(line): return True if self.custom_pattern and self.custom_pattern.match(line): return True return False def parse(self, line: str, line_number: int = 0) -> Optional[ParsedLogEntry]: """Parse an Apache/Nginx log line.""" line = line.strip() if not line: return None entry = ParsedLogEntry( raw_line=line, line_number=line_number ) parsed = None if self.custom_pattern: match = self.custom_pattern.match(line) if match: parsed = self._extract_from_match(match, line) if not parsed: match = self.NGINX_PATTERN.match(line) if match: parsed = self._extract_from_match(match, line) if not parsed: match = self.APACHE_COMBINED_PATTERN.match(line) if match: parsed = self._extract_from_match(match, line) if not parsed: match = self.APACHE_COMMON_PATTERN.match(line) if match: parsed = self._extract_from_match(match, line) if not parsed: match = self.ERROR_PATTERN.match(line) if match: parsed = self._extract_error_from_match(match, line) if parsed: entry.timestamp = parsed.get("timestamp") entry.host = parsed.get("ip") entry.level = parsed.get("level") entry.message = parsed.get("message", "") entry.extra = parsed.get("extra", {}) return entry def _extract_from_match(self, match: Match, line: str) -> Dict[str, Any]: """Extract data from regex match.""" result = {} groups = match.groupdict() if "ip" in groups: result["ip"] = groups["ip"] if "timestamp" in groups: ts = groups["timestamp"] result["timestamp"] = self._parse_timestamp(ts) if "request" in groups: request = groups["request"] request_match = re.match(r'(?P\S+)\s+(?P.*)', request) if request_match: result["method"] = request_match.group("method") result["path"] = request_match.group("path") result["message"] = f"{request_match.group('method')} {request_match.group('path')}" else: result["message"] = request if "status" in groups: status = groups["status"] result["level"] = self._infer_level_from_status(status) result["extra"] = {} if "size" in groups and groups["size"] != "-": result["extra"] = result.get("extra", {}) try: result["extra"]["response_size"] = int(groups["size"]) except ValueError: result["extra"]["response_size"] = groups["size"] if "referer" in groups and groups["referer"] != "-": result["extra"]["referer"] = groups["referer"] if "user_agent" in groups and groups["user_agent"] != "-": result["extra"]["user_agent"] = groups["user_agent"] if "request_time" in groups: result["extra"]["request_time"] = groups["request_time"] return result def _extract_error_from_match(self, match: Match, line: str) -> Dict[str, Any]: """Extract data from error log match.""" groups = match.groupdict() result = { "message": groups.get("message", ""), } if len(match.groups()) >= 3: result["level"] = match.group(3).lower() if match.group(3) else "info" if match.group(2): result["extra"] = {"module": match.group(2)} if groups.get("timestamp"): result["timestamp"] = self._parse_timestamp(groups["timestamp"]) return result def _parse_timestamp(self, ts: str) -> Optional[datetime]: """Parse Apache/Nginx timestamp format.""" ts = ts.strip() formats = [ "%d/%b/%Y:%H:%M:%S %z", "%d/%b/%Y:%H:%M:%S", "%Y-%m-%dT%H:%M:%S%z", "%Y-%m-%d %H:%M:%S", ] for fmt in formats: try: return datetime.strptime(ts, fmt) except ValueError: continue return None def _infer_level_from_status(self, status: str) -> Optional[str]: """Infer log level from HTTP status code.""" if not status: return None try: code = int(status) if 100 <= code < 200: return "info" elif 200 <= code < 300: return "info" elif 300 <= code < 400: return "info" elif 400 <= code < 500: return "warning" elif 500 <= code < 600: return "error" except ValueError: pass return None