diff --git a/skillagent/skillengine.py b/skillagent/skillengine.py index 1bec519..e40323e 100644 --- a/skillagent/skillengine.py +++ b/skillagent/skillengine.py @@ -2,21 +2,16 @@ import os import re import asyncio import yaml -import logging -import subprocess import hashlib from pathlib import Path from datetime import datetime from typing import List, Dict, Any form ahserver.serverenv import ServerEnv from appPublic.dictObject import DictObject +from appPublic.log import info, debug, error, exception +from appPublic.uniqueID import getID # 配置审计日志 -logging.basicConfig( - filename='skill_audit.log', - level=logging.INFO, - format='%(asctime)s [%(levelname)s] %(message)s' -) class LLMHandler: def __init__(self, request, llmid, apikey=None): @@ -32,8 +27,11 @@ class LLMHandler: "prompt": prompt } kw = DictObject(**kw) - r = await env.inference(self, request, params_kw=kw) - return r.content + txt = '' + async for d in env.inference_generator(self, request, params_kw=kw): + debug(f'{d=}, {type(d)=}') + txt += d.content + return txt async def run_subprocess(command, cwd, env, timeout=30.0): try: @@ -69,28 +67,29 @@ async def run_subprocess(command, cwd, env, timeout=30.0): return None class IndustrialSkillEngine: - def __init__(self, request, skills_dir: str, llmid, apikey=None): + def __init__(self, request, llmid: str, apikey: str=None): + config = getConfig() + skills_dir = config.skills_dir self.root = Path(skills_dir).resolve() self.llm = LLMHandler(request, llmid, apikey=apikey) self.registry = {} self.task_queue = asyncio.Queue(maxsize=20) - self.session_id = hashlib.md5(str(datetime.now()).encode()).hexdigest()[:8] # 状态机:记录当前任务执行到的步骤 self.state = {"current_skill": None, "history": [], "pending_params": []} async def write_output(self, data=None): await self.task_queue.put(data) - async def gen_output(self): - while True: - data = await self.task_queue.get() - if not data: - break; - yield data - asyncio.sleep(0.1) - # --- 1. 工业级初始化:依赖检查与索引 --- - def boot(self): + def boot(self, refresh=False): + env = self.request._run_ns + userid = await env.get_user() + key = f'skillregister_{userid}' + skills = await env.session_getvalue(key) + if not refresh and skills: + self.self.registry = skills + return + for skill_md in self.root.glob("**/SKILL.md"): with open(skill_md, 'r', encoding='utf-8') as f: content = f.read() @@ -107,7 +106,7 @@ class IndustrialSkillEngine: "content": content, "has_deps": has_deps } - logging.info(f"Engine Booted. Session: {self.session_id}. Skills: {list(self.registry.keys())}") + await session_setvalue(key, self.registry) # --- 2. 自动化依赖环境隔离 (venv 思想) --- async def _ensure_dependencies(self, skill_name: str): @@ -128,12 +127,7 @@ class IndustrialSkillEngine: if any(f in command for f in forbidden): return "🚫 安全风险:检测到非法指令,执行被拦截。" - # 权限确认 (Y/N) - # print(f"\n\033[1;33m[Audit ID: {self.session_id}]\033[0m 请求执行: {command}") - # confirm = input("是否授权执行?(y/n/skip): ").lower() - # if confirm != 'y': return "Execution skipped by user." - - logging.info(f"Executing: {command} in {skill_name}") + info(f"Executing: {command} in {skill_name}") try: env = os.environ.copy() @@ -143,20 +137,25 @@ class IndustrialSkillEngine: if res.return_code != 0: # 工业级特性:自动将错误回传给 LLM 进行自愈 (Self-healing) - logging.error(f"Command failed: {res.stderr}") + error(f"Command failed: {res.stderr}") if retry_count > 0: print(f"⚠️ 执行失败,尝试让 AI 自愈修复参数...") new_prompt = f"命令 '{command}' 失败,错误信息: {res.stderr}。请根据错误重新生成正确的命令,或提示用户补全参数。" # 这里会递归调用逻辑进行修复 return await self.run(new_prompt, is_retry=True) - return f"Error: {res.stderr}" + + await self.write_output({ + "status": "FAILED", + "error": f"Error: {res.stderr}" + }) + raise Exception(f"Error: {res.stderr}") return res.stdout except Exception as e: return str(e) # --- 4. 递归文档注入与意图路由 --- - async def _get_expanded_context(self, skill_name: str, user_prompt: str): + async def _get_expanded_context(self, skill_name: str, user_prompt: str, context=None): skill = self.registry[skill_name] base_content = skill["content"] @@ -170,7 +169,7 @@ class IndustrialSkillEngine: if found_docs: # 仅在模型认为有必要时,“点餐式”加载 - choice = await self.llm(f"用户问题: {user_prompt}\n可选深入文档: {found_docs}\n需要读取哪个?(仅返回路径,不需则返回 None)") + choice = await self.llm(f"上下文:{context or ''}\n用户问题: {user_prompt}\n可选深入文档: {found_docs}\n需要读取哪个?(仅返回路径,不需则返回 None)") if choice != "None" and any(choice in doc for doc in found_docs): with open(skill["root"] / choice, 'r') as f: await self.write_output({ @@ -181,9 +180,21 @@ class IndustrialSkillEngine: return base_content + async def reference(self, user_prompt:str, context: str=None, is_retry: boolean=False): + f = partial(self.run, user_prompt, context=context, is_retry=is_retry) + background_coro(f) + while True: + data = await self.task_queue.get() + if not data: + break; + yield data + asyncio.sleep(0.1) + + # --- 5. 主运行接口 --- - async def run(self, user_prompt: str, is_retry=False): + async def run(self, user_prompt: str, context=None, is_retry=False): # 如果是重试,跳过技能选择 + self.boot() if not is_retry: await self.write_output({ "status": "PROCESSING", @@ -203,26 +214,38 @@ class IndustrialSkillEngine: "status": "FAILED", "error": f"技能名{skill_name}未注册" }) - return "Skill not found." + raise Exception("Skill not found.") # 获取递归上下文 - context = await self._get_expanded_context(skill_name, user_prompt) + context = await self._get_expanded_context(skill_name, user_prompt, context=context) # 决策:是直接回答还是执行脚本 decision = await self.llm(f"上下文: {context}\n问题: {user_prompt}\n决定动作:EXEC: 或 ANSWER: 或 REPLY: ") if "REPLY" in decision: + sessionkey = getID() await self.write_output({ "status": "REPLY", - "quest": decision.split("REPLY:")[1].strip() + "reply": { + "questionkey": sessionkey(), + "question": decision.split("REPLY:")[1].strip() + } }) - if "EXEC:" in decision: + env = self.request._run_ns + user_reply = await env.session_getvalue(sessionkey) + while not user_reply: + asyncio.sleep(0.5) + user_reply = await env.session_getvalue(sessionkey) + prompt = f"{user_prompt}\n补充输入:{user_reply}" + await self.run(prompt, context=context, is_retry=True) + return + if "EXEC:" in decision: cmd = decision.split("EXEC:")[1].strip() output = await self._execute_with_retry(cmd, skill_name) - if not is_retry: await self.write_output(output) + await self.write_output(output) + return + if "ANSWER:" in decision + output = decision.replace("ANSWER:", "").strip() + await self.write_output(output) return output - - output = decision.replace("ANSWER:", "").strip() - await self.write_output(output) - return output - + debug(f' undefined decision:{decision}') diff --git a/wwwroot/inference.dspy b/wwwroot/inference.dspy new file mode 100644 index 0000000..58a5ad7 --- /dev/null +++ b/wwwroot/inference.dspy @@ -0,0 +1,5 @@ +prompt = params_kw.prompt +llmid = params_kw.llmid +engine = IndustrialSkillEngine(request, llmid) +f = partial(engine.reference, prompt) +return await env.stream_response(request, f) diff --git a/wwwroot/question_answered.dspy b/wwwroot/question_answered.dspy new file mode 100644 index 0000000..f4bbb5b --- /dev/null +++ b/wwwroot/question_answered.dspy @@ -0,0 +1,6 @@ +questionkey = params_kw.questionkey +answer = params_kw.answer +await session_setvalue(questionkey, answer) +return { + "status":"OK" +}