Some checks failed
- Added @click.version_option decorator to main() in commands.py - Imported __version__ from loglens package - Resolves CI build failure: 'loglens --version' command not found
71 lines
2.2 KiB
Python
71 lines
2.2 KiB
Python
import json
|
|
import re
|
|
from datetime import datetime
|
|
from typing import Any, Optional
|
|
|
|
from loglens.parsers.base import BaseParser, LogFormat, ParsedEntry
|
|
|
|
|
|
class JSONParser(BaseParser):
|
|
"""Parser for JSON log formats."""
|
|
|
|
def get_format(self) -> LogFormat:
|
|
return LogFormat.JSON
|
|
|
|
def parse(self, line: str) -> Optional[ParsedEntry]:
|
|
"""Parse a JSON log line."""
|
|
try:
|
|
data = json.loads(line.strip())
|
|
except json.JSONDecodeError:
|
|
return None
|
|
|
|
if isinstance(data, list):
|
|
return None
|
|
|
|
if not isinstance(data, dict):
|
|
return None
|
|
|
|
timestamp = self._extract_timestamp(data)
|
|
level = self._extract_level(data)
|
|
message = self._extract_message(data)
|
|
|
|
return ParsedEntry(
|
|
raw_line=line.strip(),
|
|
format=self.get_format(),
|
|
timestamp=timestamp,
|
|
level=level,
|
|
message=message,
|
|
metadata=data,
|
|
)
|
|
|
|
def _extract_timestamp(self, data: dict[str, Any]) -> Optional[str]:
|
|
timestamp_fields = ["timestamp", "time", "@timestamp", "date", "created_at"]
|
|
for field in timestamp_fields:
|
|
if field in data:
|
|
value = data[field]
|
|
if isinstance(value, str):
|
|
return value
|
|
elif isinstance(value, (int, float)):
|
|
return datetime.fromtimestamp(value).isoformat()
|
|
return None
|
|
|
|
def _extract_level(self, data: dict[str, Any]) -> Optional[str]:
|
|
level_fields = ["level", "severity", "log_level", "levelname", "status"]
|
|
for field in level_fields:
|
|
if field in data:
|
|
value = data[field]
|
|
if isinstance(value, str):
|
|
return value.lower()
|
|
return None
|
|
|
|
def _extract_message(self, data: dict[str, Any]) -> str:
|
|
message_fields = ["message", "msg", "text", "content", "error", "reason"]
|
|
for field in message_fields:
|
|
if field in data:
|
|
value = data[field]
|
|
if isinstance(value, str):
|
|
return value
|
|
elif isinstance(value, dict):
|
|
return json.dumps(value)
|
|
return str(data)
|