harnessed_reasoning/wwwroot/reasoning_console.wss
yumoqing 89e099ee12 feat: 实现推理过程实时可视化(WebSocket事件推送)
- 新增 per-user WebSocket 回调机制,支持多用户并发推理会话
- 在每个推理阶段推送实时事件:context/plan/safety/execution/tool_call
- reasoning_console.wss 使用 ws_push_callbacks 字典替代共享 ws_push
- 新增 _current_user_id/_current_org_id 追踪用于用户隔离
- _execute_tool 传递 context 参数确保工具执行的用户隔离
- 新增文件技能搜索:扫描用户目录和共享目录的 skills,与DB技能去重
2026-05-13 13:44:09 +08:00

124 lines
3.6 KiB
Plaintext
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Reasoning Console WebSocket endpoint.
Handles real-time push of reasoning steps to the frontend.
"""
import json
import asyncio
import time
from appPublic.uniqueID import getID
from appPublic.log import info, debug, error, exception
# 全局存储活跃 ws_pool 引用
_reasoning_ws_sessions = {}
async def myfunc(request, **kwargs):
"""WebSocket handler for reasoning console."""
ws_pool = kwargs.get('ws_pool')
ws_data = kwargs.get('ws_data')
lenv = kwargs
try:
data = json.loads(ws_data) if ws_data else {}
except:
data = {}
cmd = data.get('cmd', '')
if cmd == 'connect':
# 前端连接时注册 session
user_id = data.get('user_id', 'anonymous')
session_id = data.get('session_id', getID())
_reasoning_ws_sessions[user_id] = {
'ws_pool': ws_pool,
'session_id': session_id,
}
debug(f"WS connected: user={user_id}, session={session_id}")
await ws_pool.sendto(json.dumps({
'type': 'connected',
'session_id': session_id,
'message': 'WebSocket 连接成功'
}))
elif cmd == 'start_reasoning':
# 前端发起推理请求
user_id = data.get('user_id', 'anonymous')
request_text = data.get('request', '')
if not request_text:
await ws_pool.sendto(json.dumps({
'type': 'error',
'message': '请求内容为空'
}))
return
# 推送推理开始事件
await _ws_push(user_id, {
'type': 'reasoning_start',
'data': {
'request': request_text,
'message': '推理引擎启动',
'timestamp': time.time()
}
})
# 调用推理引擎(异步执行,不阻塞 websocket
asyncio.create_task(
_run_reasoning(user_id, request_text)
)
elif cmd == 'ping':
await ws_pool.sendto(json.dumps({
'type': 'pong',
'timestamp': time.time()
}))
async def _ws_push(user_id, message):
"""推送消息到指定用户的 websocket 连接。"""
session = _reasoning_ws_sessions.get(user_id)
if session and session.get('ws_pool'):
try:
await session['ws_pool'].sendto(json.dumps(message))
except Exception as e:
error(f"WS push failed for user {user_id}: {e}")
async def _run_reasoning(user_id, request_text):
"""异步执行推理并推送每一步到前端。"""
from harnessed_reasoning.core import get_harnessed_reasoning_engine
engine = get_harnessed_reasoning_engine()
# 注册 per-user ws_push 回调到引擎
engine.ws_push_callbacks[user_id] = lambda msg: _ws_push(user_id, msg)
try:
result = await engine.reason_and_execute(
request=request_text,
execute_immediately=True,
user_id=user_id,
)
# 推送最终结果
await _ws_push(user_id, {
'type': 'reasoning_complete',
'data': {
'result': result,
'message': '推理完成',
'timestamp': time.time()
}
})
except Exception as e:
exception(f"Reasoning failed for user {user_id}")
await _ws_push(user_id, {
'type': 'error',
'data': {
'error': str(e),
'message': f'推理失败: {str(e)}',
'timestamp': time.time()
}
})
finally:
# 清理 per-user 回调
engine.ws_push_callbacks.pop(user_id, None)