""" hermes-web-cli module - Complete business logic implementation. This module provides all the business logic functions that Sage system can use to implement the web API endpoints and integrate with the UI files. The .ui files in wwwroot/ contain static JSON configurations that reference endpoints like "/hermes-web-cli/services". Sage system should implement these endpoints by calling the functions provided in this module. """ import json import uuid import asyncio import aiohttp from typing import Dict, List, Optional, Tuple from datetime import datetime # Import sqlor database module from sqlor.dbpools import get_sor_context, DBPools from ahserver.serverenv import ServerEnv # Import database table definitions and CRUD operations from .db_tables import TABLE_DEFINITIONS from .crud_ops import SERVICES_CRUD, SESSIONS_CRUD, SETTINGS_CRUD def load_hermes_web_cli(): """Initialize and load the hermes-web-cli module. This function is called by Sage system during module loading. It registers all module functions with the ServerEnv instance so they can be called directly from .ui and .dspy files. """ # Get the ServerEnv instance env = ServerEnv() # Register all module functions with ServerEnv env.get_setting = get_setting env.save_setting = save_setting env.get_all_services = get_all_services env.create_service = create_service env.delete_service = delete_service env.get_service_by_id = get_service_by_id env.test_service_connection = test_service_connection env.create_session = create_session env.send_message_to_service = send_message_to_service env.get_session_messages = get_session_messages env.get_active_sessions = get_active_sessions env.get_recent_sessions = get_recent_sessions env.get_session_by_id = get_session_by_id env.validate_service_url = validate_service_url env.generate_session_id = generate_session_id return True # Database operations using sqlor-database-module async def get_all_services(orgid: str) -> List[Dict]: """Get all registered Hermes services for the specified organization from database. Args: orgid: The ID of the organization whose services to retrieve Returns: List of service dictionaries belonging to the specified organization """ try: # Query services table with orgid filter using sqlor-database-module db = DBPools() env = ServerEnv() dbname = env.get_module_dbname('hermes-web-cli') async with db.sqlorContext(dbname) as sor: sql_template = SERVICES_CRUD['operations']['read_all']['sql_template'] recs = await sor.sqlExe(sql_template, {'orgid': orgid}) # Convert datetime objects to ISO format strings for JSON serialization result = [] for rec in recs: result.append(dict(rec)) return result except Exception as e: print(f"Error getting services: {str(e)}") return [] async def create_service(name: str, url: str, orgid: str, description: str = "", apikey: str = "") -> str: """Create a new Hermes service registration for the specified organization. Args: name: Service name url: Service URL orgid: The ID of the organization creating the service description: Service description (optional) apikey: API key for the service (optional) Returns: The created service ID """ try: # Validate service URL if not await validate_service_url(url): raise ValueError("Invalid service URL") service_id = str(uuid.uuid4()) # Save to database using sqlor-database-module db = DBPools() env = ServerEnv() dbname = env.get_module_dbname('hermes-web-cli') async with db.sqlorContext(dbname) as sor: sql_template = SERVICES_CRUD['operations']['create']['sql_template'] await sor.sqlExe(sql_template, { 'id': service_id, 'orgid': orgid, 'name': name, 'service_url': url, 'description': description, 'apikey': apikey, 'status': 'active' }) return service_id except Exception as e: print(f"Error creating service: {str(e)}") raise async def delete_service(service_id: str, orgid: str) -> bool: """Delete a Hermes service registration (only if owned by specified organization). Args: service_id: The ID of the service to delete orgid: The ID of the organization attempting deletion Returns: True if deleted successfully, False otherwise """ try: # Verify service belongs to current org before deletion service = await get_service_by_id(service_id, orgid) if not service: return False if service.get("orgid") != orgid: print(f"Permission denied: Service {service_id} does not belong to org {orgid}") return False # Delete from database using sqlor-database-module db = DBPools() env = ServerEnv() dbname = env.get_module_dbname('hermes-web-cli') async with db.sqlorContext(dbname) as sor: sql_template = SERVICES_CRUD['operations']['delete']['sql_template'] await sor.sqlExe(sql_template, { 'service_id': service_id, 'orgid': orgid }) # Also delete associated sessions env = ServerEnv() dbname = env.get_module_dbname('hermes-web-cli') async with db.sqlorContext(dbname) as sor: await sor.sqlExe(""" DELETE FROM sessions WHERE service_id = ${service_id}$ """, { 'service_id': service_id }) return True except Exception as e: print(f"Error deleting service: {str(e)}") return False async def get_service_by_id(service_id: str, orgid: str) -> Optional[Dict]: """Get service configuration by ID (only if owned by specified organization). Args: service_id: The ID of the service to retrieve orgid: The ID of the organization requesting the service Returns: Service dictionary if found and owned by org, None otherwise """ try: # Query database directly with orgid filter for security db = DBPools() env = ServerEnv() dbname = env.get_module_dbname('hermes-web-cli') async with db.sqlorContext(dbname) as sor: sql_template = SERVICES_CRUD['operations']['read_by_id']['sql_template'] recs = await sor.sqlExe(sql_template, { 'service_id': service_id, 'orgid': orgid }) if len(recs) > 0: return dict(recs[0]) return None except Exception as e: print(f"Error getting service: {str(e)}") return None # Service connection testing async def test_service_connection(service_id: str, orgid: str = "") -> Tuple[bool, str]: """Test connection to a Hermes service endpoint. Args: service_id: The ID of the service to test orgid: The ID of the organization (optional, for org-scoped lookup) Returns: Tuple[bool, str]: (is_connected, status_message) """ try: # Get service configuration from database db = DBPools() env = ServerEnv() dbname = env.get_module_dbname('hermes-web-cli') async with db.sqlorContext(dbname) as sor: if orgid: sql_template = SERVICES_CRUD['operations']['read_by_id']['sql_template'] recs = await sor.sqlExe(sql_template, { 'service_id': service_id, 'orgid': orgid }) else: recs = await sor.sqlExe(""" SELECT id, orgid, name, service_url, description, apikey, status, created_at, updated_at FROM hermes_services WHERE id = ${service_id}$ """, { 'service_id': service_id }) if not recs: return False, "Service not found" service = dict(recs[0]) url = service["service_url"] apikey = service.get("apikey", "") # Prepare headers headers = {} if apikey: headers["Authorization"] = f"Bearer {apikey}" # Test the /health endpoint or similar timeout = aiohttp.ClientTimeout(total=10) async with aiohttp.ClientSession(timeout=timeout) as session: async with session.get(f"{url.rstrip('/')}/health", headers=headers) as response: if response.status == 200: return True, "Connected" else: return False, f"HTTP {response.status}" except asyncio.TimeoutError: return False, "Connection timeout" except aiohttp.ClientConnectorError: return False, "Connection refused" except Exception as e: return False, f"Error: {str(e)}" # Session management async def create_session(service_id: str, user_id: str, orgid: str, user_message: str = "") -> str: """Create a new session with a Hermes service.""" try: # Get service configuration (verify it belongs to current org) service = await get_service_by_id(service_id, orgid) if not service: raise ValueError(f"Service {service_id} not found or access denied") service_url = service["service_url"] apikey = service.get("apikey", "") # Prepare headers headers = { "Content-Type": "application/json" } # Add Authorization header if API key is provided if apikey: headers["Authorization"] = f"Bearer {apikey}" # Call remote service API to create session timeout = aiohttp.ClientTimeout(total=30) async with aiohttp.ClientSession(timeout=timeout) as session: async with session.post( f"{service_url.rstrip('/')}/api/v1/sessions", json={ "user_id": user_id, "initial_message": user_message if user_message else None }, headers=headers ) as response: response.raise_for_status() result = await response.json() # Get the session ID from the remote service remote_session_id = result.get("session_id", "") if not remote_session_id: raise ValueError("Remote service did not return a session ID") # Create local session record in database db = DBPools() env = ServerEnv() dbname = env.get_module_dbname('hermes-web-cli') async with db.sqlorContext(dbname) as sor: sql_template = SESSIONS_CRUD['operations']['create']['sql_template'] await sor.sqlExe(sql_template, { 'session_id': remote_session_id, 'user_id': user_id, 'service_id': service_id, 'session_name': None, 'service_name': service.get("name", "Unknown Service") }) # Return the session ID from the remote service return remote_session_id except Exception as e: print(f"Error creating session: {str(e)}") raise async def send_message_to_service(service_id: str, session_id: str, message: str, user_id: str, orgid: str) -> Dict: """Send a message to a Hermes service and get response (only if session owned by specified user). Args: service_id: The service ID session_id: The session ID message: The message to send user_id: The ID of the user sending the message orgid: The ID of the organization Returns: Response from the service """ try: # Verify session belongs to current user before sending message session = await get_session_by_id(session_id, user_id) if not session: raise ValueError(f"Session {session_id} not found or access denied for user {user_id}") service = await get_service_by_id(session['service_id'], orgid) if not service: raise ValueError(f"Service for session {session_id} not found or access denied for org {orgid}") service_url = service["service_url"] apikey = service.get("apikey", "") # Prepare headers headers = { "Content-Type": "application/json" } # Add Authorization header if API key is provided if apikey: headers["Authorization"] = f"Bearer {apikey}" # Call remote service API timeout = aiohttp.ClientTimeout(total=30) async with aiohttp.ClientSession(timeout=timeout) as session: async with session.post( f"{service_url.rstrip('/')}/api/v1/sessions/{session_id}/messages", json={ "message": message }, headers=headers ) as response: response.raise_for_status() return await response.json() except Exception as e: print(f"Error sending message: {e}") raise async def get_session_messages(session_id: str, user_id: str, orgid: str) -> List[Dict]: """Get all messages for a session (only if session owned by specified user). Args: session_id: The session ID user_id: The ID of the user requesting messages orgid: The ID of the organization Returns: List of message dictionaries """ try: # Verify session belongs to current user before getting messages session = await get_session_by_id(session_id, user_id) if not session: print(f"Session {session_id} not found or access denied for user {user_id}") return [] # Get the associated service (verify org access) service = await get_service_by_id(session['service_id'], orgid) if not service: print(f"Service for session {session_id} not found or access denied") return [] service_url = service["service_url"] apikey = service.get("apikey", "") # Prepare headers headers = { "Content-Type": "application/json" } # Add Authorization header if API key is provided if apikey: headers["Authorization"] = f"Bearer {apikey}" # Call remote service API to get messages timeout = aiohttp.ClientTimeout(total=30) async with aiohttp.ClientSession(timeout=timeout) as session: async with session.get( f"{service_url.rstrip('/')}/api/v1/sessions/{session_id}/messages", headers=headers ) as response: response.raise_for_status() messages = await response.json() # Update session last_active timestamp and message count in local database db = DBPools() env = ServerEnv() dbname = env.get_module_dbname('hermes-web-cli') async with db.sqlorContext(dbname) as sor: await sor.sqlExe(""" UPDATE sessions SET last_active = CURRENT_TIMESTAMP, message_count = ${message_count}$ WHERE session_id = ${session_id}$ AND user_id = ${user_id}$ """, { 'session_id': session_id, 'user_id': user_id, 'message_count': len(messages) }) return messages except Exception as e: print(f"Error getting session messages: {str(e)}") return [] async def get_active_sessions(user_id: str) -> List[Dict]: """Get all active sessions for the specified user from database. Args: user_id: The ID of the user whose active sessions to retrieve Returns: List of active session dictionaries """ try: # Query the sessions table for active sessions belonging to current user using sqlor-database-module db = DBPools() env = ServerEnv() dbname = env.get_module_dbname('hermes-web-cli') async with db.sqlorContext(dbname) as sor: sql_template = SESSIONS_CRUD['operations']['read_active']['sql_template'] recs = await sor.sqlExe(sql_template, {'user_id': user_id}) # Convert datetime objects to ISO format strings for JSON serialization result = [] for rec in recs: result.append(dict(rec)) return result except Exception as e: print(f"Error getting active sessions: {str(e)}") return [] async def get_recent_sessions(user_id: str, limit: int = 5) -> List[Dict]: """Get recent sessions for the specified user from database, ordered by creation time (most recent first). Args: user_id: The ID of the user whose recent sessions to retrieve limit: Maximum number of sessions to return (default: 5) Returns: List of recent session dictionaries """ try: # Query the sessions table for recent sessions belonging to current user using sqlor-database-module db = DBPools() env = ServerEnv() dbname = env.get_module_dbname('hermes-web-cli') async with db.sqlorContext(dbname) as sor: sql_template = SESSIONS_CRUD['operations']['read_recent']['sql_template'] recs = await sor.sqlExe(sql_template, {'user_id': user_id, 'limit': limit}) # Convert datetime objects to ISO format strings for JSON serialization result = [] for rec in recs: result.append(dict(rec)) return result except Exception as e: print(f"Error getting recent sessions: {str(e)}") return [] async def get_session_by_id(session_id: str, user_id: str) -> Optional[Dict]: """Get session details by session ID (only if owned by specified user). Args: session_id: The session ID to retrieve user_id: The ID of the user requesting the session Returns: Session dictionary if found and owned by user, None otherwise """ try: # Query database directly with user_id filter for security db = DBPools() env = ServerEnv() dbname = env.get_module_dbname('hermes-web-cli') async with db.sqlorContext(dbname) as sor: sql_template = SESSIONS_CRUD['operations']['read_by_id']['sql_template'] recs = await sor.sqlExe(sql_template, { 'session_id': session_id, 'user_id': user_id }) if len(recs) > 0: return dict(recs[0]) return None except Exception as e: print(f"Error getting session by ID: {str(e)}") return None # Database operations using sqlor-database-module def validate_service_url(url: str) -> bool: """Validate if a URL is a valid Hermes service endpoint.""" if not url.startswith(('http://', 'https://')): return False # Additional validation can be added here return True def generate_session_id() -> str: """Generate a unique session ID.""" return getID() # Settings management async def get_setting(user_id: str) -> Dict: """Get user settings from database or return defaults. Args: user_id: The ID of the user whose settings to retrieve Returns: User settings dictionary """ import json default_settings = { "security": { "require_auth": False, "encrypt_storage": False }, "general": { "default_model": "", "session_timeout": 30, "auto_save": True }, "appearance": { "theme": "dark" } } try: # Query user settings from database db = DBPools() env = ServerEnv() dbname = env.get_module_dbname('hermes-web-cli') async with db.sqlorContext(dbname) as sor: sql_template = SETTINGS_CRUD['operations']['read']['sql_template'] recs = await sor.sqlExe(sql_template, {'user_id': user_id}) if len(recs) > 0: settings_json = recs[0]['settings_json'] if settings_json: saved_settings = json.loads(settings_json) # Merge with defaults to ensure all keys exist for section, defaults in default_settings.items(): if section not in saved_settings: saved_settings[section] = defaults else: for key, value in defaults.items(): if key not in saved_settings[section]: saved_settings[section][key] = value return saved_settings except Exception as e: print(f"Error getting settings: {str(e)}") # Fall back to defaults on error return default_settings async def save_setting(section: str, key: str, value, user_id: str) -> bool: """Save a specific setting value to user\'s database record. Args: section: Settings section name key: Setting key name value: Setting value user_id: The ID of the user whose settings to update Returns: True if saved successfully, False otherwise """ import json # Load existing settings or start with defaults settings = await get_setting(user_id) # Update the specific setting if section not in settings: settings[section] = {} settings[section][key] = value try: # Save to database using sqlor-database-module db = DBPools() env = ServerEnv() dbname = env.get_module_dbname('hermes-web-cli') async with db.sqlorContext(dbname) as sor: sql_template = SETTINGS_CRUD['operations']['create_or_update']['sql_template'] await sor.sqlExe(sql_template, { 'user_id': user_id, 'settings_json': json.dumps(settings) }) return True except Exception as e: print(f"Error saving settings: {str(e)}") return False # Module metadata MODULE_NAME = "hermes-web-cli" MODULE_VERSION = "0.2.0" # Export all public functions __all__ = [ 'load_hermes_web_cli', 'get_all_services', 'create_service', 'delete_service', 'get_service_by_id', 'test_service_connection', 'create_session', 'send_message_to_service', 'get_session_messages', 'get_active_sessions', 'get_recent_sessions', 'get_session_by_id', 'validate_service_url', 'generate_session_id', 'get_setting', 'save_setting', 'MODULE_NAME', 'MODULE_VERSION' ]