Files
loglens-cli/loglens/parsers/json_parser.py
7000pctAUTO a50490b27b
Some checks failed
CI / test (3.10) (push) Has been cancelled
CI / test (3.11) (push) Has been cancelled
CI / test (3.12) (push) Has been cancelled
CI / test (3.9) (push) Has been cancelled
CI / lint (push) Has been cancelled
CI / build (push) Has been cancelled
fix: add --version option to Click CLI group
- Added @click.version_option decorator to main() in commands.py
- Imported __version__ from loglens package
- Resolves CI build failure: 'loglens --version' command not found
2026-02-02 09:25:14 +00:00

71 lines
2.2 KiB
Python

import json
import re
from datetime import datetime
from typing import Any, Optional
from loglens.parsers.base import BaseParser, LogFormat, ParsedEntry
class JSONParser(BaseParser):
"""Parser for JSON log formats."""
def get_format(self) -> LogFormat:
return LogFormat.JSON
def parse(self, line: str) -> Optional[ParsedEntry]:
"""Parse a JSON log line."""
try:
data = json.loads(line.strip())
except json.JSONDecodeError:
return None
if isinstance(data, list):
return None
if not isinstance(data, dict):
return None
timestamp = self._extract_timestamp(data)
level = self._extract_level(data)
message = self._extract_message(data)
return ParsedEntry(
raw_line=line.strip(),
format=self.get_format(),
timestamp=timestamp,
level=level,
message=message,
metadata=data,
)
def _extract_timestamp(self, data: dict[str, Any]) -> Optional[str]:
timestamp_fields = ["timestamp", "time", "@timestamp", "date", "created_at"]
for field in timestamp_fields:
if field in data:
value = data[field]
if isinstance(value, str):
return value
elif isinstance(value, (int, float)):
return datetime.fromtimestamp(value).isoformat()
return None
def _extract_level(self, data: dict[str, Any]) -> Optional[str]:
level_fields = ["level", "severity", "log_level", "levelname", "status"]
for field in level_fields:
if field in data:
value = data[field]
if isinstance(value, str):
return value.lower()
return None
def _extract_message(self, data: dict[str, Any]) -> str:
message_fields = ["message", "msg", "text", "content", "error", "reason"]
for field in message_fields:
if field in data:
value = data[field]
if isinstance(value, str):
return value
elif isinstance(value, dict):
return json.dumps(value)
return str(data)