Compare commits

42 Commits

| SHA1 |
|---|
| 6f2d016b2c |
| 6cb15ff2da |
| d44dde5f7a |
| c1f1349931 |
| 40b81a4527 |
| 2e4208b747 |
| eda8c4866a |
| 91636b903a |
| 64d6c53ce8 |
| af1ef2103c |
| 7247252af2 |
| 42c5592b0b |
| f12538887b |
| 06edc1e0ae |
| 963bacba94 |
| 6ed6249697 |
| dd868731d6 |
| eae4514c25 |
| e71e84dc80 |
| 76686383ae |
| 42a7f3222b |
| 5f76ce919e |
| 1efb120abb |
| 6bb16a25a6 |
| 5422a0cc1b |
| 4a705d233f |
| 8ecdd03b29 |
| 79d2a57063 |
| a8f8ebe13e |
| 0823e7ad9d |
| 0a7b7ac9ad |
| a903500829 |
| 92b4281bde |
| 1bc1900d95 |
| c30f495048 |
| 736e58ebdd |
| aa7c813bd5 |
| 41c43e0da1 |
| 51509180fb |
| bc4e81a293 |
| ec28cca909 |
| 7a479aa1ed |
@@ -1,15 +1,14 @@
name: CI
name: CI/CD

on:
  push:
    branches: [main, master]
    branches: [main]
  pull_request:
    branches: [main, master]
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v4

@@ -23,11 +22,14 @@ jobs:
          python -m pip install --upgrade pip
          pip install -e ".[dev]"

      - name: Run tests
        run: |
          PYTHONPATH=src python -m pytest tests/ -v --tb=short
      - name: Run linter
        run: ruff check src/ tests/

      - name: Run linting
        run: |
          pip install ruff
          ruff check .
      - name: Run type checker
        run: mypy src/ --ignore-missing-imports

      - name: Run tests
        run: pytest tests/ -v --tb=short

      - name: Run coverage
        run: pytest tests/ --cov=src --cov-report=term-missing

39  .gitignore  vendored

@@ -1,4 +1,3 @@
# Python
__pycache__/
*.py[cod]
*$py.class

@@ -19,42 +18,14 @@ wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# Virtual Environment
venv/
.env
.venv/
env/
venv/
ENV/

# IDE
.vscode/
.idea/
*.swp
*.swo
*~

# Testing
*.log
.pytest_cache/
.coverage
htmlcov/
.nox/
.coverage

# Type checking
.mypy_cache/
.ruff_cache/

# Logs
logs/

# OS
.DS_Store
Thumbs.db

# Environment
.env
.env.local

# Database
*.db
*.sqlite
.mypy_cache/

179  README.md

@@ -1,163 +1,118 @@
# LocalAPI Docs

A CLI tool that generates local, privacy-focused API documentation server from OpenAPI/Swagger specs with interactive endpoint testing, automatic examples, and multiple output formats running entirely locally.

## Features

- **Local Documentation Server** - Serve interactive HTML docs locally with no external dependencies
- **Multiple Output Formats** - Generate HTML, Markdown, or JSON documentation
- **Automatic Example Generation** - Creates realistic request/response examples from your schemas
- **Full-Text Search** - Search across endpoints, tags, and descriptions
- **Spec Validation** - Validate OpenAPI specs with detailed error messages
- **Privacy-Focused** - Everything runs locally, no data leaves your machine
A CLI tool that generates local, privacy-focused API documentation server from OpenAPI/Swagger specs. Features include interactive endpoint testing with mock requests, automatic request/response examples, search functionality, and multiple output formats (HTML, Markdown, JSON). Runs entirely locally with no data leaving the machine.

## Installation

```bash
# Install from source
pip install -e .
pip install localapi-docs
```

# Or install with dev dependencies
pip install -e .[dev]
Or from source:

```bash
git clone https://github.com/yourusername/localapi-docs.git
cd localapi-docs
pip install -e .
```

## Usage

### Serve Interactive Documentation

Start a local web server with interactive API documentation:
### Serve Interactive HTML Documentation

```bash
localapi-docs serve petstore.json
localapi-docs serve petstore.yaml --host 0.0.0.0 --port 8080
localapi-docs serve openapi.yaml
```

The server will automatically open in your default browser.
This starts a local web server at `http://127.0.0.1:8080` with interactive API documentation. You can test endpoints directly in the browser.

### Generate Static Documentation

Generate documentation in various formats:

Generate HTML documentation:
```bash
# Generate HTML documentation
localapi-docs generate petstore.json --format html

# Generate Markdown documentation
localapi-docs generate petstore.json --format markdown

# Generate JSON documentation
localapi-docs generate petstore.json --format json

# Generate all formats
localapi-docs generate petstore.json --format all
localapi-docs generate openapi.yaml -o docs.html
```

### Validate OpenAPI Specs
Generate Markdown documentation:
```bash
localapi-docs generate openapi.yaml -o docs.md --format markdown
```

Validate your OpenAPI specification file:
Generate JSON documentation:
```bash
localapi-docs generate openapi.yaml -o docs.json --format json
```

### Validate OpenAPI Specification

```bash
localapi-docs validate petstore.json
localapi-docs validate petstore.yaml --json
localapi-docs validate openapi.yaml
```

### Search Endpoints

Search for endpoints in your API specification:

```bash
localapi-docs search petstore.json users
localapi-docs search petstore.json "get pet" --limit 10
localapi-docs search petstore.json users --json
localapi-docs search openapi.yaml "users list"
```

## Commands

### serve
| Command | Description |
|---------|-------------|
| `serve` | Serve interactive HTML documentation locally |
| `generate` | Generate documentation in HTML, Markdown, or JSON format |
| `validate` | Validate an OpenAPI specification file |
| `search` | Search for endpoints in an OpenAPI specification |

Start an interactive HTML documentation server.
### Serve Options

Options:
- `--host, -h`: Host to bind to (default: 127.0.0.1)
- `--port, -p`: Port to serve on (default: 8080)
- `--no-browser`: Don't open browser automatically
| Option | Default | Description |
|--------|---------|-------------|
| `--host` | `127.0.0.1` | Host to bind the server to |
| `--port` | `8080` | Port to bind the server to |

### generate
### Generate Options

Generate static documentation in various formats.
| Option | Default | Description |
|--------|---------|-------------|
| `--output, -o` | Auto-generated | Output file path |
| `--format` | `html` | Output format (html, markdown, json) |
| `--template` | None | Custom template file path |

Options:
- `--output, -o`: Output file or directory
- `--format, -f`: Output format (html, markdown, json, all)
- `--open`: Open the generated file in browser
## Features

### validate

Validate an OpenAPI specification file.

Options:
- `--json`: Output as JSON

### search

Search for endpoints in an OpenAPI specification.

Options:
- `--limit, -l`: Maximum results (default: 10)
- `--json`: Output as JSON

## Examples

### Generate and Open HTML Docs

```bash
localapi-docs generate api-spec.yaml --format html --open
```

### Serve on Custom Port

```bash
localapi-docs serve api-spec.json --port 3000
```

### Validate with JSON Output

```bash
localapi-docs validate api-spec.json --json
```

### Search and Get JSON Results

```bash
localapi-docs search api-spec.json users --json
```

## Configuration

No configuration files required. All options are passed via CLI arguments.
- **Privacy-First**: All processing happens locally. No data leaves your machine.
- **Interactive Testing**: Test API endpoints directly from the HTML documentation.
- **Multiple Formats**: Generate HTML, Markdown, or JSON documentation.
- **Search**: Full-text search across endpoints, tags, and descriptions.
- **Automatic Examples**: Auto-generate request/response examples from schemas.
- **OpenAPI 3.0/3.1**: Full support for modern OpenAPI specifications.

## Development

Install development dependencies:

```bash
# Install development dependencies
pip install -e .[dev]

# Run tests
pytest tests/ -v

# Run with coverage
pytest tests/ --cov=src --cov-report=term-missing
pip install -e ".[dev]"
```

## Supported OpenAPI Versions
Run tests:

- OpenAPI 3.0.x
- OpenAPI 3.1.x
```bash
pytest tests/ -v
```

## Input Formats
Run linter:

- JSON (`.json`)
- YAML (`.yaml`, `.yml`)
```bash
ruff check src/ tests/
```

Run type checker:

```bash
mypy src/ --ignore-missing-imports
```

## License

92  app/localapi-docs/src/cli.py  Normal file

@@ -0,0 +1,92 @@
import click
from pathlib import Path
import json
from src.core.parser import parse_openapi_spec, _basic_validate
from src.utils.templates import generate_html, generate_markdown, generate_json, serve_docs
from src.utils.search import create_search_index, search_index


@click.group()
def main():
    """LocalAPI Docs - Privacy-First OpenAPI Documentation CLI"""
    pass


@main.command("serve")
@click.argument("spec_path", type=click.Path(exists=True))
@click.option("--host", default="127.0.0.1", help="Host to bind the server to")
@click.option("--port", default=8080, type=int, help="Port to bind the server to")
def serve(spec_path: str, host: str, port: int):
    """Serve interactive API documentation locally"""
    serve_docs(spec_path, host=host, port=port)


@main.command("generate")
@click.argument("spec_path", type=click.Path(exists=True))
@click.option("--output", "-o", type=click.Path(), help="Output file path")
@click.option("--format", "fmt", type=click.Choice(["html", "markdown", "json"]), default="html", help="Output format")
@click.option("--template", type=click.Path(exists=True), help="Custom template file path")
def generate(spec_path: str, output: str | None, fmt: str, template: str | None):
    """Generate documentation in various formats"""
    if output is None:
        if fmt == "html":
            output = "docs.html"
        elif fmt == "markdown":
            output = "docs.md"
        else:
            output = "docs.json"
    try:
        if fmt == "html":
            generate_html(spec_path, output, template_path=template)
        elif fmt == "markdown":
            generate_markdown(spec_path, output, template_path=template)
        else:
            generate_json(spec_path, output, template_path=template)
        click.echo(f"Documentation generated: {output}")
    except Exception as e:
        click.echo(f"Error generating documentation: {e}", err=True)


@main.command("validate")
@click.argument("spec_path", type=click.Path(exists=True))
def validate(spec_path: str):
    """Validate an OpenAPI specification file"""
    try:
        spec = parse_openapi_spec(spec_path)
        click.echo(f"Valid OpenAPI spec: {spec.info['title']} v{spec.info['version']}")
        return True
    except ValueError as e:
        click.echo(f"Validation failed: {e}", err=True)
        return False


@main.command("search")
@click.argument("spec_path", type=click.Path(exists=True))
@click.argument("query", nargs=-1)
def search(spec_path: str, query: tuple):
    """Search for endpoints in an OpenAPI specification"""
    query_str = " ".join(query)
    if not query_str:
        click.echo("Please provide a search query")
        return
    try:
        try:
            spec = parse_openapi_spec(spec_path)
            spec_dict = spec.model_dump()
        except Exception:
            content = Path(spec_path).read_text()
            if spec_path.endswith(('.yaml', '.yml')):
                import yaml
                spec_dict = yaml.safe_load(content)
            else:
                spec_dict = json.loads(content)
        index = create_search_index(spec_dict)
        results = search_index(index, query_str)
        if results:
            click.echo(f"Found {len(results)} results for '{query_str}':")
            for r in results:
                click.echo(f" [{r.method}] {r.path} - {r.summary or ''}")
        else:
            click.echo(f"No results found for '{query_str}'")
    except Exception as e:
        click.echo(f"Search failed: {e}", err=True)
67  app/localapi-docs/src/core/generator.py  Normal file

@@ -0,0 +1,67 @@
"""Generator for documentation output."""

import json
from pathlib import Path
from typing import Dict, Any, Optional
from src.core.parser import parse_openapi_spec, _basic_validate
from src.core.models import OpenAPISpec


def generate_docs(
    spec_source: str | Path | Dict[str, Any],
    format: str = "html",
    output_path: Optional[str] = None,
    template_path: Optional[str] = None,
) -> str:
    if isinstance(spec_source, dict):
        spec_data = spec_source
    elif isinstance(spec_source, (str, Path)):
        spec_path = Path(spec_source)
        if spec_path.exists():
            content = spec_path.read_text()
            if spec_path.suffix in [".yaml", ".yml"]:
                import yaml
                spec_data = yaml.safe_load(content)
            else:
                spec_data = json.loads(content)
        else:
            raise FileNotFoundError(f"Spec file not found: {spec_source}")
    else:
        raise ValueError(f"Invalid spec source type: {type(spec_source)}")

    is_valid, errors = _basic_validate(spec_data)
    if not is_valid:
        raise ValueError(f"Invalid spec: {errors}")

    spec = parse_openapi_spec(spec_data)
    return spec


def extract_endpoints(spec: OpenAPISpec) -> list:
    endpoints = []
    for path, path_item in spec.paths.items():
        for method, operation in path_item.model_dump().items():
            if method in ["get", "put", "post", "delete", "options", "head", "patch", "trace"]:
                if operation:
                    endpoints.append({
                        "path": path,
                        "method": method.upper(),
                        "summary": operation.get("summary", ""),
                        "description": operation.get("description", ""),
                        "tags": operation.get("tags", []),
                    })
    return endpoints


def generate_template_context(spec: OpenAPISpec) -> Dict[str, Any]:
    spec_dict = spec.model_dump()
    return {
        "spec": spec_dict,
        "info": spec_dict.get("info", {}),
        "paths": spec_dict.get("paths", {}),
        "servers": spec_dict.get("servers", []),
        "tags": spec_dict.get("tags", []),
        "components": spec_dict.get("components", {}),
        "security": spec_dict.get("security", []),
        "external_docs": spec_dict.get("externalDocs"),
    }
