2026-04-16 08:08:28 +08:00

698 lines
32 KiB
Python

"""
Hermes Agent Core Module - Multi-User Version with SSH Remote Skills
Implements the core functionality of Hermes Agent as a Python module
that can be integrated into ahserver applications with full multi-user support
and SSH remote skills deployment and execution capabilities.
"""
import asyncio
import json
import os
import subprocess
import tempfile
import shutil
from typing import Dict, Any, List, Optional, Callable
from dataclasses import dataclass
from datetime import datetime
import uuid
# Import required dependencies
try:
from ahserver.serverenv import ServerEnv
from appPublic.worker import awaitify
from sqlor.dbpools import DBPools
except ImportError:
# For standalone testing
class ServerEnv:
def __init__(self):
pass
def awaitify(func):
async def wrapper(*args, **kwargs):
return func(*args, **kwargs)
return wrapper
class DBPools:
def __init__(self):
pass
@dataclass
class HermesConfig:
"""Configuration for Hermes Agent module"""
work_dir: str = "./hermes_work"
class HermesAgent:
"""Core Hermes Agent implementation with multi-user support and SSH remote skills"""
def __init__(self, config: Optional[HermesConfig] = None):
self.config = config or HermesConfig()
self._ensure_paths()
self.db = DBPools()
def _ensure_paths(self):
"""Ensure all required paths exist"""
os.makedirs(self.config.work_dir, exist_ok=True)
def _get_current_user_id(self, context: Dict[str, Any]) -> str:
"""Get current user ID from request context"""
# In ahserver, user context is typically available in the request
user_id = context.get('user_id') or context.get('userid')
if not user_id:
raise ValueError("User ID not found in context. User must be authenticated.")
return str(user_id)
async def execute_tool_call(self, tool_name: str, parameters: Dict[str, Any],
context: Dict[str, Any] = None) -> Dict[str, Any]:
"""
Execute a tool call with given parameters
Args:
tool_name: Name of the tool to execute
parameters: Parameters for the tool
context: Request context containing user information
Returns:
Result of the tool execution
"""
# This would integrate with actual tool implementations
# For now, return a mock response structure
return {
"success": True,
"tool_name": tool_name,
"parameters": parameters,
"user_id": self._get_current_user_id(context) if context else "anonymous",
"timestamp": datetime.now().isoformat(),
"result": f"Executed {tool_name} with parameters: {parameters}"
}
async def manage_memory(self, action: str, target: str, content: str = "",
old_text: str = "", context: Dict[str, Any] = None) -> Dict[str, Any]:
"""
Manage persistent memory operations with user isolation
Args:
action: 'add', 'replace', or 'remove'
target: 'memory' or 'user'
content: Content to add/replace (required for add/replace)
old_text: Text to identify entry for replace/remove
context: Request context containing user information
Returns:
Memory operation result
"""
user_id = self._get_current_user_id(context) if context else "anonymous"
try:
async with self.db.sqlorContext('default') as sor:
if action == "add":
memory_id = str(uuid.uuid4())
data = {
'id': memory_id,
'user_id': user_id,
'target': target,
'content': content,
'created_at': datetime.now(),
'updated_at': datetime.now()
}
result = await sor.C('hermes_memory', data)
return {"success": True, "action": action, "id": memory_id, "user_id": user_id}
elif action == "replace":
filters = {
'user_id': user_id,
'content': old_text
}
records = await sor.R('hermes_memory', filters)
if not records:
return {"success": False, "error": "Memory entry not found"}
record = records[0]
data = {
'id': record['id'],
'user_id': user_id,
'target': target,
'content': content,
'updated_at': datetime.now()
}
result = await sor.U('hermes_memory', data)
return {"success": True, "action": action, "id": record['id'], "user_id": user_id}
elif action == "remove":
filters = {
'user_id': user_id,
'content': old_text
}
records = await sor.R('hermes_memory', filters)
if not records:
return {"success": False, "error": "Memory entry not found"}
record = records[0]
result = await sor.D('hermes_memory', {'id': record['id']})
return {"success": True, "action": action, "id": record['id'], "user_id": user_id}
except Exception as e:
return {"success": False, "error": str(e), "user_id": user_id}
async def search_sessions(self, query: str = "", limit: int = 3,
context: Dict[str, Any] = None) -> Dict[str, Any]:
"""
Search across past conversation sessions for current user
Args:
query: Search query (empty for recent sessions)
limit: Maximum number of sessions to return
context: Request context containing user information
Returns:
Search results
"""
user_id = self._get_current_user_id(context) if context else "anonymous"
try:
async with self.db.sqlorContext('default') as sor:
filters = {'user_id': user_id}
if query:
filters['$or'] = [
{'title': {'$like': f'%{query}%'}},
{'preview': {'$like': f'%{query}%'}},
{'tags': {'$like': f'%{query}%'}}
]
sessions = await sor.R('hermes_sessions', filters,
orderby='started_at DESC', limit=limit)
return {
"success": True,
"sessions": sessions,
"query": query,
"limit": limit,
"user_id": user_id
}
except Exception as e:
return {"success": False, "error": str(e), "user_id": user_id}
async def manage_skills(self, action: str, name: str,
context: Dict[str, Any] = None, **kwargs) -> Dict[str, Any]:
"""
Manage local skills (create, update, delete, view) with user isolation
Args:
action: 'create', 'patch', 'edit', 'delete', 'view'
name: Skill name
context: Request context containing user information
**kwargs: Additional parameters based on action
Returns:
Skill operation result
"""
user_id = self._get_current_user_id(context) if context else "anonymous"
try:
async with self.db.sqlorContext('default') as sor:
if action == "view":
filters = {'user_id': user_id, 'name': name}
skills = await sor.R('hermes_skills', filters)
if skills:
return {"success": True, "skill": skills[0], "user_id": user_id}
else:
return {"success": False, "error": "Skill not found", "user_id": user_id}
elif action == "create":
skill_id = str(uuid.uuid4())
data = {
'id': skill_id,
'user_id': user_id,
'name': name,
'description': kwargs.get('description', ''),
'category': kwargs.get('category', ''),
'version': kwargs.get('version', '1.0.0'),
'content': kwargs.get('content', ''),
'created_at': datetime.now(),
'updated_at': datetime.now()
}
result = await sor.C('hermes_skills', data)
return {"success": True, "action": action, "id": skill_id, "user_id": user_id}
elif action == "update":
filters = {'user_id': user_id, 'name': name}
skills = await sor.R('hermes_skills', filters)
if not skills:
return {"success": False, "error": "Skill not found", "user_id": user_id}
skill = skills[0]
data = {
'id': skill['id'],
'user_id': user_id,
'name': name,
'description': kwargs.get('description', skill['description']),
'category': kwargs.get('category', skill['category']),
'version': kwargs.get('version', skill['version']),
'content': kwargs.get('content', skill['content']),
'updated_at': datetime.now()
}
result = await sor.U('hermes_skills', data)
return {"success": True, "action": action, "id": skill['id'], "user_id": user_id}
elif action == "delete":
filters = {'user_id': user_id, 'name': name}
skills = await sor.R('hermes_skills', filters)
if not skills:
return {"success": False, "error": "Skill not found", "user_id": user_id}
result = await sor.D('hermes_skills', {'id': skills[0]['id']})
return {"success": True, "action": action, "id": skills[0]['id'], "user_id": user_id}
except Exception as e:
return {"success": False, "error": str(e), "user_id": user_id}
async def manage_remote_skills(self, action: str, skill_id: str = None,
context: Dict[str, Any] = None, **kwargs) -> Dict[str, Any]:
"""
Manage remote skills with SSH deployment and execution capabilities
Args:
action: 'create', 'read', 'update', 'delete', 'list', 'deploy', 'execute', 'list_remote'
skill_id: Remote skill ID (required for most actions)
context: Request context containing user information
**kwargs: Additional parameters based on action
Returns:
Remote skill operation result
"""
user_id = self._get_current_user_id(context) if context else "anonymous"
try:
async with self.db.sqlorContext('default') as sor:
if action == "create":
# Create new remote skill configuration
new_skill_id = str(uuid.uuid4())
data = {
'id': new_skill_id,
'user_id': user_id,
'name': kwargs.get('name'),
'host': kwargs.get('host'),
'port': kwargs.get('port', 22),
'username': kwargs.get('username'),
'remote_path': kwargs.get('remote_path', '~/.skills'),
'auth_method': kwargs.get('auth_method', 'key'),
'ssh_key_path': kwargs.get('ssh_key_path'),
'description': kwargs.get('description', ''),
'category': kwargs.get('category', ''),
'version': kwargs.get('version', '1.0.0'),
'enabled': kwargs.get('enabled', True),
'created_at': datetime.now(),
'updated_at': datetime.now()
}
# Validate required fields
required_fields = ['name', 'host', 'username']
for field in required_fields:
if not data.get(field):
return {"success": False, "error": f"Missing required field: {field}", "user_id": user_id}
result = await sor.C('hermes_remote_skills', data)
return {"success": True, "action": action, "id": new_skill_id, "user_id": user_id}
elif action == "read":
if not skill_id:
return {"success": False, "error": "skill_id required", "user_id": user_id}
filters = {'id': skill_id, 'user_id': user_id}
skills = await sor.R('hermes_remote_skills', filters)
if skills:
return {"success": True, "skill": skills[0], "user_id": user_id}
else:
return {"success": False, "error": "Remote skill not found", "user_id": user_id}
elif action == "update":
if not skill_id:
return {"success": False, "error": "skill_id required", "user_id": user_id}
filters = {'id': skill_id, 'user_id': user_id}
existing_skills = await sor.R('hermes_remote_skills', filters)
if not existing_skills:
return {"success": False, "error": "Remote skill not found", "user_id": user_id}
existing_skill = existing_skills[0]
data = {
'id': skill_id,
'user_id': user_id,
'name': kwargs.get('name', existing_skill['name']),
'host': kwargs.get('host', existing_skill['host']),
'port': kwargs.get('port', existing_skill['port']),
'username': kwargs.get('username', existing_skill['username']),
'remote_path': kwargs.get('remote_path', existing_skill['remote_path']),
'auth_method': kwargs.get('auth_method', existing_skill['auth_method']),
'ssh_key_path': kwargs.get('ssh_key_path', existing_skill['ssh_key_path']),
'description': kwargs.get('description', existing_skill['description']),
'category': kwargs.get('category', existing_skill['category']),
'version': kwargs.get('version', existing_skill['version']),
'enabled': kwargs.get('enabled', existing_skill['enabled']),
'updated_at': datetime.now()
}
result = await sor.U('hermes_remote_skills', data)
return {"success": True, "action": action, "id": skill_id, "user_id": user_id}
elif action == "delete":
if not skill_id:
return {"success": False, "error": "skill_id required", "user_id": user_id}
filters = {'id': skill_id, 'user_id': user_id}
existing_skills = await sor.R('hermes_remote_skills', filters)
if not existing_skills:
return {"success": False, "error": "Remote skill not found", "user_id": user_id}
result = await sor.D('hermes_remote_skills', {'id': skill_id})
return {"success": True, "action": action, "id": skill_id, "user_id": user_id}
elif action == "list":
filters = {'user_id': user_id}
# Apply optional filters
if 'name' in kwargs:
filters['name'] = kwargs['name']
if 'host' in kwargs:
filters['host'] = kwargs['host']
if 'enabled' in kwargs:
filters['enabled'] = kwargs['enabled']
skills = await sor.R('hermes_remote_skills', filters, orderby='name ASC')
return {"success": True, "skills": skills, "user_id": user_id}
elif action == "deploy":
if not skill_id:
return {"success": False, "error": "skill_id required", "user_id": user_id}
filters = {'id': skill_id, 'user_id': user_id}
skills = await sor.R('hermes_remote_skills', filters)
if not skills:
return {"success": False, "error": "Remote skill not found", "user_id": user_id}
skill = skills[0]
if not skill.get('enabled'):
return {"success": False, "error": "Remote skill is disabled", "user_id": user_id}
# Deploy skill to remote host
deploy_result = await self._deploy_remote_skill(skill, kwargs.get('skill_content', ''))
if deploy_result['success']:
# Update last_deployed timestamp
update_data = {
'id': skill_id,
'user_id': user_id,
'last_deployed': datetime.now(),
'updated_at': datetime.now()
}
await sor.U('hermes_remote_skills', update_data)
return deploy_result
elif action == "execute":
if not skill_id:
return {"success": False, "error": "skill_id required", "user_id": user_id}
filters = {'id': skill_id, 'user_id': user_id}
skills = await sor.R('hermes_remote_skills', filters)
if not skills:
return {"success": False, "error": "Remote skill not found", "user_id": user_id}
skill = skills[0]
if not skill.get('enabled'):
return {"success": False, "error": "Remote skill is disabled", "user_id": user_id}
# Execute remote skill
execute_result = await self._execute_remote_skill(skill, kwargs.get('parameters', {}))
if execute_result['success']:
# Update last_executed timestamp
update_data = {
'id': skill_id,
'user_id': user_id,
'last_executed': datetime.now(),
'updated_at': datetime.now()
}
await sor.U('hermes_remote_skills', update_data)
return execute_result
elif action == "list_remote":
if not skill_id:
return {"success": False, "error": "skill_id required", "user_id": user_id}
filters = {'id': skill_id, 'user_id': user_id}
skills = await sor.R('hermes_remote_skills', filters)
if not skills:
return {"success": False, "error": "Remote skill not found", "user_id": user_id}
skill = skills[0]
if not skill.get('enabled'):
return {"success": False, "error": "Remote skill is disabled", "user_id": user_id}
# List available skills on remote host
return await self._list_remote_skills(skill)
except Exception as e:
return {"success": False, "error": str(e), "user_id": user_id}
async def _deploy_remote_skill(self, skill_config: Dict[str, Any], skill_content: str) -> Dict[str, Any]:
"""
Deploy a skill to remote host via SSH
Args:
skill_config: Remote skill configuration
skill_content: Skill content to deploy
Returns:
Deployment result
"""
try:
# Create temporary directory for skill files
with tempfile.TemporaryDirectory() as temp_dir:
skill_name = skill_config['name']
skill_dir = os.path.join(temp_dir, skill_name)
os.makedirs(skill_dir, exist_ok=True)
# Write skill content to SKILL.md
skill_file = os.path.join(skill_dir, 'SKILL.md')
with open(skill_file, 'w', encoding='utf-8') as f:
f.write(skill_content)
# Build rsync/scp command
remote_path = skill_config['remote_path'].replace('~', f"/home/{skill_config['username']}")
remote_skill_path = os.path.join(remote_path, skill_name)
ssh_options = []
if skill_config.get('port'):
ssh_options.extend(['-p', str(skill_config['port'])])
if skill_config.get('auth_method') == 'key' and skill_config.get('ssh_key_path'):
ssh_options.extend(['-i', skill_config['ssh_key_path']])
# Create remote directory if it doesn't exist
mkdir_cmd = ['ssh'] + ssh_options + [
f"{skill_config['username']}@{skill_config['host']}",
f"mkdir -p '{remote_path}'"
]
result = subprocess.run(mkdir_cmd, capture_output=True, text=True, timeout=30)
if result.returncode != 0:
return {
"success": False,
"error": f"Failed to create remote directory: {result.stderr}",
"stdout": result.stdout,
"stderr": result.stderr
}
# Deploy skill using rsync (preferred) or scp
try:
# Try rsync first
rsync_cmd = ['rsync', '-avz'] + ssh_options + [
f"{skill_dir}/",
f"{skill_config['username']}@{skill_config['host']}:{remote_skill_path}/"
]
result = subprocess.run(rsync_cmd, capture_output=True, text=True, timeout=60)
if result.returncode != 0:
raise subprocess.CalledProcessError(result.returncode, rsync_cmd, result.stdout, result.stderr)
except (subprocess.CalledProcessError, FileNotFoundError):
# Fall back to scp
scp_cmd = ['scp'] + ssh_options + ['-r'] + [
f"{skill_dir}/",
f"{skill_config['username']}@{skill_config['host']}:{remote_skill_path}/"
]
result = subprocess.run(scp_cmd, capture_output=True, text=True, timeout=60)
if result.returncode != 0:
return {
"success": False,
"error": f"Failed to deploy skill: {result.stderr}",
"stdout": result.stdout,
"stderr": result.stderr
}
return {
"success": True,
"message": f"Skill '{skill_name}' deployed successfully to {skill_config['host']}",
"remote_path": remote_skill_path
}
except subprocess.TimeoutExpired:
return {"success": False, "error": "Deployment timeout"}
except Exception as e:
return {"success": False, "error": f"Deployment failed: {str(e)}"}
async def _execute_remote_skill(self, skill_config: Dict[str, Any], parameters: Dict[str, Any]) -> Dict[str, Any]:
"""
Execute a remote skill via SSH
Args:
skill_config: Remote skill configuration
parameters: Parameters for skill execution
Returns:
Execution result
"""
try:
skill_name = skill_config['name']
remote_path = skill_config['remote_path'].replace('~', f"/home/{skill_config['username']}")
skill_script_path = os.path.join(remote_path, skill_name, 'execute.py')
# Check if execute.py exists on remote host
ssh_options = []
if skill_config.get('port'):
ssh_options.extend(['-p', str(skill_config['port'])])
if skill_config.get('auth_method') == 'key' and skill_config.get('ssh_key_path'):
ssh_options.extend(['-i', skill_config['ssh_key_path']])
check_cmd = ['ssh'] + ssh_options + [
f"{skill_config['username']}@{skill_config['host']}",
f"test -f '{skill_script_path}' && echo 'exists' || echo 'not_exists'"
]
result = subprocess.run(check_cmd, capture_output=True, text=True, timeout=30)
if 'not_exists' in result.stdout:
# Fall back to executing the skill directly via hermes skill system
skill_full_path = os.path.join(remote_path, skill_name)
execute_cmd = f"cd {remote_path} && hermes skill_view --name {skill_name} && echo 'Skill executed'"
else:
# Execute the custom execute.py script
param_json = json.dumps(parameters) if parameters else '{}'
execute_cmd = f"cd {remote_path} && python3 {skill_script_path} '{param_json}'"
# Execute the command
final_cmd = ['ssh'] + ssh_options + [
f"{skill_config['username']}@{skill_config['host']}",
execute_cmd
]
result = subprocess.run(final_cmd, capture_output=True, text=True, timeout=300)
if result.returncode == 0:
return {
"success": True,
"result": result.stdout,
"skill_name": skill_name,
"host": skill_config['host']
}
else:
return {
"success": False,
"error": result.stderr,
"stdout": result.stdout,
"stderr": result.stderr,
"skill_name": skill_name,
"host": skill_config['host']
}
except subprocess.TimeoutExpired:
return {"success": False, "error": "Execution timeout"}
except Exception as e:
return {"success": False, "error": f"Execution failed: {str(e)}"}
async def _list_remote_skills(self, skill_config: Dict[str, Any]) -> Dict[str, Any]:
"""
List available skills on remote host
Args:
skill_config: Remote skill configuration
Returns:
List of available skills
"""
try:
remote_path = skill_config['remote_path'].replace('~', f"/home/{skill_config['username']}")
ssh_options = []
if skill_config.get('port'):
ssh_options.extend(['-p', str(skill_config['port'])])
if skill_config.get('auth_method') == 'key' and skill_config.get('ssh_key_path'):
ssh_options.extend(['-i', skill_config['ssh_key_path']])
# List directories in remote skills path
list_cmd = ['ssh'] + ssh_options + [
f"{skill_config['username']}@{skill_config['host']}",
f"find '{remote_path}' -maxdepth 1 -type d -not -path '{remote_path}' -exec basename {{}} \\;"
]
result = subprocess.run(list_cmd, capture_output=True, text=True, timeout=30)
if result.returncode == 0:
skills = [line.strip() for line in result.stdout.split('\n') if line.strip()]
return {
"success": True,
"skills": skills,
"remote_path": remote_path,
"host": skill_config['host']
}
else:
return {
"success": False,
"error": result.stderr,
"stdout": result.stdout,
"stderr": result.stderr,
"host": skill_config['host']
}
except subprocess.TimeoutExpired:
return {"success": False, "error": "List timeout"}
except Exception as e:
return {"success": False, "error": f"List failed: {str(e)}"}
# Global instance for module functions
_hermes_instance = None
def get_hermes_agent():
"""Get or create the global Hermes agent instance"""
global _hermes_instance
if _hermes_instance is None:
_hermes_instance = HermesAgent()
return _hermes_instance
# Exposed async functions for frontend integration
# These functions expect the ahserver context to be passed automatically
async def hermes_execute_tool(tool_name: str, parameters: Dict[str, Any]):
"""Execute a Hermes tool with current user context"""
agent = get_hermes_agent()
return await agent.execute_tool_call(tool_name, parameters)
async def hermes_manage_memory(action: str, target: str, content: str = "", old_text: str = ""):
"""Manage Hermes memory with current user context"""
agent = get_hermes_agent()
return await agent.manage_memory(action, target, content, old_text)
async def hermes_search_sessions(query: str = "", limit: int = 3):
"""Search Hermes sessions with current user context"""
agent = get_hermes_agent()
return await agent.search_sessions(query, limit)
async def hermes_manage_skills(action: str, name: str, **kwargs):
"""Manage local Hermes skills with current user context"""
agent = get_hermes_agent()
return await agent.manage_skills(action, name, **kwargs)
async def hermes_manage_remote_skills(action: str, skill_id: str = None, **kwargs):
"""Manage remote Hermes skills with SSH deployment and execution"""
agent = get_hermes_agent()
return await agent.manage_remote_skills(action, skill_id, **kwargs)
async def hermes_get_config():
"""Get Hermes configuration"""
agent = get_hermes_agent()
return {
"work_dir": agent.config.work_dir
}
# Helper function to get current user from ahserver context
async def hermes_get_current_user():
"""Get current user information from ahserver context"""
try:
from ahserver.serverenv import ServerEnv
env = ServerEnv()
user_id = getattr(env, 'user_id', None) or getattr(env, 'userid', None)
return {"user_id": user_id} if user_id else {"user_id": None}
except:
return {"user_id": None}