Add core modules: manifest, merger, validator
Some checks failed
CI / test (push) Failing after 6s
CI / build (push) Has been skipped

This commit is contained in:
2026-02-04 20:03:06 +00:00
parent 23ad0afacd
commit 8c1ac11100

444
confsync/core/validator.py Normal file
View File

@@ -0,0 +1,444 @@
"""Configuration validation for ConfSync."""
import json
from pathlib import Path
from typing import Any, Dict, List, Optional
import yaml
import toml
import configparser
from confsync.models.config_models import (
ValidationResult,
ValidationIssue,
Severity,
ConfigFile,
)
class Validator:
"""Validates configuration files for errors and conflicts."""
def __init__(self):
self.validation_rules: Dict[str, List[str]] = {
".json": ["valid_json", "no_duplicate_keys"],
".yaml": ["valid_yaml", "no_duplicate_keys"],
".yml": ["valid_yaml", "no_duplicate_keys"],
".toml": ["valid_toml", "no_duplicate_sections"],
".ini": ["valid_ini"],
".cfg": ["valid_ini"],
".sh": ["valid_shell_syntax"],
".bash": ["valid_shell_syntax"],
".zsh": ["valid_shell_syntax"],
".gitconfig": ["valid_git_config"],
".gitignore": ["valid_gitignore"],
"Dockerfile": ["valid_dockerfile"],
}
def validate_file(self, config_file: ConfigFile) -> ValidationResult:
"""Validate a single configuration file."""
result = ValidationResult(is_valid=True, validated_files=1)
suffix = Path(config_file.path).suffix.lower()
for rule in self.validation_rules.get(suffix, []):
validator = getattr(self, f"check_{rule}", None)
if validator:
issue = validator(config_file)
if issue:
result.add_issue(issue)
if not suffix or suffix not in self.validation_rules:
if config_file.content:
issue = ValidationIssue(
rule="unknown_format",
message=f"Unknown configuration format for {config_file.path}",
severity=Severity.INFO,
file_path=config_file.path,
suggestion="Consider converting to a standard format like YAML or JSON",
)
result.add_issue(issue)
return result
def validate_manifest(
self,
configs: List[ConfigFile],
check_conflicts: bool = True
) -> ValidationResult:
"""Validate multiple configuration files."""
result = ValidationResult(is_valid=True)
for config_file in configs:
file_result = self.validate_file(config_file)
result.validated_files += file_result.validated_files
result.issues.extend(file_result.issues)
if check_conflicts:
conflict_issues = self._check_conflicts(configs)
result.issues.extend(conflict_issues)
result.is_valid = all(
issue.severity != Severity.ERROR for issue in result.issues
)
return result
def check_valid_json(self, config_file: ConfigFile) -> Optional[ValidationIssue]:
"""Check if JSON content is valid."""
try:
json.loads(config_file.content)
return None
except json.JSONDecodeError as e:
return ValidationIssue(
rule="valid_json",
message=f"Invalid JSON: {str(e)}",
severity=Severity.ERROR,
file_path=config_file.path,
line_number=e.lineno if hasattr(e, 'lineno') else None,
suggestion="Check for missing commas, brackets, or quotes",
)
def check_valid_yaml(self, config_file: ConfigFile) -> Optional[ValidationIssue]:
"""Check if YAML content is valid."""
try:
yaml.safe_load(config_file.content)
return None
except yaml.YAMLError as e:
return ValidationIssue(
rule="valid_yaml",
message=f"Invalid YAML: {str(e)}",
severity=Severity.ERROR,
file_path=config_file.path,
suggestion="Check indentation and proper YAML syntax",
)
def check_valid_toml(self, config_file: ConfigFile) -> Optional[ValidationIssue]:
"""Check if TOML content is valid."""
try:
toml.loads(config_file.content)
return None
except toml.TomlDecodeError as e:
return ValidationIssue(
rule="valid_toml",
message=f"Invalid TOML: {str(e)}",
severity=Severity.ERROR,
file_path=config_file.path,
suggestion="Check TOML syntax (tables, keys, values)",
)
def check_valid_ini(self, config_file: ConfigFile) -> Optional[ValidationIssue]:
"""Check if INI content is valid."""
try:
parser = configparser.ConfigParser()
parser.read_string(config_file.content)
return None
except configparser.Error as e:
return ValidationIssue(
rule="valid_ini",
message=f"Invalid INI: {str(e)}",
severity=Severity.ERROR,
file_path=config_file.path,
suggestion="Check section headers and key-value format",
)
def check_no_duplicate_keys(self, config_file: ConfigFile) -> Optional[ValidationIssue]:
"""Check for duplicate keys in JSON or YAML."""
suffix = Path(config_file.path).suffix.lower()
if suffix == ".json":
try:
data = json.loads(config_file.content)
duplicates = self._find_json_duplicates(data)
except json.JSONDecodeError:
return None
elif suffix in (".yaml", ".yml"):
try:
data = yaml.safe_load(config_file.content)
if isinstance(data, dict):
duplicates = self._find_dict_duplicates(data)
else:
return None
except yaml.YAMLError:
return None
else:
return None
if duplicates:
return ValidationIssue(
rule="no_duplicate_keys",
message=f"Duplicate keys found: {', '.join(duplicates)}",
severity=Severity.WARNING,
file_path=config_file.path,
suggestion="Consider consolidating duplicate configurations",
)
return None
def _find_json_duplicates(self, data: Any, path: str = "") -> List[str]:
"""Find duplicate keys in JSON data."""
duplicates = []
if isinstance(data, dict):
seen = {}
for key, value in data.items():
full_path = f"{path}.{key}" if path else key
if isinstance(value, (dict, list)):
sub_dups = self._find_json_duplicates(value, full_path)
duplicates.extend(sub_dups)
if key in seen:
duplicates.append(full_path)
else:
seen[key] = full_path
elif isinstance(data, list):
for i, item in enumerate(data):
sub_dups = self._find_json_duplicates(item, f"{path}[{i}]")
duplicates.extend(sub_dups)
return duplicates
def _find_dict_duplicates(self, data: Dict, prefix: str = "") -> List[str]:
"""Find duplicate keys in nested dictionary."""
duplicates = []
seen = {}
for key, value in data.items():
full_key = f"{prefix}.{key}" if prefix else key
if isinstance(value, dict):
sub_dups = self._find_dict_duplicates(value, full_key)
duplicates.extend(sub_dups)
if key in seen:
duplicates.append(full_key)
else:
seen[key] = full_key
return duplicates
def check_no_duplicate_sections(self, config_file: ConfigFile) -> Optional[ValidationIssue]:
"""Check for duplicate sections in TOML."""
try:
data = toml.loads(config_file.content)
except toml.TomlDecodeError:
return None
if isinstance(data, dict):
sections: Dict[str, List[str]] = {}
for section in data.keys():
if section in sections:
sections[section].append(section)
else:
sections[section] = [section]
duplicates = [s for s, ids in sections.items() if len(ids) > 1]
if duplicates:
return ValidationIssue(
rule="no_duplicate_sections",
message=f"Duplicate sections found: {', '.join(duplicates)}",
severity=Severity.WARNING,
file_path=config_file.path,
suggestion="Remove duplicate sections from TOML file",
)
return None
def check_valid_shell_syntax(self, config_file: ConfigFile) -> Optional[ValidationIssue]:
"""Check basic shell script syntax."""
content = config_file.content.strip()
common_issues = []
if content and not content.startswith('#') and not content.startswith('alias '):
pass
unclosed_quotes = self._check_unclosed_quotes(content)
if unclosed_quotes:
common_issues.append(f"Unclosed quotes: {unclosed_quotes}")
unbalanced_brackets = self._check_unbalanced_brackets(content)
if unbalanced_brackets:
common_issues.append(f"Unbalanced brackets: {unbalanced_brackets}")
if common_issues:
return ValidationIssue(
rule="valid_shell_syntax",
message=f"Potential shell syntax issues: {'; '.join(common_issues)}",
severity=Severity.WARNING,
file_path=config_file.path,
suggestion="Review shell script syntax",
)
return None
def _check_unclosed_quotes(self, content: str) -> Optional[str]:
"""Check for unclosed quotes."""
in_single = False
in_double = False
i = 0
while i < len(content):
char = content[i]
if char == "'" and not in_double:
in_single = not in_single
elif char == '"' and not in_single:
in_double = not in_double
i += 1
if in_single:
return "single quote (')"
if in_double:
return 'double quote (")'
return None
def _check_unbalanced_brackets(self, content: str) -> Optional[str]:
"""Check for unbalanced brackets."""
brackets = {'(': ')', '[': ']', '{': '}'}
stack = []
for i, char in enumerate(content):
if char in brackets:
stack.append((char, i))
elif char in brackets.values():
if not stack:
return f"unmatched {char} at position {i}"
opening, _ = stack.pop()
if brackets.get(opening) != char:
return f"mismatched {opening} at position {_} and {char} at position {i}"
if stack:
opening, pos = stack[0]
return f"unclosed {opening} at position {pos}"
return None
def check_valid_git_config(self, config_file: ConfigFile) -> Optional[ValidationIssue]:
"""Check if git config is valid."""
try:
parser = configparser.ConfigParser()
parser.read_string(config_file.content)
for section in parser.sections():
if section.startswith('include'):
pass
return None
except configparser.Error as e:
return ValidationIssue(
rule="valid_git_config",
message=f"Invalid git config: {str(e)}",
severity=Severity.ERROR,
file_path=config_file.path,
)
def check_valid_gitignore(self, config_file: ConfigFile) -> Optional[ValidationIssue]:
"""Check gitignore for common issues."""
issues = []
lines = config_file.content.split('\n')
for i, line in enumerate(lines, 1):
line = line.strip()
if not line or line.startswith('#'):
continue
if line.startswith('/') and line.count('/') > 1:
issues.append(f"Line {i}: Multiple leading slashes may cause issues")
if '**' in line and line.count('**') != 2:
issues.append(f"Line {i}: Malformed glob pattern with '**'")
if line.endswith('/') and len(line) > 1:
issues.append(f"Line {i}: Trailing slash - directory pattern detected")
if issues:
return ValidationIssue(
rule="valid_gitignore",
message=f"Potential gitignore issues: {'; '.join(issues)}",
severity=Severity.WARNING,
file_path=config_file.path,
suggestion="Review gitignore patterns for correctness",
)
return None
def check_valid_dockerfile(self, config_file: ConfigFile) -> Optional[ValidationIssue]:
"""Check Dockerfile for common issues."""
issues = []
lines = config_file.content.split('\n')
has_from = False
for i, line in enumerate(lines, 1):
line = line.strip()
if not line or line.startswith('#'):
continue
upper_line = line.upper()
if upper_line.startswith('FROM'):
has_from = True
if ':' not in line:
issues.append(f"Line {i}: FROM instruction without tag")
if upper_line.startswith('RUN') and ('apt-get' in line or 'yum' in line):
if '&&' not in line:
issues.append(f"Line {i}: Package manager command without cleanup")
if not has_from:
issues.append("No FROM instruction found")
if issues:
return ValidationIssue(
rule="valid_dockerfile",
message=f"Dockerfile issues: {'; '.join(issues)}",
severity=Severity.WARNING if has_from else Severity.ERROR,
file_path=config_file.path,
)
return None
def _check_conflicts(self, configs: List[ConfigFile]) -> List[ValidationIssue]:
"""Check for conflicts between configurations."""
issues: List[ValidationIssue] = []
tool_groups: Dict[str, List[ConfigFile]] = {}
for config in configs:
tool = config.tool_name.lower()
if tool not in tool_groups:
tool_groups[tool] = []
tool_groups[tool].append(config)
for tool, tool_configs in tool_groups.items():
if len(tool_configs) > 1:
issue = ValidationIssue(
rule="multiple_configs",
message=f"Multiple configurations found for {tool}: {[c.path for c in tool_configs]}",
severity=Severity.INFO,
suggestion="Ensure these are intentional (e.g., different environments)",
)
issues.append(issue)
return issues
def generate_report(self, result: ValidationResult) -> str:
"""Generate a human-readable validation report."""
lines = []
lines.append("=" * 60)
lines.append("Configuration Validation Report")
lines.append("=" * 60)
lines.append(f"Validated Files: {result.validated_files}")
lines.append(f"Overall Status: {'VALID' if result.is_valid else 'ISSUES FOUND'}")
lines.append("-" * 60)
if not result.issues:
lines.append("No issues found.")
return '\n'.join(lines)
for severity in [Severity.ERROR, Severity.WARNING, Severity.INFO]:
severity_issues = [i for i in result.issues if i.severity == severity]
if severity_issues:
lines.append(f"\n{severity.value.upper()}S ({len(severity_issues)}):")
for issue in severity_issues:
lines.append(f" - [{issue.rule}] {issue.message}")
if issue.file_path:
lines.append(f" File: {issue.file_path}")
if issue.suggestion:
lines.append(f" Suggestion: {issue.suggestion}")
lines.append("-" * 60)
lines.append(f"Total Issues: {len(result.issues)}")
lines.append("=" * 60)
return '\n'.join(lines)