Initial upload with comprehensive README and tests
json_to_openapi/analyzer.py
Normal file
@@ -0,0 +1,236 @@
"""JSON type inference and analysis module."""

import base64
import json
import re
from dataclasses import dataclass
from datetime import datetime
from json import JSONDecodeError
from typing import Any, Dict, List, Optional, Set


@dataclass
class TypeInfo:
    """Stores type information for a JSON value."""

    type_name: str
    format: Optional[str] = None
    properties: Optional[Dict[str, 'TypeInfo']] = None
    items: Optional['TypeInfo'] = None
    enum_values: Optional[List[Any]] = None
    nullable: bool = False
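

# Illustrative sketch (assumed sample document, not part of the API):
# analyzing {"id": 1, "tags": ["a", "b"]} yields roughly
#
#   TypeInfo(
#       type_name="object",
#       properties={
#           "id": TypeInfo(type_name="integer", format="int32"),
#           "tags": TypeInfo(type_name="array",
#                            items=TypeInfo(type_name="string")),
#       },
#   )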


class JsonAnalyzer:
    """Analyzes JSON data and infers OpenAPI schema types."""

    def __init__(self, max_depth: int = 10):
        self.max_depth = max_depth
        self._visited_paths: Set[str] = set()

    def analyze(self, data: Any) -> TypeInfo:
        """Analyze JSON data and return type information."""
        self._visited_paths.clear()
        return self._analyze_value(data, "$")

    def _analyze_value(self, value: Any, path: str) -> TypeInfo:
        if path in self._visited_paths:
            # Defensive guard against re-entering a path that is still
            # being analyzed; data parsed from JSON text cannot be cyclic.
            return TypeInfo(type_name="object", properties={})

        if value is None:
            return TypeInfo(type_name="null")

        value_type = type(value)

        # Exact type checks are deliberate: isinstance(True, int) is True,
        # so testing with `is` keeps bools out of the integer branch.
        if value_type is str:
            return self._analyze_string(value)
        elif value_type is int:
            return self._analyze_integer(value)
        elif value_type is float:
            return self._analyze_number(value)
        elif value_type is bool:
            return TypeInfo(type_name="boolean")
        elif value_type is list:
            return self._analyze_array(value, path)
        elif value_type is dict:
            self._visited_paths.add(path)
            result = self._analyze_object(value, path)
            # Release the path once this object is done; leaving it marked
            # would make every later object at the same path (e.g. the
            # shared "$[]" path of array items) come back empty.
            self._visited_paths.discard(path)
            return result
        else:
            return TypeInfo(type_name="string")

    def _analyze_string(self, value: str) -> TypeInfo:
        type_info = TypeInfo(type_name="string")

        if self._is_date(value):
            type_info.format = "date"
        elif self._is_date_time(value):
            type_info.format = "date-time"
        elif self._is_email(value):
            type_info.format = "email"
        elif self._is_uri(value):
            type_info.format = "uri"
        elif self._is_uuid(value):
            type_info.format = "uuid"
        elif self._is_byte(value):
            type_info.format = "byte"

        return type_info
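
    # Illustrative detections (assumed sample values):
    #   "2024-01-15"                           -> format "date"
    #   "2024-01-15T10:30:00Z"                 -> format "date-time"
    #   "user@example.com"                     -> format "email"
    #   "https://example.com/api"              -> format "uri"
    #   "550e8400-e29b-41d4-a716-446655440000" -> format "uuid"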

    def _analyze_number(self, value: float) -> TypeInfo:
        return TypeInfo(type_name="number", format="double")

    def _analyze_integer(self, value: int) -> TypeInfo:
        # int32 covers [-2**31, 2**31 - 1]; anything outside gets int64.
        if -2147483648 <= value <= 2147483647:
            return TypeInfo(type_name="integer", format="int32")
        else:
            return TypeInfo(type_name="integer", format="int64")

    def _analyze_array(self, value: list, path: str) -> TypeInfo:
        if not value:
            # Nothing to inspect; default empty arrays to string items.
            return TypeInfo(type_name="array", items=TypeInfo(type_name="string"))

        type_counts: Dict[str, int] = {}
        item_types: List[TypeInfo] = []

        for item in value:
            item_type = self._analyze_value(item, f"{path}[]")
            type_counts[item_type.type_name] = type_counts.get(item_type.type_name, 0) + 1
            item_types.append(item_type)

        if len(type_counts) == 1:
            only_type = next(iter(type_counts))
            if only_type == "object":
                # Merge the property sets observed across all items.
                merged = self._merge_object_properties(item_types)
                return TypeInfo(type_name="array", items=merged)
            return TypeInfo(type_name="array", items=item_types[0])

        # Heterogeneous items: fall back to an object schema that records
        # the observed type names as an enum.
        unique_types = list(set(t.type_name for t in item_types))
        return TypeInfo(
            type_name="array",
            items=TypeInfo(
                type_name="object",
                properties={
                    "_type": TypeInfo(type_name="string", enum_values=unique_types)
                }
            )
        )
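
    # Illustrative outcomes (assumed sample arrays):
    #   [1, 2, 3]            -> array of integer
    #   [{"a": 1}, {"b": 2}] -> array of object with merged properties a, b
    #   [1, "x"]             -> array of a placeholder object whose "_type"
    #                           enum lists the observed type names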

    def _merge_object_properties(self, item_types: List[TypeInfo]) -> TypeInfo:
        if not item_types:
            return TypeInfo(type_name="object")

        all_props: Dict[str, List[TypeInfo]] = {}

        for item_type in item_types:
            if item_type.properties:
                for prop_name, prop_type in item_type.properties.items():
                    all_props.setdefault(prop_name, []).append(prop_type)

        merged_props: Dict[str, TypeInfo] = {}

        for prop_name, types in all_props.items():
            if len(types) == 1:
                merged_props[prop_name] = types[0]
            else:
                merged_props[prop_name] = self._merge_types(types)

        return TypeInfo(type_name="object", properties=merged_props)

    def _merge_types(self, types: List[TypeInfo]) -> TypeInfo:
        type_names = set(t.type_name for t in types)

        if len(type_names) == 1:
            return types[0]

        # Fold null into the merged non-null type as nullable=True.
        if "null" in type_names:
            non_null = [t for t in types if t.type_name != "null"]
            if non_null:
                merged = self._merge_types(non_null)
                merged.nullable = True
                return merged
            return TypeInfo(type_name="null")

        # Mixed integer/number widens to number; anything else falls back
        # to string.
        if "integer" in type_names and "number" in type_names:
            return TypeInfo(type_name="number")

        return TypeInfo(type_name="string")
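
    # Illustrative merges (assumed inputs, given as type names):
    #   [integer, number] -> number
    #   [string, null]    -> string with nullable=True
    #   [string, boolean] -> string (fallback)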

    def _analyze_object(self, value: dict, path: str) -> TypeInfo:
        # Approximate the nesting depth from the path; this assumes keys
        # do not themselves contain "." or "[".
        depth = path.count(".") + path.count("[")
        if depth > self.max_depth:
            return TypeInfo(type_name="object")

        properties: Dict[str, TypeInfo] = {}

        for key, val in value.items():
            properties[key] = self._analyze_value(val, f"{path}.{key}")

        return TypeInfo(type_name="object", properties=properties)

    def _is_date(self, value: str) -> bool:
        try:
            datetime.strptime(value, "%Y-%m-%d")
            return True
        except ValueError:
            return False

    def _is_date_time(self, value: str) -> bool:
        formats = [
            "%Y-%m-%dT%H:%M:%S",
            "%Y-%m-%dT%H:%M:%SZ",
            "%Y-%m-%dT%H:%M:%S.%f",
            "%Y-%m-%dT%H:%M:%S.%fZ",
        ]
        for fmt in formats:
            try:
                datetime.strptime(value, fmt)
                return True
            except ValueError:
                continue
        return False

    def _is_email(self, value: str) -> bool:
        pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
        return bool(re.match(pattern, value))

    def _is_uri(self, value: str) -> bool:
        # Scheme per RFC 3986: ALPHA *( ALPHA / DIGIT / "+" / "-" / "." ).
        pattern = r"^[a-zA-Z][a-zA-Z0-9+.-]*://\S+$"
        return bool(re.match(pattern, value))

    def _is_uuid(self, value: str) -> bool:
        pattern = r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$"
        return bool(re.match(pattern, value.lower()))

    def _is_byte(self, value: str) -> bool:
        # OpenAPI's "byte" format denotes base64-encoded content, so
        # require a plausible base64 shape and a strict decode instead of
        # merely checking that the string is ASCII.
        if not value or len(value) % 4 != 0:
            return False
        try:
            base64.b64decode(value, validate=True)
            return True
        except ValueError:
            return False


def parse_json_file(file_path: str) -> Any:
    """Parse a JSON file and return its contents."""
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except JSONDecodeError as e:
        raise ValueError(f"Invalid JSON in {file_path}: {e}")
    except FileNotFoundError:
        raise ValueError(f"File not found: {file_path}")
    except PermissionError:
        raise ValueError(f"Permission denied: {file_path}")