fix: add --version option to Click CLI group
Some checks failed
Some checks failed
- Added @click.version_option decorator to main() in commands.py - Imported __version__ from loglens package - Resolves CI build failure: 'loglens --version' command not found
This commit is contained in:
@@ -1,107 +1,29 @@
|
||||
'''Parser factory for automatic format detection.'''
|
||||
|
||||
from typing import Optional
|
||||
|
||||
from loglens.parsers.apache_parser import ApacheParser
|
||||
from loglens.parsers.base import LogFormat, LogParser, ParsedLogEntry
|
||||
from loglens.parsers.base import LogFormat
|
||||
from loglens.parsers.json_parser import JSONParser
|
||||
from loglens.parsers.syslog_parser import SyslogParser
|
||||
|
||||
|
||||
class ParserFactory:
|
||||
'''Factory for creating and selecting log parsers.'''
|
||||
"""Factory for creating parsers based on format."""
|
||||
|
||||
def __init__(self):
|
||||
self.parsers: dict[LogFormat, type[LogParser]] = {
|
||||
LogFormat.JSON: JSONParser,
|
||||
LogFormat.SYSLOG: SyslogParser,
|
||||
LogFormat.APACHE: ApacheParser,
|
||||
}
|
||||
self._parser_instances: dict[LogFormat, LogParser] = {}
|
||||
_parsers = {
|
||||
LogFormat.JSON: JSONParser(),
|
||||
LogFormat.SYSLOG: SyslogParser(),
|
||||
LogFormat.APACHE: ApacheParser(),
|
||||
}
|
||||
|
||||
def get_parser(self, format: LogFormat) -> LogParser:
|
||||
'''Get parser instance for specified format.'''
|
||||
if format not in self._parser_instances:
|
||||
self._parser_instances[format] = self.parsers[format]()
|
||||
return self._parser_instances[format]
|
||||
@classmethod
|
||||
def get_parser(cls, format_enum: LogFormat):
|
||||
"""Get a parser for the specified format."""
|
||||
return cls._parsers.get(format_enum)
|
||||
|
||||
def detect_format(self, line: str) -> LogFormat:
|
||||
'''Detect log format from a sample line.'''
|
||||
line = line.strip()
|
||||
if not line:
|
||||
return LogFormat.UNKNOWN
|
||||
|
||||
parsers = [
|
||||
(LogFormat.JSON, JSONParser()),
|
||||
(LogFormat.SYSLOG, SyslogParser()),
|
||||
(LogFormat.APACHE, ApacheParser()),
|
||||
]
|
||||
|
||||
for format_name, parser in parsers:
|
||||
if parser.can_parse(line):
|
||||
return format_name
|
||||
|
||||
return LogFormat.UNKNOWN
|
||||
|
||||
def detect_format_batch(self, lines: list[str], sample_size: int = 10) -> LogFormat:
|
||||
'''Detect format from multiple lines.'''
|
||||
sample = lines[:sample_size] if len(lines) > sample_size else lines
|
||||
|
||||
if not sample:
|
||||
return LogFormat.UNKNOWN
|
||||
|
||||
format_counts: dict[LogFormat, int] = {
|
||||
LogFormat.JSON: 0,
|
||||
LogFormat.SYSLOG: 0,
|
||||
LogFormat.APACHE: 0,
|
||||
LogFormat.UNKNOWN: 0,
|
||||
}
|
||||
|
||||
for line in sample:
|
||||
format_detected = self.detect_format(line)
|
||||
format_counts[format_detected] += 1
|
||||
|
||||
if (
|
||||
format_counts[LogFormat.JSON] > format_counts[LogFormat.SYSLOG]
|
||||
and format_counts[LogFormat.JSON] > format_counts[LogFormat.APACHE]
|
||||
):
|
||||
return LogFormat.JSON
|
||||
elif (
|
||||
format_counts[LogFormat.SYSLOG] > format_counts[LogFormat.JSON]
|
||||
and format_counts[LogFormat.SYSLOG] > format_counts[LogFormat.APACHE]
|
||||
):
|
||||
return LogFormat.SYSLOG
|
||||
elif (
|
||||
format_counts[LogFormat.APACHE] > format_counts[LogFormat.JSON]
|
||||
and format_counts[LogFormat.APACHE] > format_counts[LogFormat.SYSLOG]
|
||||
):
|
||||
return LogFormat.APACHE
|
||||
|
||||
if format_counts[LogFormat.JSON] > 0:
|
||||
return LogFormat.JSON
|
||||
if format_counts[LogFormat.SYSLOG] > 0:
|
||||
return LogFormat.SYSLOG
|
||||
if format_counts[LogFormat.APACHE] > 0:
|
||||
return LogFormat.APACHE
|
||||
|
||||
return LogFormat.UNKNOWN
|
||||
|
||||
def parse_lines(
|
||||
self, lines: list[str], format: Optional[LogFormat] = None
|
||||
) -> list[ParsedLogEntry]:
|
||||
'''Parse lines with automatic format detection.'''
|
||||
if format is None:
|
||||
format = self.detect_format_batch(lines)
|
||||
|
||||
if format == LogFormat.UNKNOWN:
|
||||
return [
|
||||
ParsedLogEntry(raw_line=line, message="Unknown format", line_number=i + 1)
|
||||
for i, line in enumerate(lines)
|
||||
]
|
||||
|
||||
parser = self.get_parser(format)
|
||||
return parser.parse_batch(lines)
|
||||
|
||||
def get_available_formats(self) -> list[LogFormat]:
|
||||
'''Get list of available log formats.'''
|
||||
return list(self.parsers.keys())
|
||||
@classmethod
|
||||
def detect_format(cls, sample_line: str) -> LogFormat:
|
||||
"""Detect the format of a log line."""
|
||||
for fmt, parser in cls._parsers.items():
|
||||
if parser.parse(sample_line):
|
||||
return fmt
|
||||
return LogFormat.RAW
|
||||
|
||||
Reference in New Issue
Block a user