hermes-service/main.py
yumoqing 57afe1264c feat(security): implement IP filtering and API key authentication
- Added validate_ip_and_apikey() decorator for endpoint protection
- Implemented IP address validation with configurable allowed_ips list
- Added API key authentication with header-based or bearer token support
- Fixed endpoint function signatures to properly receive Request objects
- Updated configuration structure and security documentation
- Removed debug print statements before final commit
2026-04-22 21:41:45 +08:00

387 lines
13 KiB
Python

#!/usr/bin/env python3
"""
Hermes Service with global session management and Nginx security support
"""
import os
import sys
import asyncio
import uuid
from datetime import datetime
from pathlib import Path
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional, Dict, Any, List
import json
import shutil
import ipaddress
import yaml
from functools import wraps
# Base Hermes Agent path
BASE_HERMES_PATH = "/d/hermesai/.hermes/hermes-agent"
# Clean user data directory structure: /d/hermesai/users/{user_id}/.hermes
USERS_BASE = "/d/hermesai/users"
# Load configuration
CONFIG_FILE = os.path.join(os.path.dirname(__file__), "config.yaml")
if os.path.exists(CONFIG_FILE):
with open(CONFIG_FILE, 'r') as f:
config = yaml.safe_load(f)
else:
# Default configuration
config = {
'security': {
'enable_ip_check': False,
'allowed_ips': ['127.0.0.1', '::1'],
'enable_api_key': False,
'api_keys': [],
'api_key_header': 'X-API-Key'
},
'nginx': {
'trusted_proxies': ['127.0.0.1', '::1'],
'enable_real_ip': True
},
'service': {
'host': '127.0.0.1',
'port': 9120,
'log_level': 'info'
},
'cors': {
'allow_origins': ['*'],
'allow_credentials': True,
'allow_methods': ['*'],
'allow_headers': ['*']
}
}
print(f"Security config - IP check: {config['security']['enable_ip_check']}, API key: {config['security']['enable_api_key']}")
app = FastAPI(title="Hermes Service API", version="1.2.0")
# Configure CORS
app.add_middleware(
CORSMiddleware,
allow_origins=config['cors']['allow_origins'],
allow_credentials=config['cors']['allow_credentials'],
allow_methods=config['cors']['allow_methods'],
allow_headers=config['cors']['allow_headers'],
)
# Global session registry: global_session_id -> {user_id, local_session_id, created_at}
global_sessions = {}
def get_real_ip(request: Request) -> str:
"""Get the real client IP address, considering X-Forwarded-For header"""
if not config['nginx']['enable_real_ip']:
return request.client.host
# Check if the request comes from a trusted proxy
client_host = request.client.host
trusted_proxies = config['nginx']['trusted_proxies']
is_trusted = False
for trusted_proxy in trusted_proxies:
try:
if ipaddress.ip_address(client_host) in ipaddress.ip_network(trusted_proxy, strict=False):
is_trusted = True
break
except ValueError:
# Invalid IP or network, skip
continue
if is_trusted:
# Get the real IP from X-Forwarded-For header
forwarded_for = request.headers.get("x-forwarded-for")
if forwarded_for:
# X-Forwarded-For can contain multiple IPs, take the first one
real_ip = forwarded_for.split(",")[0].strip()
return real_ip
return client_host
def validate_ip_and_apikey():
"""Decorator to validate IP and API key for protected endpoints"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
# Extract request object (assuming it's the first argument after self)
request = None
for arg in args:
if isinstance(arg, Request):
request = arg
break
if request is None:
# Try to find request in kwargs
request = kwargs.get('request')
if request is None:
# If no request object found, skip validation
return await func(*args, **kwargs)
# IP validation
if config['security']['enable_ip_check']:
client_ip = get_real_ip(request)
print(f"DEBUG: Client IP: {client_ip}") # Debug log
allowed = False
for allowed_ip in config['security']['allowed_ips']:
try:
if ipaddress.ip_address(client_ip) in ipaddress.ip_network(allowed_ip, strict=False):
allowed = True
break
except ValueError:
# Invalid IP or network, skip
continue
if not allowed:
raise HTTPException(status_code=403, detail="IP address not allowed")
# API Key validation
if config['security']['enable_api_key']:
provided_key = None
if config['security']['auth_method'] == 'bearer':
# Check Authorization header for Bearer token
auth_header = request.headers.get("authorization")
if auth_header and auth_header.lower().startswith("bearer "):
provided_key = auth_header[7:].strip() # Remove "Bearer " prefix
else:
# Check custom header (default: X-API-Key)
api_key_header = config['security']['api_key_header']
provided_key = request.headers.get(api_key_header)
if not provided_key:
raise HTTPException(status_code=401, detail="API key required")
valid_key = False
for key_config in config['security']['api_keys']:
if key_config['key'] == provided_key:
# Check expiration if set
if 'expires_at' in key_config and key_config['expires_at']:
# TODO: Implement expiration check
pass
valid_key = True
break
if not valid_key:
raise HTTPException(status_code=401, detail="Invalid API key")
return await func(*args, **kwargs)
return wrapper
return decorator
def get_user_hermes_path(user_id: str) -> str:
"""Get isolated Hermes environment path for a user"""
if not user_id or user_id == "anonymous":
user_id = "anonymous"
safe_user_id = "".join(c for c in user_id if c.isalnum() or c in "-_.")
return os.path.join(USERS_BASE, safe_user_id)
def ensure_user_hermes_env(user_id: str):
"""Ensure user has isolated Hermes environment"""
user_base_path = get_user_hermes_path(user_id)
user_hermes_path = os.path.join(user_base_path, "hermes-agent")
user_dot_hermes = os.path.join(user_base_path, ".hermes")
if not os.path.exists(user_hermes_path):
os.makedirs(user_base_path, exist_ok=True, mode=0o700)
shutil.copytree(
BASE_HERMES_PATH,
user_hermes_path,
dirs_exist_ok=True,
ignore=shutil.ignore_patterns('.git', '__pycache__', '*.pyc', '.venv', 'web_dist')
)
os.makedirs(user_dot_hermes, exist_ok=True, mode=0o700)
venv_link = os.path.join(user_hermes_path, '.venv')
if not os.path.exists(venv_link):
os.symlink(
os.path.join(BASE_HERMES_PATH, '.venv'),
venv_link
)
return user_hermes_path
@app.get("/health")
@validate_ip_and_apikey()
async def health_check():
return {"status": "healthy", "service": "hermes-service", "multi_user": True}
@app.get("/api/v1/status")
@validate_ip_and_apikey()
async def get_hermes_status(request: Request):
try:
result = await execute_hermes_command(["--version"], user_id=None)
return {"status": "running", "version": result.get("stdout", "").strip()}
except Exception as e:
return {"status": "error", "error": str(e)}
class SessionCreateRequest(BaseModel):
user_id: str
initial_message: Optional[str] = None
class CommandRequest(BaseModel):
command: list[str]
user_context: Optional[Dict[str, Any]] = None
timeout: int = 300
class SessionMessageRequest(BaseModel):
message: str
user_context: Optional[Dict[str, Any]] = None
@app.post("/api/v1/sessions")
@validate_ip_and_apikey()
async def create_session(request: SessionCreateRequest):
if not request.user_id:
raise HTTPException(status_code=400, detail="user_id is required")
# Create global session ID
global_session_id = str(uuid.uuid4())
# Ensure user environment exists
user_hermes_path = ensure_user_hermes_env(request.user_id)
# For now, we'll use the global session ID as the local session ID
# In a production system, we might want to create a proper local session
local_session_id = global_session_id
# Register global session
global_sessions[global_session_id] = {
"user_id": request.user_id,
"local_session_id": local_session_id,
"created_at": datetime.now().isoformat(),
"hermes_path": user_hermes_path,
"status": "active"
}
return {
"session_id": global_session_id,
"user_id": request.user_id,
"hermes_path": user_hermes_path,
"status": "created"
}
@app.post("/api/v1/execute")
@validate_ip_and_apikey()
async def execute_command(request: CommandRequest):
# If no user context provided, use anonymous user
user_id = None
if request.user_context:
user_id = request.user_context.get("user_id")
result = await execute_hermes_command(
request.command,
user_id=user_id,
timeout=request.timeout
)
if not result["success"]:
raise HTTPException(status_code=500, detail=result["stderr"])
return result
@app.post("/api/v1/sessions/{session_id}/messages")
@validate_ip_and_apikey()
async def send_session_message(session_id: str, request: SessionMessageRequest):
if session_id not in global_sessions:
raise HTTPException(status_code=404, detail="Session not found")
session_info = global_sessions[session_id]
user_id = session_info["user_id"]
local_session_id = session_info["local_session_id"]
# For chat messages, we need to think about how to properly integrate
# with Hermes' session system. For now, we'll execute commands directly.
# In production, this would interface with Hermes' internal session management.
# Execute the message as a command
command_args = ["chat", request.message]
result = await execute_hermes_command(
command_args,
user_id=user_id,
timeout=300
)
response_content = result.get("stdout", "") if result["success"] else result.get("stderr", "Command failed")
return {
"session_id": session_id,
"response": response_content,
"success": result["success"]
}
@app.get("/api/v1/sessions/{session_id}")
@validate_ip_and_apikey()
async def get_session(session_id: str):
if session_id not in global_sessions:
raise HTTPException(status_code=404, detail="Session not found")
session_info = global_sessions[session_id].copy()
session_info.pop("hermes_path", None) # Don't expose internal paths
return session_info
async def execute_hermes_command(command_args, user_id=None, timeout=300):
try:
if user_id:
user_base_path = get_user_hermes_path(user_id)
user_hermes_path = os.path.join(user_base_path, "hermes-agent")
hermes_dot_path = os.path.join(user_base_path, ".hermes")
else:
user_hermes_path = BASE_HERMES_PATH
hermes_dot_path = "/d/hermesai/.hermes"
python_path = "/d/hermesai/.hermes/hermes-agent/.venv/bin/python3"
cmd = [python_path, "-m", "hermes_cli.main"] + command_args
env = os.environ.copy()
env['HOME'] = hermes_dot_path
env['HERMES_USER_ID'] = str(user_id or 'anonymous')
env['HERMES_SESSION_ID'] = str(uuid.uuid4())
process = await asyncio.create_subprocess_exec(
*cmd,
cwd=user_hermes_path,
env=env,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
try:
stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=timeout)
return {
'success': process.returncode == 0,
'stdout': stdout.decode('utf-8', errors='replace'),
'stderr': stderr.decode('utf-8', errors='replace'),
'returncode': process.returncode
}
except asyncio.TimeoutError:
process.kill()
await process.wait()
return {
'success': False,
'stdout': '',
'stderr': f'Command timed out after {timeout} seconds',
'returncode': -1
}
except Exception as e:
return {
'success': False,
'stdout': '',
'stderr': str(e),
'returncode': -1
}
os.makedirs(USERS_BASE, exist_ok=True, mode=0o755)
if __name__ == "__main__":
import uvicorn
uvicorn.run(
app,
host=config['service']['host'],
port=config['service']['port'],
log_level=config['service']['log_level']
)