feat: 实现推理过程实时可视化(WebSocket事件推送)

- 新增 per-user WebSocket 回调机制,支持多用户并发推理会话
- 在每个推理阶段推送实时事件:context/plan/safety/execution/tool_call
- reasoning_console.wss 使用 ws_push_callbacks 字典替代共享 ws_push
- 新增 _current_user_id/_current_org_id 追踪用于用户隔离
- _execute_tool 传递 context 参数确保工具执行的用户隔离
- 新增文件技能搜索:扫描用户目录和共享目录的 skills,与DB技能去重
This commit is contained in:
yumoqing 2026-05-13 13:41:42 +08:00
parent 766bd9ecb9
commit 89e099ee12
2 changed files with 110 additions and 44 deletions

View File

@ -101,8 +101,8 @@ TOOL_DESCRIPTIONS = "\n".join(f"- {t['name']}: {t['desc']}" for t in AVAILABLE_T
class HermesReasoningEngine: class HermesReasoningEngine:
"""Production reasoning engine that uses LLM and real tool execution.""" """Production reasoning engine that uses LLM and real tool execution."""
# Websocket push callback (injected by .wss endpoint) # Websocket push callbacks keyed by user_id (injected by .wss endpoint)
ws_push = None ws_push_callbacks: Dict[str, callable] = {}
DEFAULT_SAFETY_RULES = { DEFAULT_SAFETY_RULES = {
"strict": [ "strict": [
@ -116,20 +116,22 @@ class HermesReasoningEngine:
} }
def __init__(self): def __init__(self):
pass self._current_user_id = None # Set during reason_and_execute execution
self._current_org_id = None # Set during reason_and_execute execution
async def _push(self, event_type: str, data: Dict[str, Any] = None): async def _push(self, event_type: str, data: Dict[str, Any] = None, user_id: str = None):
"""Push a reasoning step event via websocket.""" """Push a reasoning step event via websocket for a specific user."""
if self.ws_push: if user_id and user_id in self.ws_push_callbacks:
ws_push = self.ws_push_callbacks[user_id]
msg = { msg = {
'event': event_type, 'event': event_type,
'data': data or {}, 'data': data or {},
'timestamp': time.time(), 'timestamp': time.time(),
} }
try: try:
await self.ws_push(msg) await ws_push(msg)
except Exception as e: except Exception as e:
error(f"ws_push failed: {e}") error(f"ws_push failed for user {user_id}: {e}")
# -------------------------------------------------------- # --------------------------------------------------------
# Config helpers # Config helpers
@ -178,14 +180,14 @@ class HermesReasoningEngine:
async def _get_memory_context(self, user_id: str, request: str, config: Dict) -> Dict[str, Any]: async def _get_memory_context(self, user_id: str, request: str, config: Dict) -> Dict[str, Any]:
"""Get real memory and session context from harnessed_agent.""" """Get real memory and session context from harnessed_agent."""
context = {"user_id": user_id, "memory_entries": [], "recent_sessions": [], "skills": []} context = {"user_id": user_id, "org_id": self._current_org_id, "memory_entries": [], "recent_sessions": [], "skills": []}
await self._push('context_start', {'message': '正在收集上下文...', 'user_id': user_id}) await self._push('context_start', {'message': '正在收集上下文...', 'user_id': user_id}, user_id=self._current_user_id)
try: try:
# Intelligent memory # Intelligent memory
max_tokens = int(config.get('max_context_tokens', 4000)) // 3 max_tokens = int(config.get('max_context_tokens', 4000)) // 3
await self._push('context_memory', {'message': '加载记忆上下文...', 'max_tokens': max_tokens}) await self._push('context_memory', {'message': '加载记忆上下文...', 'max_tokens': max_tokens}, user_id=self._current_user_id)
if hasattr(ServerEnv(), 'harnessed_get_intelligent_memory_context'): if hasattr(ServerEnv(), 'harnessed_get_intelligent_memory_context'):
mem_result = await ServerEnv().harnessed_get_intelligent_memory_context( mem_result = await ServerEnv().harnessed_get_intelligent_memory_context(
current_task=request, current_task=request,
@ -196,7 +198,7 @@ class HermesReasoningEngine:
await self._push('context_memory_done', { await self._push('context_memory_done', {
'message': f'加载 {len(context["memory_entries"])} 条记忆', 'message': f'加载 {len(context["memory_entries"])} 条记忆',
'count': len(context['memory_entries']) 'count': len(context['memory_entries'])
}) }, user_id=self._current_user_id)
# Session search # Session search
if config.get('enable_cross_session_search', '1') == '1': if config.get('enable_cross_session_search', '1') == '1':
@ -217,14 +219,50 @@ class HermesReasoningEngine:
return context return context
def _search_skill_dir(self, skills_dir: str, keywords: set, source: str = "user") -> List[Dict[str, Any]]:
"""Search a skill directory for skills matching keywords."""
import os
results = []
if not os.path.exists(skills_dir):
return results
for skill_name in os.listdir(skills_dir):
skill_path = os.path.join(skills_dir, skill_name)
skill_file = os.path.join(skill_path, "SKILL.md")
if os.path.isdir(skill_path) and os.path.exists(skill_file):
matches = False
for kw in keywords:
if kw in skill_name.lower():
matches = True
break
if not matches:
with open(skill_file, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read().lower()
for kw in keywords:
if kw in content:
matches = True
break
if matches:
results.append({
'id': f'{source}_{skill_name}',
'name': skill_name,
'description': f'{source} skill: {skill_name}',
'content_preview': '',
'source': source
})
return results
async def _find_relevant_skills(self, user_id: str, request: str) -> List[Dict[str, Any]]: async def _find_relevant_skills(self, user_id: str, request: str) -> List[Dict[str, Any]]:
"""Find skills relevant to the request via keyword search.""" """Find skills relevant to the request via keyword search.
Searches both DB-based skills and file-based user skills directory."""
import os
keywords = set() keywords = set()
for word in request.lower().split(): for word in request.lower().split():
if len(word) > 2: if len(word) > 2:
keywords.add(word) keywords.add(word)
skills = [] skills = []
# 1. Search DB-based skills (harnessed_agent module's manage_skills)
try: try:
env = ServerEnv() env = ServerEnv()
dbname = env.get_module_dbname('harnessed_reasoning') dbname = env.get_module_dbname('harnessed_reasoning')
@ -242,17 +280,34 @@ class HermesReasoningEngine:
}) })
rows = (rows or [])[:2] rows = (rows or [])[:2]
skills.extend(rows) skills.extend(rows)
# Deduplicate
seen = set()
unique = []
for s in skills:
if s['id'] not in seen:
seen.add(s['id'])
unique.append(s)
return unique[:5]
except Exception: except Exception:
return [] pass
# 2. Search file-based user skills directory (~/.hermes/users/{user_id}/skills/)
try:
hermes_dir = os.path.expanduser("~/.hermes")
user_skills_dir = os.path.join(hermes_dir, "users", str(user_id), "skills")
skills.extend(self._search_skill_dir(user_skills_dir, keywords, source="user"))
except Exception:
pass
# 3. Search shared skills directory (~/.hermes/skills/)
try:
hermes_dir = os.path.expanduser("~/.hermes")
shared_skills_dir = os.path.join(hermes_dir, "skills")
skills.extend(self._search_skill_dir(shared_skills_dir, keywords, source="shared"))
except Exception:
pass
# Deduplicate by name
seen = set()
unique = []
for s in skills:
name = s.get('name', '')
if name not in seen:
seen.add(name)
unique.append(s)
return unique[:5]
# -------------------------------------------------------- # --------------------------------------------------------
# Safety checks # Safety checks
@ -315,7 +370,7 @@ class HermesReasoningEngine:
async def _generate_plan(self, request: str, context: Dict[str, Any], async def _generate_plan(self, request: str, context: Dict[str, Any],
config: Dict[str, Any]) -> Dict[str, Any]: config: Dict[str, Any]) -> Dict[str, Any]:
"""Use LLM to analyze request and generate execution plan.""" """Use LLM to analyze request and generate execution plan."""
await self._push('plan_start', {'message': 'LLM 正在分析请求并生成执行计划...', 'request': request[:100]}) await self._push('plan_start', {'message': 'LLM 正在分析请求并生成执行计划...', 'request': request[:100]}, user_id=self._current_user_id)
# Build context summary # Build context summary
ctx_parts = [] ctx_parts = []
@ -352,7 +407,7 @@ class HermesReasoningEngine:
if 'error' in result: if 'error' in result:
error(f"LLM planning failed: {result['error'].get('message')}") error(f"LLM planning failed: {result['error'].get('message')}")
await self._push('plan_error', {'message': f'LLM 调用失败: {result["error"].get("message")}'}) await self._push('plan_error', {'message': f'LLM 调用失败: {result["error"].get("message")}'}, user_id=self._current_user_id)
return { return {
'analysis': 'LLM 调用失败,无法生成计划', 'analysis': 'LLM 调用失败,无法生成计划',
'steps': [], 'steps': [],
@ -370,7 +425,7 @@ class HermesReasoningEngine:
'analysis': plan.get('analysis', ''), 'analysis': plan.get('analysis', ''),
'step_count': steps_count, 'step_count': steps_count,
'steps': plan.get('steps', []) 'steps': plan.get('steps', [])
}) }, user_id=self._current_user_id)
return plan return plan
def _parse_plan_json(self, text: str) -> Dict[str, Any]: def _parse_plan_json(self, text: str) -> Dict[str, Any]:
@ -439,6 +494,7 @@ class HermesReasoningEngine:
return await env.harnessed_execute_tool( return await env.harnessed_execute_tool(
tool_name=tool_name, tool_name=tool_name,
parameters=parameters, parameters=parameters,
context=context,
) )
# Fallback: try to call the tool function directly if registered on ServerEnv # Fallback: try to call the tool function directly if registered on ServerEnv
@ -473,6 +529,16 @@ class HermesReasoningEngine:
if not user_id: if not user_id:
user_id = "anonymous" user_id = "anonymous"
self._current_user_id = user_id # Set for user-isolated ws_push
# Get org_id from ServerEnv for shared skill permission checks
self._current_org_id = None
try:
env = ServerEnv()
self._current_org_id = getattr(env, 'orgid', None) or getattr(env, 'org_id', None)
except Exception:
pass
config = await self._get_config(user_id) config = await self._get_config(user_id)
safety_mode = config.get('safety_mode', 'strict') safety_mode = config.get('safety_mode', 'strict')
@ -492,14 +558,14 @@ class HermesReasoningEngine:
'user_id': user_id, 'user_id': user_id,
'request': request, 'request': request,
'message': '推理引擎启动' 'message': '推理引擎启动'
}) }, user_id=self._current_user_id)
context = await self._get_memory_context(user_id, request, config) context = await self._get_memory_context(user_id, request, config)
context['user_id'] = user_id # Ensure user_id is available for tool execution context['user_id'] = user_id # Ensure user_id is available for tool execution
await self._push('context_complete', { await self._push('context_complete', {
'message': self._context_summary(context), 'message': self._context_summary(context),
'summary': self._context_summary(context) 'summary': self._context_summary(context)
}) }, user_id=self._current_user_id)
# Step 2: LLM-based planning # Step 2: LLM-based planning
plan = await self._generate_plan(request, context, config) plan = await self._generate_plan(request, context, config)
@ -514,9 +580,9 @@ class HermesReasoningEngine:
await self._push('safety_violation', { await self._push('safety_violation', {
'violations': violations, 'violations': violations,
'message': f'安全检查发现 {len(violations)} 个违规' 'message': f'安全检查发现 {len(violations)} 个违规'
}) }, user_id=self._current_user_id)
else: else:
await self._push('safety_pass', {'message': '安全检查通过'}) await self._push('safety_pass', {'message': '安全检查通过'}, user_id=self._current_user_id)
# Step 4: Store session # Step 4: Store session
await self._store_session(session_id, user_id, request, plan, violations, "planned") await self._store_session(session_id, user_id, request, plan, violations, "planned")
@ -565,12 +631,15 @@ class HermesReasoningEngine:
await self._push('reasoning_error', { await self._push('reasoning_error', {
'error': str(e), 'error': str(e),
'message': f'推理失败: {str(e)}' 'message': f'推理失败: {str(e)}'
}) }, user_id=user_id)
try: try:
await self._update_session_status(session_id, "failed") await self._update_session_status(session_id, "failed")
except Exception: except Exception:
pass pass
finally:
self._current_user_id = None # Clean up
self._current_org_id = None # Clean up
return result return result
@ -610,7 +679,7 @@ class HermesReasoningEngine:
await self._push('execution_start', { await self._push('execution_start', {
'message': f'开始执行计划,共 {len(steps)} 个步骤', 'message': f'开始执行计划,共 {len(steps)} 个步骤',
'total_steps': len(steps) 'total_steps': len(steps)
}) }, user_id=self._current_user_id)
for step in steps[:max_steps]: for step in steps[:max_steps]:
step_num = step.get('step_number', '?') step_num = step.get('step_number', '?')
@ -621,7 +690,7 @@ class HermesReasoningEngine:
'step_number': step_num, 'step_number': step_num,
'description': step_desc, 'description': step_desc,
'message': f'步骤 {step_num}: {step_desc}' 'message': f'步骤 {step_num}: {step_desc}'
}) }, user_id=self._current_user_id)
for action in step.get('actions', [])[:max_tools]: for action in step.get('actions', [])[:max_tools]:
tool = action.get('tool', '') tool = action.get('tool', '')
@ -635,7 +704,7 @@ class HermesReasoningEngine:
'tool': tool, 'tool': tool,
'parameters': params, 'parameters': params,
'message': f'调用工具: {tool}' 'message': f'调用工具: {tool}'
}) }, user_id=self._current_user_id)
info(f"Executing step {step_num}: {tool}({json.dumps(params, ensure_ascii=False)[:100]})") info(f"Executing step {step_num}: {tool}({json.dumps(params, ensure_ascii=False)[:100]})")
tool_result = await self._execute_tool(tool, params, context) tool_result = await self._execute_tool(tool, params, context)
@ -646,7 +715,7 @@ class HermesReasoningEngine:
'success': tool_result.get('success', False), 'success': tool_result.get('success', False),
'result': str(tool_result)[:1000], 'result': str(tool_result)[:1000],
'message': f'工具 {tool} 执行{"成功" if tool_result.get("success") else "失败"}' 'message': f'工具 {tool} 执行{"成功" if tool_result.get("success") else "失败"}'
}) }, user_id=self._current_user_id)
step_results.append({ step_results.append({
'tool': tool, 'tool': tool,
@ -674,12 +743,12 @@ class HermesReasoningEngine:
'description': step.get('description', ''), 'description': step.get('description', ''),
'action_count': len(step_results), 'action_count': len(step_results),
'message': f'步骤 {step_num} 完成,执行了 {len(step_results)} 个操作' 'message': f'步骤 {step_num} 完成,执行了 {len(step_results)} 个操作'
}) }, user_id=self._current_user_id)
await self._push('execution_complete', { await self._push('execution_complete', {
'message': f'计划执行完成,共 {len(all_results)} 个步骤', 'message': f'计划执行完成,共 {len(all_results)} 个步骤',
'total_steps': len(all_results) 'total_steps': len(all_results)
}) }, user_id=self._current_user_id)
return all_results return all_results

View File

@ -89,11 +89,8 @@ async def _run_reasoning(user_id, request_text):
engine = get_harnessed_reasoning_engine() engine = get_harnessed_reasoning_engine()
# 注入 ws_push 回调到引擎实例 # 注册 per-user ws_push 回调到引擎
async def push_callback(msg): engine.ws_push_callbacks[user_id] = lambda msg: _ws_push(user_id, msg)
await _ws_push(user_id, msg)
engine.ws_push = push_callback
try: try:
result = await engine.reason_and_execute( result = await engine.reason_and_execute(
@ -122,5 +119,5 @@ async def _run_reasoning(user_id, request_text):
} }
}) })
finally: finally:
# 清理回调 # 清理 per-user 回调
engine.ws_push = None engine.ws_push_callbacks.pop(user_id, None)