feat: 推理过程可视化 - websocket实时推送推理步骤到前端

- core.py: 注入ws_push回调, 17个推理节点实时推送事件(上下文/规划/工具调用/执行)
- wwwroot/reasoning_console.wss: 新建websocket端点, 支持connect/start_reasoning/ping
- wwwroot/reasoning_console.ui: 重写HTML前端, 时间线式可视化展示推理过程
This commit is contained in:
yumoqing 2026-05-09 15:48:12 +08:00
parent 564084f3c8
commit ea4a9e3bd9
3 changed files with 232 additions and 86 deletions

View File

@ -101,6 +101,9 @@ TOOL_DESCRIPTIONS = "\n".join(f"- {t['name']}: {t['desc']}" for t in AVAILABLE_T
class HermesReasoningEngine:
"""Production reasoning engine that uses LLM and real tool execution."""
# Websocket push callback (injected by .wss endpoint)
ws_push = None
DEFAULT_SAFETY_RULES = {
"strict": [
"rm -rf /", "format ", "dd if=/dev/", "mkfs", "chmod 777",
@ -115,6 +118,19 @@ class HermesReasoningEngine:
def __init__(self):
pass
async def _push(self, event_type: str, data: Dict[str, Any] = None):
"""Push a reasoning step event via websocket."""
if self.ws_push:
msg = {
'event': event_type,
'data': data or {},
'timestamp': time.time(),
}
try:
await self.ws_push(msg)
except Exception as e:
error(f"ws_push failed: {e}")
# --------------------------------------------------------
# Config helpers
# --------------------------------------------------------
@ -164,9 +180,12 @@ class HermesReasoningEngine:
"""Get real memory and session context from harnessed_agent."""
context = {"user_id": user_id, "memory_entries": [], "recent_sessions": [], "skills": []}
await self._push('context_start', {'message': '正在收集上下文...', 'user_id': user_id})
try:
# Intelligent memory
max_tokens = int(config.get('max_context_tokens', 4000)) // 3
await self._push('context_memory', {'message': '加载记忆上下文...', 'max_tokens': max_tokens})
if hasattr(ServerEnv(), 'harnessed_get_intelligent_memory_context'):
mem_result = await ServerEnv().harnessed_get_intelligent_memory_context(
current_task=request,
@ -174,6 +193,10 @@ class HermesReasoningEngine:
)
if mem_result.get('success'):
context['memory_entries'] = mem_result.get('memories', [])
await self._push('context_memory_done', {
'message': f'加载 {len(context["memory_entries"])} 条记忆',
'count': len(context['memory_entries'])
})
# Session search
if config.get('enable_cross_session_search', '1') == '1':
@ -292,6 +315,8 @@ class HermesReasoningEngine:
async def _generate_plan(self, request: str, context: Dict[str, Any],
config: Dict[str, Any]) -> Dict[str, Any]:
"""Use LLM to analyze request and generate execution plan."""
await self._push('plan_start', {'message': 'LLM 正在分析请求并生成执行计划...', 'request': request[:100]})
# Build context summary
ctx_parts = []
if context.get('memory_entries'):
@ -327,6 +352,7 @@ class HermesReasoningEngine:
if 'error' in result:
error(f"LLM planning failed: {result['error'].get('message')}")
await self._push('plan_error', {'message': f'LLM 调用失败: {result["error"].get("message")}'})
return {
'analysis': 'LLM 调用失败,无法生成计划',
'steps': [],
@ -336,7 +362,16 @@ class HermesReasoningEngine:
# Extract JSON from response
content = result.get('choices', [{}])[0].get('message', {}).get('content', '')
return self._parse_plan_json(content)
plan = self._parse_plan_json(content)
steps_count = len(plan.get('steps', []))
await self._push('plan_complete', {
'message': f'执行计划已生成,共 {steps_count} 个步骤',
'analysis': plan.get('analysis', ''),
'step_count': steps_count,
'steps': plan.get('steps', [])
})
return plan
def _parse_plan_json(self, text: str) -> Dict[str, Any]:
"""Extract and parse JSON plan from LLM response."""
@ -452,9 +487,20 @@ class HermesReasoningEngine:
try:
# Step 1: Gather real context
info(f"Reasoning start: user={user_id}, request={request[:80]}...")
await self._push('reasoning_start', {
'session_id': session_id,
'user_id': user_id,
'request': request,
'message': '推理引擎启动'
})
context = await self._get_memory_context(user_id, request, config)
context['user_id'] = user_id # Ensure user_id is available for tool execution
await self._push('context_complete', {
'message': self._context_summary(context),
'summary': self._context_summary(context)
})
# Step 2: LLM-based planning
plan = await self._generate_plan(request, context, config)
@ -465,6 +511,12 @@ class HermesReasoningEngine:
violations = self._safety_check(plan, safety_mode)
if violations:
warning(f"Safety violations: {violations}")
await self._push('safety_violation', {
'violations': violations,
'message': f'安全检查发现 {len(violations)} 个违规'
})
else:
await self._push('safety_pass', {'message': '安全检查通过'})
# Step 4: Store session
await self._store_session(session_id, user_id, request, plan, violations, "planned")
@ -499,12 +551,21 @@ class HermesReasoningEngine:
elapsed_total = time.time() - start_time
info(f"Reasoning complete in {elapsed_total:.1f}s, status={result['status']}")
self._push('reasoning_complete', {
'status': result.get('status', 'completed'),
'elapsed': round(elapsed_total, 1),
'message': f'推理完成,状态: {result.get("status", "completed")}'
})
except Exception as e:
exception(f"Reasoning failed: {e}")
result["success"] = False
result["error"] = str(e)
result["status"] = "failed"
await self._push('reasoning_error', {
'error': str(e),
'message': f'推理失败: {str(e)}'
})
try:
await self._update_session_status(session_id, "failed")
@ -546,10 +607,22 @@ class HermesReasoningEngine:
max_steps = int(config.get('max_reasoning_steps', 10))
max_tools = int(config.get('max_tool_calls_per_step', 5))
await self._push('execution_start', {
'message': f'开始执行计划,共 {len(steps)} 个步骤',
'total_steps': len(steps)
})
for step in steps[:max_steps]:
step_num = step.get('step_number', '?')
step_desc = step.get('description', '')
step_results = []
await self._push('step_start', {
'step_number': step_num,
'description': step_desc,
'message': f'步骤 {step_num}: {step_desc}'
})
for action in step.get('actions', [])[:max_tools]:
tool = action.get('tool', '')
params = action.get('parameters', {})
@ -557,9 +630,24 @@ class HermesReasoningEngine:
if not tool:
continue
await self._push('tool_call_start', {
'step_number': step_num,
'tool': tool,
'parameters': params,
'message': f'调用工具: {tool}'
})
info(f"Executing step {step_num}: {tool}({json.dumps(params, ensure_ascii=False)[:100]})")
tool_result = await self._execute_tool(tool, params, context)
await self._push('tool_call_result', {
'step_number': step_num,
'tool': tool,
'success': tool_result.get('success', False),
'result': str(tool_result)[:1000],
'message': f'工具 {tool} 执行{"成功" if tool_result.get("success") else "失败"}'
})
step_results.append({
'tool': tool,
'parameters': params,
@ -581,6 +669,18 @@ class HermesReasoningEngine:
'actions': step_results,
})
await self._push('step_complete', {
'step_number': step_num,
'description': step.get('description', ''),
'action_count': len(step_results),
'message': f'步骤 {step_num} 完成,执行了 {len(step_results)} 个操作'
})
await self._push('execution_complete', {
'message': f'计划执行完成,共 {len(all_results)} 个步骤',
'total_steps': len(all_results)
})
return all_results
async def _try_recovery(self, tool: str, params: Dict, error: str,

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,126 @@
"""
Reasoning Console WebSocket endpoint.
Handles real-time push of reasoning steps to the frontend.
"""
import json
import asyncio
import time
from appPublic.uniqueID import getID
from appPublic.log import info, debug, error, exception
# 全局存储活跃 ws_pool 引用
_reasoning_ws_sessions = {}
async def myfunc(request, **kwargs):
"""WebSocket handler for reasoning console."""
ws_pool = kwargs.get('ws_pool')
ws_data = kwargs.get('ws_data')
lenv = kwargs
try:
data = json.loads(ws_data) if ws_data else {}
except:
data = {}
cmd = data.get('cmd', '')
if cmd == 'connect':
# 前端连接时注册 session
user_id = data.get('user_id', 'anonymous')
session_id = data.get('session_id', getID())
_reasoning_ws_sessions[user_id] = {
'ws_pool': ws_pool,
'session_id': session_id,
}
debug(f"WS connected: user={user_id}, session={session_id}")
await ws_pool.sendto(json.dumps({
'type': 'connected',
'session_id': session_id,
'message': 'WebSocket 连接成功'
}))
elif cmd == 'start_reasoning':
# 前端发起推理请求
user_id = data.get('user_id', 'anonymous')
request_text = data.get('request', '')
if not request_text:
await ws_pool.sendto(json.dumps({
'type': 'error',
'message': '请求内容为空'
}))
return
# 推送推理开始事件
await _ws_push(user_id, {
'type': 'reasoning_start',
'data': {
'request': request_text,
'message': '推理引擎启动',
'timestamp': time.time()
}
})
# 调用推理引擎(异步执行,不阻塞 websocket
asyncio.create_task(
_run_reasoning(user_id, request_text)
)
elif cmd == 'ping':
await ws_pool.sendto(json.dumps({
'type': 'pong',
'timestamp': time.time()
}))
async def _ws_push(user_id, message):
"""推送消息到指定用户的 websocket 连接。"""
session = _reasoning_ws_sessions.get(user_id)
if session and session.get('ws_pool'):
try:
await session['ws_pool'].sendto(json.dumps(message))
except Exception as e:
error(f"WS push failed for user {user_id}: {e}")
async def _run_reasoning(user_id, request_text):
"""异步执行推理并推送每一步到前端。"""
from harnessed_reasoning.core import get_harnessed_reasoning_engine
engine = get_harnessed_reasoning_engine()
# 注入 ws_push 回调到引擎实例
async def push_callback(msg):
await _ws_push(user_id, msg)
engine.ws_push = push_callback
try:
result = await engine.reason_and_execute(
request=request_text,
execute_immediately=True,
user_id=user_id,
)
# 推送最终结果
await _ws_push(user_id, {
'type': 'reasoning_complete',
'data': {
'result': result,
'message': '推理完成',
'timestamp': time.time()
}
})
except Exception as e:
exception(f"Reasoning failed for user {user_id}")
await _ws_push(user_id, {
'type': 'error',
'data': {
'error': str(e),
'message': f'推理失败: {str(e)}',
'timestamp': time.time()
}
})
finally:
# 清理回调
engine.ws_push = None