From 23ad0afacd26961725c35eb8269474e1bf36a32b Mon Sep 17 00:00:00 2001 From: 7000pctAUTO Date: Wed, 4 Feb 2026 20:03:05 +0000 Subject: [PATCH] Add core modules: manifest, merger, validator --- confsync/core/merger.py | 387 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 387 insertions(+) create mode 100644 confsync/core/merger.py diff --git a/confsync/core/merger.py b/confsync/core/merger.py new file mode 100644 index 0000000..fa62ba2 --- /dev/null +++ b/confsync/core/merger.py @@ -0,0 +1,387 @@ +"""Merge strategies for configuration files.""" + +import difflib +from abc import ABC, abstractmethod +from dataclasses import dataclass +from enum import Enum +from pathlib import Path +from typing import Dict, List, Optional, Tuple + + +class MergeStrategy(Enum): + """Available merge strategies.""" + + KEEP_LOCAL = "keep_local" + KEEP_REMOTE = "keep_remote" + KEEP_COMMON = "keep_common" + THREE_WAY = "three_way" + UNION = "union" + CUSTOM = "custom" + + +@dataclass +class MergeConflict: + """Represents a merge conflict.""" + + file_path: str + local_lines: List[str] + remote_lines: List[str] + base_lines: Optional[List[str]] + local_marker: str = "<<<<<<< LOCAL" + common_marker: str = "=======" + remote_marker: str = ">>>>>>> REMOTE" + + def format_conflict(self) -> str: + """Format conflict as a string.""" + lines = [] + lines.append(self.local_marker) + lines.extend(self.local_lines) + lines.append(self.common_marker) + lines.extend(self.remote_lines) + lines.append(self.remote_marker) + return '\n'.join(lines) + + +class BaseMergeStrategy(ABC): + """Base class for merge strategies.""" + + @abstractmethod + def merge( + self, + local_content: str, + remote_content: str, + base_content: Optional[str] = None, + file_path: Optional[str] = None + ) -> Tuple[bool, str, Optional[MergeConflict]]: + """Merge two contents. + + Returns: + Tuple of (success, merged_content, conflict) + """ + pass + + +class KeepLocalStrategy(BaseMergeStrategy): + """Strategy that keeps local changes.""" + + def merge( + self, + local_content: str, + remote_content: str, + base_content: Optional[str] = None, + file_path: Optional[str] = None + ) -> Tuple[bool, str, Optional[MergeConflict]]: + return True, local_content, None + + +class KeepRemoteStrategy(BaseMergeStrategy): + """Strategy that keeps remote changes.""" + + def merge( + self, + local_content: str, + remote_content: str, + base_content: Optional[str] = None, + file_path: Optional[str] = None + ) -> Tuple[bool, str, Optional[MergeConflict]]: + return True, remote_content, None + + +class KeepCommonStrategy(BaseMergeStrategy): + """Strategy that keeps common lines between local and remote.""" + + def merge( + self, + local_content: str, + remote_content: str, + base_content: Optional[str] = None, + file_path: Optional[str] = None + ) -> Tuple[bool, str, Optional[MergeConflict]]: + local_lines = local_content.splitlines() + remote_lines = remote_content.splitlines() + + common = set(local_lines) & set(remote_lines) + common_lines = [line for line in local_lines if line in common] + + return True, '\n'.join(common_lines), None + + +class ThreeWayMergeStrategy(BaseMergeStrategy): + """Standard three-way merge algorithm.""" + + def merge( + self, + local_content: str, + remote_content: str, + base_content: Optional[str] = None, + file_path: Optional[str] = None + ) -> Tuple[bool, str, Optional[MergeConflict]]: + if not base_content: + base_content = "" + + local_lines = local_content.splitlines() + remote_lines = remote_content.splitlines() + base_lines = base_content.splitlines() + + merged, conflicts = self._three_way_merge( + local_lines, base_lines, remote_lines + ) + + if conflicts: + conflict = MergeConflict( + file_path=file_path or "", + local_lines=local_lines, + remote_lines=remote_lines, + base_lines=base_lines, + ) + return False, '\n'.join(merged), conflict + + return True, '\n'.join(merged), None + + def _three_way_merge( + self, + local: List[str], + base: List[str], + remote: List[str] + ) -> Tuple[List[str], List[MergeConflict]]: + """Perform three-way merge using diff3 algorithm.""" + merged = [] + conflicts = [] + + matcher = difflib.SequenceMatcher(None, base, local) + remote_matcher = difflib.SequenceMatcher(None, base, remote) + + for tag, i1, i2, j1, j2 in matcher.get_opcodes(): + if tag == 'equal': + merged.extend(base[i1:i2]) + elif tag == 'delete': + if self._has_remote_changes(base[i1:i2], remote, remote_matcher): + conflict = self._create_conflict( + local[i1:i2] if tag == 'insert' else base[i1:i2], + remote, + base[i1:i2], + remote_matcher + ) + conflicts.append(conflict) + merged.extend(conflict.format_conflict().splitlines()) + else: + merged.extend(base[i1:i2]) + elif tag == 'insert': + if not self._has_remote_changes(base[i1:i2], remote, remote_matcher): + merged.extend(local[j1:j2]) + else: + conflict = self._create_conflict( + local[j1:j2], + remote, + base[i1:i2], + remote_matcher + ) + conflicts.append(conflict) + merged.extend(conflict.format_conflict().splitlines()) + elif tag == 'replace': + remote_section = self._get_remote_section( + base[i1:i2], remote, remote_matcher + ) + if base[i1:i2] == remote_section: + merged.extend(local[j1:j2]) + else: + conflict = self._create_conflict( + local[j1:j2], + remote_section, + base[i1:i2], + remote_matcher + ) + conflicts.append(conflict) + merged.extend(conflict.format_conflict().splitlines()) + + return merged, conflicts + + def _has_remote_changes( + self, + base_section: List[str], + remote: List[str], + remote_matcher: difflib.SequenceMatcher + ) -> bool: + """Check if remote has changes from base.""" + if not base_section: + return False + + indices = self._find_section_in_remote(base_section, remote, remote_matcher) + return len(indices) != len(base_section) or any( + remote[i] != base_section[j] + for i, j in zip(indices, range(len(base_section))) + if i < len(remote) and j < len(base_section) + ) + + def _find_section_in_remote( + self, + section: List[str], + remote: List[str], + matcher: difflib.SequenceMatcher + ) -> List[int]: + """Find a section in remote content.""" + for i in range(len(remote) - len(section) + 1): + if all(remote[i + j] == section[j] for j in range(len(section))): + return list(range(i, i + len(section))) + return [] + + def _get_remote_section( + self, + base_section: List[str], + remote: List[str], + matcher: difflib.SequenceMatcher + ) -> List[str]: + """Get the corresponding remote section.""" + indices = self._find_section_in_remote(base_section, remote, matcher) + if indices: + return [remote[i] for i in indices] + return [] + + def _create_conflict( + self, + local_section: List[str], + remote_section: List[str], + base_section: List[str], + remote_matcher: difflib.SequenceMatcher + ) -> MergeConflict: + """Create a merge conflict object.""" + remote = self._get_remote_section(base_section, remote_section, remote_matcher) + return MergeConflict( + file_path="", + local_lines=local_section, + remote_lines=remote or remote_section, + base_lines=base_section, + ) + + +class UnionMergeStrategy(BaseMergeStrategy): + """Strategy that combines both local and remote, removing duplicates.""" + + def merge( + self, + local_content: str, + remote_content: str, + base_content: Optional[str] = None, + file_path: Optional[str] = None + ) -> Tuple[bool, str, Optional[MergeConflict]]: + local_lines = local_content.splitlines() + remote_lines = remote_content.splitlines() + + seen = set() + merged = [] + for line in local_lines + remote_lines: + if line not in seen: + seen.add(line) + merged.append(line) + + return True, '\n'.join(merged), None + + +class Merger: + """Main merger class that orchestrates merge operations.""" + + STRATEGIES = { + MergeStrategy.KEEP_LOCAL: KeepLocalStrategy(), + MergeStrategy.KEEP_REMOTE: KeepRemoteStrategy(), + MergeStrategy.KEEP_COMMON: KeepCommonStrategy(), + MergeStrategy.THREE_WAY: ThreeWayMergeStrategy(), + MergeStrategy.UNION: UnionMergeStrategy(), + } + + def __init__(self): + self.strategies = self.STRATEGIES.copy() + self.custom_rules: Dict[str, str] = {} + + def register_strategy(self, name: str, strategy: BaseMergeStrategy) -> None: + """Register a custom merge strategy.""" + self.strategies[name] = strategy + + def set_custom_rule(self, file_pattern: str, strategy: str) -> None: + """Set a custom merge rule for a file pattern.""" + self.custom_rules[file_pattern] = strategy + + def get_strategy(self, strategy_name: str) -> BaseMergeStrategy: + """Get a merge strategy by name.""" + if strategy_name in self.strategies: + return self.strategies[strategy_name] + + if strategy_name in [s.value for s in MergeStrategy]: + return self.STRATEGIES[MergeStrategy(strategy_name)] + + return self.strategies[MergeStrategy.KEEP_LOCAL] + + def merge_file( + self, + local_content: str, + remote_content: str, + base_content: Optional[str] = None, + file_path: Optional[str] = None, + strategy_name: Optional[str] = None + ) -> Tuple[bool, str, Optional[MergeConflict]]: + """Merge contents of a single file.""" + if strategy_name: + strategy = self.get_strategy(strategy_name) + else: + if file_path: + strategy_name = self._get_strategy_for_file(file_path) + if not strategy_name: + strategy_name = MergeStrategy.THREE_WAY.value + strategy = self.get_strategy(strategy_name) + + return strategy.merge(local_content, remote_content, base_content, file_path) + + def _get_strategy_for_file(self, file_path: str) -> Optional[str]: + """Get the merge strategy for a specific file.""" + for pattern, strategy in self.custom_rules.items(): + if Path(file_path).match(pattern): + return strategy + return None + + def merge_multiple( + self, + files: List[Dict[str, str]], + base_contents: Optional[Dict[str, str]] = None + ) -> Dict[str, Tuple[bool, str, Optional[MergeConflict]]]: + """Merge multiple files at once.""" + base_contents = base_contents or {} + results = {} + + for file_info in files: + file_path = file_info["path"] + local = file_info.get("local", "") + remote = file_info.get("remote", "") + base = base_contents.get(file_path, "") + strategy = file_info.get("strategy") + + success, merged, conflict = self.merge_file( + local, remote, base, file_path, strategy + ) + results[file_path] = (success, merged, conflict) + + return results + + def resolve_conflict( + self, + conflict: MergeConflict, + resolution: str, + local_content: str, + remote_content: str + ) -> str: + """Resolve a merge conflict.""" + if resolution == "local": + return local_content + elif resolution == "remote": + return remote_content + + conflict_lines = conflict.format_conflict().splitlines() + if resolution == "both": + local_end = conflict_lines.index(conflict.common_marker) + remote_start = conflict_lines.index(conflict.remote_marker) + 1 + + new_lines = conflict_lines[:local_end] + new_lines.extend(conflict.remote_lines) + new_lines.extend(conflict_lines[remote_start:]) + + return '\n'.join(new_lines) + + return local_content