Initial upload: DataForge CLI with full documentation and tests
This commit is contained in:
316
dataforge/commands.py
Normal file
316
dataforge/commands.py
Normal file
@@ -0,0 +1,316 @@
|
||||
"""CLI commands for DataForge CLI."""
|
||||
|
||||
import glob
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import List, Optional
|
||||
|
||||
import click
|
||||
|
||||
from .parsers import (
|
||||
detect_format,
|
||||
dump_data,
|
||||
load_data,
|
||||
SUPPORTED_FORMATS,
|
||||
)
|
||||
from .validator import SchemaValidator
|
||||
from .type_check import validate_types, infer_schema_from_data
|
||||
|
||||
|
||||
def _expand_braces(pattern: str) -> List[str]:
    """Expand shell-style ``{a,b,c}`` alternatives into separate patterns.

    The ``glob`` module only understands ``*``, ``?`` and ``[...]``; a brace
    group would otherwise be matched literally. Groups are expanded innermost
    first, and a pattern without a comma-separated group is returned as-is.
    """
    import re  # local import so the helper does not depend on file-level imports

    group = re.search(r"\{([^{}]*,[^{}]*)\}", pattern)
    if group is None:
        return [pattern]

    head, tail = pattern[:group.start()], pattern[group.end():]
    expanded: List[str] = []
    for option in group.group(1).split(","):
        # Recurse so that several (or nested) groups are all expanded.
        expanded.extend(_expand_braces(head + option + tail))
    return expanded


def find_files(
    pattern: str,
    recursive: bool = False,
    directory: Optional[str] = None
) -> List[str]:
    """Find files matching a glob pattern.

    Args:
        pattern: Glob pattern relative to the search directory. Brace
            alternatives such as ``*.{json,yaml}`` are expanded before
            globbing, because ``glob`` itself does not support them.
        recursive: When true, search the directory and all subdirectories
            by inserting a ``**`` path component.
        directory: Directory to search; defaults to the current directory.

    Returns:
        Sorted list of unique matching paths (each prefixed with the search
        directory). Empty when nothing matches.
    """
    search_dir = directory or "."

    # A set removes duplicates when overlapping alternatives hit the same path.
    matches = set()
    for sub_pattern in _expand_braces(pattern):
        if recursive:
            full_pattern = os.path.join(search_dir, "**", sub_pattern)
        else:
            full_pattern = os.path.join(search_dir, sub_pattern)
        matches.update(glob.glob(full_pattern, recursive=recursive))

    return sorted(matches)
|
||||
|
||||
|
||||
def resolve_format(fmt: Optional[str], file_path: str) -> str:
    """Resolve the data format from an explicit option or the file path.

    Args:
        fmt: Format name supplied by the user, or ``None``/empty to
            auto-detect.
        file_path: Path passed to ``detect_format`` when ``fmt`` is not given.

    Returns:
        ``fmt`` unchanged when it is listed in ``SUPPORTED_FORMATS``;
        otherwise the result of ``detect_format(file_path)``.

    Raises:
        click.BadParameter: If ``fmt`` is given but is not in
            ``SUPPORTED_FORMATS``.
    """
    if not fmt:
        return detect_format(file_path)

    if fmt not in SUPPORTED_FORMATS:
        raise click.BadParameter(
            f"Unsupported format: {fmt}. Supported: {', '.join(SUPPORTED_FORMATS)}",
            # The callers in this module bind ``fmt`` to the ``--from`` option;
            # no ``--format`` option exists, so hint at the real flag.
            param_hint="--from",
        )
    return fmt
