From 2f2103fd1cfd4634c15a8cb20285cc40e23f45a9 Mon Sep 17 00:00:00 2001
From: 7000pctAUTO
Date: Mon, 2 Feb 2026 08:03:21 +0000
Subject: [PATCH] Add parsers: JSON, Syslog, Apache, and factory

---
 loglens/parsers/json_parser.py | 177 +++++++++++++++++++++++++++++++++
 1 file changed, 177 insertions(+)
 create mode 100644 loglens/parsers/json_parser.py

diff --git a/loglens/parsers/json_parser.py b/loglens/parsers/json_parser.py
new file mode 100644
index 0000000..4f8fb24
--- /dev/null
+++ b/loglens/parsers/json_parser.py
@@ -0,0 +1,177 @@
+"""JSON log parser."""
+
+import re
+from datetime import datetime
+from typing import Any, Dict, List, Optional
+import orjson
+
+from loglens.parsers.base import LogParser, ParsedLogEntry
+
+
+class JSONParser(LogParser):
+    """Parser for JSON-formatted logs."""
+
+    format_name = "json"
+
+    def __init__(self):
+        self.timestamp_fields = [
+            "@timestamp", "timestamp", "time", "date", "datetime",
+            "created_at", "updated_at", "log_time", "event_time"
+        ]
+        self.level_fields = ["level", "severity", "log_level", "priority", "levelname"]
+        self.message_fields = ["message", "msg", "log", "text", "content"]
+        self.logger_fields = ["logger", "logger_name", "name", "source"]
+
+    def can_parse(self, line: str) -> bool:
+        """Check if line is valid JSON."""
+        line = line.strip()
+        if not line:
+            return False
+        if line.startswith("[") or line.startswith("{"):
+            try:
+                orjson.loads(line)
+                return True
+            except orjson.JSONDecodeError:
+                pass
+        return False
+
+    def parse(self, line: str, line_number: int = 0) -> Optional[ParsedLogEntry]:
+        """Parse a JSON log line."""
+        line = line.strip()
+        if not line:
+            return None
+
+        try:
+            data = orjson.loads(line)
+        except orjson.JSONDecodeError as e:
+            return ParsedLogEntry(
+                raw_line=line,
+                message=f"JSON parse error: {str(e)}",
+                line_number=line_number,
+                severity="error"
+            )
+
+        entry = ParsedLogEntry(
+            raw_line=line,
+            line_number=line_number
+        )
+
+        if isinstance(data, dict):
+            entry.timestamp = self._extract_timestamp(data)
+            entry.level = self._extract_field(data, self.level_fields)
+            entry.message = self._extract_field(data, self.message_fields)
+            entry.logger = self._extract_field(data, self.logger_fields)
+            entry.extra = {k: v for k, v in data.items()
+                           if k not in self.timestamp_fields
+                           and k not in self.level_fields
+                           and k not in self.message_fields
+                           and k not in self.logger_fields
+                           and not k.startswith("_")}
+        elif isinstance(data, list):
+            entry.message = str(data)
+            entry.extra = {"array_length": len(data)}
+
+        return entry
+
+    def _extract_timestamp(self, data: Dict[str, Any]) -> Optional[datetime]:
+        """Extract timestamp from data dict."""
+        for field in self.timestamp_fields:
+            if field in data:
+                value = data[field]
+                if isinstance(value, (int, float)):
+                    return datetime.fromtimestamp(value)
+                elif isinstance(value, str):
+                    try:
+                        return datetime.fromisoformat(value.replace("Z", "+00:00"))
+                    except ValueError:
+                        pass
+        return None
+
+    def _extract_field(self, data: Dict[str, Any], fields: List[str]) -> Optional[str]:
+        """Extract first matching field from data."""
+        for field in fields:
+            if field in data and data[field] is not None:
+                value = data[field]
+                if isinstance(value, str):
+                    return value
+                return str(value)
+        return None
+
+    def parse_batch(self, lines: List[str]) -> List[ParsedLogEntry]:
+        """Parse multiple lines, handling multi-line JSON."""
+        results = []
+        buffer = ""
+        line_number = 0
+
+        for line in lines:
+            line_number += 1
+            line_stripped = line.strip()
+
+            if not line_stripped:
+                continue
+
+            if buffer:
+                buffer += line_stripped
+            else:
+                buffer = line_stripped
+
+            try:
+                data = orjson.loads(buffer)
+                entry = self._create_entry_from_data(data, line, line_number)
+                results.append(entry)
+                buffer = ""
+            except orjson.JSONDecodeError:
+                if line_stripped.startswith("{") or line_stripped.startswith("["):
+                    if line_stripped.endswith("}") or line_stripped.endswith("]"):
+                        results.append(ParsedLogEntry(
+                            raw_line=line,
+                            message="Invalid JSON",
+                            line_number=line_number,
+                            severity="error"
+                        ))
+                        buffer = ""
+                elif buffer.endswith("}") or buffer.endswith("]"):
+                    try:
+                        data = orjson.loads(buffer)
+                        entry = self._create_entry_from_data(data, buffer, line_number)
+                        results.append(entry)
+                    except orjson.JSONDecodeError:
+                        results.append(ParsedLogEntry(
+                            raw_line=buffer,
+                            message="Invalid JSON",
+                            line_number=line_number,
+                            severity="error"
+                        ))
+                    buffer = ""
+                elif len(buffer) > 10000:
+                    results.append(ParsedLogEntry(
+                        raw_line=buffer[:100] + "...",
+                        message="JSON too large to parse",
+                        line_number=line_number,
+                        severity="error"
+                    ))
+                    buffer = ""
+
+        return results
+
+    def _create_entry_from_data(self, data: Any, raw_line: str, line_number: int) -> ParsedLogEntry:
+        """Create ParsedLogEntry from parsed JSON data."""
+        entry = ParsedLogEntry(
+            raw_line=raw_line,
+            line_number=line_number
+        )
+
+        if isinstance(data, dict):
+            entry.timestamp = self._extract_timestamp(data)
+            entry.level = self._extract_field(data, self.level_fields)
+            entry.message = self._extract_field(data, self.message_fields)
+            entry.logger = self._extract_field(data, self.logger_fields)
+            entry.extra = {k: v for k, v in data.items()
+                           if k not in self.timestamp_fields
+                           and k not in self.level_fields
+                           and k not in self.message_fields
+                           and k not in self.logger_fields}
+        else:
+            entry.message = str(data)
+
+        return entry
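
Example usage (a minimal sketch, not part of the patch): the import path and the JSONParser methods come straight from the diff above, but the readable attributes on the returned ParsedLogEntry (timestamp, level, message, extra) are assumed from the assignments in parse(), since loglens/parsers/base.py is not included here, and the sample log lines are invented for illustration.

    from loglens.parsers.json_parser import JSONParser

    parser = JSONParser()
    line = '{"@timestamp": "2026-02-02T08:03:21Z", "level": "info", "message": "started", "service": "api"}'

    if parser.can_parse(line):
        entry = parser.parse(line, line_number=1)
        # Assumed attribute access; these fields are set by parse() above.
        print(entry.timestamp, entry.level, entry.message)
        print(entry.extra)  # {"service": "api"}, everything not mapped to a known field

    # parse_batch() accumulates stripped lines in a buffer until the
    # concatenated text parses as JSON, so a pretty-printed object spread
    # over several lines still yields a single entry.
    entries = parser.parse_batch([
        '{',
        '  "time": 1767340000,',
        '  "msg": "multi-line record"',
        '}',
    ])

Note that can_parse() fully decodes the line, so callers that will call parse() regardless can skip the extra check on hot paths.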