diff --git a/loglens/parsers/json_parser.py b/loglens/parsers/json_parser.py
index 4664311..2f9d0a9 100644
--- a/loglens/parsers/json_parser.py
+++ b/loglens/parsers/json_parser.py
@@ -1,190 +1,70 @@
-'''JSON log parser.'''
-
+import json
+import re
 from datetime import datetime
 from typing import Any, Optional
 
-import orjson
-
-from loglens.parsers.base import LogParser, ParsedLogEntry
+from loglens.parsers.base import BaseParser, LogFormat, ParsedEntry
 
 
-class JSONParser(LogParser):
-    '''Parser for JSON-formatted logs.'''
+class JSONParser(BaseParser):
+    """Parser for JSON log formats."""
 
-    format_name = "json"
+    def get_format(self) -> LogFormat:
+        return LogFormat.JSON
 
-    def __init__(self):
-        self.timestamp_fields = [
-            "@timestamp",
-            "timestamp",
-            "time",
-            "date",
-            "datetime",
-            "created_at",
-            "updated_at",
-            "log_time",
-            "event_time",
-        ]
-        self.level_fields = ["level", "severity", "log_level", "priority", "levelname"]
-        self.message_fields = ["message", "msg", "log", "text", "content"]
-        self.logger_fields = ["logger", "logger_name", "name", "source"]
-
-    def can_parse(self, line: str) -> bool:
-        '''Check if line is valid JSON.'''
-        line = line.strip()
-        if not line:
-            return False
-        if line.startswith("[") or line.startswith("{"):
-            try:
-                orjson.loads(line)
-                return True
-            except orjson.JSONDecodeError:
-                pass
-        return False
-
-    def parse(self, line: str, line_number: int = 0) -> Optional[ParsedLogEntry]:
-        '''Parse a JSON log line.'''
-        line = line.strip()
-        if not line:
+    def parse(self, line: str) -> Optional[ParsedEntry]:
+        """Parse a JSON log line."""
+        try:
+            data = json.loads(line.strip())
+        except json.JSONDecodeError:
             return None
 
-        try:
-            data = orjson.loads(line)
-        except orjson.JSONDecodeError as e:
-            return ParsedLogEntry(
-                raw_line=line,
-                message=f"JSON parse error: {str(e)}",
-                line_number=line_number,
-                severity="error",
-            )
+        if isinstance(data, list):
+            return None
 
-        entry = ParsedLogEntry(raw_line=line, line_number=line_number)
+        if not isinstance(data, dict):
+            return None
 
-        if isinstance(data, dict):
-            entry.timestamp = self._extract_timestamp(data)
-            entry.level = self._extract_field(data, self.level_fields)
-            entry.message = self._extract_field(data, self.message_fields)
-            entry.logger = self._extract_field(data, self.logger_fields)
-            entry.extra = {
-                k: v
-                for k, v in data.items()
-                if k not in self.timestamp_fields
-                and k not in self.level_fields
-                and k not in self.message_fields
-                and k not in self.logger_fields
-                and not k.startswith("_")
-            }
-        elif isinstance(data, list):
-            entry.message = str(data)
-            entry.extra = {"array_length": len(data)}
+        timestamp = self._extract_timestamp(data)
+        level = self._extract_level(data)
+        message = self._extract_message(data)
 
-        return entry
+        return ParsedEntry(
+            raw_line=line.strip(),
+            format=self.get_format(),
+            timestamp=timestamp,
+            level=level,
+            message=message,
+            metadata=data,
+        )
 
-    def _extract_timestamp(self, data: dict[str, Any]) -> Optional[datetime]:
-        '''Extract timestamp from data dict.'''
-        for field in self.timestamp_fields:
+    def _extract_timestamp(self, data: dict[str, Any]) -> Optional[str]:
+        timestamp_fields = ["timestamp", "time", "@timestamp", "date", "created_at"]
+        for field in timestamp_fields:
             if field in data:
-                value = data[field]
-                if isinstance(value, (int, float)):
-                    return datetime.fromtimestamp(value)
-                elif isinstance(value, str):
-                    try:
-                        return datetime.fromisoformat(value.replace("Z", "+00:00"))
-                    except ValueError:
-                        pass
-        return None
-
-    def _extract_field(self, data: dict[str, Any], fields: list[str]) -> Optional[str]:
-        '''Extract first matching field from data.'''
-        for field in fields:
-            if field in data and data[field] is not None:
                 value = data[field]
                 if isinstance(value, str):
                     return value
-                return str(value)
+                elif isinstance(value, (int, float)):
+                    return datetime.fromtimestamp(value).isoformat()
         return None
 
-    def parse_batch(self, lines: list[str]) -> list[ParsedLogEntry]:
-        '''Parse multiple lines, handling multi-line JSON.'''
-        results = []
-        buffer = ""
-        line_number = 0
+    def _extract_level(self, data: dict[str, Any]) -> Optional[str]:
+        level_fields = ["level", "severity", "log_level", "levelname", "status"]
+        for field in level_fields:
+            if field in data:
+                value = data[field]
+                if isinstance(value, str):
+                    return value.lower()
+        return None
 
-        for line in lines:
-            line_number += 1
-            line_stripped = line.strip()
-
-            if not line_stripped:
-                continue
-
-            if buffer:
-                buffer += line_stripped
-            else:
-                buffer = line_stripped
-
-            try:
-                data = orjson.loads(buffer)
-                entry = self._create_entry_from_data(data, line, line_number)
-                results.append(entry)
-                buffer = ""
-            except orjson.JSONDecodeError:
-                if line_stripped.startswith("{") or line_stripped.startswith("["):
-                    if line_stripped.endswith("}") or line_stripped.endswith("]"):
-                        results.append(
-                            ParsedLogEntry(
-                                raw_line=line,
-                                message="Invalid JSON",
-                                line_number=line_number,
-                                severity="error",
-                            )
-                        )
-                        buffer = ""
-                elif buffer.endswith("}") or buffer.endswith("]"):
-                    try:
-                        data = orjson.loads(buffer)
-                        entry = self._create_entry_from_data(data, buffer, line_number)
-                        results.append(entry)
-                    except orjson.JSONDecodeError:
-                        results.append(
-                            ParsedLogEntry(
-                                raw_line=buffer,
-                                message="Invalid JSON",
-                                line_number=line_number,
-                                severity="error",
-                            )
-                        )
-                    buffer = ""
-                elif len(buffer) > 10000:
-                    results.append(
-                        ParsedLogEntry(
-                            raw_line=buffer[:100] + "...",
-                            message="JSON too large to parse",
-                            line_number=line_number,
-                            severity="error",
-                        )
-                    )
-                    buffer = ""
-
-        return results
-
-    def _create_entry_from_data(self, data: Any, raw_line: str, line_number: int) -> ParsedLogEntry:
-        '''Create ParsedLogEntry from parsed JSON data.'''
-        entry = ParsedLogEntry(raw_line=raw_line, line_number=line_number)
-
-        if isinstance(data, dict):
-            entry.timestamp = self._extract_timestamp(data)
-            entry.level = self._extract_field(data, self.level_fields)
-            entry.message = self._extract_field(data, self.message_fields)
-            entry.logger = self._extract_field(data, self.logger_fields)
-            entry.extra = {
-                k: v
-                for k, v in data.items()
-                if k not in self.timestamp_fields
-                and k not in self.level_fields
-                and k not in self.message_fields
-                and k not in self.logger_fields
-            }
-        else:
-            entry.message = str(data)
-
-        return entry
+    def _extract_message(self, data: dict[str, Any]) -> str:
+        message_fields = ["message", "msg", "text", "content", "error", "reason"]
+        for field in message_fields:
+            if field in data:
+                value = data[field]
+                if isinstance(value, str):
+                    return value
+                elif isinstance(value, dict):
+                    return json.dumps(value)
+        return str(data)
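
A minimal usage sketch of the rewritten parser, for illustration only (it is not part of the change). It assumes BaseParser needs no constructor arguments and that ParsedEntry exposes the timestamp, level, message, and metadata values passed above as attributes.

    from loglens.parsers.json_parser import JSONParser

    parser = JSONParser()  # assumption: BaseParser provides a no-argument constructor

    # A structured line with an epoch-seconds timestamp, an upper-case level,
    # and the message stored under the "msg" key.
    line = '{"time": 1700000000, "level": "ERROR", "msg": "disk full", "host": "web-1"}'

    entry = parser.parse(line)
    if entry is not None:
        print(entry.timestamp)  # ISO-8601 string in local time, via fromtimestamp().isoformat()
        print(entry.level)      # "error" -- lower-cased by _extract_level()
        print(entry.message)    # "disk full" -- found through the "msg" fallback field
        print(entry.metadata)   # the full decoded dict, including "host"

    # Non-JSON input now yields None rather than an error entry; this is a
    # behaviour change from the old parser, which returned a ParsedLogEntry
    # with severity="error" when JSON decoding failed.
    assert parser.parse("plain text, not JSON") is None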