|
||||
|
||||
|
||||
# ``allow_dash=True`` lets "-" through click's path validation; without it,
# ``exists=True`` rejects "-" and the documented stdin mode is unreachable.
@click.command()
@click.argument("input_file", type=click.Path(exists=True, allow_dash=True))
@click.argument("output_file", type=click.Path(allow_dash=True))
@click.option("--from", "-f", "from_format", help="Input format (json, yaml, toml)")
@click.option("--to", "-t", "to_format", required=True, help="Output format (json, yaml, toml)")
@click.option("--indent", "-i", default=2, help="Indentation spaces (0 for compact)")
@click.option("--quiet", "-q", is_flag=True, help="Minimal output")
def convert(
    input_file: str,
    output_file: str,
    from_format: Optional[str],
    to_format: str,
    indent: int,
    quiet: bool
) -> None:
    """Convert a file from one format to another.

    INPUT_FILE: Input file path (or - for stdin)

    OUTPUT_FILE: Output file path (or - for stdout)
    """
    # The docstring above is the command's --help text, so developer notes
    # live in comments. Any failure is reported as a click.ClickException.
    import sys  # function-scope import, matching the module's existing style

    try:
        if to_format not in SUPPORTED_FORMATS:
            raise click.BadParameter(
                f"Unsupported format: {to_format}. Supported: {', '.join(SUPPORTED_FORMATS)}",
                param_hint="--to"
            )

        if input_file == "-":
            # stdin has no file name to inspect, so JSON is assumed unless
            # --from is given.
            input_format = from_format or "json"
            # NOTE(review): load_data receives raw text here but a file path
            # in the branch below -- confirm it accepts both.
            data = load_data(sys.stdin.read(), input_format)
        else:
            input_format = resolve_format(from_format, input_file)
            data = load_data(input_file, input_format)

        # A non-positive indent is passed as None (the "compact" setting).
        result = dump_data(data, to_format, indent=indent if indent > 0 else None)

        to_stdout = output_file == "-"
        if to_stdout:
            sys.stdout.write(result)
        else:
            with open(output_file, "w", encoding="utf-8") as f:
                f.write(result)

        if not quiet:
            # When the converted data is on stdout, send the status line to
            # stderr so it cannot corrupt piped output.
            click.echo(
                f"Successfully converted {input_file} to {output_file}",
                err=to_stdout,
            )

    except Exception as e:
        # Chain the cause so the original traceback stays available.
        raise click.ClickException(str(e)) from e
|
||||
|
||||
|
||||
@click.command()
@click.argument("input_files", nargs=-1, type=click.Path(exists=True))
@click.option("--from", "-f", "from_format", help="Input format (json, yaml, toml)")
@click.option("--to", "-t", "to_format", required=True, help="Output format (json, yaml, toml)")
@click.option("--output-dir", "-o", default=".", help="Output directory for converted files")
@click.option("--indent", "-i", default=2, help="Indentation spaces")
@click.option("--pattern", "-p", default="*.{json,yaml,yml,toml}", help="File pattern for batch processing")
@click.option("--recursive", "-r", is_flag=True, help="Search recursively")
@click.option("--quiet", "-q", is_flag=True, help="Minimal output")
def batch_convert(
    input_files: tuple,
    from_format: Optional[str],
    to_format: str,
    output_dir: str,
    indent: int,
    pattern: str,
    recursive: bool,
    quiet: bool
) -> None:
    """Convert multiple files from one format to another.

    INPUT_FILES: Input file paths (optional, uses --pattern if not provided)
    """
    # The docstring above is the command's --help text, so developer notes
    # live in comments. Each file is converted independently: a failure is
    # counted and reported, and the remaining files are still processed.
    # After the summary the command exits non-zero if any file failed, the
    # same way batch_validate does, so scripts can detect partial failure.
    try:
        if to_format not in SUPPORTED_FORMATS:
            raise click.BadParameter(
                f"Unsupported format: {to_format}. Supported: {', '.join(SUPPORTED_FORMATS)}",
                param_hint="--to"
            )

        os.makedirs(output_dir, exist_ok=True)

        # Explicit paths win; otherwise search the current directory.
        files = list(input_files) if input_files else find_files(pattern, recursive)

        if not files:
            if not quiet:
                click.echo("No files found matching the pattern")
            return

        # Loop-invariant: a non-positive indent is passed on as None.
        indent_arg = indent if indent > 0 else None
        converted = 0
        errors = 0

        for file_path in files:
            try:
                input_format = resolve_format(from_format, file_path)
                data = load_data(file_path, input_format)

                # NOTE(review): only the stem is kept, so inputs that share a
                # stem (a.json / a.yaml, or same name in different folders)
                # write to the same output path and the later one wins.
                output_filename = Path(file_path).stem + f".{to_format}"
                output_file = os.path.join(output_dir, output_filename)
                dump_data(data, to_format, output_file, indent=indent_arg)
                converted += 1
            except Exception as e:
                errors += 1
                if not quiet:
                    click.echo(f"Error converting {file_path}: {e}")

        if not quiet:
            click.echo(f"Converted {converted} files, {errors} errors")
        elif errors > 0:
            click.echo(f"Converted {converted}, errors: {errors}", err=True)

        if errors > 0:
            # Signal failure through the exit status, not just the summary.
            raise click.ClickException(f"Conversion failed for {errors} file(s)")

    except Exception as e:
        # Chain the cause so the original traceback stays available.
        raise click.ClickException(str(e)) from e
