harnessed_reasoning/wwwroot/reasoning_console.wss
yumoqing 6c51e98108 Fix: per-user WebSocket push and file-based skill search in reasoning engine
- Replace shared ws_push callback with per-user ws_push_callbacks dict
  to prevent cross-user event leakage in concurrent sessions
- Add _current_user_id and _current_org_id tracking for user isolation
- Pass context parameter to harnessed_execute_tool for user isolation
- Add _search_skill_dir to search file-based skills in user and shared
  directories alongside DB-based skills
- Update _find_relevant_skills to search three sources: DB skills,
  user skills (~/.hermes/users/{id}/skills/), and shared skills
  (~/.hermes/skills/), with deduplication by skill name
- Update all _push() calls to pass user_id for per-user routing
- Update reasoning_console.wss to register per-user callbacks
- Clean up _current_user_id/_current_org_id in finally block
2026-05-13 13:41:42 +08:00

124 lines
3.6 KiB
Plaintext
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Reasoning Console WebSocket endpoint.
Handles real-time push of reasoning steps to the frontend.
"""
import json
import asyncio
import time
from appPublic.uniqueID import getID
from appPublic.log import info, debug, error, exception
# Global registry of active ws_pool references.
# Entries are written by the 'connect' command, keyed by user_id; each value
# is a dict with the keys 'ws_pool' and 'session_id'.
_reasoning_ws_sessions = {}
# Strong references to in-flight reasoning tasks. The asyncio event loop only
# keeps weak references to tasks, so a fire-and-forget task that nothing else
# references may be garbage-collected before it finishes.
_reasoning_tasks = set()


async def myfunc(request, **kwargs):
    """Handle a message received on the reasoning-console WebSocket.

    The message text is read from ``kwargs['ws_data']`` and the connection
    object from ``kwargs['ws_pool']``. The text is expected to be a JSON
    object whose ``cmd`` field selects the action:

    * ``connect`` -- store ``ws_pool`` in ``_reasoning_ws_sessions`` under
      the supplied ``user_id`` and reply with a ``connected`` event.
    * ``start_reasoning`` -- reply with an ``error`` event when ``request``
      is empty; otherwise push a ``reasoning_start`` event and schedule
      ``_run_reasoning`` as a background task.
    * ``ping`` -- reply with a ``pong`` event carrying the current time.

    Missing, unparsable, or non-object payloads are treated as an empty
    message, and unknown commands are ignored.

    Args:
        request: The incoming request object (not used by this handler).
        **kwargs: Handler environment; ``ws_pool`` and ``ws_data`` are read.

    Returns:
        None.
    """
    ws_pool = kwargs.get('ws_pool')
    ws_data = kwargs.get('ws_data')

    try:
        data = json.loads(ws_data) if ws_data else {}
    except (TypeError, ValueError, RecursionError):
        # ValueError covers JSONDecodeError and decoding errors; TypeError
        # covers non-text payloads; RecursionError covers absurdly nested
        # input. All of them mean "not a usable message".
        data = {}
    if not isinstance(data, dict):
        # Valid JSON that is not an object (list, number, string, ...) has
        # no 'cmd' field; treat it like an empty message instead of crashing.
        data = {}

    cmd = data.get('cmd', '')

    if cmd == 'connect':
        # Register the session when the frontend connects.
        # NOTE(review): user_id comes straight from the client payload and is
        # not checked here -- confirm it is authenticated elsewhere.
        user_id = data.get('user_id', 'anonymous')
        # Only generate a fresh ID when the client did not supply one.
        session_id = data['session_id'] if 'session_id' in data else getID()
        _reasoning_ws_sessions[user_id] = {
            'ws_pool': ws_pool,
            'session_id': session_id,
        }
        debug(f"WS connected: user={user_id}, session={session_id}")
        await ws_pool.sendto(json.dumps({
            'type': 'connected',
            'session_id': session_id,
            'message': 'WebSocket 连接成功'
        }))

    elif cmd == 'start_reasoning':
        # The frontend is asking for a reasoning run.
        user_id = data.get('user_id', 'anonymous')
        request_text = data.get('request', '')
        if not request_text:
            await ws_pool.sendto(json.dumps({
                'type': 'error',
                'message': '请求内容为空'
            }))
            return

        # Announce that reasoning has started.
        await _ws_push(user_id, {
            'type': 'reasoning_start',
            'data': {
                'request': request_text,
                'message': '推理引擎启动',
                'timestamp': time.time()
            }
        })

        # Run the engine in the background so this handler returns promptly.
        # Keep a strong reference until the task completes.
        task = asyncio.create_task(_run_reasoning(user_id, request_text))
        _reasoning_tasks.add(task)
        task.add_done_callback(_reasoning_tasks.discard)

    elif cmd == 'ping':
        await ws_pool.sendto(json.dumps({
            'type': 'pong',
            'timestamp': time.time()
        }))
async def _ws_push(user_id, message):
    """Send ``message`` as JSON to the WebSocket registered for ``user_id``.

    Does nothing when the user has no entry in ``_reasoning_ws_sessions`` or
    the entry holds no ``ws_pool``. Any exception raised while serializing or
    sending is logged and swallowed, so a failed push never propagates to
    the caller.
    """
    entry = _reasoning_ws_sessions.get(user_id)
    if not entry:
        return
    if not entry.get('ws_pool'):
        return

    try:
        await entry['ws_pool'].sendto(json.dumps(message))
    except Exception as exc:
        # Best-effort delivery: record the failure and carry on.
        error(f"WS push failed for user {user_id}: {exc}")
async def _run_reasoning(user_id, request_text):
    """Run the reasoning engine for ``request_text`` and report the outcome.

    A per-user push callback is registered on the engine for the duration of
    the run and removed again afterwards, whether the run succeeds or fails.
    On success a ``reasoning_complete`` event carrying the engine result is
    pushed to the user; on failure the exception is logged and an ``error``
    event is pushed instead.
    """
    from harnessed_reasoning.core import get_harnessed_reasoning_engine

    reasoning_engine = get_harnessed_reasoning_engine()

    def push_to_user(msg):
        # Hand back the coroutine from _ws_push; the engine decides how to
        # run it.
        return _ws_push(user_id, msg)

    # Register the per-user ws_push callback on the engine.
    reasoning_engine.ws_push_callbacks[user_id] = push_to_user
    try:
        outcome = await reasoning_engine.reason_and_execute(
            request=request_text,
            execute_immediately=True,
            user_id=user_id,
        )
        # Push the final result.
        completion_event = {
            'type': 'reasoning_complete',
            'data': {
                'result': outcome,
                'message': '推理完成',
                'timestamp': time.time(),
            },
        }
        await _ws_push(user_id, completion_event)
    except Exception as exc:
        exception(f"Reasoning failed for user {user_id}")
        failure_event = {
            'type': 'error',
            'data': {
                'error': str(exc),
                'message': f'推理失败: {str(exc)}',
                'timestamp': time.time(),
            },
        }
        await _ws_push(user_id, failure_event)
    finally:
        # Remove the per-user callback.
        reasoning_engine.ws_push_callbacks.pop(user_id, None)