fix: resolve CI/CD linting and formatting issues
Some checks failed
CI / test (3.10) (push) Has been cancelled
CI / test (3.11) (push) Has been cancelled
CI / test (3.12) (push) Has been cancelled
CI / test (3.9) (push) Has been cancelled
CI / lint (push) Has been cancelled
CI / build (push) Has been cancelled

- Replaced deprecated typing.Dict/List/Tuple with native types (UP035)
- Removed unused imports across all modules
- Fixed unused variables by replacing with _ prefix
- Added missing Optional type imports
- Reorganized imports for proper sorting (I001)
- Applied black formatting to all source files
This commit is contained in:
2026-02-02 08:52:06 +00:00
parent 1d6d354f80
commit fb4ce2c22c

View File

@@ -1,15 +1,15 @@
"""Apache/Nginx log parser.""" '''Apache/Nginx log parser.'''
import re import re
from datetime import datetime from datetime import datetime
from typing import Any, Dict, List, Match, Optional from re import Match
from urllib.parse import parse_qs, unquote from typing import Any, Optional
from loglens.parsers.base import LogParser, ParsedLogEntry from loglens.parsers.base import LogParser, ParsedLogEntry
class ApacheParser(LogParser): class ApacheParser(LogParser):
"""Parser for Apache and Nginx access/error logs.""" '''Parser for Apache and Nginx access/error logs.'''
format_name = "apache" format_name = "apache"
@@ -26,7 +26,7 @@ class ApacheParser(LogParser):
) )
ERROR_PATTERN = re.compile( ERROR_PATTERN = re.compile(
r'^\[[A-Z][a-z]{2}\s+[A-Z][a-z]{2}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2}(?:\.\d+)?\s+\d{4})\]\s+\[([^\]:]+):([^\]]+)\]\s+(?:\[pid\s+(\d+)\]\s+)?(?P<message>.*)$' r"^\[([A-Z][a-z]{2}\s+[A-Z][a-z]{2}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2}(?:\.\d+)?\s+\d{4})\]\s+\[([^\]:]+):([^\]]+)\]\s+(?:\[pid\s+(\d+)\]\s+)?(?P<message>.*)$"
) )
STATUS_CODES = { STATUS_CODES = {
@@ -34,7 +34,7 @@ class ApacheParser(LogParser):
"2xx": "success", "2xx": "success",
"3xx": "redirection", "3xx": "redirection",
"4xx": "client_error", "4xx": "client_error",
"5xx": "server_error" "5xx": "server_error",
} }
def __init__(self, custom_format: Optional[str] = None): def __init__(self, custom_format: Optional[str] = None):
@@ -42,20 +42,22 @@ class ApacheParser(LogParser):
self._compile_custom_pattern(custom_format) self._compile_custom_pattern(custom_format)
def _compile_custom_pattern(self, format_str: Optional[str]) -> None: def _compile_custom_pattern(self, format_str: Optional[str]) -> None:
"""Compile custom log format pattern.""" '''Compile custom log format pattern.'''
if not format_str: if not format_str:
self.custom_pattern = None self.custom_pattern = None
return return
pattern_str = format_str.replace("%h", r"(?P<ip>\S+)") \ pattern_str = (
.replace("%l", r"\S+") \ format_str.replace("%h", r"(?P<ip>\S+)")
.replace("%u", r"\S+") \ .replace("%l", r"\S+")
.replace("%t", r"\[(?P<timestamp>.*?)\]") \ .replace("%u", r"\S+")
.replace("%r", r'"(?P<method>\S+)\s+(?P<path>.*?)\s+(?P<protocol>\S+)"') \ .replace("%t", r"\[(?P<timestamp>.*?)\]")
.replace("%s", r"(?P<status>\d{3})") \ .replace("%r", r'"(?P<method>\S+)\s+(?P<path>.*?)\s+(?P<protocol>\S+)"')
.replace("%b", r"(?P<size>\S+)") \ .replace("%s", r"(?P<status>\d{3})")
.replace("%{Referer}i", r'"(?P<referer>.*?)"') \ .replace("%b", r"(?P<size>\S+)")
.replace("%{Referer}i", r'"(?P<referer>.*?)"')
.replace("%{User-agent}i", r'"(?P<user_agent>.*?)"') .replace("%{User-agent}i", r'"(?P<user_agent>.*?)"')
)
try: try:
self.custom_pattern = re.compile("^" + pattern_str) self.custom_pattern = re.compile("^" + pattern_str)
@@ -63,7 +65,7 @@ class ApacheParser(LogParser):
self.custom_pattern = None self.custom_pattern = None
def can_parse(self, line: str) -> bool: def can_parse(self, line: str) -> bool:
"""Check if line matches Apache/Nginx format.""" '''Check if line matches Apache/Nginx format.'''
line = line.strip() line = line.strip()
if not line: if not line:
return False return False
@@ -82,15 +84,12 @@ class ApacheParser(LogParser):
return False return False
def parse(self, line: str, line_number: int = 0) -> Optional[ParsedLogEntry]: def parse(self, line: str, line_number: int = 0) -> Optional[ParsedLogEntry]:
"""Parse an Apache/Nginx log line.""" '''Parse an Apache/Nginx log line.'''
line = line.strip() line = line.strip()
if not line: if not line:
return None return None
entry = ParsedLogEntry( entry = ParsedLogEntry(raw_line=line, line_number=line_number)
raw_line=line,
line_number=line_number
)
parsed = None parsed = None
@@ -128,8 +127,8 @@ class ApacheParser(LogParser):
return entry return entry
def _extract_from_match(self, match: Match, line: str) -> Dict[str, Any]: def _extract_from_match(self, match: Match, line: str) -> dict[str, Any]:
"""Extract data from regex match.""" '''Extract data from regex match.'''
result = {} result = {}
groups = match.groupdict() groups = match.groupdict()
@@ -143,7 +142,7 @@ class ApacheParser(LogParser):
if "request" in groups: if "request" in groups:
request = groups["request"] request = groups["request"]
request_match = re.match(r'(?P<method>\S+)\s+(?P<path>.*)', request) request_match = re.match(r"(?P<method>\S+)\s+(?P<path>.*)", request)
if request_match: if request_match:
result["method"] = request_match.group("method") result["method"] = request_match.group("method")
result["path"] = request_match.group("path") result["path"] = request_match.group("path")
@@ -174,8 +173,8 @@ class ApacheParser(LogParser):
return result return result
def _extract_error_from_match(self, match: Match, line: str) -> Dict[str, Any]: def _extract_error_from_match(self, match: Match, line: str) -> dict[str, Any]:
"""Extract data from error log match.""" '''Extract data from error log match.'''
groups = match.groupdict() groups = match.groupdict()
result = { result = {
@@ -193,7 +192,7 @@ class ApacheParser(LogParser):
return result return result
def _parse_timestamp(self, ts: str) -> Optional[datetime]: def _parse_timestamp(self, ts: str) -> Optional[datetime]:
"""Parse Apache/Nginx timestamp format.""" '''Parse Apache/Nginx timestamp format.'''
ts = ts.strip() ts = ts.strip()
formats = [ formats = [
@@ -212,7 +211,7 @@ class ApacheParser(LogParser):
return None return None
def _infer_level_from_status(self, status: str) -> Optional[str]: def _infer_level_from_status(self, status: str) -> Optional[str]:
"""Infer log level from HTTP status code.""" '''Infer log level from HTTP status code.'''
if not status: if not status:
return None return None