|
||||
|
||||
|
||||
# ``allow_dash=True`` lets "-" through click's path validation; without it,
# ``exists=True`` rejects "-" and the documented stdin mode is unreachable.
@click.command()
@click.argument("input_file", type=click.Path(exists=True, allow_dash=True))
@click.option("--schema", "-s", "schema_file", help="Path to JSON Schema file")
@click.option("--strict", is_flag=True, help="Strict validation mode")
@click.option("--quiet", "-q", is_flag=True, help="Minimal output")
def validate(
    input_file: str,
    schema_file: Optional[str],
    strict: bool,
    quiet: bool
) -> None:
    """Validate a file against a JSON Schema.

    INPUT_FILE: Input file path (or - for stdin)
    """
    # The docstring above is the command's --help text, so developer notes
    # live in comments. Without --schema the command only checks that the
    # input loads in its detected format.
    # NOTE(review): ``strict`` is accepted but never used below -- confirm
    # whether SchemaValidator has a strict mode it should be passed to.
    try:
        if input_file == "-":
            import sys

            # stdin has no file name to inspect, so JSON is assumed.
            input_format = "json"
            # NOTE(review): load_data receives raw text here but a file path
            # in the branch below -- confirm it accepts both.
            data = load_data(sys.stdin.read(), input_format)
        else:
            input_format = detect_format(input_file)
            data = load_data(input_file, input_format)

        if not schema_file:
            # Loading succeeded, which is the only check without a schema.
            if not quiet:
                click.echo(f"File is valid {input_format}")
            return

        validator = SchemaValidator(schema_file=schema_file)
        errors = validator.validate(data)

        if errors:
            if not quiet:
                for error in validator.get_error_messages(errors):
                    click.echo(error)
            raise click.ClickException(f"Validation failed with {len(errors)} error(s)")

        if not quiet:
            click.echo("Validation passed")

    except Exception as e:
        # Chain the cause so the original traceback stays available.
        raise click.ClickException(str(e)) from e
