From d2c658d2a352ba7c3d4406a507f2030c0b4af71c Mon Sep 17 00:00:00 2001
From: 7000pctAUTO
Date: Mon, 2 Feb 2026 10:07:00 +0000
Subject: [PATCH] Add JSON, syslog, and Apache parsers

---
 loglens/parsers/factory.py | 105 ++++++++++++++++++++++++++++++++------
 1 file changed, 87 insertions(+), 18 deletions(-)

diff --git a/loglens/parsers/factory.py b/loglens/parsers/factory.py
index 62b9628..b1d33f3 100644
--- a/loglens/parsers/factory.py
+++ b/loglens/parsers/factory.py
@@ -1,29 +1,98 @@
+"""Parser factory for automatic format detection."""
+
 from typing import Optional
 
 from loglens.parsers.apache_parser import ApacheParser
-from loglens.parsers.base import LogFormat
+from loglens.parsers.base import LogFormat, LogParser, ParsedLogEntry
 from loglens.parsers.json_parser import JSONParser
 from loglens.parsers.syslog_parser import SyslogParser
 
 
 class ParserFactory:
-    """Factory for creating parsers based on format."""
+    """Factory for creating and selecting log parsers."""
 
-    _parsers = {
-        LogFormat.JSON: JSONParser(),
-        LogFormat.SYSLOG: SyslogParser(),
-        LogFormat.APACHE: ApacheParser(),
-    }
+    def __init__(self):
+        # Registry of parser classes, in detection-priority order.
+        self.parsers: dict[LogFormat, type[LogParser]] = {
+            LogFormat.JSON: JSONParser,
+            LogFormat.SYSLOG: SyslogParser,
+            LogFormat.APACHE: ApacheParser,
+        }
+        # Parser instances, created lazily by get_parser().
+        self._parser_instances: dict[LogFormat, LogParser] = {}
 
-    @classmethod
-    def get_parser(cls, format_enum: LogFormat):
-        """Get a parser for the specified format."""
-        return cls._parsers.get(format_enum)
+    def get_parser(self, format: LogFormat) -> LogParser:
+        """Return the cached parser for ``format``, creating it on first use.
+
+        Raises:
+            KeyError: If ``format`` has no registered parser class.
+        """
+        if format not in self._parser_instances:
+            self._parser_instances[format] = self.parsers[format]()
+        return self._parser_instances[format]
 
-    @classmethod
-    def detect_format(cls, sample_line: str) -> LogFormat:
-        """Detect the format of a log line."""
-        for fmt, parser in cls._parsers.items():
-            if parser.parse(sample_line):
-                return fmt
-        return LogFormat.RAW
+    def detect_format(self, line: str) -> LogFormat:
+        """Detect log format from a sample line.
+
+        Registered parsers are tried in registration order and the first one
+        whose ``can_parse`` accepts the stripped line wins. Blank lines and
+        lines no parser accepts yield ``LogFormat.UNKNOWN``.
+        """
+        line = line.strip()
+        if not line:
+            return LogFormat.UNKNOWN
+
+        # Reuse the cached instances instead of building three parsers per line.
+        for format_name in self.parsers:
+            if self.get_parser(format_name).can_parse(line):
+                return format_name
+
+        return LogFormat.UNKNOWN
+
+    def detect_format_batch(self, lines: list[str], sample_size: int = 10) -> LogFormat:
+        """Detect format from multiple lines.
+
+        The first ``sample_size`` lines are classified individually and the
+        format matched by the most lines is returned. Ties between the leading
+        formats go to the one registered first; ``LogFormat.UNKNOWN`` is
+        returned when no sampled line matches any parser.
+        """
+        format_counts: dict[LogFormat, int] = dict.fromkeys(self.parsers, 0)
+
+        for line in lines[:sample_size]:
+            format_detected = self.detect_format(line)
+            # UNKNOWN lines carry no vote.
+            if format_detected in format_counts:
+                format_counts[format_detected] += 1
+
+        # max() keeps the first of several equal maxima, so a tie can only be
+        # won by one of the leaders, never by a lower-count format.
+        best = max(format_counts, key=format_counts.__getitem__, default=None)
+        if best is None or format_counts[best] == 0:
+            return LogFormat.UNKNOWN
+        return best
+
+    def parse_lines(
+        self, lines: list[str], format: Optional[LogFormat] = None
+    ) -> list[ParsedLogEntry]:
+        """Parse lines with automatic format detection.
+
+        When ``format`` is omitted it is detected from ``lines``. If the format
+        is ``LogFormat.UNKNOWN``, every line is returned as a placeholder entry
+        carrying the raw text and its 1-based line number.
+        """
+        if format is None:
+            format = self.detect_format_batch(lines)
+
+        if format == LogFormat.UNKNOWN:
+            return [
+                ParsedLogEntry(raw_line=line, message="Unknown format", line_number=i + 1)
+                for i, line in enumerate(lines)
+            ]
+
+        parser = self.get_parser(format)
+        return parser.parse_batch(lines)
+
+    def get_available_formats(self) -> list[LogFormat]:
+        """Get list of available log formats."""
+        return list(self.parsers)