Files
loglens-cli/loglens/parsers/apache_parser.py
7000pctAUTO debe653cfb
Some checks failed
CI / test (3.10) (push) Has been cancelled
CI / test (3.11) (push) Has been cancelled
CI / test (3.12) (push) Has been cancelled
CI / test (3.9) (push) Has been cancelled
CI / lint (push) Has been cancelled
CI / build (push) Has been cancelled
fix: add --version option to Click CLI group
- Added @click.version_option decorator to main() in commands.py
- Imported __version__ from loglens package
- Resolves CI build failure: 'loglens --version' command not found
2026-02-02 09:25:11 +00:00

64 lines
2.3 KiB
Python

import re
from datetime import datetime
from typing import Optional
from loglens.parsers.base import BaseParser, LogFormat, ParsedEntry
class ApacheParser(BaseParser):
    """Parser for Apache/Nginx access logs.

    Two layouts are recognised, tried in this order:

    * Combined Log Format: the Common Log Format fields followed by the
      quoted ``referer`` and ``user_agent`` fields.
    * Common Log Format: ip, two ignored fields, bracketed timestamp,
      quoted request line, status and response size.
    """

    # Timestamp layouts tried in order: with a UTC offset first, then without.
    _TIMESTAMP_FORMATS = ("%d/%b/%Y:%H:%M:%S %z", "%d/%b/%Y:%H:%M:%S")

    def __init__(self):
        """Compile the regular expressions for both supported layouts."""
        self.combined_log_pattern = re.compile(
            r'^(?P<ip>\S+) \S+ \S+ \[(?P<timestamp>.*?)\] "(?P<method>\S+) (?P<path>\S+) (?P<protocol>\S+)" (?P<status>\d{3}) (?P<size>\d+|-) "(?P<referer>.*?)" "(?P<user_agent>.*?)"'
        )
        self.common_log_pattern = re.compile(
            r'^(?P<ip>\S+) \S+ \S+ \[(?P<timestamp>.*?)\] "(?P<method>\S+) (?P<path>\S+) (?P<protocol>\S+)" (?P<status>\d{3}) (?P<size>\d+|-)'
        )

    def get_format(self) -> LogFormat:
        """Return the log format handled by this parser."""
        return LogFormat.APACHE

    def parse(self, line: str) -> Optional[ParsedEntry]:
        """Parse a single access-log line.

        Args:
            line: One raw log line.

        Returns:
            A ``ParsedEntry`` describing the request, or ``None`` when the
            line matches neither layout or its timestamp cannot be parsed.
            For Common Log Format lines the ``referer`` and ``user_agent``
            metadata values are ``None``.
        """
        # Try the richer combined layout first; it is a superset of common.
        match = self.combined_log_pattern.match(line)
        if match is None:
            match = self.common_log_pattern.match(line)
        if match is None:
            return None

        # groupdict() lets the optional groups be read with .get(); calling
        # match.group("referer") on a common-format match raises IndexError
        # because that pattern does not define the group.
        fields = match.groupdict()

        timestamp = self._parse_timestamp(fields["timestamp"])
        if timestamp is None:
            return None

        status_code = int(fields["status"])
        request = f"{fields['method']} {fields['path']} {fields['protocol']}"

        return ParsedEntry(
            raw_line=line,
            format=self.get_format(),
            timestamp=timestamp.isoformat(),
            level=self._extract_level(status_code),
            message=f"{request} - Status: {fields['status']}",
            metadata={
                "ip": fields["ip"],
                "method": fields["method"],
                "path": fields["path"],
                "protocol": fields["protocol"],
                "status_code": status_code,
                # Kept as the matched text: digits, or "-" when no size was logged.
                "size": fields["size"],
                "referer": fields.get("referer"),
                "user_agent": fields.get("user_agent"),
            },
        )

    def _parse_timestamp(self, raw: str) -> Optional[datetime]:
        """Return *raw* parsed with the first matching layout, else ``None``."""
        for layout in self._TIMESTAMP_FORMATS:
            try:
                return datetime.strptime(raw, layout)
            except ValueError:
                continue
        return None

    def _extract_level(self, status_code: int) -> str:
        """Map an HTTP status code to a log level name.

        5xx and above -> ``"error"``, 4xx -> ``"warning"``, 2xx and 3xx ->
        ``"info"``, anything below 200 -> ``"debug"``.
        """
        if status_code >= 500:
            return "error"
        if status_code >= 400:
            return "warning"
        if status_code >= 200:
            return "info"
        return "debug"