Files
loglens-cli/loglens/parsers/json_parser.py
7000pctAUTO f2ca3181ee
Some checks failed
CI / test (3.10) (push) Has been cancelled
CI / test (3.11) (push) Has been cancelled
CI / test (3.12) (push) Has been cancelled
CI / test (3.9) (push) Has been cancelled
CI / lint (push) Has been cancelled
CI / build (push) Has been cancelled
Add JSON, syslog, and Apache parsers
2026-02-02 10:06:58 +00:00

191 lines
6.7 KiB
Python

"""JSON log parser."""
from datetime import datetime
from typing import Any, Optional
import orjson
from loglens.parsers.base import LogParser, ParsedLogEntry
class JSONParser(LogParser):
    """Parser for JSON-formatted logs.

    Recognizes single-line JSON objects/arrays and, via ``parse_batch``,
    JSON values pretty-printed across several lines.  Well-known
    timestamp/level/message/logger keys are lifted onto the
    ``ParsedLogEntry``; all remaining public keys land in ``entry.extra``.
    """

    format_name = "json"

    # Cap on how many characters we accumulate while waiting for a
    # multi-line JSON value to close before giving up on it.
    _MAX_BUFFER: int = 10000

    def __init__(self):
        # Candidate keys for each logical field, checked in order;
        # the first key present in the record wins.
        self.timestamp_fields = [
            "@timestamp",
            "timestamp",
            "time",
            "date",
            "datetime",
            "created_at",
            "updated_at",
            "log_time",
            "event_time",
        ]
        self.level_fields = ["level", "severity", "log_level", "priority", "levelname"]
        self.message_fields = ["message", "msg", "log", "text", "content"]
        self.logger_fields = ["logger", "logger_name", "name", "source"]

    def can_parse(self, line: str) -> bool:
        """Return True if ``line`` is a parseable JSON object or array."""
        line = line.strip()
        # Cheap shape check before paying for a real parse attempt.
        if not line.startswith(("{", "[")):
            return False
        try:
            orjson.loads(line)
        except orjson.JSONDecodeError:
            return False
        return True

    def parse(self, line: str, line_number: int = 0) -> Optional[ParsedLogEntry]:
        """Parse a single JSON log line into a ParsedLogEntry.

        Returns None for blank lines; returns an error-severity entry
        (never raises) when the line is not valid JSON.
        """
        line = line.strip()
        if not line:
            return None
        try:
            data = orjson.loads(line)
        except orjson.JSONDecodeError as e:
            return ParsedLogEntry(
                raw_line=line,
                message=f"JSON parse error: {str(e)}",
                line_number=line_number,
                severity="error",
            )
        # Delegate to the shared builder so parse() and parse_batch()
        # produce identical entries for identical data.
        return self._create_entry_from_data(data, line, line_number)

    @staticmethod
    def _looks_complete(text: str) -> bool:
        """Heuristic: every ``{``/``[`` opened in ``text`` has been closed.

        String-aware scan: delimiters inside JSON string literals (and
        escaped quotes) are ignored, so ``{"msg": "}"}`` counts correctly.
        Used only to decide whether a parse failure is a real error or
        just an incomplete multi-line value still being accumulated.
        """
        depth = 0
        in_string = False
        escaped = False
        for ch in text:
            if escaped:
                escaped = False
            elif ch == "\\":
                # A backslash only escapes inside a string literal.
                escaped = in_string
            elif ch == '"':
                in_string = not in_string
            elif not in_string:
                if ch in "{[":
                    depth += 1
                elif ch in "}]":
                    depth -= 1
        return depth <= 0 and not in_string

    def parse_batch(self, lines: list[str]) -> list[ParsedLogEntry]:
        """Parse multiple lines, joining pretty-printed multi-line JSON.

        Non-blank lines accumulate into a buffer until the buffer parses
        as JSON.  A buffer whose delimiters are balanced but still fails
        to parse is reported as an error entry immediately; an oversized
        buffer (> _MAX_BUFFER chars) and any leftover buffer at end of
        input are also flushed as error entries rather than dropped.
        """
        results: list[ParsedLogEntry] = []
        buffer = ""
        line_number = 0
        for line_number, line in enumerate(lines, start=1):
            line_stripped = line.strip()
            if not line_stripped:
                continue
            buffer += line_stripped
            try:
                data = orjson.loads(buffer)
            except orjson.JSONDecodeError:
                if buffer.startswith(("{", "[")) and self._looks_complete(buffer):
                    # All delimiters are closed yet the buffer still does
                    # not parse: genuinely invalid JSON, not a fragment.
                    results.append(
                        ParsedLogEntry(
                            raw_line=buffer,
                            message="Invalid JSON",
                            line_number=line_number,
                            severity="error",
                        )
                    )
                    buffer = ""
                elif len(buffer) > self._MAX_BUFFER:
                    results.append(
                        ParsedLogEntry(
                            raw_line=buffer[:100] + "...",
                            message="JSON too large to parse",
                            line_number=line_number,
                            severity="error",
                        )
                    )
                    buffer = ""
                # Otherwise: keep buffering — likely a multi-line value.
            else:
                results.append(self._create_entry_from_data(data, buffer, line_number))
                buffer = ""
        if buffer:
            # Flush a dangling incomplete value instead of losing it.
            results.append(
                ParsedLogEntry(
                    raw_line=buffer,
                    message="Incomplete JSON at end of input",
                    line_number=line_number,
                    severity="error",
                )
            )
        return results

    def _create_entry_from_data(self, data: Any, raw_line: str, line_number: int) -> ParsedLogEntry:
        """Build a ParsedLogEntry from already-decoded JSON ``data``.

        Dict input: well-known keys populate the dedicated fields; all
        remaining keys without a leading underscore go to ``extra``
        (underscore keys are treated as private metadata and hidden).
        List input: stringified message plus ``array_length`` in extra.
        Any other value is stringified into ``message``.
        """
        entry = ParsedLogEntry(raw_line=raw_line, line_number=line_number)
        if isinstance(data, dict):
            entry.timestamp = self._extract_timestamp(data)
            entry.level = self._extract_field(data, self.level_fields)
            entry.message = self._extract_field(data, self.message_fields)
            entry.logger = self._extract_field(data, self.logger_fields)
            known = (
                set(self.timestamp_fields)
                | set(self.level_fields)
                | set(self.message_fields)
                | set(self.logger_fields)
            )
            entry.extra = {
                k: v
                for k, v in data.items()
                if k not in known and not k.startswith("_")
            }
        elif isinstance(data, list):
            entry.message = str(data)
            entry.extra = {"array_length": len(data)}
        else:
            entry.message = str(data)
        return entry

    def _extract_timestamp(self, data: dict[str, Any]) -> Optional[datetime]:
        """Extract a timestamp from the first recognized timestamp key.

        Supports numeric epoch-seconds and ISO-8601 strings (a trailing
        ``Z`` is normalized to ``+00:00`` for ``fromisoformat``).
        Returns None when no key matches or no value parses.
        """
        for field in self.timestamp_fields:
            if field not in data:
                continue
            value = data[field]
            # bool is a subclass of int; True/False are not epochs.
            if isinstance(value, (int, float)) and not isinstance(value, bool):
                try:
                    return datetime.fromtimestamp(value)
                except (OverflowError, OSError, ValueError):
                    pass  # out-of-range epoch — try the next candidate key
            elif isinstance(value, str):
                try:
                    return datetime.fromisoformat(value.replace("Z", "+00:00"))
                except ValueError:
                    pass  # not ISO-8601 — try the next candidate key
        return None

    def _extract_field(self, data: dict[str, Any], fields: list[str]) -> Optional[str]:
        """Return the first present, non-None field, coerced to str."""
        for field in fields:
            value = data.get(field)
            if value is not None:
                return value if isinstance(value, str) else str(value)
        return None