skillagent/skillagent/skillengine.py
2026-01-26 11:04:09 +08:00

140 lines
6.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import re
import yaml
import logging
import subprocess
import hashlib
from pathlib import Path
from datetime import datetime
from typing import List, Dict, Any
# 配置审计日志
logging.basicConfig(
filename='skill_audit.log',
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s'
)
class IndustrialSkillEngine:
def __init__(self, skills_dir: str, llm_handler):
self.root = Path(skills_dir).resolve()
self.llm = llm_handler
self.registry = {}
self.session_id = hashlib.md5(str(datetime.now()).encode()).hexdigest()[:8]
# 状态机:记录当前任务执行到的步骤
self.state = {"current_skill": None, "history": [], "pending_params": []}
# --- 1. 工业级初始化:依赖检查与索引 ---
def boot(self):
for skill_md in self.root.glob("**/SKILL.md"):
with open(skill_md, 'r', encoding='utf-8') as f:
content = f.read()
meta = yaml.safe_load(re.search(r'^---(.*?)---', content, re.DOTALL).group(1))
name = meta.get('name')
# 检查依赖 (Pre-flight check)
req_file = skill_md.parent / "requirements.txt"
has_deps = req_file.exists()
self.registry[name] = {
"root": skill_md.parent,
"meta": meta,
"content": content,
"has_deps": has_deps
}
logging.info(f"Engine Booted. Session: {self.session_id}. Skills: {list(self.registry.keys())}")
# --- 2. 自动化依赖环境隔离 (venv 思想) ---
def _ensure_dependencies(self, skill_name: str):
skill = self.registry[skill_name]
if skill["has_deps"]:
# 工业级引擎通常会检查一个隐藏的 .installed 标识
if not (skill["root"] / ".deps_installed").exists():
print(f"📦 正在为技能 {skill_name} 安装必要依赖...")
subprocess.run(["pip", "install", "-r", "requirements.txt"], cwd=skill["root"])
(skill["root"] / ".deps_installed").touch()
# --- 3. 增强版安全执行器:带重试逻辑与审计 ---
def _execute_with_retry(self, command: str, skill_name: str, retry_count=1) -> str:
self._ensure_dependencies(skill_name)
# 安全预检:禁止敏感命令
forbidden = ["rm ", "> /dev/", "chmod", "sudo"]
if any(f in command for f in forbidden):
return "🚫 安全风险:检测到非法指令,执行被拦截。"
# 权限确认 (Y/N)
print(f"\n\033[1;33m[Audit ID: {self.session_id}]\033[0m 请求执行: {command}")
confirm = input("是否授权执行?(y/n/skip): ").lower()
if confirm != 'y': return "Execution skipped by user."
logging.info(f"Executing: {command} in {skill_name}")
try:
env = os.environ.copy()
env["SKILL_CONTEXT"] = skill_name
res = subprocess.run(command, shell=True, cwd=self.registry[skill_name]["root"],
env=env, capture_output=True, text=True, timeout=30)
if res.return_code != 0:
# 工业级特性:自动将错误回传给 LLM 进行自愈 (Self-healing)
logging.error(f"Command failed: {res.stderr}")
if retry_count > 0:
print(f"⚠️ 执行失败,尝试让 AI 自愈修复参数...")
new_prompt = f"命令 '{command}' 失败,错误信息: {res.stderr}。请根据错误重新生成正确的命令,或提示用户补全参数。"
# 这里会递归调用逻辑进行修复
return self.run(new_prompt, is_retry=True)
return f"Error: {res.stderr}"
return res.stdout
except Exception as e:
return str(e)
# --- 4. 递归文档注入与意图路由 ---
def _get_expanded_context(self, skill_name: str, user_prompt: str):
skill = self.registry[skill_name]
base_content = skill["content"]
# 扫描目录下的辅助文件夹
sub_dirs = ["reference", "examples"]
found_docs = []
for d in sub_dirs:
path = skill["root"] / d
if path.exists():
found_docs.extend([f"{d}/{f.name}" for f in path.glob("*.md")])
if found_docs:
# 仅在模型认为有必要时,“点餐式”加载
choice = self.llm(f"用户问题: {user_prompt}\n可选深入文档: {found_docs}\n需要读取哪个?(仅返回路径,不需则返回 None)")
if choice != "None" and any(choice in doc for doc in found_docs):
with open(skill["root"] / choice, 'r') as f:
print(f"📂 深度加载: {choice}")
base_content += f"\n\n--- 深度参考 ({choice}) ---\n{f.read()}"
return base_content
# --- 5. 主运行接口 ---
def run(self, user_prompt: str, is_retry=False):
# 如果是重试,跳过技能选择
if not is_retry:
skill_map = {n: v["description"] for n, v in self.registry.items()}
target = self.llm(f"用户意图: {user_prompt}\n可选技能清单: {skill_map}\n请返回匹配的技能名:")
self.state["current_skill"] = target
skill_name = self.state["current_skill"]
if skill_name not in self.registry: return "Skill not found."
# 获取递归上下文
context = self._get_expanded_context(skill_name, user_prompt)
# 决策:是直接回答还是执行脚本
decision = self.llm(f"上下文: {context}\n问题: {user_prompt}\n决定动作EXEC: <command> 或 ANSWER: <text>")
if "EXEC:" in decision:
cmd = decision.split("EXEC:")[1].strip()
return self._execute_with_retry(cmd, skill_name)
return decision.replace("ANSWER:", "").strip()