"""Schema parsing module for JSON Schema and OpenAPI specifications.

The parsers accept a schema as an in-memory ``dict``, a path to a JSON file,
or an ``http(s)://`` URL, and expose helpers to resolve local ``$ref``
pointers and to pull individual keywords out of a schema object.
"""

import json
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, List, Optional, Union
from urllib.parse import unquote

import requests
from openapi_spec_validator import validate

# Keywords copied verbatim by ``SchemaParser.extract_constraints``.
# The tuple order is the key order of the returned dict.
_CONSTRAINT_KEYS = (
    "type",
    "minimum",
    "maximum",
    "exclusiveMinimum",
    "exclusiveMaximum",
    "multipleOf",
    "minLength",
    "maxLength",
    "pattern",
    "minItems",
    "maxItems",
    "uniqueItems",
    "enum",
    "const",
    "default",
)

# Composition keywords whose value is a list of subschemas.
_COMPOSITION_KEYS = ("allOf", "anyOf", "oneOf")

# Path-item keys that denote an operation (OpenAPI 3.x fixed fields).
_HTTP_METHODS = frozenset(
    {"get", "put", "post", "delete", "options", "head", "patch", "trace"}
)

_JSON_MEDIA_TYPE = "application/json"


class SchemaParseError(Exception):
    """Raised when schema parsing fails."""


def _pointer_step(container: Any, token: str) -> Any:
    """Descend one JSON Pointer token into ``container``.

    Raises:
        LookupError: If ``token`` does not address a member of ``container``
            (missing key, out-of-range or non-numeric index, or a scalar).
    """
    if isinstance(container, dict):
        return container[token]
    if isinstance(container, list) and token.isascii() and token.isdigit():
        return container[int(token)]
    raise LookupError(token)


def _resolve_children(
    schema: Dict[str, Any], recurse: Callable[[Any], Any]
) -> Dict[str, Any]:
    """Return a copy of ``schema`` with ``recurse`` applied to nested schemas.

    ``recurse`` is applied to every dict value and to each element of an
    ``allOf`` / ``anyOf`` / ``oneOf`` list; all other values are copied as-is.
    """
    resolved: Dict[str, Any] = {}
    for key, value in schema.items():
        if key in _COMPOSITION_KEYS and isinstance(value, list):
            resolved[key] = [recurse(item) for item in value]
        elif isinstance(value, dict):
            resolved[key] = recurse(value)
        else:
            resolved[key] = value
    return resolved


def _is_json_media_type(media_type: Any) -> bool:
    """Tell whether a media type names JSON (``application/json`` or ``+json``)."""
    if not isinstance(media_type, str):
        return False
    # Drop parameters such as "; charset=utf-8" before comparing.
    essence = media_type.split(";", 1)[0].strip().lower()
    return essence == _JSON_MEDIA_TYPE or essence.endswith("+json")


