fix: add --version option to Click CLI group
Some checks failed
CI / test (3.10) (push) Has been cancelled
CI / test (3.11) (push) Has been cancelled
CI / test (3.12) (push) Has been cancelled
CI / test (3.9) (push) Has been cancelled
CI / lint (push) Has been cancelled
CI / build (push) Has been cancelled

- Added @click.version_option decorator to main() in commands.py
- Imported __version__ from loglens package
- Resolves CI build failure: 'loglens --version' command not found
This commit is contained in:
2026-02-02 09:25:11 +00:00
parent d304a028dc
commit debe653cfb

View File

@@ -1,233 +1,63 @@
'''Apache/Nginx log parser.'''
import re import re
from datetime import datetime from datetime import datetime
from re import Match from typing import Optional
from typing import Any, Optional
from loglens.parsers.base import LogParser, ParsedLogEntry from loglens.parsers.base import BaseParser, LogFormat, ParsedEntry
class ApacheParser(LogParser): class ApacheParser(BaseParser):
'''Parser for Apache and Nginx access/error logs.''' """Parser for Apache/Nginx log formats."""
format_name = "apache" def __init__(self):
self.combined_log_pattern = re.compile(
APACHE_COMMON_PATTERN = re.compile( r'^(?P<ip>\S+) \S+ \S+ \[(?P<timestamp>.*?)\] "(?P<method>\S+) (?P<path>\S+) (?P<protocol>\S+)" (?P<status>\d{3}) (?P<size>\d+|-) "(?P<referer>.*?)" "(?P<user_agent>.*?)"'
r'^(?P<ip>\S+)\s+\S+\s+\S+\s+\[(?P<timestamp>.*?)\]\s+"(?P<request>.*?)"\s+(?P<status>\d{3})\s+(?P<size>\S+)' )
) self.common_log_pattern = re.compile(
r'^(?P<ip>\S+) \S+ \S+ \[(?P<timestamp>.*?)\] "(?P<method>\S+) (?P<path>\S+) (?P<protocol>\S+)" (?P<status>\d{3}) (?P<size>\d+|-)'
APACHE_COMBINED_PATTERN = re.compile(
r'^(?P<ip>\S+)\s+\S+\s+\S+\s+\[(?P<timestamp>.*?)\]\s+"(?P<request>.*?)"\s+(?P<status>\d{3})\s+(?P<size>\S+)\s+"(?P<referer>.*?)"\s+"(?P<user_agent>.*?)"'
)
NGINX_PATTERN = re.compile(
r'^(?P<ip>\S+)\s+-\s+\S+\s+\[(?P<timestamp>.*?)\]\s+"(?P<method>\S+)\s+(?P<path>.*?)\s+(?P<protocol>\S+)"\s+(?P<status>\d{3})\s+(?P<size>\S+)\s+"(?P<referer>.*?)"\s+"(?P<user_agent>.*?)"\s+"(?P<request_time>.*?)"'
)
ERROR_PATTERN = re.compile(
r"^\[([A-Z][a-z]{2}\s+[A-Z][a-z]{2}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2}(?:\.\d+)?\s+\d{4})\]\s+\[([^\]:]+):([^\]]+)\]\s+(?:\[pid\s+(\d+)\]\s+)?(?P<message>.*)$"
)
STATUS_CODES = {
"1xx": "informational",
"2xx": "success",
"3xx": "redirection",
"4xx": "client_error",
"5xx": "server_error",
}
def __init__(self, custom_format: Optional[str] = None):
self.custom_format = custom_format
self._compile_custom_pattern(custom_format)
def _compile_custom_pattern(self, format_str: Optional[str]) -> None:
'''Compile custom log format pattern.'''
if not format_str:
self.custom_pattern = None
return
pattern_str = (
format_str.replace("%h", r"(?P<ip>\S+)")
.replace("%l", r"\S+")
.replace("%u", r"\S+")
.replace("%t", r"\[(?P<timestamp>.*?)\]")
.replace("%r", r'"(?P<method>\S+)\s+(?P<path>.*?)\s+(?P<protocol>\S+)"')
.replace("%s", r"(?P<status>\d{3})")
.replace("%b", r"(?P<size>\S+)")
.replace("%{Referer}i", r'"(?P<referer>.*?)"')
.replace("%{User-agent}i", r'"(?P<user_agent>.*?)"')
) )
try: def get_format(self) -> LogFormat:
self.custom_pattern = re.compile("^" + pattern_str) return LogFormat.APACHE
except re.error:
self.custom_pattern = None
def can_parse(self, line: str) -> bool: def parse(self, line: str) -> Optional[ParsedEntry]:
'''Check if line matches Apache/Nginx format.''' match = self.combined_log_pattern.match(line)
line = line.strip() if not match:
if not line: match = self.common_log_pattern.match(line)
return False
if self.APACHE_COMBINED_PATTERN.match(line): if not match:
return True
if self.APACHE_COMMON_PATTERN.match(line):
return True
if self.NGINX_PATTERN.match(line):
return True
if self.ERROR_PATTERN.match(line):
return True
if self.custom_pattern and self.custom_pattern.match(line):
return True
return False
def parse(self, line: str, line_number: int = 0) -> Optional[ParsedLogEntry]:
'''Parse an Apache/Nginx log line.'''
line = line.strip()
if not line:
return None
entry = ParsedLogEntry(raw_line=line, line_number=line_number)
parsed = None
if self.custom_pattern:
match = self.custom_pattern.match(line)
if match:
parsed = self._extract_from_match(match, line)
if not parsed:
match = self.NGINX_PATTERN.match(line)
if match:
parsed = self._extract_from_match(match, line)
if not parsed:
match = self.APACHE_COMBINED_PATTERN.match(line)
if match:
parsed = self._extract_from_match(match, line)
if not parsed:
match = self.APACHE_COMMON_PATTERN.match(line)
if match:
parsed = self._extract_from_match(match, line)
if not parsed:
match = self.ERROR_PATTERN.match(line)
if match:
parsed = self._extract_error_from_match(match, line)
if parsed:
entry.timestamp = parsed.get("timestamp")
entry.host = parsed.get("ip")
entry.level = parsed.get("level")
entry.message = parsed.get("message", "")
entry.extra = parsed.get("extra", {})
return entry
def _extract_from_match(self, match: Match, line: str) -> dict[str, Any]:
'''Extract data from regex match.'''
result = {}
groups = match.groupdict()
if "ip" in groups:
result["ip"] = groups["ip"]
if "timestamp" in groups:
ts = groups["timestamp"]
result["timestamp"] = self._parse_timestamp(ts)
if "request" in groups:
request = groups["request"]
request_match = re.match(r"(?P<method>\S+)\s+(?P<path>.*)", request)
if request_match:
result["method"] = request_match.group("method")
result["path"] = request_match.group("path")
result["message"] = f"{request_match.group('method')} {request_match.group('path')}"
else:
result["message"] = request
if "status" in groups:
status = groups["status"]
result["level"] = self._infer_level_from_status(status)
result["extra"] = {}
if "size" in groups and groups["size"] != "-":
result["extra"] = result.get("extra", {})
try:
result["extra"]["response_size"] = int(groups["size"])
except ValueError:
result["extra"]["response_size"] = groups["size"]
if "referer" in groups and groups["referer"] != "-":
result["extra"]["referer"] = groups["referer"]
if "user_agent" in groups and groups["user_agent"] != "-":
result["extra"]["user_agent"] = groups["user_agent"]
if "request_time" in groups:
result["extra"]["request_time"] = groups["request_time"]
return result
def _extract_error_from_match(self, match: Match, line: str) -> dict[str, Any]:
'''Extract data from error log match.'''
groups = match.groupdict()
result = {
"message": groups.get("message", ""),
}
if len(match.groups()) >= 3:
result["level"] = match.group(3).lower() if match.group(3) else "info"
if match.group(2):
result["extra"] = {"module": match.group(2)}
if groups.get("timestamp"):
result["timestamp"] = self._parse_timestamp(groups["timestamp"])
return result
def _parse_timestamp(self, ts: str) -> Optional[datetime]:
'''Parse Apache/Nginx timestamp format.'''
ts = ts.strip()
formats = [
"%d/%b/%Y:%H:%M:%S %z",
"%d/%b/%Y:%H:%M:%S",
"%Y-%m-%dT%H:%M:%S%z",
"%Y-%m-%d %H:%M:%S",
]
for fmt in formats:
try:
return datetime.strptime(ts, fmt)
except ValueError:
continue
return None
def _infer_level_from_status(self, status: str) -> Optional[str]:
'''Infer log level from HTTP status code.'''
if not status:
return None return None
try: try:
code = int(status) timestamp = datetime.strptime(match.group("timestamp"), "%d/%b/%Y:%H:%M:%S %z")
if 100 <= code < 200:
return "info"
elif 200 <= code < 300:
return "info"
elif 300 <= code < 400:
return "info"
elif 400 <= code < 500:
return "warning"
elif 500 <= code < 600:
return "error"
except ValueError: except ValueError:
pass timestamp = datetime.strptime(match.group("timestamp"), "%d/%b/%Y:%H:%M:%S")
return None return ParsedEntry(
raw_line=line,
format=self.get_format(),
timestamp=timestamp.isoformat(),
level=self._extract_level(int(match.group("status"))),
message=f"{match.group('method')} {match.group('path')} {match.group('protocol')} - Status: {match.group('status')}",
metadata={
"ip": match.group("ip"),
"method": match.group("method"),
"path": match.group("path"),
"protocol": match.group("protocol"),
"status_code": int(match.group("status")),
"size": match.group("size"),
"referer": match.group("referer"),
"user_agent": match.group("user_agent"),
},
)
def _extract_level(self, status_code: int) -> str:
if status_code >= 500:
return "error"
elif status_code >= 400:
return "warning"
elif status_code >= 300:
return "info"
elif status_code >= 200:
return "info"
else:
return "debug"