This commit is contained in:
yumoqing 2026-01-20 16:17:30 +08:00
parent e5e3bd6c02
commit 25d384693c
3 changed files with 347 additions and 74 deletions

View File

@ -1,43 +1,282 @@
import os, json import json
from typing import Dict import asyncio
from pydantic import ValidationError from typing import List, Optional
from dataclasses import dataclass, field
from pydantic import BaseModel, Field, ValidationError
from typing import Literal
from .skillkit_wrapper import SkillkitWrapper
class DAGNode: # ---------------------------
def __init__(self, skill, script=None, params=None): # Skill Decision / PlanState
# ---------------------------
@dataclass
class SkillDecision:
skill: str
params: dict
reason: Optional[str] = None
@dataclass
class PlanState:
user_intent: str
skill: str
script: str
params: dict
missing: List[str] = field(default_factory=list)
# ---------------------------
# 自定义异常
# ---------------------------
class MissingParams(Exception):
def __init__(self, skill: str, fields: List[str]):
self.skill = skill self.skill = skill
self.script = script self.fields = fields
self.params = params or {}
self.state = "pending"
self.missing = []
class SkillAgent: # ---------------------------
def __init__(self, skill_loader, skillkit_wrapper, llm): # LLM 接口(可替换为你的模型)
self.skill_loader = skill_loader # ---------------------------
self.skillkit = skillkit_wrapper class LLM:
async def complete(self, prompt: str) -> str:
raise NotImplementedError
# ---------------------------
# DummyLLM 示例(测试用)
# ---------------------------
class DummyLLM(LLM):
def __init__(self, llmid, apikey):
self.llmid = llmid
self.akikey = apikey
async def complete(self, prompt: str) -> str:
hc = StreamHttpClient()
headers = {
'Authorization': f'Bearer {self.apikey}',
'Content-Type': 'application/json'
}
d = {
'llmid': self.llmid,
'prompt': prompt
}
reco = hc('POST', url, headers=headers, data=json.dumps(d))
doc = ''
async for chunk in liner(reco):
try:
d = json.loads(chunk)
except Exception as e:
print(f'****{chunk=} error {e} {format_exc()}')
continue
if d.get('content'):
doc = f'{doc}{d["content"]}'
else:
print(f'{f}:{d} error')
return json.loads(doc)
# ---------------------------
# Agent 实现
# ---------------------------
class Agent:
def __init__(self, llm: LLM, skillkit):
self.llm = llm self.llm = llm
self.dag = [] self.skillkit = skillkit
self.skills = None
self.loaded = False
def load_skills(self):
if self.loaded:
return
self.skills = self.skillkit.list_skills()
for s in self.skills:
self.skillkit.load_skill(s.name)
# ---------------------------
# plan: 多 skill 候选 + 参数抽取
# ---------------------------
async def plan(self, user_text: str): async def plan(self, user_text: str):
skills = self.skill_loader.list_skills() self.load_skills()
skill_docs = "\n".join([open(os.path.join(self.skill_loader.skillspath, s, "skill.md"), encoding="utf-8").read() for s in skills]) candidates = await self._candidate_skills(user_text)
prompt = f""" decision = await self._plan_with_candidates(user_text, candidates)
User request: {user_text} try:
Available Skills: validated_params = self._validate_params(decision)
{skill_docs} except MissingParams as e:
question = await self._ask_user_for_params(user_text, decision.skill, e.fields)
state = PlanState(
user_intent=user_text,
skill=decision.skill,
script=decision.script,
params=decision.params,
missing=e.fields
)
return {
"type": "clarification",
"state": state,
"question": question
}
return {
"type": "script_call",
"script": decision.script,
"skill": decision.skill,
"params": validated_params,
"reason": decision.reason
}
Task: choose the most suitable skill and script. Output JSON: {{ "skill": "<skill_name>", "script": "<script_name>" }} def get_scripts(self, skillname):
return self.skillkit.get_skill_scripts(skillname)
# ---------------------------
# resume: 补 missing 参数
# ---------------------------
async def resume(self, state: PlanState, user_reply: str):
skill_spec = next(s for s in self.skills if s.name == state.skill)
schema_fields = next(s.params for s in skill.scripts if s.name==state.script)
if schema_fields is None:
schema_fields = []
prompt = f"""
You are an agent helping a user fill parameters for a skill.
Skill name: {state.skill}
Script name: {state.script}
Skill required parameters: {schema_fields}
User original intent:
\"\"\"{state.user_intent}\"\"\"
Known parameters:
{state.params}
Missing parameters:
{state.missing}
User reply:
\"\"\"{user_reply}\"\"\"
Task:
- Extract values ONLY for missing parameters.
- Do NOT modify existing known parameters.
- All output must match the skill parameter schema.
- Output JSON only with the missing parameters.
""" """
raw = await self.llm.complete(prompt) raw = await self.llm.complete(prompt)
data = json.loads(raw) new_params = json.loads(raw)
node = DAGNode(skill=data["skill"], script=data["script"])
self.dag.append(node)
return node
async def resume(self, node: DAGNode, user_params: dict = None, schema=None): state.params.update(new_params)
# 校验 schema
try: try:
validated = schema(**(user_params or node.params)) validated = self._validate_params(SkillDecision(skill=state.skill, params=state.params))
node.params = validated.dict() except MissingParams as e:
state.missing = e.fields
question = await self._ask_user_for_params(state.user_intent, state.skill, e.fields)
return {"type": "clarification", "state": state, "question": question}
# 参数完整,返回可直接调用 skill
return {"type": "skill_call", "skill": state.skill, "params": validated}
# ---------------------------
# 内部方法
# ---------------------------
def scripts_info(self, skill):
d = []
for s in skill.scripts:
d.append( f'name:{s.name}, description:{s.description}, params:{str(s.params}'
return "Scripts: '::'.join(d)
async def _candidate_skills(self, user_text: str):
skill_list = "\n".join(f"- skillname:{s.name}({s.description}): {self.scripts_info(s)}" for s in self.skills)
prompt = f"""
User request:
\"\"\"{user_text}\"\"\"
Available skills:
{skill_list}
Task:
Select up to 3 most relevant skill's scripts.
Output JSON list only.
"""
raw = await self.llm.complete(prompt)
return json.loads(raw)
async def _plan_with_candidates(self, user_text: str, candidates: list[str]):
specs = [s for s in self.skills if s.name in candidates]
spec_desc = "\n".join(
f"- {s.name}: inputs={list(s.schema.model_fields.keys())}" for s in specs if s.schema
)
prompt = f"""
User request:
\"\"\"{user_text}\"\"\"
Candidate skills:
{spec_desc}
Task:
1. Choose the best skill.
2. Extract parameters strictly matching schema.
Rules:
- If a required parameter is missing, set it to null.
- Output JSON only.
Output:
{{
"skill": "...",
"script": "...",
"params": {{ ... }},
"reason": "..."
}}
"""
raw = await self.llm.complete(prompt)
return SkillDecision(**json.loads(raw))
def _validate_params(self, decision: SkillDecision):
spec = next(s for s in self.skills if s.name == decision.skill)
if not spec.schema:
return decision.params
try:
return spec.schema(**decision.params).dict()
except ValidationError as e: except ValidationError as e:
node.missing = [err['loc'][0] for err in e.errors()] missing = [err["loc"][0] for err in e.errors() if err["type"] == "missing"]
return node.missing if missing:
return self.skillkit.invoke_skill(node.skill, node.script, node.params) raise MissingParams(decision.skill, missing)
raise
async def _ask_user_for_params(self, user_text: str, skill: str, fields: List[str]):
prompt = f"""
User request:
\"\"\"{user_text}\"\"\"
The script "{script} in skill "{skill}" requires the following missing parameters:
{fields}
Ask the user a concise clarification question.
"""
return await self.llm.complete(prompt)
# ---------------------------
# 测试运行
# ---------------------------
async def skillagent(llm, apikey, user_skillroot, sys_skillroot):
llm = DummyLLM('8L4hFJ4QpSMyu1UP03Juo', 'eYgNuD6sVQgbj-khOOUNU')
skillkit = SkillKitWrapper(skill_rootpath)
agent = Agent(llm, skillkit)
while True:
print('What you want to do?')
prompt=input()
if not prompt:
continue
result = await agent.plan(prompt)
while result['type'] == 'clarification':
print(result['question'])
user_reply = input()
result = await agent.resume(result["state"], user_reply)
if result['type'] == 'skill_call':
agent.skillkit.execute_skill_script(
result['skill'],
result['script'],
params=result['params']
)
else:
print(result)

View File

@ -1,25 +0,0 @@
import os, re
from typing import List, Dict
class SkillLoader:
def __init__(self, skillspath: str):
self.skillspath = skillspath
def list_skills(self) -> List[str]:
return [name for name in os.listdir(self.skillspath)
if os.path.isdir(os.path.join(self.skillspath, name))]
def parse_skill_md(self, skill_name: str) -> List[Dict]:
md_path = os.path.join(self.skillspath, skill_name, "skill.md")
if not os.path.exists(md_path):
return []
scripts = []
md_text = open(md_path, encoding="utf-8").read()
script_blocks = re.findall(r"### (.+?)\n- 功能: (.+?)\n- 参数: (.+)", md_text)
for name, desc, params in script_blocks:
scripts.append({
"name": name.strip(),
"description": desc.strip(),
"params": [p.strip() for p in params.split(",")]
})
return scripts

View File

@ -1,7 +1,55 @@
import os
from skillkit import SkillManager from skillkit import SkillManager
import yaml
from pathlib import Path
from typing import Dict, Any
def find_missing_params(
input_schema: Dict[str, Any],
provided_params: Dict[str, Any],
) -> list[str]:
"""
根据 YAML schema 判断缺失的必填参数
"""
missing = []
for name, spec in input_schema.items():
if spec.get("required", False) and name not in provided_params:
missing.append(name)
return missing
def load_schemas(yaml_path: str) -> Dict[str, Any]:
"""
YAML 文件中读取 script 输入参数定义
返回格式
{
"script": "Slider",
"inputs": {
"min": {...},
"max": {...}
}
}
"""
path = Path(yaml_path)
if not path.exists():
raise FileNotFoundError(f"Script yaml not found: {yaml_path}")
with path.open("r", encoding="utf-8") as f:
data = yaml.safe_load(f)
if "script" not in data or "inputs" not in data:
raise ValueError("Invalid script yaml format")
return {
"script": data["script"],
"description": data.get("description", ""),
"inputs": data["inputs"],
}
class SkillkitWrapper: class SkillkitWrapper:
def __init__(self, user_skillsroot, sys_skillsroot): def __init__(self, user_skillsroot, sys_skillsroot=None):
self.client = SkillManager(project_skill_dir=skillroot, self.client = SkillManager(project_skill_dir=skillroot,
anthropic_config_dir=sys_skillsroot) anthropic_config_dir=sys_skillsroot)
@ -10,22 +58,33 @@ class SkillkitWrapper:
def list_skills(self): def list_skills(self):
return self.client.list_skills() return self.client.list_skills()
def invoke_skill(self, skill: str, script: str, params: dict): def load_skill(self, skillname):
print(f"Invoking skill={skill}, script={script}, params={params}") skill = self.client.load_skill(skill_name)
return self.client.invoke_skill(skill, params) if not hasattr(skill, 'schemas'):
fp = os.path.join(skill.base_dir, 'schemas.yaml')
if os.path.exists(fp):
data = load_schema(fp)
skill.schemas = data
for s in skill.scripts:
s.params = next(sch.inputs for sch in skill.schemas if sch.script==script_name)
def parse_skill_md(self, skill_name: str) -> List[Dict]: return skill
md_path = os.path.join(self.skillspath, skill_name, "skill.md")
if not os.path.exists(md_path): def get_script_params(self, skill_name, script_name):
return [] skill = self.load_skill(skill_name)
scripts = [] return next(s.params for s in skill.scripts if s.name==script_name)
md_text = open(md_path, encoding="utf-8").read()
script_blocks = re.findall(r"### (.+?)\n- 功能: (.+?)\n- 参数: (.+)", md_text) def get_skill_scripts(self, skill_name):
for name, desc, params in script_blocks: skill = self.load_skill(skill_name)
scripts.append({ return skill.scripts
"name": name.strip(),
"description": desc.strip(), def execute_skill_script(self, skill_name, script_name, args={}):
"params": [p.strip() for p in params.split(",")] return self.client.execute_skill_script(
}) skill_name=skill_name,
return scripts script_name=script_name,
arguments=args
)
def invoke_skill(self, skill_name: str, script_name: str, params: dict):
print(f"Invoking skill={skill_nmae}, script={script_nmae}, params={params}")
return self.client.invoke_skill(skill_nmae, params)