This commit is contained in:
156
src/code_privacy_shield/redactor.py
Normal file
156
src/code_privacy_shield/redactor.py
Normal file
@@ -0,0 +1,156 @@
|
||||
import re
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Dict, List, Optional, Set, Tuple
|
||||
|
||||
from .patterns import PatternLibrary
|
||||
|
||||
|
||||
@dataclass
class RedactionMatch:
    """One occurrence of a sensitive pattern located inside a piece of text.

    Instances are plain records: they carry where the hit was found, which
    pattern produced it, the text that was matched and the text that should
    be written in its place.
    """

    # Offset of the first matched character within the scanned content.
    start: int
    # Offset one past the last matched character (exclusive bound).
    end: int
    # Category under which the matching pattern was registered.
    category: str
    # Name of the individual pattern that produced this match.
    name: str
    # The exact text that was matched.
    original: str
    # The text that is substituted for ``original`` in the redacted output.
    replacement: str
|
||||
|
||||
|
||||
@dataclass
class RedactionResult:
    """Outcome of running a redaction pass over one piece of text.

    ``matches`` and ``categories`` default to fresh empty containers, so a
    result can be built from just the two strings.
    """

    # The text exactly as it was handed to the redactor.
    original: str
    # The text after the replacements have been applied.
    redacted: str
    # The matches reported for ``original``.
    matches: List[RedactionMatch] = field(default_factory=list)
    # The distinct category names appearing among ``matches``.
    categories: Set[str] = field(default_factory=set)
