diff --git a/errorfix/rules/loader.py b/errorfix/rules/loader.py new file mode 100644 index 0000000..4579a0b --- /dev/null +++ b/errorfix/rules/loader.py @@ -0,0 +1,109 @@ +import os +import json +from pathlib import Path +from typing import List, Optional, Union +import yaml + +from .rule import Rule +from .validator import RuleValidator + + +class RuleLoader: + def __init__(self, validator: Optional[RuleValidator] = None): + self.validator = validator or RuleValidator() + self._cache: dict = {} + + def load_yaml(self, file_path: str) -> List[Rule]: + path = Path(file_path) + if not path.exists(): + raise FileNotFoundError(f"Rule file not found: {file_path}") + + with open(path, 'r', encoding='utf-8') as f: + data = yaml.safe_load(f) + + if not data: + return [] + + rules = [] + items = data if isinstance(data, list) else [data] + + for item in items: + self.validator.validate(item) + rule = Rule.from_dict(item) + rules.append(rule) + + return rules + + def load_json(self, file_path: str) -> List[Rule]: + path = Path(file_path) + if not path.exists(): + raise FileNotFoundError(f"Rule file not found: {file_path}") + + with open(path, 'r', encoding='utf-8') as f: + data = json.load(f) + + if not data: + return [] + + rules = [] + items = data if isinstance(data, list) else [data] + + for item in items: + self.validator.validate(item) + rule = Rule.from_dict(item) + rules.append(rule) + + return rules + + def load_directory(self, directory: str, recursive: bool = True) -> List[Rule]: + path = Path(directory) + if not path.exists() or not path.is_dir(): + raise NotADirectoryError(f"Directory not found: {directory}") + + rules = [] + pattern = '**/*.yaml' if recursive else '*.yaml' + yaml_files = list(path.glob(pattern)) + pattern = '**/*.yml' if recursive else '*.yml' + yml_files = list(path.glob(pattern)) + pattern = '**/*.json' if recursive else '*.json' + json_files = list(path.glob(pattern)) + + for file_path in yaml_files + yml_files + json_files: + try: + if file_path.suffix in ['.yaml', '.yml']: + file_rules = self.load_yaml(str(file_path)) + else: + file_rules = self.load_json(str(file_path)) + rules.extend(file_rules) + except Exception as e: + print(f"Warning: Failed to load rules from {file_path}: {e}") + + return rules + + def load_multiple(self, sources: List[str]) -> List[Rule]: + all_rules = [] + for source in sources: + if os.path.isfile(source): + if source.endswith(('.yaml', '.yml')): + all_rules.extend(self.load_yaml(source)) + elif source.endswith('.json'): + all_rules.extend(self.load_json(source)) + elif os.path.isdir(source): + all_rules.extend(self.load_directory(source)) + return all_rules + + def filter_rules( + self, + rules: List[Rule], + language: Optional[str] = None, + tool: Optional[str] = None, + tags: Optional[List[str]] = None, + ) -> List[Rule]: + filtered = rules + if language: + filtered = [r for r in filtered if r.language is None or r.language == language] + if tool: + filtered = [r for r in filtered if r.tool is None or r.tool == tool] + if tags: + filtered = [r for r in filtered if any(t in r.tags for t in tags)] + return sorted(filtered, key=lambda r: r.priority, reverse=True)