Add core modules: manifest, merger, validator
Some checks failed
CI / test (push) Has been cancelled
CI / build (push) Has been cancelled

This commit is contained in:
2026-02-04 20:03:05 +00:00
parent bf9910452e
commit 23ad0afacd

387
confsync/core/merger.py Normal file
View File

@@ -0,0 +1,387 @@
"""Merge strategies for configuration files."""
import difflib
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import Dict, List, Optional, Tuple
class MergeStrategy(Enum):
"""Available merge strategies."""
KEEP_LOCAL = "keep_local"
KEEP_REMOTE = "keep_remote"
KEEP_COMMON = "keep_common"
THREE_WAY = "three_way"
UNION = "union"
CUSTOM = "custom"
@dataclass
class MergeConflict:
"""Represents a merge conflict."""
file_path: str
local_lines: List[str]
remote_lines: List[str]
base_lines: Optional[List[str]]
local_marker: str = "<<<<<<< LOCAL"
common_marker: str = "======="
remote_marker: str = ">>>>>>> REMOTE"
def format_conflict(self) -> str:
"""Format conflict as a string."""
lines = []
lines.append(self.local_marker)
lines.extend(self.local_lines)
lines.append(self.common_marker)
lines.extend(self.remote_lines)
lines.append(self.remote_marker)
return '\n'.join(lines)
class BaseMergeStrategy(ABC):
"""Base class for merge strategies."""
@abstractmethod
def merge(
self,
local_content: str,
remote_content: str,
base_content: Optional[str] = None,
file_path: Optional[str] = None
) -> Tuple[bool, str, Optional[MergeConflict]]:
"""Merge two contents.
Returns:
Tuple of (success, merged_content, conflict)
"""
pass
class KeepLocalStrategy(BaseMergeStrategy):
"""Strategy that keeps local changes."""
def merge(
self,
local_content: str,
remote_content: str,
base_content: Optional[str] = None,
file_path: Optional[str] = None
) -> Tuple[bool, str, Optional[MergeConflict]]:
return True, local_content, None
class KeepRemoteStrategy(BaseMergeStrategy):
"""Strategy that keeps remote changes."""
def merge(
self,
local_content: str,
remote_content: str,
base_content: Optional[str] = None,
file_path: Optional[str] = None
) -> Tuple[bool, str, Optional[MergeConflict]]:
return True, remote_content, None
class KeepCommonStrategy(BaseMergeStrategy):
"""Strategy that keeps common lines between local and remote."""
def merge(
self,
local_content: str,
remote_content: str,
base_content: Optional[str] = None,
file_path: Optional[str] = None
) -> Tuple[bool, str, Optional[MergeConflict]]:
local_lines = local_content.splitlines()
remote_lines = remote_content.splitlines()
common = set(local_lines) & set(remote_lines)
common_lines = [line for line in local_lines if line in common]
return True, '\n'.join(common_lines), None
class ThreeWayMergeStrategy(BaseMergeStrategy):
"""Standard three-way merge algorithm."""
def merge(
self,
local_content: str,
remote_content: str,
base_content: Optional[str] = None,
file_path: Optional[str] = None
) -> Tuple[bool, str, Optional[MergeConflict]]:
if not base_content:
base_content = ""
local_lines = local_content.splitlines()
remote_lines = remote_content.splitlines()
base_lines = base_content.splitlines()
merged, conflicts = self._three_way_merge(
local_lines, base_lines, remote_lines
)
if conflicts:
conflict = MergeConflict(
file_path=file_path or "",
local_lines=local_lines,
remote_lines=remote_lines,
base_lines=base_lines,
)
return False, '\n'.join(merged), conflict
return True, '\n'.join(merged), None
def _three_way_merge(
self,
local: List[str],
base: List[str],
remote: List[str]
) -> Tuple[List[str], List[MergeConflict]]:
"""Perform three-way merge using diff3 algorithm."""
merged = []
conflicts = []
matcher = difflib.SequenceMatcher(None, base, local)
remote_matcher = difflib.SequenceMatcher(None, base, remote)
for tag, i1, i2, j1, j2 in matcher.get_opcodes():
if tag == 'equal':
merged.extend(base[i1:i2])
elif tag == 'delete':
if self._has_remote_changes(base[i1:i2], remote, remote_matcher):
conflict = self._create_conflict(
local[i1:i2] if tag == 'insert' else base[i1:i2],
remote,
base[i1:i2],
remote_matcher
)
conflicts.append(conflict)
merged.extend(conflict.format_conflict().splitlines())
else:
merged.extend(base[i1:i2])
elif tag == 'insert':
if not self._has_remote_changes(base[i1:i2], remote, remote_matcher):
merged.extend(local[j1:j2])
else:
conflict = self._create_conflict(
local[j1:j2],
remote,
base[i1:i2],
remote_matcher
)
conflicts.append(conflict)
merged.extend(conflict.format_conflict().splitlines())
elif tag == 'replace':
remote_section = self._get_remote_section(
base[i1:i2], remote, remote_matcher
)
if base[i1:i2] == remote_section:
merged.extend(local[j1:j2])
else:
conflict = self._create_conflict(
local[j1:j2],
remote_section,
base[i1:i2],
remote_matcher
)
conflicts.append(conflict)
merged.extend(conflict.format_conflict().splitlines())
return merged, conflicts
def _has_remote_changes(
self,
base_section: List[str],
remote: List[str],
remote_matcher: difflib.SequenceMatcher
) -> bool:
"""Check if remote has changes from base."""
if not base_section:
return False
indices = self._find_section_in_remote(base_section, remote, remote_matcher)
return len(indices) != len(base_section) or any(
remote[i] != base_section[j]
for i, j in zip(indices, range(len(base_section)))
if i < len(remote) and j < len(base_section)
)
def _find_section_in_remote(
self,
section: List[str],
remote: List[str],
matcher: difflib.SequenceMatcher
) -> List[int]:
"""Find a section in remote content."""
for i in range(len(remote) - len(section) + 1):
if all(remote[i + j] == section[j] for j in range(len(section))):
return list(range(i, i + len(section)))
return []
def _get_remote_section(
self,
base_section: List[str],
remote: List[str],
matcher: difflib.SequenceMatcher
) -> List[str]:
"""Get the corresponding remote section."""
indices = self._find_section_in_remote(base_section, remote, matcher)
if indices:
return [remote[i] for i in indices]
return []
def _create_conflict(
self,
local_section: List[str],
remote_section: List[str],
base_section: List[str],
remote_matcher: difflib.SequenceMatcher
) -> MergeConflict:
"""Create a merge conflict object."""
remote = self._get_remote_section(base_section, remote_section, remote_matcher)
return MergeConflict(
file_path="",
local_lines=local_section,
remote_lines=remote or remote_section,
base_lines=base_section,
)
class UnionMergeStrategy(BaseMergeStrategy):
"""Strategy that combines both local and remote, removing duplicates."""
def merge(
self,
local_content: str,
remote_content: str,
base_content: Optional[str] = None,
file_path: Optional[str] = None
) -> Tuple[bool, str, Optional[MergeConflict]]:
local_lines = local_content.splitlines()
remote_lines = remote_content.splitlines()
seen = set()
merged = []
for line in local_lines + remote_lines:
if line not in seen:
seen.add(line)
merged.append(line)
return True, '\n'.join(merged), None
class Merger:
"""Main merger class that orchestrates merge operations."""
STRATEGIES = {
MergeStrategy.KEEP_LOCAL: KeepLocalStrategy(),
MergeStrategy.KEEP_REMOTE: KeepRemoteStrategy(),
MergeStrategy.KEEP_COMMON: KeepCommonStrategy(),
MergeStrategy.THREE_WAY: ThreeWayMergeStrategy(),
MergeStrategy.UNION: UnionMergeStrategy(),
}
def __init__(self):
self.strategies = self.STRATEGIES.copy()
self.custom_rules: Dict[str, str] = {}
def register_strategy(self, name: str, strategy: BaseMergeStrategy) -> None:
"""Register a custom merge strategy."""
self.strategies[name] = strategy
def set_custom_rule(self, file_pattern: str, strategy: str) -> None:
"""Set a custom merge rule for a file pattern."""
self.custom_rules[file_pattern] = strategy
def get_strategy(self, strategy_name: str) -> BaseMergeStrategy:
"""Get a merge strategy by name."""
if strategy_name in self.strategies:
return self.strategies[strategy_name]
if strategy_name in [s.value for s in MergeStrategy]:
return self.STRATEGIES[MergeStrategy(strategy_name)]
return self.strategies[MergeStrategy.KEEP_LOCAL]
def merge_file(
self,
local_content: str,
remote_content: str,
base_content: Optional[str] = None,
file_path: Optional[str] = None,
strategy_name: Optional[str] = None
) -> Tuple[bool, str, Optional[MergeConflict]]:
"""Merge contents of a single file."""
if strategy_name:
strategy = self.get_strategy(strategy_name)
else:
if file_path:
strategy_name = self._get_strategy_for_file(file_path)
if not strategy_name:
strategy_name = MergeStrategy.THREE_WAY.value
strategy = self.get_strategy(strategy_name)
return strategy.merge(local_content, remote_content, base_content, file_path)
def _get_strategy_for_file(self, file_path: str) -> Optional[str]:
"""Get the merge strategy for a specific file."""
for pattern, strategy in self.custom_rules.items():
if Path(file_path).match(pattern):
return strategy
return None
def merge_multiple(
self,
files: List[Dict[str, str]],
base_contents: Optional[Dict[str, str]] = None
) -> Dict[str, Tuple[bool, str, Optional[MergeConflict]]]:
"""Merge multiple files at once."""
base_contents = base_contents or {}
results = {}
for file_info in files:
file_path = file_info["path"]
local = file_info.get("local", "")
remote = file_info.get("remote", "")
base = base_contents.get(file_path, "")
strategy = file_info.get("strategy")
success, merged, conflict = self.merge_file(
local, remote, base, file_path, strategy
)
results[file_path] = (success, merged, conflict)
return results
def resolve_conflict(
self,
conflict: MergeConflict,
resolution: str,
local_content: str,
remote_content: str
) -> str:
"""Resolve a merge conflict."""
if resolution == "local":
return local_content
elif resolution == "remote":
return remote_content
conflict_lines = conflict.format_conflict().splitlines()
if resolution == "both":
local_end = conflict_lines.index(conflict.common_marker)
remote_start = conflict_lines.index(conflict.remote_marker) + 1
new_lines = conflict_lines[:local_end]
new_lines.extend(conflict.remote_lines)
new_lines.extend(conflict_lines[remote_start:])
return '\n'.join(new_lines)
return local_content