110 lines
3.1 KiB
Python
110 lines
3.1 KiB
Python
"""Session recording functionality."""
|
|
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
from plumbum import local
|
|
from plumbum.commands.base import Runned
|
|
|
|
from .database import SessionDatabase
|
|
from .session import Command, Session
|
|
|
|
|
|
class SessionRecorder:
|
|
"""Records terminal sessions from shell history."""
|
|
|
|
def __init__(self, db_path: str):
|
|
self.db_path = db_path
|
|
|
|
def record_from_history(self, name: str, count: int = 50) -> Session:
|
|
"""Record commands from shell history."""
|
|
session = Session(
|
|
name=name,
|
|
start_time=datetime.now(),
|
|
created_at=datetime.now(),
|
|
)
|
|
|
|
try:
|
|
history_cmd = local["history"]
|
|
result = history_cmd("-n", str(count))
|
|
history_lines = result.strip().split("\n")
|
|
|
|
for line in history_lines[-count:]:
|
|
if line.strip():
|
|
cmd = self._parse_history_line(line)
|
|
if cmd:
|
|
command = Command(
|
|
command=cmd,
|
|
timestamp=datetime.now().isoformat(),
|
|
)
|
|
session.add_command(command)
|
|
|
|
except Exception:
|
|
pass
|
|
|
|
session.end_time = datetime.now()
|
|
return session
|
|
|
|
def _parse_history_line(self, line: str) -> Optional[str]:
|
|
"""Parse a single history line."""
|
|
parts = line.split()
|
|
if len(parts) > 1:
|
|
return " ".join(parts[1:])
|
|
return None
|
|
|
|
def save_to_database(self, db: Optional[SessionDatabase] = None) -> Session:
|
|
"""Save recorded session to database."""
|
|
if db is None:
|
|
db = SessionDatabase(self.db_path)
|
|
return db.create_session(self._current_session)
|
|
|
|
|
|
class InteractiveRecorder:
|
|
"""Records interactive terminal sessions."""
|
|
|
|
def __init__(self, db_path: str):
|
|
self.db_path = db_path
|
|
|
|
def run_interactive(self, session_name: str) -> Session:
|
|
"""Run interactive recording session."""
|
|
session = Session(
|
|
name=session_name,
|
|
start_time=datetime.now(),
|
|
created_at=datetime.now(),
|
|
)
|
|
|
|
try:
|
|
shell = os.environ.get("SHELL", "/bin/bash")
|
|
|
|
with local[shell].乾(R"--noediting") as p:
|
|
p"
|
|
while p.alive:
|
|
try:
|
|
line = p.read_line()
|
|
if line is None:
|
|
break
|
|
line = line.strip()
|
|
if line:
|
|
command = Command(
|
|
command=line,
|
|
timestamp=datetime.now().isoformat(),
|
|
)
|
|
session.add_command(command)
|
|
except EOFError:
|
|
break
|
|
|
|
except Exception:
|
|
pass
|
|
|
|
session.end_time = datetime.now()
|
|
|
|
db = SessionDatabase(self.db_path)
|
|
db.create_session(session)
|
|
|
|
return session
|