|
||||
|
||||
|
||||
class Redactor:
    """Locate sensitive substrings in text and replace them with placeholders.

    Patterns come from two sources: the ``PatternLibrary`` supplied at
    construction time (queried through ``get_all_patterns()``) and patterns
    registered at runtime with :meth:`add_custom_pattern`. Both are consumed
    as mappings of ``category -> iterable of (compiled_pattern, name)``.
    """

    DEFAULT_REPLACEMENT = "█" * 8

    def __init__(self, pattern_library: Optional[PatternLibrary] = None):
        """Create a redactor.

        Args:
            pattern_library: Source of built-in patterns. When omitted (or
                falsy) a default ``PatternLibrary()`` is constructed.
        """
        self.pattern_library = pattern_library or PatternLibrary()
        self.custom_patterns: Dict[str, List[Tuple[re.Pattern, str]]] = {}

    def add_custom_pattern(self, name: str, pattern: str, category: str = "custom") -> bool:
        """Compile ``pattern`` (case-insensitively) and register it.

        Args:
            name: Label stored on every match this pattern produces.
            pattern: Regular-expression source text.
            category: Category bucket the pattern is filed under.

        Returns:
            ``True`` when the pattern compiled and was registered, ``False``
            when ``re.compile`` rejected it with ``re.error``.
        """
        # Only the compile step can raise re.error, so keep the try minimal.
        try:
            compiled = re.compile(pattern, re.IGNORECASE)
        except re.error:
            return False
        self.custom_patterns.setdefault(category, []).append((compiled, name))
        return True

    def _collect_matches(
        self,
        content: str,
        patterns_by_category: Dict[str, List[Tuple[re.Pattern, str]]],
    ) -> List[RedactionMatch]:
        """Run every pattern in ``patterns_by_category`` over ``content``."""
        found: List[RedactionMatch] = []
        for category, patterns in patterns_by_category.items():
            for pattern, name in patterns:
                for hit in pattern.finditer(content):
                    # A zero-width hit (e.g. from ``a*``) covers no text;
                    # "redacting" it would only inject placeholders between
                    # characters, so it is ignored.
                    if hit.start() == hit.end():
                        continue
                    original = hit.group(0)
                    found.append(RedactionMatch(
                        start=hit.start(),
                        end=hit.end(),
                        category=category,
                        name=name,
                        original=original,
                        replacement=self._generate_replacement(original),
                    ))
        return found

    def _find_all_matches(self, content: str) -> List[RedactionMatch]:
        """Return every non-empty match in ``content``.

        Library patterns are scanned first, custom patterns second; the
        returned list keeps that discovery order and may contain overlapping
        matches.
        """
        matches = self._collect_matches(content, self.pattern_library.get_all_patterns())
        matches.extend(self._collect_matches(content, self.custom_patterns))
        return matches

    def _sort_matches_by_position(self, matches: List[RedactionMatch]) -> List[RedactionMatch]:
        """Return a new list ordered by ``start`` and then by ``end``."""
        return sorted(matches, key=lambda m: (m.start, m.end))

    def _remove_overlapping_matches(self, matches: List[RedactionMatch]) -> List[RedactionMatch]:
        """Reduce ``matches`` to disjoint spans that still cover all matched text.

        A match lying entirely inside an already accepted span is dropped. A
        match that starts inside an accepted span but runs past its end is
        merged into it, so the tail of the later match is never left
        unredacted. Merged spans keep the category and name of the
        earlier-starting match and receive a freshly generated replacement.
        The input objects are never mutated.

        Args:
            matches: Matches in any order. ``original`` is assumed to be the
                text between ``start`` and ``end``, as produced by
                ``_find_all_matches``.

        Returns:
            Disjoint matches in ascending position order (the input list
            itself when it is empty).
        """
        if not matches:
            return matches

        # Longest-first among equal starts, so the widest match at a position
        # is the one whose category/name survives.
        ordered = sorted(matches, key=lambda m: (m.start, -m.end))
        resolved: List[RedactionMatch] = []

        for match in ordered:
            if not resolved or match.start >= resolved[-1].end:
                resolved.append(match)
                continue

            previous = resolved[-1]
            if match.end <= previous.end:
                # Fully covered by the accepted span: nothing new to hide.
                continue

            # Partial overlap: extend the accepted span to the later end.
            merged_original = previous.original + match.original[previous.end - match.start:]
            resolved[-1] = RedactionMatch(
                start=previous.start,
                end=match.end,
                category=previous.category,
                name=previous.name,
                original=merged_original,
                replacement=self._generate_replacement(merged_original),
            )

        return resolved

    def _generate_replacement(self, original: str) -> str:
        """Build the placeholder written in place of ``original``.

        Text of four characters or fewer becomes ``DEFAULT_REPLACEMENT``;
        longer text becomes ``DEFAULT_REPLACEMENT`` followed by its last four
        characters.
        """
        # NOTE(review): for short secrets (5-8 characters) the retained
        # suffix exposes most of the value -- confirm this is acceptable.
        if len(original) <= 4:
            return self.DEFAULT_REPLACEMENT
        return self.DEFAULT_REPLACEMENT + original[-4:]

    def _apply_redactions(self, content: str, matches: List[RedactionMatch]) -> str:
        """Return ``content`` with each resolved match span replaced.

        Overlaps are resolved with ``_remove_overlapping_matches`` before any
        text is substituted.
        """
        if not matches:
            return content

        pieces: List[str] = []
        cursor = 0
        # _remove_overlapping_matches already orders by position, so no
        # separate sort is needed here.
        for match in self._remove_overlapping_matches(matches):
            pieces.append(content[cursor:match.start])
            pieces.append(match.replacement)
            cursor = match.end
        pieces.append(content[cursor:])
        return "".join(pieces)

    def redact(self, content: str, preserve_structure: bool = True) -> RedactionResult:
        """Redact ``content`` and report what was found.

        Args:
            content: Text to scan.
            preserve_structure: Accepted for interface compatibility; this
                implementation does not consult it.

        Returns:
            A ``RedactionResult`` holding the input, the redacted text, every
            match found (before overlap resolution) and the set of their
            categories.
        """
        matches = self._find_all_matches(content)
        return RedactionResult(
            original=content,
            redacted=self._apply_redactions(content, matches),
            matches=matches,
            categories={m.category for m in matches},
        )

    def redact_file(self, file_path: str, encoding: str = "utf-8") -> RedactionResult:
        """Read ``file_path`` as text using ``encoding`` and redact its contents.

        The file is only read, never written. Errors raised by ``open`` or
        ``read`` propagate to the caller.
        """
        with open(file_path, "r", encoding=encoding) as f:
            content = f.read()
        return self.redact(content)

    def redact_multiple(
        self,
        contents: Dict[str, str],
        preserve_structure: bool = True
    ) -> Dict[str, RedactionResult]:
        """Redact several texts at once.

        Args:
            contents: Mapping of an identifying key (e.g. a path) to text.
            preserve_structure: Forwarded unchanged to :meth:`redact`.

        Returns:
            A mapping with the same keys whose values are the per-text
            ``RedactionResult`` objects.
        """
        return {
            path: self.redact(content, preserve_structure=preserve_structure)
            for path, content in contents.items()
        }

    def get_match_summary(self, result: RedactionResult) -> Dict[str, int]:
        """Count the matches in ``result`` per ``"category:name"`` key."""
        summary: Dict[str, int] = {}
        for match in result.matches:
            key = f"{match.category}:{match.name}"
            summary[key] = summary.get(key, 0) + 1
        return summary

    def filter_matches_by_category(self, result: RedactionResult, categories: Set[str]) -> List[RedactionMatch]:
        """Return the matches in ``result`` whose category is in ``categories``."""
        return [m for m in result.matches if m.category in categories]
|
||||
Reference in New Issue
Block a user