harnessed_reasoning/wwwroot/reasoning_console.wss
yumoqing ea4a9e3bd9 feat: 推理过程可视化 - websocket实时推送推理步骤到前端
- core.py: 注入ws_push回调, 17个推理节点实时推送事件(上下文/规划/工具调用/执行)
- wwwroot/reasoning_console.wss: 新建websocket端点, 支持connect/start_reasoning/ping
- wwwroot/reasoning_console.ui: 重写HTML前端, 时间线式可视化展示推理过程
2026-05-09 15:48:12 +08:00

127 lines
3.6 KiB
Plaintext
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Reasoning Console WebSocket endpoint.
Handles real-time push of reasoning steps to the frontend.
"""
import json
import asyncio
import time
from appPublic.uniqueID import getID
from appPublic.log import info, debug, error, exception
# 全局存储活跃 ws_pool 引用
_reasoning_ws_sessions = {}
async def myfunc(request, **kwargs):
"""WebSocket handler for reasoning console."""
ws_pool = kwargs.get('ws_pool')
ws_data = kwargs.get('ws_data')
lenv = kwargs
try:
data = json.loads(ws_data) if ws_data else {}
except:
data = {}
cmd = data.get('cmd', '')
if cmd == 'connect':
# 前端连接时注册 session
user_id = data.get('user_id', 'anonymous')
session_id = data.get('session_id', getID())
_reasoning_ws_sessions[user_id] = {
'ws_pool': ws_pool,
'session_id': session_id,
}
debug(f"WS connected: user={user_id}, session={session_id}")
await ws_pool.sendto(json.dumps({
'type': 'connected',
'session_id': session_id,
'message': 'WebSocket 连接成功'
}))
elif cmd == 'start_reasoning':
# 前端发起推理请求
user_id = data.get('user_id', 'anonymous')
request_text = data.get('request', '')
if not request_text:
await ws_pool.sendto(json.dumps({
'type': 'error',
'message': '请求内容为空'
}))
return
# 推送推理开始事件
await _ws_push(user_id, {
'type': 'reasoning_start',
'data': {
'request': request_text,
'message': '推理引擎启动',
'timestamp': time.time()
}
})
# 调用推理引擎(异步执行,不阻塞 websocket
asyncio.create_task(
_run_reasoning(user_id, request_text)
)
elif cmd == 'ping':
await ws_pool.sendto(json.dumps({
'type': 'pong',
'timestamp': time.time()
}))
async def _ws_push(user_id, message):
"""推送消息到指定用户的 websocket 连接。"""
session = _reasoning_ws_sessions.get(user_id)
if session and session.get('ws_pool'):
try:
await session['ws_pool'].sendto(json.dumps(message))
except Exception as e:
error(f"WS push failed for user {user_id}: {e}")
async def _run_reasoning(user_id, request_text):
"""异步执行推理并推送每一步到前端。"""
from harnessed_reasoning.core import get_harnessed_reasoning_engine
engine = get_harnessed_reasoning_engine()
# 注入 ws_push 回调到引擎实例
async def push_callback(msg):
await _ws_push(user_id, msg)
engine.ws_push = push_callback
try:
result = await engine.reason_and_execute(
request=request_text,
execute_immediately=True,
user_id=user_id,
)
# 推送最终结果
await _ws_push(user_id, {
'type': 'reasoning_complete',
'data': {
'result': result,
'message': '推理完成',
'timestamp': time.time()
}
})
except Exception as e:
exception(f"Reasoning failed for user {user_id}")
await _ws_push(user_id, {
'type': 'error',
'data': {
'error': str(e),
'message': f'推理失败: {str(e)}',
'timestamp': time.time()
}
})
finally:
# 清理回调
engine.ws_push = None