Files
cli-command-memory/.cli_memory/history.py
2026-01-31 08:27:31 +00:00

82 lines
2.5 KiB
Python

import json
import logging
from datetime import datetime, timedelta, timezone
from typing import Optional, List, Dict, Any

from .config import Config
from .models import Command, Project, Workflow
from .database import Database
logger = logging.getLogger(__name__)
class HistoryManager:
    """Query, summarise, export and clear recorded command history.

    Every operation is delegated to the ``Database`` instance held in
    ``self.db``; this class adds no storage of its own. It can be used as a
    context manager, in which case ``close()`` is called on exit.
    """

    # Maximum number of commands fetched by the bulk operations
    # (``get_statistics`` and ``export_history``).
    _BULK_FETCH_LIMIT = 10000

    def __init__(
        self, config: Optional[Config] = None, db: Optional[Database] = None
    ):
        """Store the given collaborators, creating defaults for missing ones.

        Args:
            config: Configuration object; a fresh ``Config()`` when ``None``.
            db: Database handle; a fresh ``Database()`` when ``None``.
        """
        # Compare against None explicitly: an injected object that happens to
        # be falsy (e.g. one defining __len__ or __bool__) must not be
        # silently replaced by a default instance.
        self.config = config if config is not None else Config()
        self.db = db if db is not None else Database()

    def __enter__(self) -> "HistoryManager":
        """Return the manager itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the database handle; exceptions are not suppressed."""
        self.close()

    @staticmethod
    def _utc_now() -> datetime:
        """Return the current UTC time as a naive ``datetime``.

        Produces the same value as the deprecated ``datetime.utcnow()`` so
        that comparisons and ISO strings keep their previous naive form.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    def get_recent_commands(
        self, limit: int = 20, project_id: Optional[int] = None
    ) -> List[Command]:
        """Return up to ``limit`` commands from ``db.get_commands``.

        Args:
            limit: Maximum number of commands requested from the database.
            project_id: Restrict results to this project when given.
        """
        return self.db.get_commands(project_id=project_id, limit=limit)

    def get_commands_by_time_range(
        self,
        start_time: datetime,
        end_time: datetime,
        project_id: Optional[int] = None,
        limit: int = 1000,
    ) -> List[Command]:
        """Return commands recorded between ``start_time`` and ``end_time``.

        Args:
            start_time: Lower bound forwarded to ``db.search_commands``.
            end_time: Upper bound forwarded to ``db.search_commands``.
            project_id: Restrict results to this project when given.
            limit: Maximum number of commands requested (default 1000).
        """
        # The empty query string is presumably a "match everything" filter --
        # NOTE(review): confirm against Database.search_commands.
        return self.db.search_commands(
            "",
            project_id=project_id,
            start_time=start_time,
            end_time=end_time,
            limit=limit,
        )

    def get_statistics(self) -> Dict[str, Any]:
        """Return summary counts of projects, workflows and commands.

        The command figures are computed from a single
        ``db.get_commands(limit=_BULK_FETCH_LIMIT)`` call, so they only
        reflect the commands that call returns.

        Returns:
            A dict with the keys ``total_projects``, ``total_workflows``,
            ``total_commands`` and ``recent_commands_24h``.
        """
        projects = self.db.get_all_projects()
        workflows = self.db.get_all_workflows()
        commands = self.db.get_commands(limit=self._BULK_FETCH_LIMIT)

        # Compute the cutoff once so every command is compared against the
        # same instant instead of re-reading the clock per element.
        cutoff = self._utc_now() - timedelta(days=1)
        recent_count = sum(1 for cmd in commands if cmd.timestamp >= cutoff)

        return {
            "total_projects": len(projects),
            "total_workflows": len(workflows),
            "total_commands": len(commands),
            "recent_commands_24h": recent_count,
        }

    def clear_history(
        self, project_id: Optional[int] = None, before_date: Optional[datetime] = None
    ) -> int:
        """Delete the history of one project.

        Args:
            project_id: Project to delete via ``db.delete_project``. When
                ``None`` nothing is deleted, because clearing all history is
                not implemented.
            before_date: Currently unsupported. It is ignored, and a warning
                is logged when it is supplied together with ``project_id``.

        Returns:
            ``1`` after ``db.delete_project`` has been called (this is not a
            row count), ``0`` when no deletion was attempted.
        """
        # ``is not None`` rather than truthiness so that a project id of 0 is
        # not mistaken for "no project given".
        if project_id is not None:
            if before_date is not None:
                logger.warning(
                    "before_date is not supported and is ignored; "
                    "deleting the entire project %s",
                    project_id,
                )
            self.db.delete_project(project_id)
            return 1
        logger.warning("Clearing all history is not implemented in demo")
        return 0

    def export_history(self, format: str = "json") -> str:
        """Serialise all projects, workflows and commands to a string.

        Args:
            format: ``"json"`` or ``"yaml"``, matched case-insensitively.
                Any other value falls back to ``str()`` of the export dict.

        Returns:
            The serialised export, including an ``exported_at`` ISO timestamp.
        """
        data = {
            "exported_at": self._utc_now().isoformat(),
            "projects": [p.to_dict() for p in self.db.get_all_projects()],
            "workflows": [w.to_dict() for w in self.db.get_all_workflows()],
            "commands": [
                c.to_dict()
                for c in self.db.get_commands(limit=self._BULK_FETCH_LIMIT)
            ],
        }

        fmt = format.lower()
        if fmt == "json":
            # default=str stringifies any value json cannot encode natively.
            return json.dumps(data, indent=2, default=str)
        if fmt == "yaml":
            # Imported lazily so PyYAML is only needed for YAML export.
            import yaml

            return yaml.dump(data, default_flow_style=False)
        return str(data)

    def close(self) -> None:
        """Close the underlying database handle."""
        self.db.close()