harnessed_agent/wwwroot/api/agent_execute.dspy

52 lines
1.6 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Execute agent task - call tool/skill with given parameters"""
import json, time
result = {'widgettype': 'Message', 'options': {'title': 'Error', 'message': 'Invalid request', 'type': 'error'}}
try:
tool_name = params_kw.get('tool_name', '').strip()
parameters = params_kw.get('parameters', '{}').strip()
if not tool_name:
result['options'] = {'title': 'Error', 'message': '请指定工具名称', 'type': 'error'}
else:
user_id = await get_user()
try:
tool_params = json.loads(parameters)
except:
tool_params = {}
context = {'user_id': user_id}
# Call agent tool execution
exec_result = await harnessed_execute_tool(
tool_name=tool_name,
parameters=tool_params,
context=context
)
if exec_result.get('success'):
output = json.dumps(exec_result, ensure_ascii=False, indent=2)
result = {
'widgettype': 'Message',
'options': {
'title': '执行成功',
'message': f'工具: {tool_name}\n\n结果:\n{output}',
'type': 'success'
}
}
else:
result['options'] = {
'title': '执行失败',
'message': exec_result.get('error', '未知错误'),
'type': 'error'
}
except Exception as e:
result['options'] = {'title': 'Error', 'message': '执行异常: ' + str(e), 'type': 'error'}
return json.dumps(result, ensure_ascii=False)