Files
loglens-cli/loglens/parsers/base.py
7000pctAUTO cdde9f629d
Some checks failed
CI / test (3.10) (push) Has been cancelled
CI / test (3.11) (push) Has been cancelled
CI / test (3.12) (push) Has been cancelled
CI / test (3.9) (push) Has been cancelled
CI / lint (push) Has been cancelled
CI / build (push) Has been cancelled
fix: resolve CI/CD linting and formatting issues
- Replaced deprecated typing.Dict/List/Tuple with native types (UP035)
- Removed unused imports across all modules
- Fixed unused variables by replacing with _ prefix
- Added missing Optional type imports
- Reorganized imports for proper sorting (I001)
- Applied black formatting to all source files
2026-02-02 08:52:02 +00:00

90 lines
2.5 KiB
Python

'''Base parser class and data structures.'''
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Optional
class LogFormat(Enum):
'''Supported log formats.'''
JSON = "json"
SYSLOG = "syslog"
APACHE = "apache"
UNKNOWN = "unknown"
@dataclass
class ParsedLogEntry:
'''Represents a parsed log entry.'''
raw_line: str
timestamp: Optional[datetime] = None
level: Optional[str] = None
message: str = ""
source: Optional[str] = None
host: Optional[str] = None
facility: Optional[str] = None
severity: Optional[str] = None
logger: Optional[str] = None
extra: dict[str, Any] = field(default_factory=dict)
line_number: int = 0
error_pattern: Optional[str] = None
def to_dict(self) -> dict[str, Any]:
'''Convert to dictionary.'''
result = {
"raw_line": self.raw_line,
"message": self.message,
"line_number": self.line_number,
}
if self.timestamp:
result["timestamp"] = self.timestamp.isoformat()
if self.level:
result["level"] = self.level
if self.source:
result["source"] = self.source
if self.host:
result["host"] = self.host
if self.facility:
result["facility"] = self.facility
if self.severity:
result["severity"] = self.severity
if self.logger:
result["logger"] = self.logger
if self.extra:
result["extra"] = self.extra
if self.error_pattern:
result["error_pattern"] = self.error_pattern
return result
class LogParser(ABC):
'''Abstract base class for log parsers.'''
format_name: str = "base"
@abstractmethod
def parse(self, line: str, line_number: int = 0) -> Optional[ParsedLogEntry]:
'''Parse a single log line.'''
pass
@abstractmethod
def can_parse(self, line: str) -> bool:
'''Check if this parser can handle the given line.'''
pass
def parse_batch(self, lines: list[str]) -> list[ParsedLogEntry]:
'''Parse multiple lines.'''
results = []
for i, line in enumerate(lines, 1):
try:
entry = self.parse(line, i)
if entry:
results.append(entry)
except Exception:
results.append(ParsedLogEntry(raw_line=line, message="Parse error", line_number=i))
return results