Add formatters, utils, schemas and version
Some checks failed
CI / test (3.10) (push) Has been cancelled
CI / test (3.11) (push) Has been cancelled
CI / test (3.12) (push) Has been cancelled
CI / test (3.8) (push) Has been cancelled
CI / test (3.9) (push) Has been cancelled
CI / lint (push) Has been cancelled
CI / typecheck (push) Has been cancelled
CI / build-package (push) Has been cancelled
Some checks failed
CI / test (3.10) (push) Has been cancelled
CI / test (3.11) (push) Has been cancelled
CI / test (3.12) (push) Has been cancelled
CI / test (3.8) (push) Has been cancelled
CI / test (3.9) (push) Has been cancelled
CI / lint (push) Has been cancelled
CI / typecheck (push) Has been cancelled
CI / build-package (push) Has been cancelled
This commit is contained in:
147
configforge/utils/file_parser.py
Normal file
147
configforge/utils/file_parser.py
Normal file
@@ -0,0 +1,147 @@
|
||||
"""File parsing utilities for ConfigForge."""
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Any, Optional
|
||||
|
||||
from configforge.exceptions import (
|
||||
ConversionError,
|
||||
FileOperationError,
|
||||
InvalidConfigFormatError,
|
||||
)
|
||||
|
||||
|
||||
SUPPORTED_FORMATS = ["json", "yaml", "yml", "toml", "env", "ini"]
|
||||
|
||||
|
||||
def detect_format(filepath: str) -> str:
|
||||
"""Detect the format of a configuration file based on its extension."""
|
||||
ext = Path(filepath).suffix.lower()
|
||||
|
||||
format_mapping = {
|
||||
".json": "json",
|
||||
".yaml": "yaml",
|
||||
".yml": "yaml",
|
||||
".toml": "toml",
|
||||
".env": "env",
|
||||
".ini": "ini",
|
||||
}
|
||||
|
||||
if ext not in format_mapping:
|
||||
raise InvalidConfigFormatError(
|
||||
f"Unsupported file format: {ext}. Supported formats: {', '.join(SUPPORTED_FORMATS)}"
|
||||
)
|
||||
|
||||
return format_mapping[ext]
|
||||
|
||||
|
||||
class FileParser:
|
||||
"""Parser for configuration files in various formats."""
|
||||
|
||||
@staticmethod
|
||||
def parse(filepath: str) -> dict:
|
||||
"""Parse a configuration file and return its contents as a dictionary."""
|
||||
try:
|
||||
format_type = detect_format(filepath)
|
||||
parser_method = getattr(FileParser, f"_parse_{format_type}", None)
|
||||
|
||||
if parser_method is None:
|
||||
raise InvalidConfigFormatError(f"No parser available for format: {format_type}")
|
||||
|
||||
with open(filepath, "r", encoding="utf-8") as f:
|
||||
content = f.read()
|
||||
|
||||
return parser_method(content, filepath)
|
||||
|
||||
except FileNotFoundError:
|
||||
raise FileOperationError(f"File not found: {filepath}", filepath=filepath)
|
||||
except PermissionError:
|
||||
raise FileOperationError(f"Permission denied: {filepath}", filepath=filepath)
|
||||
except Exception as e:
|
||||
if isinstance(e, (InvalidConfigFormatError, FileOperationError, ConversionError)):
|
||||
raise
|
||||
raise ConversionError(f"Failed to parse {filepath}: {str(e)}") from e
|
||||
|
||||
@staticmethod
|
||||
def _parse_json(content: str, filepath: str) -> dict:
|
||||
"""Parse JSON configuration file."""
|
||||
import json
|
||||
|
||||
try:
|
||||
return json.loads(content)
|
||||
except json.JSONDecodeError as e:
|
||||
raise ConversionError(f"Invalid JSON in {filepath}: {str(e)}") from e
|
||||
|
||||
@staticmethod
|
||||
def _parse_yaml(content: str, filepath: str) -> dict:
|
||||
"""Parse YAML configuration file."""
|
||||
import yaml
|
||||
|
||||
try:
|
||||
return yaml.safe_load(content) or {}
|
||||
except yaml.YAMLError as e:
|
||||
raise ConversionError(f"Invalid YAML in {filepath}: {str(e)}") from e
|
||||
|
||||
@staticmethod
|
||||
def _parse_toml(content: str, filepath: str) -> dict:
|
||||
"""Parse TOML configuration file."""
|
||||
import tomlkit
|
||||
|
||||
try:
|
||||
return tomlkit.loads(content)
|
||||
except tomlkit.exceptions.ParseError as e:
|
||||
raise ConversionError(f"Invalid TOML in {filepath}: {str(e)}") from e
|
||||
|
||||
@staticmethod
|
||||
def _parse_env(content: str, filepath: str) -> dict:
|
||||
"""Parse ENV configuration file."""
|
||||
result = {}
|
||||
for line_num, line in enumerate(content.splitlines(), 1):
|
||||
line = line.strip()
|
||||
if not line or line.startswith("#"):
|
||||
continue
|
||||
|
||||
if "=" in line:
|
||||
key, _, value = line.partition("=")
|
||||
key = key.strip()
|
||||
value = value.strip()
|
||||
|
||||
if not key:
|
||||
continue
|
||||
|
||||
value = value.strip('"').strip("'")
|
||||
result[key] = value
|
||||
|
||||
return result
|
||||
|
||||
@staticmethod
|
||||
def _parse_ini(content: str, filepath: str) -> dict:
|
||||
"""Parse INI configuration file."""
|
||||
result = {}
|
||||
current_section = "default"
|
||||
|
||||
for line_num, line in enumerate(content.splitlines(), 1):
|
||||
line = line.strip()
|
||||
if not line or line.startswith("#") or line.startswith(";"):
|
||||
continue
|
||||
|
||||
if line.startswith("[") and line.endswith("]"):
|
||||
current_section = line[1:-1].strip()
|
||||
if current_section not in result:
|
||||
result[current_section] = {}
|
||||
elif "=" in line:
|
||||
key, _, value = line.partition("=")
|
||||
key = key.strip()
|
||||
value = value.strip()
|
||||
|
||||
if current_section == "default":
|
||||
result[key] = value
|
||||
else:
|
||||
result[current_section][key] = value
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def parse_file(filepath: str) -> dict:
|
||||
"""Parse a configuration file and return its contents as a dictionary."""
|
||||
return FileParser.parse(filepath)
|
||||
Reference in New Issue
Block a user