diff --git a/loglens/parsers/apache_parser.py b/loglens/parsers/apache_parser.py index c1c83f8..d67a719 100644 --- a/loglens/parsers/apache_parser.py +++ b/loglens/parsers/apache_parser.py @@ -1,15 +1,15 @@ -"""Apache/Nginx log parser.""" +'''Apache/Nginx log parser.''' import re from datetime import datetime -from typing import Any, Dict, List, Match, Optional -from urllib.parse import parse_qs, unquote +from re import Match +from typing import Any, Optional from loglens.parsers.base import LogParser, ParsedLogEntry class ApacheParser(LogParser): - """Parser for Apache and Nginx access/error logs.""" + '''Parser for Apache and Nginx access/error logs.''' format_name = "apache" @@ -26,7 +26,7 @@ class ApacheParser(LogParser): ) ERROR_PATTERN = re.compile( - r'^\[[A-Z][a-z]{2}\s+[A-Z][a-z]{2}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2}(?:\.\d+)?\s+\d{4})\]\s+\[([^\]:]+):([^\]]+)\]\s+(?:\[pid\s+(\d+)\]\s+)?(?P.*)$' + r"^\[([A-Z][a-z]{2}\s+[A-Z][a-z]{2}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2}(?:\.\d+)?\s+\d{4})\]\s+\[([^\]:]+):([^\]]+)\]\s+(?:\[pid\s+(\d+)\]\s+)?(?P.*)$" ) STATUS_CODES = { @@ -34,7 +34,7 @@ class ApacheParser(LogParser): "2xx": "success", "3xx": "redirection", "4xx": "client_error", - "5xx": "server_error" + "5xx": "server_error", } def __init__(self, custom_format: Optional[str] = None): @@ -42,20 +42,22 @@ class ApacheParser(LogParser): self._compile_custom_pattern(custom_format) def _compile_custom_pattern(self, format_str: Optional[str]) -> None: - """Compile custom log format pattern.""" + '''Compile custom log format pattern.''' if not format_str: self.custom_pattern = None return - pattern_str = format_str.replace("%h", r"(?P\S+)") \ - .replace("%l", r"\S+") \ - .replace("%u", r"\S+") \ - .replace("%t", r"\[(?P.*?)\]") \ - .replace("%r", r'"(?P\S+)\s+(?P.*?)\s+(?P\S+)"') \ - .replace("%s", r"(?P\d{3})") \ - .replace("%b", r"(?P\S+)") \ - .replace("%{Referer}i", r'"(?P.*?)"') \ - .replace("%{User-agent}i", r'"(?P.*?)"') + pattern_str = ( + format_str.replace("%h", r"(?P\S+)") + .replace("%l", r"\S+") + .replace("%u", r"\S+") + .replace("%t", r"\[(?P.*?)\]") + .replace("%r", r'"(?P\S+)\s+(?P.*?)\s+(?P\S+)"') + .replace("%s", r"(?P\d{3})") + .replace("%b", r"(?P\S+)") + .replace("%{Referer}i", r'"(?P.*?)"') + .replace("%{User-agent}i", r'"(?P.*?)"') + ) try: self.custom_pattern = re.compile("^" + pattern_str) @@ -63,7 +65,7 @@ class ApacheParser(LogParser): self.custom_pattern = None def can_parse(self, line: str) -> bool: - """Check if line matches Apache/Nginx format.""" + '''Check if line matches Apache/Nginx format.''' line = line.strip() if not line: return False @@ -82,15 +84,12 @@ class ApacheParser(LogParser): return False def parse(self, line: str, line_number: int = 0) -> Optional[ParsedLogEntry]: - """Parse an Apache/Nginx log line.""" + '''Parse an Apache/Nginx log line.''' line = line.strip() if not line: return None - entry = ParsedLogEntry( - raw_line=line, - line_number=line_number - ) + entry = ParsedLogEntry(raw_line=line, line_number=line_number) parsed = None @@ -128,8 +127,8 @@ class ApacheParser(LogParser): return entry - def _extract_from_match(self, match: Match, line: str) -> Dict[str, Any]: - """Extract data from regex match.""" + def _extract_from_match(self, match: Match, line: str) -> dict[str, Any]: + '''Extract data from regex match.''' result = {} groups = match.groupdict() @@ -143,7 +142,7 @@ class ApacheParser(LogParser): if "request" in groups: request = groups["request"] - request_match = re.match(r'(?P\S+)\s+(?P.*)', request) + request_match = re.match(r"(?P\S+)\s+(?P.*)", request) if request_match: result["method"] = request_match.group("method") result["path"] = request_match.group("path") @@ -174,8 +173,8 @@ class ApacheParser(LogParser): return result - def _extract_error_from_match(self, match: Match, line: str) -> Dict[str, Any]: - """Extract data from error log match.""" + def _extract_error_from_match(self, match: Match, line: str) -> dict[str, Any]: + '''Extract data from error log match.''' groups = match.groupdict() result = { @@ -193,7 +192,7 @@ class ApacheParser(LogParser): return result def _parse_timestamp(self, ts: str) -> Optional[datetime]: - """Parse Apache/Nginx timestamp format.""" + '''Parse Apache/Nginx timestamp format.''' ts = ts.strip() formats = [ @@ -212,7 +211,7 @@ class ApacheParser(LogParser): return None def _infer_level_from_status(self, status: str) -> Optional[str]: - """Infer log level from HTTP status code.""" + '''Infer log level from HTTP status code.''' if not status: return None