Initial upload: DataForge CLI with full documentation and tests
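
For example (file names are hypothetical; assumes PyYAML is installed), the parsers module can round-trip a config between formats:

    from dataforge.parsers import load_data, dump_data

    data = load_data("config.yaml")                 # parse YAML into Python objects
    dump_data(data, "json", output="config.json")   # re-serialize as pretty-printed JSON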
dataforge/parsers.py | 117 lines (normal file)
@@ -0,0 +1,117 @@
"""Data parsing and serialization module for JSON, YAML, and TOML formats."""

import json
from pathlib import Path
from typing import Any, Optional

import yaml

# TOML parsing: tomli is the third-party backport, tomllib ships with Python 3.11+.
# Either one is sufficient; both are optional at import time.
try:
    import tomli
except ImportError:
    tomli = None

try:
    import tomllib
except ImportError:
    tomllib = None

SUPPORTED_FORMATS = ["json", "yaml", "toml"]


def detect_format(file_path: str) -> str:
    """Detect file format from extension."""
    ext = Path(file_path).suffix.lower()
    format_map = {
        ".json": "json",
        ".yaml": "yaml",
        ".yml": "yaml",
        ".toml": "toml",
    }
    format_name = format_map.get(ext)
    if format_name is None:
        raise ValueError(f"Unsupported file extension: {ext}. Supported formats: {', '.join(SUPPORTED_FORMATS)}")
    return format_name


def detect_format_from_content(content: str) -> Optional[str]:
    """Detect format from content (try parsing)."""
    if content.strip().startswith("{") or content.strip().startswith("["):
        try:
            json.loads(content)
            return "json"
        except json.JSONDecodeError:
            pass
    if "=" in content and ":" not in content.split("=")[0]:
        return "toml"
    return "yaml"


def load_data(source: str, format: Optional[str] = None) -> Any:
    """Load data from a file path or string content."""
    path = Path(source)
    if path.exists() and path.is_file():
        file_format = format or detect_format(source)
        with open(source, "r", encoding="utf-8") as f:
            content = f.read()
        return parse_content(content, file_format)
    else:
        detected = format or detect_format_from_content(source)
        if detected is None:
            detected = "json"
        return parse_content(source, detected)


def parse_content(content: str, format: str) -> Any:
    """Parse content string based on format."""
    if format == "json":
        return json.loads(content)
    elif format == "yaml":
        return yaml.safe_load(content)
    elif format == "toml":
        if tomli is not None:
            return tomli.loads(content)
        elif tomllib is not None:
            return tomllib.loads(content)
        else:
            raise ImportError("Neither tomli nor tomllib is available for TOML parsing")
    else:
        raise ValueError(f"Unsupported format: {format}. Supported formats: {', '.join(SUPPORTED_FORMATS)}")


def dump_data(data: Any, format: str, output: Optional[str] = None, indent: int = 2) -> str:
    """Serialize data in the given format; write to `output` if provided, otherwise return the string."""
    if format == "json":
        result = json.dumps(data, indent=indent, ensure_ascii=False)
    elif format == "yaml":
        result = yaml.dump(data, indent=indent, allow_unicode=True, sort_keys=False)
    elif format == "toml":
        try:
            import tomli_w
        except ImportError:
            # The standard-library tomllib can only read TOML; writing requires tomli_w.
            raise ImportError("tomli_w is required for TOML output")
        result = tomli_w.dumps(data)
    else:
        raise ValueError(f"Unsupported format: {format}. Supported formats: {', '.join(SUPPORTED_FORMATS)}")

    if output:
        with open(output, "w", encoding="utf-8") as f:
            f.write(result)
        return ""
    return result


def read_file(file_path: str) -> str:
    """Read file content."""
    with open(file_path, "r", encoding="utf-8") as f:
        return f.read()


def write_file(content: str, file_path: str) -> None:
    """Write content to file."""
    with open(file_path, "w", encoding="utf-8") as f:
        f.write(content)
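
Illustrative REPL session for the content-based detection (not part of the committed file; assumes PyYAML is installed):

    >>> from dataforge.parsers import detect_format_from_content, parse_content
    >>> detect_format_from_content('{"a": 1}')
    'json'
    >>> detect_format_from_content('key = "value"')
    'toml'
    >>> parse_content("a: 1\nb: 2", "yaml")
    {'a': 1, 'b': 2}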