hermes-service/main.py
yumoqing 7d70f362b2 feat: Initial implementation of hermes-service with multi-user support
- Complete REST API with session management
- Dynamic user creation with isolated environments
- Multi-user isolation using /d/hermesai/users/{user_id}/.hermes structure
- Full command execution capabilities via Hermes CLI
- Health check and status endpoints
- Follows module development specifications
2026-04-21 13:20:10 +08:00

245 lines
7.7 KiB
Python

#!/usr/bin/env python3
"""
Hermes Service with complete session messaging support
"""
import os
import sys
import asyncio
import uuid
from datetime import datetime
from pathlib import Path
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional, Dict, Any, List
import json
import shutil
# Base Hermes Agent path
BASE_HERMES_PATH = "/d/hermesai/.hermes/hermes-agent"
# Clean user data directory structure: /d/hermesai/users/{user_id}/.hermes
USERS_BASE = "/d/hermesai/users"
app = FastAPI(title="Hermes Service API", version="1.2.0")
# Configure CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# In-memory session storage for message history
active_sessions = {}
def get_user_hermes_path(user_id: str) -> str:
"""Get isolated Hermes environment path for a user"""
if not user_id or user_id == "anonymous":
user_id = "anonymous"
safe_user_id = "".join(c for c in user_id if c.isalnum() or c in "-_.")
return os.path.join(USERS_BASE, safe_user_id)
def ensure_user_hermes_env(user_id: str):
"""Ensure user has isolated Hermes environment"""
user_base_path = get_user_hermes_path(user_id)
user_hermes_path = os.path.join(user_base_path, "hermes-agent")
user_dot_hermes = os.path.join(user_base_path, ".hermes")
if not os.path.exists(user_hermes_path):
os.makedirs(user_base_path, exist_ok=True, mode=0o700)
shutil.copytree(
BASE_HERMES_PATH,
user_hermes_path,
dirs_exist_ok=True,
ignore=shutil.ignore_patterns('.git', '__pycache__', '*.pyc', '.venv', 'web_dist')
)
os.makedirs(user_dot_hermes, exist_ok=True, mode=0o700)
venv_link = os.path.join(user_hermes_path, '.venv')
if not os.path.exists(venv_link):
os.symlink(
os.path.join(BASE_HERMES_PATH, '.venv'),
venv_link
)
return user_hermes_path
@app.get("/health")
async def health_check():
return {"status": "healthy", "service": "hermes-service", "multi_user": True}
@app.get("/api/v1/status")
async def get_hermes_status():
try:
result = await execute_hermes_command(["--version"], user_id=None)
return {"status": "running", "version": result.get("stdout", "").strip()}
except Exception as e:
return {"status": "error", "error": str(e)}
class SessionCreateRequest(BaseModel):
user_id: str
initial_message: Optional[str] = None
class CommandRequest(BaseModel):
command: list[str]
user_context: Optional[Dict[str, Any]] = None
timeout: int = 300
class SessionMessageRequest(BaseModel):
message: str
user_context: Optional[Dict[str, Any]] = None
@app.post("/api/v1/sessions")
async def create_session(request: SessionCreateRequest):
if not request.user_id:
raise HTTPException(status_code=400, detail="user_id is required")
session_id = str(uuid.uuid4())
user_hermes_path = ensure_user_hermes_env(request.user_id)
active_sessions[session_id] = {
"id": session_id,
"user_id": request.user_id,
"created_at": datetime.now().isoformat(),
"messages": [],
"status": "active"
}
if request.initial_message:
active_sessions[session_id]["messages"].append({
"role": "user",
"content": request.initial_message,
"timestamp": datetime.now().isoformat()
})
return {
"session_id": session_id,
"user_id": request.user_id,
"hermes_path": user_hermes_path,
"status": "created"
}
@app.post("/api/v1/execute")
async def execute_command(request: CommandRequest):
user_id = None
if request.user_context:
user_id = request.user_context.get("user_id")
result = await execute_hermes_command(
request.command,
user_id=user_id,
timeout=request.timeout
)
if not result["success"]:
raise HTTPException(status_code=500, detail=result["stderr"])
return result
@app.post("/api/v1/sessions/{session_id}/messages")
async def send_session_message(session_id: str, request: SessionMessageRequest):
if session_id not in active_sessions:
raise HTTPException(status_code=404, detail="Session not found")
session_data = active_sessions[session_id]
user_id = session_data["user_id"]
session_data["messages"].append({
"role": "user",
"content": request.message,
"timestamp": datetime.now().isoformat()
})
# Correct way to send chat messages to Hermes
# Use the chat subcommand with the message as direct argument
command_args = ["chat", request.message]
result = await execute_hermes_command(
command_args,
user_id=user_id,
timeout=300
)
response_content = result.get("stdout", "") if result["success"] else result.get("stderr", "Command failed")
session_data["messages"].append({
"role": "assistant",
"content": response_content,
"timestamp": datetime.now().isoformat()
})
return {
"session_id": session_id,
"response": response_content,
"success": result["success"],
"message_count": len(session_data["messages"])
}
@app.get("/api/v1/sessions/{session_id}")
async def get_session(session_id: str):
if session_id not in active_sessions:
raise HTTPException(status_code=404, detail="Session not found")
session_data = active_sessions[session_id].copy()
session_data.pop("id", None)
return session_data
async def execute_hermes_command(command_args, user_id=None, timeout=300):
try:
if user_id:
user_base_path = get_user_hermes_path(user_id)
user_hermes_path = os.path.join(user_base_path, "hermes-agent")
hermes_dot_path = os.path.join(user_base_path, ".hermes")
else:
user_hermes_path = BASE_HERMES_PATH
hermes_dot_path = "/d/hermesai/.hermes"
python_path = "/d/hermesai/.hermes/hermes-agent/.venv/bin/python3"
cmd = [python_path, "-m", "hermes_cli.main"] + command_args
env = os.environ.copy()
env['HOME'] = hermes_dot_path
env['HERMES_USER_ID'] = str(user_id or 'anonymous')
env['HERMES_SESSION_ID'] = str(uuid.uuid4())
process = await asyncio.create_subprocess_exec(
*cmd,
cwd=user_hermes_path,
env=env,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
try:
stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=timeout)
return {
'success': process.returncode == 0,
'stdout': stdout.decode('utf-8', errors='replace'),
'stderr': stderr.decode('utf-8', errors='replace'),
'returncode': process.returncode
}
except asyncio.TimeoutError:
process.kill()
await process.wait()
return {
'success': False,
'stdout': '',
'stderr': f'Command timed out after {timeout} seconds',
'returncode': -1
}
except Exception as e:
return {
'success': False,
'stdout': '',
'stderr': str(e),
'returncode': -1
}
os.makedirs(USERS_BASE, exist_ok=True, mode=0o755)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=9120, log_level="info")