1404 lines
60 KiB
Python

"""
Hermes Agent Core Module - Enhanced with Intelligent Memory Filtering and True Orchestration
Implements the core functionality of Hermes Agent as a Python module
that can be integrated into ahserver applications with full multi-user support,
SSH remote skills, intelligent memory management with token optimization,
and true workflow orchestration capabilities.
"""
import asyncio
import json
import os
import subprocess
import tempfile
import shutil
from typing import Dict, Any, List, Optional, Callable, Tuple
from dataclasses import dataclass
from datetime import datetime
import uuid
from appPublic.uniqueID import getID
import re
# Import required dependencies
# Prefer the real ahserver / appPublic / sqlor implementations. When they are
# not importable, define minimal stand-ins so this module can still be imported.
try:
    from ahserver.serverenv import ServerEnv
    from appPublic.worker import awaitify
    from sqlor.dbpools import DBPools
except ImportError:
    # For standalone testing
    class ServerEnv:
        # Stand-in with no behaviour; it does not provide get_module_dbname(),
        # which the agent methods call on the real class.
        def __init__(self):
            pass
    def awaitify(func):
        # Wrap a synchronous callable in a coroutine function. The call runs
        # inline in the awaiting task; nothing is offloaded to a worker.
        async def wrapper(*args, **kwargs):
            return func(*args, **kwargs)
        return wrapper
    class DBPools:
        # Stand-in with no behaviour; it does not provide sqlorContext().
        def __init__(self):
            pass
    # NOTE(review): nothing visible in this file imports getConfig on the
    # normal (non-fallback) path, yet the agent methods call it. Confirm
    # whether this stub is meant to live at module level or whether the real
    # getConfig import is missing above.
    def getConfig():
        # Return an object whose only attribute is `databases = None`.
        class Config:
            databases = None
        return Config()
@dataclass
class HermesConfig:
    """Settings consumed by the Hermes Agent module.

    The first two fields are filesystem locations; the remaining fields tune
    the priority-based memory filter and its automatic cleanup.
    """

    # --- filesystem locations ---
    work_dir: str = "./hermes_work"
    skills_path: str = "~/.hermes/skills"  # directory holding skill definitions

    # --- intelligent memory filtering ---
    max_memory_tokens: int = 2000  # token budget for the memory context
    default_priority: int = 50  # priority given to new memories (0-100)
    high_priority_threshold: int = 70  # at or above this a memory is "high"
    low_priority_threshold: int = 30  # below this a memory is "low"
    auto_cleanup_enabled: bool = True  # whether old memories are purged
    min_retention_days: int = 30  # minimum age before a memory may be purged
class HermesAgent:
"""Core Hermes Agent implementation with intelligent memory filtering and orchestration"""
def __init__(self, config: Optional[HermesConfig] = None):
    """Store the configuration and make sure the working directory exists.

    Args:
        config: Agent settings; a default ``HermesConfig()`` is used when
            omitted.
    """
    if not config:
        config = HermesConfig()
    self.config = config
    self._ensure_paths()
    # Left unset here; will be initialized when needed.
    self.orchestrator = None
