This commit is contained in:
yumoqing 2026-02-10 18:19:32 +08:00
parent 96242f49b3
commit 77ab6ba30c
2 changed files with 115 additions and 23 deletions

View File

@ -1,5 +1,8 @@
from ahserver.ServerEnv import ServerEnv
from .agent import skillagent
from skillengine import IndustrialSkillEngine
def load_skillagent():
    """Publish the skill agent and the engine class on the global ServerEnv.

    NOTE(review): the IndustrialSkillEngine *class* (not an instance) is
    registered; callers presumably instantiate it per-request — confirm.
    """
    server_env = ServerEnv()
    server_env.engine = IndustrialSkillEngine
    server_env.skillagent = skillagent

View File

@ -1,5 +1,6 @@
import os
import re
import asyncio
import yaml
import logging
import subprocess
@ -7,6 +8,8 @@ import hashlib
from pathlib import Path
from datetime import datetime
from typing import List, Dict, Any
from ahserver.serverenv import ServerEnv
from appPublic.dictObject import DictObject
# 配置审计日志
logging.basicConfig(
@ -15,15 +18,77 @@ logging.basicConfig(
format='%(asctime)s [%(levelname)s] %(message)s'
)
class LLMHandler:
    """Async callable that forwards a prompt to the server-side LLM inference API.

    Stores the originating web request plus the target LLM id / API key;
    awaiting the instance with a prompt performs a non-streaming inference
    call and returns the response content.
    """

    def __init__(self, request, llmid, apikey=None):
        # request: incoming web request object, forwarded to env.inference
        # llmid:   identifier of the LLM backend to use
        # apikey:  optional credential (currently stored only — TODO confirm use)
        self.llmid = llmid
        self.apikey = apikey
        self.request = request

    async def __call__(self, prompt):
        env = ServerEnv()
        kw = DictObject(stream=False, llmid=self.llmid, prompt=prompt)
        # BUG FIX: the original called env.inference(self, request, ...) where
        # `request` was an undefined name (NameError at runtime); the stored
        # self.request is the intended argument.
        r = await env.inference(self.request, params_kw=kw)
        return r.content
async def run_subprocess(command, cwd, env, timeout=30.0):
    """Run *command* through the shell asynchronously and capture its output.

    Args:
        command: shell command string (passed to create_subprocess_shell).
        cwd: working directory for the child process (None = inherit).
        env: environment mapping for the child process (None = inherit).
        timeout: seconds to wait for completion before killing the process.

    Returns:
        dict with keys 'stdout', 'stderr' (decoded and stripped) and
        'return_code', or None when the process could not be started.

    Raises:
        Exception: when the command does not finish within *timeout* seconds.
    """
    try:
        # stdout/stderr PIPE mirrors subprocess.run(capture_output=True);
        # asyncio pipes yield bytes, decoded below.
        process = await asyncio.create_subprocess_shell(
            command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=cwd,
            env=env,
        )
    except Exception as e:
        logging.error(f"{command} 执行异常: {e}")
        return None
    try:
        stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        process.kill()      # force-kill on timeout
        await process.wait()  # reap the child so resources are released
        # BUG FIX: in the original, this raise was inside the same try block
        # as a blanket `except Exception`, which swallowed it and returned
        # None — a timeout was indistinguishable from a spawn failure.
        raise Exception(f"{command} 执行超时")
    return {
        "stdout": stdout.decode().strip(),
        "stderr": stderr.decode().strip(),
        "return_code": process.returncode,
    }
class IndustrialSkillEngine:
    """Skill execution engine: indexes SKILL.md skills, routes user intents
    through an LLM, and streams progress via an internal task queue."""

    def __init__(self, request, skills_dir: str, llmid, apikey=None):
        # FIX: the diff residue left both the old (skills_dir, llm_handler)
        # and new (request, skills_dir, llmid, apikey) constructors in the
        # listing; this is the consolidated, current signature.
        self.root = Path(skills_dir).resolve()
        self.llm = LLMHandler(request, llmid, apikey=apikey)
        self.registry = {}
        self.task_queue = asyncio.Queue(maxsize=20)
        # Short per-session audit id derived from the startup timestamp.
        self.session_id = hashlib.md5(str(datetime.now()).encode()).hexdigest()[:8]
        # State machine: tracks the step the current task has reached.
        self.state = {"current_skill": None, "history": [], "pending_params": []}
async def write_output(self, data=None):
    """Enqueue *data* for the gen_output() consumer.

    A falsy value (the default None) acts as the end-of-stream sentinel
    that terminates gen_output's loop.
    """
    out_queue = self.task_queue
    await out_queue.put(data)
async def gen_output(self):
    """Async generator that drains task_queue until a sentinel arrives.

    NOTE(review): *any* falsy item (None, "", 0, {}) terminates the
    stream — confirm producers only ever use None as the sentinel.
    """
    while True:
        data = await self.task_queue.get()
        if not data:
            break
        yield data
        # BUG FIX: `asyncio.sleep(0.1)` was called without `await`, so the
        # coroutine was never run (no pause, plus a RuntimeWarning).
        await asyncio.sleep(0.1)
# --- 1. 工业级初始化:依赖检查与索引 ---
def boot(self):
for skill_md in self.root.glob("**/SKILL.md"):
@ -44,19 +109,19 @@ class IndustrialSkillEngine:
}
logging.info(f"Engine Booted. Session: {self.session_id}. Skills: {list(self.registry.keys())}")
# --- 2. 自动化依赖环境隔离 (venv 思想) ---
def _ensure_dependencies(self, skill_name: str):
# --- 2. 自动化依赖环境隔离 (venv 思想) ---
async def _ensure_dependencies(self, skill_name: str):
skill = self.registry[skill_name]
if skill["has_deps"]:
# 工业级引擎通常会检查一个隐藏的 .installed 标识
if not (skill["root"] / ".deps_installed").exists():
print(f"📦 正在为技能 {skill_name} 安装必要依赖...")
subprocess.run(["pip", "install", "-r", "requirements.txt"], cwd=skill["root"])
await run_subprocess(["pip", "install", "-r", "requirements.txt"], cwd=skill["root"])
(skill["root"] / ".deps_installed").touch()
# --- 3. 增强版安全执行器:带重试逻辑与审计 ---
def _execute_with_retry(self, command: str, skill_name: str, retry_count=1) -> str:
self._ensure_dependencies(skill_name)
async def _execute_with_retry(self, command: str, skill_name: str, retry_count=1) -> str:
await self._ensure_dependencies(skill_name)
# 安全预检:禁止敏感命令
forbidden = ["rm ", "> /dev/", "chmod", "sudo"]
@ -64,9 +129,9 @@ class IndustrialSkillEngine:
return "🚫 安全风险:检测到非法指令,执行被拦截。"
# 权限确认 (Y/N)
print(f"\n\033[1;33m[Audit ID: {self.session_id}]\033[0m 请求执行: {command}")
confirm = input("是否授权执行?(y/n/skip): ").lower()
if confirm != 'y': return "Execution skipped by user."
# print(f"\n\033[1;33m[Audit ID: {self.session_id}]\033[0m 请求执行: {command}")
# confirm = input("是否授权执行?(y/n/skip): ").lower()
# if confirm != 'y': return "Execution skipped by user."
logging.info(f"Executing: {command} in {skill_name}")
@ -74,8 +139,7 @@ class IndustrialSkillEngine:
env = os.environ.copy()
env["SKILL_CONTEXT"] = skill_name
res = subprocess.run(command, shell=True, cwd=self.registry[skill_name]["root"],
env=env, capture_output=True, text=True, timeout=30)
res = await run_subprocess(command, cwd=self.registry[skill_name]["root"], env=env, timeout=30)
if res.return_code != 0:
# 工业级特性:自动将错误回传给 LLM 进行自愈 (Self-healing)
@ -84,7 +148,7 @@ class IndustrialSkillEngine:
print(f"⚠️ 执行失败,尝试让 AI 自愈修复参数...")
new_prompt = f"命令 '{command}' 失败,错误信息: {res.stderr}。请根据错误重新生成正确的命令,或提示用户补全参数。"
# 这里会递归调用逻辑进行修复
return self.run(new_prompt, is_retry=True)
return await self.run(new_prompt, is_retry=True)
return f"Error: {res.stderr}"
return res.stdout
@ -92,7 +156,7 @@ class IndustrialSkillEngine:
return str(e)
# --- 4. 递归文档注入与意图路由 ---
def _get_expanded_context(self, skill_name: str, user_prompt: str):
async def _get_expanded_context(self, skill_name: str, user_prompt: str):
skill = self.registry[skill_name]
base_content = skill["content"]
@ -106,34 +170,59 @@ class IndustrialSkillEngine:
if found_docs:
# 仅在模型认为有必要时,“点餐式”加载
choice = self.llm(f"用户问题: {user_prompt}\n可选深入文档: {found_docs}\n需要读取哪个?(仅返回路径,不需则返回 None)")
choice = await self.llm(f"用户问题: {user_prompt}\n可选深入文档: {found_docs}\n需要读取哪个?(仅返回路径,不需则返回 None)")
if choice != "None" and any(choice in doc for doc in found_docs):
with open(skill["root"] / choice, 'r') as f:
print(f"📂 深度加载: {choice}")
await self.write_output({
"status": "PROCESSING",
"hint": f"📂 深度加载: {choice}"
})
base_content += f"\n\n--- 深度参考 ({choice}) ---\n{f.read()}"
return base_content
# --- 5. Main entry point ---
async def run(self, user_prompt: str, is_retry=False):
    """Route *user_prompt* to a registered skill and act on the LLM decision.

    Flow: (1) unless retrying, ask the LLM to pick a skill; (2) build the
    expanded context; (3) ask the LLM for an action — EXEC a command,
    ANSWER with text, or REPLY with a follow-up question. Progress and
    results are streamed through write_output().

    Returns the execution output, the answer text, or "Skill not found.".
    """
    # On a self-healing retry, skip skill selection and reuse current_skill.
    if not is_retry:
        await self.write_output({
            "status": "PROCESSING",
            "hint": "寻找合适的skill"
        })
        skill_map = {n: v["description"] for n, v in self.registry.items()}
        target = await self.llm(f"用户意图: {user_prompt}\n可选技能清单: {skill_map}\n请返回匹配的技能名:")
        self.state["current_skill"] = target
        await self.write_output({
            "status": "PROCESSING",
            "hint": f"找到skill={target}"
        })
    skill_name = self.state["current_skill"]
    if skill_name not in self.registry:
        await self.write_output({
            "status": "FAILED",
            "error": f"技能名{skill_name}未注册"
        })
        return "Skill not found."
    # Recursive, on-demand context expansion.
    context = await self._get_expanded_context(skill_name, user_prompt)
    # Decide: execute a script, answer directly, or ask the user back.
    decision = await self.llm(f"上下文: {context}\n问题: {user_prompt}\n决定动作EXEC: <command> 或 ANSWER: <text> 或 REPLY: <question>")
    if "REPLY:" in decision:
        # BUG FIX: the original tested `"REPLY" in decision` but split on
        # "REPLY:" (IndexError when the colon is absent) and had no return,
        # so a REPLY decision fell through into the EXEC/ANSWER handling.
        quest = decision.split("REPLY:")[1].strip()
        await self.write_output({
            "status": "REPLY",
            "quest": quest
        })
        return quest
    if "EXEC:" in decision:
        cmd = decision.split("EXEC:")[1].strip()
        output = await self._execute_with_retry(cmd, skill_name)
        # Only the outermost call streams the result; retries return it upward.
        if not is_retry:
            await self.write_output(output)
        return output
    output = decision.replace("ANSWER:", "").strip()
    await self.write_output(output)
    return output