Files
cli-command-memory/cli_memory/patterns.py
7000pctAUTO e89bf6f8c8
Some checks failed
CI / lint (push) Has been cancelled
CI / build (push) Has been cancelled
CI / test (push) Has been cancelled
feat: add recorder, project detector, and pattern detection
2026-01-31 10:24:45 +00:00

108 lines
3.6 KiB
Python

import logging
from typing import Optional, List, Dict, Any
from collections import Counter
from datetime import datetime
from .config import Config
from .models import Command, Pattern, Project
from .database import Database
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class PatternDetector:
    """Detect repeated command sequences and report them as ``Pattern`` objects.

    Tunables are read from ``self.config`` via ``config.get(key, default)``:

    * ``patterns.min_sequence_length`` (default 3)
    * ``patterns.min_occurrences`` (default 2)
    * ``patterns.max_pattern_length`` (default 10)
    * ``patterns.similarity_threshold`` (default 0.8)
    """

    def __init__(self, config: Optional[Config] = None, db: Optional[Database] = None):
        """Store the collaborators, creating defaults for any that are omitted.

        Args:
            config: Configuration source. A fresh ``Config()`` is used when this
                is ``None`` (or otherwise falsy).
            db: Database handle. A fresh ``Database()`` is used when this is
                ``None`` (or otherwise falsy).
        """
        self.config = config or Config()
        self.db = db or Database()

    def detect_patterns(
        self, commands: List[Command], project_id: Optional[int] = None
    ) -> List[Pattern]:
        """Find command sequences that repeat often enough to count as a pattern.

        Commands are ordered by ``timestamp``; every contiguous window whose
        length lies between the configured minimum and maximum is counted, and
        each distinct window seen at least ``patterns.min_occurrences`` times
        becomes one ``Pattern``. Overlapping windows are counted separately.

        Args:
            commands: Recorded commands; each must expose ``timestamp`` and
                ``command``.
            project_id: Value stored on every produced pattern.

        Returns:
            The detected patterns, in first-seen order of their sequence.
        """
        min_len = self.config.get("patterns.min_sequence_length", 3)
        min_occ = self.config.get("patterns.min_occurrences", 2)
        max_len = self.config.get("patterns.max_pattern_length", 10)

        ordered = sorted(commands, key=lambda c: c.timestamp)
        sequences = self._extract_sequences(ordered, min_len, max_len)
        # _extract_sequences wraps each window in a one-element list, so
        # flatten one level; tuples make the windows hashable for counting.
        pattern_counts = Counter(
            tuple(seq) for seq_list in sequences for seq in seq_list
        )

        patterns = []
        for seq, count in pattern_counts.items():
            if count < min_occ:
                continue
            patterns.append(
                Pattern(
                    project_id=project_id,
                    name=self._generate_pattern_name(seq),
                    command_sequence=list(seq),
                    occurrences=count,
                    # Confidence grows linearly and saturates at 5 occurrences.
                    confidence=min(count / 5.0, 1.0),
                    # NOTE(review): datetime.utcnow() is deprecated since
                    # Python 3.12; kept because switching to an aware datetime
                    # could affect code that expects naive UTC timestamps.
                    created_at=datetime.utcnow(),
                )
            )
        return patterns

    def _extract_sequences(
        self, commands: List[Command], min_len: int, max_len: int
    ) -> List[List[List[str]]]:
        """Return every contiguous window of command strings.

        Windows are produced for each length from ``min_len`` to ``max_len``
        inclusive, shortest first, left to right within a length.

        Args:
            commands: Commands in the order they should be scanned.
            min_len: Shortest window length; values below 1 are treated as 1,
                since a zero or negative length has no meaningful window.
            max_len: Longest window length (inclusive).

        Returns:
            A list in which each entry is a one-element list holding a single
            window (itself a list of command strings). The extra nesting is
            kept for compatibility with existing callers.
        """
        cmd_strings = [c.command for c in commands]
        total = len(cmd_strings)
        shortest = max(min_len, 1)

        sequences = []
        for length in range(shortest, max_len + 1):
            for start in range(total - length + 1):
                sequences.append([cmd_strings[start : start + length]])
        return sequences

    def _generate_pattern_name(self, sequence: tuple) -> str:
        """Build a name of the form ``<first-word>-sequence-<length>``.

        ``<first-word>`` is the first whitespace-separated token of the first
        command. The literal ``"pattern"`` is used instead when the sequence
        is empty or its first command is empty or whitespace-only.
        """
        tokens = sequence[0].split() if sequence else []
        first_cmd = tokens[0] if tokens else "pattern"
        return f"{first_cmd}-sequence-{len(sequence)}"

    def find_similar_patterns(
        self, pattern: Pattern, patterns: List[Pattern]
    ) -> List[Pattern]:
        """Return the entries of ``patterns`` that resemble ``pattern``.

        A candidate is kept when its similarity to ``pattern`` (see
        ``_calculate_similarity``) is at least
        ``patterns.similarity_threshold`` (default 0.8).

        ``pattern`` itself is excluded: by identity, and by ``id`` when
        ``pattern.id`` is not ``None``. Comparing ids only when one is set
        keeps patterns that all carry ``id=None`` from being treated as the
        same pattern and skipped wholesale.
        """
        threshold = self.config.get("patterns.similarity_threshold", 0.8)
        own_id = pattern.id

        similar = []
        for other in patterns:
            if other is pattern:
                continue
            if own_id is not None and other.id == own_id:
                continue
            if self._calculate_similarity(pattern, other) >= threshold:
                similar.append(other)
        return similar

    def _calculate_similarity(self, p1: Pattern, p2: Pattern) -> float:
        """Return ``|A & B| / |A | B|`` over the two patterns' distinct commands.

        Order and repetition of commands are ignored. Returns ``0.0`` when
        either pattern has an empty ``command_sequence``.
        """
        if not p1.command_sequence or not p2.command_sequence:
            return 0.0
        set1 = set(p1.command_sequence)
        set2 = set(p2.command_sequence)
        # Both sets are non-empty here, so the union cannot be zero.
        return len(set1 & set2) / len(set1 | set2)

    def analyze_workflow_patterns(
        self, project_id: Optional[int] = None
    ) -> Dict[str, Any]:
        """Summarize detected patterns and workflow usage.

        Loads up to 10000 commands and all workflows for ``project_id`` from
        ``self.db``, then runs ``detect_patterns`` on the commands.

        Returns:
            A dict with keys ``total_patterns``, ``high_confidence_patterns``
            (patterns with confidence >= 0.7), ``patterns`` (each pattern's
            ``to_dict()``), and ``workflow_usage`` (sum of ``usage_count``
            over the workflows).
        """
        commands = self.db.get_commands(project_id=project_id, limit=10000)
        workflows = self.db.get_all_workflows(project_id)
        patterns = self.detect_patterns(commands, project_id)
        return {
            "total_patterns": len(patterns),
            "high_confidence_patterns": sum(
                1 for p in patterns if p.confidence >= 0.7
            ),
            "patterns": [p.to_dict() for p in patterns],
            "workflow_usage": sum(w.usage_count for w in workflows),
        }