From 6abdbc6538998f194d195c35e2762fbe0f5cb9d5 Mon Sep 17 00:00:00 2001 From: yumoqing Date: Mon, 26 Jan 2026 11:04:09 +0800 Subject: [PATCH] bugfix --- skillagent/skillengine.py | 139 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 139 insertions(+) create mode 100644 skillagent/skillengine.py diff --git a/skillagent/skillengine.py b/skillagent/skillengine.py new file mode 100644 index 0000000..de76da4 --- /dev/null +++ b/skillagent/skillengine.py @@ -0,0 +1,139 @@ +import os +import re +import yaml +import logging +import subprocess +import hashlib +from pathlib import Path +from datetime import datetime +from typing import List, Dict, Any + +# 配置审计日志 +logging.basicConfig( + filename='skill_audit.log', + level=logging.INFO, + format='%(asctime)s [%(levelname)s] %(message)s' +) + +class IndustrialSkillEngine: + def __init__(self, skills_dir: str, llm_handler): + self.root = Path(skills_dir).resolve() + self.llm = llm_handler + self.registry = {} + self.session_id = hashlib.md5(str(datetime.now()).encode()).hexdigest()[:8] + # 状态机:记录当前任务执行到的步骤 + self.state = {"current_skill": None, "history": [], "pending_params": []} + + # --- 1. 工业级初始化:依赖检查与索引 --- + def boot(self): + for skill_md in self.root.glob("**/SKILL.md"): + with open(skill_md, 'r', encoding='utf-8') as f: + content = f.read() + meta = yaml.safe_load(re.search(r'^---(.*?)---', content, re.DOTALL).group(1)) + name = meta.get('name') + + # 检查依赖 (Pre-flight check) + req_file = skill_md.parent / "requirements.txt" + has_deps = req_file.exists() + + self.registry[name] = { + "root": skill_md.parent, + "meta": meta, + "content": content, + "has_deps": has_deps + } + logging.info(f"Engine Booted. Session: {self.session_id}. Skills: {list(self.registry.keys())}") + + # --- 2. 自动化依赖环境隔离 (venv 思想) --- + def _ensure_dependencies(self, skill_name: str): + skill = self.registry[skill_name] + if skill["has_deps"]: + # 工业级引擎通常会检查一个隐藏的 .installed 标识 + if not (skill["root"] / ".deps_installed").exists(): + print(f"📦 正在为技能 {skill_name} 安装必要依赖...") + subprocess.run(["pip", "install", "-r", "requirements.txt"], cwd=skill["root"]) + (skill["root"] / ".deps_installed").touch() + + # --- 3. 增强版安全执行器:带重试逻辑与审计 --- + def _execute_with_retry(self, command: str, skill_name: str, retry_count=1) -> str: + self._ensure_dependencies(skill_name) + + # 安全预检:禁止敏感命令 + forbidden = ["rm ", "> /dev/", "chmod", "sudo"] + if any(f in command for f in forbidden): + return "🚫 安全风险:检测到非法指令,执行被拦截。" + + # 权限确认 (Y/N) + print(f"\n\033[1;33m[Audit ID: {self.session_id}]\033[0m 请求执行: {command}") + confirm = input("是否授权执行?(y/n/skip): ").lower() + if confirm != 'y': return "Execution skipped by user." + + logging.info(f"Executing: {command} in {skill_name}") + + try: + env = os.environ.copy() + env["SKILL_CONTEXT"] = skill_name + + res = subprocess.run(command, shell=True, cwd=self.registry[skill_name]["root"], + env=env, capture_output=True, text=True, timeout=30) + + if res.return_code != 0: + # 工业级特性:自动将错误回传给 LLM 进行自愈 (Self-healing) + logging.error(f"Command failed: {res.stderr}") + if retry_count > 0: + print(f"⚠️ 执行失败,尝试让 AI 自愈修复参数...") + new_prompt = f"命令 '{command}' 失败,错误信息: {res.stderr}。请根据错误重新生成正确的命令,或提示用户补全参数。" + # 这里会递归调用逻辑进行修复 + return self.run(new_prompt, is_retry=True) + return f"Error: {res.stderr}" + + return res.stdout + except Exception as e: + return str(e) + + # --- 4. 递归文档注入与意图路由 --- + def _get_expanded_context(self, skill_name: str, user_prompt: str): + skill = self.registry[skill_name] + base_content = skill["content"] + + # 扫描目录下的辅助文件夹 + sub_dirs = ["reference", "examples"] + found_docs = [] + for d in sub_dirs: + path = skill["root"] / d + if path.exists(): + found_docs.extend([f"{d}/{f.name}" for f in path.glob("*.md")]) + + if found_docs: + # 仅在模型认为有必要时,“点餐式”加载 + choice = self.llm(f"用户问题: {user_prompt}\n可选深入文档: {found_docs}\n需要读取哪个?(仅返回路径,不需则返回 None)") + if choice != "None" and any(choice in doc for doc in found_docs): + with open(skill["root"] / choice, 'r') as f: + print(f"📂 深度加载: {choice}") + base_content += f"\n\n--- 深度参考 ({choice}) ---\n{f.read()}" + + return base_content + + # --- 5. 主运行接口 --- + def run(self, user_prompt: str, is_retry=False): + # 如果是重试,跳过技能选择 + if not is_retry: + skill_map = {n: v["description"] for n, v in self.registry.items()} + target = self.llm(f"用户意图: {user_prompt}\n可选技能清单: {skill_map}\n请返回匹配的技能名:") + self.state["current_skill"] = target + + skill_name = self.state["current_skill"] + if skill_name not in self.registry: return "Skill not found." + + # 获取递归上下文 + context = self._get_expanded_context(skill_name, user_prompt) + + # 决策:是直接回答还是执行脚本 + decision = self.llm(f"上下文: {context}\n问题: {user_prompt}\n决定动作:EXEC: 或 ANSWER: ") + + if "EXEC:" in decision: + cmd = decision.split("EXEC:")[1].strip() + return self._execute_with_retry(cmd, skill_name) + + return decision.replace("ANSWER:", "").strip() +