"""JSON type inference and analysis module."""

import base64
import json
import re
from dataclasses import dataclass, replace
from datetime import datetime
from json import JSONDecodeError
from typing import Any, Dict, List, Optional, Set

# Inclusive bounds of a signed 32-bit integer (OpenAPI "int32").
_INT32_MIN = -2147483648
_INT32_MAX = 2147483647

# strptime layouts accepted as "date-time".  The two %z layouts add support
# for numeric UTC offsets such as "+02:00".
_DATE_TIME_FORMATS = (
    "%Y-%m-%dT%H:%M:%S",
    "%Y-%m-%dT%H:%M:%SZ",
    "%Y-%m-%dT%H:%M:%S.%f",
    "%Y-%m-%dT%H:%M:%S.%fZ",
    "%Y-%m-%dT%H:%M:%S%z",
    "%Y-%m-%dT%H:%M:%S.%f%z",
)

# Patterns are applied with fullmatch(), so no ^/$ anchors are needed and a
# trailing newline is not silently accepted.
_EMAIL_RE = re.compile(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}")
_URI_RE = re.compile(r"[a-zA-Z][a-zA-Z0-9+.-]*://\S+")
_UUID_RE = re.compile(
    r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}",
    re.IGNORECASE,
)


@dataclass
class TypeInfo:
    """Stores type information for a JSON value."""

    # Schema type name: "string", "integer", "number", "boolean", "array",
    # "object" or "null".
    type_name: str
    # Optional format qualifier, e.g. "int32", "double", "date-time".
    format: Optional[str] = None
    # Property name -> type, set for analysed objects.
    properties: Optional[Dict[str, 'TypeInfo']] = None
    # Element type, set for analysed arrays.
    items: Optional['TypeInfo'] = None
    # Allowed values, when the type is an enumeration.
    enum_values: Optional[List[Any]] = None
    # True when null was observed alongside this type.
    nullable: bool = False


