""" Reasoning Console WebSocket endpoint. Handles real-time push of reasoning steps to the frontend. """ import json import asyncio import time from appPublic.uniqueID import getID from appPublic.log import info, debug, error, exception # 全局存储活跃 ws_pool 引用 _reasoning_ws_sessions = {} async def myfunc(request, **kwargs): """WebSocket handler for reasoning console.""" ws_pool = kwargs.get('ws_pool') ws_data = kwargs.get('ws_data') lenv = kwargs try: data = json.loads(ws_data) if ws_data else {} except: data = {} cmd = data.get('cmd', '') if cmd == 'connect': # 前端连接时注册 session user_id = data.get('user_id', 'anonymous') session_id = data.get('session_id', getID()) _reasoning_ws_sessions[user_id] = { 'ws_pool': ws_pool, 'session_id': session_id, } debug(f"WS connected: user={user_id}, session={session_id}") await ws_pool.sendto(json.dumps({ 'type': 'connected', 'session_id': session_id, 'message': 'WebSocket 连接成功' })) elif cmd == 'start_reasoning': # 前端发起推理请求 user_id = data.get('user_id', 'anonymous') request_text = data.get('request', '') if not request_text: await ws_pool.sendto(json.dumps({ 'type': 'error', 'message': '请求内容为空' })) return # 推送推理开始事件 await _ws_push(user_id, { 'type': 'reasoning_start', 'data': { 'request': request_text, 'message': '推理引擎启动', 'timestamp': time.time() } }) # 调用推理引擎(异步执行,不阻塞 websocket) asyncio.create_task( _run_reasoning(user_id, request_text) ) elif cmd == 'ping': await ws_pool.sendto(json.dumps({ 'type': 'pong', 'timestamp': time.time() })) async def _ws_push(user_id, message): """推送消息到指定用户的 websocket 连接。""" session = _reasoning_ws_sessions.get(user_id) if session and session.get('ws_pool'): try: await session['ws_pool'].sendto(json.dumps(message)) except Exception as e: error(f"WS push failed for user {user_id}: {e}") async def _run_reasoning(user_id, request_text): """异步执行推理并推送每一步到前端。""" from harnessed_reasoning.core import get_harnessed_reasoning_engine engine = get_harnessed_reasoning_engine() # 注册 per-user ws_push 回调到引擎 engine.ws_push_callbacks[user_id] = lambda msg: _ws_push(user_id, msg) try: result = await engine.reason_and_execute( request=request_text, execute_immediately=True, user_id=user_id, ) # 推送最终结果 await _ws_push(user_id, { 'type': 'reasoning_complete', 'data': { 'result': result, 'message': '推理完成', 'timestamp': time.time() } }) except Exception as e: exception(f"Reasoning failed for user {user_id}") await _ws_push(user_id, { 'type': 'error', 'data': { 'error': str(e), 'message': f'推理失败: {str(e)}', 'timestamp': time.time() } }) finally: # 清理 per-user 回调 engine.ws_push_callbacks.pop(user_id, None)