"""Parser factory for automatic format detection.""" from typing import Dict, List, Optional, Type from loglens.parsers.base import LogFormat, LogParser, ParsedLogEntry from loglens.parsers.json_parser import JSONParser from loglens.parsers.syslog_parser import SyslogParser from loglens.parsers.apache_parser import ApacheParser class ParserFactory: """Factory for creating and selecting log parsers.""" def __init__(self): self.parsers: Dict[LogFormat, Type[LogParser]] = { LogFormat.JSON: JSONParser, LogFormat.SYSLOG: SyslogParser, LogFormat.APACHE: ApacheParser, } self._parser_instances: Dict[LogFormat, LogParser] = {} def get_parser(self, format: LogFormat) -> LogParser: """Get parser instance for specified format.""" if format not in self._parser_instances: self._parser_instances[format] = self.parsers[format]() return self._parser_instances[format] def detect_format(self, line: str) -> LogFormat: """Detect log format from a sample line.""" line = line.strip() if not line: return LogFormat.UNKNOWN parsers = [ (LogFormat.JSON, JSONParser()), (LogFormat.SYSLOG, SyslogParser()), (LogFormat.APACHE, ApacheParser()), ] for format_name, parser in parsers: if parser.can_parse(line): return format_name return LogFormat.UNKNOWN def detect_format_batch(self, lines: List[str], sample_size: int = 10) -> LogFormat: """Detect format from multiple lines.""" sample = lines[:sample_size] if len(lines) > sample_size else lines if not sample: return LogFormat.UNKNOWN format_counts: Dict[LogFormat, int] = { LogFormat.JSON: 0, LogFormat.SYSLOG: 0, LogFormat.APACHE: 0, LogFormat.UNKNOWN: 0 } for line in sample: format_detected = self.detect_format(line) format_counts[format_detected] += 1 if format_counts[LogFormat.JSON] > format_counts[LogFormat.SYSLOG] and \ format_counts[LogFormat.JSON] > format_counts[LogFormat.APACHE]: return LogFormat.JSON elif format_counts[LogFormat.SYSLOG] > format_counts[LogFormat.JSON] and \ format_counts[LogFormat.SYSLOG] > format_counts[LogFormat.APACHE]: return LogFormat.SYSLOG elif format_counts[LogFormat.APACHE] > format_counts[LogFormat.JSON] and \ format_counts[LogFormat.APACHE] > format_counts[LogFormat.SYSLOG]: return LogFormat.APACHE if format_counts[LogFormat.JSON] > 0: return LogFormat.JSON if format_counts[LogFormat.SYSLOG] > 0: return LogFormat.SYSLOG if format_counts[LogFormat.APACHE] > 0: return LogFormat.APACHE return LogFormat.UNKNOWN def parse_lines(self, lines: List[str], format: Optional[LogFormat] = None) -> List[ParsedLogEntry]: """Parse lines with automatic format detection.""" if format is None: format = self.detect_format_batch(lines) if format == LogFormat.UNKNOWN: return [ParsedLogEntry( raw_line=line, message="Unknown format", line_number=i + 1 ) for i, line in enumerate(lines)] parser = self.get_parser(format) return parser.parse_batch(lines) def get_available_formats(self) -> List[LogFormat]: """Get list of available log formats.""" return list(self.parsers.keys())