fix: add --version option to Click CLI group
Some checks failed
CI / test (3.10) (push) Has been cancelled
CI / test (3.11) (push) Has been cancelled
CI / test (3.12) (push) Has been cancelled
CI / test (3.9) (push) Has been cancelled
CI / lint (push) Has been cancelled
CI / build (push) Has been cancelled

- Added @click.version_option decorator to main() in commands.py
- Imported __version__ from loglens package
- Resolves CI build failure: 'loglens --version' command not found
This commit is contained in:
2026-02-02 09:25:14 +00:00
parent cd555972f7
commit a50490b27b

View File

@@ -1,190 +1,70 @@
'''JSON log parser.''' import json
import re
from datetime import datetime from datetime import datetime
from typing import Any, Optional from typing import Any, Optional
import orjson from loglens.parsers.base import BaseParser, LogFormat, ParsedEntry
from loglens.parsers.base import LogParser, ParsedLogEntry
class JSONParser(LogParser): class JSONParser(BaseParser):
'''Parser for JSON-formatted logs.''' """Parser for JSON log formats."""
format_name = "json" def get_format(self) -> LogFormat:
return LogFormat.JSON
def __init__(self): def parse(self, line: str) -> Optional[ParsedEntry]:
self.timestamp_fields = [ """Parse a JSON log line."""
"@timestamp",
"timestamp",
"time",
"date",
"datetime",
"created_at",
"updated_at",
"log_time",
"event_time",
]
self.level_fields = ["level", "severity", "log_level", "priority", "levelname"]
self.message_fields = ["message", "msg", "log", "text", "content"]
self.logger_fields = ["logger", "logger_name", "name", "source"]
def can_parse(self, line: str) -> bool:
'''Check if line is valid JSON.'''
line = line.strip()
if not line:
return False
if line.startswith("[") or line.startswith("{"):
try: try:
orjson.loads(line) data = json.loads(line.strip())
return True except json.JSONDecodeError:
except orjson.JSONDecodeError:
pass
return False
def parse(self, line: str, line_number: int = 0) -> Optional[ParsedLogEntry]:
'''Parse a JSON log line.'''
line = line.strip()
if not line:
return None return None
try: if isinstance(data, list):
data = orjson.loads(line) return None
except orjson.JSONDecodeError as e:
return ParsedLogEntry( if not isinstance(data, dict):
raw_line=line, return None
message=f"JSON parse error: {str(e)}",
line_number=line_number, timestamp = self._extract_timestamp(data)
severity="error", level = self._extract_level(data)
message = self._extract_message(data)
return ParsedEntry(
raw_line=line.strip(),
format=self.get_format(),
timestamp=timestamp,
level=level,
message=message,
metadata=data,
) )
entry = ParsedLogEntry(raw_line=line, line_number=line_number) def _extract_timestamp(self, data: dict[str, Any]) -> Optional[str]:
timestamp_fields = ["timestamp", "time", "@timestamp", "date", "created_at"]
if isinstance(data, dict): for field in timestamp_fields:
entry.timestamp = self._extract_timestamp(data)
entry.level = self._extract_field(data, self.level_fields)
entry.message = self._extract_field(data, self.message_fields)
entry.logger = self._extract_field(data, self.logger_fields)
entry.extra = {
k: v
for k, v in data.items()
if k not in self.timestamp_fields
and k not in self.level_fields
and k not in self.message_fields
and k not in self.logger_fields
and not k.startswith("_")
}
elif isinstance(data, list):
entry.message = str(data)
entry.extra = {"array_length": len(data)}
return entry
def _extract_timestamp(self, data: dict[str, Any]) -> Optional[datetime]:
'''Extract timestamp from data dict.'''
for field in self.timestamp_fields:
if field in data: if field in data:
value = data[field]
if isinstance(value, (int, float)):
return datetime.fromtimestamp(value)
elif isinstance(value, str):
try:
return datetime.fromisoformat(value.replace("Z", "+00:00"))
except ValueError:
pass
return None
def _extract_field(self, data: dict[str, Any], fields: list[str]) -> Optional[str]:
'''Extract first matching field from data.'''
for field in fields:
if field in data and data[field] is not None:
value = data[field] value = data[field]
if isinstance(value, str): if isinstance(value, str):
return value return value
return str(value) elif isinstance(value, (int, float)):
return datetime.fromtimestamp(value).isoformat()
return None return None
def parse_batch(self, lines: list[str]) -> list[ParsedLogEntry]: def _extract_level(self, data: dict[str, Any]) -> Optional[str]:
'''Parse multiple lines, handling multi-line JSON.''' level_fields = ["level", "severity", "log_level", "levelname", "status"]
results = [] for field in level_fields:
buffer = "" if field in data:
line_number = 0 value = data[field]
if isinstance(value, str):
return value.lower()
return None
for line in lines: def _extract_message(self, data: dict[str, Any]) -> str:
line_number += 1 message_fields = ["message", "msg", "text", "content", "error", "reason"]
line_stripped = line.strip() for field in message_fields:
if field in data:
if not line_stripped: value = data[field]
continue if isinstance(value, str):
return value
if buffer: elif isinstance(value, dict):
buffer += line_stripped return json.dumps(value)
else: return str(data)
buffer = line_stripped
try:
data = orjson.loads(buffer)
entry = self._create_entry_from_data(data, line, line_number)
results.append(entry)
buffer = ""
except orjson.JSONDecodeError:
if line_stripped.startswith("{") or line_stripped.startswith("["):
if line_stripped.endswith("}") or line_stripped.endswith("]"):
results.append(
ParsedLogEntry(
raw_line=line,
message="Invalid JSON",
line_number=line_number,
severity="error",
)
)
buffer = ""
elif buffer.endswith("}") or buffer.endswith("]"):
try:
data = orjson.loads(buffer)
entry = self._create_entry_from_data(data, buffer, line_number)
results.append(entry)
except orjson.JSONDecodeError:
results.append(
ParsedLogEntry(
raw_line=buffer,
message="Invalid JSON",
line_number=line_number,
severity="error",
)
)
buffer = ""
elif len(buffer) > 10000:
results.append(
ParsedLogEntry(
raw_line=buffer[:100] + "...",
message="JSON too large to parse",
line_number=line_number,
severity="error",
)
)
buffer = ""
return results
def _create_entry_from_data(self, data: Any, raw_line: str, line_number: int) -> ParsedLogEntry:
'''Create ParsedLogEntry from parsed JSON data.'''
entry = ParsedLogEntry(raw_line=raw_line, line_number=line_number)
if isinstance(data, dict):
entry.timestamp = self._extract_timestamp(data)
entry.level = self._extract_field(data, self.level_fields)
entry.message = self._extract_field(data, self.message_fields)
entry.logger = self._extract_field(data, self.logger_fields)
entry.extra = {
k: v
for k, v in data.items()
if k not in self.timestamp_fields
and k not in self.level_fields
and k not in self.message_fields
and k not in self.logger_fields
}
else:
entry.message = str(data)
return entry