"""Configuration validation for ConfSync."""

import configparser
import json
from pathlib import Path
from typing import Any, Dict, List, Optional

import toml
import yaml

from confsync.models.config_models import (
    ConfigFile,
    Severity,
    ValidationIssue,
    ValidationResult,
)


class Validator:
    """Validates configuration files for errors and conflicts.

    ``validation_rules`` maps a file suffix -- or an exact filename for
    extensionless files (``Dockerfile``) and dotfiles (``.gitignore``,
    ``.gitconfig``, whose ``Path.suffix`` is empty) -- to a list of rule
    names; rule ``foo`` is implemented by the ``check_foo`` method.
    """

    def __init__(self):
        self.validation_rules: Dict[str, List[str]] = {
            ".json": ["valid_json", "no_duplicate_keys"],
            ".yaml": ["valid_yaml", "no_duplicate_keys"],
            ".yml": ["valid_yaml", "no_duplicate_keys"],
            ".toml": ["valid_toml", "no_duplicate_sections"],
            ".ini": ["valid_ini"],
            ".cfg": ["valid_ini"],
            ".sh": ["valid_shell_syntax"],
            ".bash": ["valid_shell_syntax"],
            ".zsh": ["valid_shell_syntax"],
            ".gitconfig": ["valid_git_config"],
            ".gitignore": ["valid_gitignore"],
            "Dockerfile": ["valid_dockerfile"],
        }

    def _rules_for_path(self, path: str) -> List[str]:
        """Return the validation rule names that apply to *path*.

        BUG FIX: ``Path(".gitignore").suffix`` is ``""`` (a leading dot marks
        a hidden file, not an extension) and ``Dockerfile`` has no suffix, so
        the original suffix-only lookup could never reach the
        ``valid_gitignore``, ``valid_git_config``, or ``valid_dockerfile``
        rules.  Fall back to the exact filename when the suffix matches
        nothing.
        """
        p = Path(path)
        rules = self.validation_rules.get(p.suffix.lower())
        if rules is None:
            rules = self.validation_rules.get(p.name, [])
        return rules

    def validate_file(self, config_file: ConfigFile) -> ValidationResult:
        """Validate a single configuration file.

        Runs every rule registered for the file's suffix (or filename) and
        returns a ``ValidationResult``.  ``is_valid`` is False only when an
        ERROR-severity issue was found -- the same criterion used by
        ``validate_manifest`` (the original left ``is_valid`` at its initial
        True even after recording ERROR issues).
        """
        result = ValidationResult(is_valid=True, validated_files=1)

        rules = self._rules_for_path(config_file.path)
        for rule in rules:
            validator = getattr(self, f"check_{rule}", None)
            if validator:
                issue = validator(config_file)
                if issue:
                    result.add_issue(issue)

        if not rules and config_file.content:
            # Non-empty file in a format we have no rules for.
            result.add_issue(ValidationIssue(
                rule="unknown_format",
                message=f"Unknown configuration format for {config_file.path}",
                severity=Severity.INFO,
                file_path=config_file.path,
                suggestion="Consider converting to a standard format like YAML or JSON",
            ))

        # Mirror validate_manifest(): only ERRORs make a file invalid.
        result.is_valid = all(
            issue.severity != Severity.ERROR for issue in result.issues
        )
        return result

    def validate_manifest(
        self,
        configs: List[ConfigFile],
        check_conflicts: bool = True
    ) -> ValidationResult:
        """Validate multiple configuration files.

        Aggregates per-file results and, when *check_conflicts* is True,
        appends cross-file conflict issues.  ``is_valid`` reflects
        ERROR-severity issues only.
        """
        result = ValidationResult(is_valid=True)

        for config_file in configs:
            file_result = self.validate_file(config_file)
            result.validated_files += file_result.validated_files
            result.issues.extend(file_result.issues)

        if check_conflicts:
            result.issues.extend(self._check_conflicts(configs))

        result.is_valid = all(
            issue.severity != Severity.ERROR for issue in result.issues
        )
        return result

    def check_valid_json(self, config_file: ConfigFile) -> Optional[ValidationIssue]:
        """Return an ERROR issue if the content is not parseable JSON."""
        try:
            json.loads(config_file.content)
            return None
        except json.JSONDecodeError as e:
            return ValidationIssue(
                rule="valid_json",
                message=f"Invalid JSON: {str(e)}",
                severity=Severity.ERROR,
                file_path=config_file.path,
                line_number=e.lineno,  # JSONDecodeError always carries lineno
                suggestion="Check for missing commas, brackets, or quotes",
            )

    def check_valid_yaml(self, config_file: ConfigFile) -> Optional[ValidationIssue]:
        """Return an ERROR issue if the content is not parseable YAML."""
        try:
            yaml.safe_load(config_file.content)
            return None
        except yaml.YAMLError as e:
            return ValidationIssue(
                rule="valid_yaml",
                message=f"Invalid YAML: {str(e)}",
                severity=Severity.ERROR,
                file_path=config_file.path,
                suggestion="Check indentation and proper YAML syntax",
            )

    def check_valid_toml(self, config_file: ConfigFile) -> Optional[ValidationIssue]:
        """Return an ERROR issue if the content is not parseable TOML."""
        try:
            toml.loads(config_file.content)
            return None
        except toml.TomlDecodeError as e:
            return ValidationIssue(
                rule="valid_toml",
                message=f"Invalid TOML: {str(e)}",
                severity=Severity.ERROR,
                file_path=config_file.path,
                suggestion="Check TOML syntax (tables, keys, values)",
            )

    def check_valid_ini(self, config_file: ConfigFile) -> Optional[ValidationIssue]:
        """Return an ERROR issue if the content is not parseable INI."""
        try:
            configparser.ConfigParser().read_string(config_file.content)
            return None
        except configparser.Error as e:
            return ValidationIssue(
                rule="valid_ini",
                message=f"Invalid INI: {str(e)}",
                severity=Severity.ERROR,
                file_path=config_file.path,
                suggestion="Check section headers and key-value format",
            )

    def check_no_duplicate_keys(self, config_file: ConfigFile) -> Optional[ValidationIssue]:
        """Warn about keys that appear more than once in a JSON/YAML mapping.

        BUG FIX: the original inspected the already-parsed data, but both
        ``json.loads`` and ``yaml.safe_load`` silently keep only the last
        value of a repeated key, so the parsed dict can never contain a
        duplicate and the check never fired.  Duplicates are now detected
        during parsing (JSON ``object_pairs_hook``) or on the composed YAML
        node tree, before de-duplication happens.
        """
        suffix = Path(config_file.path).suffix.lower()

        if suffix == ".json":
            try:
                duplicates = self._find_json_duplicates(config_file.content)
            except json.JSONDecodeError:
                return None  # syntax errors are reported by check_valid_json
        elif suffix in (".yaml", ".yml"):
            try:
                duplicates = self._find_yaml_duplicates(config_file.content)
            except yaml.YAMLError:
                return None  # syntax errors are reported by check_valid_yaml
        else:
            return None

        if duplicates:
            return ValidationIssue(
                rule="no_duplicate_keys",
                message=f"Duplicate keys found: {', '.join(duplicates)}",
                severity=Severity.WARNING,
                file_path=config_file.path,
                suggestion="Consider consolidating duplicate configurations",
            )
        return None

    def _find_json_duplicates(self, content: str) -> List[str]:
        """Return keys repeated within any single JSON object in *content*.

        Uses ``object_pairs_hook`` so every (key, value) pair is observed
        before the parser collapses repeats into one entry.  Raises
        ``json.JSONDecodeError`` on invalid input.
        """
        duplicates: List[str] = []

        def note_pairs(pairs):
            seen = set()
            for key, _value in pairs:
                if key in seen:
                    duplicates.append(key)
                seen.add(key)
            return dict(pairs)

        json.loads(content, object_pairs_hook=note_pairs)
        return duplicates

    def _find_yaml_duplicates(self, content: str) -> List[str]:
        """Return dotted paths of keys repeated within any YAML mapping.

        Walks the composed node tree (``yaml.compose`` builds nodes without
        constructing Python objects), because a constructed dict has already
        lost the duplicates.  Raises ``yaml.YAMLError`` on invalid input.
        """
        duplicates: List[str] = []

        def walk(node, prefix: str) -> None:
            if isinstance(node, yaml.MappingNode):
                seen = set()
                for key_node, value_node in node.value:
                    key = str(getattr(key_node, "value", key_node))
                    full = f"{prefix}.{key}" if prefix else key
                    if key in seen:
                        duplicates.append(full)
                    seen.add(key)
                    walk(value_node, full)
            elif isinstance(node, yaml.SequenceNode):
                for i, item in enumerate(node.value):
                    walk(item, f"{prefix}[{i}]")

        root = yaml.compose(content, Loader=yaml.SafeLoader)
        if root is not None:  # empty documents compose to None
            walk(root, "")
        return duplicates

    def _find_dict_duplicates(self, data: Dict, prefix: str = "") -> List[str]:
        """Return duplicate keys in a nested dict (retained for compatibility).

        NOTE(review): a Python dict cannot hold two equal keys at the same
        level, so the ``key in seen`` branch below is unreachable and this
        always returns an empty list; ``_find_yaml_duplicates`` is the
        working replacement.  Kept unchanged in case external callers use it.
        """
        duplicates = []
        seen = {}
        for key, value in data.items():
            full_key = f"{prefix}.{key}" if prefix else key
            if isinstance(value, dict):
                duplicates.extend(self._find_dict_duplicates(value, full_key))
            if key in seen:
                duplicates.append(full_key)
            else:
                seen[key] = full_key
        return duplicates

    def check_no_duplicate_sections(self, config_file: ConfigFile) -> Optional[ValidationIssue]:
        """Warn about duplicate top-level sections in TOML.

        NOTE(review): ``toml.loads`` raises ``TomlDecodeError`` on a genuinely
        re-declared table (already reported as an ERROR by ``valid_toml``),
        and a parsed dict cannot carry duplicate keys, so this scan is
        effectively a no-op safety net.  Behavior preserved from the original.
        """
        try:
            data = toml.loads(config_file.content)
        except toml.TomlDecodeError:
            return None  # syntax errors are reported by check_valid_toml

        if isinstance(data, dict):
            sections: Dict[str, List[str]] = {}
            for section in data.keys():
                sections.setdefault(section, []).append(section)

            duplicates = [s for s, ids in sections.items() if len(ids) > 1]
            if duplicates:
                return ValidationIssue(
                    rule="no_duplicate_sections",
                    message=f"Duplicate sections found: {', '.join(duplicates)}",
                    severity=Severity.WARNING,
                    file_path=config_file.path,
                    suggestion="Remove duplicate sections from TOML file",
                )

        return None

    def check_valid_shell_syntax(self, config_file: ConfigFile) -> Optional[ValidationIssue]:
        """Heuristic syntax screen for shell scripts.

        Not a real parser: it only catches unclosed quotes and unbalanced
        brackets, which cover the most common copy/paste breakages.
        (A dead ``if ...: pass`` stanza from the original was removed.)
        """
        content = config_file.content.strip()
        problems = []

        unclosed = self._check_unclosed_quotes(content)
        if unclosed:
            problems.append(f"Unclosed quotes: {unclosed}")

        unbalanced = self._check_unbalanced_brackets(content)
        if unbalanced:
            problems.append(f"Unbalanced brackets: {unbalanced}")

        if problems:
            return ValidationIssue(
                rule="valid_shell_syntax",
                message=f"Potential shell syntax issues: {'; '.join(problems)}",
                severity=Severity.WARNING,
                file_path=config_file.path,
                suggestion="Review shell script syntax",
            )
        return None

    def _check_unclosed_quotes(self, content: str) -> Optional[str]:
        """Return a description of an unclosed quote, or None.

        BUG FIX: the original toggled state on every quote character, so an
        escaped quote (``echo \\"``) was miscounted.  Per POSIX shell rules a
        backslash escapes the next character everywhere except inside single
        quotes.
        """
        in_single = False
        in_double = False
        i = 0
        n = len(content)
        while i < n:
            char = content[i]
            if char == '\\' and not in_single:
                i += 2  # skip the escaped character
                continue
            if char == "'" and not in_double:
                in_single = not in_single
            elif char == '"' and not in_single:
                in_double = not in_double
            i += 1

        if in_single:
            return "single quote (')"
        if in_double:
            return 'double quote (")'
        return None

    def _check_unbalanced_brackets(self, content: str) -> Optional[str]:
        """Return a description of the first bracket problem, or None.

        Heuristic only: brackets inside quotes or comments are not excluded.
        """
        pairs = {'(': ')', '[': ']', '{': '}'}
        closers = set(pairs.values())
        stack: List[tuple] = []

        for i, char in enumerate(content):
            if char in pairs:
                stack.append((char, i))
            elif char in closers:
                if not stack:
                    return f"unmatched {char} at position {i}"
                opening, open_pos = stack.pop()
                if pairs[opening] != char:
                    return f"mismatched {opening} at position {open_pos} and {char} at position {i}"

        if stack:
            opening, pos = stack[0]
            return f"unclosed {opening} at position {pos}"
        return None

    def check_valid_git_config(self, config_file: ConfigFile) -> Optional[ValidationIssue]:
        """Return an ERROR issue if the git config is not INI-parseable.

        Git configs are close enough to INI for configparser to catch gross
        syntax errors; git-specific subtleties (``[include]`` resolution,
        subsection case rules) are not validated.  (A dead loop over sections
        in the original was removed.)
        """
        try:
            configparser.ConfigParser().read_string(config_file.content)
            return None
        except configparser.Error as e:
            return ValidationIssue(
                rule="valid_git_config",
                message=f"Invalid git config: {str(e)}",
                severity=Severity.ERROR,
                file_path=config_file.path,
            )

    def check_valid_gitignore(self, config_file: ConfigFile) -> Optional[ValidationIssue]:
        """Warn about suspicious .gitignore patterns.

        BUG FIXES relative to the original heuristics:
        * "multiple leading slashes" now fires only on an actual ``//``
          prefix -- the old slash-count test flagged valid rooted paths such
          as ``/build/out``;
        * ``**`` is flagged only when it is not a full path component
          (``a**b``), matching gitignore semantics -- the old "exactly two
          ``**``" rule flagged valid patterns like ``**/logs``;
        * a trailing slash is valid directory-pattern syntax and is no
          longer reported as an issue.
        """
        issues = []
        for i, raw in enumerate(config_file.content.split('\n'), 1):
            line = raw.strip()
            if not line or line.startswith('#'):
                continue  # blanks and comments

            if line.startswith('//'):
                issues.append(f"Line {i}: Multiple leading slashes may cause issues")

            if '**' in line:
                # '**' is only special as a whole path component.
                for part in line.split('/'):
                    if '**' in part and part != '**':
                        issues.append(f"Line {i}: Malformed glob pattern with '**'")
                        break

        if issues:
            return ValidationIssue(
                rule="valid_gitignore",
                message=f"Potential gitignore issues: {'; '.join(issues)}",
                severity=Severity.WARNING,
                file_path=config_file.path,
                suggestion="Review gitignore patterns for correctness",
            )
        return None

    def check_valid_dockerfile(self, config_file: ConfigFile) -> Optional[ValidationIssue]:
        """Check a Dockerfile for common issues.

        A missing FROM is an ERROR; everything else is a WARNING-level lint
        (untagged base image, package-manager RUN without chained cleanup).
        BUG FIX: ``FROM scratch`` takes no tag and is no longer flagged;
        digest-pinned images (``@sha256:...``) already pass the ``:`` test.
        """
        issues = []
        has_from = False
        for i, raw in enumerate(config_file.content.split('\n'), 1):
            line = raw.strip()
            if not line or line.startswith('#'):
                continue

            upper_line = line.upper()

            if upper_line.startswith('FROM'):
                has_from = True
                if ':' not in line and 'SCRATCH' not in upper_line:
                    issues.append(f"Line {i}: FROM instruction without tag")

            if upper_line.startswith('RUN') and ('apt-get' in line or 'yum' in line):
                # Heuristic: install + cleanup should be chained in one layer.
                if '&&' not in line:
                    issues.append(f"Line {i}: Package manager command without cleanup")

        if not has_from:
            issues.append("No FROM instruction found")

        if issues:
            return ValidationIssue(
                rule="valid_dockerfile",
                message=f"Dockerfile issues: {'; '.join(issues)}",
                severity=Severity.WARNING if has_from else Severity.ERROR,
                file_path=config_file.path,
            )
        return None

    def _check_conflicts(self, configs: List[ConfigFile]) -> List[ValidationIssue]:
        """Report (as INFO) every tool that has more than one config file."""
        issues: List[ValidationIssue] = []
        tool_groups: Dict[str, List[ConfigFile]] = {}

        for config in configs:
            tool_groups.setdefault(config.tool_name.lower(), []).append(config)

        for tool, tool_configs in tool_groups.items():
            if len(tool_configs) > 1:
                issues.append(ValidationIssue(
                    rule="multiple_configs",
                    message=f"Multiple configurations found for {tool}: {[c.path for c in tool_configs]}",
                    severity=Severity.INFO,
                    suggestion="Ensure these are intentional (e.g., different environments)",
                ))

        return issues

    def generate_report(self, result: ValidationResult) -> str:
        """Render *result* as a human-readable, severity-grouped text report."""
        lines = []
        lines.append("=" * 60)
        lines.append("Configuration Validation Report")
        lines.append("=" * 60)
        lines.append(f"Validated Files: {result.validated_files}")
        lines.append(f"Overall Status: {'VALID' if result.is_valid else 'ISSUES FOUND'}")
        lines.append("-" * 60)

        if not result.issues:
            lines.append("No issues found.")
            return '\n'.join(lines)

        # Errors first, then warnings, then informational notes.
        for severity in [Severity.ERROR, Severity.WARNING, Severity.INFO]:
            severity_issues = [i for i in result.issues if i.severity == severity]
            if severity_issues:
                lines.append(f"\n{severity.value.upper()}S ({len(severity_issues)}):")
                for issue in severity_issues:
                    lines.append(f"  - [{issue.rule}] {issue.message}")
                    if issue.file_path:
                        lines.append(f"    File: {issue.file_path}")
                    if issue.suggestion:
                        lines.append(f"    Suggestion: {issue.suggestion}")

        lines.append("-" * 60)
        lines.append(f"Total Issues: {len(result.issues)}")
        lines.append("=" * 60)

        return '\n'.join(lines)