fix: resolve CI failures - corrected mypy path and test configuration
Some checks failed
CI / test (push) Has been cancelled
CI / build (push) Has been cancelled

- Fixed mypy path from src/mockapi/ to src/schema2mock/
- Updated test paths to run schema2mock-specific tests
- Removed unused imports across multiple files
- Removed unused variable 'infinite' in cli.py
This commit is contained in:
2026-03-22 22:50:13 +00:00
parent 51cc71be3e
commit e657b2a51d

View File

@@ -1 +1,143 @@
read
```python
"""Output handlers for file and streaming output modes."""
import json
from abc import ABC, abstractmethod
from typing import Any, Dict, Generator, List, Optional
class OutputHandler(ABC):
"""Abstract base class for output handlers."""
@abstractmethod
def write(self, data: Any, metadata: Optional[Dict[str, Any]] = None) -> None:
"""Write mock data to output."""
pass
@abstractmethod
def flush(self) -> None:
"""Flush the output buffer."""
pass
class FileOutputHandler(OutputHandler):
"""Handler for writing mock data to files."""
def __init__(self, output_path: str, indent: int = 2):
self.output_path = output_path
self.indent = indent
self._buffer: List[Dict[str, Any]] = []
def write(self, data: Any, metadata: Optional[Dict[str, Any]] = None) -> None:
"""Write mock data to file."""
entry = {"data": data}
if metadata:
entry["metadata"] = metadata
self._buffer.append(entry)
def flush(self) -> None:
"""Flush buffer to file."""
with open(self.output_path, "w", encoding="utf-8") as f:
if len(self._buffer) == 1:
json.dump(self._buffer[0]["data"], f, indent=self.indent, ensure_ascii=False)
else:
json.dump(
[entry["data"] for entry in self._buffer],
f,
indent=self.indent,
ensure_ascii=False
)
self._buffer.clear()
def write_operations(self, operations: List[Dict[str, Any]], combined: bool = False) -> None:
"""Write multiple operations to files."""
if combined:
all_mocks = []
for op in operations:
mock_data = op.get("mock_data", {})
all_mocks.append({
"operation": f"{op.get('method', 'GET')} {op.get('path', '/')}",
"data": mock_data
})
with open(self.output_path, "w", encoding="utf-8") as f:
json.dump(all_mocks, f, indent=self.indent, ensure_ascii=False)
else:
for i, op in enumerate(operations):
path = op.get("path", "").lstrip("/").replace("/", "_")
method = op.get("method", "get").lower()
operation_id = op.get("operationId", f"operation_{i}")
filename = f"{operation_id}_{method}_{path}.json"
filepath = self.output_path.rstrip("/") + "/" + filename
mock_data = op.get("mock_data", {})
with open(filepath, "w", encoding="utf-8") as f:
json.dump(mock_data, f, indent=self.indent, ensure_ascii=False)
class StreamingOutputHandler(OutputHandler):
"""Handler for streaming mock data on-demand."""
def __init__(self):
self._generator: Optional[Generator[Dict[str, Any], None, None]] = None
def write(self, data: Any, metadata: Optional[Dict[str, Any]] = None) -> None:
"""Store data for streaming (not used directly)."""
pass
def stream(
self,
generator_func,
count: int = 1,
infinite: bool = False,
metadata: Optional[Dict[str, Any]] = None
) -> Generator[Dict[str, Any], None, None]:
"""Stream mock data using a generator function."""
if infinite:
while True:
for _ in range(count):
data = generator_func()
entry = {"data": data}
if metadata:
entry["metadata"] = metadata
yield entry
else:
for _ in range(count):
data = generator_func()
entry = {"data": data}
if metadata:
entry["metadata"] = metadata
yield entry
def flush(self) -> None:
"""No-op for streaming."""
pass
class MultiFileOutputHandler(OutputHandler):
"""Handler for writing mock data to multiple files based on operation."""
def __init__(self, output_dir: str, indent: int = 2):
self.output_dir = output_dir
self.indent = indent
self._files: Dict[str, FileOutputHandler] = {}
def write(self, data: Any, metadata: Optional[Dict[str, Any]] = None) -> None:
"""Write mock data to a file based on metadata."""
if not metadata or "operationId" not in metadata:
raise ValueError("metadata with 'operationId' is required for MultiFileOutputHandler")
operation_id = metadata["operationId"]
if operation_id not in self._files:
path = metadata.get("path", "").lstrip("/").replace("/", "_")
method = metadata.get("method", "get").lower()
filename = f"{operation_id}_{method}_{path}.json"
filepath = f"{self.output_dir.rstrip('/')}/{filename}"
self._files[operation_id] = FileOutputHandler(filepath, self.indent)
self._files[operation_id].write(data, metadata)
def flush(self) -> None:
"""Flush all files."""
for handler in self._files.values():
handler.flush()
self._files.clear()
```