# Provenance: loglens/parsers/apache_parser.py, added by commit
# c83ede07009c7de72e15f61ba07f7fbd2fd90a71
# ("Add parsers: JSON, Syslog, Apache, and factory").
"""Apache/Nginx log parser."""

import re
from datetime import datetime
from typing import Any, Dict, List, Match, Optional
from urllib.parse import parse_qs, unquote

from loglens.parsers.base import LogParser, ParsedLogEntry


class ApacheParser(LogParser):
    """Parser for Apache and Nginx access/error logs.

    Each line is tried against, in order: the optional custom format, the
    Nginx access pattern, the Apache combined pattern, the Apache common
    pattern and finally the Apache error-log pattern.
    """

    format_name = "apache"

    # ip ident user [timestamp] "request" status size
    APACHE_COMMON_PATTERN = re.compile(
        r'^(?P<ip>\S+)\s+\S+\s+\S+\s+\[(?P<timestamp>.*?)\]\s+'
        r'"(?P<request>.*?)"\s+(?P<status>\d{3})\s+(?P<size>\S+)'
    )

    # Common format followed by "referer" "user agent".
    APACHE_COMBINED_PATTERN = re.compile(
        r'^(?P<ip>\S+)\s+\S+\s+\S+\s+\[(?P<timestamp>.*?)\]\s+'
        r'"(?P<request>.*?)"\s+(?P<status>\d{3})\s+(?P<size>\S+)\s+'
        r'"(?P<referer>.*?)"\s+"(?P<user_agent>.*?)"'
    )

    # Combined-style line with the request already split into
    # method/path/protocol and one extra trailing quoted field.
    NGINX_PATTERN = re.compile(
        r'^(?P<ip>\S+)\s+-\s+\S+\s+\[(?P<timestamp>.*?)\]\s+'
        r'"(?P<method>\S+)\s+(?P<path>.*?)\s+(?P<protocol>\S+)"\s+'
        r'(?P<status>\d{3})\s+(?P<size>\S+)\s+'
        r'"(?P<referer>.*?)"\s+"(?P<user_agent>.*?)"\s+"(?P<request_time>.*?)"'
    )

    # [Www Mmm dd hh:mm:ss[.frac] yyyy] [module:level] [pid N] message
    # Groups: 1 = timestamp (named), 2 = module, 3 = level, 4 = pid,
    # 5 = message (named).  _extract_error_from_match relies on 2 and 3.
    ERROR_PATTERN = re.compile(
        r'^\[(?P<timestamp>[A-Z][a-z]{2}\s+[A-Z][a-z]{2}\s+\d{1,2}\s+'
        r'\d{2}:\d{2}:\d{2}(?:\.\d+)?\s+\d{4})\]\s+'
        r'\[([^\]:]+):([^\]]+)\]\s+(?:\[pid\s+(\d+)\]\s+)?(?P<message>.*)$'
    )

    # Splits a raw request line into the method and everything after it.
    _REQUEST_PATTERN = re.compile(r'(?P<method>\S+)\s+(?P<path>.*)')

    # Log-format directive -> regex fragment used by _compile_custom_pattern.
    # NOTE: the %r, Referer and User-agent fragments already contain the
    # surrounding double quotes.
    _CUSTOM_DIRECTIVES = (
        ("%h", r"(?P<ip>\S+)"),
        ("%l", r"\S+"),
        ("%u", r"\S+"),
        ("%t", r"\[(?P<timestamp>.*?)\]"),
        ("%r", r'"(?P<method>\S+)\s+(?P<path>.*?)\s+(?P<protocol>\S+)"'),
        ("%>s", r"(?P<status>\d{3})"),
        ("%s", r"(?P<status>\d{3})"),
        ("%b", r"(?P<size>\S+)"),
        ("%{Referer}i", r'"(?P<referer>.*?)"'),
        ("%{User-agent}i", r'"(?P<user_agent>.*?)"'),
        ("%{User-Agent}i", r'"(?P<user_agent>.*?)"'),
    )

    STATUS_CODES = {
        "1xx": "informational",
        "2xx": "success",
        "3xx": "redirection",
        "4xx": "client_error",
        "5xx": "server_error"
    }

    def __init__(self, custom_format: Optional[str] = None):
        """Create a parser, optionally with an Apache-style custom format.

        Args:
            custom_format: Log format string built from directives such as
                ``%h``, ``%t``, ``%r``, ``%s``/``%>s`` and ``%b``.  When it
                is empty or does not compile, only the built-in patterns
                are used.
        """
        self.custom_format = custom_format
        self._compile_custom_pattern(custom_format)

    def _compile_custom_pattern(self, format_str: Optional[str]) -> None:
        """Compile custom log format pattern into ``self.custom_pattern``.

        Known directives are substituted with regex fragments; every other
        character of ``format_str`` is passed to ``re`` unchanged.  The
        result is anchored at the start of the line.
        """
        if not format_str:
            self.custom_pattern = None
            return

        pattern_str = format_str
        for directive, fragment in self._CUSTOM_DIRECTIVES:
            pattern_str = pattern_str.replace(directive, fragment)

        try:
            self.custom_pattern = re.compile("^" + pattern_str)
        except re.error:
            # Best effort: an unusable format (bad regex syntax, or the same
            # directive used twice, which duplicates a group name) disables
            # the custom pattern instead of failing construction.
            self.custom_pattern = None

    def can_parse(self, line: str) -> bool:
        """Check if line matches Apache/Nginx format.

        Returns True when the stripped line matches any built-in pattern
        or the custom pattern, and False for blank lines.
        """
        line = line.strip()
        if not line:
            return False

        candidates = (
            self.APACHE_COMBINED_PATTERN,
            self.APACHE_COMMON_PATTERN,
            self.NGINX_PATTERN,
            self.ERROR_PATTERN,
            self.custom_pattern,
        )
        return any(
            pattern is not None and pattern.match(line) is not None
            for pattern in candidates
        )

    def parse(self, line: str, line_number: int = 0) -> Optional[ParsedLogEntry]:
        """Parse an Apache/Nginx log line.

        Args:
            line: Raw log line; surrounding whitespace is ignored.
            line_number: Position of the line in its source, stored on the
                returned entry.

        Returns:
            None for a blank line.  Otherwise a ParsedLogEntry carrying the
            raw line; timestamp, host, level, message and extra are filled
            in only when one of the patterns matched.
        """
        line = line.strip()
        if not line:
            return None

        entry = ParsedLogEntry(
            raw_line=line,
            line_number=line_number
        )

        # Order matters: the custom pattern wins, and the more specific
        # access-log patterns are tried before the looser ones.
        attempts = (
            (self.custom_pattern, self._extract_from_match),
            (self.NGINX_PATTERN, self._extract_from_match),
            (self.APACHE_COMBINED_PATTERN, self._extract_from_match),
            (self.APACHE_COMMON_PATTERN, self._extract_from_match),
            (self.ERROR_PATTERN, self._extract_error_from_match),
        )

        parsed = None
        for pattern, extract in attempts:
            if pattern is None:
                continue
            match = pattern.match(line)
            if match:
                parsed = extract(match, line)
                if parsed:
                    break

        if parsed:
            entry.timestamp = parsed.get("timestamp")
            entry.host = parsed.get("ip")
            entry.level = parsed.get("level")
            entry.message = parsed.get("message", "")
            entry.extra = parsed.get("extra", {})

        return entry

    def _extract_from_match(self, match: Match, line: str) -> Dict[str, Any]:
        """Extract data from an access-log regex match.

        Only the named groups present in the matching pattern are used, so
        this works for the built-in access patterns and for custom formats
        that omit some directives.

        Returns:
            Dict with any of ``ip``, ``timestamp``, ``method``, ``path``,
            ``message`` and ``level``, plus an ``extra`` dict that is always
            present.
        """
        groups = match.groupdict()
        extra: Dict[str, Any] = {}
        result: Dict[str, Any] = {}

        if "ip" in groups:
            result["ip"] = groups["ip"]

        if "timestamp" in groups:
            result["timestamp"] = self._parse_timestamp(groups["timestamp"])

        if "request" in groups:
            request = groups["request"]
            request_match = self._REQUEST_PATTERN.match(request)
            if request_match:
                method = request_match.group("method")
                target = request_match.group("path")
                result["method"] = method
                result["path"] = target
                result["message"] = f"{method} {target}"
            else:
                result["message"] = request
        elif groups.get("method") is not None and groups.get("path") is not None:
            # Nginx and custom "%r" patterns capture the request pre-split;
            # without this branch those lines ended up with an empty message.
            result["method"] = groups["method"]
            result["path"] = groups["path"]
            request_parts = (
                groups["method"],
                groups["path"],
                groups.get("protocol"),
            )
            result["message"] = " ".join(part for part in request_parts if part)

        if "status" in groups:
            result["level"] = self._infer_level_from_status(groups["status"])

        size = groups.get("size")
        if size is not None and size != "-":
            try:
                extra["response_size"] = int(size)
            except ValueError:
                extra["response_size"] = size

        referer = groups.get("referer")
        if referer is not None and referer != "-":
            extra["referer"] = referer

        user_agent = groups.get("user_agent")
        if user_agent is not None and user_agent != "-":
            extra["user_agent"] = user_agent

        if "request_time" in groups:
            extra["request_time"] = groups["request_time"]

        # Always attach the dict: a format with referer/user-agent but no
        # status or size previously raised KeyError on result["extra"].
        result["extra"] = extra
        return result

    def _extract_error_from_match(self, match: Match, line: str) -> Dict[str, Any]:
        """Extract data from error log match.

        Expects a match produced by ERROR_PATTERN, where group 2 is the
        module and group 3 the level.

        Returns:
            Dict with ``message``, a lower-cased ``level`` (``"info"`` when
            the level group is empty), ``extra["module"]`` when a module
            was captured, and ``timestamp`` when one was captured.
        """
        groups = match.groupdict()

        result: Dict[str, Any] = {
            "message": groups.get("message", ""),
        }

        if len(match.groups()) >= 3:
            level = match.group(3)
            result["level"] = level.lower() if level else "info"
            module = match.group(2)
            if module:
                result["extra"] = {"module": module}

        if groups.get("timestamp"):
            result["timestamp"] = self._parse_timestamp(groups["timestamp"])

        return result

    def _parse_timestamp(self, ts: str) -> Optional[datetime]:
        """Parse Apache/Nginx timestamp format.

        Tries the access-log layouts, two ISO-like layouts and the
        error-log layout (``Mon Feb 02 08:03:24.123456 2026``, with or
        without the fractional part).

        Returns:
            The parsed datetime (timezone-aware only when the text carries
            an offset), or None when no layout matches.
        """
        if not ts:
            return None
        ts = ts.strip()

        formats = (
            "%d/%b/%Y:%H:%M:%S %z",
            "%d/%b/%Y:%H:%M:%S",
            "%Y-%m-%dT%H:%M:%S%z",
            "%Y-%m-%d %H:%M:%S",
            # Error-log layouts; %a and %b follow the current locale.
            "%a %b %d %H:%M:%S.%f %Y",
            "%a %b %d %H:%M:%S %Y",
        )

        for fmt in formats:
            try:
                return datetime.strptime(ts, fmt)
            except ValueError:
                continue

        return None

    def _infer_level_from_status(self, status: str) -> Optional[str]:
        """Infer log level from HTTP status code.

        Returns:
            ``"info"`` for 1xx-3xx, ``"warning"`` for 4xx, ``"error"`` for
            5xx, and None for an empty, non-numeric or out-of-range status.
        """
        if not status:
            return None

        try:
            code = int(status)
        except ValueError:
            return None

        if 100 <= code < 400:
            return "info"
        if 400 <= code < 500:
            return "warning"
        if 500 <= code < 600:
            return "error"
        return None