From 77ab6ba30cf3674bcc77d862bf9ab6d9230633da Mon Sep 17 00:00:00 2001 From: yumoqing Date: Tue, 10 Feb 2026 18:19:32 +0800 Subject: [PATCH] bugfix --- skillagent/init.py | 3 + skillagent/skillengine.py | 135 +++++++++++++++++++++++++++++++------- 2 files changed, 115 insertions(+), 23 deletions(-) diff --git a/skillagent/init.py b/skillagent/init.py index 6a6c9c5..2a813d4 100644 --- a/skillagent/init.py +++ b/skillagent/init.py @@ -1,5 +1,8 @@ from ahserver.ServerEnv import ServerEnv from .agent import skillagent +from skillengine import IndustrialSkillEngine + def load_skillagent(): env = ServerEnv() + env.engine = IndustrialSkillEngine env.skillagent = skillagent diff --git a/skillagent/skillengine.py b/skillagent/skillengine.py index de76da4..1bec519 100644 --- a/skillagent/skillengine.py +++ b/skillagent/skillengine.py @@ -1,5 +1,6 @@ import os import re +import asyncio import yaml import logging import subprocess @@ -7,6 +8,8 @@ import hashlib from pathlib import Path from datetime import datetime from typing import List, Dict, Any +form ahserver.serverenv import ServerEnv +from appPublic.dictObject import DictObject # 配置审计日志 logging.basicConfig( @@ -15,15 +18,77 @@ logging.basicConfig( format='%(asctime)s [%(levelname)s] %(message)s' ) +class LLMHandler: + def __init__(self, request, llmid, apikey=None): + self.llmid = llmid + self.apikey = apikey + self.request = request + + async def __call__(self, prompt): + env = ServerEnv() + kw = { + "stream": False, + "llmid": self.llmid, + "prompt": prompt + } + kw = DictObject(**kw) + r = await env.inference(self, request, params_kw=kw) + return r.content + +async def run_subprocess(command, cwd, env, timeout=30.0): + try: + # 启动子进程 + # stdout/stderr 设置为 PIPE 对应原来的 capture_output=True + process = await asyncio.create_subprocess_shell( + command, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + cwd=cwd, + env=env, + # 注意:asyncio 默认处理字节流,如果需要文本,后面需手动 decode + ) + + # 使用 wait_for 实现 timeout 功能 + try: + # wait_for 会在超时后抛出 TimeoutError 并自动 cancel 协程 + stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=timeout) + except asyncio.TimeoutError: + process.kill() # 超时强制杀掉进程 + await process.wait() # 确保资源回收 + raise Exception(f"{command} 执行超时") + + # 对应 text=True,手动解码 + return { + "stdout": stdout.decode().strip(), + "stderr": stderr.decode().strip(), + "return_code": process.returncode + } + + except Exception as e: + print(f"{command} 执行异常: {e}") + return None + class IndustrialSkillEngine: - def __init__(self, skills_dir: str, llm_handler): + def __init__(self, request, skills_dir: str, llmid, apikey=None): self.root = Path(skills_dir).resolve() - self.llm = llm_handler + self.llm = LLMHandler(request, llmid, apikey=apikey) self.registry = {} + self.task_queue = asyncio.Queue(maxsize=20) self.session_id = hashlib.md5(str(datetime.now()).encode()).hexdigest()[:8] # 状态机:记录当前任务执行到的步骤 self.state = {"current_skill": None, "history": [], "pending_params": []} + async def write_output(self, data=None): + await self.task_queue.put(data) + + async def gen_output(self): + while True: + data = await self.task_queue.get() + if not data: + break; + yield data + asyncio.sleep(0.1) + # --- 1. 工业级初始化:依赖检查与索引 --- def boot(self): for skill_md in self.root.glob("**/SKILL.md"): @@ -44,19 +109,19 @@ class IndustrialSkillEngine: } logging.info(f"Engine Booted. Session: {self.session_id}. Skills: {list(self.registry.keys())}") - # --- 2. 自动化依赖环境隔离 (venv 思想) --- - def _ensure_dependencies(self, skill_name: str): + # --- 2. 自动化依赖环境隔离 (venv 思想) --- + async def _ensure_dependencies(self, skill_name: str): skill = self.registry[skill_name] if skill["has_deps"]: # 工业级引擎通常会检查一个隐藏的 .installed 标识 if not (skill["root"] / ".deps_installed").exists(): print(f"📦 正在为技能 {skill_name} 安装必要依赖...") - subprocess.run(["pip", "install", "-r", "requirements.txt"], cwd=skill["root"]) + await run_subprocess(["pip", "install", "-r", "requirements.txt"], cwd=skill["root"]) (skill["root"] / ".deps_installed").touch() # --- 3. 增强版安全执行器:带重试逻辑与审计 --- - def _execute_with_retry(self, command: str, skill_name: str, retry_count=1) -> str: - self._ensure_dependencies(skill_name) + async def _execute_with_retry(self, command: str, skill_name: str, retry_count=1) -> str: + await self._ensure_dependencies(skill_name) # 安全预检:禁止敏感命令 forbidden = ["rm ", "> /dev/", "chmod", "sudo"] @@ -64,9 +129,9 @@ class IndustrialSkillEngine: return "🚫 安全风险:检测到非法指令,执行被拦截。" # 权限确认 (Y/N) - print(f"\n\033[1;33m[Audit ID: {self.session_id}]\033[0m 请求执行: {command}") - confirm = input("是否授权执行?(y/n/skip): ").lower() - if confirm != 'y': return "Execution skipped by user." + # print(f"\n\033[1;33m[Audit ID: {self.session_id}]\033[0m 请求执行: {command}") + # confirm = input("是否授权执行?(y/n/skip): ").lower() + # if confirm != 'y': return "Execution skipped by user." logging.info(f"Executing: {command} in {skill_name}") @@ -74,8 +139,7 @@ class IndustrialSkillEngine: env = os.environ.copy() env["SKILL_CONTEXT"] = skill_name - res = subprocess.run(command, shell=True, cwd=self.registry[skill_name]["root"], - env=env, capture_output=True, text=True, timeout=30) + res = await run_subprocess(command, cwd=self.registry[skill_name]["root"], env=env, timeout=30) if res.return_code != 0: # 工业级特性:自动将错误回传给 LLM 进行自愈 (Self-healing) @@ -84,7 +148,7 @@ class IndustrialSkillEngine: print(f"⚠️ 执行失败,尝试让 AI 自愈修复参数...") new_prompt = f"命令 '{command}' 失败,错误信息: {res.stderr}。请根据错误重新生成正确的命令,或提示用户补全参数。" # 这里会递归调用逻辑进行修复 - return self.run(new_prompt, is_retry=True) + return await self.run(new_prompt, is_retry=True) return f"Error: {res.stderr}" return res.stdout @@ -92,7 +156,7 @@ class IndustrialSkillEngine: return str(e) # --- 4. 递归文档注入与意图路由 --- - def _get_expanded_context(self, skill_name: str, user_prompt: str): + async def _get_expanded_context(self, skill_name: str, user_prompt: str): skill = self.registry[skill_name] base_content = skill["content"] @@ -106,34 +170,59 @@ class IndustrialSkillEngine: if found_docs: # 仅在模型认为有必要时,“点餐式”加载 - choice = self.llm(f"用户问题: {user_prompt}\n可选深入文档: {found_docs}\n需要读取哪个?(仅返回路径,不需则返回 None)") + choice = await self.llm(f"用户问题: {user_prompt}\n可选深入文档: {found_docs}\n需要读取哪个?(仅返回路径,不需则返回 None)") if choice != "None" and any(choice in doc for doc in found_docs): with open(skill["root"] / choice, 'r') as f: - print(f"📂 深度加载: {choice}") + await self.write_output({ + "status": "PROCESSING", + "hint": f"📂 深度加载: {choice}" + }) base_content += f"\n\n--- 深度参考 ({choice}) ---\n{f.read()}" return base_content # --- 5. 主运行接口 --- - def run(self, user_prompt: str, is_retry=False): + async def run(self, user_prompt: str, is_retry=False): # 如果是重试,跳过技能选择 if not is_retry: + await self.write_output({ + "status": "PROCESSING", + "hint": "寻找合适的skill" + }) skill_map = {n: v["description"] for n, v in self.registry.items()} - target = self.llm(f"用户意图: {user_prompt}\n可选技能清单: {skill_map}\n请返回匹配的技能名:") + target = await self.llm(f"用户意图: {user_prompt}\n可选技能清单: {skill_map}\n请返回匹配的技能名:") self.state["current_skill"] = target + await self.write_output({ + "status":"PROCESSING", + "hint": f"找到skill={target}" + }) skill_name = self.state["current_skill"] - if skill_name not in self.registry: return "Skill not found." + if skill_name not in self.registry: + await self.write_output({ + "status": "FAILED", + "error": f"技能名{skill_name}未注册" + }) + return "Skill not found." # 获取递归上下文 - context = self._get_expanded_context(skill_name, user_prompt) + context = await self._get_expanded_context(skill_name, user_prompt) # 决策:是直接回答还是执行脚本 - decision = self.llm(f"上下文: {context}\n问题: {user_prompt}\n决定动作:EXEC: 或 ANSWER: ") + decision = await self.llm(f"上下文: {context}\n问题: {user_prompt}\n决定动作:EXEC: 或 ANSWER: 或 REPLY: ") + if "REPLY" in decision: + await self.write_output({ + "status": "REPLY", + "quest": decision.split("REPLY:")[1].strip() + }) if "EXEC:" in decision: cmd = decision.split("EXEC:")[1].strip() - return self._execute_with_retry(cmd, skill_name) + output = await self._execute_with_retry(cmd, skill_name) + if not is_retry: await self.write_output(output) + return output - return decision.replace("ANSWER:", "").strip() + output = decision.replace("ANSWER:", "").strip() + await self.write_output(output) + return output