def _ensure_paths(self):
"""Ensure all required paths exist"""
os.makedirs(self.config.work_dir, exist_ok=True)
def _get_current_user_id(self, context: Dict[str, Any]) -> str:
"""Get current user ID from request context"""
# In ahserver, user context is typically available in the request
user_id = context.get('user_id') or context.get('userid')
if not user_id:
raise ValueError("User ID not found in context. User must be authenticated.")
return str(user_id)
def _validate_skill_name(self, name: str) -> bool:
"""
Validate skill name to prevent security issues
Args:
name: Skill name to validate
Returns:
True if valid, False otherwise
"""
if not name or not isinstance(name, str):
return False
# Check length
if len(name) > 64 or len(name) < 1:
return False
# Check for allowed characters (alphanumeric, underscore, hyphen, dot)
import re
if not re.match(r'^[a-zA-Z0-9._-]+$', name):
return False
# Prevent path traversal
if '..' in name or '/' in name or '\\' in name:
return False
# Prevent reserved names
reserved_names = ['.', '..', 'con', 'prn', 'aux', 'nul', 'com1', 'com2', 'com3', 'com4', 'com5', 'com6', 'com7', 'com8', 'com9', 'lpt1', 'lpt2', 'lpt3', 'lpt4', 'lpt5', 'lpt6', 'lpt7', 'lpt8', 'lpt9']
if name.lower() in reserved_names:
return False
return True
def _validate_skill_content(self, content: str) -> bool:
"""
Validate skill content to prevent security issues
Args:
content: Skill content to validate
Returns:
True if valid, False otherwise
"""
if not content or not isinstance(content, str):
return False
# Check minimum length
if len(content.strip()) < 10:
return False
# Dangerous command patterns to block
dangerous_patterns = [
r'rm\s+-rf\s+/',
r'rm\s+-fr\s+/',
r'dd\s+if=/dev/zero',
r':\(\)\{\s*:\s*\|\s*:\s*&\s*\};:',
r'cat\s+/etc/passwd',
r'cat\s+/etc/shadow',
r'wget\s+http[s]?://[^ ]+',
r'curl\s+http[s]?://[^ ]+',
r'sudo\s+',
r'chmod\s+777\s+',
r'chown\s+root:root\s+/',
r'mkfs\.',
r'fdisk\s+',
r'parted\s+',
r'dd\s+if=/dev',
r'>\s*/dev/sda',
r'>\s*/dev/hda',
r'echo\s+.*>\s*/etc/',
r'cp\s+.*\s+/etc/',
r'mv\s+.*\s+/etc/',
r'ln\s+-sf\s+.*\s+/etc/',
r'iptables\s+',
r'ufw\s+',
r'firewall-cmd\s+',
r'systemctl\s+stop\s+',
r'service\s+.*\s+stop',
r'pkill\s+',
r'killall\s+',
r'init\s+0',
r'shutdown\s+',
r'reboot\s+',
r'halt\s+',
r'poweroff\s+'
]
import re
content_lower = content.lower()
for pattern in dangerous_patterns:
if re.search(pattern, content_lower):
return False
# Check for excessive command chaining
command_chains = content.count('&&') + content.count('||') + content.count(';')
if command_chains > 3:
return False
# Check for obfuscated commands (excessive base64, hex, etc.)
if re.search(r'[A-Za-z0-9+/]{100,}', content): # Long base64-like strings
return False
if re.search(r'\\x[0-9a-fA-F]{2}', content): # Hex escape sequences
return False
return True
def _estimate_tokens(self, text: str) -> int:
"""
Estimate token count for given text using simple heuristic
This is a rough estimation - actual tokenizers may vary
"""
# Simple heuristic: average 4 characters per token
return max(1, len(text) // 4)
def _classify_memory_priority(self, content: str, target: str) -> int:
"""
Classify memory priority based on content and target type
Returns priority score (0-100)
"""
priority = self.config.default_priority
# User preferences get highest priority
if target == "user":
priority = max(priority, 80)
# Check for high-value keywords that indicate important memories
high_value_patterns = [
r"(?i)remember this", r"(?i)don't forget", r"(?i)important",
r"(?i)preference", r"(?i)requirement", r"(?i)must", r"(?i)always",
r"(?i)never", r"(?i)critical", r"(?i)essential"
]
for pattern in high_value_patterns:
if re.search(pattern, content):
priority = max(priority, self.config.high_priority_threshold + 10)
break
# Very short memories might be less valuable
if len(content) < 20:
priority = min(priority, self.config.low_priority_threshold - 10)
# Cap priority between 0-100
return max(0, min(100, priority))
def _get_user_permissions(self, context: Dict[str, Any]) -> List[str]:
"""
Get user permissions from request context
Args:
context: Request context containing user information
Returns:
List of user permissions
"""
if not context:
# Anonymous user gets minimal permissions
return ['file_read', 'memory_read', 'skill_read']
# In a real implementation, this would check RBAC or similar
# For now, return all permissions for authenticated users
user_id = context.get('user_id') or context.get('userid')
if user_id:
return [
'file_read', 'file_write',
'system_execute', 'system_manage',
'browser_access',
'ai_vision', 'ai_tts',
'memory_manage', 'memory_read',
'skill_read', 'skill_manage',
'task_manage', 'task_delegate',
'user_interact', 'schedule_manage',
'config_read'
]
else:
return ['file_read', 'memory_read', 'skill_read']
async def _execute_tool_with_retry(self, tool_func: Callable, params: dict,
tool_name: str, user_id: str) -> Dict[str, Any]:
"""
Execute a tool with retry logic and proper error handling
Args:
tool_func: The tool function to execute
params: Parameters for the tool
tool_name: Name of the tool (for logging)
user_id: User ID (for logging)
Returns:
Result of the tool execution
"""
import asyncio
import time
# Get tool metadata for retry configuration
if hasattr(self, '_tool_registry'):
metadata = self._tool_registry.get_tool_metadata(tool_name)
max_retries = metadata.get('max_retries', 3) if metadata else 3
timeout = metadata.get('timeout', 300) if metadata else 300
else:
max_retries = 3
timeout = 300
last_error = None
for attempt in range(max_retries):
try:
# Add user context to parameters if needed
params_with_context = params.copy()
# Execute with timeout
result = await asyncio.wait_for(
tool_func(**params_with_context),
timeout=timeout
)
# Ensure result is a dictionary
if isinstance(result, dict):
result.update({
"success": True,
"tool_name": tool_name,
"user_id": user_id,
"timestamp": datetime.now().isoformat(),
"attempt": attempt + 1
})
else:
result = {
"success": True,
"tool_name": tool_name,
"user_id": user_id,
"timestamp": datetime.now().isoformat(),
"result": result,
"attempt": attempt + 1
}
return result
except asyncio.TimeoutError as e:
last_error = f"Timeout after {timeout} seconds: {str(e)}"
if attempt < max_retries - 1:
await asyncio.sleep(2 ** attempt) # Exponential backoff
continue
break
except Exception as e:
last_error = str(e)
# Don't retry on certain errors
error_str = str(e).lower()
if any(keyword in error_str for keyword in ['permission', 'security', 'validation']):
break
if attempt < max_retries - 1:
await asyncio.sleep(2 ** attempt)
continue
break
return {
"success": False,
"error": last_error,
"tool_name": tool_name,
"user_id": user_id,
"timestamp": datetime.now().isoformat(),
"attempts": max_retries
}
async def _get_intelligent_memory_context(self, user_id: str,
                                          current_task: str = "",
                                          max_tokens: Optional[int] = None) -> List[Dict[str, Any]]:
    """Select stored memories for a user within a token budget.

    Memories are read in three tiers (high, then medium, then low priority)
    and appended in query order until one would exceed the budget, at which
    point that tier stops. Later tiers are only queried while budget remains.
    Every selected memory has its access statistics updated.

    Args:
        user_id: Current user ID.
        current_task: Current task description (accepted but not used by
            this implementation).
        max_tokens: Token budget; falls back to ``config.max_memory_tokens``
            when omitted or falsy.

    Returns:
        The selected memory rows, or an empty list if anything fails.
    """
    budget = max_tokens or self.config.max_memory_tokens
    used = 0
    chosen = []
    try:
        env = ServerEnv()
        dbname = env.get_module_dbname('harnessed_agent')
        config = getConfig()
        db = DBPools()
        db.databases = config.databases
        async with db.sqlorContext(dbname) as sor:
            high = self.config.high_priority_threshold
            low = self.config.low_priority_threshold
            tier_queries = (
                {'user_id': user_id, 'priority__gte': high, 'sort': 'priority desc,last_accessed desc'},
                {'user_id': user_id, 'priority__gte': low, 'priority__lt': high, 'sort': 'last_accessed desc,priority desc'},
                {'user_id': user_id, 'priority__lt': low, 'sort': 'last_accessed desc'},
            )
            for tier_index, ns in enumerate(tier_queries):
                # The first tier is always queried; later ones need budget.
                if tier_index and budget - used <= 0:
                    break
                rows = await sor.R('hermes_memory', ns)
                for row in rows or []:
                    cost = self._estimate_tokens(row['content'])
                    if used + cost > budget:
                        # First entry that does not fit ends this tier.
                        break
                    chosen.append(row)
                    used += cost
                    await self._update_memory_access_stats(user_id, row['id'])
        return chosen
    except Exception:
        # Return empty list on error to avoid breaking the main flow
        return []
async def _update_memory_access_stats(self, user_id: str, memory_id: str):
    """Bump a memory's access counter and refresh its timestamps.

    Best-effort: if the row is missing nothing is written, and any error is
    swallowed so that bookkeeping never interrupts the caller.
    """
    try:
        env = ServerEnv()
        dbname = env.get_module_dbname('harnessed_agent')
        config = getConfig()
        db = DBPools()
        db.databases = config.databases
        async with db.sqlorContext(dbname) as sor:
            rows = await sor.R('hermes_memory', {'id': memory_id, 'user_id': user_id})
            if not rows:
                return
            previous_count = rows[0].get('access_count', 0)
            await sor.U('hermes_memory', {
                'id': memory_id,
                'user_id': user_id,
                'access_count': previous_count + 1,
                'last_accessed': datetime.now(),
                'updated_at': datetime.now()
            })
    except Exception:
        pass
async def _cleanup_old_memories(self, user_id: str):
    """Delete a user's old, low-priority, rarely accessed memories.

    A memory is removed only when its priority is below
    ``config.low_priority_threshold``, it was created more than
    ``config.min_retention_days`` days ago, and it has been accessed fewer
    than 3 times.

    Args:
        user_id: Owner of the memories to clean up.

    Returns:
        Number of deleted memories, 0 if cleanup failed, or None when
        automatic cleanup is disabled.
    """
    if not self.config.auto_cleanup_enabled:
        return
    from datetime import timedelta

    low_threshold = self.config.low_priority_threshold
    # timedelta arithmetic handles month/year boundaries. The previous
    # replace(day=day - N) raised ValueError whenever the result was < 1,
    # which silently disabled cleanup.
    cutoff_date = datetime.now() - timedelta(days=self.config.min_retention_days)

    def is_expired(row) -> bool:
        """Re-check the retention criteria on a fetched row."""
        priority = row.get('priority')
        if priority is None or priority >= low_threshold:
            return False
        if (row.get('access_count') or 0) >= 3:
            return False
        created = row.get('created_at')
        if isinstance(created, str):
            try:
                created = datetime.fromisoformat(created)
            except ValueError:
                return False
        if not isinstance(created, datetime):
            return False
        try:
            return created < cutoff_date
        except TypeError:
            # naive vs. aware datetimes cannot be compared; keep the row
            return False

    try:
        env = ServerEnv()
        dbname = env.get_module_dbname('harnessed_agent')
        config = getConfig()
        db = DBPools()
        db.databases = config.databases
        async with db.sqlorContext(dbname) as sor:
            # NOTE(review): these operator-style filters differ from the
            # `priority__lt` form used by the context query in this class;
            # confirm which form the sqlor backend actually honours.
            cleanup_filters = {
                'user_id': user_id,
                'priority': {'$lt': low_threshold},
                'created_at': {'$lt': cutoff_date.isoformat()},
                'access_count': {'$lt': 3}  # Accessed less than 3 times
            }
            old_memories = await sor.R('hermes_memory', cleanup_filters) or []
            deleted_count = 0
            for memory in old_memories:
                # Deletion is irreversible, so never rely on the backend
                # having applied the filters above.
                if not is_expired(memory):
                    continue
                await sor.D('hermes_memory', {'id': memory['id']})
                deleted_count += 1
            return deleted_count
    except Exception:
        # Cleanup is best-effort; report that nothing was deleted.
        return 0
async def execute_tool_call(self, tool_name: str, parameters: Dict[str, Any],
context: Dict[str, Any] = None) -> Dict[str, Any]:
"""
Execute a tool call with given parameters
Args:
tool_name: Name of the tool to execute
parameters: Parameters for the tool
context: Request context containing user information
Returns:
Result of the tool execution
"""
# Initialize tool registry if not already done
if not hasattr(self, '_tool_registry'):
from .tools.registration import create_tool_registry
self._tool_registry = create_tool_registry()
# Get tool information
tool_info = self._tool_registry.get_tool(tool_name)
if not tool_info:
return {
"success": False,
"error": f"Tool '{tool_name}' not found",
"available_tools": self._tool_registry.list_tools()
}
# Check user permissions
user_permissions = self._get_user_permissions(context)
if not self._tool_registry.has_permission(tool_name, user_permissions):
return {
"success": False,
"error": f"Insufficient permissions to execute tool '{tool_name}'"
}
# Get user ID for logging
user_id = self._get_current_user_id(context) if context else "anonymous"
# Execute the tool with proper error handling and retries
try:
result = await self._execute_tool_with_retry(
tool_info['function'],
parameters,
tool_name,
user_id
)
return result
except Exception as e:
return {
"success": False,
"error": str(e),
"tool_name": tool_name,
"user_id": user_id,
"timestamp": datetime.now().isoformat()
}
async def manage_memory(self, action: str, target: str, content: str = "",
old_text: str = "", context: Dict[str, Any] = None,
priority: Optional[int] = None) -> Dict[str, Any]:
"""
Manage persistent memory operations with intelligent filtering and user isolation
Args:
action: 'add', 'replace', or 'remove'
target: 'memory' or 'user'
content: Content to add/replace (required for add/replace)
old_text: Text to identify entry for replace/remove
context: Request context containing user information
priority: Optional priority override (0-100)
Returns:
Memory operation result
"""
user_id = self._get_current_user_id(context) if context else "anonymous"
try:
env = ServerEnv()
dbname = env.get_module_dbname('harnessed_agent')
config = getConfig()
db = DBPools()
db.databases = config.databases
async with db.sqlorContext(dbname) as sor:
if action == "add":
memory_id = getID()
# Auto-classify priority if not provided
final_priority = priority if priority is not None else self._classify_memory_priority(content, target)
data = {
'id': memory_id,
'user_id': user_id,
'target': target,
'content': content,
'priority': final_priority,
'access_count': 0,
'created_at': datetime.now(),
'updated_at': datetime.now()
}
result = await sor.C('hermes_memory', data)
return {"success": True, "action": action, "id": memory_id, "user_id": user_id, "priority": final_priority}
elif action == "replace":
filters = {
'user_id': user_id,
'content': old_text
}
records = await sor.R('hermes_memory', filters)
if not records:
return {"success": False, "error": "Memory entry not found"}
record = records[0]
# Preserve original priority unless explicitly overridden
final_priority = priority if priority is not None else record.get('priority', self.config.default_priority)
data = {
'id': record['id'],
'user_id': user_id,
'target': target,
'content': content,
'priority': final_priority,
'updated_at': datetime.now()
}
result = await sor.U('hermes_memory', data)
return {"success": True, "action": action, "id": record['id'], "user_id": user_id, "priority": final_priority}
elif action == "remove":
filters = {
'user_id': user_id,
'content': old_text
}
records = await sor.R('hermes_memory', filters)
if not records:
return {"success": False, "error": "Memory entry not found"}
record = records[0]
result = await sor.D('hermes_memory', {'id': record['id']})
return {"success": True, "action": action, "id": record['id'], "user_id": user_id}
except Exception as e:
return {"success": False, "error": str(e), "user_id": user_id}
async def get_intelligent_memory_context(self, current_task: str = "",
context: Dict[str, Any] = None,
max_tokens: Optional[int] = None) -> Dict[str, Any]:
"""
Get intelligent memory context optimized for current task and token usage
Args:
current_task: Description of current task for relevance filtering
context: Request context containing user information
max_tokens: Maximum tokens allowed for memory context
Returns:
Optimized memory context with token count and selected memories
"""
user_id = self._get_current_user_id(context) if context else "anonymous"
try:
memories = await self._get_intelligent_memory_context(user_id, current_task, max_tokens)
# Calculate total tokens
total_tokens = sum(self._estimate_tokens(m['content']) for m in memories)
# Perform cleanup if enabled
if self.config.auto_cleanup_enabled:
await self._cleanup_old_memories(user_id)
return {
"success": True,
"memories": memories,
"total_tokens": total_tokens,
"max_tokens": max_tokens or self.config.max_memory_tokens,
"user_id": user_id,
"memory_count": len(memories)
}
except Exception as e:
return {"success": False, "error": str(e), "user_id": user_id}
async def search_sessions(self, query: str = "", limit: int = 3,
                          context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Search the current user's past conversation sessions.

    Sessions are read newest first (``started_at desc``). A non-empty
    ``query`` keeps only sessions whose ``title``, ``preview`` or ``tags``
    contain it as a case-insensitive substring; the result is then cut to
    ``limit`` entries.

    Args:
        query: Search query (empty for recent sessions).
        limit: Maximum number of sessions to return.
        context: Request context containing user information; without it
            the user is "anonymous".

    Returns:
        Dict with ``sessions``, the echoed ``query`` and ``limit`` and the
        ``user_id``; or an error dict with ``success=False``.
    """
    user_id = self._get_current_user_id(context) if context else "anonymous"
    searchable_fields = ('title', 'preview', 'tags')
    try:
        env = ServerEnv()
        dbname = env.get_module_dbname('harnessed_agent')
        config = getConfig()
        db = DBPools()
        db.databases = config.databases
        async with db.sqlorContext(dbname) as sor:
            sessions = await sor.R('hermes_sessions', {'user_id': user_id, 'sort': 'started_at desc'})
            sessions = sessions or []
            if query:
                # The query used to be assembled into a filter dict that was
                # never passed to the database, so it had no effect. Apply
                # it here, before truncating to `limit`.
                needle = query.lower()
                sessions = [
                    session for session in sessions
                    if any(needle in str(session.get(field) or '').lower()
                           for field in searchable_fields)
                ]
            sessions = sessions[:limit]
            return {
                "success": True,
                "sessions": sessions,
                "query": query,
                "limit": limit,
                "user_id": user_id
            }
    except Exception as e:
        return {"success": False, "error": str(e), "user_id": user_id}
async def manage_skills(self, action: str, name: str,
context: Dict[str, Any] = None, **kwargs) -> Dict[str, Any]:
"""
Manage local skills (create, update, delete, view) with user isolation and security validation
Args:
action: 'create', 'patch', 'edit', 'delete', 'view'
name: Skill name
context: Request context containing user information
**kwargs: Additional parameters based on action
Returns:
Skill operation result
"""
user_id = self._get_current_user_id(context) if context else "anonymous"
# Validate skill name
if not self._validate_skill_name(name):
return {"success": False, "error": "Invalid skill name", "user_id": user_id}
try:
env = ServerEnv()
dbname = env.get_module_dbname('harnessed_agent')
config = getConfig()
db = DBPools()
db.databases = config.databases
async with db.sqlorContext(dbname) as sor:
if action == "view":
filters = {'user_id': user_id, 'name': name}
skills = await sor.R('harnessed_skills', {'user_id': user_id, 'name': name})
if skills:
return {"success": True, "skill": skills[0], "user_id": user_id}
else:
return {"success": False, "error": "Skill not found", "user_id": user_id}
elif action == "create":
# Validate skill content
skill_content = kwargs.get('content', '')
if not self._validate_skill_content(skill_content):
return {"success": False, "error": "Invalid skill content", "user_id": user_id}
skill_id = getID()
data = {
'id': skill_id,
'user_id': user_id,
'name': name,
'description': kwargs.get('description', ''),
'category': kwargs.get('category', ''),
'version': kwargs.get('version', '1.0.0'),
'content': skill_content,
'created_at': datetime.now(),
'updated_at': datetime.now()
}
result = await sor.C('harnessed_skills', data)
return {"success": True, "action": action, "id": skill_id, "user_id": user_id}
elif action == "update":
filters = {'user_id': user_id, 'name': name}
skills = await sor.R('harnessed_skills', {'user_id': user_id, 'name': name})
if not skills:
return {"success": False, "error": "Skill not found", "user_id": user_id}
skill = skills[0]
# Validate updated skill content
updated_content = kwargs.get('content', skill['content'])
if not self._validate_skill_content(updated_content):
return {"success": False, "error": "Invalid skill content", "user_id": user_id}
data = {
'id': skill['id'],
'user_id': user_id,
'name': name,
'description': kwargs.get('description', skill['description']),
'category': kwargs.get('category', skill['category']),
'version': kwargs.get('version', skill['version']),
'content': updated_content,
'updated_at': datetime.now()
}
result = await sor.U('harnessed_skills', data)
return {"success": True, "action": action, "id": skill['id'], "user_id": user_id}
elif action == "delete":
filters = {'user_id': user_id, 'name': name}
skills = await sor.R('harnessed_skills', {'user_id': user_id, 'name': name})
if not skills:
return {"success": False, "error": "Skill not found", "user_id": user_id}
result = await sor.D('harnessed_skills', {'id': skills[0]['id']})
return {"success": True, "action": action, "id": skills[0]['id'], "user_id": user_id}
except Exception as e:
return {"success": False, "error": str(e), "user_id": user_id}
async def manage_remote_skills(self, action: str, skill_id: str = None,
context: Dict[str, Any] = None, **kwargs) -> Dict[str, Any]:
"""
Manage remote skills with SSH deployment and execution capabilities
Args:
action: 'create', 'read', 'update', 'delete', 'list', 'deploy', 'execute', 'list_remote'
skill_id: Remote skill ID (required for most actions)
context: Request context containing user information
**kwargs: Additional parameters based on action
Returns:
Remote skill operation result
"""
user_id = self._get_current_user_id(context) if context else "anonymous"
try:
env = ServerEnv()
dbname = env.get_module_dbname('harnessed_agent')
config = getConfig()
db = DBPools()
db.databases = config.databases
async with db.sqlorContext(dbname) as sor:
if action == "create":
# Create new remote skill configuration
new_skill_id = getID()
data = {
'id': new_skill_id,
'user_id': user_id,
'name': kwargs.get('name'),
'host': kwargs.get('host'),
'port': kwargs.get('port', 22),
'username': kwargs.get('username'),
'remote_path': kwargs.get('remote_path', '~/.skills'),
'auth_method': kwargs.get('auth_method', 'key'),
'ssh_key_path': kwargs.get('ssh_key_path'),
'description': kwargs.get('description', ''),
'category': kwargs.get('category', ''),
'version': kwargs.get('version', '1.0.0'),
'enabled': kwargs.get('enabled', True),
'created_at': datetime.now(),
'updated_at': datetime.now()
}
# Validate required fields
required_fields = ['name', 'host', 'username']
for field in required_fields:
if not data.get(field):
return {"success": False, "error": f"Missing required field: {field}", "user_id": user_id}
result = await sor.C('harnessed_remote_skills', data)
return {"success": True, "action": action, "id": new_skill_id, "user_id": user_id}
elif action == "read":
if not skill_id:
return {"success": False, "error": "skill_id required", "user_id": user_id}
filters = {'id': skill_id, 'user_id': user_id}
skills = await sor.R('harnessed_remote_skills', filters)
if skills:
return {"success": True, "skill": skills[0], "user_id": user_id}
else:
return {"success": False, "error": "Remote skill not found", "user_id": user_id}
elif action == "update":
if not skill_id:
return {"success": False, "error": "skill_id required", "user_id": user_id}
filters = {'id': skill_id, 'user_id': user_id}
existing_skills = await sor.R('harnessed_remote_skills', filters)
if not existing_skills:
return {"success": False, "error": "Remote skill not found", "user_id": user_id}
existing_skill = existing_skills[0]
data = {
'id': skill_id,
'user_id': user_id,
'name': kwargs.get('name', existing_skill['name']),
'host': kwargs.get('host', existing_skill['host']),
'port': kwargs.get('port', existing_skill['port']),
'username': kwargs.get('username', existing_skill['username']),
'remote_path': kwargs.get('remote_path', existing_skill['remote_path']),
'auth_method': kwargs.get('auth_method', existing_skill['auth_method']),
'ssh_key_path': kwargs.get('ssh_key_path', existing_skill['ssh_key_path']),
'description': kwargs.get('description', existing_skill['description']),
'category': kwargs.get('category', existing_skill['category']),
'version': kwargs.get('version', existing_skill['version']),
'enabled': kwargs.get('enabled', existing_skill['enabled']),
'updated_at': datetime.now()
}
result = await sor.U('harnessed_remote_skills', data)
return {"success": True, "action": action, "id": skill_id, "user_id": user_id}
elif action == "delete":
if not skill_id:
return {"success": False, "error": "skill_id required", "user_id": user_id}
filters = {'id': skill_id, 'user_id': user_id}
existing_skills = await sor.R('harnessed_remote_skills', filters)
if not existing_skills:
return {"success": False, "error": "Remote skill not found", "user_id": user_id}
result = await sor.D('harnessed_remote_skills', {'id': skill_id})
return {"success": True, "action": action, "id": skill_id, "user_id": user_id}
elif action == "list":
filters = {'user_id': user_id}
# Apply optional filters
if 'name' in kwargs:
filters['name'] = kwargs['name']
if 'host' in kwargs:
filters['host'] = kwargs['host']
if 'enabled' in kwargs:
filters['enabled'] = kwargs['enabled']
sql_skills = "SELECT * FROM harnessed_remote_skills WHERE user_id = :user_id ORDER BY name ASC"
skills = await sor.R('harnessed_remote_skills', {'user_id': user_id, 'sort': 'name asc', **filters})
skills = skills or []
return {"success": True, "skills": skills, "user_id": user_id}
elif action == "deploy":
if not skill_id:
return {"success": False, "error": "skill_id required", "user_id": user_id}
filters = {'id': skill_id, 'user_id': user_id}
skills = await sor.R('harnessed_remote_skills', filters)
if not skills:
return {"success": False, "error": "Remote skill not found", "user_id": user_id}
skill = skills[0]
if not skill.get('enabled'):
return {"success": False, "error": "Remote skill is disabled", "user_id": user_id}
# Deploy skill to remote host
deploy_result = await self._deploy_remote_skill(skill, kwargs.get('skill_content', ''))
if deploy_result['success']:
# Update last_deployed timestamp
update_data = {
'id': skill_id,
'user_id': user_id,
'last_deployed': datetime.now(),
'updated_at': datetime.now()
}
await sor.U('harnessed_remote_skills', update_data)
return deploy_result
elif action == "execute":
if not skill_id:
return {"success": False, "error": "skill_id required", "user_id": user_id}
filters = {'id': skill_id, 'user_id': user_id}
skills = await sor.R('harnessed_remote_skills', filters)
if not skills:
return {"success": False, "error": "Remote skill not found", "user_id": user_id}
skill = skills[0]
if not skill.get('enabled'):
return {"success": False, "error": "Remote skill is disabled", "user_id": user_id}
# Execute remote skill
execute_result = await self._execute_remote_skill(skill, kwargs.get('parameters', {}))
if execute_result['success']:
# Update last_executed timestamp
update_data = {
'id': skill_id,
'user_id': user_id,
'last_executed': datetime.now(),
'updated_at': datetime.now()
}
await sor.U('harnessed_remote_skills', update_data)
return execute_result
elif action == "list_remote":
if not skill_id:
return {"success": False, "error": "skill_id required", "user_id": user_id}
filters = {'id': skill_id, 'user_id': user_id}
skills = await sor.R('harnessed_remote_skills', filters)
if not skills:
return {"success": False, "error": "Remote skill not found", "user_id": user_id}
skill = skills[0]
if not skill.get('enabled'):
return {"success": False, "error": "Remote skill is disabled", "user_id": user_id}
# List available skills on remote host
return await self._list_remote_skills(skill)
except Exception as e:
return {"success": False, "error": str(e), "user_id": user_id}
async def _deploy_remote_skill(self, skill_config: Dict[str, Any], skill_content: str) -> Dict[str, Any]:
    """Copy a skill's ``SKILL.md`` to the remote host over SSH.

    The content is written to a temporary local directory, the remote
    skill directory is created with ``mkdir -p``, and the file is
    transferred with rsync, falling back to scp if rsync is missing or
    fails.

    Args:
        skill_config: Remote skill configuration; reads ``name``, ``host``,
            ``username``, ``remote_path`` and optionally ``port``,
            ``auth_method`` and ``ssh_key_path``.
        skill_content: Skill content to deploy.

    Returns:
        Dict with ``success`` plus either ``message``/``remote_path`` or
        ``error`` (and ``stdout``/``stderr`` when a command failed).
    """
    import shlex

    try:
        skill_name = skill_config['name']
        destination = f"{skill_config['username']}@{skill_config['host']}"
        # '~' is expanded by hand because the path is quoted on the remote
        # side; this assumes the home directory is /home/<username>.
        remote_path = skill_config['remote_path'].replace('~', f"/home/{skill_config['username']}")
        remote_skill_path = os.path.join(remote_path, skill_name)

        # ssh and scp spell the port option differently (-p vs -P), and
        # rsync must receive the ssh options through -e, because rsync's
        # own -p/-i mean --perms/--itemize-changes.
        port = skill_config.get('port')
        key_options = []
        if skill_config.get('auth_method') == 'key' and skill_config.get('ssh_key_path'):
            key_options = ['-i', skill_config['ssh_key_path']]
        ssh_options = (['-p', str(port)] if port else []) + key_options
        scp_options = (['-P', str(port)] if port else []) + key_options
        rsync_remote_shell = ' '.join(shlex.quote(part) for part in ['ssh'] + ssh_options)

        # Create temporary directory for skill files
        with tempfile.TemporaryDirectory() as temp_dir:
            skill_dir = os.path.join(temp_dir, skill_name)
            os.makedirs(skill_dir, exist_ok=True)
            # Write skill content to SKILL.md
            skill_file = os.path.join(skill_dir, 'SKILL.md')
            with open(skill_file, 'w', encoding='utf-8') as f:
                f.write(skill_content)

            # Create the remote skill directory. shlex.quote keeps the path
            # a single shell word even if it contains quotes or spaces.
            mkdir_cmd = ['ssh'] + ssh_options + [
                destination,
                f"mkdir -p {shlex.quote(remote_skill_path)}"
            ]
            result = subprocess.run(mkdir_cmd, capture_output=True, text=True, timeout=30)
            if result.returncode != 0:
                return {
                    "success": False,
                    "error": f"Failed to create remote directory: {result.stderr}",
                    "stdout": result.stdout,
                    "stderr": result.stderr
                }

            # Deploy skill using rsync (preferred) or scp
            rsync_ok = False
            try:
                rsync_cmd = ['rsync', '-avz', '-e', rsync_remote_shell,
                             f"{skill_dir}/",
                             f"{destination}:{remote_skill_path}/"]
                result = subprocess.run(rsync_cmd, capture_output=True, text=True, timeout=60)
                rsync_ok = result.returncode == 0
            except FileNotFoundError:
                # rsync is not installed locally
                pass

            if not rsync_ok:
                # Fall back to scp. The remote directory already exists,
                # so copy the file into it.
                scp_cmd = ['scp'] + scp_options + [
                    skill_file,
                    f"{destination}:{remote_skill_path}/"
                ]
                result = subprocess.run(scp_cmd, capture_output=True, text=True, timeout=60)
                if result.returncode != 0:
                    return {
                        "success": False,
                        "error": f"Failed to deploy skill: {result.stderr}",
                        "stdout": result.stdout,
                        "stderr": result.stderr
                    }

            return {
                "success": True,
                "message": f"Skill '{skill_name}' deployed successfully to {skill_config['host']}",
                "remote_path": remote_skill_path
            }
    except subprocess.TimeoutExpired:
        return {"success": False, "error": "Deployment timeout"}
    except Exception as e:
        return {"success": False, "error": f"Deployment failed: {str(e)}"}
async def _execute_remote_skill(self, skill_config: Dict[str, Any], parameters: Dict[str, Any]) -> Dict[str, Any]:
"""
Execute a remote skill via SSH
Args:
skill_config: Remote skill configuration
parameters: Parameters for skill execution
Returns:
Execution result
"""
try:
skill_name = skill_config['name']
remote_path = skill_config['remote_path'].replace('~', f"/home/{skill_config['username']}")
skill_script_path = os.path.join(remote_path, skill_name, 'execute.py')
# Check if execute.py exists on remote host
ssh_options = []
if skill_config.get('port'):
ssh_options.extend(['-p', str(skill_config['port'])])
if skill_config.get('auth_method') == 'key' and skill_config.get('ssh_key_path'):
ssh_options.extend(['-i', skill_config['ssh_key_path']])
check_cmd = ['ssh'] + ssh_options + [
f"{skill_config['username']}@{skill_config['host']}",
f"test -f '{skill_script_path}' && echo 'exists' || echo 'not_exists'"
]
result = subprocess.run(check_cmd, capture_output=True, text=True, timeout=30)
if 'not_exists' in result.stdout:
# Fall back to executing the skill directly via hermes skill system
skill_full_path = os.path.join(remote_path, skill_name)
execute_cmd = f"cd {remote_path} && hermes skill_view --name {skill_name} && echo 'Skill executed'"
else:
# Execute the custom execute.py script
param_json = json.dumps(parameters) if parameters else '{}'
execute_cmd = f"cd {remote_path} && python3 {skill_script_path} '{param_json}'"
# Execute the command
final_cmd = ['ssh'] + ssh_options + [
f"{skill_config['username']}@{skill_config['host']}",
execute_cmd
]
result = subprocess.run(final_cmd, capture_output=True, text=True, timeout=300)
if result.returncode == 0:
return {
"success": True,
"result": result.stdout,
"skill_name": skill_name,
"host": skill_config['host']
}
else:
return {
"success": False,
"error": result.stderr,
"stdout": result.stdout,
"stderr": result.stderr,
"skill_name": skill_name,
"host": skill_config['host']
}
except subprocess.TimeoutExpired:
return {"success": False, "error": "Execution timeout"}
except Exception as e:
return {"success": False, "error": f"Execution failed: {str(e)}"}
async def _list_remote_skills(self, skill_config: Dict[str, Any]) -> Dict[str, Any]:
"""
List available skills on remote host
Args:
skill_config: Remote skill configuration
Returns:
List of available skills
"""
try:
remote_path = skill_config['remote_path'].replace('~', f"/home/{skill_config['username']}")
ssh_options = []
if skill_config.get('port'):
ssh_options.extend(['-p', str(skill_config['port'])])
if skill_config.get('auth_method') == 'key' and skill_config.get('ssh_key_path'):
ssh_options.extend(['-i', skill_config['ssh_key_path']])
# List directories in remote skills path
list_cmd = ['ssh'] + ssh_options + [
f"{skill_config['username']}@{skill_config['host']}",
f"find '{remote_path}' -maxdepth 1 -type d -not -path '{remote_path}' -exec basename {{}} \\;"
]
result = subprocess.run(list_cmd, capture_output=True, text=True, timeout=30)
if result.returncode == 0:
skills = [line.strip() for line in result.stdout.split('\n') if line.strip()]
return {
"success": True,
"skills": skills,
"remote_path": remote_path,
"host": skill_config['host']
}
else:
return {
"success": False,
"error": result.stderr,
"stdout": result.stdout,
"stderr": result.stderr,
"host": skill_config['host']
}
except subprocess.TimeoutExpired:
return {"success": False, "error": "List timeout"}
except Exception as e:
return {"success": False, "error": f"List failed: {str(e)}"}
# Orchestrator methods - integrated with the orchestrator module
async def create_workflow(self, name: str, description: str = "",
                          workflow_type: str = "sequential",
                          max_concurrent_tasks: int = 3,
                          timeout_seconds: int = 1800,
                          retry_count: int = 2,
                          context: Dict[str, Any] = None) -> Dict[str, Any]:
    """
    Create a new workflow definition.

    Thin delegate: every argument is forwarded positionally, in signature
    order, to ``create_workflow`` on the orchestrator returned by
    ``get_hermes_orchestrator(self)``; that call's result is returned as is.
    """
    # Imported at call time rather than at module import time.
    from .orchestrator import get_hermes_orchestrator

    forwarded = (
        name, description, workflow_type,
        max_concurrent_tasks, timeout_seconds, retry_count, context,
    )
    return await get_hermes_orchestrator(self).create_workflow(*forwarded)
async def add_task_to_workflow(self, workflow_id: str, task_name: str,
                               task_type: str, skill_name: str = None,
                               tool_name: str = None, parameters: Dict[str, Any] = None,
                               depends_on: str = None, parallel_group: str = None,
                               timeout_seconds: int = 300, retry_count: int = 2,
                               order_index: int = 0,
                               context: Dict[str, Any] = None) -> Dict[str, Any]:
    """
    Add a task to an existing workflow.

    Thin delegate: every argument is forwarded positionally, in signature
    order, to ``add_task_to_workflow`` on the orchestrator returned by
    ``get_hermes_orchestrator(self)``; that call's result is returned as is.
    """
    # Imported at call time rather than at module import time.
    from .orchestrator import get_hermes_orchestrator

    forwarded = (
        workflow_id, task_name, task_type, skill_name,
        tool_name, parameters, depends_on, parallel_group,
        timeout_seconds, retry_count, order_index, context,
    )
    return await get_hermes_orchestrator(self).add_task_to_workflow(*forwarded)
async def execute_workflow(self, workflow_id: str,
                           context: Dict[str, Any] = None) -> Dict[str, Any]:
    """
    Execute a workflow through the orchestrator.

    Forwards ``workflow_id`` and ``context`` to ``execute_workflow`` on the
    orchestrator returned by ``get_hermes_orchestrator(self)`` and returns
    that call's result unchanged.
    """
    # Imported at call time rather than at module import time.
    from .orchestrator import get_hermes_orchestrator

    return await get_hermes_orchestrator(self).execute_workflow(workflow_id, context)
async def list_workflows(self, context: Dict[str, Any] = None) -> Dict[str, Any]:
    """
    List the workflows stored for the calling user.

    The user id comes from ``self._get_current_user_id(context)`` when a
    truthy ``context`` is given and is ``"anonymous"`` otherwise. Rows are
    read from the ``hermes_workflows`` table of the ``harnessed_agent``
    module database with the sort spec ``created_at desc``.

    Returns:
        ``{"success": True, "workflows": [...], "user_id": ...}`` on
        success, or ``{"success": False, "error": ..., "user_id": ...}``
        when any exception is raised along the way.
    """
    if context:
        user_id = self._get_current_user_id(context)
    else:
        user_id = "anonymous"

    try:
        dbname = ServerEnv().get_module_dbname('harnessed_agent')
        app_config = getConfig()
        pools = DBPools()
        pools.databases = app_config.databases
        query = {'sort': 'created_at desc', 'user_id': user_id}
        async with pools.sqlorContext(dbname) as sor:
            rows = await sor.R('hermes_workflows', query)
            # A falsy result is reported as an empty list.
            return {"success": True, "workflows": rows or [], "user_id": user_id}
    except Exception as exc:
        return {"success": False, "error": str(exc), "user_id": user_id}
async def list_executions(self, workflow_id: str = None,
                          limit: int = 100, offset: int = 0,
                          context: Dict[str, Any] = None) -> Dict[str, Any]:
    """
    List executions for current user (optionally filtered by workflow).

    The user id comes from ``self._get_current_user_id(context)`` when a
    truthy ``context`` is given and is ``"anonymous"`` otherwise. Rows are
    read from the ``hermes_executions`` table of the ``harnessed_agent``
    module database with the sort spec ``created_at desc``.

    Args:
        workflow_id: When truthy, only executions of this workflow are
            requested.
        limit: Maximum number of rows to return; negative values are
            treated as 0.
        offset: Number of leading rows to skip; negative values are treated
            as 0.
        context: Request context used to resolve the current user.
    Returns:
        ``{"success": True, "executions": [...], "user_id": ...}`` on
        success, or ``{"success": False, "error": ..., "user_id": ...}``
        when any exception is raised along the way.
    """
    user_id = self._get_current_user_id(context) if context else "anonymous"
    try:
        env = ServerEnv()
        dbname = env.get_module_dbname('harnessed_agent')
        config = getConfig()
        db = DBPools()
        db.databases = config.databases

        # Build the query once; the workflow filter is added only on request.
        query = {'sort': 'created_at desc', 'user_id': user_id}
        if workflow_id:
            query['workflow_id'] = workflow_id

        # Negative values would make the slice below count from the end of
        # the list, so clamp them to zero.
        start = max(offset, 0)
        page_size = max(limit, 0)

        async with db.sqlorContext(dbname) as sor:
            executions = await sor.R('hermes_executions', query)
            # Paging is applied in memory to the rows returned by the query.
            page = (executions or [])[start:start + page_size]
            return {"success": True, "executions": page, "user_id": user_id}
    except Exception as e:
        return {"success": False, "error": str(e), "user_id": user_id}
# Global instance for module functions
# Module-wide agent, created on first request by get_harnessed_agent().
_hermes_instance = None
def get_harnessed_agent():
    """Return the shared Hermes agent, building it with defaults on first use."""
    global _hermes_instance
    if _hermes_instance is not None:
        return _hermes_instance
    _hermes_instance = HermesAgent()
    return _hermes_instance
# Exposed async functions for frontend integration
# These functions expect the ahserver context to be passed automatically
async def harnessed_execute_tool(tool_name: str, parameters: Dict[str, Any]):
    """Run a tool through the shared agent's ``execute_tool_call`` and return its result."""
    return await get_harnessed_agent().execute_tool_call(tool_name, parameters)
async def harnessed_manage_memory(action: str, target: str, content: str = "", old_text: str = "",
                                  priority: Optional[int] = None):
    """
    Forward a memory operation to the shared agent's ``manage_memory``.

    ``action``, ``target``, ``content`` and ``old_text`` are passed
    positionally and ``priority`` by keyword; the agent's result is
    returned unchanged.
    """
    shared_agent = get_harnessed_agent()
    outcome = await shared_agent.manage_memory(
        action, target, content, old_text, priority=priority
    )
    return outcome
async def harnessed_get_intelligent_memory_context(current_task: str = "", max_tokens: Optional[int] = None):
    """
    Fetch the memory context from the shared agent.

    Calls ``get_intelligent_memory_context`` with ``current_task`` passed
    positionally and ``max_tokens`` by keyword, returning its result
    unchanged.
    """
    return await get_harnessed_agent().get_intelligent_memory_context(
        current_task, max_tokens=max_tokens
    )
async def harnessed_search_sessions(query: str = "", limit: int = 3):
    """Forward ``query`` and ``limit`` to the shared agent's ``search_sessions``."""
    return await get_harnessed_agent().search_sessions(query, limit)
async def harnessed_manage_skills(action: str, name: str, **kwargs):
    """
    Forward a local skill operation to the shared agent's ``manage_skills``.

    Extra keyword arguments are passed through untouched.
    """
    return await get_harnessed_agent().manage_skills(action, name, **kwargs)
async def harnessed_manage_remote_skills(action: str, skill_id: str = None, **kwargs):
    """
    Forward a remote skill operation to the shared agent's ``manage_remote_skills``.

    Extra keyword arguments are passed through untouched.
    """
    return await get_harnessed_agent().manage_remote_skills(action, skill_id, **kwargs)
async def harnessed_get_config():
    """Return selected configuration values of the shared agent as a plain dict."""
    cfg = get_harnessed_agent().config
    # Attribute names copied into the result, in output order.
    exposed_fields = (
        "work_dir",
        "skills_path",
        "max_memory_tokens",
        "default_priority",
        "high_priority_threshold",
        "low_priority_threshold",
        "auto_cleanup_enabled",
        "min_retention_days",
    )
    return {field: getattr(cfg, field) for field in exposed_fields}
# Helper function to get current user from ahserver context
async def harnessed_get_current_user():
    """
    Get current user information from ahserver context.

    Reads ``user_id`` (falling back to ``userid``) from a fresh
    ``ServerEnv`` instance.

    Returns:
        ``{"user_id": <value>}`` when a truthy id is found, otherwise
        ``{"user_id": None}``. This includes the case where ahserver
        cannot be imported or the lookup raises.
    """
    try:
        from ahserver.serverenv import ServerEnv
        env = ServerEnv()
        user_id = getattr(env, 'user_id', None) or getattr(env, 'userid', None)
    except Exception:
        # Best-effort lookup. Catching Exception rather than using a bare
        # "except" lets task cancellation and interpreter-exit signals
        # propagate instead of being swallowed.
        return {"user_id": None}
    return {"user_id": user_id or None}
# Orchestrator functions
async def harnessed_create_workflow(name: str, description: str = "",
                                    workflow_type: str = "sequential",
                                    max_concurrent_tasks: int = 3,
                                    timeout_seconds: int = 1800,
                                    retry_count: int = 2):
    """
    Create a workflow definition through the shared agent.

    All arguments are forwarded positionally to the agent's
    ``create_workflow``; no ``context`` argument is supplied.
    """
    forwarded = (
        name, description, workflow_type,
        max_concurrent_tasks, timeout_seconds, retry_count,
    )
    return await get_harnessed_agent().create_workflow(*forwarded)
async def harnessed_add_task_to_workflow(workflow_id: str, task_name: str,
                                         task_type: str, skill_name: str = None,
                                         tool_name: str = None, parameters: Dict[str, Any] = None,
                                         depends_on: str = None, parallel_group: str = None,
                                         timeout_seconds: int = 300, retry_count: int = 2,
                                         order_index: int = 0):
    """
    Add a task to an existing workflow through the shared agent.

    All arguments are forwarded positionally to the agent's
    ``add_task_to_workflow``; no ``context`` argument is supplied.
    """
    forwarded = (
        workflow_id, task_name, task_type, skill_name,
        tool_name, parameters, depends_on, parallel_group,
        timeout_seconds, retry_count, order_index,
    )
    return await get_harnessed_agent().add_task_to_workflow(*forwarded)
async def harnessed_execute_workflow(workflow_id: str):
    """
    Execute a workflow through the shared agent.

    Only ``workflow_id`` is forwarded to the agent's ``execute_workflow``;
    no ``context`` argument is supplied.
    """
    return await get_harnessed_agent().execute_workflow(workflow_id)
async def harnessed_list_workflows():
    """
    List workflows through the shared agent.

    NOTE(review): no ``context`` is forwarded to the agent's
    ``list_workflows``; confirm how the current user is meant to be
    resolved on this path.
    """
    return await get_harnessed_agent().list_workflows()
async def harnessed_list_executions(workflow_id: str = None, limit: int = 100, offset: int = 0):
    """
    List executions through the shared agent, optionally for one workflow.

    ``workflow_id``, ``limit`` and ``offset`` are forwarded positionally to
    the agent's ``list_executions``.

    NOTE(review): no ``context`` is forwarded; confirm how the current user
    is meant to be resolved on this path.
    """
    return await get_harnessed_agent().list_executions(workflow_id, limit, offset)