Files
loglens-cli/loglens/parsers/base.py
7000pctAUTO 8c48e85c56
Some checks failed
CI / test (3.10) (push) Has been cancelled
CI / test (3.11) (push) Has been cancelled
CI / test (3.12) (push) Has been cancelled
CI / test (3.9) (push) Has been cancelled
CI / lint (push) Has been cancelled
CI / build (push) Has been cancelled
Add parsers: JSON, Syslog, Apache, and factory
2026-02-02 08:03:21 +00:00

92 lines
2.6 KiB
Python

"""Base parser class and data structures."""
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List, Optional
from enum import Enum
class LogFormat(Enum):
    """Closed set of log formats known to the parser package.

    Each member's value is the lowercase string identifier of the format, so
    ``LogFormat("json")`` looks a member up by that identifier.
    """

    JSON = "json"
    SYSLOG = "syslog"
    APACHE = "apache"
    UNKNOWN = "unknown"
@dataclass
class ParsedLogEntry:
    """Represents a parsed log entry.

    Only ``raw_line`` is required. Every other field has a default, so an
    entry can be built even when nothing could be extracted from the line.
    """

    raw_line: str
    timestamp: Optional[datetime] = None
    level: Optional[str] = None
    message: str = ""
    source: Optional[str] = None
    host: Optional[str] = None
    facility: Optional[str] = None
    severity: Optional[str] = None
    logger: Optional[str] = None
    # default_factory gives every entry its own dict instead of a shared one.
    extra: Dict[str, Any] = field(default_factory=dict)
    line_number: int = 0
    error_pattern: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Convert the entry to a plain dictionary.

        ``raw_line``, ``message`` and ``line_number`` are always present.
        Every other field is included only when it holds a truthy value, so
        ``None`` and empty strings/dicts are omitted.

        Returns:
            A new dictionary. ``timestamp`` is rendered with
            ``datetime.isoformat()``; ``extra`` is a shallow copy, so adding or
            removing keys in the result does not modify this entry.
        """
        result: Dict[str, Any] = {
            "raw_line": self.raw_line,
            "message": self.message,
            "line_number": self.line_number,
        }
        if self.timestamp:
            result["timestamp"] = self.timestamp.isoformat()
        # Optional string fields, emitted in a fixed order for stable output.
        for name in ("level", "source", "host", "facility", "severity", "logger"):
            value = getattr(self, name)
            if value:
                result[name] = value
        if self.extra:
            # Copy so callers mutating the result cannot alter the entry itself.
            result["extra"] = dict(self.extra)
        if self.error_pattern:
            result["error_pattern"] = self.error_pattern
        return result
class LogParser(ABC):
    """Abstract base class for log parsers.

    Subclasses must implement :meth:`parse` and :meth:`can_parse`.
    :meth:`parse_batch` is built on top of :meth:`parse` and needs no override.
    """

    format_name: str = "base"

    @abstractmethod
    def parse(self, line: str, line_number: int = 0) -> Optional[ParsedLogEntry]:
        """Parse a single log line.

        Args:
            line: The log line to parse.
            line_number: Line number to associate with the entry;
                ``parse_batch`` passes a 1-based position.

        Returns:
            The parsed entry, or ``None``; ``parse_batch`` skips ``None``
            results.
        """

    @abstractmethod
    def can_parse(self, line: str) -> bool:
        """Check if this parser can handle the given line."""

    def parse_batch(self, lines: List[str]) -> List[ParsedLogEntry]:
        """Parse multiple lines, never failing on an individual line.

        Lines are numbered from 1 in iteration order. A line for which
        ``parse`` returns a falsy result is skipped. A line for which
        ``parse`` raises is kept as a placeholder entry with the message
        ``"Parse error"``.

        Args:
            lines: The log lines to parse.

        Returns:
            Parsed entries and error placeholders, in input order.
        """
        results: List[ParsedLogEntry] = []
        for number, line in enumerate(lines, 1):
            try:
                entry = self.parse(line, number)
            except Exception:
                # Deliberate best effort: one malformed line must not abort the
                # batch, so keep a placeholder that preserves the raw text.
                results.append(
                    ParsedLogEntry(
                        raw_line=line,
                        message="Parse error",
                        line_number=number,
                    )
                )
            else:
                if entry:
                    results.append(entry)
        return results