Initial upload: ErrorFix CLI with rule engine and pattern matching
Some checks failed
CI / test (push) Has been cancelled
Some checks failed
CI / test (push) Has been cancelled
This commit is contained in:
109
errorfix/rules/loader.py
Normal file
109
errorfix/rules/loader.py
Normal file
@@ -0,0 +1,109 @@
|
||||
import os
|
||||
import json
|
||||
from pathlib import Path
|
||||
from typing import List, Optional, Union
|
||||
import yaml
|
||||
|
||||
from .rule import Rule
|
||||
from .validator import RuleValidator
|
||||
|
||||
|
||||
class RuleLoader:
|
||||
def __init__(self, validator: Optional[RuleValidator] = None):
|
||||
self.validator = validator or RuleValidator()
|
||||
self._cache: dict = {}
|
||||
|
||||
def load_yaml(self, file_path: str) -> List[Rule]:
|
||||
path = Path(file_path)
|
||||
if not path.exists():
|
||||
raise FileNotFoundError(f"Rule file not found: {file_path}")
|
||||
|
||||
with open(path, 'r', encoding='utf-8') as f:
|
||||
data = yaml.safe_load(f)
|
||||
|
||||
if not data:
|
||||
return []
|
||||
|
||||
rules = []
|
||||
items = data if isinstance(data, list) else [data]
|
||||
|
||||
for item in items:
|
||||
self.validator.validate(item)
|
||||
rule = Rule.from_dict(item)
|
||||
rules.append(rule)
|
||||
|
||||
return rules
|
||||
|
||||
def load_json(self, file_path: str) -> List[Rule]:
|
||||
path = Path(file_path)
|
||||
if not path.exists():
|
||||
raise FileNotFoundError(f"Rule file not found: {file_path}")
|
||||
|
||||
with open(path, 'r', encoding='utf-8') as f:
|
||||
data = json.load(f)
|
||||
|
||||
if not data:
|
||||
return []
|
||||
|
||||
rules = []
|
||||
items = data if isinstance(data, list) else [data]
|
||||
|
||||
for item in items:
|
||||
self.validator.validate(item)
|
||||
rule = Rule.from_dict(item)
|
||||
rules.append(rule)
|
||||
|
||||
return rules
|
||||
|
||||
def load_directory(self, directory: str, recursive: bool = True) -> List[Rule]:
|
||||
path = Path(directory)
|
||||
if not path.exists() or not path.is_dir():
|
||||
raise NotADirectoryError(f"Directory not found: {directory}")
|
||||
|
||||
rules = []
|
||||
pattern = '**/*.yaml' if recursive else '*.yaml'
|
||||
yaml_files = list(path.glob(pattern))
|
||||
pattern = '**/*.yml' if recursive else '*.yml'
|
||||
yml_files = list(path.glob(pattern))
|
||||
pattern = '**/*.json' if recursive else '*.json'
|
||||
json_files = list(path.glob(pattern))
|
||||
|
||||
for file_path in yaml_files + yml_files + json_files:
|
||||
try:
|
||||
if file_path.suffix in ['.yaml', '.yml']:
|
||||
file_rules = self.load_yaml(str(file_path))
|
||||
else:
|
||||
file_rules = self.load_json(str(file_path))
|
||||
rules.extend(file_rules)
|
||||
except Exception as e:
|
||||
print(f"Warning: Failed to load rules from {file_path}: {e}")
|
||||
|
||||
return rules
|
||||
|
||||
def load_multiple(self, sources: List[str]) -> List[Rule]:
|
||||
all_rules = []
|
||||
for source in sources:
|
||||
if os.path.isfile(source):
|
||||
if source.endswith(('.yaml', '.yml')):
|
||||
all_rules.extend(self.load_yaml(source))
|
||||
elif source.endswith('.json'):
|
||||
all_rules.extend(self.load_json(source))
|
||||
elif os.path.isdir(source):
|
||||
all_rules.extend(self.load_directory(source))
|
||||
return all_rules
|
||||
|
||||
def filter_rules(
|
||||
self,
|
||||
rules: List[Rule],
|
||||
language: Optional[str] = None,
|
||||
tool: Optional[str] = None,
|
||||
tags: Optional[List[str]] = None,
|
||||
) -> List[Rule]:
|
||||
filtered = rules
|
||||
if language:
|
||||
filtered = [r for r in filtered if r.language is None or r.language == language]
|
||||
if tool:
|
||||
filtered = [r for r in filtered if r.tool is None or r.tool == tool]
|
||||
if tags:
|
||||
filtered = [r for r in filtered if any(t in r.tags for t in tags)]
|
||||
return sorted(filtered, key=lambda r: r.priority, reverse=True)
|
||||
Reference in New Issue
Block a user