"""Detection and comparison of recurring command sequences."""

import logging
from typing import Optional, List, Dict, Any
from collections import Counter
from datetime import datetime

from .config import Config
from .models import Command, Pattern, Project
from .database import Database

logger = logging.getLogger(__name__)


class PatternDetector:
    """Find repeated command sequences and compare the resulting patterns.

    Tunables are read from ``config`` under the ``patterns.*`` keys; each
    lookup passes a fallback value that is used as the second argument to
    ``config.get``.
    """

    def __init__(
        self, config: Optional[Config] = None, db: Optional[Database] = None
    ):
        """Store the given collaborators, constructing defaults when omitted.

        Args:
            config: Configuration object; a fresh ``Config()`` is created
                when this is falsy.
            db: Database handle; a fresh ``Database()`` is created when this
                is falsy.
        """
        self.config = config or Config()
        self.db = db or Database()

    def detect_patterns(
        self, commands: List[Command], project_id: Optional[int] = None
    ) -> List[Pattern]:
        """Return a ``Pattern`` for every command sequence seen often enough.

        Commands are ordered by ``timestamp``, every contiguous window whose
        length lies between ``patterns.min_sequence_length`` and
        ``patterns.max_pattern_length`` is counted, and windows that occur at
        least ``patterns.min_occurrences`` times become patterns.

        Args:
            commands: Commands to analyse; the input list is not mutated.
            project_id: Value stored on each produced pattern.

        Returns:
            The detected patterns (unsaved; no id is assigned here).
        """
        min_len = self.config.get("patterns.min_sequence_length", 3)
        min_occ = self.config.get("patterns.min_occurrences", 2)
        max_len = self.config.get("patterns.max_pattern_length", 10)

        sorted_commands = sorted(commands, key=lambda c: c.timestamp)
        sequences = self._extract_sequences(sorted_commands, min_len, max_len)

        # _extract_sequences wraps each window in a one-element list, hence
        # the double loop; tuples are used because Counter keys must be
        # hashable.
        pattern_counts = Counter(
            tuple(seq) for seq_list in sequences for seq in seq_list
        )

        patterns = []
        for seq, count in pattern_counts.items():
            if count < min_occ:
                continue
            patterns.append(
                Pattern(
                    project_id=project_id,
                    name=self._generate_pattern_name(seq),
                    command_sequence=list(seq),
                    occurrences=count,
                    # Confidence grows linearly and saturates at 5 hits.
                    confidence=min(count / 5.0, 1.0),
                    # NOTE(review): utcnow() yields a naive datetime and is
                    # deprecated since Python 3.12; kept so stored values stay
                    # comparable with existing naive timestamps.
                    created_at=datetime.utcnow(),
                )
            )
        return patterns

    def _extract_sequences(
        self, commands: List[Command], min_len: int, max_len: int
    ) -> List[List[List[str]]]:
        """Collect every contiguous window of command strings.

        Args:
            commands: Commands in the order they should be scanned.
            min_len: Smallest window length; values below 1 are treated as 1
                so that no empty windows are produced.
            max_len: Largest window length (inclusive).

        Returns:
            A list in which each entry is a one-element list holding a single
            window (itself a list of command strings). This nesting is what
            ``detect_patterns`` expects.
        """
        cmd_strings = [c.command for c in commands]
        total = len(cmd_strings)

        # A non-positive length would yield empty or wrapped-around slices.
        shortest = max(min_len, 1)
        # Windows longer than the input cannot exist, so stop there.
        longest = min(max_len, total)

        sequences = []
        for length in range(shortest, longest + 1):
            for start in range(total - length + 1):
                sequences.append([cmd_strings[start : start + length]])
        return sequences

    def _generate_pattern_name(self, sequence: tuple) -> str:
        """Build a name of the form ``<first-word>-sequence-<length>``.

        Falls back to the word ``pattern`` when the sequence is empty or its
        first command contains no non-whitespace text.
        """
        tokens = sequence[0].split() if sequence else []
        first_cmd = tokens[0] if tokens else "pattern"
        return f"{first_cmd}-sequence-{len(sequence)}"

    def find_similar_patterns(
        self, pattern: Pattern, patterns: List[Pattern]
    ) -> List[Pattern]:
        """Return the entries of ``patterns`` that resemble ``pattern``.

        Similarity is computed by ``_calculate_similarity`` and compared with
        ``patterns.similarity_threshold``.

        Args:
            pattern: The reference pattern.
            patterns: Candidates; the reference itself is excluded.

        Returns:
            Candidates whose similarity is at or above the threshold, in
            their original order.
        """
        threshold = self.config.get("patterns.similarity_threshold", 0.8)

        similar = []
        for other in patterns:
            if other is pattern:
                continue
            # Only trust id equality when the reference actually has an id:
            # patterns built by detect_patterns carry no id (presumably None
            # until saved), and comparing unset ids would exclude every
            # candidate.
            if pattern.id is not None and other.id == pattern.id:
                continue
            if self._calculate_similarity(pattern, other) >= threshold:
                similar.append(other)
        return similar

    def _calculate_similarity(self, p1: Pattern, p2: Pattern) -> float:
        """Return the Jaccard index of the two patterns' distinct commands.

        Order and repetition of commands are ignored. The result is ``0.0``
        when either pattern has no commands.
        """
        if not p1.command_sequence or not p2.command_sequence:
            return 0.0

        set1 = set(p1.command_sequence)
        set2 = set(p2.command_sequence)
        # Both sets are non-empty here, so the union cannot be empty.
        return len(set1 & set2) / len(set1 | set2)

    def analyze_workflow_patterns(
        self, project_id: Optional[int] = None
    ) -> Dict[str, Any]:
        """Summarise detected patterns and workflow usage for a project.

        Commands are requested from the database with ``limit=10000`` and
        passed to ``detect_patterns``.

        Args:
            project_id: Forwarded to the database queries and to
                ``detect_patterns``.

        Returns:
            A dict with ``total_patterns``, ``high_confidence_patterns``
            (confidence of at least 0.7), ``patterns`` (each pattern's
            ``to_dict()``), and ``workflow_usage`` (sum of ``usage_count``
            over the fetched workflows).
        """
        commands = self.db.get_commands(project_id=project_id, limit=10000)
        workflows = self.db.get_all_workflows(project_id)
        patterns = self.detect_patterns(commands, project_id)

        return {
            "total_patterns": len(patterns),
            "high_confidence_patterns": sum(
                1 for p in patterns if p.confidence >= 0.7
            ),
            "patterns": [p.to_dict() for p in patterns],
            "workflow_usage": sum(w.usage_count for w in workflows),
        }