"""Pattern detection for command sequences.""" from typing import List, Dict, Tuple from collections import defaultdict from datetime import datetime, timedelta from .models import Command, Pattern from .database import Database class PatternDetector: """Detects patterns in command sequences.""" def __init__(self, db: Database, min_frequency: int = 2, window_size: int = 5): self.db = db self.min_frequency = min_frequency self.window_size = window_size def analyze_recent_commands(self, limit: int = 100) -> List[Pattern]: commands = self.db.get_all_commands()[:limit] command_ids = [cmd.id for cmd in commands if cmd.id is not None] if len(command_ids) < self.window_size: return [] patterns = self._find_ngram_patterns(command_ids) return patterns def _find_ngram_patterns(self, command_ids: List[int]) -> List[Pattern]: existing_patterns = self.db.get_patterns(self.min_frequency) existing_hashes = {p.sequence_hash(): p for p in existing_patterns} ngram_counts = defaultdict(int) ngram_commands = defaultdict(list) for n in range(2, self.window_size + 1): for i in range(len(command_ids) - n + 1): ngram = tuple(command_ids[i:i + n]) ngram_counts[ngram] += 1 if len(ngram_commands[ngram]) < 3: ngram_commands[ngram].append(command_ids[i:i + n]) detected_patterns = [] for ngram, count in ngram_counts.items(): if count >= self.min_frequency: sequence_hash = Pattern( command_ids=list(ngram), frequency=count, ).sequence_hash() if sequence_hash in existing_hashes: pattern_ref = existing_hashes[sequence_hash] if pattern_ref.id is not None: self.db.update_pattern_frequency(pattern_ref.id) detected_patterns.append(pattern_ref) else: pattern = Pattern( name=f"Pattern: {' -> '.join(map(str, ngram))}", command_ids=list(ngram), frequency=count, last_seen=datetime.now(), ) pattern_id = self.db.add_pattern(pattern) pattern.id = pattern_id detected_patterns.append(pattern) return detected_patterns def get_detected_patterns(self) -> List[Pattern]: return self.db.get_patterns(self.min_frequency) def suggest_shortcuts(self) -> List[Tuple[Pattern, List[Command]]]: patterns = self.db.get_patterns(self.min_frequency) shortcuts = [] for pattern in patterns: commands = [] for cmd_id in pattern.command_ids: cmd = self.db.get_command(cmd_id) if cmd: commands.append(cmd) if len(commands) >= 2: shortcuts.append((pattern, commands)) return shortcuts def get_workflow_stats(self, days: int = 7) -> Dict[str, object]: commands = self.db.get_all_commands() cutoff = datetime.now() - timedelta(days=days) recent_commands = [c for c in commands if c.created_at >= cutoff] return { "total_commands": len(commands), "recent_commands": len(recent_commands), "unique_commands": len(set(c.command for c in commands)), "patterns_found": len(self.db.get_patterns(self.min_frequency)), }