class JsonAnalyzer:
    """Analyzes JSON data and infers OpenAPI schema types."""

    def __init__(self, max_depth: int = 10):
        """Create an analyzer.

        Args:
            max_depth: Maximum container nesting below the root value.
                Objects and arrays nested deeper than this are reported as a
                bare ``object`` / ``array`` without descending into them.
        """
        self.max_depth = max_depth
        # ids of the containers currently being analysed (the recursion
        # stack); used to stop on self-referencing Python structures.
        self._active_ids: Set[int] = set()

    def analyze(self, data: Any) -> TypeInfo:
        """Analyze JSON data and return type information."""
        # Reset in case a previous run was interrupted by an exception.
        self._active_ids.clear()
        return self._analyze_value(data, "$")

    def _analyze_value(self, value: Any, path: str, depth: int = 0) -> TypeInfo:
        """Infer the type of a single value.

        Args:
            value: The decoded JSON value.
            path: JSONPath-like location of ``value``; forwarded to child
                calls but not consulted for any decision.
            depth: Number of containers enclosing ``value``.

        Returns:
            The inferred type; unrecognised Python types fall back to
            ``string``.
        """
        if value is None:
            return TypeInfo(type_name="null")
        # bool is a subclass of int, so it has to be tested first.
        if isinstance(value, bool):
            return TypeInfo(type_name="boolean")
        if isinstance(value, str):
            return self._analyze_string(value)
        if isinstance(value, int):
            return self._analyze_integer(value)
        if isinstance(value, float):
            return self._analyze_number(value)
        if isinstance(value, (dict, list)):
            return self._analyze_container(value, path, depth)
        return TypeInfo(type_name="string")

    def _analyze_container(self, value: Any, path: str, depth: int) -> TypeInfo:
        """Analyse a dict or list with cycle and depth protection."""
        is_object = isinstance(value, dict)
        marker = id(value)
        if marker in self._active_ids:
            # The container contains itself; stop instead of recursing.
            if is_object:
                return TypeInfo(type_name="object", properties={})
            return TypeInfo(type_name="array")
        if depth > self.max_depth:
            return TypeInfo(type_name="object" if is_object else "array")

        self._active_ids.add(marker)
        try:
            if is_object:
                return self._analyze_object(value, path, depth)
            return self._analyze_array(value, path, depth)
        finally:
            # Only containers on the current recursion stack count as
            # "active"; siblings must be analysed independently.
            self._active_ids.discard(marker)

    def _analyze_string(self, value: str) -> TypeInfo:
        """Return a ``string`` type, with the first matching format if any."""
        # Order matters: the first detector that matches wins.
        detectors = (
            (self._is_date, "date"),
            (self._is_date_time, "date-time"),
            (self._is_email, "email"),
            (self._is_uri, "uri"),
            (self._is_uuid, "uuid"),
            (self._is_byte, "byte"),
        )
        type_info = TypeInfo(type_name="string")
        for matches, format_name in detectors:
            if matches(value):
                type_info.format = format_name
                break
        return type_info

    def _analyze_number(self, value: float) -> TypeInfo:
        """Return the type used for every float: ``number`` / ``double``."""
        return TypeInfo(type_name="number", format="double")

    def _analyze_integer(self, value: int) -> TypeInfo:
        """Return ``integer`` with ``int32`` when it fits, else ``int64``."""
        if _INT32_MIN <= value <= _INT32_MAX:
            return TypeInfo(type_name="integer", format="int32")
        return TypeInfo(type_name="integer", format="int64")

    def _analyze_array(self, value: list, path: str, depth: int = 0) -> TypeInfo:
        """Infer an array type from all of its elements.

        Empty arrays default to ``string`` items. Elements that share one
        type (ignoring nulls), or that mix only integer and number, are
        merged into a single item type. Any other mix is described by a
        placeholder object whose ``_type`` property enumerates the observed
        type names in first-seen order.
        """
        if not value:
            return TypeInfo(type_name="array", items=TypeInfo(type_name="string"))

        item_types = [
            self._analyze_value(item, f"{path}[]", depth + 1) for item in value
        ]
        # dict.fromkeys de-duplicates while keeping a deterministic order.
        seen_names = list(dict.fromkeys(t.type_name for t in item_types))
        concrete = {name for name in seen_names if name != "null"}

        if len(concrete) <= 1 or concrete == {"integer", "number"}:
            return TypeInfo(type_name="array", items=self._merge_types(item_types))

        return TypeInfo(
            type_name="array",
            items=TypeInfo(
                type_name="object",
                properties={
                    "_type": TypeInfo(type_name="string", enum_values=seen_names)
                },
            ),
        )

    def _merge_object_properties(self, item_types: List[TypeInfo]) -> TypeInfo:
        """Merge several object types into one with the union of properties."""
        if not item_types:
            return TypeInfo(type_name="object")

        grouped: Dict[str, List[TypeInfo]] = {}
        for item_type in item_types:
            for prop_name, prop_type in (item_type.properties or {}).items():
                grouped.setdefault(prop_name, []).append(prop_type)

        merged_props = {
            prop_name: types[0] if len(types) == 1 else self._merge_types(types)
            for prop_name, types in grouped.items()
        }
        return TypeInfo(type_name="object", properties=merged_props)

    def _merge_types(self, types: List[TypeInfo]) -> TypeInfo:
        """Combine several observed types into one.

        Nulls make the result nullable. Integer mixed with number becomes
        ``number``; any other mix of distinct types falls back to
        ``string``. The input objects are never modified.
        """
        non_null = [t for t in types if t.type_name != "null"]
        if not non_null:
            # Only nulls were seen (or nothing at all).
            return TypeInfo(type_name="null" if types else "string")

        names = {t.type_name for t in non_null}
        if len(names) == 1:
            merged = self._merge_same_type(non_null)
        elif names == {"integer", "number"}:
            merged = TypeInfo(type_name="number")
        else:
            merged = TypeInfo(type_name="string")

        nullable = len(non_null) < len(types) or any(t.nullable for t in non_null)
        if nullable and not merged.nullable:
            # Copy rather than mutate: `merged` may be one of the inputs.
            merged = replace(merged, nullable=True)
        return merged

    def _merge_same_type(self, types: List[TypeInfo]) -> TypeInfo:
        """Merge a non-empty list of types that all share one ``type_name``."""
        first = types[0]
        if len(types) == 1:
            return first
        if first.type_name == "object":
            return self._merge_object_properties(types)
        if len({t.format for t in types}) == 1:
            # Identical formats; array item types are not merged further.
            return first
        if first.type_name == "integer":
            # int32 and int64 both observed: widen.
            return replace(first, format="int64")
        # Formats disagree (e.g. a date next to free text): claim none.
        return replace(first, format=None)

    def _analyze_object(self, value: dict, path: str, depth: int = 0) -> TypeInfo:
        """Infer an object type by analysing every property value."""
        properties = {
            key: self._analyze_value(val, f"{path}.{key}", depth + 1)
            for key, val in value.items()
        }
        return TypeInfo(type_name="object", properties=properties)

    def _is_date(self, value: str) -> bool:
        """Return True if ``value`` parses as ``YYYY-MM-DD``."""
        try:
            datetime.strptime(value, "%Y-%m-%d")
        except ValueError:
            return False
        return True

    def _is_date_time(self, value: str) -> bool:
        """Return True if ``value`` matches one of the date-time layouts."""
        for fmt in _DATE_TIME_FORMATS:
            try:
                datetime.strptime(value, fmt)
            except ValueError:
                continue
            return True
        return False

    def _is_email(self, value: str) -> bool:
        """Return True if ``value`` looks like ``local@domain.tld``."""
        return _EMAIL_RE.fullmatch(value) is not None

    def _is_uri(self, value: str) -> bool:
        """Return True if ``value`` looks like ``scheme://rest`` without spaces."""
        return _URI_RE.fullmatch(value) is not None

    def _is_uuid(self, value: str) -> bool:
        """Return True if ``value`` is an 8-4-4-4-12 hex UUID (any case)."""
        return _UUID_RE.fullmatch(value) is not None

    def _is_byte(self, value: str) -> bool:
        """Return True if ``value`` is non-empty, strictly valid base64.

        NOTE: this is a heuristic. Short alphanumeric words whose length is
        a multiple of four are also valid base64 and will match.
        """
        if not value or len(value) % 4:
            return False
        try:
            # validate=True rejects characters outside the base64 alphabet;
            # non-ASCII text raises ValueError as well.
            base64.b64decode(value, validate=True)
        except ValueError:
            return False
        return True


def parse_json_file(file_path: str) -> Any:
    """Parse a JSON file and return its contents.

    Args:
        file_path: Path of a UTF-8 encoded JSON file.

    Returns:
        The decoded JSON value.

    Raises:
        ValueError: If the file is missing, unreadable due to permissions,
            or does not contain valid JSON. The original exception is
            attached as ``__cause__``.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except JSONDecodeError as e:
        raise ValueError(f"Invalid JSON in {file_path}: {e}") from e
    except FileNotFoundError as e:
        raise ValueError(f"File not found: {file_path}") from e
    except PermissionError as e:
        raise ValueError(f"Permission denied: {file_path}") from e