""" hermes-web-cli module - Complete business logic implementation. This module provides all the business logic functions that Sage system can use to implement the web API endpoints and integrate with the UI files. The .ui files in wwwroot/ contain static JSON configurations that reference endpoints like "/hermes-web-cli/services". Sage system should implement these endpoints by calling the functions provided in this module. """ import json import asyncio import aiohttp from typing import Dict, List, Optional, Tuple from datetime import datetime # Import sqlor database module from sqlor.dbpools import get_sor_context # Import database table definitions and CRUD operations from .db_tables import TABLE_DEFINITIONS from .crud_ops import SERVICES_CRUD, SESSIONS_CRUD, SETTINGS_CRUD def load_hermes_web_cli(): """Initialize and load the hermes-web-cli module. This function is called by Sage system during module loading. It registers all module functions with the ServerEnv instance so they can be called directly from .ui and .dspy files. """ from ahserver.serverenv import ServerEnv # Get the ServerEnv instance env = ServerEnv() # Register all module functions with ServerEnv env.get_setting = get_setting env.save_setting = save_setting env.get_all_services = get_all_services env.create_service = create_service env.delete_service = delete_service env.get_service_by_id = get_service_by_id env.test_service_connection = test_service_connection env.create_session = create_session env.send_message_to_service = send_message_to_service env.get_session_messages = get_session_messages env.get_active_sessions = get_active_sessions env.get_recent_sessions = get_recent_sessions env.get_session_by_id = get_session_by_id env.validate_service_url = validate_service_url env.generate_session_id = generate_session_id return True # Database operations using sqlor-database-module async def get_all_services(user_id: str) -> List[Dict]: """Get all registered Hermes services for the specified user from database. Args: user_id: The ID of the user whose services to retrieve Returns: List of service dictionaries belonging to the specified user """ try: # Query services table with user_id filter using sqlor-database-module db = DBPools() env = ServerEnv() dbname = env.get_module_dbname() async with db.sqlorContext(dbname) as sor: sql_template = SERVICES_CRUD['operations']['read_all']['sql_template'] recs = await sor.sqlExe(sql_template, {'user_id': user_id}) # Convert datetime objects to ISO format strings for JSON serialization result = [] for rec in recs: service_dict = dict(rec) if 'created_at' in service_dict and service_dict['created_at']: service_dict['created_at'] = service_dict['created_at'].isoformat() if 'updated_at' in service_dict and service_dict['updated_at']: service_dict['updated_at'] = service_dict['updated_at'].isoformat() result.append(service_dict) return result except Exception as e: print(f"Error getting services: {str(e)}") return [] async def create_service(name: str, url: str, user_id: str, description: str = "", apikey: str = "") -> str: """Create a new Hermes service registration for the specified user. Args: name: Service name url: Service URL user_id: The ID of the user creating the service description: Service description (optional) apikey: API key for the service (optional) Returns: The created service ID """ try: # Validate service URL if not await validate_service_url(url): raise ValueError("Invalid service URL") service_id = str(uuid.uuid4()) # Save to database using sqlor-database-module db = DBPools() env = ServerEnv() dbname = env.get_module_dbname() async with db.sqlorContext(dbname) as sor: sql_template = SERVICES_CRUD['operations']['create']['sql_template'] await sor.sqlExe(sql_template, { 'id': service_id, 'user_id': user_id, 'name': name, 'service_url': url, 'description': description, 'apikey': apikey, 'status': 'active' }) return service_id except Exception as e: print(f"Error creating service: {str(e)}") raise async def delete_service(service_id: str, user_id: str) -> bool: """Delete a Hermes service registration (only if owned by specified user). Args: service_id: The ID of the service to delete user_id: The ID of the user attempting deletion Returns: True if deleted successfully, False otherwise """ try: # Verify service belongs to current user before deletion service = await get_service_by_id(service_id, user_id) if not service: return False if service.get("user_id") != user_id: print(f"Permission denied: Service {service_id} does not belong to user {user_id}") return False # Delete from database using sqlor-database-module db = DBPools() env = ServerEnv() dbname = env.get_module_dbname() async with db.sqlorContext(dbname) as sor: sql_template = SERVICES_CRUD['operations']['delete']['sql_template'] await sor.sqlExe(sql_template, { 'service_id': service_id, 'user_id': user_id }) # Also delete associated sessions env = ServerEnv() dbname = env.get_module_dbname() async with db.sqlorContext(dbname) as sor: await sor.sqlExe(""" DELETE FROM sessions WHERE service_id = ${service_id}$ AND user_id = ${user_id}$ """, { 'service_id': service_id, 'user_id': user_id }) return True except Exception as e: print(f"Error deleting service: {str(e)}") return False async def get_service_by_id(service_id: str, user_id: str) -> Optional[Dict]: """Get service configuration by ID (only if owned by specified user). Args: service_id: The ID of the service to retrieve user_id: The ID of the user requesting the service Returns: Service dictionary if found and owned by user, None otherwise """ try: # Query database directly with user_id filter for security db = DBPools() env = ServerEnv() dbname = env.get_module_dbname() async with db.sqlorContext(dbname) as sor: sql_template = SERVICES_CRUD['operations']['read_by_id']['sql_template'] recs = await sor.sqlExe(sql_template, { 'service_id': service_id, 'user_id': user_id }) if len(recs) > 0: service_dict = dict(recs[0]) if 'created_at' in service_dict and service_dict['created_at']: service_dict['created_at'] = service_dict['created_at'].isoformat() if 'updated_at' in service_dict and service_dict['updated_at']: service_dict['updated_at'] = service_dict['updated_at'].isoformat() return service_dict return None except Exception as e: print(f"Error getting service: {str(e)}") return None # Service connection testing async def test_service_connection(service_id: str) -> Tuple[bool, str]: """Test connection to a Hermes service endpoint. Args: service_id: The ID of the service to test Returns: Tuple[bool, str]: (is_connected, status_message) """ try: # Get service configuration from database db = DBPools() env = ServerEnv() dbname = env.get_module_dbname() async with db.sqlorContext(dbname) as sor: sql_template = SERVICES_CRUD['operations']['read_by_id']['sql_template'] recs = await sor.sqlExe(sql_template, { 'service_id': service_id, 'user_id': '' }) if not recs: return False, "Service not found" service = dict(recs[0]) url = service["service_url"] apikey = service.get("apikey", "") # Prepare headers headers = {} if apikey: headers["Authorization"] = f"Bearer {apikey}" # Test the /health endpoint or similar timeout = aiohttp.ClientTimeout(total=10) async with aiohttp.ClientSession(timeout=timeout) as session: async with session.get(f"{url.rstrip('/')}/health", headers=headers) as response: if response.status == 200: return True, "Connected" else: return False, f"HTTP {response.status}" except asyncio.TimeoutError: return False, "Connection timeout" except aiohttp.ClientConnectorError: return False, "Connection refused" except Exception as e: return False, f"Error: {str(e)}" # Session management async def create_session(service_id: str, user_id: str, user_message: str = "") -> str: """Create a new session with a Hermes service.""" try: # Get service configuration (verify it belongs to current user) service = await get_service_by_id(service_id, user_id) if not service: raise ValueError(f"Service {service_id} not found or access denied") service_url = service["service_url"] apikey = service.get("apikey", "") # Prepare headers headers = { "Content-Type": "application/json" } # Add Authorization header if API key is provided if apikey: headers["Authorization"] = f"Bearer {apikey}" # Call remote service API to create session timeout = aiohttp.ClientTimeout(total=30) async with aiohttp.ClientSession(timeout=timeout) as session: async with session.post( f"{service_url.rstrip('/')}/api/v1/sessions", json={ "user_id": user_id, "initial_message": user_message if user_message else None }, headers=headers ) as response: response.raise_for_status() result = await response.json() # Get the session ID from the remote service remote_session_id = result.get("session_id", "") if not remote_session_id: raise ValueError("Remote service did not return a session ID") # Create local session record in database db = DBPools() env = ServerEnv() dbname = env.get_module_dbname() async with db.sqlorContext(dbname) as sor: sql_template = SESSIONS_CRUD['operations']['create']['sql_template'] await sor.sqlExe(sql_template, { 'session_id': remote_session_id, 'user_id': user_id, 'service_id': service_id, 'session_name': None, 'service_name': service.get("name", "Unknown Service") }) # Return the session ID from the remote service return remote_session_id except Exception as e: print(f"Error creating session: {str(e)}") raise async def send_message_to_service(service_id: str, session_id: str, message: str, user_id: str) -> Dict: """Send a message to a Hermes service and get response (only if session owned by specified user). Args: service_id: The service ID session_id: The session ID message: The message to send user_id: The ID of the user sending the message Returns: Response from the service """ try: # Verify session belongs to current user before sending message session = await get_session_by_id(session_id, user_id) if not session: raise ValueError(f"Session {session_id} not found or access denied for user {user_id}") service = await get_service_by_id(session['service_id'], user_id) if not service: raise ValueError(f"Service for session {session_id} not found or access denied for user {user_id}") service_url = service["service_url"] apikey = service.get("apikey", "") # Prepare headers headers = { "Content-Type": "application/json" } # Add Authorization header if API key is provided if apikey: headers["Authorization"] = f"Bearer {apikey}" # Call remote service API timeout = aiohttp.ClientTimeout(total=30) async with aiohttp.ClientSession(timeout=timeout) as session: async with session.post( f"{service_url.rstrip('/')}/api/v1/sessions/{session_id}/messages", json={ "message": message }, headers=headers ) as response: response.raise_for_status() return await response.json() except Exception as e: print(f"Error sending message: {e}") raise async def get_session_messages(session_id: str, user_id: str) -> List[Dict]: """Get all messages for a session (only if session owned by specified user). Args: session_id: The session ID user_id: The ID of the user requesting messages Returns: List of message dictionaries """ try: # Verify session belongs to current user before getting messages session = await get_session_by_id(session_id) if not session: print(f"Session {session_id} not found or access denied for user {user_id}") return [] # Get the associated service service = await get_service_by_id(session['service_id'], user_id) if not service: print(f"Service for session {session_id} not found or access denied") return [] service_url = service["service_url"] apikey = service.get("apikey", "") # Prepare headers headers = { "Content-Type": "application/json" } # Add Authorization header if API key is provided if apikey: headers["Authorization"] = f"Bearer {apikey}" # Call remote service API to get messages timeout = aiohttp.ClientTimeout(total=30) async with aiohttp.ClientSession(timeout=timeout) as session: async with session.get( f"{service_url.rstrip('/')}/api/v1/sessions/{session_id}/messages", headers=headers ) as response: response.raise_for_status() messages = await response.json() # Update session last_active timestamp and message count in local database db = DBPools() env = ServerEnv() dbname = env.get_module_dbname() async with db.sqlorContext(dbname) as sor: await sor.sqlExe(""" UPDATE sessions SET last_active = CURRENT_TIMESTAMP, message_count = ${message_count}$ WHERE session_id = ${session_id}$ AND user_id = ${user_id}$ """, { 'session_id': session_id, 'user_id': user_id, 'message_count': len(messages) }) return messages except Exception as e: print(f"Error getting session messages: {str(e)}") return [] async def get_active_sessions(user_id: str) -> List[Dict]: """Get all active sessions for the specified user from database. Args: user_id: The ID of the user whose active sessions to retrieve Returns: List of active session dictionaries """ try: # Query the sessions table for active sessions belonging to current user using sqlor-database-module db = DBPools() env = ServerEnv() dbname = env.get_module_dbname() async with db.sqlorContext(dbname) as sor: sql_template = SESSIONS_CRUD['operations']['read_active']['sql_template'] recs = await sor.sqlExe(sql_template, {'user_id': user_id}) # Convert datetime objects to ISO format strings for JSON serialization result = [] for rec in recs: session_dict = dict(rec) if 'created_at' in session_dict and session_dict['created_at']: session_dict['created_at'] = session_dict['created_at'].isoformat() if 'last_active' in session_dict and session_dict['last_active']: session_dict['last_active'] = session_dict['last_active'].isoformat() result.append(session_dict) return result except Exception as e: print(f"Error getting active sessions: {str(e)}") return [] async def get_recent_sessions(user_id: str, limit: int = 5) -> List[Dict]: """Get recent sessions for the specified user from database, ordered by creation time (most recent first). Args: user_id: The ID of the user whose recent sessions to retrieve limit: Maximum number of sessions to return (default: 5) Returns: List of recent session dictionaries """ try: # Query the sessions table for recent sessions belonging to current user using sqlor-database-module db = DBPools() env = ServerEnv() dbname = env.get_module_dbname() async with db.sqlorContext(dbname) as sor: sql_template = SESSIONS_CRUD['operations']['read_recent']['sql_template'] recs = await sor.sqlExe(sql_template, {'user_id': user_id, 'limit': limit}) # Convert datetime objects to ISO format strings for JSON serialization result = [] for rec in recs: session_dict = dict(rec) if 'created_at' in session_dict and session_dict['created_at']: session_dict['created_at'] = session_dict['created_at'].isoformat() if 'last_active' in session_dict and session_dict['last_active']: session_dict['last_active'] = session_dict['last_active'].isoformat() result.append(session_dict) return result except Exception as e: print(f"Error getting recent sessions: {str(e)}") return [] async def get_session_by_id(session_id: str, user_id: str) -> Optional[Dict]: """Get session details by session ID (only if owned by specified user). Args: session_id: The session ID to retrieve user_id: The ID of the user requesting the session Returns: Session dictionary if found and owned by user, None otherwise """ try: # Query database directly with user_id filter for security db = DBPools() env = ServerEnv() dbname = env.get_module_dbname() async with db.sqlorContext(dbname) as sor: sql_template = SESSIONS_CRUD['operations']['read_by_id']['sql_template'] recs = await sor.sqlExe(sql_template, { 'session_id': session_id, 'user_id': user_id }) if len(recs) > 0: session_dict = dict(recs[0]) if 'created_at' in session_dict and session_dict['created_at']: session_dict['created_at'] = session_dict['created_at'].isoformat() if 'last_active' in session_dict and session_dict['last_active']: session_dict['last_active'] = session_dict['last_active'].isoformat() return session_dict return None except Exception as e: print(f"Error getting session by ID: {str(e)}") return None # Database operations using sqlor-database-module def validate_service_url(url: str) -> bool: """Validate if a URL is a valid Hermes service endpoint.""" if not url.startswith(('http://', 'https://')): return False # Additional validation can be added here return True def generate_session_id() -> str: """Generate a unique session ID.""" return getID() # Settings management async def get_setting(user_id: str) -> Dict: """Get user settings from database or return defaults. Args: user_id: The ID of the user whose settings to retrieve Returns: User settings dictionary """ import json default_settings = { "security": { "require_auth": False, "encrypt_storage": False }, "general": { "default_model": "", "session_timeout": 30, "auto_save": True }, "appearance": { "theme": "dark" } } try: # Query user settings from database db = DBPools() env = ServerEnv() dbname = env.get_module_dbname() async with db.sqlorContext(dbname) as sor: sql_template = SETTINGS_CRUD['operations']['read']['sql_template'] recs = await sor.sqlExe(sql_template, {'user_id': user_id}) if len(recs) > 0: settings_json = recs[0]['settings_json'] if settings_json: saved_settings = json.loads(settings_json) # Merge with defaults to ensure all keys exist for section, defaults in default_settings.items(): if section not in saved_settings: saved_settings[section] = defaults else: for key, value in defaults.items(): if key not in saved_settings[section]: saved_settings[section][key] = value return saved_settings except Exception as e: print(f"Error getting settings: {str(e)}") # Fall back to defaults on error return default_settings async def save_setting(section: str, key: str, value, user_id: str) -> bool: """Save a specific setting value to user\'s database record. Args: section: Settings section name key: Setting key name value: Setting value user_id: The ID of the user whose settings to update Returns: True if saved successfully, False otherwise """ import json # Load existing settings or start with defaults settings = await get_setting(user_id) # Update the specific setting if section not in settings: settings[section] = {} settings[section][key] = value try: # Save to database using sqlor-database-module db = DBPools() env = ServerEnv() dbname = env.get_module_dbname() async with db.sqlorContext(dbname) as sor: sql_template = SETTINGS_CRUD['operations']['create_or_update']['sql_template'] await sor.sqlExe(sql_template, { 'user_id': user_id, 'settings_json': json.dumps(settings) }) return True except Exception as e: print(f"Error saving settings: {str(e)}") return False # Module metadata MODULE_NAME = "hermes-web-cli" MODULE_VERSION = "0.2.0" # Export all public functions __all__ = [ 'load_hermes_web_cli', 'get_all_services', 'create_service', 'delete_service', 'get_service_by_id', 'test_service_connection', 'create_session', 'send_message_to_service', 'get_session_messages', 'get_active_sessions', 'get_recent_sessions', 'get_session_by_id', 'validate_service_url', 'generate_session_id', 'get_setting', 'save_setting', 'MODULE_NAME', 'MODULE_VERSION' ]