286
app/localapi-docs/src/core/parser.py
Normal file
286
app/localapi-docs/src/core/parser.py
Normal file
@@ -0,0 +1,286 @@
|
||||
import json
|
||||
import yaml
|
||||
from pathlib import Path
|
||||
from typing import Dict, Any, List, Optional
|
||||
from urllib.parse import urljoin
|
||||
|
||||
from openapi_spec_validator import validate
|
||||
from openapi_spec_validator.versions import consts as validator_consts
|
||||
|
||||
from src.core.models import OpenAPISpec, Schema, PathItem, Operation, Parameter, Response
|
||||
|
||||
|
||||
class OpenAPIParser:
|
||||
def __init__(self, spec_data: Dict[str, Any]):
|
||||
self.spec_data = spec_data
|
||||
self._resolved_refs: Dict[str, Any] = {}
|
||||
self._components_schemas: Dict[str, Schema] = {}
|
||||
self._components_responses: Dict[str, Response] = {}
|
||||
self._components_request_bodies: Dict[str, Any] = {}
|
||||
|
||||
def validate(self) -> List[str]:
|
||||
errors = []
|
||||
try:
|
||||
validate(self.spec_data)
|
||||
except Exception as e:
|
||||
errors.append(str(e))
|
||||
return errors
|
||||
|
||||
def parse(self) -> OpenAPISpec:
|
||||
self._extract_components()
|
||||
return OpenAPISpec(
|
||||
openapi=self.spec_data.get("openapi", "3.0.0"),
|
||||
info=self._parse_info(),
|
||||
servers=self._parse_servers(),
|
||||
paths=self._parse_paths(),
|
||||
components=self._parse_components(),
|
||||
security=self.spec_data.get("security"),
|
||||
tags=self._parse_tags(),
|
||||
external_docs=self.spec_data.get("externalDocs"),
|
||||
)
|
||||
|
||||
def _extract_components(self) -> None:
|
||||
components = self.spec_data.get("components", {})
|
||||
if "schemas" in components:
|
||||
for name, schema_data in components["schemas"].items():
|
||||
self._components_schemas[name] = self._parse_schema(schema_data)
|
||||
if "responses" in components:
|
||||
self._components_responses = components["responses"]
|
||||
if "requestBodies" in components:
|
||||
self._components_request_bodies = components["requestBodies"]
|
||||
|
||||
def _parse_info(self) -> Dict[str, Any]:
|
||||
info_data = self.spec_data.get("info", {})
|
||||
contact_data = info_data.get("contact", {})
|
||||
license_data = info_data.get("license", {})
|
||||
return {
|
||||
"title": info_data.get("title", "API"),
|
||||
"version": info_data.get("version", "1.0.0"),
|
||||
"description": info_data.get("description"),
|
||||
"terms_of_service": info_data.get("termsOfService"),
|
||||
"contact": {
|
||||
"name": contact_data.get("name"),
|
||||
"url": contact_data.get("url"),
|
||||
"email": contact_data.get("email"),
|
||||
} if contact_data else None,
|
||||
"license": {
|
||||
"name": license_data.get("name", ""),
|
||||
"url": license_data.get("url"),
|
||||
} if license_data else None,
|
||||
}
|
||||
|
||||
def _parse_servers(self) -> Optional[List[Dict[str, Any]]]:
|
||||
servers = self.spec_data.get("servers", [])
|
||||
return [{"url": s.get("url", "/"), "description": s.get("description")} for s in servers]
|
||||
|
||||
def _parse_paths(self) -> Dict[str, PathItem]:
|
||||
paths = {}
|
||||
for path, path_item in self.spec_data.get("paths", {}).items():
|
||||
if path.startswith("/"):
|
||||
path_item_data = path_item if path_item else {}
|
||||
paths[path] = self._parse_path_item(path_item_data)
|
||||
return paths
|
||||
|
||||
def _parse_path_item(self, data: Dict[str, Any]) -> PathItem:
|
||||
operations = {}
|
||||
for method in ["get", "put", "post", "delete", "options", "head", "patch", "trace"]:
|
||||
if method in data:
|
||||
operations[method] = self._parse_operation(data[method])
|
||||
return PathItem(
|
||||
ref=data.get("$ref"),
|
||||
summary=data.get("summary"),
|
||||
description=data.get("description"),
|
||||
servers=data.get("servers"),
|
||||
parameters=self._parse_parameters(data.get("parameters", [])),
|
||||
**operations,
|
||||
)
|
||||
|
||||
def _parse_operation(self, data: Dict[str, Any]) -> Operation:
|
||||
parameters = data.get("parameters", [])
|
||||
request_body = data.get("requestBody")
|
||||
responses = {}
|
||||
for status_code, response_data in data.get("responses", {}).items():
|
||||
responses[status_code] = self._parse_response(response_data)
|
||||
return Operation(
|
||||
tags=data.get("tags"),
|
||||
summary=data.get("summary"),
|
||||
description=data.get("description"),
|
||||
external_docs=data.get("externalDocs"),
|
||||
operation_id=data.get("operationId"),
|
||||
parameters=self._parse_parameters(parameters),
|
||||
request_body=self._parse_request_body(request_body) if request_body else None,
|
||||
responses=responses,
|
||||
deprecated=data.get("deprecated"),
|
||||
security=data.get("security"),
|
||||
servers=data.get("servers"),
|
||||
)
|
||||
|
||||
def _parse_parameters(self, params: List[Dict[str, Any]]) -> List[Parameter]:
|
||||
return [
|
||||
Parameter(
|
||||
name=p.get("name", ""),
|
||||
in_=p.get("in", "query"),
|
||||
description=p.get("description"),
|
||||
required=p.get("required"),
|
||||
deprecated=p.get("deprecated"),
|
||||
allow_empty_value=p.get("allowEmptyValue"),
|
||||
style=p.get("style"),
|
||||
explode=p.get("explode"),
|
||||
allow_reserved=p.get("allowReserved"),
|
||||
schema=self._parse_schema(p.get("schema")) if p.get("schema") else None,
|
||||
example=p.get("example"),
|
||||
examples=p.get("examples"),
|
||||
)
|
||||
for p in params
|
||||
]
|
||||
|
||||
def _parse_response(self, data: Dict[str, Any]) -> Response:
|
||||
content = {}
|
||||
for content_type, content_data in data.get("content", {}).items():
|
||||
content[content_type] = {
|
||||
"schema": self._parse_schema(content_data.get("schema")) if content_data.get("schema") else None,
|
||||
"example": content_data.get("example"),
|
||||
"examples": content_data.get("examples"),
|
||||
}
|
||||
return Response(
|
||||
description=data.get("description", ""),
|
||||
content=content,
|
||||
headers=data.get("headers"),
|
||||
links=data.get("links"),
|
||||
)
|
||||
|
||||
def _parse_request_body(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
content = {}
|
||||
for content_type, content_data in data.get("content", {}).items():
|
||||
content[content_type] = {
|
||||
"schema": self._parse_schema(content_data.get("schema")) if content_data.get("schema") else None,
|
||||
"example": content_data.get("example"),
|
||||
"examples": content_data.get("examples"),
|
||||
}
|
||||
return {
|
||||
"description": data.get("description"),
|
||||
"required": data.get("required"),
|
||||
"content": content,
|
||||
}
|
||||
|
||||
def _parse_schema(self, data: Any) -> Optional[Schema]:
|
||||
if data is None:
|
||||
return None
|
||||
if isinstance(data, dict):
|
||||
if "$ref" in data:
|
||||
ref = data["$ref"]
|
||||
resolved = self._resolve_ref(ref)
|
||||
if resolved:
|
||||
return self._parse_schema(resolved)
|
||||
schema_data = dict(data)
|
||||
for key in ["allOf", "anyOf", "oneOf", "not"]:
|
||||
if key in schema_data:
|
||||
nested = schema_data[key]
|
||||
if isinstance(nested, list):
|
||||
schema_data[key] = [
|
||||
self._parse_schema(item) if isinstance(item, dict) else item
|
||||
for item in nested
|
||||
]
|
||||
elif isinstance(nested, dict):
|
||||
schema_data[key] = self._parse_schema(nested)
|
||||
if "items" in schema_data and isinstance(schema_data["items"], dict):
|
||||
schema_data["items"] = self._parse_schema(schema_data["items"])
|
||||
if "properties" in schema_data:
|
||||
schema_data["properties"] = {
|
||||
k: self._parse_schema(v) if isinstance(v, dict) else v
|
||||
for k, v in schema_data["properties"].items()
|
||||
}
|
||||
if "additionalProperties" in schema_data and isinstance(schema_data["additionalProperties"], dict):
|
||||
schema_data["additionalProperties"] = self._parse_schema(schema_data["additionalProperties"])
|
||||
return Schema(**schema_data)
|
||||
return None
|
||||
|
||||
def _resolve_ref(self, ref: str) -> Optional[Dict[str, Any]]:
|
||||
if ref in self._resolved_refs:
|
||||
return self._resolved_refs[ref]
|
||||
if ref.startswith("#/components/"):
|
||||
parts = ref.split("/")[2:]
|
||||
current = self.spec_data.get("components", {})
|
||||
for part in parts:
|
||||
if isinstance(current, dict) and part in current:
|
||||
current = current[part]
|
||||
else:
|
||||
return None
|
||||
self._resolved_refs[ref] = current
|
||||
return current
|
||||
return None
|
||||
|
||||
def _parse_components(self) -> Optional[Dict[str, Any]]:
|
||||
components = self.spec_data.get("components")
|
||||
if not components:
|
||||
return None
|
||||
security_schemes = {}
|
||||
for name, scheme in components.get("securitySchemes", {}).items():
|
||||
security_schemes[name] = {
|
||||
"type": scheme.get("type"),
|
||||
"scheme": scheme.get("scheme"),
|
||||
"bearer_format": scheme.get("bearerFormat"),
|
||||
"flows": scheme.get("flows"),
|
||||
"open_id_connect_url": scheme.get("openIdConnectUrl"),
|
||||
"description": scheme.get("description"),
|
||||
}
|
||||
return {
|
||||
"schemas": self._components_schemas,
|
||||
"responses": self._components_responses,
|
||||
"parameters": components.get("parameters"),
|
||||
"request_bodies": self._components_request_bodies,
|
||||
"headers": components.get("headers"),
|
||||
"security_schemes": security_schemes,
|
||||
"links": components.get("links"),
|
||||
"callbacks": components.get("callbacks"),
|
||||
}
|
||||
|
||||
def _parse_tags(self) -> Optional[List[Dict[str, Any]]]:
|
||||
tags = self.spec_data.get("tags", [])
|
||||
return [{"name": t.get("name"), "description": t.get("description"), "external_docs": t.get("externalDocs")} for t in tags]
|
||||
|
||||
|
||||
def _basic_validate(spec_data: Dict[str, Any]) -> tuple:
|
||||
errors = []
|
||||
if not isinstance(spec_data, dict):
|
||||
errors.append("Spec must be a dictionary")
|
||||
return False, errors
|
||||
if "openapi" not in spec_data:
|
||||
errors.append("Missing 'openapi' version")
|
||||
return False, errors
|
||||
if "info" not in spec_data:
|
||||
errors.append("Missing 'info' object")
|
||||
return False, errors
|
||||
info = spec_data.get("info", {})
|
||||
if not isinstance(info, dict):
|
||||
errors.append("'info' must be an object")
|
||||
return False, errors
|
||||
if "title" not in info:
|
||||
errors.append("Missing 'info.title'")
|
||||
return False, errors
|
||||
if "version" not in info:
|
||||
errors.append("Missing 'info.version'")
|
||||
return False, errors
|
||||
return True, []
|
||||
|
||||
|
||||
def parse_openapi_spec(spec_source: str | Path | Dict[str, Any]) -> OpenAPISpec:
|
||||
if isinstance(spec_source, dict):
|
||||
spec_data = spec_source
|
||||
elif isinstance(spec_source, Path):
|
||||
spec_data = _load_file(spec_source)
|
||||
else:
|
||||
spec_data = _load_file(Path(spec_source))
|
||||
parser = OpenAPIParser(spec_data)
|
||||
errors = parser.validate()
|
||||
if errors:
|
||||
raise ValueError(f"Invalid OpenAPI spec: {errors}")
|
||||
return parser.parse()
|
||||
|
||||
|
||||
def _load_file(path: Path) -> Dict[str, Any]:
|
||||
content = path.read_text()
|
||||
if path.suffix in [".yaml", ".yml"]:
|
||||
import yaml
|
||||
return yaml.safe_load(content)
|
||||
return json.loads(content)
|
||||
22  app/localapi-docs/src/templates/__init__.py  Normal file

@@ -0,0 +1,22 @@
"""Template module for documentation generation."""

import os
from jinja2 import Environment, FileSystemLoader

TEMPLATES_DIR = os.path.join(os.path.dirname(__file__))


def startswith(s, prefix):
    return s.startswith(prefix) if s else False


env = Environment(
    loader=FileSystemLoader(TEMPLATES_DIR),
    trim_blocks=True,
    lstrip_blocks=True,
)
env.filters["startswith"] = startswith

HTML_TEMPLATE = env.get_template("html_template.html")
MARKDOWN_TEMPLATE = env.get_template("markdown_template.md")
JSON_TEMPLATE = env.get_template("json_template.json")

__all__ = ["HTML_TEMPLATE", "MARKDOWN_TEMPLATE", "JSON_TEMPLATE", "env"]
26  app/localapi-docs/src/templates/json_template.json  Normal file

@@ -0,0 +1,26 @@
{
  "openapi": "{{ spec.openapi }}",
  "info": {
    "title": "{{ info.title }}",
    "version": "{{ info.version }}",
    "description": {{ info.description | tojson if info.description else 'null' }},
    "termsOfService": {{ info.terms_of_service | tojson if info.terms_of_service else 'null' }},
    "contact": {{ info.contact | tojson if info.contact else 'null' }},
    "license": {{ info.license | tojson if info.license else 'null' }}
  },
  "servers": {{ servers | tojson if servers else 'null' }},
  "paths": {{ paths | tojson }},
  "components": {
    "schemas": {{ components.schemas | tojson if components.schemas else 'null' }},
    "responses": {{ components.responses | tojson if components.responses else 'null' }},
    "parameters": {{ components.parameters | tojson if components.parameters else 'null' }},
    "requestBodies": {{ components.request_bodies | tojson if components.request_bodies else 'null' }},
    "headers": {{ components.headers | tojson if components.headers else 'null' }},
    "securitySchemes": {{ components.security_schemes | tojson if components.security_schemes else 'null' }},
    "links": {{ components.links | tojson if components.links else 'null' }},
    "callbacks": {{ components.callbacks | tojson if components.callbacks else 'null' }}
  },
  "security": {{ security | tojson if security else '[]' }},
  "tags": {{ tags | tojson if tags else '[]' }},
  "externalDocs": {{ external_docs | tojson if external_docs else 'null' }}
}
@@ -10,7 +10,7 @@ readme = "README.md"
requires-python = ">=3.9"
license = {text = "MIT"}
authors = [
    {name = "LocalAPI Contributors"}
    {name = "LocalAPI Docs", email = "dev@localapi.example.com"}
]
keywords = ["api", "documentation", "openapi", "swagger", "cli"]
classifiers = [

@@ -36,24 +36,32 @@ dependencies = [
dev = [
    "pytest>=7.0.0",
    "pytest-cov>=4.0.0",
    "ruff>=0.1.0",
    "mypy>=1.0.0",
]

[project.scripts]
localapi-docs = "src.cli:main"

[tool.setuptools.packages.find]
where = ["src"]

[tool.pytest.ini_options]
testpaths = ["tests"]
python_files = ["test_*.py"]
python_classes = ["Test*"]
python_functions = ["test_*"]
addopts = "-v --tb=short"

[tool.coverage.run]
source = ["src"]
omit = ["tests/*"]
[tool.ruff]
target-version = "py39"
line-length = 100

[tool.coverage.report]
exclude_lines = ["pragma: no cover", "def __repr__", "raise AssertionError", "raise NotImplementedError"]
[tool.ruff.lint]
select = ["E", "F", "W", "C90", "I", "N", "UP", "B", "C4"]
ignore = ["C901"]

[tool.ruff.lint.per-file-ignores]
"__init__.py" = ["F401"]

[tool.mypy]
python_version = "3.9"
warn_return_any = true
warn_unused_ignores = true
disallow_untyped_defs = false

@@ -1,2 +1 @@
# LocalAPI Docs
# A CLI tool for generating local API documentation
VERSION = "0.1.0"

142
src/cli.py
142
src/cli.py
@@ -1,75 +1,97 @@
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
import click
|
||||
from .core.parser import parse_openapi_spec
|
||||
from .core.generator import generate_docs
|
||||
from .core.models import APISpec, Endpoint, RequestExample, ResponseExample
|
||||
from .utils.search import search_endpoints
|
||||
|
||||
from src.core.parser import parse_openapi_spec
|
||||
from src.utils.search import create_search_index, search_index
|
||||
from src.utils.templates import generate_html, generate_json, generate_markdown, serve_docs
|
||||
|
||||
|
||||
@click.group()
|
||||
def main():
|
||||
"""LocalAPI Docs - Generate local API documentation from OpenAPI specs."""
|
||||
"""LocalAPI Docs - Privacy-First OpenAPI Documentation CLI"""
|
||||
pass
|
||||
|
||||
|
||||
@main.command()
|
||||
@click.argument('spec_file', type=click.Path(exists=True))
|
||||
@click.option('--host', '-h', default='127.0.0.1', help='Host to bind to')
|
||||
@click.option('--port', '-p', default=8080, help='Port to serve on')
|
||||
@click.option('--no-browser', is_flag=True, help='Don\'t open browser automatically')
|
||||
def serve(spec_file, host, port, no_browser):
|
||||
"""Start an interactive HTML documentation server."""
|
||||
from .templates.html_template import generate_html_server
|
||||
generate_html_server(spec_file, host, port, not no_browser)
|
||||
@main.command("serve")
|
||||
@click.argument("spec_path", type=click.Path(exists=True))
|
||||
@click.option("--host", default="127.0.0.1", help="Host to bind the server to")
|
||||
@click.option("--port", default=8080, type=int, help="Port to bind the server to")
|
||||
def serve(spec_path: str, host: str, port: int):
|
||||
"""Serve interactive API documentation locally"""
|
||||
serve_docs(spec_path, host=host, port=port)
|
||||
|
||||
|
||||
@main.command()
|
||||
@click.argument('spec_file', type=click.Path(exists=True))
|
||||
@click.option('--output', '-o', help='Output file or directory')
|
||||
@click.option('--format', '-f', type=click.Choice(['html', 'markdown', 'json', 'all']), default='html', help='Output format')
|
||||
@click.option('--open', is_flag=True, help='Open the generated file in browser')
|
||||
def generate(spec_file, output, format, open):
|
||||
"""Generate static documentation in various formats."""
|
||||
generate_docs(spec_file, output, format, open)
|
||||
|
||||
|
||||
@main.command()
|
||||
@click.argument('spec_file', type=click.Path(exists=True))
|
||||
@click.option('--json', is_flag=True, help='Output as JSON')
|
||||
def validate(spec_file, json_output):
|
||||
"""Validate an OpenAPI specification file."""
|
||||
result = parse_openapi_spec(spec_file)
|
||||
if result.get('valid'):
|
||||
click.echo("✓ OpenAPI spec is valid")
|
||||
if json_output:
|
||||
import json
|
||||
click.echo(json.dumps(result, indent=2))
|
||||
@main.command("generate")
|
||||
@click.argument("spec_path", type=click.Path(exists=True))
|
||||
@click.option("--output", "-o", type=click.Path(), help="Output file path")
|
||||
@click.option(
|
||||
"--format", "fmt", type=click.Choice(["html", "markdown", "json"]),
|
||||
default="html", help="Output format"
|
||||
)
|
||||
@click.option("--template", type=click.Path(exists=True), help="Custom template file path")
|
||||
def generate(spec_path: str, output: str | None, fmt: str, template: str | None):
|
||||
"""Generate documentation in various formats"""
|
||||
if output is None:
|
||||
if fmt == "html":
|
||||
output = "docs.html"
|
||||
elif fmt == "markdown":
|
||||
output = "docs.md"
|
||||
else:
|
||||
click.echo("✗ OpenAPI spec is invalid")
|
||||
if json_output:
|
||||
import json
|
||||
click.echo(json.dumps(result, indent=2))
|
||||
output = "docs.json"
|
||||
try:
|
||||
if fmt == "html":
|
||||
generate_html(spec_path, output, template_path=template)
|
||||
elif fmt == "markdown":
|
||||
generate_markdown(spec_path, output, template_path=template)
|
||||
else:
|
||||
for error in result.get('errors', []):
|
||||
click.echo(f" - {error}")
|
||||
generate_json(spec_path, output, template_path=template)
|
||||
click.echo(f"Documentation generated: {output}")
|
||||
except Exception as e:
|
||||
click.echo(f"Error generating documentation: {e}", err=True)
|
||||
|
||||
|
||||
@main.command()
|
||||
@click.argument('spec_file', type=click.Path(exists=True))
|
||||
@click.argument('query', nargs=-1)
|
||||
@click.option('--limit', '-l', default=10, help='Maximum results')
|
||||
@click.option('--json', is_flag=True, help='Output as JSON')
|
||||
def search(spec_file, query, limit, json_output):
|
||||
"""Search for endpoints in an OpenAPI specification."""
|
||||
search_term = ' '.join(query)
|
||||
results = search_endpoints(spec_file, search_term, limit)
|
||||
if json_output:
|
||||
import json
|
||||
click.echo(json.dumps(results, indent=2))
|
||||
else:
|
||||
if not results:
|
||||
click.echo("No results found.")
|
||||
@main.command("validate")
|
||||
@click.argument("spec_path", type=click.Path(exists=True))
|
||||
def validate(spec_path: str):
|
||||
"""Validate an OpenAPI specification file"""
|
||||
try:
|
||||
spec = parse_openapi_spec(spec_path)
|
||||
click.echo(f"Valid OpenAPI spec: {spec.info.title} v{spec.info.version}")
|
||||
return True
|
||||
except ValueError as e:
|
||||
click.echo(f"Validation failed: {e}", err=True)
|
||||
return False
|
||||
|
||||
|
||||
@main.command("search")
|
||||
@click.argument("spec_path", type=click.Path(exists=True))
|
||||
@click.argument("query", nargs=-1)
|
||||
def search(spec_path: str, query: tuple):
|
||||
"""Search for endpoints in an OpenAPI specification"""
|
||||
query_str = " ".join(query)
|
||||
if not query_str:
|
||||
click.echo("Please provide a search query")
|
||||
return
|
||||
for result in results:
|
||||
click.echo(f"\n{result['method']} {result['path']}")
|
||||
click.echo(f" {result.get('summary', result.get('description', 'No description'))}")
|
||||
click.echo(f" Tags: {', '.join(result.get('tags', []))}")
|
||||
try:
|
||||
try:
|
||||
spec = parse_openapi_spec(spec_path)
|
||||
spec_dict = spec.model_dump()
|
||||
except Exception:
|
||||
content = Path(spec_path).read_text()
|
||||
if spec_path.endswith(('.yaml', '.yml')):
|
||||
import yaml
|
||||
spec_dict = yaml.safe_load(content)
|
||||
else:
|
||||
spec_dict = json.loads(content)
|
||||
index = create_search_index(spec_dict)
|
||||
results = search_index(index, query_str)
|
||||
if results:
|
||||
click.echo(f"Found {len(results)} results for '{query_str}':")
|
||||
for r in results:
|
||||
click.echo(f" [{r.method}] {r.path} - {r.summary or ''}")
|
||||
else:
|
||||
click.echo(f"No results found for '{query_str}'")
|
||||
except Exception as e:
|
||||
click.echo(f"Search failed: {e}", err=True)
|
||||
|
||||
@@ -1 +1,19 @@
# Core modules for API documentation generation
from src.core.models import (
    Components,
    Contact,
    HttpMethod,
    Info,
    License,
    OpenAPISpec,
    Operation,
    Parameter,
    ParameterIn,
    PathItem,
    RequestBody,
    Response,
    Schema,
    SecurityScheme,
    Server,
    Tag,
)
from src.core.parser import OpenAPIParser, parse_openapi_spec

@@ -1,70 +1,71 @@
|
||||
"""Generator for documentation output."""
|
||||
|
||||
import json
|
||||
import os
|
||||
from pathlib import Path
|
||||
from .parser import parse_openapi_spec, load_spec_file
|
||||
from .models import Endpoint, APISpec
|
||||
from typing import Any, Optional
|
||||
|
||||
from src.core.models import OpenAPISpec
|
||||
from src.core.parser import _basic_validate, parse_openapi_spec
|
||||
|
||||
|
||||
def generate_docs(spec_file: str, output: str = None, format: str = 'html', open_browser: bool = False):
|
||||
"""Generate documentation in the specified format.
|
||||
def generate_docs(
|
||||
spec_source: str | Path | dict[str, Any],
|
||||
format: str = "html",
|
||||
output_path: Optional[str] = None,
|
||||
template_path: Optional[str] = None,
|
||||
) -> str:
|
||||
if isinstance(spec_source, dict):
|
||||
spec_data = spec_source
|
||||
elif isinstance(spec_source, (str, Path)):
|
||||
spec_path = Path(spec_source)
|
||||
if spec_path.exists():
|
||||
content = spec_path.read_text()
|
||||
if spec_path.suffix in [".yaml", ".yml"]:
|
||||
import yaml
|
||||
|
||||
Args:
|
||||
spec_file: Path to the OpenAPI spec file
|
||||
output: Output file path or directory
|
||||
format: Output format (html, markdown, json, all)
|
||||
open_browser: Whether to open the generated file in browser
|
||||
"""
|
||||
result = parse_openapi_spec(spec_file)
|
||||
spec_data = yaml.safe_load(content)
|
||||
else:
|
||||
spec_data = json.loads(content)
|
||||
else:
|
||||
raise FileNotFoundError(f"Spec file not found: {spec_source}")
|
||||
else:
|
||||
raise ValueError(f"Invalid spec source type: {type(spec_source)}")
|
||||
|
||||
if not result.get('valid'):
|
||||
raise ValueError(f"Invalid spec: {result.get('errors')}")
|
||||
is_valid, errors = _basic_validate(spec_data)
|
||||
if not is_valid:
|
||||
raise ValueError(f"Invalid spec: {errors}")
|
||||
|
||||
spec = result['spec']
|
||||
|
||||
if format == 'all':
|
||||
for fmt in ['html', 'markdown', 'json']:
|
||||
generate_docs(spec_file, output, fmt, open_browser and fmt == 'html')
|
||||
return
|
||||
|
||||
if not output:
|
||||
base_name = Path(spec_file).stem
|
||||
if format == 'html':
|
||||
output = f"{base_name}.html"
|
||||
elif format == 'markdown':
|
||||
output = f"{base_name}.md"
|
||||
elif format == 'json':
|
||||
output = f"{base_name}_docs.json"
|
||||
|
||||
if format == 'html':
|
||||
from .templates.html_template import generate_html
|
||||
generate_html(spec, output)
|
||||
elif format == 'markdown':
|
||||
from .templates.markdown_template import generate_markdown
|
||||
generate_markdown(spec, output)
|
||||
elif format == 'json':
|
||||
generate_json_docs(spec, output)
|
||||
|
||||
print(f"Generated {format} documentation: {output}")
|
||||
|
||||
if open_browser and format == 'html':
|
||||
import webbrowser
|
||||
webbrowser.open(f'file://{os.path.abspath(output)}')
|
||||
spec = parse_openapi_spec(spec_data)
|
||||
return spec
|
||||
|
||||
|
||||
def generate_json_docs(spec: dict, output: str):
|
||||
"""Generate JSON documentation."""
|
||||
from .parser import extract_endpoints
|
||||
|
||||
endpoints = extract_endpoints(spec)
|
||||
|
||||
docs = {
|
||||
'title': spec.get('info', {}).get('title', 'API Documentation'),
|
||||
'version': spec.get('info', {}).get('version', '1.0.0'),
|
||||
'description': spec.get('info', {}).get('description', ''),
|
||||
'endpoints': endpoints,
|
||||
'tags': spec.get('tags', []),
|
||||
'servers': spec.get('servers', [])
|
||||
def extract_endpoints(spec: OpenAPISpec) -> list:
|
||||
endpoints = []
|
||||
for path, path_item in spec.paths.items():
|
||||
for method, operation in path_item.model_dump().items():
|
||||
if method in ["get", "put", "post", "delete", "options", "head", "patch", "trace"]:
|
||||
if operation:
|
||||
endpoints.append(
|
||||
{
|
||||
"path": path,
|
||||
"method": method.upper(),
|
||||
"summary": operation.get("summary", ""),
|
||||
"description": operation.get("description", ""),
|
||||
"tags": operation.get("tags", []),
|
||||
}
|
||||
)
|
||||
return endpoints
|
||||
|
||||
with open(output, 'w') as f:
|
||||
json.dump(docs, f, indent=2)
|
||||
|
||||
def generate_template_context(spec: OpenAPISpec) -> dict[str, Any]:
|
||||
spec_dict = spec.model_dump()
|
||||
return {
|
||||
"spec": spec_dict,
|
||||
"info": spec_dict.get("info", {}),
|
||||
"paths": spec_dict.get("paths", {}),
|
||||
"servers": spec_dict.get("servers", []),
|
||||
"tags": spec_dict.get("tags", []),
|
||||
"components": spec_dict.get("components", {}),
|
||||
"security": spec_dict.get("security", []),
|
||||
"external_docs": spec_dict.get("externalDocs"),
|
||||
}
|
||||
|
||||
@@ -1,48 +1,192 @@
|
||||
from pydantic import BaseModel, Field
|
||||
from typing import Optional, List, Dict, Any
|
||||
from enum import Enum
|
||||
from typing import Any, Optional, Union
|
||||
|
||||
from pydantic import BaseModel, ConfigDict, Field
|
||||
|
||||
|
||||
class HTTPMethod(str, Enum):
|
||||
GET = "GET"
|
||||
POST = "PUT"
|
||||
PUT = "PUT"
|
||||
DELETE = "DELETE"
|
||||
PATCH = "PATCH"
|
||||
OPTIONS = "OPTIONS"
|
||||
HEAD = "HEAD"
|
||||
class HttpMethod(str, Enum):
|
||||
GET = "get"
|
||||
POST = "post"
|
||||
PUT = "put"
|
||||
DELETE = "delete"
|
||||
PATCH = "patch"
|
||||
OPTIONS = "options"
|
||||
HEAD = "head"
|
||||
|
||||
|
||||
class Endpoint(BaseModel):
|
||||
path: str
|
||||
method: str
|
||||
class ParameterIn(str, Enum):
|
||||
PATH = "path"
|
||||
QUERY = "query"
|
||||
HEADER = "header"
|
||||
COOKIE = "cookie"
|
||||
|
||||
|
||||
class SchemaType(str, Enum):
|
||||
STRING = "string"
|
||||
NUMBER = "number"
|
||||
INTEGER = "integer"
|
||||
BOOLEAN = "boolean"
|
||||
ARRAY = "array"
|
||||
OBJECT = "object"
|
||||
|
||||
|
||||
class Schema(BaseModel):
|
||||
model_config = ConfigDict(populate_by_name=True)
|
||||
|
||||
type: Optional[str] = None
|
||||
format: Optional[str] = None
|
||||
description: Optional[str] = None
|
||||
nullable: Optional[bool] = None
|
||||
default: Optional[Any] = None
|
||||
example: Optional[Any] = None
|
||||
properties: Optional[dict[str, "Schema"]] = None
|
||||
items: Optional["Schema"] = None
|
||||
required: Optional[list[str]] = None
|
||||
enum: Optional[list[Any]] = None
|
||||
all_of: Optional[list["Schema"]] = Field(None, alias="allOf")
|
||||
any_of: Optional[list["Schema"]] = Field(None, alias="anyOf")
|
||||
one_of: Optional[list["Schema"]] = Field(None, alias="oneOf")
|
||||
not_: Optional["Schema"] = Field(None, alias="not")
|
||||
ref: Optional[str] = Field(None, alias="$ref")
|
||||
additional_properties: Optional[Union[bool, "Schema"]] = Field(
|
||||
None, alias="additionalProperties"
|
||||
)
|
||||
|
||||
|
||||
class Parameter(BaseModel):
|
||||
model_config = {"populate_by_name": True}
|
||||
|
||||
name: str
|
||||
in_: ParameterIn = Field(..., alias="in")
|
||||
description: Optional[str] = None
|
||||
required: Optional[bool] = None
|
||||
deprecated: Optional[bool] = None
|
||||
allow_empty_value: Optional[bool] = Field(None, alias="allowEmptyValue")
|
||||
style: Optional[str] = None
|
||||
explode: Optional[bool] = None
|
||||
allow_reserved: Optional[bool] = Field(None, alias="allowReserved")
|
||||
schema: Optional[Schema] = Field(None, alias="schema")
|
||||
example: Optional[Any] = None
|
||||
examples: Optional[dict[str, Any]] = None
|
||||
|
||||
|
||||
class Response(BaseModel):
|
||||
description: str
|
||||
content: Optional[dict[str, Any]] = None
|
||||
headers: Optional[dict[str, Any]] = None
|
||||
links: Optional[dict[str, Any]] = None
|
||||
|
||||
|
||||
class RequestBody(BaseModel):
|
||||
description: Optional[str] = None
|
||||
required: Optional[bool] = None
|
||||
content: dict[str, Any] = Field(default_factory=dict)
|
||||
|
||||
|
||||
class Operation(BaseModel):
|
||||
model_config = ConfigDict(populate_by_name=True)
|
||||
|
||||
tags: Optional[list[str]] = None
|
||||
summary: Optional[str] = None
|
||||
description: Optional[str] = None
|
||||
operation_id: Optional[str] = None
|
||||
tags: List[str] = Field(default_factory=list)
|
||||
parameters: List[Dict[str, Any]] = Field(default_factory=list)
|
||||
request_body: Optional[Dict[str, Any]] = None
|
||||
responses: Dict[str, Any] = Field(default_factory=dict)
|
||||
deprecated: bool = False
|
||||
external_docs: Optional[dict[str, str]] = Field(None, alias="externalDocs")
|
||||
operation_id: Optional[str] = Field(None, alias="operationId")
|
||||
parameters: Optional[list[Parameter]] = None
|
||||
request_body: Optional[RequestBody] = Field(None, alias="requestBody")
|
||||
responses: dict[str, Response] = Field(default_factory=dict)
|
||||
deprecated: Optional[bool] = None
|
||||
security: Optional[list[dict[str, list[str]]]] = None
|
||||
servers: Optional[list[dict[str, Any]]] = None
|
||||
|
||||
|
||||
class APISpec(BaseModel):
|
||||
class PathItem(BaseModel):
|
||||
model_config = ConfigDict(populate_by_name=True)
|
||||
|
||||
ref: Optional[str] = Field(None, alias="$ref")
|
||||
summary: Optional[str] = None
|
||||
description: Optional[str] = None
|
||||
get: Optional[Operation] = None
|
||||
put: Optional[Operation] = None
|
||||
post: Optional[Operation] = None
|
||||
delete: Optional[Operation] = None
|
||||
options: Optional[Operation] = None
|
||||
head: Optional[Operation] = None
|
||||
patch: Optional[Operation] = None
|
||||
trace: Optional[Operation] = None
|
||||
servers: Optional[list[dict[str, Any]]] = None
|
||||
parameters: Optional[list[Parameter]] = None
|
||||
|
||||
|
||||
class Contact(BaseModel):
|
||||
name: Optional[str] = None
|
||||
url: Optional[str] = None
|
||||
email: Optional[str] = None
|
||||
|
||||
|
||||
class License(BaseModel):
|
||||
name: str
|
||||
url: Optional[str] = None
|
||||
|
||||
|
||||
class Info(BaseModel):
|
||||
title: str
|
||||
version: str
|
||||
description: Optional[str] = None
|
||||
terms_of_service: Optional[str] = Field(None, alias="termsOfService")
|
||||
contact: Optional[Contact] = None
|
||||
license: Optional[License] = None
|
||||
|
||||
|
||||
class ServerVariable(BaseModel):
|
||||
enum: Optional[list[str]] = None
|
||||
default: str
|
||||
description: Optional[str] = None
|
||||
|
||||
|
||||
class Server(BaseModel):
|
||||
url: str
|
||||
description: Optional[str] = None
|
||||
variables: Optional[dict[str, ServerVariable]] = None
|
||||
|
||||
|
||||
class SecurityScheme(BaseModel):
|
||||
type: str
|
||||
scheme: Optional[str] = None
|
||||
bearer_format: Optional[str] = Field(None, alias="bearerFormat")
|
||||
flows: Optional[dict[str, Any]] = None
|
||||
open_id_connect_url: Optional[str] = Field(None, alias="openIdConnectUrl")
|
||||
description: Optional[str] = None
|
||||
|
||||
|
||||
class Components(BaseModel):
|
||||
model_config = ConfigDict(populate_by_name=True)
|
||||
|
||||
schemas: Optional[dict[str, Schema]] = None
|
||||
responses: Optional[dict[str, Response]] = None
|
||||
parameters: Optional[dict[str, Parameter]] = None
|
||||
request_bodies: Optional[dict[str, RequestBody]] = Field(None, alias="requestBodies")
|
||||
headers: Optional[dict[str, Any]] = None
|
||||
security_schemes: Optional[dict[str, SecurityScheme]] = Field(
|
||||
None, alias="securitySchemes"
|
||||
)
|
||||
links: Optional[dict[str, Any]] = None
|
||||
callbacks: Optional[dict[str, Any]] = None
|
||||
|
||||
|
||||
class Tag(BaseModel):
|
||||
name: str
|
||||
description: Optional[str] = None
|
||||
external_docs: Optional[dict[str, str]] = Field(None, alias="externalDocs")
|
||||
|
||||
|
||||
class OpenAPISpec(BaseModel):
|
||||
model_config = ConfigDict(populate_by_name=True)
|
||||
|
||||
openapi: str
|
||||
info: Dict[str, Any]
|
||||
paths: Dict[str, Dict[str, Any]]
|
||||
tags: List[Dict[str, str]] = Field(default_factory=list)
|
||||
servers: List[Dict[str, str]] = Field(default_factory=list)
|
||||
components: Dict[str, Any] = Field(default_factory=dict)
|
||||
|
||||
|
||||
class RequestExample(BaseModel):
|
||||
method: str
|
||||
path: str
|
||||
headers: Dict[str, str] = Field(default_factory=dict)
|
||||
body: Optional[Any] = None
|
||||
|
||||
|
||||
class ResponseExample(BaseModel):
|
||||
status_code: int
|
||||
headers: Dict[str, str] = Field(default_factory=dict)
|
||||
body: Optional[Any] = None
|
||||
info: Info
|
||||
servers: Optional[list[Server]] = None
|
||||
paths: dict[str, PathItem]
|
||||
components: Optional[Components] = None
|
||||
security: Optional[list[dict[str, list[str]]]] = None
|
||||
tags: Optional[list[Tag]] = None
|
||||
external_docs: Optional[dict[str, str]] = Field(None, alias="externalDocs")
|
||||
|
||||
@@ -1,126 +1,297 @@
|
||||
import json
|
||||
import yaml
|
||||
from pathlib import Path
|
||||
from typing import Dict, Any, Optional
|
||||
from typing import Any, Optional
|
||||
|
||||
from openapi_spec_validator import validate
|
||||
from openapi_spec_validator.versions.consts import OPENAPI_V3
|
||||
|
||||
from src.core.models import OpenAPISpec, Operation, Parameter, PathItem, Response, Schema
|
||||
|
||||
|
||||
class ParseError(Exception):
|
||||
"""Custom exception for parsing errors."""
|
||||
pass
|
||||
|
||||
|
||||
def load_spec_file(spec_path: str) -> Dict[str, Any]:
|
||||
"""Load an OpenAPI specification file.
|
||||
|
||||
Args:
|
||||
spec_path: Path to the OpenAPI spec file (JSON or YAML)
|
||||
|
||||
Returns:
|
||||
Parsed spec as a dictionary
|
||||
|
||||
Raises:
|
||||
ParseError: If the file cannot be loaded or parsed
|
||||
"""
|
||||
path = Path(spec_path)
|
||||
|
||||
if not path.exists():
|
||||
raise ParseError(f"File not found: {spec_path}")
|
||||
|
||||
try:
|
||||
if path.suffix == '.json':
|
||||
with open(path, 'r') as f:
|
||||
return json.load(f)
|
||||
elif path.suffix in ['.yaml', '.yml']:
|
||||
with open(path, 'r') as f:
|
||||
return yaml.safe_load(f)
|
||||
else:
|
||||
raise ParseError(f"Unsupported file format: {path.suffix}. Expected .json, .yaml, or .yml")
|
||||
except json.JSONDecodeError as e:
|
||||
raise ParseError(f"Invalid JSON: {e}")
|
||||
except yaml.YAMLError as e:
|
||||
raise ParseError(f"Invalid YAML: {e}")
|
||||
|
||||
|
||||
def parse_openapi_spec(spec_path: str) -> Dict[str, Any]:
|
||||
"""Parse and validate an OpenAPI specification file.
|
||||
|
||||
Args:
|
||||
spec_path: Path to the OpenAPI spec file
|
||||
|
||||
Returns:
|
||||
Dictionary with 'valid' boolean and optional 'errors' list
|
||||
"""
|
||||
try:
|
||||
spec = load_spec_file(spec_path)
|
||||
class OpenAPIParser:
|
||||
def __init__(self, spec_data: dict[str, Any]):
|
||||
self.spec_data = spec_data
|
||||
self._resolved_refs: dict[str, Any] = {}
|
||||
self._components_schemas: dict[str, Schema] = {}
|
||||
self._components_responses: dict[str, Response] = {}
|
||||
self._components_request_bodies: dict[str, Any] = {}
|
||||
|
||||
def validate(self) -> list[str]:
|
||||
errors = []
|
||||
|
||||
if not isinstance(spec, dict):
|
||||
return {'valid': False, 'errors': ['Spec is not a dictionary']}
|
||||
|
||||
openapi_version = spec.get('openapi', '')
|
||||
if not openapi_version.startswith('3.'):
|
||||
errors.append(f"Expected OpenAPI 3.x version, got: {openapi_version}")
|
||||
|
||||
if 'info' not in spec:
|
||||
errors.append("Missing 'info' field")
|
||||
|
||||
if 'paths' not in spec:
|
||||
errors.append("Missing 'paths' field")
|
||||
|
||||
if errors:
|
||||
return {'valid': False, 'errors': errors}
|
||||
|
||||
try:
|
||||
validate(spec)
|
||||
validate(self.spec_data)
|
||||
except Exception as e:
|
||||
return {'valid': False, 'errors': [str(e)]}
|
||||
errors.append(str(e))
|
||||
return errors
|
||||
|
||||
def parse(self) -> OpenAPISpec:
|
||||
self._extract_components()
|
||||
return OpenAPISpec(
|
||||
openapi=self.spec_data.get("openapi", "3.0.0"),
|
||||
info=self._parse_info(),
|
||||
servers=self._parse_servers(),
|
||||
paths=self._parse_paths(),
|
||||
components=self._parse_components(),
|
||||
security=self.spec_data.get("security"),
|
||||
tags=self._parse_tags(),
|
||||
external_docs=self.spec_data.get("externalDocs"),
|
||||
)
|
||||
|
||||
def _extract_components(self) -> None:
|
||||
components = self.spec_data.get("components", {})
|
||||
if "schemas" in components:
|
||||
for name, schema_data in components["schemas"].items():
|
||||
self._components_schemas[name] = self._parse_schema(schema_data)
|
||||
if "responses" in components:
|
||||
self._components_responses = components["responses"]
|
||||
if "requestBodies" in components:
|
||||
self._components_request_bodies = components["requestBodies"]
|
||||
|
||||
def _parse_info(self) -> dict[str, Any]:
|
||||
info_data = self.spec_data.get("info", {})
|
||||
contact_data = info_data.get("contact", {})
|
||||
license_data = info_data.get("license", {})
|
||||
return {
|
||||
'valid': True,
|
||||
'spec': spec,
|
||||
'version': openapi_version,
|
||||
'title': spec.get('info', {}).get('title', 'Untitled'),
|
||||
'version_num': spec.get('info', {}).get('version', '1.0.0'),
|
||||
'endpoints_count': count_endpoints(spec),
|
||||
'tags': spec.get('tags', [])
|
||||
"title": info_data.get("title", "API"),
|
||||
"version": info_data.get("version", "1.0.0"),
|
||||
"description": info_data.get("description"),
|
||||
"terms_of_service": info_data.get("termsOfService"),
|
||||
"contact": {
|
||||
"name": contact_data.get("name"),
|
||||
"url": contact_data.get("url"),
|
||||
"email": contact_data.get("email"),
|
||||
} if contact_data else None,
|
||||
"license": {
|
||||
"name": license_data.get("name", ""),
|
||||
"url": license_data.get("url"),
|
||||
} if license_data else None,
|
||||
}
|
||||
|
||||
except ParseError as e:
|
||||
return {'valid': False, 'errors': [str(e)]}
|
||||
except Exception as e:
|
||||
return {'valid': False, 'errors': [f"Unexpected error: {e}"]}
|
||||
def _parse_servers(self) -> Optional[list[dict[str, Any]]]:
|
||||
servers = self.spec_data.get("servers", [])
|
||||
return [{"url": s.get("url", "/"), "description": s.get("description")} for s in servers]
|
||||
|
||||
def _parse_paths(self) -> dict[str, PathItem]:
|
||||
paths = {}
|
||||
for path, path_item in self.spec_data.get("paths", {}).items():
|
||||
if path.startswith("/"):
|
||||
path_item_data = path_item if path_item else {}
|
||||
paths[path] = self._parse_path_item(path_item_data)
|
||||
return paths
|
||||
|
||||
def count_endpoints(spec: Dict[str, Any]) -> int:
|
||||
"""Count the total number of endpoints in the spec."""
|
||||
count = 0
|
||||
for path, methods in spec.get('paths', {}).items():
|
||||
for method in methods:
|
||||
if method.lower() in ['get', 'post', 'put', 'delete', 'patch', 'options', 'head']:
|
||||
count += 1
|
||||
return count
|
||||
def _parse_path_item(self, data: dict[str, Any]) -> PathItem:
|
||||
operations = {}
|
||||
for method in ["get", "put", "post", "delete", "options", "head", "patch", "trace"]:
|
||||
if method in data:
|
||||
operations[method] = self._parse_operation(data[method])
|
||||
return PathItem(
|
||||
ref=data.get("$ref"),
|
||||
summary=data.get("summary"),
|
||||
description=data.get("description"),
|
||||
servers=data.get("servers"),
|
||||
parameters=self._parse_parameters(data.get("parameters", [])),
|
||||
**operations,
|
||||
)
|
||||
|
||||
def _parse_operation(self, data: dict[str, Any]) -> Operation:
|
||||
parameters = data.get("parameters", [])
|
||||
request_body = data.get("requestBody")
|
||||
responses = {}
|
||||
for status_code, response_data in data.get("responses", {}).items():
|
||||
responses[status_code] = self._parse_response(response_data)
|
||||
return Operation(
|
||||
tags=data.get("tags"),
|
||||
summary=data.get("summary"),
|
||||
description=data.get("description"),
|
||||
external_docs=data.get("externalDocs"),
|
||||
operation_id=data.get("operationId"),
|
||||
parameters=self._parse_parameters(parameters),
|
||||
request_body=self._parse_request_body(request_body) if request_body else None,
|
||||
responses=responses,
|
||||
deprecated=data.get("deprecated"),
|
||||
security=data.get("security"),
|
||||
servers=data.get("servers"),
|
||||
)
|
||||
|
||||
def extract_endpoints(spec: Dict[str, Any]) -> list:
|
||||
"""Extract all endpoints from the spec."""
|
||||
endpoints = []
|
||||
for path, methods in spec.get('paths', {}).items():
|
||||
for method, details in methods.items():
|
||||
if method.lower() in ['get', 'post', 'put', 'delete', 'patch', 'options', 'head']:
|
||||
endpoint = {
|
||||
'path': path,
|
||||
'method': method.upper(),
|
||||
'summary': details.get('summary'),
|
||||
'description': details.get('description'),
|
||||
'operation_id': details.get('operationId'),
|
||||
'tags': details.get('tags', []),
|
||||
'parameters': details.get('parameters', []),
|
||||
'request_body': details.get('requestBody'),
|
||||
'responses': details.get('responses', {}),
|
||||
'deprecated': details.get('deprecated', False)
|
||||
def _parse_parameters(self, params: list[dict[str, Any]]) -> list[Parameter]:
|
||||
return [
|
||||
Parameter(
|
||||
name=p.get("name", ""),
|
||||
in_=p.get("in", "query"),
|
||||
description=p.get("description"),
|
||||
required=p.get("required"),
|
||||
deprecated=p.get("deprecated"),
|
||||
allow_empty_value=p.get("allowEmptyValue"),
|
||||
style=p.get("style"),
|
||||
explode=p.get("explode"),
|
||||
allow_reserved=p.get("allowReserved"),
|
||||
schema=self._parse_schema(p.get("schema")) if p.get("schema") else None,
|
||||
example=p.get("example"),
|
||||
examples=p.get("examples"),
|
||||
)
|
||||
for p in params
|
||||
]
|
||||
|
||||
def _parse_response(self, data: dict[str, Any]) -> Response:
|
||||
content = {}
|
||||
for content_type, content_data in data.get("content", {}).items():
|
||||
content[content_type] = {
|
||||
"schema": self._parse_schema(content_data.get("schema"))
|
||||
if content_data.get("schema") else None,
|
||||
"example": content_data.get("example"),
|
||||
"examples": content_data.get("examples"),
|
||||
}
|
||||
endpoints.append(endpoint)
|
||||
return endpoints
|
||||
return Response(
|
||||
description=data.get("description", ""),
|
||||
content=content,
|
||||
headers=data.get("headers"),
|
||||
links=data.get("links"),
|
||||
)
|
||||
|
||||
def _parse_request_body(self, data: dict[str, Any]) -> dict[str, Any]:
|
||||
content = {}
|
||||
for content_type, content_data in data.get("content", {}).items():
|
||||
content[content_type] = {
|
||||
"schema": self._parse_schema(content_data.get("schema"))
|
||||
if content_data.get("schema") else None,
|
||||
"example": content_data.get("example"),
|
||||
"examples": content_data.get("examples"),
|
||||
}
|
||||
return {
|
||||
"description": data.get("description"),
|
||||
"required": data.get("required"),
|
||||
"content": content,
|
||||
}
|
||||
|
||||
def _parse_schema(self, data: Any) -> Optional[Schema]:
|
||||
if data is None:
|
||||
return None
|
||||
if isinstance(data, dict):
|
||||
if "$ref" in data:
|
||||
ref = data["$ref"]
|
||||
resolved = self._resolve_ref(ref)
|
||||
if resolved:
|
||||
return self._parse_schema(resolved)
|
||||
schema_data = dict(data)
|
||||
for key in ["allOf", "anyOf", "oneOf", "not"]:
|
||||
if key in schema_data:
|
||||
nested = schema_data[key]
|
||||
if isinstance(nested, list):
|
||||
schema_data[key] = [
|
||||
self._parse_schema(item) if isinstance(item, dict) else item
|
||||
for item in nested
|
||||
]
|
||||
elif isinstance(nested, dict):
|
||||
schema_data[key] = self._parse_schema(nested)
|
||||
if "items" in schema_data and isinstance(schema_data["items"], dict):
|
||||
schema_data["items"] = self._parse_schema(schema_data["items"])
|
||||
if "properties" in schema_data:
|
||||
schema_data["properties"] = {
|
||||
k: self._parse_schema(v) if isinstance(v, dict) else v
|
||||
for k, v in schema_data["properties"].items()
|
||||
}
|
||||
if "additionalProperties" in schema_data and isinstance(
|
||||
schema_data["additionalProperties"], dict
|
||||
):
|
||||
schema_data["additionalProperties"] = self._parse_schema(
|
||||
schema_data["additionalProperties"]
|
||||
)
|
||||
return Schema(**schema_data)
|
||||
return None
|
||||
|
||||
def _resolve_ref(self, ref: str) -> Optional[dict[str, Any]]:
|
||||
if ref in self._resolved_refs:
|
||||
return self._resolved_refs[ref]
|
||||
if ref.startswith("#/components/"):
|
||||
parts = ref.split("/")[2:]
|
||||
current = self.spec_data.get("components", {})
|
||||
for part in parts:
|
||||
if isinstance(current, dict) and part in current:
|
||||
current = current[part]
|
||||
else:
|
||||
return None
|
||||
self._resolved_refs[ref] = current
|
||||
return current
|
||||
return None
|
||||
|
||||
def _parse_components(self) -> Optional[dict[str, Any]]:
|
||||
components = self.spec_data.get("components")
|
||||
if not components:
|
||||
return None
|
||||
security_schemes = {}
|
||||
for name, scheme in components.get("securitySchemes", {}).items():
|
||||
security_schemes[name] = {
|
||||
"type": scheme.get("type"),
|
||||
"scheme": scheme.get("scheme"),
|
||||
"bearer_format": scheme.get("bearerFormat"),
|
||||
"flows": scheme.get("flows"),
|
||||
"open_id_connect_url": scheme.get("openIdConnectUrl"),
|
||||
"description": scheme.get("description"),
|
||||
}
|
||||
return {
|
||||
"schemas": self._components_schemas,
|
||||
"responses": self._components_responses,
|
||||
"parameters": components.get("parameters"),
|
||||
"request_bodies": self._components_request_bodies,
|
||||
"headers": components.get("headers"),
|
||||
"security_schemes": security_schemes,
|
||||
"links": components.get("links"),
|
||||
"callbacks": components.get("callbacks"),
|
||||
}
|
||||
|
||||
def _parse_tags(self) -> Optional[list[dict[str, Any]]]:
|
||||
tags = self.spec_data.get("tags", [])
|
||||
return [
|
||||
{
|
||||
"name": t.get("name"),
|
||||
"description": t.get("description"),
|
||||
"external_docs": t.get("externalDocs"),
|
||||
}
|
||||
for t in tags
|
||||
]
|
||||
|
||||
|
||||
def _basic_validate(spec_data: dict[str, Any]) -> tuple:
|
||||
errors = []
|
||||
if not isinstance(spec_data, dict):
|
||||
errors.append("Spec must be a dictionary")
|
||||
return False, errors
|
||||
if "openapi" not in spec_data:
|
||||
errors.append("Missing 'openapi' version")
|
||||
return False, errors
|
||||
if "info" not in spec_data:
|
||||
errors.append("Missing 'info' object")
|
||||
return False, errors
|
||||
info = spec_data.get("info", {})
|
||||
if not isinstance(info, dict):
|
||||
errors.append("'info' must be an object")
|
||||
return False, errors
|
||||
if "title" not in info:
|
||||
errors.append("Missing 'info.title'")
|
||||
return False, errors
|
||||
if "version" not in info:
|
||||
errors.append("Missing 'info.version'")
|
||||
return False, errors
|
||||
return True, []
|
||||
|
||||
|
||||
def parse_openapi_spec(spec_source: str | Path | dict[str, Any]) -> OpenAPISpec:
|
||||
if isinstance(spec_source, dict):
|
||||
spec_data = spec_source
|
||||
elif isinstance(spec_source, Path):
|
||||
spec_data = _load_file(spec_source)
|
||||
else:
|
||||
spec_data = _load_file(Path(spec_source))
|
||||
parser = OpenAPIParser(spec_data)
|
||||
errors = parser.validate()
|
||||
if errors:
|
||||
raise ValueError(f"Invalid OpenAPI spec: {errors}")
|
||||
return parser.parse()
|
||||
|
||||
|
||||
def _load_file(path: Path) -> dict[str, Any]:
|
||||
content = path.read_text()
|
||||
if path.suffix in [".yaml", ".yml"]:
|
||||
import yaml
|
||||
|
||||
return yaml.safe_load(content)
|
||||
return json.loads(content)
|
||||
|
||||
@@ -1 +1,23 @@
|
||||
# Templates for documentation generation
|
||||
"""Template module for documentation generation."""
|
||||
|
||||
import os
|
||||
|
||||
from jinja2 import Environment, FileSystemLoader
|
||||
|
||||
TEMPLATES_DIR = os.path.join(os.path.dirname(__file__))
|
||||
|
||||
def startswith(s, prefix):
|
||||
return s.startswith(prefix) if s else False
|
||||
|
||||
env = Environment(
|
||||
loader=FileSystemLoader(TEMPLATES_DIR),
|
||||
trim_blocks=True,
|
||||
lstrip_blocks=True,
|
||||
)
|
||||
env.filters["startswith"] = startswith
|
||||
|
||||
HTML_TEMPLATE = env.get_template("html_template.html")
|
||||
MARKDOWN_TEMPLATE = env.get_template("markdown_template.md")
|
||||
JSON_TEMPLATE = env.get_template("json_template.json")
|
||||
|
||||
__all__ = ["HTML_TEMPLATE", "MARKDOWN_TEMPLATE", "JSON_TEMPLATE", "env"]
|
||||
|
||||
@@ -1 +1,4 @@
|
||||
# Utility functions for LocalAPI Docs
|
||||
from src.core.models import OpenAPISpec
|
||||
from src.core.parser import parse_openapi_spec
|
||||
from src.utils.examples import generate_examples_from_schema
|
||||
from src.utils.search import SearchIndex, create_search_index, search_index
|
||||
|
||||
@@ -1,146 +1,183 @@
|
||||
from typing import Any, Dict, List, Optional
|
||||
import random
|
||||
from typing import Any
|
||||
|
||||
from src.core.models import Schema
|
||||
|
||||
|
||||
FAKE_DATA = {
|
||||
'names': ['John', 'Jane', 'Bob', 'Alice', 'Charlie', 'Diana', 'Eve', 'Frank'],
|
||||
'domains': ['example.com', 'test.org', 'sample.net', 'demo.io'],
|
||||
'cities': ['New York', 'Los Angeles', 'Chicago', 'Houston', 'Phoenix'],
|
||||
'streets': ['Main St', 'Oak Ave', 'Maple Dr', 'Cedar Ln', 'Pine Rd'],
|
||||
'countries': ['USA', 'Canada', 'UK', 'Germany', 'France'],
|
||||
'companies': ['Acme Corp', 'TechStart', 'Global Inc', 'Local LLC', 'Digital Co'],
|
||||
'job_titles': ['Engineer', 'Manager', 'Designer', 'Developer', 'Analyst'],
|
||||
'departments': ['Engineering', 'Marketing', 'Sales', 'HR', 'Finance'],
|
||||
'products': ['Widget', 'Gadget', 'Tool', 'Device', 'Component'],
|
||||
'adjectives': ['Premium', 'Essential', 'Professional', 'Standard', 'Deluxe'],
|
||||
'lorem_words': ['lorem', 'ipsum', 'dolor', 'sit', 'amet', 'consectetur', 'adipiscing', 'elit'],
|
||||
'statuses': ['active', 'pending', 'completed', 'cancelled', 'archived'],
|
||||
'id_prefixes': ['usr_', 'ord_', 'prd_', 'inv_', 'txn_']
|
||||
}
|
||||
class ExampleGenerator:
|
||||
def __init__(self, components_schemas: dict[str, Schema] | None = None):
|
||||
self.components_schemas = components_schemas or {}
|
||||
|
||||
def generate(self, schema: Schema | dict) -> Any:
|
||||
if schema is None:
|
||||
return None
|
||||
if isinstance(schema, dict):
|
||||
schema = Schema(**schema)
|
||||
schema_dict = schema.model_dump(exclude_none=True)
|
||||
return self._generate_from_schema(schema, schema_dict)
|
||||
|
||||
def generate_id(prefix: str = None) -> str:
|
||||
prefix = prefix or random.choice(FAKE_DATA['id_prefixes'])
|
||||
return f"{prefix}{random.randint(10000, 99999)}"
|
||||
|
||||
|
||||
def generate_name() -> str:
|
||||
first = random.choice(FAKE_DATA['names'])
|
||||
last = random.choice(FAKE_DATA['names'])
|
||||
return f"{first} {last}"
|
||||
|
||||
|
||||
def generate_email(name: str = None) -> str:
|
||||
name = (name or generate_name()).lower().replace(' ', '.')
|
||||
domain = random.choice(FAKE_DATA['domains'])
|
||||
return f"{name}@{domain}"
|
||||
|
||||
|
||||
def generate_phone() -> str:
|
||||
return f"+1-{random.randint(200, 999)}-{random.randint(100, 999)}-{random.randint(1000, 9999)}"
|
||||
|
||||
|
||||
def generate_address() -> Dict[str, Any]:
|
||||
return {
|
||||
'street': f"{random.randint(100, 9999)} {random.choice(FAKE_DATA['streets'])}",
|
||||
'city': random.choice(FAKE_DATA['cities']),
|
||||
'state': f"{random.choice(['CA', 'NY', 'TX', 'FL', 'IL'])}",
|
||||
'zip': f"{random.randint(10000, 99999)}",
|
||||
'country': random.choice(FAKE_DATA['countries'])
|
||||
}
|
||||
|
||||
|
||||
def generate_company() -> Dict[str, Any]:
|
||||
adj = random.choice(FAKE_DATA['adjectives'])
|
||||
product = random.choice(FAKE_DATA['products'])
|
||||
return {
|
||||
'name': f"{adj} {product} {random.choice(FAKE_DATA['companies'])}",
|
||||
'industry': random.choice(['Technology', 'Healthcare', 'Finance', 'Retail', 'Manufacturing']),
|
||||
'employees': random.randint(10, 10000),
|
||||
'founded': random.randint(1950, 2023)
|
||||
}
|
||||
|
||||
|
||||
def generate_user() -> Dict[str, Any]:
|
||||
return {
|
||||
'id': generate_id('usr_'),
|
||||
'name': generate_name(),
|
||||
'email': generate_email(),
|
||||
'phone': generate_phone(),
|
||||
'address': generate_address(),
|
||||
'created_at': '2024-01-15T10:30:00Z',
|
||||
'status': random.choice(FAKE_DATA['statuses'])
|
||||
}
|
||||
|
||||
|
||||
def generate_product() -> Dict[str, Any]:
|
||||
adj = random.choice(FAKE_DATA['adjectives'])
|
||||
product = random.choice(FAKE_DATA['products'])
|
||||
return {
|
||||
'id': generate_id('prd_'),
|
||||
'name': f"{adj} {product}",
|
||||
'description': ' '.join(random.choices(FAKE_DATA['lorem_words'], k=10)),
|
||||
'price': round(random.uniform(9.99, 999.99), 2),
|
||||
'sku': f"SKU-{random.randint(10000, 99999)}",
|
||||
'in_stock': random.choice([True, False]),
|
||||
'category': random.choice(['Electronics', 'Clothing', 'Home', 'Sports', 'Books'])
|
||||
}
|
||||
|
||||
|
||||
def generate_order() -> Dict[str, Any]:
|
||||
return {
|
||||
'id': generate_id('ord_'),
|
||||
'customer_id': generate_id('usr_'),
|
||||
'items': [generate_product() for _ in range(random.randint(1, 5))],
|
||||
'total': round(random.uniform(50, 2000), 2),
|
||||
'status': random.choice(FAKE_DATA['statuses']),
|
||||
'created_at': '2024-01-15T14:30:00Z'
|
||||
}
|
||||
|
||||
|
||||
def generate(schema: Dict[str, Any], depth: int = 0) -> Any:
|
||||
if depth > 3:
|
||||
def _generate_from_schema(self, schema: Schema, schema_dict: dict) -> Any:
|
||||
schema_type = schema.type
|
||||
if schema_type == "string":
|
||||
return self._generate_string(schema)
|
||||
elif schema_type == "integer":
|
||||
return self._generate_integer(schema)
|
||||
elif schema_type == "number":
|
||||
return self._generate_number(schema)
|
||||
elif schema_type == "boolean":
|
||||
return self._generate_boolean(schema)
|
||||
elif schema_type == "array":
|
||||
return self._generate_array(schema, schema_dict)
|
||||
elif schema_type == "object":
|
||||
return self._generate_object(schema, schema_dict)
|
||||
elif schema.all_of:
|
||||
return self._generate_all_of(schema.all_of, schema_dict)
|
||||
elif schema.any_of:
|
||||
return self._generate_any_of(schema.any_of, schema_dict)
|
||||
elif schema.one_of:
|
||||
return self._generate_one_of(schema.one_of, schema_dict)
|
||||
elif schema.not_:
|
||||
return None
|
||||
elif schema.ref:
|
||||
return self._resolve_ref(schema.ref)
|
||||
elif schema.enum:
|
||||
return schema.enum[0] if schema.enum else None
|
||||
if schema.default is not None:
|
||||
return schema.default
|
||||
return None
|
||||
|
||||
if not schema:
|
||||
return None
|
||||
def _generate_string(self, schema: Schema) -> str:
|
||||
if schema.example is not None:
|
||||
return str(schema.example)
|
||||
if schema.enum:
|
||||
return str(schema.enum[0])
|
||||
format_str = schema.format
|
||||
if format_str == "date-time":
|
||||
return "2024-01-01T00:00:00Z"
|
||||
elif format_str == "date":
|
||||
return "2024-01-01"
|
||||
elif format_str == "email":
|
||||
return "user@example.com"
|
||||
elif format_str == "uri":
|
||||
return "https://example.com"
|
||||
elif format_str == "uuid":
|
||||
return "550e8400-e29b-41d4-a716-446655440000"
|
||||
elif format_str == "hostname":
|
||||
return "example.com"
|
||||
elif format_str == "ipv4":
|
||||
return "192.168.1.1"
|
||||
elif format_str == "ipv6":
|
||||
return "::1"
|
||||
return "string"
|
||||
|
||||
schema_type = schema.get('type', 'object')
|
||||
def _generate_integer(self, schema: Schema) -> int:
|
||||
if schema.example is not None:
|
||||
try:
|
||||
return int(schema.example)
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
if schema.default is not None:
|
||||
try:
|
||||
return int(schema.default)
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
return 0
|
||||
|
||||
if schema_type == 'object' and 'properties' in schema:
|
||||
def _generate_number(self, schema: Schema) -> float:
|
||||
if schema.example is not None:
|
||||
try:
|
||||
return float(schema.example)
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
if schema.default is not None:
|
||||
try:
|
||||
return float(schema.default)
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
return 0.0
|
||||
|
||||
def _generate_boolean(self, schema: Schema) -> bool:
|
||||
if schema.example is not None:
|
||||
return bool(schema.example)
|
||||
if schema.default is not None:
|
||||
return bool(schema.default)
|
||||
return False
|
||||
|
||||
def _generate_array(self, schema: Schema, schema_dict: dict) -> list:
|
||||
items = schema.items
|
||||
if items is None and "items" in schema_dict:
|
||||
items = schema_dict["items"]
|
||||
if items is None:
|
||||
return []
|
||||
if isinstance(items, dict):
|
||||
items = Schema(**items)
|
||||
has_dump = hasattr(items, "model_dump")
|
||||
example = self._generate_from_schema(
|
||||
items, items.model_dump() if has_dump else items
|
||||
)
|
||||
return [example]
|
||||
|
||||
def _generate_object(self, schema: Schema, schema_dict: dict) -> dict:
|
||||
result = {}
|
||||
for prop_name, prop_schema in schema['properties'].items():
|
||||
required = schema.get('required', [])
|
||||
if prop_name in required or random.choice([True, False]):
|
||||
result[prop_name] = generate(prop_schema, depth + 1)
|
||||
properties = schema.properties or schema_dict.get("properties", {})
|
||||
for prop_name, prop_schema in properties.items():
|
||||
if prop_schema is None:
|
||||
continue
|
||||
if isinstance(prop_schema, dict):
|
||||
prop_schema = Schema(**prop_schema)
|
||||
has_dump = hasattr(prop_schema, "model_dump")
|
||||
result[prop_name] = self._generate_from_schema(
|
||||
prop_schema, prop_schema.model_dump() if has_dump else prop_schema
|
||||
)
|
||||
return result
|
||||
|
||||
elif schema_type == 'array':
|
||||
item_schema = schema.get('items', {})
|
||||
return [generate(item_schema, depth + 1) for _ in range(random.randint(1, 3))]
|
||||
def _generate_all_of(self, schemas: list, schema_dict: dict) -> dict:
|
||||
result = {}
|
||||
for s in schemas:
|
||||
if s is None:
|
||||
continue
|
||||
if isinstance(s, dict):
|
||||
s = Schema(**s)
|
||||
has_dump = hasattr(s, "model_dump")
|
||||
partial = self._generate_from_schema(s, s.model_dump() if has_dump else s)
|
||||
if isinstance(partial, dict):
|
||||
result.update(partial)
|
||||
return result
|
||||
|
||||
elif schema_type == 'string':
|
||||
string_format = schema.get('format')
|
||||
if string_format == 'date-time':
|
||||
return '2024-01-15T10:30:00Z'
|
||||
elif string_format == 'date':
|
||||
return '2024-01-15'
|
||||
elif string_format == 'email':
|
||||
return generate_email()
|
||||
elif string_format == 'uri':
|
||||
return 'https://example.com/api'
|
||||
elif string_format == 'uuid':
|
||||
return '550e8400-e29b-41d4-a716-446655440000'
|
||||
else:
|
||||
return random.choice(['sample', 'example', 'test', 'demo'])
|
||||
|
||||
elif schema_type == 'integer' or schema_type == 'number':
|
||||
return random.randint(1, 1000)
|
||||
|
||||
elif schema_type == 'boolean':
|
||||
return random.choice([True, False])
|
||||
|
||||
elif schema_type == 'null':
|
||||
def _generate_any_of(self, schemas: list, schema_dict: dict) -> Any:
|
||||
for s in schemas:
|
||||
if s is None:
|
||||
continue
|
||||
if isinstance(s, dict):
|
||||
s = Schema(**s)
|
||||
has_dump = hasattr(s, "model_dump")
|
||||
result = self._generate_from_schema(s, s.model_dump() if has_dump else s)
|
||||
if result is not None:
|
||||
return result
|
||||
return None
|
||||
|
||||
def _generate_one_of(self, schemas: list, schema_dict: dict) -> Any:
|
||||
for s in schemas:
|
||||
if s is None:
|
||||
continue
|
||||
if isinstance(s, dict):
|
||||
s = Schema(**s)
|
||||
has_dump = hasattr(s, "model_dump")
|
||||
result = self._generate_from_schema(s, s.model_dump() if has_dump else s)
|
||||
if result is not None:
|
||||
return result
|
||||
return None
|
||||
|
||||
def _resolve_ref(self, ref: str) -> Any:
|
||||
if ref.startswith("#/components/schemas/"):
|
||||
schema_name = ref.split("/")[-1]
|
||||
if schema_name in self.components_schemas:
|
||||
return self.generate(self.components_schemas[schema_name])
|
||||
return None
|
||||
|
||||
|
||||
def generate_examples_from_schema(
|
||||
schema: Schema | dict,
|
||||
components_schemas: dict[str, Schema] | None = None,
|
||||
) -> Any:
|
||||
generator = ExampleGenerator(components_schemas)
|
||||
return generator.generate(schema)
|
||||
|
||||
@@ -1,66 +1,127 @@
|
||||
from typing import List, Dict, Any
|
||||
from ..core.parser import load_spec_file
|
||||
import re
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any
|
||||
|
||||
|
||||
def search_endpoints(spec_path: str, query: str, limit: int = 10) -> List[Dict[str, Any]]:
|
||||
"""Search for endpoints matching the query.
|
||||
@dataclass
|
||||
class SearchResult:
|
||||
path: str
|
||||
method: str
|
||||
operation_id: str | None
|
||||
summary: str | None
|
||||
description: str | None
|
||||
tags: list[str]
|
||||
matched_terms: list[str]
|
||||
score: float
|
||||
|
||||
Args:
|
||||
spec_path: Path to the OpenAPI spec file
|
||||
query: Search query string
|
||||
limit: Maximum number of results to return
|
||||
|
||||
Returns:
|
||||
List of matching endpoints
|
||||
"""
|
||||
spec = load_spec_file(spec_path)
|
||||
results = []
|
||||
@dataclass
|
||||
class SearchIndex:
|
||||
paths: dict[str, dict[str, Any]] = field(default_factory=dict)
|
||||
schemas: dict[str, dict[str, Any]] = field(default_factory=dict)
|
||||
tags: list[str] = field(default_factory=list)
|
||||
|
||||
def add_path(self, path: str, methods: dict[str, Any]) -> None:
|
||||
self.paths[path] = methods
|
||||
|
||||
def add_schema(self, name: str, schema: dict[str, Any]) -> None:
|
||||
self.schemas[name] = schema
|
||||
|
||||
def add_tag(self, tag: str) -> None:
|
||||
if tag not in self.tags:
|
||||
self.tags.append(tag)
|
||||
|
||||
|
||||
def create_search_index(spec: dict[str, Any]) -> SearchIndex:
|
||||
index = SearchIndex()
|
||||
for tag in spec.get("tags", []):
|
||||
if isinstance(tag, dict):
|
||||
index.add_tag(tag.get("name", ""))
|
||||
else:
|
||||
index.add_tag(tag)
|
||||
for path, path_item in spec.get("paths", {}).items():
|
||||
if hasattr(path_item, 'model_dump'):
|
||||
path_item = path_item.model_dump()
|
||||
methods = {}
|
||||
for method in ["get", "put", "post", "delete", "options", "head", "patch", "trace"]:
|
||||
if method in path_item and path_item[method]:
|
||||
op = path_item[method]
|
||||
methods[method] = {
|
||||
"summary": op.get("summary"),
|
||||
"description": op.get("description"),
|
||||
"operation_id": op.get("operationId"),
|
||||
"tags": op.get("tags", []),
|
||||
"parameters": op.get("parameters", []),
|
||||
"request_body": op.get("requestBody"),
|
||||
"responses": op.get("responses", {}),
|
||||
}
|
||||
index.add_path(path, methods)
|
||||
components = spec.get("components") or {}
|
||||
for name, schema in components.get("schemas", {}).items():
|
||||
index.add_schema(name, schema)
|
||||
return index
|
||||
|
||||
|
||||
def search_index(index: SearchIndex, query: str) -> list[SearchResult]:
|
||||
query_lower = query.lower()
|
||||
|
||||
for path, methods in spec.get('paths', {}).items():
|
||||
for method, details in methods.items():
|
||||
if method.lower() not in ['get', 'post', 'put', 'delete', 'patch', 'options', 'head']:
|
||||
continue
|
||||
|
||||
match_score = 0
|
||||
matches = []
|
||||
|
||||
path_match = query_lower in path.lower()
|
||||
if path_match:
|
||||
match_score += 10
|
||||
matches.append(f"Path: {path}")
|
||||
|
||||
summary = details.get('summary', '') or ''
|
||||
if query_lower in summary.lower():
|
||||
match_score += 5
|
||||
matches.append(f"Summary: {summary}")
|
||||
|
||||
description = details.get('description', '') or ''
|
||||
if query_lower in description.lower():
|
||||
match_score += 3
|
||||
matches.append(f"Description: {description[:100]}...")
|
||||
|
||||
tags = details.get('tags', [])
|
||||
for tag in tags:
|
||||
if query_lower in tag.lower():
|
||||
match_score += 4
|
||||
matches.append(f"Tag: {tag}")
|
||||
|
||||
operation_id = details.get('operationId', '') or ''
|
||||
if query_lower in operation_id.lower():
|
||||
match_score += 2
|
||||
|
||||
if match_score > 0:
|
||||
results.append({
|
||||
'path': path,
|
||||
'method': method.upper(),
|
||||
'summary': details.get('summary'),
|
||||
'description': details.get('description'),
|
||||
'tags': tags,
|
||||
'operation_id': operation_id,
|
||||
'score': match_score,
|
||||
'matches': matches
|
||||
})
|
||||
|
||||
results.sort(key=lambda x: x['score'], reverse=True)
|
||||
return results[:limit]
|
||||
query_terms = re.findall(r'\w+', query_lower)
|
||||
results = []
|
||||
for path, methods in index.paths.items():
|
||||
for method, op_data in methods.items():
|
||||
score = 0.0
|
||||
matched_terms = []
|
||||
for term in query_terms:
|
||||
term_score = 0.0
|
||||
if term in path.lower():
|
||||
term_score += 5.0
|
||||
summary = op_data.get("summary", "") or ""
|
||||
if term in summary.lower():
|
||||
term_score += 3.0
|
||||
description = op_data.get("description", "") or ""
|
||||
if term in description.lower():
|
||||
term_score += 2.0
|
||||
operation_id = op_data.get("operation_id", "") or ""
|
||||
if term in operation_id.lower():
|
||||
term_score += 4.0
|
||||
for tag in op_data.get("tags", []):
|
||||
if term in tag.lower():
|
||||
term_score += 2.0
|
||||
if term_score > 0:
|
||||
score += term_score
|
||||
matched_terms.append(term)
|
||||
if score > 0:
|
||||
results.append(SearchResult(
|
||||
path=path,
|
||||
method=method.upper(),
|
||||
operation_id=op_data.get("operation_id"),
|
||||
summary=op_data.get("summary"),
|
||||
description=op_data.get("description"),
|
||||
tags=op_data.get("tags", []),
|
||||
matched_terms=matched_terms,
|
||||
score=score,
|
||||
))
|
||||
for schema_name, schema in index.schemas.items():
|
||||
score = 0.0
|
||||
matched_terms = []
|
||||
for term in query_terms:
|
||||
term_score = 0.0
|
||||
if term in schema_name.lower():
|
||||
term_score += 3.0
|
||||
schema_desc = schema.get("description", "") or ""
|
||||
if term in schema_desc.lower():
|
||||
term_score += 2.0
|
||||
if term_score > 0:
|
||||
score += term_score
|
||||
matched_terms.append(term)
|
||||
if score > 0:
|
||||
results.append(SearchResult(
|
||||
path=f"#/components/schemas/{schema_name}",
|
||||
method="SCHEMA",
|
||||
operation_id=None,
|
||||
summary=schema_name,
|
||||
description=schema.get("description"),
|
||||
tags=[],
|
||||
matched_terms=matched_terms,
|
||||
score=score,
|
||||
))
|
||||
return sorted(results, key=lambda x: x.score, reverse=True)
|
||||
|
||||
190
src/utils/templates.py
Normal file
190
src/utils/templates.py
Normal file
@@ -0,0 +1,190 @@
|
||||
import http.server
|
||||
import json
|
||||
import shutil
|
||||
import socketserver
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import click
|
||||
import jinja2
|
||||
from jinja2 import BaseLoader
|
||||
|
||||
from src.core.parser import parse_openapi_spec
|
||||
from src.utils.examples import ExampleGenerator
|
||||
|
||||
|
||||
class Jinja2Loader(BaseLoader):
|
||||
def __init__(self, templates_dir: Path):
|
||||
self.templates_dir = templates_dir
|
||||
|
||||
def get_source(self, environment: jinja2.Environment, template: str) -> tuple:
|
||||
path = self.templates_dir / template
|
||||
if not path.exists():
|
||||
raise jinja2.TemplateNotFound(template)
|
||||
return path.read_text(), str(path), lambda: True
|
||||
|
||||
|
||||
def generate_html(spec_path: str, output_path: str, template_path: str | None = None) -> None:
|
||||
spec = parse_openapi_spec(spec_path)
|
||||
if template_path:
|
||||
template_dir = Path(template_path).parent
|
||||
else:
|
||||
template_dir = Path(__file__).parent.parent / "templates"
|
||||
loader = Jinja2Loader(template_dir)
|
||||
env = jinja2.Environment(loader=loader)
|
||||
env.filters["tojson"] = lambda x: json.dumps(x, indent=2)
|
||||
template = env.get_template(Path(template_path).name if template_path else "html_template.html")
|
||||
spec_dict = spec.model_dump()
|
||||
components_schemas = spec_dict.get("components", {}).get("schemas", {})
|
||||
generator = ExampleGenerator(components_schemas)
|
||||
paths = spec_dict.get("paths", {})
|
||||
for _path, path_item in paths.items():
|
||||
for method in ["get", "put", "post", "delete", "options", "head", "patch", "trace"]:
|
||||
if method in path_item:
|
||||
op = path_item[method]
|
||||
if "requestBody" in op:
|
||||
rb = op["requestBody"]
|
||||
if "content" in rb:
|
||||
for _ct, content in rb["content"].items():
|
||||
if "schema" in content:
|
||||
content["example"] = generator.generate(content["schema"])
|
||||
info = spec_dict["info"]
|
||||
tags = spec_dict.get("tags", [])
|
||||
endpoints_by_tag: dict[str, dict[str, dict[str, Any]]] = {}
|
||||
for path, path_item in paths.items():
|
||||
for method in ["get", "put", "post", "delete", "options", "head", "patch", "trace"]:
|
||||
if method in path_item:
|
||||
op = path_item[method]
|
||||
op_tags = op.get("tags", ["Other"])
|
||||
for tag in op_tags:
|
||||
if tag not in endpoints_by_tag:
|
||||
endpoints_by_tag[tag] = {}
|
||||
if path not in endpoints_by_tag[tag]:
|
||||
endpoints_by_tag[tag][path] = {}
|
||||
endpoints_by_tag[tag][path][method] = op
|
||||
servers = spec_dict.get("servers", [])
|
||||
components = spec_dict.get("components", {})
|
||||
output = template.render(
|
||||
spec=spec_dict,
|
||||
info=info,
|
||||
paths=paths,
|
||||
servers=servers,
|
||||
tags=tags,
|
||||
endpoints_by_tag=endpoints_by_tag,
|
||||
components=components,
|
||||
security=spec_dict.get("security", []),
|
||||
external_docs=spec_dict.get("externalDocs"),
|
||||
)
|
||||
Path(output_path).write_text(output)
|
||||
|
||||
|
||||
def generate_markdown(spec_path: str, output_path: str, template_path: str | None = None) -> None:
|
||||
spec = parse_openapi_spec(spec_path)
|
||||
if template_path:
|
||||
template_dir = Path(template_path).parent
|
||||
else:
|
||||
template_dir = Path(__file__).parent.parent / "templates"
|
||||
loader = Jinja2Loader(template_dir)
|
||||
env = jinja2.Environment(loader=loader)
|
||||
env.filters["tojson"] = lambda x: json.dumps(x, indent=2)
|
||||
template = env.get_template(
|
||||
Path(template_path).name if template_path else "markdown_template.md"
|
||||
)
|
||||
spec_dict = spec.model_dump()
|
||||
components_schemas = spec_dict.get("components", {}).get("schemas", {})
|
||||
generator = ExampleGenerator(components_schemas)
|
||||
paths = spec_dict.get("paths", {})
|
||||
for _path, path_item in paths.items():
|
||||
for method in ["get", "put", "post", "delete", "options", "head", "patch", "trace"]:
|
||||
if method in path_item:
|
||||
op = path_item[method]
|
||||
if "requestBody" in op:
|
||||
rb = op["requestBody"]
|
||||
if "content" in rb:
|
||||
for _ct, content in rb["content"].items():
|
||||
if "schema" in content:
|
||||
content["example"] = generator.generate(content["schema"])
|
||||
info = spec_dict["info"]
|
||||
tags = spec_dict.get("tags", [])
|
||||
endpoints_by_tag: dict[str, dict[str, dict[str, Any]]] = {}
|
||||
for path, path_item in paths.items():
|
||||
for method in ["get", "put", "post", "delete", "options", "head", "patch", "trace"]:
|
||||
if method in path_item:
|
||||
op = path_item[method]
|
||||
op_tags = op.get("tags", ["Other"])
|
||||
for tag in op_tags:
|
||||
if tag not in endpoints_by_tag:
|
||||
endpoints_by_tag[tag] = {}
|
||||
if path not in endpoints_by_tag[tag]:
|
||||
endpoints_by_tag[tag][path] = {}
|
||||
endpoints_by_tag[tag][path][method] = op
|
||||
servers = spec_dict.get("servers", [])
|
||||
components = spec_dict.get("components", {})
|
||||
output = template.render(
|
||||
spec=spec_dict,
|
||||
info=info,
|
||||
paths=paths,
|
||||
servers=servers,
|
||||
tags=tags,
|
||||
endpoints_by_tag=endpoints_by_tag,
|
||||
components=components,
|
||||
security=spec_dict.get("security", []),
|
||||
external_docs=spec_dict.get("externalDocs"),
|
||||
)
|
||||
Path(output_path).write_text(output)
|
||||
|
||||
|
||||
def generate_json(spec_path: str, output_path: str, template_path: str | None = None) -> None:
|
||||
spec = parse_openapi_spec(spec_path)
|
||||
spec_dict = spec.model_dump()
|
||||
components_schemas = spec_dict.get("components", {}).get("schemas", {})
|
||||
generator = ExampleGenerator(components_schemas)
|
||||
paths = spec_dict.get("paths", {})
|
||||
for _path, path_item in paths.items():
|
||||
for method in ["get", "put", "post", "delete", "options", "head", "patch", "trace"]:
|
||||
if method in path_item:
|
||||
op = path_item[method]
|
||||
if "requestBody" in op:
|
||||
rb = op["requestBody"]
|
||||
if "content" in rb:
|
||||
for _ct, content in rb["content"].items():
|
||||
if "schema" in content:
|
||||
content["example"] = generator.generate(content["schema"])
|
||||
for _status_code, _response in spec_dict.get("paths", {}).items():
|
||||
pass
|
||||
output = json.dumps(spec_dict, indent=2)
|
||||
Path(output_path).write_text(output)
|
||||
|
||||
|
||||
class LocalDocsHandler(http.server.SimpleHTTPRequestHandler):
|
||||
def __init__(self, *args, directory: str | None = None, **kwargs):
|
||||
self.docs_dir = directory
|
||||
super().__init__(*args, directory=directory, **kwargs)
|
||||
|
||||
def do_GET(self):
|
||||
if self.path == "/":
|
||||
self.path = "/index.html"
|
||||
return super().do_GET()
|
||||
|
||||
|
||||
class _LocalDocsHandlerWithDir(LocalDocsHandler):
|
||||
def __init__(self, *args, directory: str, **kwargs):
|
||||
self.docs_dir = directory
|
||||
super().__init__(*args, directory=directory, **kwargs)
|
||||
|
||||
|
||||
def serve_docs(spec_path: str, host: str = "127.0.0.1", port: int = 8080) -> None:
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
generate_html(spec_path, str(Path(tmpdir) / "index.html"))
|
||||
shutil.copy(Path(__file__).parent.parent / "templates" / "html_template.html", tmpdir)
|
||||
try:
|
||||
Path.cwd().chdir(tmpdir)
|
||||
with socketserver.TCPServer(
|
||||
(host, port),
|
||||
lambda *args, **kwargs: _LocalDocsHandlerWithDir(*args, directory=tmpdir, **kwargs)
|
||||
) as httpd:
|
||||
click.echo(f"Serving API documentation at http://{host}:{port}")
|
||||
httpd.serve_forever()
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
@@ -1 +1 @@
|
||||
# Tests for LocalAPI Docs
|
||||
|
||||
|
||||
@@ -1,32 +1,94 @@
|
||||
import pytest
|
||||
from click.testing import CliRunner
|
||||
from src.cli import main
|
||||
|
||||
from src.cli import generate, main, search, serve, validate
|
||||
|
||||
runner = CliRunner()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def runner():
|
||||
return CliRunner()
|
||||
def sample_spec_path(tmp_path):
|
||||
spec = {
|
||||
"openapi": "3.0.3",
|
||||
"info": {"title": "Test API", "version": "1.0.0"},
|
||||
"paths": {
|
||||
"/users": {
|
||||
"get": {
|
||||
"summary": "List users",
|
||||
"description": "Get all users",
|
||||
"tags": ["Users"],
|
||||
"responses": {"200": {"description": "Success"}}
|
||||
}
|
||||
},
|
||||
"/users/{id}": {
|
||||
"get": {
|
||||
"summary": "Get user",
|
||||
"description": "Get a user by ID",
|
||||
"tags": ["Users"],
|
||||
"parameters": [
|
||||
{"name": "id", "in": "path", "required": True, "schema": {"type": "string"}}
|
||||
],
|
||||
"responses": {"200": {"description": "Success"}}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
path = tmp_path / "openapi.json"
|
||||
import json
|
||||
path.write_text(json.dumps(spec))
|
||||
return str(path)
|
||||
|
||||
|
||||
def test_cli_help(runner):
|
||||
result = runner.invoke(main, ['--help'])
|
||||
class TestCLI:
|
||||
def test_main_help(self):
|
||||
result = runner.invoke(main, ["--help"])
|
||||
assert result.exit_code == 0
|
||||
assert 'LocalAPI Docs' in result.output
|
||||
assert "LocalAPI Docs" in result.output
|
||||
|
||||
|
||||
def test_cli_serve(runner, tmp_path, sample_spec):
|
||||
result = runner.invoke(main, ['serve', str(sample_spec)])
|
||||
def test_serve_help(self):
|
||||
result = runner.invoke(serve, ["--help"])
|
||||
assert result.exit_code == 0
|
||||
assert "serve" in result.output
|
||||
|
||||
|
||||
def test_cli_generate(runner, tmp_path, sample_spec):
|
||||
output = tmp_path / "output.html"
|
||||
result = runner.invoke(main, ['generate', str(sample_spec), '-o', str(output)])
|
||||
def test_generate_help(self):
|
||||
result = runner.invoke(generate, ["--help"])
|
||||
assert result.exit_code == 0
|
||||
assert output.exists()
|
||||
assert "generate" in result.output
|
||||
|
||||
|
||||
def test_cli_validate(runner, sample_spec):
|
||||
result = runner.invoke(main, ['validate', str(sample_spec)])
|
||||
def test_validate_help(self):
|
||||
result = runner.invoke(validate, ["--help"])
|
||||
assert result.exit_code == 0
|
||||
assert 'valid' in result.output.lower()
|
||||
assert "validate" in result.output
|
||||
|
||||
def test_search_help(self):
|
||||
result = runner.invoke(search, ["--help"])
|
||||
assert result.exit_code == 0
|
||||
assert "search" in result.output
|
||||
|
||||
|
||||
class TestValidateCommand:
|
||||
def test_validate_valid_spec(self, sample_spec_path):
|
||||
result = runner.invoke(validate, [sample_spec_path])
|
||||
assert result.exit_code == 0
|
||||
assert "Valid OpenAPI spec" in result.output
|
||||
assert "Test API" in result.output
|
||||
|
||||
def test_validate_nonexistent_file(self):
|
||||
result = runner.invoke(validate, ["/nonexistent/path.json"])
|
||||
assert result.exit_code != 0
|
||||
|
||||
|
||||
class TestSearchCommand:
|
||||
def test_search_query(self, sample_spec_path):
|
||||
result = runner.invoke(search, [sample_spec_path, "users"])
|
||||
assert result.exit_code == 0
|
||||
assert "users" in result.output.lower() or "found" in result.output.lower()
|
||||
|
||||
def test_search_no_query(self, sample_spec_path):
|
||||
result = runner.invoke(search, [sample_spec_path])
|
||||
assert result.exit_code == 0
|
||||
assert "query" in result.output.lower()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pytest.main([__file__, "-v"])
|
||||
|
||||
@@ -1,38 +1,156 @@
|
||||
import pytest
|
||||
from src.core.generator import generate_docs
|
||||
from pathlib import Path
|
||||
|
||||
from src.core.models import Schema
|
||||
from src.utils.examples import ExampleGenerator, generate_examples_from_schema
|
||||
|
||||
|
||||
def test_generate_html(tmp_path, sample_spec):
|
||||
output = tmp_path / "test.html"
|
||||
generate_docs(str(sample_spec), str(output), 'html')
|
||||
assert output.exists()
|
||||
html = output.read_text()
|
||||
assert '<html>' in html
|
||||
assert 'API Documentation' in html
|
||||
class TestExampleGenerator:
|
||||
def test_generate_string(self):
|
||||
schema = Schema(type="string")
|
||||
generator = ExampleGenerator()
|
||||
result = generator.generate(schema)
|
||||
assert result == "string"
|
||||
|
||||
def test_generate_string_with_format(self):
|
||||
schema = Schema(type="string", format="email")
|
||||
generator = ExampleGenerator()
|
||||
result = generator.generate(schema)
|
||||
assert result == "user@example.com"
|
||||
|
||||
def test_generate_string_with_date_format(self):
|
||||
schema = Schema(type="string", format="date")
|
||||
generator = ExampleGenerator()
|
||||
result = generator.generate(schema)
|
||||
assert result == "2024-01-01"
|
||||
|
||||
def test_generate_string_with_datetime_format(self):
|
||||
schema = Schema(type="string", format="date-time")
|
||||
generator = ExampleGenerator()
|
||||
result = generator.generate(schema)
|
||||
assert "T" in result
|
||||
|
||||
def test_generate_integer(self):
|
||||
schema = Schema(type="integer")
|
||||
generator = ExampleGenerator()
|
||||
result = generator.generate(schema)
|
||||
assert result == 0
|
||||
assert isinstance(result, int)
|
||||
|
||||
def test_generate_integer_with_default(self):
|
||||
schema = Schema(type="integer", default=42)
|
||||
generator = ExampleGenerator()
|
||||
result = generator.generate(schema)
|
||||
assert result == 42
|
||||
|
||||
def test_generate_number(self):
|
||||
schema = Schema(type="number")
|
||||
generator = ExampleGenerator()
|
||||
result = generator.generate(schema)
|
||||
assert result == 0.0
|
||||
assert isinstance(result, float)
|
||||
|
||||
def test_generate_boolean(self):
|
||||
schema = Schema(type="boolean")
|
||||
generator = ExampleGenerator()
|
||||
result = generator.generate(schema)
|
||||
assert result is False
|
||||
assert isinstance(result, bool)
|
||||
|
||||
def test_generate_boolean_with_example(self):
|
||||
schema = Schema(type="boolean", example=True)
|
||||
generator = ExampleGenerator()
|
||||
result = generator.generate(schema)
|
||||
assert result is True
|
||||
|
||||
def test_generate_array(self):
|
||||
schema = Schema(type="array", items={"type": "string"})
|
||||
generator = ExampleGenerator()
|
||||
result = generator.generate(schema)
|
||||
assert isinstance(result, list)
|
||||
assert len(result) == 1
|
||||
assert result[0] == "string"
|
||||
|
||||
def test_generate_object(self):
|
||||
schema = Schema(
|
||||
type="object",
|
||||
properties={
|
||||
"name": Schema(type="string"),
|
||||
"age": Schema(type="integer")
|
||||
},
|
||||
required=["name"]
|
||||
)
|
||||
generator = ExampleGenerator()
|
||||
result = generator.generate(schema)
|
||||
assert isinstance(result, dict)
|
||||
assert "name" in result
|
||||
assert "age" in result
|
||||
|
||||
def test_generate_object_with_enum(self):
|
||||
schema = Schema(
|
||||
type="object",
|
||||
properties={
|
||||
"status": Schema(type="string", enum=["active", "inactive"])
|
||||
}
|
||||
)
|
||||
generator = ExampleGenerator()
|
||||
result = generator.generate(schema)
|
||||
assert isinstance(result, dict)
|
||||
assert result["status"] in ["active", "inactive"]
|
||||
|
||||
def test_generate_with_example(self):
|
||||
schema = Schema(type="string", example="custom-value")
|
||||
generator = ExampleGenerator()
|
||||
result = generator.generate(schema)
|
||||
assert result == "custom-value"
|
||||
|
||||
def test_generate_null_schema(self):
|
||||
generator = ExampleGenerator()
|
||||
result = generator.generate(None)
|
||||
assert result is None
|
||||
|
||||
def test_generate_with_all_of(self):
|
||||
schema = Schema(all_of=[
|
||||
{"type": "object", "properties": {"name": {"type": "string"}}},
|
||||
{"type": "object", "properties": {"email": {"type": "string"}}}
|
||||
])
|
||||
generator = ExampleGenerator()
|
||||
result = generator.generate(schema)
|
||||
assert isinstance(result, dict)
|
||||
assert "name" in result
|
||||
assert "email" in result
|
||||
|
||||
def test_generate_with_any_of(self):
|
||||
schema = Schema(any_of=[
|
||||
{"type": "string"},
|
||||
{"type": "integer"}
|
||||
])
|
||||
generator = ExampleGenerator()
|
||||
result = generator.generate(schema)
|
||||
assert result is not None
|
||||
|
||||
def test_generate_with_one_of(self):
|
||||
schema = Schema(one_of=[
|
||||
{"type": "string"},
|
||||
{"type": "integer"}
|
||||
])
|
||||
generator = ExampleGenerator()
|
||||
result = generator.generate(schema)
|
||||
assert result is not None
|
||||
|
||||
|
||||
def test_generate_markdown(tmp_path, sample_spec):
|
||||
output = tmp_path / "test.md"
|
||||
generate_docs(str(sample_spec), str(output), 'markdown')
|
||||
assert output.exists()
|
||||
md = output.read_text()
|
||||
assert '# API Documentation' in md
|
||||
class TestGenerateExamplesFromSchema:
|
||||
def test_generate_from_dict_schema(self):
|
||||
schema = {"type": "string"}
|
||||
result = generate_examples_from_schema(schema)
|
||||
assert result == "string"
|
||||
|
||||
def test_generate_with_components(self):
|
||||
schemas = {"User": Schema(type="object", properties={"name": Schema(type="string")})}
|
||||
generator = ExampleGenerator(schemas)
|
||||
result = generator.generate({"$ref": "#/components/schemas/User"})
|
||||
assert isinstance(result, dict)
|
||||
assert "name" in result
|
||||
|
||||
|
||||
def test_generate_json(tmp_path, sample_spec):
|
||||
output = tmp_path / "test.json"
|
||||
generate_docs(str(sample_spec), str(output), 'json')
|
||||
assert output.exists()
|
||||
import json
|
||||
data = json.loads(output.read_text())
|
||||
assert 'title' in data
|
||||
assert 'endpoints' in data
|
||||
|
||||
|
||||
def test_invalid_spec(tmp_path):
|
||||
invalid_spec = tmp_path / "invalid.json"
|
||||
invalid_spec.write_text('{"invalid": "spec"}')
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
generate_docs(str(invalid_spec), 'html')
|
||||
if __name__ == "__main__":
|
||||
pytest.main([__file__, "-v"])
|
||||
|
||||
@@ -1,74 +1,225 @@
|
||||
|
||||
import pytest
|
||||
from src.core.parser import parse_openapi_spec, load_spec_file, ParseError
|
||||
|
||||
from src.core.models import Schema
|
||||
from src.core.parser import OpenAPIParser
|
||||
|
||||
@pytest.fixture
|
||||
def sample_spec(tmp_path):
|
||||
spec = {
|
||||
"openapi": "3.0.0",
|
||||
VALID_OPENAPI_SPEC = {
|
||||
"openapi": "3.0.3",
|
||||
"info": {
|
||||
"title": "Test API",
|
||||
"version": "1.0.0"
|
||||
"version": "1.0.0",
|
||||
"description": "A test API"
|
||||
},
|
||||
"servers": [
|
||||
{"url": "https://api.example.com/v1", "description": "Production"}
|
||||
],
|
||||
"paths": {
|
||||
"/users": {
|
||||
"get": {
|
||||
"summary": "Get users",
|
||||
"description": "Retrieve a list of users",
|
||||
"tags": ["users"],
|
||||
"summary": "List users",
|
||||
"description": "Get a list of all users",
|
||||
"tags": ["Users"],
|
||||
"parameters": [
|
||||
{
|
||||
"name": "limit", "in": "query", "schema": {"type": "integer"},
|
||||
"required": False
|
||||
},
|
||||
{
|
||||
"name": "offset", "in": "query", "schema": {"type": "integer"},
|
||||
"required": False
|
||||
}
|
||||
],
|
||||
"responses": {
|
||||
"200": {"description": "Successful response"}
|
||||
"200": {"description": "Success"},
|
||||
"400": {"description": "Bad Request"}
|
||||
}
|
||||
},
|
||||
"post": {
|
||||
"summary": "Create user",
|
||||
"description": "Create a new user",
|
||||
"tags": ["Users"],
|
||||
"requestBody": {
|
||||
"content": {
|
||||
"application/json": {
|
||||
"schema": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"name": {"type": "string"},
|
||||
"email": {"type": "string", "format": "email"}
|
||||
},
|
||||
"required": ["name", "email"]
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"responses": {
|
||||
"201": {"description": "Created"}
|
||||
}
|
||||
}
|
||||
},
|
||||
"/users/{id}": {
|
||||
"get": {
|
||||
"summary": "Get user",
|
||||
"description": "Get a user by ID",
|
||||
"tags": ["Users"],
|
||||
"parameters": [
|
||||
{"name": "id", "in": "path", "required": True, "schema": {"type": "string"}}
|
||||
],
|
||||
"responses": {
|
||||
"200": {"description": "Success"},
|
||||
"404": {"description": "Not Found"}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"components": {
|
||||
"schemas": {
|
||||
"User": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"id": {"type": "string"},
|
||||
"name": {"type": "string"},
|
||||
"email": {"type": "string", "format": "email"}
|
||||
},
|
||||
"required": ["id", "name", "email"]
|
||||
},
|
||||
"Error": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"code": {"type": "integer"},
|
||||
"message": {"type": "string"}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
path = tmp_path / "test.json"
|
||||
path.write_text(__import__('json').dumps(spec))
|
||||
return path
|
||||
|
||||
|
||||
def test_parse_valid_spec(sample_spec):
|
||||
result = parse_openapi_spec(str(sample_spec))
|
||||
assert result['valid'] == True
|
||||
assert result['title'] == 'Test API'
|
||||
assert result['version_num'] == '1.0.0'
|
||||
class TestOpenAPIParser:
|
||||
def test_parse_valid_spec(self):
|
||||
parser = OpenAPIParser(VALID_OPENAPI_SPEC)
|
||||
spec = parser.parse()
|
||||
assert spec.info.title == "Test API"
|
||||
assert spec.info.version == "1.0.0"
|
||||
assert spec.openapi == "3.0.3"
|
||||
|
||||
def test_parse_paths(self):
|
||||
parser = OpenAPIParser(VALID_OPENAPI_SPEC)
|
||||
spec = parser.parse()
|
||||
assert "/users" in spec.paths
|
||||
assert "/users/{id}" in spec.paths
|
||||
|
||||
def test_parse_operations(self):
|
||||
parser = OpenAPIParser(VALID_OPENAPI_SPEC)
|
||||
spec = parser.parse()
|
||||
users_path = spec.paths["/users"]
|
||||
assert users_path.get is not None
|
||||
assert users_path.post is not None
|
||||
assert users_path.get.summary == "List users"
|
||||
|
||||
def test_parse_parameters(self):
|
||||
parser = OpenAPIParser(VALID_OPENAPI_SPEC)
|
||||
spec = parser.parse()
|
||||
users_path = spec.paths["/users"]
|
||||
get_op = users_path.get
|
||||
assert get_op is not None
|
||||
params = get_op.parameters or []
|
||||
assert len(params) == 2
|
||||
param_names = [p.name for p in params]
|
||||
assert "limit" in param_names
|
||||
assert "offset" in param_names
|
||||
|
||||
def test_parse_request_body(self):
|
||||
parser = OpenAPIParser(VALID_OPENAPI_SPEC)
|
||||
spec = parser.parse()
|
||||
users_path = spec.paths["/users"]
|
||||
post_op = users_path.post
|
||||
assert post_op is not None
|
||||
assert post_op.request_body is not None
|
||||
|
||||
def test_parse_responses(self):
|
||||
parser = OpenAPIParser(VALID_OPENAPI_SPEC)
|
||||
spec = parser.parse()
|
||||
users_path = spec.paths["/users"]
|
||||
get_op = users_path.get
|
||||
assert get_op is not None
|
||||
assert "200" in get_op.responses
|
||||
assert "400" in get_op.responses
|
||||
|
||||
def test_parse_components(self):
|
||||
parser = OpenAPIParser(VALID_OPENAPI_SPEC)
|
||||
spec = parser.parse()
|
||||
assert spec.components is not None
|
||||
assert "User" in (spec.components.schemas or {})
|
||||
assert "Error" in (spec.components.schemas or {})
|
||||
|
||||
def test_parse_servers(self):
|
||||
parser = OpenAPIParser(VALID_OPENAPI_SPEC)
|
||||
spec = parser.parse()
|
||||
assert spec.servers is not None
|
||||
assert len(spec.servers) == 1
|
||||
assert spec.servers[0].url == "https://api.example.com/v1"
|
||||
|
||||
def test_parse_tags(self):
|
||||
parser = OpenAPIParser(VALID_OPENAPI_SPEC)
|
||||
spec = parser.parse()
|
||||
users_path = spec.paths["/users"]
|
||||
get_op = users_path.get
|
||||
assert get_op is not None
|
||||
assert "Users" in (get_op.tags or [])
|
||||
|
||||
def test_validation_valid_spec(self):
|
||||
parser = OpenAPIParser(VALID_OPENAPI_SPEC)
|
||||
errors = parser.validate()
|
||||
assert len(errors) == 0
|
||||
|
||||
def test_validation_invalid_spec(self):
|
||||
invalid_spec = {"openapi": "3.0.0", "info": {}}
|
||||
parser = OpenAPIParser(invalid_spec)
|
||||
errors = parser.validate()
|
||||
assert len(errors) > 0
|
||||
|
||||
|
||||
def test_parse_invalid_spec(tmp_path):
|
||||
invalid = tmp_path / "invalid.json"
|
||||
invalid.write_text('{"invalid": "spec"}')
|
||||
result = parse_openapi_spec(str(invalid))
|
||||
assert result['valid'] == False
|
||||
assert 'errors' in result
|
||||
class TestSchemaParsing:
|
||||
def test_parse_string_schema(self):
|
||||
schema_data = {"type": "string", "format": "email", "description": "User email"}
|
||||
schema = Schema(**schema_data)
|
||||
assert schema.type == "string"
|
||||
assert schema.format == "email"
|
||||
|
||||
def test_parse_object_schema(self):
|
||||
schema_data = {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"name": {"type": "string"},
|
||||
"age": {"type": "integer"}
|
||||
},
|
||||
"required": ["name"]
|
||||
}
|
||||
schema = Schema(**schema_data)
|
||||
assert schema.type == "object"
|
||||
assert schema.properties is not None
|
||||
assert "name" in schema.properties
|
||||
|
||||
def test_parse_array_schema(self):
|
||||
schema_data = {
|
||||
"type": "array",
|
||||
"items": {"type": "string"}
|
||||
}
|
||||
schema = Schema(**schema_data)
|
||||
assert schema.type == "array"
|
||||
assert schema.items is not None
|
||||
|
||||
def test_parse_enum_schema(self):
|
||||
schema_data = {
|
||||
"type": "string",
|
||||
"enum": ["active", "inactive", "pending"]
|
||||
}
|
||||
schema = Schema(**schema_data)
|
||||
assert schema.enum is not None
|
||||
assert len(schema.enum) == 3
|
||||
|
||||
|
||||
def test_load_json_spec(sample_spec):
|
||||
spec = load_spec_file(str(sample_spec))
|
||||
assert spec['info']['title'] == 'Test API'
|
||||
|
||||
|
||||
def test_load_yaml_spec(tmp_path):
|
||||
content = '''
|
||||
openapi: "3.0.0"
|
||||
info:
|
||||
title: Test API
|
||||
version: "1.0.0"
|
||||
paths: {}
|
||||
'''
|
||||
path = tmp_path / "test.yaml"
|
||||
path.write_text(content)
|
||||
spec = load_spec_file(str(path))
|
||||
assert spec['info']['title'] == 'Test API'
|
||||
|
||||
|
||||
def test_parse_error_missing_file():
|
||||
with pytest.raises(ParseError):
|
||||
load_spec_file('/nonexistent/path.json')
|
||||
|
||||
|
||||
def test_parse_error_unsupported_format(tmp_path):
|
||||
path = tmp_path / "test.txt"
|
||||
path.write_text('some content')
|
||||
with pytest.raises(ParseError):
|
||||
load_spec_file(str(path))
|
||||
if __name__ == "__main__":
|
||||
pytest.main([__file__, "-v"])
|
||||
|
||||
Reference in New Issue
Block a user