This commit is contained in:
yumoqing 2026-01-20 18:27:11 +08:00
parent 7fbf7f7a31
commit 6dda573aed
2 changed files with 133 additions and 128 deletions

View File

@ -5,6 +5,7 @@ from dataclasses import dataclass, field
from pydantic import BaseModel, Field, ValidationError from pydantic import BaseModel, Field, ValidationError
from typing import Literal from typing import Literal
from appPublic.worker import awaitify from appPublic.worker import awaitify
from appPublic.streamhttpclient import StreamHttpClient, liner
from .skillkit_wrapper import SkillkitWrapper from .skillkit_wrapper import SkillkitWrapper
# --------------------------- # ---------------------------
@ -45,7 +46,7 @@ class LLM:
class DummyLLM(LLM): class DummyLLM(LLM):
def __init__(self, llmid, apikey): def __init__(self, llmid, apikey):
self.llmid = llmid self.llmid = llmid
self.akikey = apikey self.apikey = apikey
async def complete(self, prompt: str) -> str: async def complete(self, prompt: str) -> str:
hc = StreamHttpClient() hc = StreamHttpClient()
@ -57,6 +58,7 @@ class DummyLLM(LLM):
'llmid': self.llmid, 'llmid': self.llmid,
'prompt': prompt 'prompt': prompt
} }
url = 'https://opencomputing.ai/v1/llm'
reco = hc('POST', url, headers=headers, data=json.dumps(d)) reco = hc('POST', url, headers=headers, data=json.dumps(d))
doc = '' doc = ''
async for chunk in liner(reco): async for chunk in liner(reco):
@ -68,8 +70,8 @@ class DummyLLM(LLM):
if d.get('content'): if d.get('content'):
doc = f'{doc}{d["content"]}' doc = f'{doc}{d["content"]}'
else: else:
print(f'{f}:{d} error') print(f'{d} error')
return json.loads(doc) return doc
# --------------------------- # ---------------------------
# Agent 实现 # Agent 实现
@ -127,7 +129,7 @@ class Agent:
# --------------------------- # ---------------------------
async def resume(self, state: PlanState, user_reply: str): async def resume(self, state: PlanState, user_reply: str):
skill_spec = next(s for s in self.skills if s.name == state.skill) skill_spec = next(s for s in self.skills if s.name == state.skill)
schema_fields = next(s.params for s in skill.scripts if s.name==state.script) schema_fields = self.skillkit.get_script_params(state.skill, state.script)
if schema_fields is None: if schema_fields is None:
schema_fields = [] schema_fields = []
prompt = f""" prompt = f"""
@ -175,14 +177,17 @@ Task:
# 内部方法 # 内部方法
# --------------------------- # ---------------------------
def scripts_info(self, skill): def scripts_info(self, skill_name):
d = [] d = []
skill = self.skillkit.load_skill(skill_name)
for s in skill.scripts: for s in skill.scripts:
d.append( f'name:{s.name}, description:{s.description}, params:{str(s.params}' params = self.skillkit.get_script_params(skill_name, s.name)
return "Scripts: '::'.join(d) print(f'{params=}')
d.append( f'name:{s.name}, description:{s.description}, params:{str(params)}')
return "Scripts: " + '::'.join(d)
async def _candidate_skills(self, user_text: str): async def _candidate_skills(self, user_text: str):
skill_list = "\n".join(f"- skillname:{s.name}({s.description}): {self.scripts_info(s)}" for s in self.skills) skill_list = "\n".join(f"- skillname:{s.name}({s.description}): {self.scripts_info(s.name)}" for s in self.skills)
prompt = f""" prompt = f"""
User request: User request:
\"\"\"{user_text}\"\"\" \"\"\"{user_text}\"\"\"
@ -256,9 +261,9 @@ Ask the user a concise clarification question.
# --------------------------- # ---------------------------
# 测试运行 # 测试运行
# --------------------------- # ---------------------------
async def skillagent(llm, apikey, user_skillroot, sys_skillroot): async def skillagent(llm, apikey, user_skillroot, sys_skillroot=None):
llm = DummyLLM('8L4hFJ4QpSMyu1UP03Juo', 'eYgNuD6sVQgbj-khOOUNU') llm = DummyLLM('8L4hFJ4QpSMyu1UP03Juo', 'eYgNuD6sVQgbj-khOOUNU')
skillkit = SkillKitWrapper(skill_rootpath) skillkit = SkillkitWrapper(user_skillroot)
agent = Agent(llm, skillkit) agent = Agent(llm, skillkit)
while True: while True:

View File

@ -3,6 +3,7 @@ from skillkit import SkillManager
import yaml import yaml
from pathlib import Path from pathlib import Path
from typing import Dict, Any from typing import Dict, Any
from appPublic.dictObject import DictObject
def find_missing_params( def find_missing_params(
input_schema: Dict[str, Any], input_schema: Dict[str, Any],
@ -19,7 +20,7 @@ def find_missing_params(
return missing return missing
def load_schemas(yaml_path: str) -> Dict[str, Any]: def load_schemas(path) -> Dict[str, Any]:
""" """
YAML 文件中读取 script 输入参数定义 YAML 文件中读取 script 输入参数定义
@ -32,47 +33,46 @@ def load_schemas(yaml_path: str) -> Dict[str, Any]:
} }
} }
""" """
path = Path(yaml_path)
if not path.exists(): if not path.exists():
raise FileNotFoundError(f"Script yaml not found: {yaml_path}") raise FileNotFoundError(f"Script yaml not found: {yaml_path}")
with path.open("r", encoding="utf-8") as f: with path.open("r", encoding="utf-8") as f:
data = yaml.safe_load(f) data = yaml.safe_load(f)
if "script" not in data or "inputs" not in data: return DictObject(data)
raise ValueError("Invalid script yaml format")
return {
"script": data["script"],
"description": data.get("description", ""),
"inputs": data["inputs"],
}
class SkillkitWrapper: class SkillkitWrapper:
def __init__(self, user_skillsroot, sys_skillsroot=None): def __init__(self, user_skillsroot, sys_skillsroot=None):
self.client = SkillManager(project_skill_dir=skillroot, self.client = SkillManager(project_skill_dir=user_skillsroot,
anthropic_config_dir=sys_skillsroot) anthropic_config_dir=sys_skillsroot)
self.client.discover() self.client.discover()
self.schemas = {}
def list_skills(self): def list_skills(self):
return self.client.list_skills() return self.client.list_skills()
def load_skill(self, skillname): def load_skill(self, skill_name):
skill = self.client.load_skill(skill_name) skill = self.client.load_skill(skill_name)
if not hasattr(skill, 'schemas'): print(skill, dir(skill))
fp = os.path.join(skill.base_dir, 'schemas.yaml') schemaspath = skill.base_directory / 'schemas.yaml'
if os.path.exists(fp): if schemaspath.exists():
data = load_schema(fp) if not self.schemas.get(skill_name):
skill.schemas = data data = load_schemas(schemaspath)
for s in skill.scripts: self.schemas[skill_name] = data
s.params = next(sch.inputs for sch in skill.schemas if sch.script==script_name) print(f'{data=}, {str(schemaspath)}')
return skill return skill
def get_script_params(self, skill_name, script_name): def get_script_params(self, skill_name, script_name):
skill = self.load_skill(skill_name) skill = self.load_skill(skill_name)
return next(s.params for s in skill.scripts if s.name==script_name) d = self.schemas.get('skill_name')
if not d:
return []
m = d.scripts.get(script_name)
if not m:
return []
return m.inputs
def get_skill_scripts(self, skill_name): def get_skill_scripts(self, skill_name):
skill = self.load_skill(skill_name) skill = self.load_skill(skill_name)