From f2ca3181eeed2d9fbb851280ef6e8da3f756ebe7 Mon Sep 17 00:00:00 2001
From: 7000pctAUTO
Date: Mon, 2 Feb 2026 10:06:58 +0000
Subject: [PATCH] Add JSON, syslog, and Apache parsers

---
 loglens/parsers/json_parser.py | 220 +++++++++++++++++++++++++--------
 1 file changed, 170 insertions(+), 50 deletions(-)

diff --git a/loglens/parsers/json_parser.py b/loglens/parsers/json_parser.py
index 2f9d0a9..b95a419 100644
--- a/loglens/parsers/json_parser.py
+++ b/loglens/parsers/json_parser.py
@@ -1,70 +1,190 @@
-import json
-import re
+"""JSON log parser."""
+
 from datetime import datetime
 from typing import Any, Optional
 
-from loglens.parsers.base import BaseParser, LogFormat, ParsedEntry
+import orjson
+
+from loglens.parsers.base import LogParser, ParsedLogEntry
 
 
-class JSONParser(BaseParser):
-    """Parser for JSON log formats."""
+class JSONParser(LogParser):
+    """Parser for JSON-formatted logs."""
 
-    def get_format(self) -> LogFormat:
-        return LogFormat.JSON
+    format_name = "json"
 
-    def parse(self, line: str) -> Optional[ParsedEntry]:
+    def __init__(self):
+        self.timestamp_fields = [
+            "@timestamp",
+            "timestamp",
+            "time",
+            "date",
+            "datetime",
+            "created_at",
+            "updated_at",
+            "log_time",
+            "event_time",
+        ]
+        self.level_fields = ["level", "severity", "log_level", "priority", "levelname"]
+        self.message_fields = ["message", "msg", "log", "text", "content"]
+        self.logger_fields = ["logger", "logger_name", "name", "source"]
+
+    def can_parse(self, line: str) -> bool:
+        """Check if line is valid JSON."""
+        line = line.strip()
+        if not line:
+            return False
+        if line.startswith("[") or line.startswith("{"):
+            try:
+                orjson.loads(line)
+                return True
+            except orjson.JSONDecodeError:
+                pass
+        return False
+
+    def parse(self, line: str, line_number: int = 0) -> Optional[ParsedLogEntry]:
         """Parse a JSON log line."""
+        line = line.strip()
+        if not line:
+            return None
+
         try:
-            data = json.loads(line.strip())
-        except json.JSONDecodeError:
-            return None
+            data = orjson.loads(line)
+        except orjson.JSONDecodeError as e:
+            return ParsedLogEntry(
+                raw_line=line,
+                message=f"JSON parse error: {str(e)}",
+                line_number=line_number,
+                severity="error",
+            )
 
-        if isinstance(data, list):
-            return None
+        entry = ParsedLogEntry(raw_line=line, line_number=line_number)
 
-        if not isinstance(data, dict):
-            return None
+        if isinstance(data, dict):
+            entry.timestamp = self._extract_timestamp(data)
+            entry.level = self._extract_field(data, self.level_fields)
+            entry.message = self._extract_field(data, self.message_fields)
+            entry.logger = self._extract_field(data, self.logger_fields)
+            entry.extra = {
+                k: v
+                for k, v in data.items()
+                if k not in self.timestamp_fields
+                and k not in self.level_fields
+                and k not in self.message_fields
+                and k not in self.logger_fields
+                and not k.startswith("_")
+            }
+        elif isinstance(data, list):
+            entry.message = str(data)
+            entry.extra = {"array_length": len(data)}
 
-        timestamp = self._extract_timestamp(data)
-        level = self._extract_level(data)
-        message = self._extract_message(data)
+        return entry
 
-        return ParsedEntry(
-            raw_line=line.strip(),
-            format=self.get_format(),
-            timestamp=timestamp,
-            level=level,
-            message=message,
-            metadata=data,
-        )
-
-    def _extract_timestamp(self, data: dict[str, Any]) -> Optional[str]:
-        timestamp_fields = ["timestamp", "time", "@timestamp", "date", "created_at"]
-        for field in timestamp_fields:
+    def _extract_timestamp(self, data: dict[str, Any]) -> Optional[datetime]:
+        """Extract timestamp from data dict."""
+        for field in self.timestamp_fields:
             if field in data:
+                value = data[field]
+                if isinstance(value, (int, float)):
+                    return datetime.fromtimestamp(value)
+                elif isinstance(value, str):
+                    try:
+                        return datetime.fromisoformat(value.replace("Z", "+00:00"))
+                    except ValueError:
+                        pass
+        return None
+
+    def _extract_field(self, data: dict[str, Any], fields: list[str]) -> Optional[str]:
+        """Extract first matching field from data."""
+        for field in fields:
+            if field in data and data[field] is not None:
                 value = data[field]
                 if isinstance(value, str):
                     return value
-                elif isinstance(value, (int, float)):
-                    return datetime.fromtimestamp(value).isoformat()
+                return str(value)
         return None
 
-    def _extract_level(self, data: dict[str, Any]) -> Optional[str]:
-        level_fields = ["level", "severity", "log_level", "levelname", "status"]
-        for field in level_fields:
-            if field in data:
-                value = data[field]
-                if isinstance(value, str):
-                    return value.lower()
-        return None
+    def parse_batch(self, lines: list[str]) -> list[ParsedLogEntry]:
+        """Parse multiple lines, handling multi-line JSON."""
+        results = []
+        buffer = ""
+        line_number = 0
 
-    def _extract_message(self, data: dict[str, Any]) -> str:
-        message_fields = ["message", "msg", "text", "content", "error", "reason"]
-        for field in message_fields:
-            if field in data:
-                value = data[field]
-                if isinstance(value, str):
-                    return value
-                elif isinstance(value, dict):
-                    return json.dumps(value)
-        return str(data)
+        for line in lines:
+            line_number += 1
+            line_stripped = line.strip()
+
+            if not line_stripped:
+                continue
+
+            if buffer:
+                buffer += line_stripped
+            else:
+                buffer = line_stripped
+
+            try:
+                data = orjson.loads(buffer)
+                entry = self._create_entry_from_data(data, line, line_number)
+                results.append(entry)
+                buffer = ""
+            except orjson.JSONDecodeError:
+                if line_stripped.startswith("{") or line_stripped.startswith("["):
+                    if line_stripped.endswith("}") or line_stripped.endswith("]"):
+                        results.append(
+                            ParsedLogEntry(
+                                raw_line=line,
+                                message="Invalid JSON",
+                                line_number=line_number,
+                                severity="error",
+                            )
+                        )
+                        buffer = ""
+                elif buffer.endswith("}") or buffer.endswith("]"):
+                    try:
+                        data = orjson.loads(buffer)
+                        entry = self._create_entry_from_data(data, buffer, line_number)
+                        results.append(entry)
+                    except orjson.JSONDecodeError:
+                        results.append(
+                            ParsedLogEntry(
+                                raw_line=buffer,
+                                message="Invalid JSON",
+                                line_number=line_number,
+                                severity="error",
+                            )
+                        )
+                    buffer = ""
+                elif len(buffer) > 10000:
+                    results.append(
+                        ParsedLogEntry(
+                            raw_line=buffer[:100] + "...",
+                            message="JSON too large to parse",
+                            line_number=line_number,
+                            severity="error",
+                        )
+                    )
+                    buffer = ""
+
+        return results
+
+    def _create_entry_from_data(self, data: Any, raw_line: str, line_number: int) -> ParsedLogEntry:
+        """Create ParsedLogEntry from parsed JSON data."""
+        entry = ParsedLogEntry(raw_line=raw_line, line_number=line_number)
+
+        if isinstance(data, dict):
+            entry.timestamp = self._extract_timestamp(data)
+            entry.level = self._extract_field(data, self.level_fields)
+            entry.message = self._extract_field(data, self.message_fields)
+            entry.logger = self._extract_field(data, self.logger_fields)
+            entry.extra = {
+                k: v
+                for k, v in data.items()
+                if k not in self.timestamp_fields
+                and k not in self.level_fields
+                and k not in self.message_fields
+                and k not in self.logger_fields
+            }
+        else:
+            entry.message = str(data)
+
+        return entry
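
Review note (not part of the patch): a minimal usage sketch of the new parser, assuming orjson is installed and that ParsedLogEntry allows assignment of the timestamp, level, message, and extra attributes used above; the sample log line and variable names are illustrative only.

    # Minimal usage sketch; exercises can_parse() and parse() from the diff above.
    from loglens.parsers.json_parser import JSONParser

    parser = JSONParser()
    sample = '{"@timestamp": "2026-02-02T10:06:58Z", "level": "INFO", "message": "server started", "request_id": "abc123"}'

    if parser.can_parse(sample):
        entry = parser.parse(sample, line_number=1)
        print(entry.timestamp)  # datetime parsed from "@timestamp" ("Z" mapped to "+00:00")
        print(entry.level)      # "INFO" (note: the new code no longer lower-cases levels)
        print(entry.message)    # "server started"
        print(entry.extra)      # {"request_id": "abc123"}: keys not claimed by known fields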