Add parsers: JSON, Syslog, Apache, and factory
Some checks failed
CI / test (3.10) (push) Has been cancelled
CI / test (3.11) (push) Has been cancelled
CI / test (3.12) (push) Has been cancelled
CI / test (3.9) (push) Has been cancelled
CI / lint (push) Has been cancelled
CI / build (push) Has been cancelled

This commit is contained in:
2026-02-02 08:03:21 +00:00
parent c1bb5da1c8
commit 8c48e85c56

91
loglens/parsers/base.py Normal file
View File

@@ -0,0 +1,91 @@
"""Base parser class and data structures."""
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List, Optional
from enum import Enum
class LogFormat(Enum):
"""Supported log formats."""
JSON = "json"
SYSLOG = "syslog"
APACHE = "apache"
UNKNOWN = "unknown"
@dataclass
class ParsedLogEntry:
"""Represents a parsed log entry."""
raw_line: str
timestamp: Optional[datetime] = None
level: Optional[str] = None
message: str = ""
source: Optional[str] = None
host: Optional[str] = None
facility: Optional[str] = None
severity: Optional[str] = None
logger: Optional[str] = None
extra: Dict[str, Any] = field(default_factory=dict)
line_number: int = 0
error_pattern: Optional[str] = None
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary."""
result = {
"raw_line": self.raw_line,
"message": self.message,
"line_number": self.line_number,
}
if self.timestamp:
result["timestamp"] = self.timestamp.isoformat()
if self.level:
result["level"] = self.level
if self.source:
result["source"] = self.source
if self.host:
result["host"] = self.host
if self.facility:
result["facility"] = self.facility
if self.severity:
result["severity"] = self.severity
if self.logger:
result["logger"] = self.logger
if self.extra:
result["extra"] = self.extra
if self.error_pattern:
result["error_pattern"] = self.error_pattern
return result
class LogParser(ABC):
"""Abstract base class for log parsers."""
format_name: str = "base"
@abstractmethod
def parse(self, line: str, line_number: int = 0) -> Optional[ParsedLogEntry]:
"""Parse a single log line."""
pass
@abstractmethod
def can_parse(self, line: str) -> bool:
"""Check if this parser can handle the given line."""
pass
def parse_batch(self, lines: List[str]) -> List[ParsedLogEntry]:
"""Parse multiple lines."""
results = []
for i, line in enumerate(lines, 1):
try:
entry = self.parse(line, i)
if entry:
results.append(entry)
except Exception:
results.append(ParsedLogEntry(
raw_line=line,
message="Parse error",
line_number=i
))
return results