From 89e099ee12c4d1c802975ad1be5973d8f2609c09 Mon Sep 17 00:00:00 2001 From: yumoqing Date: Wed, 13 May 2026 13:41:42 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=AE=9E=E7=8E=B0=E6=8E=A8=E7=90=86?= =?UTF-8?q?=E8=BF=87=E7=A8=8B=E5=AE=9E=E6=97=B6=E5=8F=AF=E8=A7=86=E5=8C=96?= =?UTF-8?q?=EF=BC=88WebSocket=E4=BA=8B=E4=BB=B6=E6=8E=A8=E9=80=81=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 per-user WebSocket 回调机制,支持多用户并发推理会话 - 在每个推理阶段推送实时事件:context/plan/safety/execution/tool_call - reasoning_console.wss 使用 ws_push_callbacks 字典替代共享 ws_push - 新增 _current_user_id/_current_org_id 追踪用于用户隔离 - _execute_tool 传递 context 参数确保工具执行的用户隔离 - 新增文件技能搜索:扫描用户目录和共享目录的 skills,与DB技能去重 --- harnessed_reasoning/core.py | 143 +++++++++++++++++++++++++--------- wwwroot/reasoning_console.wss | 11 +-- 2 files changed, 110 insertions(+), 44 deletions(-) diff --git a/harnessed_reasoning/core.py b/harnessed_reasoning/core.py index 2b55e47..245de23 100644 --- a/harnessed_reasoning/core.py +++ b/harnessed_reasoning/core.py @@ -101,8 +101,8 @@ TOOL_DESCRIPTIONS = "\n".join(f"- {t['name']}: {t['desc']}" for t in AVAILABLE_T class HermesReasoningEngine: """Production reasoning engine that uses LLM and real tool execution.""" - # Websocket push callback (injected by .wss endpoint) - ws_push = None + # Websocket push callbacks keyed by user_id (injected by .wss endpoint) + ws_push_callbacks: Dict[str, callable] = {} DEFAULT_SAFETY_RULES = { "strict": [ @@ -116,20 +116,22 @@ class HermesReasoningEngine: } def __init__(self): - pass + self._current_user_id = None # Set during reason_and_execute execution + self._current_org_id = None # Set during reason_and_execute execution - async def _push(self, event_type: str, data: Dict[str, Any] = None): - """Push a reasoning step event via websocket.""" - if self.ws_push: + async def _push(self, event_type: str, data: Dict[str, Any] = None, user_id: str = None): + """Push a reasoning step event via websocket for a specific user.""" + if user_id and user_id in self.ws_push_callbacks: + ws_push = self.ws_push_callbacks[user_id] msg = { 'event': event_type, 'data': data or {}, 'timestamp': time.time(), } try: - await self.ws_push(msg) + await ws_push(msg) except Exception as e: - error(f"ws_push failed: {e}") + error(f"ws_push failed for user {user_id}: {e}") # -------------------------------------------------------- # Config helpers @@ -178,14 +180,14 @@ class HermesReasoningEngine: async def _get_memory_context(self, user_id: str, request: str, config: Dict) -> Dict[str, Any]: """Get real memory and session context from harnessed_agent.""" - context = {"user_id": user_id, "memory_entries": [], "recent_sessions": [], "skills": []} + context = {"user_id": user_id, "org_id": self._current_org_id, "memory_entries": [], "recent_sessions": [], "skills": []} - await self._push('context_start', {'message': '正在收集上下文...', 'user_id': user_id}) + await self._push('context_start', {'message': '正在收集上下文...', 'user_id': user_id}, user_id=self._current_user_id) try: # Intelligent memory max_tokens = int(config.get('max_context_tokens', 4000)) // 3 - await self._push('context_memory', {'message': '加载记忆上下文...', 'max_tokens': max_tokens}) + await self._push('context_memory', {'message': '加载记忆上下文...', 'max_tokens': max_tokens}, user_id=self._current_user_id) if hasattr(ServerEnv(), 'harnessed_get_intelligent_memory_context'): mem_result = await ServerEnv().harnessed_get_intelligent_memory_context( current_task=request, @@ -196,7 +198,7 @@ class HermesReasoningEngine: await self._push('context_memory_done', { 'message': f'加载 {len(context["memory_entries"])} 条记忆', 'count': len(context['memory_entries']) - }) + }, user_id=self._current_user_id) # Session search if config.get('enable_cross_session_search', '1') == '1': @@ -217,14 +219,50 @@ class HermesReasoningEngine: return context + def _search_skill_dir(self, skills_dir: str, keywords: set, source: str = "user") -> List[Dict[str, Any]]: + """Search a skill directory for skills matching keywords.""" + import os + results = [] + if not os.path.exists(skills_dir): + return results + for skill_name in os.listdir(skills_dir): + skill_path = os.path.join(skills_dir, skill_name) + skill_file = os.path.join(skill_path, "SKILL.md") + if os.path.isdir(skill_path) and os.path.exists(skill_file): + matches = False + for kw in keywords: + if kw in skill_name.lower(): + matches = True + break + if not matches: + with open(skill_file, 'r', encoding='utf-8', errors='ignore') as f: + content = f.read().lower() + for kw in keywords: + if kw in content: + matches = True + break + if matches: + results.append({ + 'id': f'{source}_{skill_name}', + 'name': skill_name, + 'description': f'{source} skill: {skill_name}', + 'content_preview': '', + 'source': source + }) + return results + async def _find_relevant_skills(self, user_id: str, request: str) -> List[Dict[str, Any]]: - """Find skills relevant to the request via keyword search.""" + """Find skills relevant to the request via keyword search. + Searches both DB-based skills and file-based user skills directory.""" + import os keywords = set() for word in request.lower().split(): if len(word) > 2: keywords.add(word) skills = [] + + # 1. Search DB-based skills (harnessed_agent module's manage_skills) try: env = ServerEnv() dbname = env.get_module_dbname('harnessed_reasoning') @@ -242,17 +280,34 @@ class HermesReasoningEngine: }) rows = (rows or [])[:2] skills.extend(rows) - - # Deduplicate - seen = set() - unique = [] - for s in skills: - if s['id'] not in seen: - seen.add(s['id']) - unique.append(s) - return unique[:5] except Exception: - return [] + pass + + # 2. Search file-based user skills directory (~/.hermes/users/{user_id}/skills/) + try: + hermes_dir = os.path.expanduser("~/.hermes") + user_skills_dir = os.path.join(hermes_dir, "users", str(user_id), "skills") + skills.extend(self._search_skill_dir(user_skills_dir, keywords, source="user")) + except Exception: + pass + + # 3. Search shared skills directory (~/.hermes/skills/) + try: + hermes_dir = os.path.expanduser("~/.hermes") + shared_skills_dir = os.path.join(hermes_dir, "skills") + skills.extend(self._search_skill_dir(shared_skills_dir, keywords, source="shared")) + except Exception: + pass + + # Deduplicate by name + seen = set() + unique = [] + for s in skills: + name = s.get('name', '') + if name not in seen: + seen.add(name) + unique.append(s) + return unique[:5] # -------------------------------------------------------- # Safety checks @@ -315,7 +370,7 @@ class HermesReasoningEngine: async def _generate_plan(self, request: str, context: Dict[str, Any], config: Dict[str, Any]) -> Dict[str, Any]: """Use LLM to analyze request and generate execution plan.""" - await self._push('plan_start', {'message': 'LLM 正在分析请求并生成执行计划...', 'request': request[:100]}) + await self._push('plan_start', {'message': 'LLM 正在分析请求并生成执行计划...', 'request': request[:100]}, user_id=self._current_user_id) # Build context summary ctx_parts = [] @@ -352,7 +407,7 @@ class HermesReasoningEngine: if 'error' in result: error(f"LLM planning failed: {result['error'].get('message')}") - await self._push('plan_error', {'message': f'LLM 调用失败: {result["error"].get("message")}'}) + await self._push('plan_error', {'message': f'LLM 调用失败: {result["error"].get("message")}'}, user_id=self._current_user_id) return { 'analysis': 'LLM 调用失败,无法生成计划', 'steps': [], @@ -370,7 +425,7 @@ class HermesReasoningEngine: 'analysis': plan.get('analysis', ''), 'step_count': steps_count, 'steps': plan.get('steps', []) - }) + }, user_id=self._current_user_id) return plan def _parse_plan_json(self, text: str) -> Dict[str, Any]: @@ -439,6 +494,7 @@ class HermesReasoningEngine: return await env.harnessed_execute_tool( tool_name=tool_name, parameters=parameters, + context=context, ) # Fallback: try to call the tool function directly if registered on ServerEnv @@ -473,6 +529,16 @@ class HermesReasoningEngine: if not user_id: user_id = "anonymous" + self._current_user_id = user_id # Set for user-isolated ws_push + + # Get org_id from ServerEnv for shared skill permission checks + self._current_org_id = None + try: + env = ServerEnv() + self._current_org_id = getattr(env, 'orgid', None) or getattr(env, 'org_id', None) + except Exception: + pass + config = await self._get_config(user_id) safety_mode = config.get('safety_mode', 'strict') @@ -492,14 +558,14 @@ class HermesReasoningEngine: 'user_id': user_id, 'request': request, 'message': '推理引擎启动' - }) + }, user_id=self._current_user_id) context = await self._get_memory_context(user_id, request, config) context['user_id'] = user_id # Ensure user_id is available for tool execution await self._push('context_complete', { 'message': self._context_summary(context), 'summary': self._context_summary(context) - }) + }, user_id=self._current_user_id) # Step 2: LLM-based planning plan = await self._generate_plan(request, context, config) @@ -514,9 +580,9 @@ class HermesReasoningEngine: await self._push('safety_violation', { 'violations': violations, 'message': f'安全检查发现 {len(violations)} 个违规' - }) + }, user_id=self._current_user_id) else: - await self._push('safety_pass', {'message': '安全检查通过'}) + await self._push('safety_pass', {'message': '安全检查通过'}, user_id=self._current_user_id) # Step 4: Store session await self._store_session(session_id, user_id, request, plan, violations, "planned") @@ -565,12 +631,15 @@ class HermesReasoningEngine: await self._push('reasoning_error', { 'error': str(e), 'message': f'推理失败: {str(e)}' - }) + }, user_id=user_id) try: await self._update_session_status(session_id, "failed") except Exception: pass + finally: + self._current_user_id = None # Clean up + self._current_org_id = None # Clean up return result @@ -610,7 +679,7 @@ class HermesReasoningEngine: await self._push('execution_start', { 'message': f'开始执行计划,共 {len(steps)} 个步骤', 'total_steps': len(steps) - }) + }, user_id=self._current_user_id) for step in steps[:max_steps]: step_num = step.get('step_number', '?') @@ -621,7 +690,7 @@ class HermesReasoningEngine: 'step_number': step_num, 'description': step_desc, 'message': f'步骤 {step_num}: {step_desc}' - }) + }, user_id=self._current_user_id) for action in step.get('actions', [])[:max_tools]: tool = action.get('tool', '') @@ -635,7 +704,7 @@ class HermesReasoningEngine: 'tool': tool, 'parameters': params, 'message': f'调用工具: {tool}' - }) + }, user_id=self._current_user_id) info(f"Executing step {step_num}: {tool}({json.dumps(params, ensure_ascii=False)[:100]})") tool_result = await self._execute_tool(tool, params, context) @@ -646,7 +715,7 @@ class HermesReasoningEngine: 'success': tool_result.get('success', False), 'result': str(tool_result)[:1000], 'message': f'工具 {tool} 执行{"成功" if tool_result.get("success") else "失败"}' - }) + }, user_id=self._current_user_id) step_results.append({ 'tool': tool, @@ -674,12 +743,12 @@ class HermesReasoningEngine: 'description': step.get('description', ''), 'action_count': len(step_results), 'message': f'步骤 {step_num} 完成,执行了 {len(step_results)} 个操作' - }) + }, user_id=self._current_user_id) await self._push('execution_complete', { 'message': f'计划执行完成,共 {len(all_results)} 个步骤', 'total_steps': len(all_results) - }) + }, user_id=self._current_user_id) return all_results diff --git a/wwwroot/reasoning_console.wss b/wwwroot/reasoning_console.wss index e4bf58b..ad522a3 100644 --- a/wwwroot/reasoning_console.wss +++ b/wwwroot/reasoning_console.wss @@ -89,11 +89,8 @@ async def _run_reasoning(user_id, request_text): engine = get_harnessed_reasoning_engine() - # 注入 ws_push 回调到引擎实例 - async def push_callback(msg): - await _ws_push(user_id, msg) - - engine.ws_push = push_callback + # 注册 per-user ws_push 回调到引擎 + engine.ws_push_callbacks[user_id] = lambda msg: _ws_push(user_id, msg) try: result = await engine.reason_and_execute( @@ -122,5 +119,5 @@ async def _run_reasoning(user_id, request_text): } }) finally: - # 清理回调 - engine.ws_push = None + # 清理 per-user 回调 + engine.ws_push_callbacks.pop(user_id, None)