diff --git a/skillagent/agent.py b/skillagent/agent.py index e57e5e3..6807dc0 100644 --- a/skillagent/agent.py +++ b/skillagent/agent.py @@ -1,43 +1,282 @@ -import os, json -from typing import Dict -from pydantic import ValidationError +import json +import asyncio +from typing import List, Optional +from dataclasses import dataclass, field +from pydantic import BaseModel, Field, ValidationError +from typing import Literal +from .skillkit_wrapper import SkillkitWrapper -class DAGNode: - def __init__(self, skill, script=None, params=None): +# --------------------------- +# Skill Decision / PlanState +# --------------------------- +@dataclass +class SkillDecision: + skill: str + params: dict + reason: Optional[str] = None + +@dataclass +class PlanState: + user_intent: str + skill: str + script: str + params: dict + missing: List[str] = field(default_factory=list) + +# --------------------------- +# 自定义异常 +# --------------------------- +class MissingParams(Exception): + def __init__(self, skill: str, fields: List[str]): self.skill = skill - self.script = script - self.params = params or {} - self.state = "pending" - self.missing = [] + self.fields = fields -class SkillAgent: - def __init__(self, skill_loader, skillkit_wrapper, llm): - self.skill_loader = skill_loader - self.skillkit = skillkit_wrapper +# --------------------------- +# LLM 接口(可替换为你的模型) +# --------------------------- +class LLM: + async def complete(self, prompt: str) -> str: + raise NotImplementedError + +# --------------------------- +# DummyLLM 示例(测试用) +# --------------------------- +class DummyLLM(LLM): + def __init__(self, llmid, apikey): + self.llmid = llmid + self.akikey = apikey + + async def complete(self, prompt: str) -> str: + hc = StreamHttpClient() + headers = { + 'Authorization': f'Bearer {self.apikey}', + 'Content-Type': 'application/json' + } + d = { + 'llmid': self.llmid, + 'prompt': prompt + } + reco = hc('POST', url, headers=headers, data=json.dumps(d)) + doc = '' + async for chunk in liner(reco): + try: + d = json.loads(chunk) + except Exception as e: + print(f'****{chunk=} error {e} {format_exc()}') + continue + if d.get('content'): + doc = f'{doc}{d["content"]}' + else: + print(f'{f}:{d} error') + return json.loads(doc) + +# --------------------------- +# Agent 实现 +# --------------------------- +class Agent: + def __init__(self, llm: LLM, skillkit): self.llm = llm - self.dag = [] + self.skillkit = skillkit + self.skills = None + self.loaded = False + def load_skills(self): + if self.loaded: + return + self.skills = self.skillkit.list_skills() + for s in self.skills: + self.skillkit.load_skill(s.name) + + # --------------------------- + # plan: 多 skill 候选 + 参数抽取 + # --------------------------- async def plan(self, user_text: str): - skills = self.skill_loader.list_skills() - skill_docs = "\n".join([open(os.path.join(self.skill_loader.skillspath, s, "skill.md"), encoding="utf-8").read() for s in skills]) - prompt = f""" -User request: {user_text} -Available Skills: -{skill_docs} + self.load_skills() + candidates = await self._candidate_skills(user_text) + decision = await self._plan_with_candidates(user_text, candidates) + try: + validated_params = self._validate_params(decision) + except MissingParams as e: + question = await self._ask_user_for_params(user_text, decision.skill, e.fields) + state = PlanState( + user_intent=user_text, + skill=decision.skill, + script=decision.script, + params=decision.params, + missing=e.fields + ) + return { + "type": "clarification", + "state": state, + "question": question + } + return { + "type": "script_call", + "script": decision.script, + "skill": decision.skill, + "params": validated_params, + "reason": decision.reason + } -Task: choose the most suitable skill and script. Output JSON: {{ "skill": "", "script": "" }} + def get_scripts(self, skillname): + return self.skillkit.get_skill_scripts(skillname) + + # --------------------------- + # resume: 补 missing 参数 + # --------------------------- + async def resume(self, state: PlanState, user_reply: str): + skill_spec = next(s for s in self.skills if s.name == state.skill) + schema_fields = next(s.params for s in skill.scripts if s.name==state.script) + if schema_fields is None: + schema_fields = [] + prompt = f""" +You are an agent helping a user fill parameters for a skill. + +Skill name: {state.skill} +Script name: {state.script} +Skill required parameters: {schema_fields} + +User original intent: +\"\"\"{state.user_intent}\"\"\" + +Known parameters: +{state.params} + +Missing parameters: +{state.missing} + +User reply: +\"\"\"{user_reply}\"\"\" + +Task: +- Extract values ONLY for missing parameters. +- Do NOT modify existing known parameters. +- All output must match the skill parameter schema. +- Output JSON only with the missing parameters. """ raw = await self.llm.complete(prompt) - data = json.loads(raw) - node = DAGNode(skill=data["skill"], script=data["script"]) - self.dag.append(node) - return node + new_params = json.loads(raw) - async def resume(self, node: DAGNode, user_params: dict = None, schema=None): + state.params.update(new_params) + + # 校验 schema try: - validated = schema(**(user_params or node.params)) - node.params = validated.dict() + validated = self._validate_params(SkillDecision(skill=state.skill, params=state.params)) + except MissingParams as e: + state.missing = e.fields + question = await self._ask_user_for_params(state.user_intent, state.skill, e.fields) + return {"type": "clarification", "state": state, "question": question} + + # 参数完整,返回可直接调用 skill + return {"type": "skill_call", "skill": state.skill, "params": validated} + + # --------------------------- + # 内部方法 + # --------------------------- + + def scripts_info(self, skill): + d = [] + for s in skill.scripts: + d.append( f'name:{s.name}, description:{s.description}, params:{str(s.params}' + return "Scripts: '::'.join(d) + + async def _candidate_skills(self, user_text: str): + skill_list = "\n".join(f"- skillname:{s.name}({s.description}): {self.scripts_info(s)}" for s in self.skills) + prompt = f""" +User request: +\"\"\"{user_text}\"\"\" + +Available skills: +{skill_list} + +Task: +Select up to 3 most relevant skill's scripts. + +Output JSON list only. +""" + raw = await self.llm.complete(prompt) + return json.loads(raw) + + async def _plan_with_candidates(self, user_text: str, candidates: list[str]): + specs = [s for s in self.skills if s.name in candidates] + spec_desc = "\n".join( + f"- {s.name}: inputs={list(s.schema.model_fields.keys())}" for s in specs if s.schema + ) + prompt = f""" +User request: +\"\"\"{user_text}\"\"\" + +Candidate skills: +{spec_desc} + +Task: +1. Choose the best skill. +2. Extract parameters strictly matching schema. + +Rules: +- If a required parameter is missing, set it to null. +- Output JSON only. + +Output: +{{ + "skill": "...", + "script": "...", + "params": {{ ... }}, + "reason": "..." +}} +""" + raw = await self.llm.complete(prompt) + return SkillDecision(**json.loads(raw)) + + def _validate_params(self, decision: SkillDecision): + spec = next(s for s in self.skills if s.name == decision.skill) + if not spec.schema: + return decision.params + try: + return spec.schema(**decision.params).dict() except ValidationError as e: - node.missing = [err['loc'][0] for err in e.errors()] - return node.missing - return self.skillkit.invoke_skill(node.skill, node.script, node.params) + missing = [err["loc"][0] for err in e.errors() if err["type"] == "missing"] + if missing: + raise MissingParams(decision.skill, missing) + raise + + async def _ask_user_for_params(self, user_text: str, skill: str, fields: List[str]): + prompt = f""" +User request: +\"\"\"{user_text}\"\"\" + +The script "{script} in skill "{skill}" requires the following missing parameters: +{fields} + +Ask the user a concise clarification question. +""" + return await self.llm.complete(prompt) + +# --------------------------- +# 测试运行 +# --------------------------- +async def skillagent(llm, apikey, user_skillroot, sys_skillroot): + llm = DummyLLM('8L4hFJ4QpSMyu1UP03Juo', 'eYgNuD6sVQgbj-khOOUNU') + skillkit = SkillKitWrapper(skill_rootpath) + agent = Agent(llm, skillkit) + + while True: + print('What you want to do?') + prompt=input() + if not prompt: + continue + result = await agent.plan(prompt) + while result['type'] == 'clarification': + print(result['question']) + user_reply = input() + result = await agent.resume(result["state"], user_reply) + if result['type'] == 'skill_call': + agent.skillkit.execute_skill_script( + result['skill'], + result['script'], + params=result['params'] + ) + else: + print(result) + + diff --git a/skillagent/skill_loader.py b/skillagent/skill_loader.py deleted file mode 100644 index 9d296cd..0000000 --- a/skillagent/skill_loader.py +++ /dev/null @@ -1,25 +0,0 @@ -import os, re -from typing import List, Dict - -class SkillLoader: - def __init__(self, skillspath: str): - self.skillspath = skillspath - - def list_skills(self) -> List[str]: - return [name for name in os.listdir(self.skillspath) - if os.path.isdir(os.path.join(self.skillspath, name))] - - def parse_skill_md(self, skill_name: str) -> List[Dict]: - md_path = os.path.join(self.skillspath, skill_name, "skill.md") - if not os.path.exists(md_path): - return [] - scripts = [] - md_text = open(md_path, encoding="utf-8").read() - script_blocks = re.findall(r"### (.+?)\n- 功能: (.+?)\n- 参数: (.+)", md_text) - for name, desc, params in script_blocks: - scripts.append({ - "name": name.strip(), - "description": desc.strip(), - "params": [p.strip() for p in params.split(",")] - }) - return scripts diff --git a/skillagent/skillkit_wrapper.py b/skillagent/skillkit_wrapper.py index b8f7f8a..7935464 100644 --- a/skillagent/skillkit_wrapper.py +++ b/skillagent/skillkit_wrapper.py @@ -1,7 +1,55 @@ +import os from skillkit import SkillManager +import yaml +from pathlib import Path +from typing import Dict, Any + +def find_missing_params( + input_schema: Dict[str, Any], + provided_params: Dict[str, Any], +) -> list[str]: + """ + 根据 YAML schema 判断缺失的必填参数 + """ + missing = [] + + for name, spec in input_schema.items(): + if spec.get("required", False) and name not in provided_params: + missing.append(name) + + return missing + +def load_schemas(yaml_path: str) -> Dict[str, Any]: + """ + 从 YAML 文件中读取 script 输入参数定义 + + 返回格式: + { + "script": "Slider", + "inputs": { + "min": {...}, + "max": {...} + } + } + """ + path = Path(yaml_path) + if not path.exists(): + raise FileNotFoundError(f"Script yaml not found: {yaml_path}") + + with path.open("r", encoding="utf-8") as f: + data = yaml.safe_load(f) + + if "script" not in data or "inputs" not in data: + raise ValueError("Invalid script yaml format") + + return { + "script": data["script"], + "description": data.get("description", ""), + "inputs": data["inputs"], + } class SkillkitWrapper: - def __init__(self, user_skillsroot, sys_skillsroot): + def __init__(self, user_skillsroot, sys_skillsroot=None): self.client = SkillManager(project_skill_dir=skillroot, anthropic_config_dir=sys_skillsroot) @@ -10,22 +58,33 @@ class SkillkitWrapper: def list_skills(self): return self.client.list_skills() - def invoke_skill(self, skill: str, script: str, params: dict): - print(f"Invoking skill={skill}, script={script}, params={params}") - return self.client.invoke_skill(skill, params) + def load_skill(self, skillname): + skill = self.client.load_skill(skill_name) + if not hasattr(skill, 'schemas'): + fp = os.path.join(skill.base_dir, 'schemas.yaml') + if os.path.exists(fp): + data = load_schema(fp) + skill.schemas = data + for s in skill.scripts: + s.params = next(sch.inputs for sch in skill.schemas if sch.script==script_name) - def parse_skill_md(self, skill_name: str) -> List[Dict]: - md_path = os.path.join(self.skillspath, skill_name, "skill.md") - if not os.path.exists(md_path): - return [] - scripts = [] - md_text = open(md_path, encoding="utf-8").read() - script_blocks = re.findall(r"### (.+?)\n- 功能: (.+?)\n- 参数: (.+)", md_text) - for name, desc, params in script_blocks: - scripts.append({ - "name": name.strip(), - "description": desc.strip(), - "params": [p.strip() for p in params.split(",")] - }) - return scripts + return skill + + def get_script_params(self, skill_name, script_name): + skill = self.load_skill(skill_name) + return next(s.params for s in skill.scripts if s.name==script_name) + + def get_skill_scripts(self, skill_name): + skill = self.load_skill(skill_name) + return skill.scripts + + def execute_skill_script(self, skill_name, script_name, args={}): + return self.client.execute_skill_script( + skill_name=skill_name, + script_name=script_name, + arguments=args + ) + def invoke_skill(self, skill_name: str, script_name: str, params: dict): + print(f"Invoking skill={skill_nmae}, script={script_nmae}, params={params}") + return self.client.invoke_skill(skill_nmae, params)