- 新增 per-user WebSocket 回调机制,支持多用户并发推理会话 - 在每个推理阶段推送实时事件:context/plan/safety/execution/tool_call - reasoning_console.wss 使用 ws_push_callbacks 字典替代共享 ws_push - 新增 _current_user_id/_current_org_id 追踪用于用户隔离 - _execute_tool 传递 context 参数确保工具执行的用户隔离 - 新增文件技能搜索:扫描用户目录和共享目录的 skills,与DB技能去重
124 lines
3.6 KiB
Plaintext
124 lines
3.6 KiB
Plaintext
"""
Reasoning Console WebSocket endpoint.

Handles real-time push of reasoning steps to the frontend.
"""
|
||
import json
|
||
import asyncio
|
||
import time
|
||
from appPublic.uniqueID import getID
|
||
from appPublic.log import info, debug, error, exception
|
||
|
||
# Global registry of active WebSocket connections, keyed by user_id.
# Each value is a dict holding that user's 'ws_pool' handle and 'session_id';
# entries are written by the 'connect' command and read when pushing messages.
_reasoning_ws_sessions = {}
|
||
|
||
# Strong references to in-flight reasoning tasks. The event loop keeps only
# weak references to tasks, so a fire-and-forget task that nothing else
# references may be garbage-collected before it completes.
_reasoning_tasks = set()


async def myfunc(request, **kwargs):
    """WebSocket handler for the reasoning console.

    Decodes one incoming frame as JSON and dispatches on its ``cmd`` field:

    * ``connect`` -- register the caller's ``ws_pool`` under ``user_id`` and
      acknowledge with a ``connected`` message.
    * ``start_reasoning`` -- validate the request text, push a
      ``reasoning_start`` event and run the reasoning engine in a background
      task so this handler returns immediately.
    * ``ping`` -- reply with a ``pong`` carrying the current timestamp.

    Any other command, as well as a frame that is not a JSON object, is
    ignored.

    Args:
        request: The incoming request object (not used by this handler).
        **kwargs: Handler environment. ``ws_pool`` is the object whose
            ``sendto`` coroutine delivers a message to the client, and
            ``ws_data`` is the raw frame payload.

    Returns:
        None.

    NOTE(review): ``user_id`` is taken verbatim from the client payload;
    confirm that the caller is authenticated upstream before trusting it.
    """
    ws_pool = kwargs.get('ws_pool')
    ws_data = kwargs.get('ws_data')

    # Only decoding errors are treated as "empty message"; a bare except
    # would also swallow KeyboardInterrupt / SystemExit.
    try:
        data = json.loads(ws_data) if ws_data else {}
    except (TypeError, ValueError):
        data = {}

    # Valid JSON that is not an object (list, number, string, ...) has no
    # 'cmd' field; treat it like an empty message instead of crashing.
    if not isinstance(data, dict):
        data = {}

    cmd = data.get('cmd', '')

    if cmd == 'connect':
        # Register the session when the frontend connects.
        user_id = data.get('user_id', 'anonymous')
        # Only generate a fresh id when the client did not supply one.
        session_id = data['session_id'] if 'session_id' in data else getID()
        _reasoning_ws_sessions[user_id] = {
            'ws_pool': ws_pool,
            'session_id': session_id,
        }
        debug(f"WS connected: user={user_id}, session={session_id}")
        await ws_pool.sendto(json.dumps({
            'type': 'connected',
            'session_id': session_id,
            'message': 'WebSocket 连接成功'
        }))

    elif cmd == 'start_reasoning':
        # The frontend is starting a reasoning request.
        user_id = data.get('user_id', 'anonymous')
        request_text = data.get('request', '')

        if not request_text:
            await ws_pool.sendto(json.dumps({
                'type': 'error',
                'message': '请求内容为空'
            }))
            return

        # Push the "reasoning started" event.
        await _ws_push(user_id, {
            'type': 'reasoning_start',
            'data': {
                'request': request_text,
                'message': '推理引擎启动',
                'timestamp': time.time()
            }
        })

        # Run the engine in the background so the websocket is not blocked.
        # Keep a reference until the task finishes, then drop it.
        task = asyncio.create_task(_run_reasoning(user_id, request_text))
        _reasoning_tasks.add(task)
        task.add_done_callback(_reasoning_tasks.discard)

    elif cmd == 'ping':
        await ws_pool.sendto(json.dumps({
            'type': 'pong',
            'timestamp': time.time()
        }))
|
||
|
||
|
||
async def _ws_push(user_id, message):
    """Push a message to the given user's WebSocket connection.

    Looks up the session registered for ``user_id`` and sends ``message``
    serialized as JSON through that session's ``ws_pool``. Delivery is
    best-effort: a missing session or a failed send is logged and never
    raised to the caller.

    Args:
        user_id: Key under which the session was registered.
        message: JSON-serializable payload to deliver.

    Returns:
        None.
    """
    session = _reasoning_ws_sessions.get(user_id)
    if not session or not session.get('ws_pool'):
        # Leave a trace instead of dropping the message silently.
        debug(f"WS push skipped, no active session for user {user_id}")
        return

    try:
        await session['ws_pool'].sendto(json.dumps(message))
    except Exception as e:
        # Serialization and transport errors are deliberately contained here
        # so a broken connection cannot abort the reasoning run.
        error(f"WS push failed for user {user_id}: {e}")
|
||
|
||
|
||
async def _run_reasoning(user_id, request_text):
    """Run the reasoning engine and push each step to the frontend.

    Registers a per-user push callback on the engine, awaits
    ``engine.reason_and_execute`` and then pushes either a
    ``reasoning_complete`` message with the result or an ``error`` message
    describing the failure. The callback is removed again when the run ends.

    Args:
        user_id: User whose WebSocket session receives the events.
        request_text: The reasoning request submitted by the frontend.

    Returns:
        None. Failures are logged and reported to the user, not raised.
    """
    def push_callback(msg):
        # Returns the coroutine from _ws_push, bound to this user.
        return _ws_push(user_id, msg)

    engine = None
    registered = False

    try:
        # Import and engine creation sit inside the try block so that a
        # failure here is also reported to the frontend rather than being
        # lost in an unobserved background task.
        from harnessed_reasoning.core import get_harnessed_reasoning_engine

        engine = get_harnessed_reasoning_engine()

        # Register the per-user push callback on the engine.
        engine.ws_push_callbacks[user_id] = push_callback
        registered = True

        result = await engine.reason_and_execute(
            request=request_text,
            execute_immediately=True,
            user_id=user_id,
        )

        # Push the final result.
        await _ws_push(user_id, {
            'type': 'reasoning_complete',
            'data': {
                'result': result,
                'message': '推理完成',
                'timestamp': time.time()
            }
        })
    except Exception as e:
        exception(f"Reasoning failed for user {user_id}")
        await _ws_push(user_id, {
            'type': 'error',
            'data': {
                'error': str(e),
                'message': f'推理失败: {str(e)}',
                'timestamp': time.time()
            }
        })
    finally:
        # Remove the callback only if it is still the one this run installed;
        # a newer overlapping run for the same user may have replaced it and
        # must keep receiving events.
        if registered and engine.ws_push_callbacks.get(user_id) is push_callback:
            engine.ws_push_callbacks.pop(user_id, None)
|