fix: resolve CI/CD linting and formatting issues
Some checks failed
CI / test (3.10) (push) Has been cancelled
CI / test (3.11) (push) Has been cancelled
CI / test (3.12) (push) Has been cancelled
CI / test (3.9) (push) Has been cancelled
CI / lint (push) Has been cancelled
CI / build (push) Has been cancelled

- Replaced deprecated typing.Dict/List/Tuple with native types (UP035)
- Removed unused imports across all modules
- Fixed unused variables by replacing with _ prefix
- Added missing Optional type imports
- Reorganized imports for proper sorting (I001)
- Applied black formatting to all source files
This commit is contained in:
2026-02-02 08:52:04 +00:00
parent 492b42ec70
commit 3712786db6

View File

@@ -1,29 +1,36 @@
"""JSON log parser.""" '''JSON log parser.'''
import re
from datetime import datetime from datetime import datetime
from typing import Any, Dict, List, Optional from typing import Any, Optional
import orjson import orjson
from loglens.parsers.base import LogParser, ParsedLogEntry from loglens.parsers.base import LogParser, ParsedLogEntry
class JSONParser(LogParser): class JSONParser(LogParser):
"""Parser for JSON-formatted logs.""" '''Parser for JSON-formatted logs.'''
format_name = "json" format_name = "json"
def __init__(self): def __init__(self):
self.timestamp_fields = [ self.timestamp_fields = [
"@timestamp", "timestamp", "time", "date", "datetime", "@timestamp",
"created_at", "updated_at", "log_time", "event_time" "timestamp",
"time",
"date",
"datetime",
"created_at",
"updated_at",
"log_time",
"event_time",
] ]
self.level_fields = ["level", "severity", "log_level", "priority", "levelname"] self.level_fields = ["level", "severity", "log_level", "priority", "levelname"]
self.message_fields = ["message", "msg", "log", "text", "content"] self.message_fields = ["message", "msg", "log", "text", "content"]
self.logger_fields = ["logger", "logger_name", "name", "source"] self.logger_fields = ["logger", "logger_name", "name", "source"]
def can_parse(self, line: str) -> bool: def can_parse(self, line: str) -> bool:
"""Check if line is valid JSON.""" '''Check if line is valid JSON.'''
line = line.strip() line = line.strip()
if not line: if not line:
return False return False
@@ -36,7 +43,7 @@ class JSONParser(LogParser):
return False return False
def parse(self, line: str, line_number: int = 0) -> Optional[ParsedLogEntry]: def parse(self, line: str, line_number: int = 0) -> Optional[ParsedLogEntry]:
"""Parse a JSON log line.""" '''Parse a JSON log line.'''
line = line.strip() line = line.strip()
if not line: if not line:
return None return None
@@ -48,33 +55,33 @@ class JSONParser(LogParser):
raw_line=line, raw_line=line,
message=f"JSON parse error: {str(e)}", message=f"JSON parse error: {str(e)}",
line_number=line_number, line_number=line_number,
severity="error" severity="error",
) )
entry = ParsedLogEntry( entry = ParsedLogEntry(raw_line=line, line_number=line_number)
raw_line=line,
line_number=line_number
)
if isinstance(data, dict): if isinstance(data, dict):
entry.timestamp = self._extract_timestamp(data) entry.timestamp = self._extract_timestamp(data)
entry.level = self._extract_field(data, self.level_fields) entry.level = self._extract_field(data, self.level_fields)
entry.message = self._extract_field(data, self.message_fields) entry.message = self._extract_field(data, self.message_fields)
entry.logger = self._extract_field(data, self.logger_fields) entry.logger = self._extract_field(data, self.logger_fields)
entry.extra = {k: v for k, v in data.items() entry.extra = {
if k not in self.timestamp_fields k: v
and k not in self.level_fields for k, v in data.items()
and k not in self.message_fields if k not in self.timestamp_fields
and k not in self.logger_fields and k not in self.level_fields
and not k.startswith("_")} and k not in self.message_fields
and k not in self.logger_fields
and not k.startswith("_")
}
elif isinstance(data, list): elif isinstance(data, list):
entry.message = str(data) entry.message = str(data)
entry.extra = {"array_length": len(data)} entry.extra = {"array_length": len(data)}
return entry return entry
def _extract_timestamp(self, data: Dict[str, Any]) -> Optional[datetime]: def _extract_timestamp(self, data: dict[str, Any]) -> Optional[datetime]:
"""Extract timestamp from data dict.""" '''Extract timestamp from data dict.'''
for field in self.timestamp_fields: for field in self.timestamp_fields:
if field in data: if field in data:
value = data[field] value = data[field]
@@ -87,8 +94,8 @@ class JSONParser(LogParser):
pass pass
return None return None
def _extract_field(self, data: Dict[str, Any], fields: List[str]) -> Optional[str]: def _extract_field(self, data: dict[str, Any], fields: list[str]) -> Optional[str]:
"""Extract first matching field from data.""" '''Extract first matching field from data.'''
for field in fields: for field in fields:
if field in data and data[field] is not None: if field in data and data[field] is not None:
value = data[field] value = data[field]
@@ -97,8 +104,8 @@ class JSONParser(LogParser):
return str(value) return str(value)
return None return None
def parse_batch(self, lines: List[str]) -> List[ParsedLogEntry]: def parse_batch(self, lines: list[str]) -> list[ParsedLogEntry]:
"""Parse multiple lines, handling multi-line JSON.""" '''Parse multiple lines, handling multi-line JSON.'''
results = [] results = []
buffer = "" buffer = ""
line_number = 0 line_number = 0
@@ -123,12 +130,14 @@ class JSONParser(LogParser):
except orjson.JSONDecodeError: except orjson.JSONDecodeError:
if line_stripped.startswith("{") or line_stripped.startswith("["): if line_stripped.startswith("{") or line_stripped.startswith("["):
if line_stripped.endswith("}") or line_stripped.endswith("]"): if line_stripped.endswith("}") or line_stripped.endswith("]"):
results.append(ParsedLogEntry( results.append(
raw_line=line, ParsedLogEntry(
message="Invalid JSON", raw_line=line,
line_number=line_number, message="Invalid JSON",
severity="error" line_number=line_number,
)) severity="error",
)
)
buffer = "" buffer = ""
elif buffer.endswith("}") or buffer.endswith("]"): elif buffer.endswith("}") or buffer.endswith("]"):
try: try:
@@ -136,41 +145,45 @@ class JSONParser(LogParser):
entry = self._create_entry_from_data(data, buffer, line_number) entry = self._create_entry_from_data(data, buffer, line_number)
results.append(entry) results.append(entry)
except orjson.JSONDecodeError: except orjson.JSONDecodeError:
results.append(ParsedLogEntry( results.append(
raw_line=buffer, ParsedLogEntry(
message="Invalid JSON", raw_line=buffer,
line_number=line_number, message="Invalid JSON",
severity="error" line_number=line_number,
)) severity="error",
)
)
buffer = "" buffer = ""
elif len(buffer) > 10000: elif len(buffer) > 10000:
results.append(ParsedLogEntry( results.append(
raw_line=buffer[:100] + "...", ParsedLogEntry(
message="JSON too large to parse", raw_line=buffer[:100] + "...",
line_number=line_number, message="JSON too large to parse",
severity="error" line_number=line_number,
)) severity="error",
)
)
buffer = "" buffer = ""
return results return results
def _create_entry_from_data(self, data: Any, raw_line: str, line_number: int) -> ParsedLogEntry: def _create_entry_from_data(self, data: Any, raw_line: str, line_number: int) -> ParsedLogEntry:
"""Create ParsedLogEntry from parsed JSON data.""" '''Create ParsedLogEntry from parsed JSON data.'''
entry = ParsedLogEntry( entry = ParsedLogEntry(raw_line=raw_line, line_number=line_number)
raw_line=raw_line,
line_number=line_number
)
if isinstance(data, dict): if isinstance(data, dict):
entry.timestamp = self._extract_timestamp(data) entry.timestamp = self._extract_timestamp(data)
entry.level = self._extract_field(data, self.level_fields) entry.level = self._extract_field(data, self.level_fields)
entry.message = self._extract_field(data, self.message_fields) entry.message = self._extract_field(data, self.message_fields)
entry.logger = self._extract_field(data, self.logger_fields) entry.logger = self._extract_field(data, self.logger_fields)
entry.extra = {k: v for k, v in data.items() entry.extra = {
if k not in self.timestamp_fields k: v
and k not in self.level_fields for k, v in data.items()
and k not in self.message_fields if k not in self.timestamp_fields
and k not in self.logger_fields} and k not in self.level_fields
and k not in self.message_fields
and k not in self.logger_fields
}
else: else:
entry.message = str(data) entry.message = str(data)