|
||||
|
||||
|
||||
@click.command()
@click.argument("input_files", nargs=-1, type=click.Path(exists=True))
@click.option("--schema", "-s", "schema_file", required=True, help="Path to JSON Schema file")
@click.option("--pattern", "-p", default="*.{json,yaml,yml,toml}", help="File pattern for batch processing")
@click.option("--recursive", "-r", is_flag=True, help="Search recursively")
@click.option("--quiet", "-q", is_flag=True, help="Minimal output")
def batch_validate(
    input_files: tuple,
    schema_file: str,
    pattern: str,
    recursive: bool,
    quiet: bool
) -> None:
    """Validate multiple files against a JSON Schema.

    INPUT_FILES: Input file paths (optional, uses --pattern if not provided)
    """
    # The docstring above is the command's --help text, so developer notes
    # live in comments. Files that fail to load are counted as invalid, and
    # the command raises click.ClickException if any file is invalid.
    max_shown_errors = 3  # per-file cap on printed schema errors

    try:
        # Explicit paths win; otherwise search the current directory.
        files = list(input_files) if input_files else find_files(pattern, recursive)

        if not files:
            if not quiet:
                click.echo("No files found matching the pattern")
            return

        # One validator instance is reused for every file.
        validator = SchemaValidator(schema_file=schema_file)
        valid_count = 0
        invalid_count = 0

        for file_path in files:
            try:
                input_format = detect_format(file_path)
                data = load_data(file_path, input_format)
                errors = validator.validate(data)

                if errors:
                    invalid_count += 1
                    if not quiet:
                        click.echo(f"Invalid: {file_path}")
                        messages = validator.get_error_messages(errors)
                        for message in messages[:max_shown_errors]:
                            click.echo(f"  {message}")
                        # Say so when the list was cut, so errors are not
                        # hidden silently.
                        omitted = len(messages) - max_shown_errors
                        if omitted > 0:
                            click.echo(f"  ... and {omitted} more error(s)")
                else:
                    valid_count += 1
                    if not quiet:
                        click.echo(f"Valid: {file_path}")
            except Exception as e:
                # A file that cannot be detected, loaded or validated counts
                # as invalid.
                invalid_count += 1
                if not quiet:
                    click.echo(f"Error: {file_path} - {e}")

        if not quiet:
            click.echo(f"\nSummary: {valid_count} valid, {invalid_count} invalid")
        elif invalid_count > 0:
            click.echo(f"Valid: {valid_count}, Invalid: {invalid_count}", err=True)

        if invalid_count > 0:
            raise click.ClickException(f"Validation failed for {invalid_count} file(s)")

    except Exception as e:
        # Chain the cause so the original traceback stays available.
        raise click.ClickException(str(e)) from e
|
||||
|
||||
|
||||
# ``allow_dash=True`` lets "-" through click's path validation; without it,
# ``exists=True`` rejects "-" and the documented stdin mode is unreachable.
@click.command()
@click.argument("input_file", type=click.Path(exists=True, allow_dash=True))
@click.option("--infer", is_flag=True, help="Infer schema from data")
@click.option("--quiet", "-q", is_flag=True, help="Minimal output")
def typecheck(
    input_file: str,
    infer: bool,
    quiet: bool
) -> None:
    """Check types in a data file.

    INPUT_FILE: Input file path (or - for stdin)
    """
    # The docstring above is the command's --help text, so developer notes
    # live in comments. With --infer the inferred schema is printed as JSON;
    # otherwise a one-line summary of the top-level value is printed.
    try:
        if input_file == "-":
            import sys

            # stdin has no file name to inspect, so JSON is assumed.
            input_format = "json"
            # NOTE(review): load_data receives raw text here but a file path
            # in the branch below -- confirm it accepts both.
            data = load_data(sys.stdin.read(), input_format)
        else:
            input_format = detect_format(input_file)
            data = load_data(input_file, input_format)

        if infer:
            # Inference runs under --quiet too, so its errors still surface.
            schema = infer_schema_from_data(data)
            if not quiet:
                import json

                click.echo(json.dumps(schema, indent=2))
        elif not quiet:
            if isinstance(data, dict):
                click.echo(f"Type: object with {len(data)} keys")
            elif isinstance(data, list):
                click.echo(f"Type: array with {len(data)} items")
            else:
                # Scalars are reported by their Python type name.
                click.echo(f"Type: {type(data).__name__}")

    except Exception as e:
        # Chain the cause so the original traceback stays available.
        raise click.ClickException(str(e)) from e
|
||||
Reference in New Issue
Block a user