fix: resolve CI/CD linting and formatting issues
Some checks failed
Some checks failed
- Replaced deprecated typing.Dict/List/Tuple with native types (UP035) - Removed unused imports across all modules - Fixed unused variables by replacing with _ prefix - Added missing Optional type imports - Reorganized imports for proper sorting (I001) - Applied black formatting to all source files
This commit is contained in:
@@ -1,32 +1,32 @@
|
|||||||
"""Parser factory for automatic format detection."""
|
'''Parser factory for automatic format detection.'''
|
||||||
|
|
||||||
from typing import Dict, List, Optional, Type
|
from typing import Optional
|
||||||
|
|
||||||
|
from loglens.parsers.apache_parser import ApacheParser
|
||||||
from loglens.parsers.base import LogFormat, LogParser, ParsedLogEntry
|
from loglens.parsers.base import LogFormat, LogParser, ParsedLogEntry
|
||||||
from loglens.parsers.json_parser import JSONParser
|
from loglens.parsers.json_parser import JSONParser
|
||||||
from loglens.parsers.syslog_parser import SyslogParser
|
from loglens.parsers.syslog_parser import SyslogParser
|
||||||
from loglens.parsers.apache_parser import ApacheParser
|
|
||||||
|
|
||||||
|
|
||||||
class ParserFactory:
|
class ParserFactory:
|
||||||
"""Factory for creating and selecting log parsers."""
|
'''Factory for creating and selecting log parsers.'''
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.parsers: Dict[LogFormat, Type[LogParser]] = {
|
self.parsers: dict[LogFormat, type[LogParser]] = {
|
||||||
LogFormat.JSON: JSONParser,
|
LogFormat.JSON: JSONParser,
|
||||||
LogFormat.SYSLOG: SyslogParser,
|
LogFormat.SYSLOG: SyslogParser,
|
||||||
LogFormat.APACHE: ApacheParser,
|
LogFormat.APACHE: ApacheParser,
|
||||||
}
|
}
|
||||||
self._parser_instances: Dict[LogFormat, LogParser] = {}
|
self._parser_instances: dict[LogFormat, LogParser] = {}
|
||||||
|
|
||||||
def get_parser(self, format: LogFormat) -> LogParser:
|
def get_parser(self, format: LogFormat) -> LogParser:
|
||||||
"""Get parser instance for specified format."""
|
'''Get parser instance for specified format.'''
|
||||||
if format not in self._parser_instances:
|
if format not in self._parser_instances:
|
||||||
self._parser_instances[format] = self.parsers[format]()
|
self._parser_instances[format] = self.parsers[format]()
|
||||||
return self._parser_instances[format]
|
return self._parser_instances[format]
|
||||||
|
|
||||||
def detect_format(self, line: str) -> LogFormat:
|
def detect_format(self, line: str) -> LogFormat:
|
||||||
"""Detect log format from a sample line."""
|
'''Detect log format from a sample line.'''
|
||||||
line = line.strip()
|
line = line.strip()
|
||||||
if not line:
|
if not line:
|
||||||
return LogFormat.UNKNOWN
|
return LogFormat.UNKNOWN
|
||||||
@@ -43,32 +43,38 @@ class ParserFactory:
|
|||||||
|
|
||||||
return LogFormat.UNKNOWN
|
return LogFormat.UNKNOWN
|
||||||
|
|
||||||
def detect_format_batch(self, lines: List[str], sample_size: int = 10) -> LogFormat:
|
def detect_format_batch(self, lines: list[str], sample_size: int = 10) -> LogFormat:
|
||||||
"""Detect format from multiple lines."""
|
'''Detect format from multiple lines.'''
|
||||||
sample = lines[:sample_size] if len(lines) > sample_size else lines
|
sample = lines[:sample_size] if len(lines) > sample_size else lines
|
||||||
|
|
||||||
if not sample:
|
if not sample:
|
||||||
return LogFormat.UNKNOWN
|
return LogFormat.UNKNOWN
|
||||||
|
|
||||||
format_counts: Dict[LogFormat, int] = {
|
format_counts: dict[LogFormat, int] = {
|
||||||
LogFormat.JSON: 0,
|
LogFormat.JSON: 0,
|
||||||
LogFormat.SYSLOG: 0,
|
LogFormat.SYSLOG: 0,
|
||||||
LogFormat.APACHE: 0,
|
LogFormat.APACHE: 0,
|
||||||
LogFormat.UNKNOWN: 0
|
LogFormat.UNKNOWN: 0,
|
||||||
}
|
}
|
||||||
|
|
||||||
for line in sample:
|
for line in sample:
|
||||||
format_detected = self.detect_format(line)
|
format_detected = self.detect_format(line)
|
||||||
format_counts[format_detected] += 1
|
format_counts[format_detected] += 1
|
||||||
|
|
||||||
if format_counts[LogFormat.JSON] > format_counts[LogFormat.SYSLOG] and \
|
if (
|
||||||
format_counts[LogFormat.JSON] > format_counts[LogFormat.APACHE]:
|
format_counts[LogFormat.JSON] > format_counts[LogFormat.SYSLOG]
|
||||||
|
and format_counts[LogFormat.JSON] > format_counts[LogFormat.APACHE]
|
||||||
|
):
|
||||||
return LogFormat.JSON
|
return LogFormat.JSON
|
||||||
elif format_counts[LogFormat.SYSLOG] > format_counts[LogFormat.JSON] and \
|
elif (
|
||||||
format_counts[LogFormat.SYSLOG] > format_counts[LogFormat.APACHE]:
|
format_counts[LogFormat.SYSLOG] > format_counts[LogFormat.JSON]
|
||||||
|
and format_counts[LogFormat.SYSLOG] > format_counts[LogFormat.APACHE]
|
||||||
|
):
|
||||||
return LogFormat.SYSLOG
|
return LogFormat.SYSLOG
|
||||||
elif format_counts[LogFormat.APACHE] > format_counts[LogFormat.JSON] and \
|
elif (
|
||||||
format_counts[LogFormat.APACHE] > format_counts[LogFormat.SYSLOG]:
|
format_counts[LogFormat.APACHE] > format_counts[LogFormat.JSON]
|
||||||
|
and format_counts[LogFormat.APACHE] > format_counts[LogFormat.SYSLOG]
|
||||||
|
):
|
||||||
return LogFormat.APACHE
|
return LogFormat.APACHE
|
||||||
|
|
||||||
if format_counts[LogFormat.JSON] > 0:
|
if format_counts[LogFormat.JSON] > 0:
|
||||||
@@ -80,21 +86,22 @@ class ParserFactory:
|
|||||||
|
|
||||||
return LogFormat.UNKNOWN
|
return LogFormat.UNKNOWN
|
||||||
|
|
||||||
def parse_lines(self, lines: List[str], format: Optional[LogFormat] = None) -> List[ParsedLogEntry]:
|
def parse_lines(
|
||||||
"""Parse lines with automatic format detection."""
|
self, lines: list[str], format: Optional[LogFormat] = None
|
||||||
|
) -> list[ParsedLogEntry]:
|
||||||
|
'''Parse lines with automatic format detection.'''
|
||||||
if format is None:
|
if format is None:
|
||||||
format = self.detect_format_batch(lines)
|
format = self.detect_format_batch(lines)
|
||||||
|
|
||||||
if format == LogFormat.UNKNOWN:
|
if format == LogFormat.UNKNOWN:
|
||||||
return [ParsedLogEntry(
|
return [
|
||||||
raw_line=line,
|
ParsedLogEntry(raw_line=line, message="Unknown format", line_number=i + 1)
|
||||||
message="Unknown format",
|
for i, line in enumerate(lines)
|
||||||
line_number=i + 1
|
]
|
||||||
) for i, line in enumerate(lines)]
|
|
||||||
|
|
||||||
parser = self.get_parser(format)
|
parser = self.get_parser(format)
|
||||||
return parser.parse_batch(lines)
|
return parser.parse_batch(lines)
|
||||||
|
|
||||||
def get_available_formats(self) -> List[LogFormat]:
|
def get_available_formats(self) -> list[LogFormat]:
|
||||||
"""Get list of available log formats."""
|
'''Get list of available log formats.'''
|
||||||
return list(self.parsers.keys())
|
return list(self.parsers.keys())
|
||||||
|
|||||||
Reference in New Issue
Block a user