class SchemaParser(ABC):
    """Abstract base class for schema parsers."""

    def __init__(self, schema: Union[Dict[str, Any], str]):
        """Load ``schema`` and prepare the ``$ref`` cache.

        Args:
            schema: A schema dict, a path to a JSON file, or an http(s) URL.

        Raises:
            SchemaParseError: If the schema cannot be loaded.
        """
        self.schema = self._load_schema(schema)
        # Cache of already-resolved references, keyed by the raw "$ref" string.
        self._refs: Dict[str, Any] = {}

    def _load_schema(self, schema: Union[Dict[str, Any], str]) -> Dict[str, Any]:
        """Load schema from dict, file path, or URL.

        Raises:
            SchemaParseError: If ``schema`` is neither a dict nor a string,
                if loading fails, or if the loaded document is not a JSON
                object.
        """
        if isinstance(schema, dict):
            return schema
        if not isinstance(schema, str):
            raise SchemaParseError(f"Unsupported schema source type: {type(schema)}")

        if schema.startswith(("http://", "https://")):
            loaded = self._fetch_schema_from_url(schema)
        else:
            loaded = self._load_schema_from_file(schema)

        # Everything downstream calls .get() on the document, so reject
        # arrays/scalars here with a clear error instead of an AttributeError.
        if not isinstance(loaded, dict):
            raise SchemaParseError(
                f"Schema loaded from {schema} must be a JSON object, "
                f"got {type(loaded).__name__}"
            )
        return loaded

    def _fetch_schema_from_url(self, url: str, timeout: int = 30) -> Dict[str, Any]:
        """Fetch schema from a URL.

        Args:
            url: Location of the JSON document.
            timeout: Timeout in seconds passed to ``requests.get``.

        Raises:
            SchemaParseError: On any request failure, non-2xx status, or a
                body that is not valid JSON.
        """
        try:
            response = requests.get(url, timeout=timeout)
            response.raise_for_status()
            return response.json()
        # ValueError covers JSON decoding failures on requests versions whose
        # decode error is not a RequestException subclass.
        except (requests.RequestException, ValueError) as e:
            raise SchemaParseError(f"Failed to fetch schema from {url}: {e}") from e

    def _load_schema_from_file(self, file_path: str) -> Dict[str, Any]:
        """Load schema from a UTF-8 encoded JSON file.

        Raises:
            SchemaParseError: If the file cannot be read or decoded.
        """
        try:
            with open(file_path, "r", encoding="utf-8") as f:
                return json.load(f)
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        except (OSError, ValueError) as e:
            raise SchemaParseError(
                f"Failed to load schema from {file_path}: {e}"
            ) from e

    @abstractmethod
    def parse(self) -> Dict[str, Any]:
        """Parse the schema and return structured data."""

    def resolve_ref(self, ref: str) -> Dict[str, Any]:
        """Resolve a local ``$ref`` against the loaded document.

        Supports ``"#"`` (the whole document) and ``"#/..."`` JSON Pointers,
        including ``~0`` / ``~1`` escapes, percent-encoding and array indices.
        Results are cached per reference string.

        Args:
            ref: The raw ``$ref`` value.

        Returns:
            The referenced node of ``self.schema`` (not a copy).

        Raises:
            SchemaParseError: If ``ref`` is not a local reference or does not
                address an existing node.
        """
        if ref in self._refs:
            return self._refs[ref]

        if ref != "#" and not ref.startswith("#/"):
            raise SchemaParseError(f"Unresolved reference: {ref}")

        # Slice off the "#" (str.lstrip would strip a character *set* and eat
        # leading "#" or "/" characters of the first token), then undo the
        # URI-fragment percent-encoding, e.g. "%7Bid%7D" -> "{id}".
        pointer = unquote(ref[1:])
        tokens = pointer.split("/")[1:] if pointer else []

        current: Any = self.schema
        for token in tokens:
            # Order matters: "~01" must decode to "~1", not "/".
            token = token.replace("~1", "/").replace("~0", "~")
            try:
                current = _pointer_step(current, token)
            except LookupError:
                # A dangling pointer is an error, not an empty schema.
                raise SchemaParseError(f"Unresolved reference: {ref}") from None

        self._refs[ref] = current
        return current

    def extract_type(self, schema: Dict[str, Any]) -> Optional[str]:
        """Extract the type from a schema."""
        return schema.get("type")

    def extract_format(self, schema: Dict[str, Any]) -> Optional[str]:
        """Extract the format from a schema."""
        return schema.get("format")

    def extract_constraints(self, schema: Dict[str, Any]) -> Dict[str, Any]:
        """Extract all constraints from a schema.

        Returns:
            A new dict holding each validation keyword present in ``schema``:
            type, numeric and string bounds, array bounds, ``enum``,
            ``const`` and ``default``.
        """
        return {key: schema[key] for key in _CONSTRAINT_KEYS if key in schema}

    def extract_properties(self, schema: Dict[str, Any]) -> Dict[str, Any]:
        """Extract properties from a schema."""
        return schema.get("properties", {})

    def extract_items(self, schema: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Extract items schema from an array schema."""
        return schema.get("items")

    def extract_required(self, schema: Dict[str, Any]) -> List[str]:
        """Extract required properties from a schema."""
        return schema.get("required", [])


class JsonSchemaParser(SchemaParser):
    """Parser for JSON Schema documents."""

    def parse(self) -> Dict[str, Any]:
        """Parse a JSON Schema document.

        Returns:
            When the resolved schema has a ``title``: a summary dict with
            ``title``, ``description``, ``type``, ``properties``,
            ``required`` and ``definitions`` (falling back to ``$defs``).
            Otherwise the resolved schema itself.

        Raises:
            SchemaParseError: If a ``$ref`` cannot be resolved.
        """
        schema = self.resolve_schema(self.schema)

        if "title" in schema:
            return {
                "title": schema.get("title"),
                "description": schema.get("description"),
                "type": schema.get("type"),
                "properties": schema.get("properties", {}),
                "required": schema.get("required", []),
                "definitions": schema.get("definitions", schema.get("$defs", {})),
            }

        return schema

    def resolve_schema(self, schema: Dict[str, Any]) -> Dict[str, Any]:
        """Resolve a schema, handling $ref and composition operators.

        A schema carrying a string ``$ref`` is replaced by the referenced
        node (one level; references inside the target are left untouched).
        Otherwise nested dicts and ``allOf`` / ``anyOf`` / ``oneOf`` members
        are resolved recursively. Non-dict schemas (e.g. boolean schemas)
        are returned unchanged.

        Raises:
            SchemaParseError: If a ``$ref`` cannot be resolved.
        """
        if not isinstance(schema, dict):
            return schema

        ref = schema.get("$ref")
        # A non-string "$ref" is a property that happens to be named "$ref".
        if isinstance(ref, str):
            return self.resolve_ref(ref)

        return _resolve_children(schema, self.resolve_schema)


class OpenApiParser(SchemaParser):
    """Parser for OpenAPI 3.x specifications."""

    def __init__(self, schema: Union[Dict[str, Any], str]):
        """Load the specification and validate it.

        Raises:
            SchemaParseError: If loading or validation fails.
        """
        super().__init__(schema)
        self._validate_spec()

    def _validate_spec(self) -> None:
        """Validate the OpenAPI specification.

        Raises:
            SchemaParseError: If ``openapi_spec_validator.validate`` rejects
                the document.
        """
        try:
            validate(self.schema)
        # Boundary with the third-party validator: wrap whatever it raises.
        except Exception as e:
            raise SchemaParseError(f"Invalid OpenAPI specification: {e}") from e

    def parse(self) -> Dict[str, Any]:
        """Parse an OpenAPI spec and extract all operations with their schemas.

        Returns:
            A dict with ``title``, ``version``, ``description`` and
            ``operations``; each operation holds ``path``, upper-cased
            ``method``, ``operationId``, ``summary``, ``description``,
            ``requestBody`` (JSON schema or ``None``) and ``responses``.
        """
        info = self.schema.get("info") or {}
        operations: List[Dict[str, Any]] = []

        for path, path_item in (self.schema.get("paths") or {}).items():
            path_item = self._dereference(path_item)
            if not isinstance(path_item, dict):
                continue
            for method, operation in path_item.items():
                # Skip non-operation fields such as "parameters" or "summary".
                if method not in _HTTP_METHODS:
                    continue
                operations.append(
                    {
                        "path": path,
                        "method": method.upper(),
                        "operationId": operation.get("operationId"),
                        "summary": operation.get("summary"),
                        "description": operation.get("description"),
                        "requestBody": self._extract_request_body(operation),
                        "responses": self._extract_responses(operation),
                    }
                )

        return {
            "title": info.get("title", "Untitled"),
            "version": info.get("version", "1.0.0"),
            "description": info.get("description"),
            "operations": operations,
        }

    def _dereference(self, node: Any) -> Any:
        """Follow a local ``$ref`` on a reference object, if there is one.

        Nodes without a string ``$ref``, and non-local references, are
        returned unchanged.
        """
        if isinstance(node, dict):
            ref = node.get("$ref")
            if isinstance(ref, str) and ref.startswith("#"):
                return self._resolve_openapi_ref(ref)
        return node

    @staticmethod
    def _select_json_schema(content: Any) -> Optional[Dict[str, Any]]:
        """Pick the schema of the JSON media type from a ``content`` map.

        ``application/json`` wins; otherwise the first JSON-compatible media
        type (``application/json; ...`` or ``*+json``) is used.

        Returns:
            The media type's ``schema``, or ``None`` when no JSON media type
            (or no schema) is present.
        """
        if not isinstance(content, dict):
            return None

        media = content.get(_JSON_MEDIA_TYPE)
        if not media:
            media = next(
                (
                    candidate
                    for name, candidate in content.items()
                    if candidate and _is_json_media_type(name)
                ),
                None,
            )
        if not isinstance(media, dict):
            return None
        return media.get("schema")

    def _extract_request_body(self, operation: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Extract request body schema from an operation.

        Returns:
            The JSON request schema, or ``None`` when the operation has no
            request body or no JSON content.
        """
        request_body = self._dereference(operation.get("requestBody"))
        if not isinstance(request_body, dict) or not request_body:
            return None
        return self._select_json_schema(request_body.get("content"))

    def _extract_responses(self, operation: Dict[str, Any]) -> Dict[str, Any]:
        """Extract response schemas from an operation.

        Returns:
            A dict keyed by status code; each value has ``description``
            (default ``""``) and ``schema`` (``None`` without JSON content).
        """
        responses: Dict[str, Any] = {}
        for status_code, response in (operation.get("responses") or {}).items():
            response = self._dereference(response)
            if not isinstance(response, dict):
                response = {}
            responses[status_code] = {
                "description": response.get("description", ""),
                "schema": self._select_json_schema(response.get("content")),
            }
        return responses

    def resolve_schema(self, schema: Dict[str, Any], base_path: str = "") -> Dict[str, Any]:
        """Resolve a schema, handling $ref and composition operators.

        Args:
            schema: The schema object to resolve.
            base_path: Forwarded unchanged to nested calls and to
                ``_resolve_openapi_ref``; it does not affect resolution.

        Returns:
            The referenced node when ``schema`` carries a string ``$ref``
            (one level), otherwise a copy with nested dicts and
            ``allOf`` / ``anyOf`` / ``oneOf`` members resolved. Non-dict
            schemas are returned unchanged.

        Raises:
            SchemaParseError: If a ``$ref`` cannot be resolved.
        """
        if not isinstance(schema, dict):
            return schema

        ref = schema.get("$ref")
        # A non-string "$ref" is a property that happens to be named "$ref".
        if isinstance(ref, str):
            return self._resolve_openapi_ref(ref, base_path)

        return _resolve_children(
            schema, lambda child: self.resolve_schema(child, base_path)
        )

    def _resolve_openapi_ref(self, ref: str, base_path: str = "") -> Dict[str, Any]:
        """Resolve an OpenAPI $ref.

        Delegates to ``resolve_ref`` so OpenAPI references share its pointer
        handling and cache. ``base_path`` is accepted for interface
        compatibility and is not used.

        Raises:
            SchemaParseError: If ``ref`` is not local or does not resolve.
        """
        return self.resolve_ref(ref)