Add detector modules
Some checks failed
CI / test (push) Has been cancelled
CI / build (push) Has been cancelled

This commit is contained in:
2026-02-04 20:06:47 +00:00
parent 2f7e90148f
commit 81802c47b9

132
confsync/detectors/base.py Normal file
View File

@@ -0,0 +1,132 @@
"""Base detector class and registry for configuration file detection."""
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Dict, List, Optional, Type
from confsync.models.config_models import ConfigFile, ConfigCategory
class BaseDetector(ABC):
"""Base class for configuration file detectors."""
category: ConfigCategory
tool_name: str
@abstractmethod
def detect(self) -> List[ConfigFile]:
"""Detect configuration files for this tool.
Returns:
List of detected ConfigFile objects
"""
pass
def get_default_locations(self) -> List[str]:
"""Get default locations to search for configurations."""
return []
def get_file_patterns(self) -> List[str]:
"""Get file patterns to match."""
return []
def should_skip(self, path: str) -> bool:
"""Check if a path should be skipped."""
return False
def _create_config_file(self, path: str, content: str = "") -> ConfigFile:
"""Create a ConfigFile object from a path and content."""
from confsync.utils.file_utils import read_file_safe
from confsync.utils.path_utils import expand_path
full_path = expand_path(path)
if content == "":
content = read_file_safe(full_path) or ""
file_path = Path(full_path)
return ConfigFile(
path=full_path,
name=file_path.name,
category=self.category,
tool_name=self.tool_name,
content=content,
file_hash=self._calculate_hash(content),
size=len(content.encode('utf-8')),
)
def _calculate_hash(self, content: str) -> str:
"""Calculate hash of content."""
import hashlib
return hashlib.sha256(content.encode('utf-8')).hexdigest()
def _get_file_hash(self, path: str) -> str:
"""Get hash of a file."""
from confsync.utils.file_utils import read_file_safe
content = read_file_safe(path)
if content is None:
return ""
return self._calculate_hash(content)
class DetectorRegistry:
"""Registry for all configuration detectors."""
_detectors: Dict[str, Type[BaseDetector]] = {}
_instances: Dict[str, BaseDetector] = {}
@classmethod
def register(cls, detector_class: Type[BaseDetector]) -> None:
"""Register a detector class."""
cls._detectors[detector_class.tool_name.lower()] = detector_class
@classmethod
def get_detector(cls, tool_name: str) -> Optional[BaseDetector]:
"""Get a detector instance by tool name."""
tool_lower = tool_name.lower()
if tool_lower not in cls._instances:
if tool_lower in cls._detectors:
cls._instances[tool_lower] = cls._detectors[tool_lower]()
return cls._instances.get(tool_lower)
@classmethod
def get_all_detectors(cls) -> Dict[str, BaseDetector]:
"""Get all registered detector instances."""
result = {}
for name, class_obj in cls._detectors.items():
if name not in cls._instances:
cls._instances[name] = class_obj()
result[name] = cls._instances[name]
return result
@classmethod
def detect_all(cls, categories: Optional[List[ConfigCategory]] = None) -> List[ConfigFile]:
"""Run all registered detectors."""
all_configs = []
for name, detector in cls.get_all_detectors().items():
if categories is None or detector.category in categories:
try:
configs = detector.detect()
all_configs.extend(configs)
except Exception as e:
print(f"Error running detector {name}: {e}")
return all_configs
@classmethod
def list_categories(cls) -> List[ConfigCategory]:
"""List all available categories."""
categories = set()
for detector_class in cls._detectors.values():
categories.add(detector_class.category)
return list(categories)
@classmethod
def list_tools(cls) -> List[str]:
"""List all available tools."""
return list(cls._detectors.keys())
def detector(detector_class: Type[BaseDetector]) -> Type[BaseDetector]:
"""Decorator to register a detector."""
DetectorRegistry.register(detector_class)
return detector_class