Files
loglens-cli/loglens/parsers/factory.py
7000pctAUTO 2cc1eacf09
Some checks failed
CI / test (3.11) (push) Has been cancelled
CI / test (3.12) (push) Has been cancelled
CI / test (3.9) (push) Has been cancelled
CI / lint (push) Has been cancelled
CI / build (push) Has been cancelled
CI / test (3.10) (push) Has been cancelled
Add parsers: JSON, Syslog, Apache, and factory
2026-02-02 08:03:24 +00:00

101 lines
3.6 KiB
Python

"""Parser factory for automatic format detection."""
from typing import Dict, List, Optional, Type
from loglens.parsers.base import LogFormat, LogParser, ParsedLogEntry
from loglens.parsers.json_parser import JSONParser
from loglens.parsers.syslog_parser import SyslogParser
from loglens.parsers.apache_parser import ApacheParser
class ParserFactory:
    """Factory for creating and selecting log parsers.

    The factory keeps a registry mapping each supported ``LogFormat`` to its
    parser class and lazily creates one shared parser instance per format.
    The registry's insertion order doubles as the detection priority: when
    several formats match equally well, the one registered first wins.
    """

    def __init__(self):
        # Registry of supported formats. Order matters: it is the priority
        # used by detect_format() and the tie-breaker in detect_format_batch().
        self.parsers: Dict[LogFormat, Type[LogParser]] = {
            LogFormat.JSON: JSONParser,
            LogFormat.SYSLOG: SyslogParser,
            LogFormat.APACHE: ApacheParser,
        }
        # Lazily populated cache of parser instances, one per format.
        self._parser_instances: Dict[LogFormat, LogParser] = {}

    def get_parser(self, format: LogFormat) -> LogParser:
        """Get the (cached) parser instance for the specified format.

        Args:
            format: The log format whose parser is requested.

        Returns:
            The parser instance for ``format``; created on first request and
            reused afterwards.

        Raises:
            KeyError: If no parser is registered for ``format``
                (for example ``LogFormat.UNKNOWN``).
        """
        parser = self._parser_instances.get(format)
        if parser is None:
            try:
                parser_cls = self.parsers[format]
            except KeyError:
                # Same exception type as a plain dict lookup, but with a
                # message that tells the caller what went wrong.
                raise KeyError(
                    f"No parser registered for format {format!r}"
                ) from None
            parser = parser_cls()
            self._parser_instances[format] = parser
        return parser

    def detect_format(self, line: str) -> LogFormat:
        """Detect the log format of a single sample line.

        Registered parsers are asked in registration order whether they can
        parse the line; the first one that accepts it determines the result.

        Args:
            line: A raw log line; surrounding whitespace is ignored.

        Returns:
            The first registered format whose parser accepts the line, or
            ``LogFormat.UNKNOWN`` for blank lines and unrecognised input.
        """
        line = line.strip()
        if not line:
            return LogFormat.UNKNOWN

        # Reuse the cached instances instead of building new parsers for
        # every line, and consult the registry so detection always agrees
        # with get_available_formats().
        for candidate in self.parsers:
            if self.get_parser(candidate).can_parse(line):
                return candidate
        return LogFormat.UNKNOWN

    def detect_format_batch(
        self, lines: List[str], sample_size: int = 10
    ) -> LogFormat:
        """Detect the dominant log format from multiple lines.

        Args:
            lines: Raw log lines to inspect.
            sample_size: Only the first ``sample_size`` lines are examined.

        Returns:
            The format detected for the most sampled lines. Ties are broken
            by registration order. ``LogFormat.UNKNOWN`` is returned when the
            sample is empty or no sampled line matches a registered format.
        """
        sample = lines[:sample_size]
        if not sample:
            return LogFormat.UNKNOWN

        counts: Dict[LogFormat, int] = {fmt: 0 for fmt in self.parsers}
        for line in sample:
            detected = self.detect_format(line)
            # Unrecognised/blank lines (UNKNOWN) do not vote.
            if detected in counts:
                counts[detected] += 1

        if not counts:
            return LogFormat.UNKNOWN

        # max() returns the first maximal key in iteration order, so equal
        # counts resolve to the format registered earliest. This never lets
        # a format with fewer hits beat formats with more hits.
        best = max(counts, key=counts.__getitem__)
        return best if counts[best] > 0 else LogFormat.UNKNOWN

    def parse_lines(
        self, lines: List[str], format: Optional[LogFormat] = None
    ) -> List[ParsedLogEntry]:
        """Parse lines, detecting the format automatically when not given.

        Args:
            lines: Raw log lines to parse.
            format: Format to parse with. When ``None`` the format is
                detected from the lines via ``detect_format_batch``.

        Returns:
            The entries produced by the selected parser's ``parse_batch``.
            If the format is ``LogFormat.UNKNOWN``, one placeholder entry per
            input line is returned, carrying the raw line, the message
            ``"Unknown format"`` and a 1-based line number.

        Raises:
            KeyError: If an explicitly given ``format`` has no registered
                parser.
        """
        if format is None:
            format = self.detect_format_batch(lines)

        if format == LogFormat.UNKNOWN:
            return [
                ParsedLogEntry(
                    raw_line=line,
                    message="Unknown format",
                    line_number=number,
                )
                for number, line in enumerate(lines, start=1)
            ]

        return self.get_parser(format).parse_batch(lines)

    def get_available_formats(self) -> List[LogFormat]:
        """Get the list of registered log formats, in registration order."""
        return list(self.parsers)