Some checks failed
- Added @click.version_option decorator to main() in commands.py - Imported __version__ from loglens package - Resolves CI build failure: 'loglens --version' command not found
64 lines
2.3 KiB
Python
64 lines
2.3 KiB
Python
import re
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
|
|
from loglens.parsers.base import BaseParser, LogFormat, ParsedEntry
|
|
|
|
|
|
class ApacheParser(BaseParser):
    """Parser for Apache/Nginx access-log lines.

    Two layouts are recognised. The "combined" layout is tried first; if it
    does not match, the shorter "common" layout (the same fields without the
    trailing quoted referer and user-agent) is tried.
    """

    def __init__(self):
        """Compile the regular expressions for both supported layouts."""
        # Fields shared by both layouts: client IP, two ignored tokens, the
        # bracketed timestamp, the quoted request line, status and size.
        common_fields = (
            r'^(?P<ip>\S+) \S+ \S+ \[(?P<timestamp>.*?)\] '
            r'"(?P<method>\S+) (?P<path>\S+) (?P<protocol>\S+)" '
            r'(?P<status>\d{3}) (?P<size>\d+|-)'
        )
        # The combined layout appends the quoted referer and user-agent.
        self.combined_log_pattern = re.compile(
            common_fields + r' "(?P<referer>.*?)" "(?P<user_agent>.*?)"'
        )
        self.common_log_pattern = re.compile(common_fields)

    def get_format(self) -> LogFormat:
        """Return the log format handled by this parser."""
        return LogFormat.APACHE

    def parse(self, line: str) -> Optional[ParsedEntry]:
        """Parse one access-log line.

        Args:
            line: The raw log line.

        Returns:
            A ParsedEntry describing the request, or None when the line
            matches neither layout or its timestamp cannot be parsed.
            For lines in the common layout, the "referer" and "user_agent"
            metadata values are None.
        """
        # Try the longer layout first: the common pattern is a prefix of it
        # and would otherwise match combined lines while dropping fields.
        match = self.combined_log_pattern.match(line) or self.common_log_pattern.match(line)
        if match is None:
            return None

        # groupdict() only contains the groups defined by the pattern that
        # matched, so optional fields can be read safely with .get().
        fields = match.groupdict()

        timestamp = self._parse_timestamp(fields["timestamp"])
        if timestamp is None:
            return None

        status_code = int(fields["status"])

        return ParsedEntry(
            raw_line=line,
            format=self.get_format(),
            timestamp=timestamp.isoformat(),
            level=self._extract_level(status_code),
            message=f"{fields['method']} {fields['path']} {fields['protocol']} - Status: {fields['status']}",
            metadata={
                "ip": fields["ip"],
                "method": fields["method"],
                "path": fields["path"],
                "protocol": fields["protocol"],
                "status_code": status_code,
                # Kept as the raw text; "-" is a possible value.
                "size": fields["size"],
                "referer": fields.get("referer"),
                "user_agent": fields.get("user_agent"),
            },
        )

    @staticmethod
    def _parse_timestamp(raw: str) -> Optional[datetime]:
        """Parse a log timestamp, with or without a UTC offset.

        Returns None when the text matches neither accepted format.
        """
        for fmt in ("%d/%b/%Y:%H:%M:%S %z", "%d/%b/%Y:%H:%M:%S"):
            try:
                return datetime.strptime(raw, fmt)
            except ValueError:
                continue
        return None

    def _extract_level(self, status_code: int) -> str:
        """Map an HTTP status code to a log level name.

        500 and above is "error", 400-499 is "warning", 200-399 is "info",
        and anything below 200 is "debug".
        """
        if status_code >= 500:
            return "error"
        if status_code >= 400:
            return "warning"
        if status_code >= 200:
            return "info"
        return "debug"