75 lines
2.1 KiB
Python
75 lines
2.1 KiB
Python
"""JMESPath query support for config-converter-cli."""
|
|
|
|
from typing import Any, Optional
|
|
|
|
import jmespath
|
|
|
|
from configconverter.converters import Converter
|
|
from configconverter.exceptions import ParseError, QueryError
|
|
|
|
|
|
class QueryEngine:
    """Handles JMESPath queries on configuration data.

    The engine parses raw configuration text through a ``Converter`` and
    evaluates a JMESPath expression against the parsed data.
    """

    def __init__(self):
        # One Converter per engine; it is used for format detection and parsing.
        self.converter = Converter()

    @staticmethod
    def _json_default(value: Any) -> str:
        """Serialize values that ``json.dumps`` cannot handle natively.

        Date, datetime and time objects are rendered as ISO-8601 strings.
        NOTE(review): this assumes the YAML/TOML parsers behind ``Converter``
        can yield such objects -- confirm against the converter implementation.

        Raises:
            TypeError: For any other unsupported type, mirroring the error
                ``json.dumps`` raises when no ``default`` hook is given.
        """
        import datetime

        # datetime.datetime is a subclass of datetime.date, so it is covered.
        if isinstance(value, (datetime.date, datetime.time)):
            return value.isoformat()
        raise TypeError(
            f"Object of type {type(value).__name__} is not JSON serializable"
        )

    def query(
        self,
        content: str,
        query_expression: str,
        format: Optional[str] = None,
    ) -> Any:
        """Execute a JMESPath query on the content.

        Args:
            content: The configuration content
            query_expression: JMESPath expression
            format: Optional format hint (json, yaml, toml). When omitted or
                empty, the format is detected from the content. (The name
                shadows the ``format`` builtin but is kept because it is part
                of the public signature.)

        Returns:
            Query result

        Raises:
            QueryError: If the expression is invalid, JMESPath reports a type
                error, or any other unexpected failure occurs.
            ParseError: Propagated unchanged when it is raised while handling
                the content.
        """
        try:
            # An explicit hint wins; otherwise let the converter detect it.
            source_format = format or self.converter.detect_format(content)
            data = self.converter._parse(content, source_format)
            return jmespath.search(query_expression, data)
        except jmespath.exceptions.ParseError as e:
            # Chain the cause so the original traceback is not lost.
            raise QueryError(f"Invalid JMESPath expression: {e}") from e
        except jmespath.exceptions.JMESPathTypeError as e:
            raise QueryError(f"JMESPath type error: {e}") from e
        except ParseError:
            # Content parse failures keep their own exception type.
            raise
        except Exception as e:
            raise QueryError(f"Query execution failed: {e}") from e

    def query_json(
        self,
        content: str,
        query_expression: str,
        format: Optional[str] = None,
    ) -> str:
        """Execute a JMESPath query and return result as JSON string.

        Args:
            content: The configuration content
            query_expression: JMESPath expression
            format: Optional format hint (json, yaml, toml)

        Returns:
            Query result as JSON string, indented by two spaces with
            non-ASCII characters left unescaped. Date/time values are written
            as ISO-8601 strings.

        Raises:
            QueryError: See ``query``.
            ParseError: See ``query``.
            TypeError: If the result contains a value that is neither JSON
                serializable nor a date/time object.
        """
        import json

        result = self.query(content, query_expression, format)
        return json.dumps(
            result,
            indent=2,
            ensure_ascii=False,
            default=self._json_default,
        )
|
|
|
|
|
|
# Shared module-level engine instance. Creating it runs QueryEngine.__init__,
# which constructs a Converter as soon as this module is imported.
query_engine = QueryEngine()
|