diff --git a/confsync/detectors/base.py b/confsync/detectors/base.py new file mode 100644 index 0000000..c096339 --- /dev/null +++ b/confsync/detectors/base.py @@ -0,0 +1,132 @@ +"""Base detector class and registry for configuration file detection.""" + +from abc import ABC, abstractmethod +from pathlib import Path +from typing import Dict, List, Optional, Type + +from confsync.models.config_models import ConfigFile, ConfigCategory + + +class BaseDetector(ABC): + """Base class for configuration file detectors.""" + + category: ConfigCategory + tool_name: str + + @abstractmethod + def detect(self) -> List[ConfigFile]: + """Detect configuration files for this tool. + + Returns: + List of detected ConfigFile objects + """ + pass + + def get_default_locations(self) -> List[str]: + """Get default locations to search for configurations.""" + return [] + + def get_file_patterns(self) -> List[str]: + """Get file patterns to match.""" + return [] + + def should_skip(self, path: str) -> bool: + """Check if a path should be skipped.""" + return False + + def _create_config_file(self, path: str, content: str = "") -> ConfigFile: + """Create a ConfigFile object from a path and content.""" + from confsync.utils.file_utils import read_file_safe + from confsync.utils.path_utils import expand_path + + full_path = expand_path(path) + + if content == "": + content = read_file_safe(full_path) or "" + + file_path = Path(full_path) + return ConfigFile( + path=full_path, + name=file_path.name, + category=self.category, + tool_name=self.tool_name, + content=content, + file_hash=self._calculate_hash(content), + size=len(content.encode('utf-8')), + ) + + def _calculate_hash(self, content: str) -> str: + """Calculate hash of content.""" + import hashlib + return hashlib.sha256(content.encode('utf-8')).hexdigest() + + def _get_file_hash(self, path: str) -> str: + """Get hash of a file.""" + from confsync.utils.file_utils import read_file_safe + content = read_file_safe(path) + if content is None: + return "" + return self._calculate_hash(content) + + +class DetectorRegistry: + """Registry for all configuration detectors.""" + + _detectors: Dict[str, Type[BaseDetector]] = {} + _instances: Dict[str, BaseDetector] = {} + + @classmethod + def register(cls, detector_class: Type[BaseDetector]) -> None: + """Register a detector class.""" + cls._detectors[detector_class.tool_name.lower()] = detector_class + + @classmethod + def get_detector(cls, tool_name: str) -> Optional[BaseDetector]: + """Get a detector instance by tool name.""" + tool_lower = tool_name.lower() + if tool_lower not in cls._instances: + if tool_lower in cls._detectors: + cls._instances[tool_lower] = cls._detectors[tool_lower]() + return cls._instances.get(tool_lower) + + @classmethod + def get_all_detectors(cls) -> Dict[str, BaseDetector]: + """Get all registered detector instances.""" + result = {} + for name, class_obj in cls._detectors.items(): + if name not in cls._instances: + cls._instances[name] = class_obj() + result[name] = cls._instances[name] + return result + + @classmethod + def detect_all(cls, categories: Optional[List[ConfigCategory]] = None) -> List[ConfigFile]: + """Run all registered detectors.""" + all_configs = [] + for name, detector in cls.get_all_detectors().items(): + if categories is None or detector.category in categories: + try: + configs = detector.detect() + all_configs.extend(configs) + except Exception as e: + print(f"Error running detector {name}: {e}") + return all_configs + + @classmethod + def list_categories(cls) -> List[ConfigCategory]: + """List all available categories.""" + categories = set() + for detector_class in cls._detectors.values(): + categories.add(detector_class.category) + return list(categories) + + @classmethod + def list_tools(cls) -> List[str]: + """List all available tools.""" + return list(cls._detectors.keys()) + + +def detector(detector_class: Type[BaseDetector]) -> Type[BaseDetector]: + """Decorator to register a detector.""" + DetectorRegistry.register(detector_class) + return detector_class