Add converter modules (JSON, YAML, TOML, CSV)
Some checks failed
CI / test (push) Has been cancelled
Some checks failed
CI / test (push) Has been cancelled
This commit is contained in:
284
src/cli.py
Normal file
284
src/cli.py
Normal file
@@ -0,0 +1,284 @@
|
||||
"""CLI interface for Data Format Converter."""
|
||||
|
||||
import sys
|
||||
import glob
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
import click
|
||||
from rich.console import Console
|
||||
from rich.panel import Panel
|
||||
from rich.text import Text
|
||||
from rich.box import ROUNDED
|
||||
|
||||
from src.converters import get_converter, SUPPORTED_FORMATS
|
||||
from src.validators import detect_format, get_validator
|
||||
from src.tui import DataViewer, DataEditor
|
||||
from src.watchers import FileWatcher
|
||||
|
||||
|
||||
console = Console()
|
||||
|
||||
|
||||
def validate_format(ctx, param, value):
|
||||
"""Validate format parameter."""
|
||||
if value and value.lower() not in SUPPORTED_FORMATS:
|
||||
raise click.BadParameter(
|
||||
f"Invalid format '{value}'. Supported formats: {', '.join(SUPPORTED_FORMATS)}"
|
||||
)
|
||||
return value.lower() if value else None
|
||||
|
||||
|
||||
def get_file_format(filepath: str) -> str:
|
||||
"""Get format from file path."""
|
||||
fmt = detect_format(filepath)
|
||||
if not fmt:
|
||||
raise click.BadParameter(
|
||||
f"Unknown file format for '{filepath}'. "
|
||||
f"Supported extensions: {', '.join(['.json', '.yaml', '.yml', '.toml', '.csv'])}"
|
||||
)
|
||||
return fmt
|
||||
|
||||
|
||||
@click.group()
|
||||
@click.option('--theme', default='default', help='Syntax highlighting theme')
|
||||
@click.pass_context
|
||||
def cli(ctx, theme):
|
||||
"""Data Format Converter - Convert between JSON, YAML, TOML, and CSV formats."""
|
||||
ctx.ensure_object(dict)
|
||||
ctx.obj['theme'] = theme
|
||||
ctx.obj['viewer'] = DataViewer(theme=theme)
|
||||
|
||||
|
||||
@cli.command('convert')
|
||||
@click.argument('input_file', type=click.File('r'), required=False)
|
||||
@click.option('--from', '-f', 'from_format', callback=validate_format,
|
||||
help='Input format (auto-detected from file extension)')
|
||||
@click.option('--to', '-t', 'to_format', required=True, callback=validate_format,
|
||||
help='Output format')
|
||||
@click.option('--output', '-o', type=click.Path(), default='-',
|
||||
help='Output file (use - for stdout)')
|
||||
@click.option('--indent', '-i', type=int, default=2, help='Indentation level')
|
||||
@click.pass_context
|
||||
def convert(ctx, input_file, from_format, to_format, output, indent):
|
||||
"""Convert data from one format to another."""
|
||||
viewer = ctx.obj['viewer']
|
||||
|
||||
if input_file:
|
||||
content = input_file.read()
|
||||
input_path = input_file.name
|
||||
else:
|
||||
content = sys.stdin.read()
|
||||
input_path = 'stdin'
|
||||
|
||||
if from_format:
|
||||
source_format = from_format
|
||||
else:
|
||||
if input_file:
|
||||
source_format = get_file_format(input_path)
|
||||
else:
|
||||
click.echo("Error: --from/-f required when reading from stdin", err=True)
|
||||
sys.exit(1)
|
||||
|
||||
try:
|
||||
source_converter = get_converter(source_format)
|
||||
target_converter = get_converter(to_format)
|
||||
|
||||
result = source_converter.convert(content, target_converter, indent=indent)
|
||||
|
||||
if result.success:
|
||||
if output == '-':
|
||||
viewer.view(result.data, to_format, title="Converted Output")
|
||||
else:
|
||||
target_converter.write_file(output, result.data)
|
||||
viewer.display_success(f"Converted: {input_path} -> {output}")
|
||||
else:
|
||||
viewer.display_error(f"Conversion failed: {result.error}")
|
||||
sys.exit(1)
|
||||
|
||||
except Exception as e:
|
||||
viewer.display_error(f"Error: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
@cli.command('validate')
|
||||
@click.argument('files', nargs=-1, type=click.Path(exists=True))
|
||||
@click.option('--format', '-f', 'fmt', callback=validate_format,
|
||||
help='Format to validate (auto-detected if not specified)')
|
||||
@click.option('--stdin', is_flag=True, help='Read from stdin')
|
||||
@click.pass_context
|
||||
def validate_cmd(ctx, files, fmt, stdin):
|
||||
"""Validate syntax of data files."""
|
||||
viewer = ctx.obj['viewer']
|
||||
|
||||
if stdin:
|
||||
content = sys.stdin.read()
|
||||
if not fmt:
|
||||
click.echo("Error: --format/-f required when reading from stdin", err=True)
|
||||
sys.exit(1)
|
||||
|
||||
validator = get_validator(fmt)
|
||||
result = validator.validate(content)
|
||||
viewer.display_validation_result(result)
|
||||
sys.exit(0 if result.valid else 1)
|
||||
|
||||
if not files:
|
||||
click.echo("Error: No files specified", err=True)
|
||||
click.echo(ctx.get_help())
|
||||
sys.exit(1)
|
||||
|
||||
all_valid = True
|
||||
for filepath in files:
|
||||
if fmt:
|
||||
format_to_use = fmt
|
||||
else:
|
||||
format_to_use = get_file_format(filepath)
|
||||
|
||||
validator = get_validator(format_to_use)
|
||||
result = validator.validate(Path(filepath).read_text(encoding='utf-8'))
|
||||
|
||||
if result.valid:
|
||||
viewer.display_success(f"Valid: {filepath}")
|
||||
else:
|
||||
viewer.display_error(f"Invalid: {filepath}")
|
||||
viewer.display_validation_result(result)
|
||||
all_valid = False
|
||||
|
||||
sys.exit(0 if all_valid else 1)
|
||||
|
||||
|
||||
@cli.command('tui')
|
||||
@click.argument('input_file', type=click.File('r'), required=False)
|
||||
@click.option('--format', '-f', 'fmt', callback=validate_format,
|
||||
help='Input format (auto-detected from file extension)')
|
||||
@click.pass_context
|
||||
def tui(ctx, input_file, fmt):
|
||||
"""Launch interactive TUI mode."""
|
||||
viewer = ctx.obj['viewer']
|
||||
|
||||
if input_file:
|
||||
content = input_file.read()
|
||||
input_path = input_file.name
|
||||
format_name = fmt or get_file_format(input_path)
|
||||
else:
|
||||
content = sys.stdin.read()
|
||||
input_path = 'stdin'
|
||||
if not fmt:
|
||||
click.echo("Error: --format/-f required when reading from stdin", err=True)
|
||||
sys.exit(1)
|
||||
format_name = fmt
|
||||
|
||||
try:
|
||||
converter = get_converter(format_name)
|
||||
data = converter.loads(content)
|
||||
|
||||
viewer.view(content, format_name, title=input_path)
|
||||
|
||||
editor = DataEditor(console)
|
||||
editor.load_data(data, format_name)
|
||||
modified = editor.run()
|
||||
|
||||
if modified:
|
||||
output = converter.dumps(data, indent=2)
|
||||
viewer.view(output, format_name, title="Modified Data")
|
||||
|
||||
except Exception as e:
|
||||
viewer.display_error(f"Error: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
@cli.command('watch')
|
||||
@click.argument('patterns', nargs=-1, required=True)
|
||||
@click.option('--to', '-t', 'to_format', required=True, callback=validate_format,
|
||||
help='Output format')
|
||||
@click.option('--output', '-o', type=click.Path(), required=True,
|
||||
help='Output directory')
|
||||
@click.option('--debounce', '-d', type=float, default=1.0,
|
||||
help='Debounce delay in seconds')
|
||||
@click.pass_context
|
||||
def watch(ctx, patterns, to_format, output, debounce):
|
||||
"""Watch files for changes and auto-convert."""
|
||||
viewer = ctx.obj['viewer']
|
||||
|
||||
paths = []
|
||||
for pattern in patterns:
|
||||
expanded = glob.glob(pattern, recursive=True)
|
||||
if expanded:
|
||||
paths.extend(expanded)
|
||||
elif os.path.exists(pattern):
|
||||
paths.append(pattern)
|
||||
else:
|
||||
viewer.display_error(f"Path not found: {pattern}")
|
||||
|
||||
if not paths:
|
||||
viewer.display_error("No files to watch")
|
||||
sys.exit(1)
|
||||
|
||||
os.makedirs(output, exist_ok=True)
|
||||
|
||||
viewer.display_success(f"Watching {len(paths)} file(s). Press Ctrl+C to stop.")
|
||||
|
||||
def convert_file(filepath: str):
|
||||
try:
|
||||
fmt = get_file_format(filepath)
|
||||
source_converter = get_converter(fmt)
|
||||
target_converter = get_converter(to_format)
|
||||
|
||||
content = source_converter.read_file(filepath)
|
||||
result = source_converter.convert(content, target_converter)
|
||||
|
||||
if result.success:
|
||||
filename = os.path.basename(filepath)
|
||||
name_without_ext = os.path.splitext(filename)[0]
|
||||
output_path = os.path.join(output, name_without_ext + '.' + to_format)
|
||||
target_converter.write_file(output_path, result.data)
|
||||
viewer.display_success(f"Converted: {filepath} -> {output_path}")
|
||||
else:
|
||||
viewer.display_error(f"Conversion failed: {result.error}")
|
||||
except Exception as e:
|
||||
viewer.display_error(f"Error converting {filepath}: {e}")
|
||||
|
||||
try:
|
||||
watcher = FileWatcher(viewer.console)
|
||||
watcher.watch(paths, convert_file, debounce)
|
||||
watcher.run_forever()
|
||||
except KeyboardInterrupt:
|
||||
viewer.display_success("Stopped watching")
|
||||
|
||||
|
||||
@cli.command('info')
|
||||
@click.pass_context
|
||||
def info(ctx):
|
||||
"""Show information about supported formats."""
|
||||
viewer = ctx.obj['viewer']
|
||||
|
||||
info_text = Text()
|
||||
info_text.append("Data Format Converter\n", style='bold cyan')
|
||||
info_text.append("─" * 30 + "\n\n", style='dim')
|
||||
info_text.append("Supported Formats:\n", style='yellow bold')
|
||||
|
||||
formats_info = [
|
||||
("JSON", ".json", "JavaScript Object Notation - hierarchical data"),
|
||||
("YAML", ".yaml, .yml", "YAML Ain't Markup Language - human-friendly"),
|
||||
("TOML", ".toml", "Tom's Obvious Minimal Language - configuration"),
|
||||
("CSV", ".csv", "Comma-Separated Values - tabular data"),
|
||||
]
|
||||
|
||||
for name, extensions, desc in formats_info:
|
||||
info_text.append(f" {name:<8} [{extensions}]\n", style='white')
|
||||
info_text.append(f" {desc}\n", style='dim')
|
||||
|
||||
info_text.append("\nFeatures:\n", style='yellow bold')
|
||||
info_text.append(" • Multi-format conversion\n", style='white')
|
||||
info_text.append(" • Syntax validation\n", style='white')
|
||||
info_text.append(" • Syntax highlighting\n", style='white')
|
||||
info_text.append(" • Interactive TUI mode\n", style='white')
|
||||
info_text.append(" • File watching\n", style='white')
|
||||
info_text.append(" • Stdin/stdout support\n", style='white')
|
||||
|
||||
panel = Panel(info_text, title="Info", box=ROUNDED)
|
||||
viewer.console.print(panel)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
cli()
|
||||
Reference in New Issue
Block a user