Add loglens package files (parsers, cli, config)
Some checks failed
CI / test (3.11) (push) Has been cancelled
CI / test (3.12) (push) Has been cancelled
CI / test (3.9) (push) Has been cancelled
CI / lint (push) Has been cancelled
CI / build (push) Has been cancelled
CI / test (3.10) (push) Has been cancelled

This commit is contained in:
2026-02-02 10:05:27 +00:00
parent 05d7c2ec3f
commit 4f6f5e0370

View File

@@ -1,3 +1,5 @@
"""Base parser class and data structures."""
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from dataclasses import dataclass, field from dataclasses import dataclass, field
from datetime import datetime from datetime import datetime
@@ -11,43 +13,77 @@ class LogFormat(Enum):
JSON = "json" JSON = "json"
SYSLOG = "syslog" SYSLOG = "syslog"
APACHE = "apache" APACHE = "apache"
RAW = "raw" UNKNOWN = "unknown"
@dataclass @dataclass
class ParsedEntry: class ParsedLogEntry:
"""Parsed log entry.""" """Represents a parsed log entry."""
raw_line: str raw_line: str
format: LogFormat timestamp: Optional[datetime] = None
timestamp: Optional[str] level: Optional[str] = None
level: Optional[str] message: str = ""
message: str source: Optional[str] = None
metadata: dict[str, Any] = field(default_factory=dict) host: Optional[str] = None
severity: str = "info" facility: Optional[str] = None
severity: Optional[str] = None
logger: Optional[str] = None
extra: dict[str, Any] = field(default_factory=dict)
line_number: int = 0
error_pattern: Optional[str] = None
def to_dict(self) -> dict: def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary.""" """Convert to dictionary."""
return { result = {
"raw_line": self.raw_line, "raw_line": self.raw_line,
"format": self.format.value,
"timestamp": self.timestamp,
"level": self.level,
"message": self.message, "message": self.message,
"metadata": self.metadata, "line_number": self.line_number,
"severity": self.severity,
} }
if self.timestamp:
result["timestamp"] = self.timestamp.isoformat()
if self.level:
result["level"] = self.level
if self.source:
result["source"] = self.source
if self.host:
result["host"] = self.host
if self.facility:
result["facility"] = self.facility
if self.severity:
result["severity"] = self.severity
if self.logger:
result["logger"] = self.logger
if self.extra:
result["extra"] = self.extra
if self.error_pattern:
result["error_pattern"] = self.error_pattern
return result
class BaseParser(ABC): class LogParser(ABC):
"""Base parser class.""" """Abstract base class for log parsers."""
format_name: str = "base"
@abstractmethod @abstractmethod
def get_format(self) -> LogFormat: def parse(self, line: str, line_number: int = 0) -> Optional[ParsedLogEntry]:
"""Get the format this parser handles.""" """Parse a single log line."""
pass pass
@abstractmethod @abstractmethod
def parse(self, line: str) -> Optional[ParsedEntry]: def can_parse(self, line: str) -> bool:
"""Parse a log line.""" """Check if this parser can handle the given line."""
pass pass
def parse_batch(self, lines: list[str]) -> list[ParsedLogEntry]:
"""Parse multiple lines."""
results = []
for i, line in enumerate(lines, 1):
try:
entry = self.parse(line, i)
if entry:
results.append(entry)
except Exception:
results.append(ParsedLogEntry(raw_line=line, message="Parse error", line_number=i))
return results