diff --git a/loglens/parsers/base.py b/loglens/parsers/base.py new file mode 100644 index 0000000..0dabeb9 --- /dev/null +++ b/loglens/parsers/base.py @@ -0,0 +1,91 @@ +"""Base parser class and data structures.""" + +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from datetime import datetime +from typing import Any, Dict, List, Optional +from enum import Enum + + +class LogFormat(Enum): + """Supported log formats.""" + JSON = "json" + SYSLOG = "syslog" + APACHE = "apache" + UNKNOWN = "unknown" + + +@dataclass +class ParsedLogEntry: + """Represents a parsed log entry.""" + raw_line: str + timestamp: Optional[datetime] = None + level: Optional[str] = None + message: str = "" + source: Optional[str] = None + host: Optional[str] = None + facility: Optional[str] = None + severity: Optional[str] = None + logger: Optional[str] = None + extra: Dict[str, Any] = field(default_factory=dict) + line_number: int = 0 + error_pattern: Optional[str] = None + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary.""" + result = { + "raw_line": self.raw_line, + "message": self.message, + "line_number": self.line_number, + } + if self.timestamp: + result["timestamp"] = self.timestamp.isoformat() + if self.level: + result["level"] = self.level + if self.source: + result["source"] = self.source + if self.host: + result["host"] = self.host + if self.facility: + result["facility"] = self.facility + if self.severity: + result["severity"] = self.severity + if self.logger: + result["logger"] = self.logger + if self.extra: + result["extra"] = self.extra + if self.error_pattern: + result["error_pattern"] = self.error_pattern + return result + + +class LogParser(ABC): + """Abstract base class for log parsers.""" + + format_name: str = "base" + + @abstractmethod + def parse(self, line: str, line_number: int = 0) -> Optional[ParsedLogEntry]: + """Parse a single log line.""" + pass + + @abstractmethod + def can_parse(self, line: str) -> bool: + """Check if this parser can handle the given line.""" + pass + + def parse_batch(self, lines: List[str]) -> List[ParsedLogEntry]: + """Parse multiple lines.""" + results = [] + for i, line in enumerate(lines, 1): + try: + entry = self.parse(line, i) + if entry: + results.append(entry) + except Exception: + results.append(ParsedLogEntry( + raw_line=line, + message="Parse error", + line_number=i + )) + return results