commit da98b8b402e377806295c046f53e7885cc17d9cd Author: Developer Date: Sun Mar 22 19:55:51 2026 +0000 Restore testdata-cli with proper CI/CD and tests diff --git a/.gitea/workflows/ci.yml b/.gitea/workflows/ci.yml new file mode 100644 index 0000000..f55076b --- /dev/null +++ b/.gitea/workflows/ci.yml @@ -0,0 +1,56 @@ +name: CI + +on: + push: + branches: [ main, master ] + pull_request: + branches: [ main, master ] + +jobs: + test: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.10' + + - name: Install dependencies + run: | + pip install -e ".[dev]" + + - name: Run linting + run: ruff check src/ + + - name: Run type checking + run: mypy src/testdatagen/ || true + + - name: Run tests + run: pytest tests/ -v + + build: + runs-on: ubuntu-latest + needs: test + steps: + - uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.10' + + - name: Install build dependencies + run: | + pip install build + + - name: Build package + run: | + python -m build + + - name: Upload artifacts + uses: actions/upload-artifact@v4 + with: + name: dist + path: dist/ diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ac69426 --- /dev/null +++ b/.gitignore @@ -0,0 +1,61 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# PyInstaller +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# IDE +.idea/ +.vscode/ +*.swp +*.swo +*~ + +# OS +.DS_Store +Thumbs.db + +# Project specific +*.db +*.sqlite 
diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..f93f89a --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2024 TestDataGen Team + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..54672b0 --- /dev/null +++ b/README.md @@ -0,0 +1,114 @@ +# TestData CLI + +A CLI tool that generates realistic test data from JSON Schema, TypeScript types, or sample data files using Faker-style generation. 
+ +## Features + +- Generate realistic test data from JSON Schema +- Support for Faker-style data generation (names, emails, addresses, dates) +- Bulk data generation with configurable count +- Multiple output formats: JSON, CSV, and SQL +- Generate from TypeScript types +- Generate from sample data files +- Custom pattern support with regex-based generation +- Seed support for reproducible data generation + +## Installation + +```bash +pip install testdata-cli +# or +pip install -e . +``` + +## Quick Start + +Generate data from a JSON Schema file: + +```bash +testdatagen generate --schema schema.json --count 10 +``` + +Generate with specific output format: + +```bash +testdatagen generate --schema schema.json --format csv --count 5 +``` + +Use a seed for reproducible results: + +```bash +testdatagen generate --schema schema.json --seed 12345 +``` + +## Commands + +### generate + +Generate test data from a JSON Schema file. + +```bash +testdatagen generate --schema [options] +``` + +Options: +- `--schema`, `-s`: Path to JSON Schema file (required) +- `--count`, `-n`: Number of records to generate (default: 10) +- `--format`, `-f`: Output format - json, csv, or sql (default: json) +- `--seed`: Random seed for reproducibility +- `--table`: Table name for SQL output (default: generated_table) + +### from-ts + +Generate test data from a TypeScript type definition. + +```bash +testdatagen from-ts --input [options] +``` + +### from-sample + +Generate test data from a sample data file. 
+ +```bash +testdatagen from-sample --input [options] +``` + +## Configuration + +Default options can be configured in `~/.testdatagen.yaml`: + +```yaml +default-seed: 42 +default-count: 10 +``` + +Or in `pyproject.toml`: + +```toml +[tool.testdatagen] +default-seed = 42 +default-count = 10 +``` + +## Schema Reference + +### Supported Types + +| JSON Schema Type | Faker Generation | +|-----------------|------------------| +| string (email format) | fake.email() | +| string (date-time format) | fake.date_time() | +| string (uuid format) | fake.uuid4() | +| string (uri format) | fake.uri() | +| string (with pattern) | Pattern-based generation | +| integer | fake.random_int() | +| number | fake.pyfloat() | +| boolean | fake.pybool() | +| object | Recursive generation | +| array | List generation with items | +| enum | random_element() | + +## License + +MIT License \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..c963b38 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,62 @@ +[build-system] +requires = ["setuptools>=61.0", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "testdatagen" +version = "0.1.0" +description = "A CLI tool that generates realistic test data from JSON Schema, TypeScript types, or sample data files" +readme = "README.md" +requires-python = ">=3.10" +license = {text = "MIT"} +authors = [ + {name = "TestDataGen Team"} +] +keywords = ["cli", "test-data", "faker", "json-schema", "generator"] +classifiers = [ + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", +] + +dependencies = [ + "click>=8.0", + "faker>=20.0", + "jsonschema>=4.0", + "pydantic>=2.0", +] + +[project.optional-dependencies] +dev = [ + "pytest>=7.0", + "pytest-cov>=4.0", + 
@main.command()
@click.option(
    "--schema", "-s",
    type=click.Path(exists=True, file_okay=True, dir_okay=False),
    required=True,
    help="Path to JSON Schema file"
)
@click.option(
    "--count", "-n",
    type=int,
    default=10,
    help="Number of records to generate (default: 10)"
)
@click.option(
    "--format", "-f",
    type=click.Choice(["json", "csv", "sql"], case_sensitive=False),
    default="json",
    help="Output format (default: json)"
)
@click.option(
    "--seed",
    type=int,
    default=None,
    help="Random seed for reproducibility"
)
@click.option(
    "--table",
    type=str,
    default="generated_table",
    help="Table name for SQL output (default: generated_table)"
)
@click.option(
    "--indent",
    type=int,
    default=None,
    help="Indentation level for JSON output (default: None)"
)
def generate(schema, count, format, seed, table, indent):
    """Generate test data from a JSON Schema file.

    Loads the schema, generates ``count`` records with
    ``JSONSchemaGenerator`` and prints them in the requested output format.
    Every failure is reported on stderr and ends the process with exit
    status 1.

    Args:
        schema: Path to the JSON Schema file.
        count: Number of records to generate.
        format: Output format name: ``json``, ``csv`` or ``sql``.
        seed: Optional seed passed to the generator.
        table: Table name used for SQL output.
        indent: Indentation passed to the JSON formatter.
    """
    # Imported before the ``try`` on purpose: a function-scope import makes
    # ``json`` a local name, and the ``except json.JSONDecodeError`` clause
    # below must be able to resolve it even when ``open()`` fails first.
    import json

    try:
        schema_path = Path(schema)
        with open(schema_path, "r", encoding="utf-8") as f:
            schema_data = json.load(f)

        generator = JSONSchemaGenerator(seed=seed)
        records = generator.generate(schema_data, count=count)

        output_format = format.lower()
        if output_format == "json":
            formatter = JSONFormatter(indent=indent)
        elif output_format == "csv":
            formatter = CSVFormatter()
        elif output_format == "sql":
            formatter = SQLFormatter(table_name=table)
        else:
            click.echo(f"Error: Unsupported format '{format}'", err=True)
            sys.exit(1)

        output = formatter.format(records)
        click.echo(output)

    except json.JSONDecodeError as e:
        click.echo(f"Error: Invalid JSON in schema file: {e}", err=True)
        sys.exit(1)
    except FileNotFoundError:
        click.echo(f"Error: Schema file not found: {schema}", err=True)
        sys.exit(1)
    except Exception as e:
        # Top-level CLI boundary: report anything else without a traceback.
        click.echo(f"Error: {e}", err=True)
        sys.exit(1)
