"""Redaction engine: find sensitive spans in text and mask them.

A :class:`Redactor` runs every pattern from a ``PatternLibrary`` plus any
user-registered custom patterns over a piece of text, and replaces each
detected span with a mask.
"""

import re
from dataclasses import dataclass, field
from typing import Dict, Iterator, List, Optional, Set, Tuple

from .patterns import PatternLibrary


@dataclass
class RedactionMatch:
    """One detected span and the text that replaces it."""

    start: int  # offset of the first matched character in the scanned text
    end: int  # offset one past the last matched character
    category: str  # pattern group the hit came from (e.g. "custom")
    name: str  # name of the individual pattern that fired
    original: str  # the matched text
    replacement: str  # the mask substituted for ``original``


@dataclass
class RedactionResult:
    """Outcome of redacting one piece of text."""

    original: str  # input text, untouched
    redacted: str  # input text with the masks applied
    # Every raw pattern hit, including hits that overlap one another.
    matches: List[RedactionMatch] = field(default_factory=list)
    # Distinct categories seen among ``matches``.
    categories: Set[str] = field(default_factory=set)


class Redactor:
    """Apply library and custom regex patterns to text and mask the hits."""

    DEFAULT_REPLACEMENT = "█" * 8

    def __init__(self, pattern_library: Optional[PatternLibrary] = None):
        """Create a redactor.

        Args:
            pattern_library: Source of the built-in patterns. A fresh
                ``PatternLibrary()`` is created when omitted.
        """
        # Compare against None explicitly so that a caller-supplied library
        # that happens to be falsy is not silently swapped for the default.
        if pattern_library is None:
            pattern_library = PatternLibrary()
        self.pattern_library = pattern_library
        # category -> list of (compiled pattern, pattern name)
        self.custom_patterns: Dict[str, List[Tuple[re.Pattern, str]]] = {}

    def add_custom_pattern(self, name: str, pattern: str, category: str = "custom") -> bool:
        """Register an extra, case-insensitive regex under ``category``.

        Args:
            name: Label reported on matches produced by this pattern.
            pattern: Regular expression source.
            category: Group to file the pattern under.

        Returns:
            ``True`` if the pattern compiled and was stored, ``False`` if the
            expression is not a valid regex.
        """
        try:
            compiled = re.compile(pattern, re.IGNORECASE)
        except re.error:
            return False
        self.custom_patterns.setdefault(category, []).append((compiled, name))
        return True

    def _iter_patterns(self) -> Iterator[Tuple[str, "re.Pattern", str]]:
        """Yield ``(category, pattern, name)``: library patterns, then custom."""
        sources = (self.pattern_library.get_all_patterns(), self.custom_patterns)
        for source in sources:
            for category, patterns in source.items():
                for pattern, name in patterns:
                    yield category, pattern, name

    def _find_all_matches(self, content: str) -> List[RedactionMatch]:
        """Run every known pattern over ``content`` and collect the hits.

        Hits are returned in pattern order (not position order) and may
        overlap. Zero-width hits are ignored.
        """
        matches: List[RedactionMatch] = []
        for category, pattern, name in self._iter_patterns():
            for hit in pattern.finditer(content):
                original = hit.group(0)
                if not original:
                    # A pattern that can match the empty string (e.g. "x*")
                    # hits at every position; recording those would insert a
                    # mask between every character of the text.
                    continue
                matches.append(RedactionMatch(
                    start=hit.start(),
                    end=hit.end(),
                    category=category,
                    name=name,
                    original=original,
                    replacement=self._generate_replacement(original),
                ))
        return matches

    def _sort_matches_by_position(self, matches: List[RedactionMatch]) -> List[RedactionMatch]:
        """Return a new list ordered by ``(start, end)``."""
        return sorted(matches, key=lambda m: (m.start, m.end))

    def _remove_overlapping_matches(self, matches: List[RedactionMatch]) -> List[RedactionMatch]:
        """Reduce ``matches`` to non-overlapping spans, in position order.

        The union of the returned spans equals the union of the input spans,
        so no detected character is left unmasked:

        * among hits starting at the same offset, the longest wins;
        * a hit fully inside an already-kept span is dropped;
        * a hit that only partly overlaps a kept span is trimmed to the part
          that sticks out, and that tail gets its own replacement.
        """
        if not matches:
            return matches

        # Longest-first at equal start, so a short hit cannot shadow a longer one.
        ordered = sorted(matches, key=lambda m: (m.start, -m.end))
        kept: List[RedactionMatch] = []
        covered_until = -1

        for match in ordered:
            if match.start >= covered_until:
                kept.append(match)
            elif match.end > covered_until:
                # Partial overlap: mask only the not-yet-covered tail.
                tail = match.original[covered_until - match.start:]
                kept.append(RedactionMatch(
                    start=covered_until,
                    end=match.end,
                    category=match.category,
                    name=match.name,
                    original=tail,
                    replacement=self._generate_replacement(tail),
                ))
            else:
                # Entirely inside a span that is already being masked.
                continue
            covered_until = match.end

        return kept

    def _generate_replacement(self, original: str) -> str:
        """Build the mask for ``original``.

        Text of up to four characters becomes ``DEFAULT_REPLACEMENT``; longer
        text becomes ``DEFAULT_REPLACEMENT`` followed by its last four
        characters.
        """
        # NOTE(review): for short secrets (5-8 chars) the four-character
        # suffix exposes most of the value; consider a larger threshold.
        if len(original) <= 4:
            return self.DEFAULT_REPLACEMENT
        return self.DEFAULT_REPLACEMENT + original[-4:]

    def _apply_redactions(self, content: str, matches: List[RedactionMatch]) -> str:
        """Return ``content`` with each non-overlapping match replaced."""
        if not matches:
            return content

        pieces: List[str] = []
        cursor = 0
        # _remove_overlapping_matches already returns spans in position order.
        for match in self._remove_overlapping_matches(matches):
            pieces.append(content[cursor:match.start])
            pieces.append(match.replacement)
            cursor = match.end
        pieces.append(content[cursor:])
        return "".join(pieces)

    def redact(self, content: str, preserve_structure: bool = True) -> RedactionResult:
        """Redact ``content`` and report what was found.

        Args:
            content: Text to scan.
            preserve_structure: Accepted for API compatibility; it does not
                currently influence the output.

        Returns:
            A ``RedactionResult`` whose ``matches`` lists every raw hit
            (overlaps included) and whose ``redacted`` text has the
            non-overlapping spans masked.
        """
        matches = self._find_all_matches(content)
        return RedactionResult(
            original=content,
            redacted=self._apply_redactions(content, matches),
            matches=matches,
            categories={m.category for m in matches},
        )

    def redact_file(self, file_path: str, encoding: str = "utf-8") -> RedactionResult:
        """Read ``file_path`` as text using ``encoding`` and redact its contents.

        Errors raised by ``open``/``read`` propagate to the caller.
        """
        with open(file_path, "r", encoding=encoding) as f:
            content = f.read()
        return self.redact(content)

    def redact_multiple(
        self,
        contents: Dict[str, str],
        preserve_structure: bool = True
    ) -> Dict[str, RedactionResult]:
        """Redact several texts at once.

        Args:
            contents: Mapping of an identifier (e.g. a path) to its text.
            preserve_structure: Forwarded unchanged to :meth:`redact`.

        Returns:
            Mapping of each identifier to its ``RedactionResult``.
        """
        return {
            path: self.redact(content, preserve_structure=preserve_structure)
            for path, content in contents.items()
        }

    def get_match_summary(self, result: RedactionResult) -> Dict[str, int]:
        """Count the matches in ``result`` per ``"category:name"`` key."""
        summary: Dict[str, int] = {}
        for match in result.matches:
            key = f"{match.category}:{match.name}"
            summary[key] = summary.get(key, 0) + 1
        return summary

    def filter_matches_by_category(self, result: RedactionResult, categories: Set[str]) -> List[RedactionMatch]:
        """Return the matches in ``result`` whose category is in ``categories``."""
        return [m for m in result.matches if m.category in categories]