skillagent/skillagent/skillengine.py
2026-02-12 13:31:20 +08:00

261 lines
8.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import re
import asyncio
import yaml
from functools import partial
import hashlib
from pathlib import Path
from datetime import datetime
from typing import List, Dict, Any
from ahserver.serverenv import ServerEnv
from appPublic.dictObject import DictObject
from appPublic.log import info, debug, error, exception
from appPublic.jsonConfig import getConfig
from appPublic.uniqueID import getID
# 配置审计日志
class LLMHandler:
def __init__(self, request, llmid, apikey=None):
self.llmid = llmid
self.apikey = apikey
self.request = request
async def __call__(self, prompt):
env = ServerEnv()
kw = {
"stream": False,
"llmid": self.llmid,
"prompt": prompt
}
kw = DictObject(**kw)
txt = ''
async for d in env.inference_generator(self, request, params_kw=kw):
debug(f'{d=}, {type(d)=}')
txt += d.content
return txt
async def run_subprocess(command, cwd, env, timeout=30.0):
try:
# 启动子进程
# stdout/stderr 设置为 PIPE 对应原来的 capture_output=True
process = await asyncio.create_subprocess_shell(
command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=cwd,
env=env,
# 注意asyncio 默认处理字节流,如果需要文本,后面需手动 decode
)
# 使用 wait_for 实现 timeout 功能
try:
# wait_for 会在超时后抛出 TimeoutError 并自动 cancel 协程
stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=timeout)
except asyncio.TimeoutError:
process.kill() # 超时强制杀掉进程
await process.wait() # 确保资源回收
raise Exception(f"{command} 执行超时")
# 对应 text=True手动解码
return {
"stdout": stdout.decode().strip(),
"stderr": stderr.decode().strip(),
"return_code": process.returncode
}
except Exception as e:
print(f"{command} 执行异常: {e}")
return None
class IndustrialSkillEngine:
def __init__(self, request, llmid: str, apikey: str=None):
self.request = request
config = getConfig()
skills_dir = config.skills_dir
self.root = Path(skills_dir).resolve()
self.llm = LLMHandler(request, llmid, apikey=apikey)
self.registry = {}
self.task_queue = asyncio.Queue(maxsize=20)
# 状态机:记录当前任务执行到的步骤
self.state = {"current_skill": None, "history": [], "pending_params": []}
async def write_output(self, data=None):
await self.task_queue.put(data)
# --- 1. 工业级初始化:依赖检查与索引 ---
async def boot(self, refresh=False):
env = self.request._run_ns
userid = await env.get_user()
key = f'skillregister_{userid}'
skills = await env.session_getvalue(key)
if not refresh and skills:
self.self.registry = skills
return
for skill_md in self.root.glob("**/SKILL.md"):
with open(skill_md, 'r', encoding='utf-8') as f:
content = f.read()
meta = yaml.safe_load(re.search(r'^---(.*?)---', content, re.DOTALL).group(1))
name = meta.get('name')
# 检查依赖 (Pre-flight check)
req_file = skill_md.parent / "requirements.txt"
has_deps = req_file.exists()
self.registry[name] = DictObject(**{
"root": skill_md.parent,
"meta": meta,
"content": content,
"has_deps": has_deps
})
await env.session_setvalue(key, self.registry)
# --- 2. 自动化依赖环境隔离 (venv 思想) ---
async def _ensure_dependencies(self, skill_name: str):
skill = self.registry[skill_name]
if skill["has_deps"]:
# 工业级引擎通常会检查一个隐藏的 .installed 标识
if not (skill["root"] / ".deps_installed").exists():
print(f"📦 正在为技能 {skill_name} 安装必要依赖...")
await run_subprocess(["pip", "install", "-r", "requirements.txt"], cwd=skill["root"])
(skill["root"] / ".deps_installed").touch()
# --- 3. 增强版安全执行器:带重试逻辑与审计 ---
async def _execute_with_retry(self, command: str, skill_name: str, retry_count=1) -> str:
await self._ensure_dependencies(skill_name)
# 安全预检:禁止敏感命令
forbidden = ["rm ", "> /dev/", "chmod", "sudo"]
if any(f in command for f in forbidden):
return "🚫 安全风险:检测到非法指令,执行被拦截。"
info(f"Executing: {command} in {skill_name}")
try:
env = os.environ.copy()
env["SKILL_CONTEXT"] = skill_name
res = await run_subprocess(command, cwd=self.registry[skill_name]["root"], env=env, timeout=30)
if res.return_code != 0:
# 工业级特性:自动将错误回传给 LLM 进行自愈 (Self-healing)
error(f"Command failed: {res.stderr}")
if retry_count > 0:
print(f"⚠️ 执行失败,尝试让 AI 自愈修复参数...")
new_prompt = f"命令 '{command}' 失败,错误信息: {res.stderr}。请根据错误重新生成正确的命令,或提示用户补全参数。"
# 这里会递归调用逻辑进行修复
return await self._run(new_prompt, is_retry=True)
raise Exception(f"Error: {res.stderr}")
return res.stdout
except Exception as e:
return str(e)
# --- 4. 递归文档注入与意图路由 ---
async def _get_expanded_context(self, skill_name: str, user_prompt: str, context=None):
skill = self.registry[skill_name]
base_content = skill["content"]
# 扫描目录下的辅助文件夹
sub_dirs = ["reference", "examples"]
found_docs = []
for d in sub_dirs:
path = skill["root"] / d
if path.exists():
found_docs.extend([f"{d}/{f.name}" for f in path.glob("*.md")])
if found_docs:
# 仅在模型认为有必要时,“点餐式”加载
choice = await self.llm(f"上下文:{context or ''}\n用户问题: {user_prompt}\n可选深入文档: {found_docs}\n需要读取哪个?(仅返回路径,不需则返回 None)")
if choice != "None" and any(choice in doc for doc in found_docs):
with open(skill["root"] / choice, 'r') as f:
output = {
"status": "PROCESSING",
"hint": f"📂 深度加载: {choice}"
}
await self.write_output(output)
base_content += f"\n\n--- 深度参考 ({choice}) ---\n{f.read()}"
return base_content
async def reference(self, user_prompt:str, context: str=None, is_retry: bool=False):
f = partial(self.run, user_prompt, context=context, is_retry=is_retry)
asyncio.create_task(f())
while True:
data = await self.task_queue.get()
if not data:
break;
debug(f'{data=}, {type(data)=}')
yield data
await asyncio.sleep(0.1)
# --- 5. 主运行接口 ---
async def run(self, parmas_kw):
try:
user_input = json.dumps(params_kw, ensure_ascii=False)
await self._run(user_input)
except Exception as e:
await self.write_output({
'status': 'FAILED',
'error': f"{e}"
})
self.write_output(None)
async def _run(self, user_prompt: str, context=None, is_retry=False):
# 如果是重试,跳过技能选择
await self.boot()
debug(f'{self.registry=}')
if not is_retry:
await self.write_output({
"status": "PROCESSING",
"hint": "寻找合适的skill"
})
skill_map = {n: v.meta.description for n, v in self.registry.items()}
target = await self.llm(f"用户意图: {user_prompt}\n可选技能清单: {skill_map}\n请返回匹配的技能名:")
self.state["current_skill"] = target
await self.write_output({
"status":"PROCESSING",
"hint": f"找到skill={target}"
})
skill_name = self.state["current_skill"]
if skill_name not in self.registry:
raise Exception(f"技能名{skill_name}未注册")
# 获取递归上下文
context = await self._get_expanded_context(skill_name, user_prompt, context=context)
# 决策:是直接回答还是执行脚本
decision = await self.llm(f"上下文: {context}\n问题: {user_prompt}\n决定动作EXEC: <command> 或 ANSWER: <text> 或 REPLY: <question>")
if "REPLY" in decision:
sessionkey = getID()
await self.write_output({
"status": "REPLY",
"reply": {
"questionkey": sessionkey(),
"question": decision.split("REPLY:")[1].strip()
}
})
env = self.request._run_ns
user_reply = await env.session_getvalue(sessionkey)
while not user_reply:
asyncio.sleep(0.5)
user_reply = await env.session_getvalue(sessionkey)
prompt = f"{user_prompt}\n补充输入:{user_reply}"
await self._run(prompt, context=context, is_retry=True)
return
if "EXEC:" in decision:
cmd = decision.split("EXEC:")[1].strip()
output = await self._execute_with_retry(cmd, skill_name)
await self.write_output(output)
return
if "ANSWER:" in decision:
output = decision.replace("ANSWER:", "").strip()
await self.write_output(output)
return output
debug(f' undefined decision:{decision}')