Initial upload: config-converter-cli v1.0.0
This commit is contained in:
74
configconverter/query.py
Normal file
74
configconverter/query.py
Normal file
@@ -0,0 +1,74 @@
|
||||
"""JMESPath query support for config-converter-cli."""
|
||||
|
||||
from typing import Any, Optional
|
||||
|
||||
import jmespath
|
||||
|
||||
from configconverter.converters import Converter
|
||||
from configconverter.exceptions import ParseError, QueryError
|
||||
|
||||
|
||||
class QueryEngine:
|
||||
"""Handles JMESPath queries on configuration data."""
|
||||
|
||||
def __init__(self):
|
||||
self.converter = Converter()
|
||||
|
||||
def query(
|
||||
self,
|
||||
content: str,
|
||||
query_expression: str,
|
||||
format: Optional[str] = None,
|
||||
) -> Any:
|
||||
"""Execute a JMESPath query on the content.
|
||||
|
||||
Args:
|
||||
content: The configuration content
|
||||
query_expression: JMESPath expression
|
||||
format: Optional format hint (json, yaml, toml)
|
||||
|
||||
Returns:
|
||||
Query result
|
||||
"""
|
||||
try:
|
||||
if format:
|
||||
data = self.converter._parse(content, format)
|
||||
else:
|
||||
detected = self.converter.detect_format(content)
|
||||
data = self.converter._parse(content, detected)
|
||||
|
||||
result = jmespath.search(query_expression, data)
|
||||
return result
|
||||
|
||||
except jmespath.exceptions.ParseError as e:
|
||||
raise QueryError(f"Invalid JMESPath expression: {e}")
|
||||
except jmespath.exceptions.JMESPathTypeError as e:
|
||||
raise QueryError(f"JMESPath type error: {e}")
|
||||
except ParseError:
|
||||
raise
|
||||
except Exception as e:
|
||||
raise QueryError(f"Query execution failed: {e}")
|
||||
|
||||
def query_json(
|
||||
self,
|
||||
content: str,
|
||||
query_expression: str,
|
||||
format: Optional[str] = None,
|
||||
) -> str:
|
||||
"""Execute a JMESPath query and return result as JSON string.
|
||||
|
||||
Args:
|
||||
content: The configuration content
|
||||
query_expression: JMESPath expression
|
||||
format: Optional format hint (json, yaml, toml)
|
||||
|
||||
Returns:
|
||||
Query result as JSON string
|
||||
"""
|
||||
import json
|
||||
|
||||
result = self.query(content, query_expression, format)
|
||||
return json.dumps(result, indent=2, ensure_ascii=False)
|
||||
|
||||
|
||||
query_engine = QueryEngine()
|
||||
Reference in New Issue
Block a user