@main.command()
@click.option(
    "--input", "-i",
    type=click.Path(exists=True, file_okay=True, dir_okay=False),
    required=True,
    help="Path to sample data file (JSON or CSV)"
)
@click.option(
    "--count", "-n",
    type=int,
    default=10,
    help="Number of records to generate (default: 10)"
)
@click.option(
    "--format", "-f",
    type=click.Choice(["json", "csv", "sql"], case_sensitive=False),
    default="json",
    help="Output format (default: json)"
)
@click.option(
    "--seed",
    type=int,
    default=None,
    help="Random seed for reproducibility"
)
@click.option(
    "--table",
    type=str,
    default="generated_table",
    help="Table name for SQL output (default: generated_table)"
)
def from_sample(input, count, format, seed, table):
    """Generate test data from a sample data file.

    A schema is inferred from the sample with ``genson`` and then fed to
    ``JSONSchemaGenerator``.  Files ending in ``.csv`` are read as CSV with a
    header row (every cell is a string); anything else is parsed as JSON.
    Every failure is reported on stderr and ends the process with exit
    status 1.

    Args:
        input: Path to the sample file (JSON or CSV).
        count: Number of records to generate.
        format: Output format name: ``json``, ``csv`` or ``sql``.
        seed: Optional seed passed to the generator.
        table: Table name used for SQL output.
    """
    # Imported before the ``try`` on purpose: function-scope imports create
    # local names, and ``except json.JSONDecodeError`` must resolve even when
    # opening the file fails before any parsing happens.
    import csv
    import json

    try:
        input_path = Path(input)

        if input_path.suffix.lower() == ".csv":
            # The option help advertises CSV samples; each row becomes one
            # sample object keyed by the header names.
            with open(input_path, "r", encoding="utf-8", newline="") as f:
                sample_data = list(csv.DictReader(f))
        else:
            with open(input_path, "r", encoding="utf-8") as f:
                sample_data = json.load(f)

        try:
            from genson import SchemaBuilder
        except ImportError:
            click.echo("Error: genson not installed. Run: pip install genson", err=True)
            sys.exit(1)

        builder = SchemaBuilder()
        if isinstance(sample_data, list):
            for item in sample_data:
                builder.add_object(item)
        else:
            builder.add_object(sample_data)

        schema_data = builder.to_schema()

        generator = JSONSchemaGenerator(seed=seed)
        records = generator.generate(schema_data, count=count)

        output_format = format.lower()
        if output_format == "json":
            formatter = JSONFormatter()
        elif output_format == "csv":
            formatter = CSVFormatter()
        elif output_format == "sql":
            formatter = SQLFormatter(table_name=table)
        else:
            click.echo(f"Error: Unsupported format '{format}'", err=True)
            sys.exit(1)

        output = formatter.format(records)
        click.echo(output)

    except json.JSONDecodeError:
        click.echo(f"Error: Invalid JSON in sample file: {input}", err=True)
        sys.exit(1)
    except FileNotFoundError:
        click.echo(f"Error: Sample file not found: {input}", err=True)
        sys.exit(1)
    except Exception as e:
        # Top-level CLI boundary: report anything else without a traceback.
        click.echo(f"Error: {e}", err=True)
        sys.exit(1)
class CSVFormatter:
    """Formatter that outputs data in CSV format.

    Nested objects are flattened into dotted column names (``address.city``)
    and lists of objects into indexed ones (``items[0].name``).  Lists of
    scalars are stored as a JSON string in a single cell.
    """

    def __init__(self, delimiter: str = ",", quotechar: str = '"'):
        """Initialize the CSV formatter.

        Args:
            delimiter: Column delimiter character
            quotechar: Quote character for fields containing delimiters
        """
        self.delimiter = delimiter
        self.quotechar = quotechar

    def format(self, records: List[Dict[str, Any]]) -> str:
        """Format records as CSV string.

        Args:
            records: List of data records to format

        Returns:
            CSV-formatted string (header row first), or an empty string when
            there are no records
        """
        if not records:
            return ""

        # Flatten once and derive the header from the flattened rows, so the
        # header and the row data can never disagree about which keys exist.
        rows = [self._flatten_record(record) for record in records]
        fieldnames = list(dict.fromkeys(key for row in rows for key in row))

        output = io.StringIO()
        writer = csv.DictWriter(
            output,
            fieldnames=fieldnames,
            delimiter=self.delimiter,
            quotechar=self.quotechar,
            quoting=csv.QUOTE_MINIMAL,
            extrasaction='ignore'
        )

        writer.writeheader()
        writer.writerows(rows)

        return output.getvalue()

    def _extract_all_keys(self, records: List[Dict[str, Any]]) -> List[str]:
        """Extract all unique keys from records.

        Args:
            records: List of records

        Returns:
            List of all unique flattened keys in order of first appearance
        """
        return list(
            dict.fromkeys(
                key for record in records for key in self._flatten_keys(record)
            )
        )

    def _flatten_keys(self, obj: Any, parent_key: str = "") -> List[str]:
        """Flatten nested structure and extract all keys.

        Args:
            obj: Object to extract keys from
            parent_key: Prefix for nested keys

        Returns:
            List of flattened keys
        """
        return [key for key, _ in self._iter_flat(obj, parent_key)]

    def _flatten_record(self, record: Dict[str, Any], parent_key: str = "") -> Dict[str, Any]:
        """Flatten a record for CSV output.

        Args:
            record: Record to flatten
            parent_key: Prefix for nested keys

        Returns:
            Flattened dictionary whose values are ready to be written as cells
        """
        return {
            key: self._cell(value)
            for key, value in self._iter_flat(record, parent_key)
        }

    def _iter_flat(self, obj: Any, parent_key: str = ""):
        """Yield ``(flattened_key, leaf_value)`` pairs for a nested structure.

        Shared by key extraction and row flattening.  A non-dict value that
        sits under a prefix (for example a scalar inside a list that otherwise
        holds objects) is yielded under that prefix instead of failing.

        Args:
            obj: Object to walk
            parent_key: Prefix for nested keys
        """
        if not isinstance(obj, dict):
            if parent_key:
                yield parent_key, obj
            return

        for key, value in obj.items():
            new_key = f"{parent_key}.{key}" if parent_key else key
            if isinstance(value, dict):
                yield from self._iter_flat(value, new_key)
            elif isinstance(value, list) and value and isinstance(value[0], dict):
                for index, item in enumerate(value):
                    yield from self._iter_flat(item, f"{new_key}[{index}]")
            else:
                yield new_key, value

    @staticmethod
    def _cell(value: Any) -> Any:
        """Convert a leaf value into what is written to the CSV cell.

        ``None`` and empty lists become an empty string, other lists become a
        JSON string, everything else is passed through unchanged.
        """
        if value is None:
            return ""
        if isinstance(value, list):
            return json.dumps(value) if value else ""
        return value
