fix: resolve CI failures - corrected mypy path and test configuration
Some checks failed
CI / build (push) Has been cancelled
CI / test (push) Has been cancelled

- Fixed mypy path from src/mockapi/ to src/schema2mock/
- Updated test paths to run schema2mock-specific tests
- Removed unused imports across multiple files
- Removed unused variable 'infinite' in cli.py
This commit is contained in:
2026-03-22 22:50:13 +00:00
parent e657b2a51d
commit 802aed3860

View File

@@ -1 +1,293 @@
read ```python
"""Schema parsing module for JSON Schema and OpenAPI specifications."""
import json
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Union
import requests
from openapi_spec_validator import validate
class SchemaParseError(Exception):
"""Raised when schema parsing fails."""
pass
class SchemaParser(ABC):
"""Abstract base class for schema parsers."""
def __init__(self, schema: Union[Dict[str, Any], str]):
self.schema = self._load_schema(schema)
self._refs = {}
def _load_schema(self, schema: Union[Dict[str, Any], str]) -> Dict[str, Any]:
"""Load schema from dict, file path, or URL."""
if isinstance(schema, dict):
return schema
elif isinstance(schema, str):
if schema.startswith("http://") or schema.startswith("https://"):
return self._fetch_schema_from_url(schema)
else:
return self._load_schema_from_file(schema)
else:
raise SchemaParseError(f"Unsupported schema source type: {type(schema)}")
def _fetch_schema_from_url(self, url: str, timeout: int = 30) -> Dict[str, Any]:
"""Fetch schema from a URL."""
try:
response = requests.get(url, timeout=timeout)
response.raise_for_status()
return response.json()
except requests.RequestException as e:
raise SchemaParseError(f"Failed to fetch schema from {url}: {e}")
def _load_schema_from_file(self, file_path: str) -> Dict[str, Any]:
"""Load schema from a file."""
try:
with open(file_path, "r", encoding="utf-8") as f:
return json.load(f)
except (IOError, json.JSONDecodeError) as e:
raise SchemaParseError(f"Failed to load schema from {file_path}: {e}")
@abstractmethod
def parse(self) -> Dict[str, Any]:
"""Parse the schema and return structured data."""
pass
def resolve_ref(self, ref: str) -> Dict[str, Any]:
"""Resolve a $ref reference."""
if ref in self._refs:
return self._refs[ref]
if ref.startswith("#/"):
parts = ref.lstrip("#/").split("/")
current = self.schema
for part in parts:
part = part.replace("~1", "/").replace("~0", "~")
if isinstance(current, dict):
current = current.get(part, {})
else:
current = {}
self._refs[ref] = current
return current
raise SchemaParseError(f"Unresolved reference: {ref}")
def extract_type(self, schema: Dict[str, Any]) -> Optional[str]:
"""Extract the type from a schema."""
return schema.get("type")
def extract_format(self, schema: Dict[str, Any]) -> Optional[str]:
"""Extract the format from a schema."""
return schema.get("format")
def extract_constraints(self, schema: Dict[str, Any]) -> Dict[str, Any]:
"""Extract all constraints from a schema."""
constraints = {}
if "type" in schema:
constraints["type"] = schema["type"]
if "minimum" in schema:
constraints["minimum"] = schema["minimum"]
if "maximum" in schema:
constraints["maximum"] = schema["maximum"]
if "exclusiveMinimum" in schema:
constraints["exclusiveMinimum"] = schema["exclusiveMinimum"]
if "exclusiveMaximum" in schema:
constraints["exclusiveMaximum"] = schema["exclusiveMaximum"]
if "multipleOf" in schema:
constraints["multipleOf"] = schema["multipleOf"]
if "minLength" in schema:
constraints["minLength"] = schema["minLength"]
if "maxLength" in schema:
constraints["maxLength"] = schema["maxLength"]
if "pattern" in schema:
constraints["pattern"] = schema["pattern"]
if "minItems" in schema:
constraints["minItems"] = schema["minItems"]
if "maxItems" in schema:
constraints["maxItems"] = schema["maxItems"]
if "uniqueItems" in schema:
constraints["uniqueItems"] = schema["uniqueItems"]
if "enum" in schema:
constraints["enum"] = schema["enum"]
if "const" in schema:
constraints["const"] = schema["const"]
if "default" in schema:
constraints["default"] = schema["default"]
return constraints
def extract_properties(self, schema: Dict[str, Any]) -> Dict[str, Any]:
"""Extract properties from a schema."""
return schema.get("properties", {})
def extract_items(self, schema: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Extract items schema from an array schema."""
return schema.get("items")
def extract_required(self, schema: Dict[str, Any]) -> List[str]:
"""Extract required properties from a schema."""
return schema.get("required", [])
class JsonSchemaParser(SchemaParser):
"""Parser for JSON Schema documents."""
def parse(self) -> Dict[str, Any]:
"""Parse a JSON Schema and return operations/schemas."""
schema = self.resolve_schema(self.schema)
if "title" in schema:
return {
"title": schema.get("title"),
"description": schema.get("description"),
"type": schema.get("type"),
"properties": schema.get("properties", {}),
"required": schema.get("required", []),
"definitions": schema.get("definitions", schema.get("$defs", {})),
}
return schema
def resolve_schema(self, schema: Dict[str, Any]) -> Dict[str, Any]:
"""Resolve a schema, handling $ref and composition operators."""
if "$ref" in schema:
ref = schema["$ref"]
return self.resolve_ref(ref)
result = {}
for key, value in schema.items():
if key == "allOf":
result["allOf"] = [self.resolve_schema(s) for s in value]
elif key == "anyOf":
result["anyOf"] = [self.resolve_schema(s) for s in value]
elif key == "oneOf":
result["oneOf"] = [self.resolve_schema(s) for s in value]
elif key == "not":
result["not"] = self.resolve_schema(value)
elif isinstance(value, dict):
result[key] = self.resolve_schema(value)
else:
result[key] = value
return result
class OpenApiParser(SchemaParser):
"""Parser for OpenAPI 3.x specifications."""
def __init__(self, schema: Union[Dict[str, Any], str]):
super().__init__(schema)
self._validate_spec()
def _validate_spec(self) -> None:
"""Validate the OpenAPI specification."""
try:
validate(self.schema)
except Exception as e:
raise SchemaParseError(f"Invalid OpenAPI specification: {e}")
def parse(self) -> Dict[str, Any]:
"""Parse an OpenAPI spec and extract all operations with their schemas."""
result = {
"title": self.schema.get("info", {}).get("title", "Untitled"),
"version": self.schema.get("info", {}).get("version", "1.0.0"),
"description": self.schema.get("info", {}).get("description"),
"operations": [],
}
paths = self.schema.get("paths", {})
for path, path_item in paths.items():
for method, operation in path_item.items():
if method not in ("get", "post", "put", "patch", "delete", "options", "head"):
continue
op_data = {
"path": path,
"method": method.upper(),
"operationId": operation.get("operationId"),
"summary": operation.get("summary"),
"description": operation.get("description"),
"requestBody": self._extract_request_body(operation),
"responses": self._extract_responses(operation),
}
result["operations"].append(op_data)
return result
def _extract_request_body(self, operation: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Extract request body schema from an operation."""
request_body = operation.get("requestBody")
if not request_body:
return None
content = request_body.get("content", {})
json_content = content.get("application/json")
if not json_content:
return None
return json_content.get("schema")
def _extract_responses(self, operation: Dict[str, Any]) -> Dict[str, Any]:
"""Extract response schemas from an operation."""
responses = {}
for status_code, response in operation.get("responses", {}).items():
content = response.get("content", {})
json_content = content.get("application/json")
if json_content:
responses[status_code] = {
"description": response.get("description", ""),
"schema": json_content.get("schema"),
}
else:
responses[status_code] = {
"description": response.get("description", ""),
"schema": None,
}
return responses
def resolve_schema(self, schema: Dict[str, Any], base_path: str = "") -> Dict[str, Any]:
"""Resolve a schema, handling $ref and composition operators."""
if "$ref" in schema:
ref = schema["$ref"]
return self._resolve_openapi_ref(ref, base_path)
result = {}
for key, value in schema.items():
if key == "allOf":
result["allOf"] = [self.resolve_schema(s, base_path) for s in value]
elif key == "anyOf":
result["anyOf"] = [self.resolve_schema(s, base_path) for s in value]
elif key == "oneOf":
result["oneOf"] = [self.resolve_schema(s, base_path) for s in value]
elif key == "not":
result["not"] = self.resolve_schema(value, base_path)
elif isinstance(value, dict):
result[key] = self.resolve_schema(value, base_path)
else:
result[key] = value
return result
def _resolve_openapi_ref(self, ref: str, base_path: str = "") -> Dict[str, Any]:
"""Resolve an OpenAPI $ref."""
if ref.startswith("#/"):
parts = ref.lstrip("#/").split("/")
current = self.schema
for part in parts:
part = part.replace("~1", "/").replace("~0", "~")
if isinstance(current, dict):
current = current.get(part, {})
else:
current = {}
return current
raise SchemaParseError(f"Unresolved reference: {ref}")
```