423 lines
13 KiB
Markdown
423 lines
13 KiB
Markdown
# uapi 模块开发文档
|
||
|
||
## 模块概述
|
||
|
||
uapi (Universe API) 是 Hermes Agent 平台的**外部 API 网关与管理模块**,提供对第三方/上游系统 API 的统一配置、认证、调用和异步任务追踪能力。
|
||
|
||
核心职责:
|
||
- 管理上位系统(upapp)的注册与 API 密钥分配
|
||
- 将外部 HTTP API 抽象为数据库配置(无需改代码即可接入新 API)
|
||
- 支持流式响应(SSE)与同步调用两种模式
|
||
- 提供 Deerer 和 Bearer 两种认证方案
|
||
- 支持异步长任务(uptask)的创建、状态追踪与回调反馈
|
||
|
||
---
|
||
|
||
## 目录结构
|
||
|
||
```
|
||
uapi/
|
||
├── pyproject.toml # 构建配置(setuptools)
|
||
├── setup.cfg # 包元数据:name=uapi, version=0.0.1
|
||
├── requirements.txt # 运行时依赖:aiohttp
|
||
├── README.md
|
||
├── uapi/ # Python 源码包
|
||
│ ├── __init__.py # 空
|
||
│ ├── init.py # 模块初始化,注册函数到 ServerEnv
|
||
│ ├── appapi.py # UAPI 核心类 + deerer/bearer 认证
|
||
│ ├── uapi.py # UpAppApi 类(基于 UAPIData 缓存的版本)
|
||
│ ├── apidata.py # UAPIData 单例缓存
|
||
│ └── uptask.py # 异步长任务管理
|
||
├── json/ # CRUD 定义(bricks-framework)
|
||
│ ├── upapp.json # 上位系统管理
|
||
│ ├── upappkey.json # API 密钥管理
|
||
│ ├── uapi.json # API 定义
|
||
│ ├── uapiset.json # API 集合管理
|
||
│ ├── uapiio.json # API 输入输出定义
|
||
│ └── build.sh # 构建脚本:xls2ui -m ../models -o ../wwwroot uapi *.json
|
||
├── models/ # 表定义(xlsx 格式)
|
||
│ ├── upapp.xlsx
|
||
│ ├── upappkey.xlsx
|
||
│ ├── uapi.xlsx
|
||
│ ├── uapiset.xlsx
|
||
│ ├── uapiio.xlsx
|
||
│ └── uptask.xlsx
|
||
├── wwwroot/ # Web 前端资源(由 xls2ui 生成 + 手写 .dspy)
|
||
│ ├── jump_in.dspy # 上位系统跳转逻辑
|
||
│ ├── uptask_callback.dspy # 任务回调处理
|
||
│ ├── minimax_callback.dspy # MiniMax 回调
|
||
│ └── viducallback/
|
||
│ └── index.dspy # Vidu 视频生成回调
|
||
├── script/
|
||
│ └── perms.json # RBAC 权限配置
|
||
└── test/
|
||
└── t.py # 调用示例
|
||
```
|
||
|
||
---
|
||
|
||
## 数据库表结构
|
||
|
||
### 表关系
|
||
|
||
```
|
||
uapiset (API集合) ──1:N──> uapi (API定义)
|
||
upapp (上位系统) ──N:1──> uapiset (每个upapp关联一个apiset)
|
||
upapp (上位系统) ──1:N──> upappkey (API密钥)
|
||
```
|
||
|
||
### 核心表说明
|
||
|
||
| 表名 | 说明 | 关键字段 |
|
||
|------|------|----------|
|
||
| **uapiset** | API集合定义 | id, name, auth_apiname(可选的认证API名) |
|
||
| **uapi** | 单个API定义 | id, apisetid, name, httpmethod, path, headers, params, data, response, chunk_match |
|
||
| **upapp** | 上位系统注册 | id, name, apisetid, baseurl, myappid, ownerid, appownerid, secretkey, dynamic_func |
|
||
| **upappkey** | API密钥分配 | id, upappid, ownerid, orgid, apikey, secretkey |
|
||
| **uapiio** | API输入输出定义 | id, name, ... |
|
||
| **uptask** | 长任务追踪 | id, userid, executor_taskid, convert_func_name, status, start_timestamp, end_timestamp, response_data |
|
||
|
||
---
|
||
|
||
## Python API
|
||
|
||
### 模块初始化
|
||
|
||
在 `uapi/init.py` 中,通过 `load_uapi()` 将所有公开函数和类注册到 `ServerEnv`:
|
||
|
||
```python
|
||
def load_uapi():
|
||
g = ServerEnv()
|
||
g.UAPI = UAPI
|
||
g.UpAppApi = UpAppApi
|
||
g.uapi_data = UAPIData()
|
||
g.get_deerer = get_deerer
|
||
g.deerer = deerer
|
||
g.get_callerid = get_callerid
|
||
g.sor_get_callerid = sor_get_callerid
|
||
g.sor_get_uapi_by_appname_apiname = sor_get_uapi_by_appname_apiname
|
||
g.bearer = bearer
|
||
g.check_uptask_status = check_uptask_status
|
||
g.get_my_uptasks = get_my_uptasks
|
||
g.uptask_feedback = uptask_feedback
|
||
g.uptask_started = uptask_started
|
||
```
|
||
|
||
其他模块的 `.dspy` 文件可通过 `globals()` 直接使用这些函数。
|
||
|
||
### UAPI 类(appapi.py)
|
||
|
||
**直接查数据库调用外部 API 的核心类。**
|
||
|
||
```python
|
||
# 在 .dspy 中使用
|
||
uapi = UAPI(request, DictObject(**globals()))
|
||
|
||
# 调用方式 1:流式调用(返回生成器)
|
||
async for chunk in uapi(upappid, apiname, callerid, params={}):
|
||
# chunk 是 bytes
|
||
|
||
# 调用方式 2:一次性获取全部响应
|
||
result = await uapi.call(upappid, apiname, callerid, params={})
|
||
# result 是 bytes
|
||
|
||
# 调用方式 3:逐行流式处理(自动过滤 chunk_match 前缀)
|
||
async for line in uapi.stream_linify(upappid, apiname, callerid, params={}):
|
||
# line 是 str,已去除 chunk_match 前缀,经 response 模板渲染
|
||
```
|
||
|
||
**构造参数:**
|
||
- `request`: HTTP 请求对象(可选,用于获取运行时命名空间)
|
||
- `env`: DictObject 环境变量(可选,默认从 ServerEnv 获取)
|
||
- `sor`: sqlor 游标(可选,传入后不再自行创建数据库连接)
|
||
|
||
**工作流程:**
|
||
1. 通过 `sor_get_uapi()` 查询 uapi/upapp/uapiset 三表获取 API 配置
|
||
2. 如果 API 定义了 `auth_apiname`,先执行认证 API(`do_auth`),将结果注入 `self.env`
|
||
3. 通过 `get_userapikey()` 获取调用者的 API 密钥信息
|
||
4. 渲染 path/headers/data/params 模板,组装 HTTP 请求
|
||
5. 如果配置了 `dynamic_func_name`,执行动态函数
|
||
6. 通过 `StreamHttpClient` 发起 HTTP 请求并流式返回
|
||
|
||
### UpAppApi 类(uapi.py)
|
||
|
||
**基于 UAPIData 缓存的 API 调用类,接口与 UAPI 完全一致。**
|
||
|
||
区别在于:UAPI 每次都查数据库,UpAppApi 通过 UAPIData 单例缓存 API 定义和密钥信息,适合高频调用场景。
|
||
|
||
```python
|
||
# 在 .dspy 中使用
|
||
api = UpAppApi(request)
|
||
result = await api.call(upappid, apiname, callerid, params={})
|
||
```
|
||
|
||
### UAPIData 类(apidata.py)
|
||
|
||
**API 定义和密钥的内存缓存单例。**
|
||
|
||
```python
|
||
@SingletonDecorator
|
||
class UAPIData:
|
||
async def get_api(appid, apiname) # 获取 API 定义(带缓存)
|
||
async def get_userapikey(appid, callerid) # 获取用户 API 密钥
|
||
async def get_apiusers(appid, orgid=None) # 获取某 app 的所有授权用户
|
||
async def get_calluserid(appid, orgid=None) # 随机返回一个调用者 userid(负载均衡)
|
||
```
|
||
|
||
### 认证辅助函数
|
||
|
||
#### deerer — 自定义 Deerer 认证
|
||
|
||
```python
|
||
def deerer(myappid, apikey, secretkey):
|
||
t = time()
|
||
txt = f'{t}:{apikey}'
|
||
cyber = aes_encode_b64(secretkey, txt)
|
||
return f'Deerer {myappid}-:-{cyber}'
|
||
```
|
||
|
||
在 Header 模板中使用:`{{deerer(myappid, apikey, secretkey)}}`
|
||
|
||
```python
|
||
# 异步版本:自动查询密钥
|
||
d = await get_deerer(upappid, callerid) # 返回 "Deerer xxx-:-yyy" 去掉 "Deerer " 前缀的部分
|
||
```
|
||
|
||
#### bearer — 标准 Bearer Token 认证
|
||
|
||
```python
|
||
def bearer(apikey):
|
||
return f'Bearer {apikey}'
|
||
```
|
||
|
||
在 Header 模板中使用:`{{bearer(apikey)}}`
|
||
|
||
### 辅助查询函数
|
||
|
||
```python
|
||
# 通过 appname + apiname 查询 API(不需要 upappid)
|
||
api = await sor_get_uapi_by_appname_apiname(sor, appname, apiname)
|
||
|
||
# 获取某 org 的 callerid(随机选取)
|
||
callerid = await get_callerid(orgid)
|
||
callerid = await sor_get_callerid(sor, orgid)
|
||
```
|
||
|
||
### 异步长任务管理(uptask.py)
|
||
|
||
用于追踪远端异步任务的生命周期。
|
||
|
||
```python
|
||
# 创建任务记录
|
||
task_record_id = await uptask_started(taskid, userid, convert_func_name)
|
||
# convert_func_name: 注册函数名,用于将远端回调数据转换为标准格式
|
||
|
||
# 任务回调更新状态
|
||
await uptask_feedback(task_id, resp_data)
|
||
# resp_data 经 convert_func 处理后,status 字段映射为 SUCCEEDED/FAILED
|
||
|
||
# 查询任务状态
|
||
result = await check_uptask_status(task_id)
|
||
# 返回 DictObject(status='SUCCEEDED'|'FAILED'|'started'|..., response_data=...)
|
||
|
||
# 查询某用户某业务日期的任务
|
||
tasks = await get_my_uptasks(userid, biz_date)
|
||
```
|
||
|
||
**convert_func 规范:**
|
||
```python
|
||
# 注册函数签名
|
||
async def my_convert(resp_data):
|
||
return {
|
||
'status': 'SUCCEEDED', # 或 'FAILED'
|
||
'identify_code': 'xxx',
|
||
# ... 其他需要保存的字段
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
## 在 .dspy 中的使用示例
|
||
|
||
### 示例 1:调用外部 API
|
||
|
||
```python
|
||
# jump_in.dspy — 跳转到上位系统
|
||
userid = await get_user()
|
||
d = await get_deerer(params_kw.id, userid)
|
||
if d is None:
|
||
return UiError(title='跳转', message='当前用户没有上位系统apikey')
|
||
return {
|
||
"widgettype": "NewWindow",
|
||
"options": {
|
||
"name": "upappid",
|
||
"url": params_kw.baseurl + "/dapi/jumpin.dspy?deerer=" + quote(d)
|
||
}
|
||
}
|
||
```
|
||
|
||
### 示例 2:处理异步任务回调
|
||
|
||
```python
|
||
# uptask_callback.dspy
|
||
if params_kw.task_id is None:
|
||
raise Exception('need a task_id')
|
||
try:
|
||
resp = await uptask_feedback(task_id, params_kw)
|
||
except Exception as e:
|
||
exception(f'{e}')
|
||
return json_response({'text': 'ok'})
|
||
```
|
||
|
||
### 示例 3:独立脚本测试
|
||
|
||
```python
|
||
# test/t.py
|
||
import asyncio
|
||
from appPublic.jsonConfig import getConfig
|
||
from sqlor.dbpools import DBPools
|
||
from ahserver.serverenv import ServerEnv
|
||
from uapi.appapi import UAPI
|
||
|
||
def get_module_dbname(mn):
|
||
return 'sage'
|
||
|
||
async def main():
|
||
workdir = os.getcwd()
|
||
config = getConfig(workdir, {'workdir': workdir})
|
||
DBPools(config.databases)
|
||
env = ServerEnv()
|
||
env.get_module_dbname = get_module_dbname
|
||
|
||
uapi = UAPI()
|
||
params = {
|
||
'baseurl': 'https://qianfan.baidubce.com',
|
||
'model': 'deepseek-v3',
|
||
'prompt': '北京今天天气如何,适合跑步吗?',
|
||
}
|
||
upapiid = 'R47xUJay76dCCt1sLmWvE'
|
||
|
||
async for line in uapi.stream_linify(upapiid, '0', callerid, params=params):
|
||
print(line)
|
||
|
||
if __name__ == '__main__':
|
||
asyncio.new_event_loop().run_until_complete(main())
|
||
```
|
||
|
||
---
|
||
|
||
## JSON CRUD 定义
|
||
|
||
每个 `.json` 文件定义一个表的 bricks-framework CRUD 页面。
|
||
|
||
### upapp.json — 上位系统管理
|
||
|
||
- 按 `ownerid`(登录用户所在组织)过滤
|
||
- `secretkey` 字段设为 `confidential_fields`(前端脱敏显示)
|
||
- 子表:`upappkey`(APIKEY 列表)
|
||
- 工具栏:`jumpin` 按钮,跳转到 `/uapi/jump_in.dspy`
|
||
|
||
### upappkey.json — API 密钥管理
|
||
|
||
- 按 `ownerid`/`orgid` 过滤
|
||
- `apikey`、`apipasswd` 为保密字段
|
||
- 编辑时 `upappid`、`ownerid`、`orgid` 不可修改
|
||
|
||
### uapi.json — API 定义
|
||
|
||
- 按 `apisetid`、`name` 排序
|
||
- 工具栏:`api测试` 按钮,弹出测试窗口
|
||
|
||
### uapiset.json — API 集合管理
|
||
|
||
- 子表:`uapi`(定义该集合下的所有 API)
|
||
|
||
### uapiio.json — API 输入输出定义
|
||
|
||
---
|
||
|
||
## 构建流程
|
||
|
||
```bash
|
||
# 在 json/ 目录下执行
|
||
cd ~/repos/uapi/json
|
||
bash build.sh
|
||
|
||
# 等价于:
|
||
xls2ui -m ../models -o ../wwwroot uapi *.json
|
||
```
|
||
|
||
该命令读取 `models/*.xlsx` 表定义和 `json/*.json` CRUD 配置,生成 `wwwroot/` 下的 `.ui` 前端文件。
|
||
|
||
**注意:** `.dspy` 文件是手写的,不会被 `xls2ui` 覆盖。
|
||
|
||
---
|
||
|
||
## 权限配置
|
||
|
||
在 `script/perms.json` 中定义 RBAC 权限:
|
||
|
||
```json
|
||
[
|
||
{
|
||
"path": "/uapi/upapp",
|
||
"perms": [
|
||
{"orgtype": "customer", "roles": ["operator"]},
|
||
{"orgtype": "owner", "roles": ["operator"]}
|
||
]
|
||
},
|
||
{
|
||
"path": "/uapi/jsonhttpapi",
|
||
"perms": [
|
||
{"orgtype": "customer", "roles": ["operator"]},
|
||
{"orgtype": "owner", "roles": ["operator"]}
|
||
]
|
||
},
|
||
{
|
||
"path": "/uapi/upappkey",
|
||
"perm": [
|
||
{"orgtype": "customer", "roles": ["operator"]},
|
||
{"orgtype": "owner", "roles": ["operator"]}
|
||
]
|
||
}
|
||
]
|
||
```
|
||
|
||
- `customer` 组织和 `owner` 组织的 `operator` 角色可访问上位系统和密钥管理页面
|
||
|
||
---
|
||
|
||
## 关键设计要点
|
||
|
||
1. **双调用路径**:UAPI(直接查库)和 UpAppApi(缓存版)提供相同接口,按需选择
|
||
2. **模板引擎渲染**:path/headers/data/params/response 全部支持模板语法,可在配置中动态组装
|
||
3. **流式响应支持**:通过 `StreamHttpClient` 和 `stream_linify` 支持 SSE 场景(如 LLM API)
|
||
4. **chunk_match 过滤**:自动去除流式响应中的前缀行(如 `data:`),只提取有效数据
|
||
5. **response 模板**:可定义响应数据转换模板,将上游 JSON 映射为前端需要的格式
|
||
6. **AES 加密密钥**:`secretkey` 在数据库中 AES 加密存储,运行时通过 `password_decode()` 解密
|
||
7. **Deerer 认证**:自定义认证头,时间戳 + apikey AES 加密,防重放攻击
|
||
8. **动态函数扩展**:`dynamic_func` 允许在 HTTP 请求前执行自定义逻辑(通过 RegisterFunction)
|
||
|
||
---
|
||
|
||
## 依赖关系
|
||
|
||
```
|
||
uapi
|
||
├── sqlor # 数据库 ORM
|
||
├── apppublic # 工具库(日志、AES、HTTP 客户端、模板引擎等)
|
||
├── ahserver # Web 服务器框架(ServerEnv、password_decode 等)
|
||
└── aiohttp # 异步 HTTP(requirements.txt)
|
||
```
|
||
|
||
---
|
||
|
||
## 开发注意事项
|
||
|
||
1. **dbname 获取**:必须通过 `get_serverenv('get_module_dbname')('uapi')` 动态获取,禁止硬编码
|
||
2. **sqlor 使用**:`sor.R(tablename, ns)` 的 ns 字典同时包含过滤条件和选项
|
||
3. **密钥解密**:从数据库读取的 apikey/secretkey 必须通过 `password_decode()` 解密
|
||
4. **模板渲染异常**:headers/body 模板渲染后需 json.loads 验证,渲染失败会抛出异常
|
||
5. **uptask 回调**:convert_func 返回 None 时会抛异常,必须返回包含 status 字段的字典
|
||
6. **UAPI 与 UpAppApi 选择**:低频调用用 UAPI(每次查库确保最新),高频调用用 UpAppApi(缓存)
|