"""JSON type inference and analysis module.

Walks arbitrary JSON-compatible Python data and infers OpenAPI-style
schema information: primitive types, string format qualifiers, merged
object property sets, and array element schemas.
"""

import json
import re
from dataclasses import dataclass, replace
from datetime import datetime
from json import JSONDecodeError
from typing import Any, Dict, List, Optional, Set

# Format-detection patterns, compiled once at import time instead of on
# every call.
_EMAIL_RE = re.compile(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$")
# RFC 3986 scheme = ALPHA *( ALPHA / DIGIT / "+" / "-" / "." ), then "://"
# and at least one non-whitespace character.
_URI_RE = re.compile(r"^[a-zA-Z][a-zA-Z0-9+.-]*://\S+$")
_UUID_RE = re.compile(
    r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$"
)
# Base64 alphabet with optional '=' padding; _is_byte also checks length.
_BASE64_RE = re.compile(r"^[A-Za-z0-9+/]+={0,2}$")

# Accepted ISO-8601-like timestamp layouts for the "date-time" format.
_DATETIME_FORMATS = (
    "%Y-%m-%dT%H:%M:%S",
    "%Y-%m-%dT%H:%M:%SZ",
    "%Y-%m-%dT%H:%M:%S.%f",
    "%Y-%m-%dT%H:%M:%S.%fZ",
)


@dataclass
class TypeInfo:
    """Stores type information for a JSON value.

    Attributes:
        type_name: OpenAPI type name ("string", "integer", "object", ...).
        format: optional OpenAPI format qualifier ("date", "int64", ...).
        properties: child schemas keyed by property name (objects only).
        items: element schema (arrays only).
        enum_values: observed discrete values, used for mixed-type arrays.
        nullable: True when a null was merged into a non-null type.
    """

    type_name: str
    format: Optional[str] = None
    properties: Optional[Dict[str, 'TypeInfo']] = None
    items: Optional['TypeInfo'] = None
    enum_values: Optional[List[Any]] = None
    nullable: bool = False


class JsonAnalyzer:
    """Analyzes JSON data and infers OpenAPI schema types."""

    def __init__(self, max_depth: int = 10):
        """Create an analyzer.

        Args:
            max_depth: maximum object nesting depth; deeper objects are
                reported as an opaque "object" schema.
        """
        self.max_depth = max_depth
        # id()s of containers currently on the recursion stack. Used for
        # true cycle detection: the old string-path set could not tell a
        # cycle from two sibling array items sharing the same path, and
        # never removed entries, so every array element after the first
        # dict collapsed to an empty object schema.
        self._active_ids: Set[int] = set()

    def analyze(self, data: Any) -> TypeInfo:
        """Analyze JSON data and return type information."""
        self._active_ids.clear()
        return self._analyze_value(data, 0)

    def _analyze_value(self, value: Any, depth: int) -> TypeInfo:
        """Dispatch on the concrete Python type of *value*.

        Exact ``type() is`` checks are deliberate: ``bool`` must not be
        treated as ``int``, and subclasses do not occur in parsed JSON.
        """
        if value is None:
            return TypeInfo(type_name="null")

        value_type = type(value)

        if value_type is bool:
            return TypeInfo(type_name="boolean")
        if value_type is str:
            return self._analyze_string(value)
        if value_type is int:
            return self._analyze_integer(value)
        if value_type is float:
            return self._analyze_number(value)
        if value_type is list or value_type is dict:
            marker = id(value)
            if marker in self._active_ids:
                # Genuine self-reference: break the cycle with an opaque
                # object schema (same fallback shape as the original).
                return TypeInfo(type_name="object", properties={})
            self._active_ids.add(marker)
            try:
                if value_type is list:
                    return self._analyze_array(value, depth)
                return self._analyze_object(value, depth)
            finally:
                # Pop on the way back up so siblings are still analyzed.
                self._active_ids.discard(marker)
        # Unknown scalar (e.g. Decimal injected by a caller): treat as
        # string, the most permissive OpenAPI primitive.
        return TypeInfo(type_name="string")

    def _analyze_string(self, value: str) -> TypeInfo:
        """Return a "string" schema, sniffing an optional format qualifier."""
        type_info = TypeInfo(type_name="string")

        if self._is_date(value):
            type_info.format = "date"
        elif self._is_date_time(value):
            type_info.format = "date-time"
        elif self._is_email(value):
            type_info.format = "email"
        elif self._is_uri(value):
            type_info.format = "uri"
        elif self._is_uuid(value):
            type_info.format = "uuid"
        elif self._is_byte(value):
            type_info.format = "byte"

        return type_info

    def _analyze_number(self, value: float) -> TypeInfo:
        """Floats map to OpenAPI number/double."""
        return TypeInfo(type_name="number", format="double")

    def _analyze_integer(self, value: int) -> TypeInfo:
        """Pick int32 when the value fits the signed 32-bit range."""
        if -2147483648 <= value <= 2147483647:
            return TypeInfo(type_name="integer", format="int32")
        return TypeInfo(type_name="integer", format="int64")

    def _analyze_array(self, value: list, depth: int) -> TypeInfo:
        """Infer an array schema from its elements.

        Homogeneous object arrays get their property sets merged across
        all elements; heterogeneous arrays degrade to an object carrying
        a "_type" enum of the observed element type names.
        """
        if not value:
            # Nothing to learn from an empty array; default to strings.
            return TypeInfo(type_name="array", items=TypeInfo(type_name="string"))

        item_types = [self._analyze_value(item, depth + 1) for item in value]
        type_names = {t.type_name for t in item_types}

        if len(type_names) == 1:
            if item_types[0].type_name == "object":
                merged = self._merge_object_properties(item_types)
                return TypeInfo(type_name="array", items=merged)
            return TypeInfo(type_name="array", items=item_types[0])

        # Mixed element types: record them as an enum. sorted() makes the
        # output deterministic, unlike iterating a raw set.
        return TypeInfo(
            type_name="array",
            items=TypeInfo(
                type_name="object",
                properties={
                    "_type": TypeInfo(
                        type_name="string", enum_values=sorted(type_names)
                    )
                },
            ),
        )

    def _merge_object_properties(self, item_types: List[TypeInfo]) -> TypeInfo:
        """Union the property sets of several object schemas."""
        if not item_types:
            return TypeInfo(type_name="object")

        all_props: Dict[str, List[TypeInfo]] = {}
        for item_type in item_types:
            for prop_name, prop_type in (item_type.properties or {}).items():
                all_props.setdefault(prop_name, []).append(prop_type)

        merged_props: Dict[str, TypeInfo] = {
            name: types[0] if len(types) == 1 else self._merge_types(types)
            for name, types in all_props.items()
        }
        return TypeInfo(type_name="object", properties=merged_props)

    def _merge_types(self, types: List[TypeInfo]) -> TypeInfo:
        """Collapse several schemas observed for one property into one.

        null + X becomes nullable X; integer + number widens to number;
        anything else falls back to plain string.
        """
        type_names = {t.type_name for t in types}

        if len(type_names) == 1:
            return types[0]

        if "null" in type_names:
            non_null = [t for t in types if t.type_name != "null"]
            if non_null:
                # replace() copies instead of mutating a TypeInfo that
                # may be shared with another property's schema.
                return replace(self._merge_types(non_null), nullable=True)
            return TypeInfo(type_name="null")

        if "integer" in type_names and "number" in type_names:
            return TypeInfo(type_name="number")

        return TypeInfo(type_name="string")

    def _analyze_object(self, value: dict, depth: int) -> TypeInfo:
        """Infer an object schema, one child schema per key.

        Uses a real depth counter; the original compared the *string
        length* of the JSON path against max_depth * 3, which only
        approximated nesting depth.
        """
        if depth >= self.max_depth:
            # Too deep: stop descending and return an opaque object.
            return TypeInfo(type_name="object")

        properties: Dict[str, TypeInfo] = {
            key: self._analyze_value(val, depth + 1)
            for key, val in value.items()
        }
        return TypeInfo(type_name="object", properties=properties)

    @staticmethod
    def _is_date(value: str) -> bool:
        """True for YYYY-MM-DD calendar dates."""
        try:
            datetime.strptime(value, "%Y-%m-%d")
            return True
        except ValueError:
            return False

    @staticmethod
    def _is_date_time(value: str) -> bool:
        """True when value matches one of the accepted timestamp layouts."""
        for fmt in _DATETIME_FORMATS:
            try:
                datetime.strptime(value, fmt)
                return True
            except ValueError:
                continue
        return False

    @staticmethod
    def _is_email(value: str) -> bool:
        """Loose local@domain.tld email check."""
        return bool(_EMAIL_RE.match(value))

    @staticmethod
    def _is_uri(value: str) -> bool:
        """True for scheme://rest URIs.

        Fixes two bugs in the original pattern: the scheme class used a
        literal '0' instead of the digit range '0-9', and the tail class
        excluded backslash and the letter 's' (a doubled backslash in a
        raw string) rather than excluding whitespace.
        """
        return bool(_URI_RE.match(value))

    @staticmethod
    def _is_uuid(value: str) -> bool:
        """True for canonical 8-4-4-4-12 hex UUIDs (case-insensitive)."""
        return bool(_UUID_RE.match(value.lower()))

    @staticmethod
    def _is_byte(value: str) -> bool:
        """Heuristic base64 check for the OpenAPI "byte" format.

        The original accepted *any* ASCII-encodable string, which tagged
        nearly every plain word as "byte". Require the base64 alphabet,
        a length that is a multiple of 4, and a minimum length to reduce
        false positives (a purely syntactic test cannot eliminate them).
        """
        return (
            len(value) >= 8
            and len(value) % 4 == 0
            and bool(_BASE64_RE.fullmatch(value))
        )


def parse_json_file(file_path: str) -> Any:
    """Parse a JSON file and return its contents.

    Raises:
        ValueError: on invalid JSON, a missing file, or denied access;
            the underlying exception is chained as ``__cause__``.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except JSONDecodeError as e:
        raise ValueError(f"Invalid JSON in {file_path}: {e}") from e
    except FileNotFoundError as e:
        raise ValueError(f"File not found: {file_path}") from e
    except PermissionError as e:
        raise ValueError(f"Permission denied: {file_path}") from e