"""Parser factory for automatic format detection.""" from typing import Optional from loglens.parsers.apache_parser import ApacheParser from loglens.parsers.base import LogFormat, LogParser, ParsedLogEntry from loglens.parsers.json_parser import JSONParser from loglens.parsers.syslog_parser import SyslogParser class ParserFactory: """Factory for creating and selecting log parsers.""" def __init__(self): self.parsers: dict[LogFormat, type[LogParser]] = { LogFormat.JSON: JSONParser, LogFormat.SYSLOG: SyslogParser, LogFormat.APACHE: ApacheParser, } self._parser_instances: dict[LogFormat, LogParser] = {} def get_parser(self, format: LogFormat) -> LogParser: """Get parser instance for specified format.""" if format not in self._parser_instances: self._parser_instances[format] = self.parsers[format]() return self._parser_instances[format] def detect_format(self, line: str) -> LogFormat: """Detect log format from a sample line.""" line = line.strip() if not line: return LogFormat.UNKNOWN parsers = [ (LogFormat.JSON, JSONParser()), (LogFormat.SYSLOG, SyslogParser()), (LogFormat.APACHE, ApacheParser()), ] for format_name, parser in parsers: if parser.can_parse(line): return format_name return LogFormat.UNKNOWN def detect_format_batch(self, lines: list[str], sample_size: int = 10) -> LogFormat: """Detect format from multiple lines.""" sample = lines[:sample_size] if len(lines) > sample_size else lines if not sample: return LogFormat.UNKNOWN format_counts: dict[LogFormat, int] = { LogFormat.JSON: 0, LogFormat.SYSLOG: 0, LogFormat.APACHE: 0, LogFormat.UNKNOWN: 0, } for line in sample: format_detected = self.detect_format(line) format_counts[format_detected] += 1 if ( format_counts[LogFormat.JSON] > format_counts[LogFormat.SYSLOG] and format_counts[LogFormat.JSON] > format_counts[LogFormat.APACHE] ): return LogFormat.JSON elif ( format_counts[LogFormat.SYSLOG] > format_counts[LogFormat.JSON] and format_counts[LogFormat.SYSLOG] > format_counts[LogFormat.APACHE] ): return LogFormat.SYSLOG elif ( format_counts[LogFormat.APACHE] > format_counts[LogFormat.JSON] and format_counts[LogFormat.APACHE] > format_counts[LogFormat.SYSLOG] ): return LogFormat.APACHE if format_counts[LogFormat.JSON] > 0: return LogFormat.JSON if format_counts[LogFormat.SYSLOG] > 0: return LogFormat.SYSLOG if format_counts[LogFormat.APACHE] > 0: return LogFormat.APACHE return LogFormat.UNKNOWN def parse_lines( self, lines: list[str], format: Optional[LogFormat] = None ) -> list[ParsedLogEntry]: """Parse lines with automatic format detection.""" if format is None: format = self.detect_format_batch(lines) if format == LogFormat.UNKNOWN: return [ ParsedLogEntry(raw_line=line, message="Unknown format", line_number=i + 1) for i, line in enumerate(lines) ] parser = self.get_parser(format) return parser.parse_batch(lines) def get_available_formats(self) -> list[LogFormat]: """Get list of available log formats.""" return list(self.parsers.keys())