diff --git a/configforge/commands/schema.py b/configforge/commands/schema.py new file mode 100644 index 0000000..8ffccb1 --- /dev/null +++ b/configforge/commands/schema.py @@ -0,0 +1,259 @@ +"""Interactive schema generator command for ConfigForge.""" + +import json +from pathlib import Path +from typing import Any, Dict, List, Optional + +import click + +from configforge.exceptions import ( + FileOperationError, + InvalidConfigFormatError, + SchemaGenerationError, +) +from configforge.formatters import JSONHandler, YAMLHandler +from configforge.utils.file_parser import detect_format + + +@click.command("schema") +@click.option( + "--output", + "-o", + type=click.Path(), + help="Output file path for the generated schema", +) +@click.option( + "--format", + "-f", + type=click.Choice(["json", "yaml"]), + default="json", + help="Output format for the generated schema", +) +@click.option( + "--from-config", + type=click.Path(exists=True), + help="Generate schema from an existing config file", +) +def schema( + output: Optional[str], + format: str, + from_config: Optional[str], +) -> None: + """Interactively generate a JSON Schema from configuration.""" + try: + if from_config: + schema_dict = _generate_from_config(from_config) + else: + schema_dict = _run_interactive_wizard() + + schema_output = _format_schema(schema_dict, format) + + if output: + _write_schema(output, schema_output, format) + click.echo(f"Generated schema -> {output}") + else: + click.echo(schema_output) + + except (InvalidConfigFormatError, FileOperationError, SchemaGenerationError) as e: + click.echo(f"Error: {e.message}", err=True) + click.get_current_context().exit(1) + + +def _generate_from_config(config_file: str) -> Dict[str, Any]: + """Generate a schema from an existing configuration file.""" + format_type = detect_format(config_file) + + if format_type == "json": + data = JSONHandler.read(config_file) + elif format_type in ("yaml", "yml"): + data = YAMLHandler.read(config_file) + else: + raise InvalidConfigFormatError( + f"Unsupported format: {format_type}. Use JSON or YAML." + ) + + return _data_to_schema(data) + + +def _data_to_schema(data: Any, name: str = "") -> Dict[str, Any]: + """Convert sample data to JSON Schema.""" + if data is None: + return {"type": "null", "description": "Null value"} + + if isinstance(data, bool): + return {"type": "boolean", "description": "Boolean value"} + + if isinstance(data, (int, float)): + schema: Dict[str, Any] = {"type": "number", "description": "Number value"} + if isinstance(data, int): + schema["type"] = "integer" + return schema + + if isinstance(data, str): + return {"type": "string", "description": "String value"} + + if isinstance(data, list): + if not data: + return {"type": "array", "items": {"type": "any"}, "description": "Array of values"} + + all_schemas = [_data_to_schema(item) for item in data] + merged = _merge_schemas(all_schemas) + + return {"type": "array", "items": merged, "description": "Array of values"} + + if isinstance(data, dict): + properties: Dict[str, Any] = {} + required: List[str] = [] + + for key, value in data.items(): + properties[key] = _data_to_schema(value, key) + required.append(key) + + schema: Dict[str, Any] = { + "type": "object", + "properties": properties, + "required": required, + "additionalProperties": False, + "description": "Configuration object" + } + + return schema + + return {"type": "any", "description": "Any value"} + + +def _merge_schemas(schemas: List[Dict[str, Any]]) -> Dict[str, Any]: + """Merge multiple schemas into one.""" + if not schemas: + return {"type": "any"} + + if len(schemas) == 1: + return schemas[0] + + types = set() + for schema in schemas: + schema_type = schema.get("type") + if schema_type: + types.add(schema_type) + + if len(types) > 1: + return {"type": " | ".join(sorted(types)), "description": "Union of multiple types"} + + common_type = types.pop() if types else "any" + + if common_type == "object": + all_props: Dict[str, Any] = {} + all_required = set() + + for schema in schemas: + props = schema.get("properties", {}) + all_props.update(props) + required = schema.get("required", []) + all_required.update(required) + + return { + "type": "object", + "properties": all_props, + "required": list(all_required), + "additionalProperties": False + } + + return {"type": common_type} + + +def _run_interactive_wizard() -> Dict[str, Any]: + """Run the interactive schema wizard.""" + click.echo("ConfigForge Schema Wizard") + click.echo("=" * 40) + + schema_name = click.prompt("Schema name", default="Config") + schema_description = click.prompt("Schema description", default="") + + properties: Dict[str, Any] = {} + required: List[str] = [] + + while True: + click.echo("") + add_property = click.confirm("Add a property?") + if not add_property: + break + + prop_name = click.prompt("Property name") + prop_type = click.prompt( + "Property type", + type=click.Choice(["string", "integer", "number", "boolean", "array", "object"]), + default="string" + ) + + is_required = click.confirm("Is this property required?") + if is_required: + required.append(prop_name) + + prop_description = click.prompt("Property description", default="") + + prop_schema: Dict[str, Any] = { + "type": prop_type, + "description": prop_description + } + + if prop_type == "string": + if click.confirm("Add string constraints?"): + min_length = click.prompt(" Min length (0 for none)", type=int, default=0) + if min_length > 0: + prop_schema["minLength"] = min_length + max_length = click.prompt(" Max length (0 for none)", type=int, default=0) + if max_length > 0: + prop_schema["maxLength"] = max_length + + elif prop_type in ("integer", "number"): + if click.confirm("Add numeric constraints?"): + minimum = click.prompt(" Minimum value", type=float, default=None) + if minimum is not None: + prop_schema["minimum"] = minimum + maximum = click.prompt(" Maximum value", type=float, default=None) + if maximum is not None: + prop_schema["maximum"] = maximum + + elif prop_type == "array": + if click.confirm("Add array constraints?"): + min_items = click.prompt(" Min items", type=int, default=0) + if min_items > 0: + prop_schema["minItems"] = min_items + max_items = click.prompt(" Max items", type=int, default=0) + if max_items > 0: + prop_schema["maxItems"] = max_items + + properties[prop_name] = prop_schema + + schema: Dict[str, Any] = { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": schema_name, + "type": "object", + "properties": properties, + "required": required, + "additionalProperties": False, + } + + if schema_description: + schema["description"] = schema_description + + return schema + + +def _format_schema(schema: Dict[str, Any], format: str) -> str: + """Format schema to output string.""" + if format == "json": + return JSONHandler.dumps(schema, indent=2) + else: + return YAMLHandler.dumps(schema) + + +def _write_schema(filepath: str, content: str, format: str) -> None: + """Write schema to file.""" + try: + with open(filepath, "w", encoding="utf-8") as f: + f.write(content) + except PermissionError: + raise FileOperationError(f"Permission denied: {filepath}", filepath=filepath) + except OSError as e: + raise FileOperationError(f"Failed to write {filepath}: {str(e)}", filepath=filepath)