"""Parser factory for automatic format detection."""

from typing import Dict, List, Optional, Type

from loglens.parsers.base import LogFormat, LogParser, ParsedLogEntry
from loglens.parsers.json_parser import JSONParser
from loglens.parsers.syslog_parser import SyslogParser
from loglens.parsers.apache_parser import ApacheParser


class ParserFactory:
    """Factory for creating and selecting log parsers.

    Attributes:
        parsers: Registry mapping each supported ``LogFormat`` to its parser
            class. Insertion order doubles as detection priority: the first
            registered parser that accepts a line wins, and count ties in
            batch detection are resolved in the same order.
    """

    def __init__(self):
        """Register the built-in parser classes with an empty instance cache."""
        self.parsers: Dict[LogFormat, Type[LogParser]] = {
            LogFormat.JSON: JSONParser,
            LogFormat.SYSLOG: SyslogParser,
            LogFormat.APACHE: ApacheParser,
        }
        # Parser instances are created lazily by get_parser() and then reused.
        self._parser_instances: Dict[LogFormat, LogParser] = {}

    def get_parser(self, format: LogFormat) -> LogParser:
        """Get the parser instance for the specified format.

        The instance is created on first request and cached afterwards.

        Args:
            format: Format whose parser is wanted.

        Returns:
            The cached parser instance for ``format``.

        Raises:
            KeyError: If ``format`` has no entry in ``self.parsers``
                (for example ``LogFormat.UNKNOWN``).
        """
        if format not in self._parser_instances:
            self._parser_instances[format] = self.parsers[format]()
        return self._parser_instances[format]

    def detect_format(self, line: str) -> LogFormat:
        """Detect the log format of a single sample line.

        Registered parsers are asked in registry order whether they can parse
        the stripped line; the first one that accepts it determines the result.

        Args:
            line: Raw log line; surrounding whitespace is ignored.

        Returns:
            The first matching format, or ``LogFormat.UNKNOWN`` when the line
            is blank or no registered parser accepts it.
        """
        line = line.strip()
        if not line:
            return LogFormat.UNKNOWN

        # Reuse the cached instances instead of constructing a fresh set of
        # parsers for every line that is probed.
        for candidate in self.parsers:
            if self.get_parser(candidate).can_parse(line):
                return candidate

        return LogFormat.UNKNOWN

    def detect_format_batch(self, lines: List[str], sample_size: int = 10) -> LogFormat:
        """Detect the dominant format among the first ``sample_size`` lines.

        Args:
            lines: Log lines to inspect.
            sample_size: Number of leading lines to sample.

        Returns:
            The format detected for the most sampled lines. When several
            formats share the highest count, the one registered first in
            ``self.parsers`` is returned. ``LogFormat.UNKNOWN`` is returned
            when the sample is empty or no sampled line matched any format.
        """
        sample = lines[:sample_size]
        if not sample:
            return LogFormat.UNKNOWN

        format_counts: Dict[LogFormat, int] = {}
        for line in sample:
            detected = self.detect_format(line)
            format_counts[detected] = format_counts.get(detected, 0) + 1

        # Unrecognised lines never vote for a format.
        format_counts.pop(LogFormat.UNKNOWN, None)
        if not format_counts:
            return LogFormat.UNKNOWN

        # max() returns the first maximal item it meets, so iterating the
        # registry resolves ties by priority while still guaranteeing that a
        # minority format can never beat formats with a higher count.
        return max(self.parsers, key=lambda fmt: format_counts.get(fmt, 0))

    def parse_lines(self, lines: List[str], format: Optional[LogFormat] = None) -> List[ParsedLogEntry]:
        """Parse lines, detecting the format automatically when none is given.

        Args:
            lines: Log lines to parse.
            format: Format to use; when ``None`` it is detected from ``lines``
                via ``detect_format_batch``.

        Returns:
            The result of the selected parser's ``parse_batch(lines)``. When
            the format is ``LogFormat.UNKNOWN``, one placeholder entry per
            line is returned instead, carrying the raw line, the message
            ``"Unknown format"`` and a 1-based line number.

        Raises:
            KeyError: If ``format`` has no entry in ``self.parsers``.
        """
        if format is None:
            format = self.detect_format_batch(lines)

        if format == LogFormat.UNKNOWN:
            return [
                ParsedLogEntry(
                    raw_line=line,
                    message="Unknown format",
                    line_number=line_number,
                )
                for line_number, line in enumerate(lines, start=1)
            ]

        return self.get_parser(format).parse_batch(lines)

    def get_available_formats(self) -> List[LogFormat]:
        """Get the list of formats that have a registered parser."""
        return list(self.parsers)