Add storage models and monitor modules
Some checks failed
CI / test (push) Has been cancelled

This commit is contained in:
2026-01-30 09:10:47 +00:00
parent 8627defa85
commit cb87ca93ec

132
src/monitor/commands.py Normal file
View File

@@ -0,0 +1,132 @@
"""Command capture functionality for shell history."""
import os
from pathlib import Path
from typing import Optional, List
from ..storage import Database
class CommandCapture:
"""Captures shell commands using shell integration."""
def __init__(self, db: Database, session_id: int):
self.db = db
self.session_id = session_id
self._history_file: Optional[Path] = None
self._last_position = 0
def _get_shell_history_file(self) -> Optional[Path]:
"""Get the shell history file path."""
shell = os.environ.get("SHELL", "")
if "bash" in shell:
return Path(os.environ.get("HISTFILE", Path.home() / ".bash_history"))
elif "zsh" in shell:
return Path(os.environ.get("HISTFILE", Path.home() / ".zsh_history"))
elif "fish" in shell:
return Path(os.environ.get("HISTFILE", Path.home() / ".config/fish/fish_history"))
return None
def _parse_bash_history(self, lines: List[str], start_pos: int) -> List[tuple]:
"""Parse bash history format."""
commands = []
for i, line in enumerate(lines[start_pos:], start=start_pos):
line = line.strip()
if line and not line.startswith("#"):
commands.append((i, line))
return commands
def _parse_zsh_history(self, lines: List[str], start_pos: int) -> List[tuple]:
"""Parse zsh history format."""
commands = []
for i, line in enumerate(lines[start_pos:], start=start_pos):
if ": " in line:
parts = line.split(": ", 1)
if len(parts) == 2:
commands.append((i, parts[1]))
elif line.strip():
commands.append((i, line))
return commands
def capture_new_commands(self) -> List[int]:
"""Capture new commands from shell history."""
history_file = self._get_shell_history_file()
if history_file is None or not history_file.exists():
return []
shell = os.environ.get("SHELL", "")
try:
with open(history_file, "r") as f:
lines = f.readlines()
except (IOError, OSError):
return []
if "bash" in shell:
commands = self._parse_bash_history(lines, self._last_position)
elif "zsh" in shell:
commands = self._parse_zsh_history(lines, self._last_position)
else:
commands = [(i, line.strip()) for i, line in enumerate(lines[self._last_position:]) if line.strip()]
event_ids = []
for pos, command in commands:
if command and not command.startswith("#"):
event_id = self.db.add_command_event(
session_id=self.session_id,
command=command,
exit_code=None,
working_directory=os.getcwd(),
details=None
)
event_ids.append(event_id)
self._last_position = len(lines)
return event_ids
def capture_command(
self,
command: str,
exit_code: Optional[int] = None
) -> int:
"""Capture a command directly."""
return self.db.add_command_event(
session_id=self.session_id,
command=command,
exit_code=exit_code,
working_directory=os.getcwd(),
details=None
)
def get_shell_integration_script(self, shell_type: str = "bash") -> str:
"""Get shell integration script for command capture."""
if shell_type == "bash":
return '''# Add to ~/.bashrc for DevTrace integration
export DEVTRACE_SESSION_ID=$(cat ~/.devtrace_session 2>/dev/null)
if [ -n "$DEVTRACE_SESSION_ID" ]; then
export PROMPT_COMMAND='history -a; devtrace-cmd "$CMD"'
fi
devtrace-cmd() {
if [ -n "$DEVTRACE_SESSION_ID" ] && [ -n "$1" ]; then
python -c "from devtrace.storage import Database, Path; db = Database(Path('~/.devtrace/devtrace.db')); db.add_command_event($DEVTRACE_SESSION_ID, '$1', None, '$(pwd)')"
fi
}'''
elif shell_type == "zsh":
return '''# Add to ~/.zshrc for DevTrace integration
export DEVTRACE_SESSION_ID=$(cat ~/.devtrace_session 2>/dev/null)
if [ -n "$DEVTRACE_SESSION_ID" ]; then
autoload -U add-zsh-hook
add-zsh-hook preexec devtrace-cmd
fi
devtrace-cmd() {
if [ -n "$DEVTRACE_SESSION_ID" ]; then
python -c "from devtrace.storage import Database, Path; db = Database(Path('~/.devtrace/devtrace.db')); db.add_command_event($DEVTRACE_SESSION_ID, '$1', None, '$(pwd)')"
fi
}'''
else:
return "# Shell type not supported for automatic integration"