+ + Args: + indent: Number of spaces for indentation (None for no indentation) + ensure_ascii: Whether to escape non-ASCII characters + """ + self.indent = indent + self.ensure_ascii = ensure_ascii + + def format(self, records: List[Dict[str, Any]]) -> str: + """Format records as JSON string. + + Args: + records: List of data records to format + + Returns: + JSON-formatted string + """ + if len(records) == 1: + return json.dumps( + records[0], + indent=self.indent, + ensure_ascii=self.ensure_ascii, + default=self._json_serializer + ) + + return json.dumps( + records, + indent=self.indent, + ensure_ascii=self.ensure_ascii, + default=self._json_serializer + ) + + def _json_serializer(self, obj: Any) -> Any: + """Custom JSON serializer for objects not serializable by default. + + Args: + obj: Object to serialize + + Returns: + Serialized representation + """ + if hasattr(obj, '__dict__'): + return obj.__dict__ + if hasattr(obj, 'isoformat'): + return obj.isoformat() + return str(obj) \ No newline at end of file diff --git a/src/testdatagen/formatters/sql_formatter.py b/src/testdatagen/formatters/sql_formatter.py new file mode 100644 index 0000000..72836f9 --- /dev/null +++ b/src/testdatagen/formatters/sql_formatter.py @@ -0,0 +1,118 @@ +"""SQL output formatter.""" + +import re +from typing import Any, Dict, List + + +class SQLFormatter: + """Formatter that outputs data as SQL INSERT statements.""" + + def __init__(self, table_name: str = "generated_table"): + """Initialize the SQL formatter. + + Args: + table_name: Name of the table for INSERT statements + """ + self.table_name = self._validate_table_name(table_name) + + def format(self, records: List[Dict[str, Any]]) -> str: + """Format records as SQL INSERT statements. 
+ + Args: + records: List of data records to format + + Returns: + SQL INSERT statements + """ + if not records: + return "" + + if not records[0]: + return "" + + columns = list(records[0].keys()) + column_list = ", ".join(columns) + + statements = [] + for record in records: + values = [] + for col in columns: + value = record.get(col) + values.append(self._format_value(value)) + + values_list = ", ".join(values) + statement = f"INSERT INTO {self.table_name} ({column_list}) VALUES ({values_list});" + statements.append(statement) + + return "\n".join(statements) + + def _format_value(self, value: Any) -> str: + """Format a value for SQL. + + Args: + value: Value to format + + Returns: + SQL-formatted value string + """ + if value is None: + return "NULL" + + if isinstance(value, bool): + return "TRUE" if value else "FALSE" + + if isinstance(value, (int, float)): + return str(value) + + if isinstance(value, str): + escaped = value.replace("'", "''") + return f"'{escaped}'" + + if isinstance(value, (list, dict)): + import json + json_str = json.dumps(value).replace("'", "''") + return f"'{json_str}'" + + return f"'{str(value).replace(chr(39), chr(39)+chr(39))}'" + + def _validate_table_name(self, table_name: str) -> str: + """Validate and sanitize table name to prevent SQL injection. + + Args: + table_name: Table name to validate + + Returns: + Validated table name + + Raises: + ValueError: If table name contains invalid characters + """ + if not table_name: + return "generated_table" + + if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', table_name): + raise ValueError( + f"Invalid table name '{table_name}'. " + "Table name must start with a letter or underscore " + "and contain only letters, numbers, and underscores." 
class JSONSchemaGenerator:
    """Generator that creates test data from JSON Schema definitions.

    Structural keywords (``$ref``, ``anyOf``/``oneOf``/``allOf``, objects and
    arrays) are handled here; leaf values come from Faker or from a small
    regex-driven string generator.  Every random decision that does not go
    through Faker uses a private ``random.Random`` created from ``seed``, so
    a seeded generator does not depend on the global ``random`` state.
    """

    # Repetition cap for unbounded quantifiers (``*``, ``+``, ``{n,}``).
    _MAX_REPEAT = 5
    # Pattern-generated strings are truncated to this many characters.
    _MAX_PATTERN_LENGTH = 100
    # Nesting limit for ``$ref``/``allOf`` expansion; stops recursive schemas.
    _MAX_DEPTH = 10

    _LETTERS = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
    _DIGITS = "0123456789"

    # Characters produced for regex class escapes.
    _ESCAPE_POOLS = {
        "d": _DIGITS,
        "w": _LETTERS + _DIGITS + "_",
        "s": " ",
        "D": _LETTERS,
        "S": _LETTERS + _DIGITS,
        "W": "-",
    }

    # JSON Schema ``format`` value -> name of the Faker method that produces it.
    _FORMAT_PROVIDERS = {
        "email": "email",
        "date-time": "iso8601",
        "date": "date",
        "time": "time",
        "uuid": "uuid4",
        "uri": "uri",
        "hostname": "hostname",
        "ipv4": "ipv4",
        "ipv6": "ipv6",
        "password": "password",
        "firstName": "first_name",
        "lastName": "last_name",
        "fullName": "name",
        "phoneNumber": "phone_number",
        "address": "address",
        "city": "city",
        "country": "country",
        "company": "company",
        "job": "job",
        "url": "url",
        "userName": "user_name",
    }

    # Root document that ``$ref`` pointers are resolved against; set only
    # while ``generate`` is running.
    _root_schema: Optional[Dict[str, Any]] = None
    # Current ``$ref``/``allOf`` nesting depth.
    _depth = 0

    def __init__(self, seed: Optional[int] = None):
        """Initialize the generator.

        Args:
            seed: Random seed for reproducible generation
        """
        # Local import: this module's top-level imports do not include it.
        import random

        self.seed = seed
        self.faker = Faker()
        if seed is not None:
            Faker.seed(seed)

        # Private RNG for enum/anyOf/array-size/pattern decisions.  With
        # ``seed=None`` it is seeded from system entropy.
        self._random = random.Random(seed)

        self.faker.add_provider(TestDataProvider)

    def generate(
        self,
        schema: Dict[str, Any],
        count: int = 1
    ) -> List[Dict[str, Any]]:
        """Generate test data records from a JSON Schema.

        Args:
            schema: JSON Schema definition
            count: Number of records to generate

        Returns:
            List of generated data records

        Raises:
            ValueError: If ``schema`` is not a valid JSON Schema
        """
        if not self._validate_schema(schema):
            raise ValueError("Invalid JSON Schema")

        # ``$ref`` pointers found in nested sub-schemas must be resolved
        # against the whole document, so remember it for this run.
        self._root_schema = schema
        try:
            return [self._generate_from_schema(schema) for _ in range(count)]
        finally:
            self._root_schema = None

    def _validate_schema(self, schema: Dict[str, Any]) -> bool:
        """Validate that the schema is a valid JSON Schema.

        Args:
            schema: Schema to validate

        Returns:
            True if valid, False otherwise
        """
        from jsonschema.exceptions import SchemaError

        try:
            Draft7Validator.check_schema(schema)
        except SchemaError:
            return False
        return True

    def _generate_from_schema(self, schema: Dict[str, Any]) -> Any:
        """Generate a single value from a schema.

        Args:
            schema: JSON Schema definition

        Returns:
            Generated value: a dict for objects, a list for arrays, otherwise
            a scalar.  ``None`` when the nesting limit is reached.
        """
        if not isinstance(schema, dict):
            # Boolean schemas carry no constraints to generate from.
            schema = {}

        root = self._root_schema if self._root_schema is not None else schema

        if "$ref" in schema:
            return self._generate_nested(
                lambda: self._resolve_ref(schema["$ref"], root)
            )

        for keyword in ("anyOf", "oneOf"):
            if keyword in schema:
                return self._generate_from_schema(
                    self._random.choice(schema[keyword])
                )

        if "allOf" in schema:
            return self._generate_nested(lambda: self._merge_all_of(schema, root))

        schema = self._with_single_type(schema)
        json_type = schema.get("type")

        if json_type is None:
            if "properties" in schema:
                json_type = "object"
            elif "items" in schema:
                json_type = "array"

        if json_type == "object":
            return self._generate_object(schema)
        if json_type == "array":
            return self._generate_array(schema)
        return self._generate_value(schema)

    def _generate_nested(self, build_schema: Any) -> Any:
        """Generate from a derived schema while tracking nesting depth.

        Args:
            build_schema: Zero-argument callable returning the schema to
                generate from (a resolved ``$ref`` or a merged ``allOf``)

        Returns:
            Generated value, or ``None`` once ``_MAX_DEPTH`` is reached
        """
        if self._depth >= self._MAX_DEPTH:
            return None
        self._depth += 1
        try:
            return self._generate_from_schema(build_schema())
        finally:
            self._depth -= 1

    def _merge_all_of(self, schema: Dict[str, Any], root: Dict[str, Any]) -> Dict[str, Any]:
        """Combine the members of ``allOf`` with their sibling keywords.

        ``properties`` mappings are unioned; for any other keyword the last
        sub-schema that defines it wins.

        Args:
            schema: Schema containing ``allOf``
            root: Document used to resolve ``$ref`` members

        Returns:
            A new schema without the outer ``allOf``
        """
        merged = {key: value for key, value in schema.items() if key != "allOf"}
        for subschema in schema["allOf"]:
            if isinstance(subschema, dict) and "$ref" in subschema:
                subschema = self._resolve_ref(subschema["$ref"], root)
            if not isinstance(subschema, dict):
                continue
            for key, value in subschema.items():
                existing = merged.get(key)
                if key == "properties" and isinstance(existing, dict) and isinstance(value, dict):
                    merged[key] = {**existing, **value}
                elif key == "allOf" and isinstance(existing, list) and isinstance(value, list):
                    merged[key] = existing + value
                else:
                    merged[key] = value
        return merged

    def _with_single_type(self, schema: Dict[str, Any]) -> Dict[str, Any]:
        """Return ``schema`` with a list-valued ``type`` reduced to one entry.

        Args:
            schema: Schema whose ``type`` may be a list such as
                ``["string", "null"]``

        Returns:
            The same schema, or a shallow copy with one randomly chosen type
        """
        json_type = schema.get("type")
        if not isinstance(json_type, list):
            return schema
        if not json_type:
            return {key: value for key, value in schema.items() if key != "type"}
        return {**schema, "type": self._random.choice(json_type)}

    def _resolve_ref(self, ref: str, schema: Dict[str, Any]) -> Dict[str, Any]:
        """Resolve a $ref reference within a schema.

        Only same-document references are supported.

        Args:
            ref: Reference string (e.g., #/definitions/Person)
            schema: Root schema containing definitions

        Returns:
            Resolved schema, or an empty schema when the reference cannot be
            followed
        """
        if ref == "#":
            return schema
        if not ref.startswith("#/"):
            return {}

        current = schema
        for part in ref[2:].split("/"):
            # JSON Pointer escapes: "~1" is "/" and "~0" is "~".
            part = part.replace("~1", "/").replace("~0", "~")
            if not isinstance(current, dict):
                return {}
            current = current.get(part, {})
        return current

    def _generate_object(self, schema: Dict[str, Any]) -> Dict[str, Any]:
        """Generate an object from an object-type schema.

        Args:
            schema: Object schema definition

        Returns:
            Generated object with one entry per declared property
        """
        return {
            prop_name: self._generate_from_schema(prop_schema)
            for prop_name, prop_schema in schema.get("properties", {}).items()
        }

    def _should_generate_optional(self, prop_schema: Dict[str, Any]) -> bool:
        """Determine if an optional property should be generated.

        Args:
            prop_schema: Property schema

        Returns:
            True if property should be generated (currently always)
        """
        return True

    def _generate_array(self, schema: Dict[str, Any]) -> List[Any]:
        """Generate an array from an array-type schema.

        Args:
            schema: Array schema definition

        Returns:
            Generated array
        """
        items_schema = schema.get("items", {})
        if isinstance(items_schema, list):
            # Tuple form: one schema per position.
            return [self._generate_from_schema(item) for item in items_schema]

        min_items = schema.get("minItems")
        max_items = schema.get("maxItems")
        # Defaults are 1..10, but a default must never contradict an explicit
        # bound (e.g. minItems=12, or maxItems=0).
        if min_items is None:
            min_items = 1 if max_items is None else min(1, max_items)
        if max_items is None:
            max_items = max(10, min_items)
        max_items = max(max_items, min_items)

        count = self._random.randint(min_items, max_items)
        unique_items = schema.get("uniqueItems", False)
        results = []
        seen = set()

        for _ in range(count):
            item = self._generate_from_schema(items_schema)
            if unique_items:
                # Best effort: retry a bounded number of times, then accept.
                attempts = 0
                while str(item) in seen and attempts < 100:
                    item = self._generate_from_schema(items_schema)
                    attempts += 1
                seen.add(str(item))
            results.append(item)

        return results

    def _generate_value(self, schema: Dict[str, Any]) -> Any:
        """Generate a scalar value from a schema.

        Args:
            schema: Value schema definition

        Returns:
            Generated value
        """
        if "enum" in schema:
            return self._random.choice(schema["enum"])

        if "const" in schema:
            return schema["const"]

        schema = self._with_single_type(schema)
        json_type = schema.get("type")

        if json_type == "null":
            return None

        if json_type == "boolean":
            return self.faker.pybool()

        if json_type == "integer":
            low, high = self._integer_bounds(schema)
            return self.faker.random_int(min=low, max=high)

        if json_type == "number":
            return self.faker.pyfloat(
                min_value=schema.get("minimum"),
                max_value=schema.get("maximum")
            )

        if json_type == "string":
            return self._generate_string(schema)

        return self.faker.word()

    @staticmethod
    def _integer_bounds(schema: Dict[str, Any]) -> tuple:
        """Compute an inclusive ``(low, high)`` integer range for a schema.

        Without constraints the range is 0..10000.  When only one side is
        constrained, the other side is chosen so the range is never empty.

        Args:
            schema: Integer schema definition

        Returns:
            Tuple ``(low, high)`` with ``low <= high``
        """
        import math

        def bound(key: str) -> Any:
            value = schema.get(key)
            # Boolean values (draft-04 style exclusive*) carry no number.
            return None if value is None or isinstance(value, bool) else value

        minimum = bound("minimum")
        maximum = bound("maximum")
        exclusive_min = bound("exclusiveMinimum")
        exclusive_max = bound("exclusiveMaximum")

        lows = []
        if minimum is not None:
            lows.append(math.ceil(minimum))
        if exclusive_min is not None:
            lows.append(math.floor(exclusive_min) + 1)

        highs = []
        if maximum is not None:
            highs.append(math.floor(maximum))
        if exclusive_max is not None:
            highs.append(math.ceil(exclusive_max) - 1)

        low = max(lows) if lows else None
        high = min(highs) if highs else None

        if low is None:
            low = 0 if high is None or high >= 0 else high - 10000
        if high is None:
            high = 10000 if low <= 10000 else low + 10000

        return low, max(high, low)

    def _generate_string(self, schema: Dict[str, Any]) -> str:
        """Generate a string based on string schema constraints.

        Precedence: a known ``format``, then ``pattern``, then free text that
        honours ``minLength``/``maxLength``.

        Args:
            schema: String schema definition

        Returns:
            Generated string
        """
        format_type = schema.get("format", "")

        if format_type == "regex":
            return self._generate_from_pattern(schema.get("pattern", ".*"))

        provider = self._FORMAT_PROVIDERS.get(format_type)
        if provider is not None:
            return getattr(self.faker, provider)()

        pattern = schema.get("pattern")
        if pattern:
            return self._generate_from_pattern(pattern)

        min_length = schema.get("minLength", 0)
        max_length = max(schema.get("maxLength", 100), min_length)

        # Fixed-length and very short strings are built from random letters;
        # ``faker.text`` is skipped for limits below 5 characters.
        if min_length == max_length or max_length < 5:
            shortest = max(min_length, min(1, max_length))
            length = self._random.randint(shortest, max_length)
            return "".join(self._random.choices(self._LETTERS, k=length))

        text = self.faker.text(max_nb_chars=max_length)[:max_length]
        if len(text) < min_length:
            padding = min_length - len(text)
            text += "".join(self._random.choices(self._LETTERS, k=padding))
        return text

    def _generate_from_pattern(self, pattern: str) -> str:
        """Generate a string matching a regex pattern.

        Supports literals, ``.``, class escapes such as ``\\d \\w \\s``,
        character classes with ranges and negation, groups with alternation,
        and the quantifiers ``* + ? {n} {n,} {n,m}``.  Anchors and look-around
        groups produce no output.  Unsupported constructs are approximated
        rather than rejected.

        Args:
            pattern: Regular expression pattern

        Returns:
            Generated string, at most ``_MAX_PATTERN_LENGTH`` characters; ten
            random letters when the pattern produces nothing
        """
        generated = self._pattern_alternatives(pattern)[: self._MAX_PATTERN_LENGTH]
        if generated:
            return generated
        return "".join(self._random.choices(self._LETTERS, k=10))

    def _pattern_alternatives(self, pattern: str) -> str:
        """Generate text for one randomly chosen ``|`` branch of ``pattern``."""
        rng = self._random
        branches = []
        current = []
        pos = 0

        while pos < len(pattern):
            if pattern[pos] == "|":
                branches.append("".join(current))
                current = []
                pos += 1
                continue

            produce, pos = self._pattern_atom(pattern, pos)
            low, high, pos = self._pattern_quantifier(pattern, pos)
            if produce is not None:
                # The final result is truncated anyway, so cap the repeats.
                repeats = min(rng.randint(low, high), self._MAX_PATTERN_LENGTH)
                current.extend(produce() for _ in range(repeats))

        branches.append("".join(current))
        return rng.choice(branches)

    def _pattern_atom(self, pattern: str, pos: int) -> tuple:
        """Parse one atom starting at ``pos``.

        Returns:
            Tuple ``(produce, next_pos)`` where ``produce`` is a zero-argument
            callable returning text for one occurrence of the atom, or
            ``None`` for constructs that emit nothing.  ``next_pos`` is always
            greater than ``pos``.
        """
        rng = self._random
        char = pattern[pos]

        if char == "\\":
            if pos + 1 >= len(pattern):
                return None, pos + 1
            escaped = pattern[pos + 1]
            pool = self._ESCAPE_POOLS.get(escaped)
            if pool:
                return (lambda: rng.choice(pool)), pos + 2
            if escaped in "bBAZ":
                # Zero-width assertions.
                return None, pos + 2
            literal = {"n": "\n", "t": "\t"}.get(escaped, escaped)
            return (lambda: literal), pos + 2

        if char == "[":
            end = self._class_end(pattern, pos)
            if end == -1:
                return (lambda: "["), pos + 1
            members = self._expand_char_class(pattern[pos + 1:end])
            if not members:
                return None, end + 1
            return (lambda: rng.choice(members)), end + 1

        if char == "(":
            end = self._group_end(pattern, pos)
            if end == -1:
                return (lambda: "("), pos + 1
            body = self._strip_group_prefix(pattern[pos + 1:end])
            if body is None:
                return None, end + 1
            return (lambda: self._pattern_alternatives(body)), end + 1

        if char == ".":
            any_char = self._LETTERS + self._DIGITS
            return (lambda: rng.choice(any_char)), pos + 1

        if char in "^$*+?)":
            # Anchors, quantifiers with nothing to repeat, unbalanced ")".
            return None, pos + 1

        return (lambda: char), pos + 1

    def _pattern_quantifier(self, pattern: str, pos: int) -> tuple:
        """Parse an optional quantifier at ``pos``.

        Returns:
            Tuple ``(low, high, next_pos)``; ``(1, 1, pos)`` when there is no
            quantifier at ``pos``.
        """
        if pos >= len(pattern):
            return 1, 1, pos

        char = pattern[pos]
        if char == "*":
            low, high, pos = 0, self._MAX_REPEAT, pos + 1
        elif char == "+":
            low, high, pos = 1, self._MAX_REPEAT, pos + 1
        elif char == "?":
            low, high, pos = 0, 1, pos + 1
        elif char == "{":
            end = pattern.find("}", pos)
            if end == -1:
                return 1, 1, pos
            lower, comma, upper = pattern[pos + 1:end].partition(",")
            if not lower.isdigit() or (upper and not upper.isdigit()):
                # Not a quantifier; the brace is handled as a literal atom.
                return 1, 1, pos
            low = int(lower)
            if not comma:
                high = low
            elif upper:
                high = max(low, int(upper))
            else:
                high = low + self._MAX_REPEAT
            pos = end + 1
        else:
            return 1, 1, pos

        # A trailing "?" only makes the quantifier lazy.
        if pos < len(pattern) and pattern[pos] == "?":
            pos += 1
        return low, high, pos

    def _class_end(self, pattern: str, pos: int) -> int:
        """Return the index of the ``]`` closing the class at ``pos``, or -1."""
        index = pos + 1
        while index < len(pattern):
            if pattern[index] == "\\":
                index += 2
                continue
            if pattern[index] == "]":
                return index
            index += 1
        return -1

    def _group_end(self, pattern: str, pos: int) -> int:
        """Return the index of the ``)`` matching the ``(`` at ``pos``, or -1."""
        depth = 0
        index = pos
        while index < len(pattern):
            char = pattern[index]
            if char == "\\":
                index += 2
                continue
            if char == "[":
                class_end = self._class_end(pattern, index)
                if class_end != -1:
                    index = class_end + 1
                    continue
            elif char == "(":
                depth += 1
            elif char == ")":
                depth -= 1
                if depth == 0:
                    return index
            index += 1
        return -1

    @staticmethod
    def _strip_group_prefix(body: str) -> Optional[str]:
        """Remove a group's ``?:`` / named-group prefix.

        Returns:
            The group body to generate from, or ``None`` for groups that emit
            nothing (look-arounds, inline flags, comments).
        """
        if not body.startswith("?"):
            return body
        if body.startswith("?:"):
            return body[2:]
        is_named = body.startswith("?P<") or (
            body.startswith("?<") and body[2:3] not in ("=", "!")
        )
        if is_named:
            close = body.find(">")
            return body[close + 1:] if close != -1 else None
        return None

    def _expand_char_class(self, body: str) -> str:
        """Expand the inside of ``[...]`` into the characters it can produce.

        Args:
            body: Text between the brackets

        Returns:
            String of candidate characters (may be empty)
        """
        negate = body.startswith("^")
        if negate:
            body = body[1:]

        members = []
        index = 0
        while index < len(body):
            char = body[index]
            if char == "\\" and index + 1 < len(body):
                escaped = body[index + 1]
                literal = {"n": "\n", "t": "\t"}.get(escaped, escaped)
                members.extend(self._ESCAPE_POOLS.get(escaped, literal))
                index += 2
            elif index + 2 < len(body) and body[index + 1] == "-":
                start, stop = ord(char), ord(body[index + 2])
                if start <= stop:
                    # Cap very wide ranges to keep the pool small.
                    stop = min(stop, start + 255)
                    members.extend(chr(code) for code in range(start, stop + 1))
                else:
                    members.extend((char, "-", body[index + 2]))
                index += 3
            else:
                members.append(char)
                index += 1

        if negate:
            excluded = set(members)
            return "".join(
                c for c in self._LETTERS + self._DIGITS if c not in excluded
            )
        return "".join(members)
def json_schema_type(
    self,
    schema: Dict[str, Any],
    faker_instance: Any = None
) -> Any:
    """Generate a value that conforms to a JSON Schema definition.

    Keywords are resolved in this order: ``const``, ``enum``,
    ``anyOf``/``oneOf``, ``allOf``, then ``type``. Value keywords come
    first so that ``{"type": "string", "enum": [...]}`` yields one of the
    enumerated values rather than an arbitrary string.

    Args:
        schema: JSON Schema definition.
        faker_instance: Object providing the Faker generation methods
            (``pybool``, ``random_int``, ``pyfloat``, ...). Defaults to
            ``self``.

    Returns:
        Generated data matching the schema, or ``None`` when the schema
        carries no keyword this method understands.

    Raises:
        ValueError: If the integer bounds describe an empty range.
    """
    import math
    import random

    if faker_instance is None:
        faker_instance = self

    if "const" in schema:
        return schema["const"]

    # An empty enum cannot be satisfied; fall through instead of crashing.
    if schema.get("enum"):
        return random.choice(list(schema["enum"]))

    alternatives = schema.get("anyOf") or schema.get("oneOf")
    if alternatives:
        return self.json_schema_type(random.choice(alternatives), faker_instance)

    if "allOf" in schema:
        merged = {}
        for subschema in schema["allOf"]:
            part = self.json_schema_type(subschema, faker_instance)
            if isinstance(part, dict):
                merged.update(part)
        return merged

    json_type = schema.get("type")
    # "type" may be a list of alternatives, e.g. ["string", "null"].
    if isinstance(json_type, (list, tuple)):
        if not json_type:
            return None
        json_type = random.choice(json_type)

    if json_type == "null":
        return None

    if json_type == "boolean":
        return faker_instance.pybool()

    if json_type == "integer":
        default_span = 10000
        minimum = schema.get("minimum")
        maximum = schema.get("maximum")
        exclusive_min = schema.get("exclusiveMinimum")
        exclusive_max = schema.get("exclusiveMaximum")

        lower = None if minimum is None else math.ceil(minimum)
        upper = None if maximum is None else math.floor(maximum)

        # Draft-04 uses booleans that modify minimum/maximum; later drafts
        # use numbers. bool is checked first because it subclasses int.
        if isinstance(exclusive_min, bool):
            if exclusive_min and lower is not None and lower == minimum:
                lower += 1
        elif exclusive_min is not None:
            bound = math.floor(exclusive_min) + 1
            lower = bound if lower is None else max(lower, bound)

        if isinstance(exclusive_max, bool):
            if exclusive_max and upper is not None and upper == maximum:
                upper -= 1
        elif exclusive_max is not None:
            bound = math.ceil(exclusive_max) - 1
            upper = bound if upper is None else min(upper, bound)

        # Defaults are 0..10000, shifted when the one given bound lies
        # outside that window so the range is never empty.
        if lower is None and upper is None:
            lower, upper = 0, default_span
        elif lower is None:
            lower = 0 if upper >= 0 else upper - default_span
        elif upper is None:
            upper = default_span if lower <= default_span else lower + default_span

        if lower > upper:
            raise ValueError(
                f"Integer schema has an empty range: {lower} > {upper}"
            )

        return faker_instance.random_int(min=lower, max=upper)

    if json_type == "number":
        return faker_instance.pyfloat(
            min_value=schema.get("minimum"),
            max_value=schema.get("maximum")
        )

    if json_type == "string":
        return self._generate_string(schema, faker_instance)

    if json_type == "array":
        return self._generate_array(schema, faker_instance)

    if json_type == "object":
        return self._generate_object(schema, faker_instance)

    return None
def _generate_string(
    self,
    schema: Dict[str, Any],
    faker_instance: Any
) -> str:
    """Generate a string honoring ``format``, ``pattern`` and length keywords.

    Precedence: a recognized ``format`` wins, then ``pattern``, then the
    ``minLength``/``maxLength`` constraints (defaults 0 and 100).

    Args:
        schema: String schema definition.
        faker_instance: Object providing the Faker generation methods.

    Returns:
        The generated string. For plain strings the length always lies
        within ``[minLength, maxLength]``.
    """
    import random

    # JSON Schema / project-specific format name -> Faker method name.
    format_methods = {
        "email": "email",
        "date-time": "iso8601",
        "date": "date",  # full-date only, not a complete timestamp
        "time": "time",
        "uuid": "uuid4",
        "uri": "uri",
        "hostname": "hostname",
        "ipv4": "ipv4",
        "ipv6": "ipv6",
        "json": "json",
        "password": "password",
        "firstName": "first_name",
        "lastName": "last_name",
        "fullName": "name",
        "phoneNumber": "phone_number",
        "address": "address",
        "city": "city",
        "country": "country",
        "company": "company",
        "job": "job",
        "url": "url",
        "userName": "user_name",
    }

    format_type = schema.get("format", "")

    if format_type == "regex":
        return self._generate_from_pattern(schema.get("pattern", ".*"))

    if isinstance(format_type, str) and format_type in format_methods:
        return getattr(faker_instance, format_methods[format_type])()

    pattern = schema.get("pattern")
    if pattern:
        return self._generate_from_pattern(pattern)

    min_length = max(0, schema.get("minLength", 0))
    # Never let the upper bound fall below the lower bound.
    max_length = max(min_length, schema.get("maxLength", 100))

    if max_length == 0:
        return ""

    if min_length == max_length:
        return ''.join(random.choices(string.ascii_letters, k=min_length))

    # Faker's text() refuses to generate fewer than 5 characters, so ask
    # for at least that many and trim afterwards.
    text = faker_instance.text(max_nb_chars=max(max_length, 5))[:max_length]

    shortfall = min_length - len(text)
    if shortfall > 0:
        text += ''.join(random.choices(string.ascii_letters, k=shortfall))

    return text
def _generate_from_pattern(self, pattern: str) -> str:
    r"""Generate a string that matches the given regex pattern.

    Only a pragmatic subset of regex syntax is understood:

    * literals, ``.``, and the escapes ``\d \w \s \D \W \S \n \t``
      (any other escaped character is emitted literally);
    * character classes with ranges and negation (``[a-z0-9_]``, ``[^,]``);
    * groups with alternation, including nesting (``(foo|bar)``, ``(?:x)``);
    * quantifiers ``*``, ``+``, ``?``, ``{n}``, ``{n,}``, ``{n,m}``; each
      repetition is sampled independently;
    * anchors ``^``/``$`` and ``\b``/``\B`` are zero-width and ignored.

    ``*`` and ``+`` repeat at most three times, ``{n,}`` at most ``n + 3``
    times, and the result is capped at 100 characters.

    Args:
        pattern: Regular expression pattern.

    Returns:
        A string built from the pattern. If the pattern produces an empty
        string, ten random ASCII letters are returned instead.
    """
    import random
    import string

    max_repeat = 3      # cap for the open-ended quantifiers
    max_length = 100    # hard cap on the generated string
    alnum = string.ascii_letters + string.digits
    printable = alnum + string.punctuation + ' '
    shorthand = {
        'd': string.digits,
        'w': alnum + '_',
        's': ' ',
        'D': string.ascii_letters,
        # '_' is a word character, so it must not be offered for \W.
        'W': string.punctuation.replace('_', '') + ' ',
        'S': alnum,
    }
    control = {'n': '\n', 't': '\t'}
    size = len(pattern)

    def parse_class(pos):
        """Parse a class body starting just after '['.

        Returns (candidate_chars, next_pos), or None when ']' is missing.
        """
        negated = pos < size and pattern[pos] == '^'
        if negated:
            pos += 1
        members = []
        while pos < size and pattern[pos] != ']':
            ch = pattern[pos]
            pos += 1
            if ch == '\\' and pos < size:
                escaped = pattern[pos]
                pos += 1
                if escaped in shorthand:
                    members.extend(shorthand[escaped])
                    continue
                ch = control.get(escaped, escaped)
            is_range = (
                pos + 1 < size
                and pattern[pos] == '-'
                and pattern[pos + 1] not in ']\\'
            )
            if is_range:
                upper = pattern[pos + 1]
                pos += 2
                if ch <= upper:
                    members.extend(
                        chr(code) for code in range(ord(ch), ord(upper) + 1)
                    )
                else:
                    # Reversed range is invalid regex; keep both endpoints.
                    members.extend((ch, upper))
            else:
                members.append(ch)
        if pos >= size:
            return None
        if negated:
            excluded = set(members)
            members = [c for c in printable if c not in excluded]
        return ''.join(members), pos + 1

    def parse_quantifier(pos):
        """Return ((low, high), next_pos) for the quantifier at pos.

        Returns None when a '{...}' is not a valid repetition count, in
        which case the brace is treated as a literal character.
        """
        symbol = pattern[pos]
        if symbol == '*':
            bounds, pos = (0, max_repeat), pos + 1
        elif symbol == '+':
            bounds, pos = (1, max_repeat), pos + 1
        elif symbol == '?':
            bounds, pos = (0, 1), pos + 1
        else:
            close = pattern.find('}', pos)
            if close == -1:
                return None
            low, comma, high = pattern[pos + 1:close].partition(',')
            low, high = low.strip(), high.strip()
            if not (low or high):
                return None
            if any(part and not part.isdecimal() for part in (low, high)):
                return None
            least = min(int(low) if low else 0, max_length)
            if not comma:
                most = least
            elif high:
                most = max(least, min(int(high), max_length))
            else:
                most = least + max_repeat
            bounds, pos = (least, most), close + 1
        if pos < size and pattern[pos] == '?':
            pos += 1  # lazy modifier does not change what may be generated
        return bounds, pos

    def parse_branches(pos, in_group):
        """Parse '|'-separated branches up to ')' (inside a group) or the end."""
        branches = [[]]
        while pos < size:
            ch = pattern[pos]
            if ch == ')':
                if in_group:
                    break
                pos += 1  # stray ')': ignore
                continue
            if ch == '|':
                branches.append([])
                pos += 1
                continue
            if ch in '^$':
                pos += 1
                continue
            if ch in '*+?{':
                parsed = parse_quantifier(pos)
                if parsed is not None:
                    bounds, pos = parsed
                    if branches[-1]:
                        branches[-1][-1] = (branches[-1][-1][0],) + bounds
                    continue
                # Otherwise fall through and treat '{' as a literal.
            if ch == '\\':
                if pos + 1 >= size:
                    pos += 1  # dangling backslash: ignore
                    continue
                escaped = pattern[pos + 1]
                pos += 2
                if escaped in 'bB':
                    continue  # word boundaries are zero-width
                if escaped in shorthand:
                    node = ('set', shorthand[escaped])
                else:
                    node = ('lit', control.get(escaped, escaped))
            elif ch == '[':
                parsed = parse_class(pos + 1)
                if parsed is None:
                    node = ('lit', ch)
                    pos += 1
                else:
                    node = ('set', parsed[0])
                    pos = parsed[1]
            elif ch == '(':
                pos += 1
                if pattern.startswith('?:', pos):
                    pos += 2
                inner, pos = parse_branches(pos, True)
                if pos < size:
                    pos += 1  # consume the closing ')'
                node = ('group', inner)
            elif ch == '.':
                node = ('set', alnum)
                pos += 1
            else:
                node = ('lit', ch)
                pos += 1
            branches[-1].append((node, 1, 1))
        return branches, pos

    def render(branches):
        """Pick one branch and expand every atom, re-sampling per repetition."""
        pieces = []
        for (kind, payload), low, high in random.choice(branches):
            for _ in range(random.randint(low, high)):
                if kind == 'lit':
                    pieces.append(payload)
                elif kind == 'set':
                    if payload:  # an empty class contributes nothing
                        pieces.append(random.choice(payload))
                else:
                    pieces.append(render(payload))
        return ''.join(pieces)

    tree, _ = parse_branches(0, False)
    generated = render(tree)[:max_length]
    if generated:
        return generated
    return ''.join(random.choices(string.ascii_letters, k=10))
def _generate_array(
    self,
    schema: Dict[str, Any],
    faker_instance: Any
) -> List[Any]:
    """Generate a list according to an array schema.

    The length is drawn uniformly from ``[minItems, maxItems]`` (defaults
    1 and 10). When only one bound is given and it conflicts with the
    other bound's default, the default yields so the range is never empty.
    With ``uniqueItems`` the list may end up shorter than the drawn length
    if the item schema cannot produce enough distinct values.

    Args:
        schema: Array schema definition.
        faker_instance: Object providing the Faker generation methods.

    Returns:
        The generated list.
    """
    import random

    items_schema = schema.get("items", {})

    if "minItems" in schema:
        min_items = max(0, schema["minItems"])
    else:
        # Default of 1, unless maxItems explicitly allows only an empty list.
        min_items = min(1, max(0, schema.get("maxItems", 1)))
    max_items = max(min_items, schema.get("maxItems", 10))

    count = random.randint(min_items, max_items)
    unique_items = schema.get("uniqueItems", False)

    results = []
    seen = set()

    for _ in range(count):
        item = self.json_schema_type(items_schema, faker_instance)
        if unique_items:
            # repr() keeps 1 and "1" apart, which str() would not.
            item_key = repr(item)
            attempts = 0
            while item_key in seen and attempts < 100:
                item = self.json_schema_type(items_schema, faker_instance)
                item_key = repr(item)
                attempts += 1
            if item_key in seen:
                # Value space looks exhausted: stop rather than emit a duplicate.
                break
            seen.add(item_key)
        results.append(item)

    return results


def _generate_object(
    self,
    schema: Dict[str, Any],
    faker_instance: Any
) -> Dict[str, Any]:
    """Generate a dict with one value for every entry in ``properties``.

    Args:
        schema: Object schema definition.
        faker_instance: Object providing the Faker generation methods.

    Returns:
        Mapping from property name to a value generated from its sub-schema,
        in the schema's property order.
    """
    properties = schema.get("properties", {})
    return {
        prop_name: self.json_schema_type(prop_schema, faker_instance)
        for prop_name, prop_schema in properties.items()
    }
"""Backward-compatible import location for ``TestDataProvider``.

This module used to hold a byte-for-byte copy of
``testdatagen.providers.testdata_provider``. It now re-exports that single
implementation, so ``from testdatagen.utils.testdata_provider import
TestDataProvider`` keeps working and the two copies cannot drift apart.
"""

from testdatagen.providers.testdata_provider import TestDataProvider

__all__ = ["TestDataProvider"]
class TestCLI:
    """Tests for CLI commands."""

    # Minimal object schema shared by the ``generate`` tests.
    SIMPLE_SCHEMA = {
        "type": "object",
        "properties": {
            "name": {"type": "string"}
        }
    }

    @pytest.fixture
    def runner(self):
        """Provide a fresh Click test runner."""
        return CliRunner()

    @pytest.fixture
    def schema_path(self, tmp_path):
        """Write SIMPLE_SCHEMA to a temp file that pytest cleans up itself."""
        path = tmp_path / "schema.json"
        path.write_text(json.dumps(self.SIMPLE_SCHEMA), encoding="utf-8")
        return str(path)

    def test_cli_version(self, runner):
        """Test that CLI shows version."""
        result = runner.invoke(main, ["--version"])

        assert result.exit_code == 0
        assert "0.1.0" in result.output

    def test_generate_help(self, runner):
        """Test generate command help."""
        result = runner.invoke(main, ["generate", "--help"])

        assert result.exit_code == 0
        assert "--schema" in result.output
        assert "--count" in result.output
        assert "--format" in result.output

    def test_from_ts_help(self, runner):
        """Test from-ts command help."""
        result = runner.invoke(main, ["from-ts", "--help"])

        assert result.exit_code == 0
        assert "--input" in result.output

    def test_from_sample_help(self, runner):
        """Test from-sample command help."""
        result = runner.invoke(main, ["from-sample", "--help"])

        assert result.exit_code == 0
        assert "--input" in result.output

    def test_generate_requires_schema(self, runner):
        """Test that generate requires schema option."""
        result = runner.invoke(main, ["generate"])

        assert result.exit_code != 0

    def test_generate_with_schema(self, runner, schema_path):
        """Test generate with valid schema."""
        result = runner.invoke(
            main, ["generate", "--schema", schema_path, "--count", "1"]
        )

        assert result.exit_code == 0

    def test_generate_csv_format(self, runner, schema_path):
        """Test generate with CSV format."""
        result = runner.invoke(
            main,
            ["generate", "--schema", schema_path, "--count", "1", "--format", "csv"],
        )

        assert result.exit_code == 0
        assert "name" in result.output

    def test_generate_sql_format(self, runner, schema_path):
        """Test generate with SQL format."""
        result = runner.invoke(
            main,
            ["generate", "--schema", schema_path, "--count", "1", "--format", "sql"],
        )

        assert result.exit_code == 0
        assert "INSERT" in result.output

    def test_generate_invalid_json(self, runner, tmp_path):
        """Test generate with invalid JSON schema."""
        broken = tmp_path / "broken.json"
        broken.write_text("{ invalid json }", encoding="utf-8")

        result = runner.invoke(
            main, ["generate", "--schema", str(broken), "--count", "1"]
        )

        assert result.exit_code != 0

    def test_generate_nonexistent_file(self, runner):
        """Test generate with nonexistent schema file."""
        result = runner.invoke(main, ["generate", "--schema", "/nonexistent/path.json"])

        assert result.exit_code != 0
class TestJSONFormatter:
    """Tests for JSONFormatter class."""

    def test_format_single_record(self):
        """Test formatting a single record."""
        records = [{"name": "John", "age": 30}]
        formatter = JSONFormatter()
        result = formatter.format(records)

        # NOTE(review): this asserts that a lone record is emitted as a bare
        # object rather than a one-element list - confirm against JSONFormatter.
        parsed = json.loads(result)
        assert parsed == {"name": "John", "age": 30}

    def test_format_multiple_records(self):
        """Test formatting multiple records."""
        records = [{"name": "John"}, {"name": "Jane"}]
        formatter = JSONFormatter()
        result = formatter.format(records)

        parsed = json.loads(result)
        assert len(parsed) == 2
        assert parsed[0]["name"] == "John"
        assert parsed[1]["name"] == "Jane"

    def test_format_with_indent(self):
        """Test formatting with indentation."""
        records = [{"name": "John"}]
        formatter = JSONFormatter(indent=2)
        result = formatter.format(records)

        assert "\n" in result
        assert " " in result

    def test_format_empty_records(self):
        """Test formatting empty records."""
        formatter = JSONFormatter()
        result = formatter.format([])

        assert result == "[]"

    def test_format_with_special_characters(self):
        """Test that embedded double quotes survive a JSON round trip."""
        # Two records are used so the output is a list whichever way a single
        # record is shaped; indexing parsed[0] on one record would contradict
        # test_format_single_record.
        records = [{"name": "John \"Jack\" Doe"}, {"name": "Jane"}]
        formatter = JSONFormatter()
        result = formatter.format(records)

        parsed = json.loads(result)
        assert parsed[0]["name"] == "John \"Jack\" Doe"


class TestCSVFormatter:
    """Tests for CSVFormatter class."""

    def test_format_single_record(self):
        """Test formatting a single record."""
        records = [{"name": "John", "age": 30}]
        formatter = CSVFormatter()
        result = formatter.format(records)

        lines = result.strip().split("\n")
        assert len(lines) == 2
        assert "name" in lines[0]
        assert "John" in lines[1]

    def test_format_multiple_records(self):
        """Test formatting multiple records."""
        records = [{"name": "John"}, {"name": "Jane"}]
        formatter = CSVFormatter()
        result = formatter.format(records)

        lines = result.strip().split("\n")
        assert len(lines) == 3

    def test_format_empty_records(self):
        """Test formatting empty records."""
        formatter = CSVFormatter()
        result = formatter.format([])

        # NOTE(review): with no records there is no obvious source for a
        # "name" header - confirm this expectation against CSVFormatter.
        assert "name" in result


class TestSQLFormatter:
    """Tests for SQLFormatter class."""

    def test_format_single_record(self):
        """Test formatting a single record."""
        records = [{"name": "John", "age": 30}]
        formatter = SQLFormatter(table_name="users")
        result = formatter.format(records)

        assert "INSERT" in result
        assert "users" in result
        assert "John" in result

    def test_format_multiple_records(self):
        """Test formatting multiple records."""
        records = [{"name": "John"}, {"name": "Jane"}]
        formatter = SQLFormatter(table_name="users")
        result = formatter.format(records)

        assert result.count("INSERT") == 2

    def test_format_empty_records(self):
        """Test formatting empty records."""
        formatter = SQLFormatter(table_name="users")
        result = formatter.format([])

        assert "INSERT" not in result

    def test_custom_table_name(self):
        """Test with custom table name."""
        records = [{"name": "John"}]
        formatter = SQLFormatter(table_name="custom_table")
        result = formatter.format(records)

        assert "custom_table" in result
class TestJSONSchemaGenerator:
    """Exercise JSONSchemaGenerator against small object schemas."""

    @staticmethod
    def _object_schema(**properties):
        """Wrap the given property schemas in an object schema."""
        return {"type": "object", "properties": properties}

    @staticmethod
    def _records(schema, count=1, seed=42):
        """Build a seeded generator and return the records it produces."""
        return JSONSchemaGenerator(seed=seed).generate(schema, count=count)

    def test_generate_simple_string(self):
        """A plain string property yields a str value."""
        records = self._records(self._object_schema(name={"type": "string"}))

        assert len(records) == 1
        assert "name" in records[0]
        assert isinstance(records[0]["name"], str)

    def test_generate_string_with_format_email(self):
        """The email format yields something shaped like an address."""
        schema = self._object_schema(email={"type": "string", "format": "email"})
        records = self._records(schema)

        assert len(records) == 1
        assert "@" in records[0]["email"]
        assert "." in records[0]["email"]

    def test_generate_string_with_format_uuid(self):
        """The uuid format yields a hyphenated value."""
        schema = self._object_schema(id={"type": "string", "format": "uuid"})
        records = self._records(schema)

        assert len(records) == 1
        assert "-" in records[0]["id"]

    def test_generate_integer(self):
        """An integer property yields an int value."""
        records = self._records(self._object_schema(age={"type": "integer"}))

        assert len(records) == 1
        assert isinstance(records[0]["age"], int)

    def test_generate_boolean(self):
        """A boolean property yields a bool value."""
        records = self._records(self._object_schema(active={"type": "boolean"}))

        assert len(records) == 1
        assert isinstance(records[0]["active"], bool)

    def test_generate_multiple_records(self):
        """The count argument controls how many records come back."""
        records = self._records(
            self._object_schema(name={"type": "string"}), count=5
        )

        assert len(records) == 5
        for record in records:
            assert "name" in record

    def test_generate_nested_object(self):
        """Nested object properties are generated recursively."""
        user_schema = self._object_schema(
            name={"type": "string"},
            email={"type": "string", "format": "email"},
        )
        records = self._records(self._object_schema(user=user_schema))

        assert len(records) == 1
        assert "user" in records[0]
        assert "name" in records[0]["user"]
        assert "email" in records[0]["user"]

    def test_generate_with_seed(self):
        """Two generators built with the same seed agree on their output."""
        schema = self._object_schema(name={"type": "string"})

        first = self._records(schema, seed=123)
        second = self._records(schema, seed=123)

        assert first[0]["name"] == second[0]["name"]

    def test_generate_array_of_strings(self):
        """An array property yields a list."""
        tags_schema = {"type": "array", "items": {"type": "string"}}
        records = self._records(self._object_schema(tags=tags_schema))

        assert len(records) == 1
        assert isinstance(records[0]["tags"], list)

    def test_generate_with_enum(self):
        """Every generated value comes from the enum list."""
        allowed = ["active", "inactive", "pending"]
        schema = self._object_schema(status={"type": "string", "enum": allowed})
        records = self._records(schema, count=10)

        assert len(records) == 10
        for record in records:
            assert record["status"] in ["active", "inactive", "pending"]
class TestTestDataProvider:
    """Tests for TestDataProvider class."""

    @pytest.fixture
    def fake(self):
        """Provide a fresh Faker instance."""
        return Faker()

    @pytest.fixture
    def provider(self, fake):
        """Provide a TestDataProvider bound to the Faker instance."""
        return TestDataProvider(fake)

    def test_json_schema_type_boolean(self, fake, provider):
        """Test boolean type generation."""
        result = provider.json_schema_type({"type": "boolean"}, fake)
        assert isinstance(result, bool)

    def test_json_schema_type_integer(self, fake, provider):
        """Test integer type generation."""
        result = provider.json_schema_type({"type": "integer"}, fake)
        assert isinstance(result, int)

    def test_json_schema_type_integer_with_constraints(self, fake, provider):
        """Test integer with min/max constraints."""
        schema = {"type": "integer", "minimum": 10, "maximum": 20}
        result = provider.json_schema_type(schema, fake)

        assert isinstance(result, int)
        assert 10 <= result <= 20

    def test_json_schema_type_string(self, fake, provider):
        """Test string type generation."""
        result = provider.json_schema_type({"type": "string"}, fake)
        assert isinstance(result, str)

    def test_json_schema_type_string_with_format_email(self, fake, provider):
        """Test string with email format."""
        result = provider.json_schema_type({"type": "string", "format": "email"}, fake)

        assert isinstance(result, str)
        assert "@" in result

    def test_json_schema_type_string_with_format_uuid(self, fake, provider):
        """Test string with uuid format."""
        result = provider.json_schema_type({"type": "string", "format": "uuid"}, fake)

        assert isinstance(result, str)
        assert "-" in result

    def test_json_schema_type_string_with_format_date(self, fake, provider):
        """Test string with date-time format."""
        result = provider.json_schema_type(
            {"type": "string", "format": "date-time"}, fake
        )
        assert isinstance(result, str)

    def test_json_schema_type_with_enum(self, fake, provider):
        """Test type with enum constraint."""
        schema = {"type": "string", "enum": ["red", "green", "blue"]}
        result = provider.json_schema_type(schema, fake)

        assert result in ["red", "green", "blue"]

    def test_json_schema_type_with_pattern(self, fake, provider):
        """Test type with pattern constraint."""
        result = provider.json_schema_type(
            {"type": "string", "pattern": "^[a-z]+$"}, fake
        )
        assert isinstance(result, str)

    def test_provider_is_registered(self):
        """Test that provider can be added to Faker."""
        fake = Faker()

        def is_registered():
            # Faker stores provider *instances*, so a membership test on the
            # class itself can never succeed; compare by type instead.
            return any(
                isinstance(registered, TestDataProvider)
                for registered in fake.get_providers()
            )

        assert not is_registered()

        fake.add_provider(TestDataProvider)
        assert is_registered()