uapi/README.md

407 lines
12 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# uapi 模块开发文档
## 模块概述
uapi (Universe API) 是 Hermes Agent 平台的**外部 API 网关与管理模块**,提供对第三方/上游系统 API 的统一配置、认证、调用和异步任务追踪能力。
核心职责:
- 管理上位系统upapp的注册与 API 密钥分配
- 将外部 HTTP API 抽象为数据库配置(无需改代码即可接入新 API
- 支持流式响应SSE与同步调用两种模式
- 提供 Deerer 和 Bearer 两种认证方案
- 支持异步长任务uptask的创建、状态追踪与回调反馈
---
## 目录结构
```
uapi/
├── pyproject.toml # 构建配置setuptools
├── setup.cfg # 包元数据name=uapi, version=0.0.1
├── requirements.txt # 运行时依赖aiohttp
├── README.md
├── uapi/ # Python 源码包
│ ├── __init__.py # 空
│ ├── init.py # 模块初始化,注册函数到 ServerEnv
│ ├── appapi.py # deerer/bearer 认证 + 辅助查询函数
│ ├── uapi.py # UpAppApi 类API 调用核心类)
│ ├── apidata.py # UAPIData 单例缓存
│ └── uptask.py # 异步长任务管理
├── json/ # CRUD 定义bricks-framework
│ ├── upapp.json # 上位系统管理
│ ├── upappkey.json # API 密钥管理
│ ├── uapi.json # API 定义
│ ├── uapiset.json # API 集合管理
│ ├── uapiio.json # API 输入输出定义
│ └── build.sh # 构建脚本xls2ui -m ../models -o ../wwwroot uapi *.json
├── models/ # 表定义xlsx 格式)
│ ├── upapp.xlsx
│ ├── upappkey.xlsx
│ ├── uapi.xlsx
│ ├── uapiset.xlsx
│ ├── uapiio.xlsx
│ └── uptask.xlsx
├── wwwroot/ # Web 前端资源(由 xls2ui 生成 + 手写 .dspy
│ ├── jump_in.dspy # 上位系统跳转逻辑
│ ├── uptask_callback.dspy # 任务回调处理
│ ├── minimax_callback.dspy # MiniMax 回调
│ └── viducallback/
│ └── index.dspy # Vidu 视频生成回调
├── script/
│ └── perms.json # RBAC 权限配置
└── test/
└── t.py # 调用示例
```
---
## 数据库表结构
### 表关系
```
uapiset (API集合) ──1:N──> uapi (API定义)
upapp (上位系统) ──N:1──> uapiset (每个upapp关联一个apiset)
upapp (上位系统) ──1:N──> upappkey (API密钥)
```
### 核心表说明
| 表名 | 说明 | 关键字段 |
|------|------|----------|
| **uapiset** | API集合定义 | id, name, auth_apiname可选的认证API名 |
| **uapi** | 单个API定义 | id, apisetid, name, httpmethod, path, headers, params, data, response, chunk_match |
| **upapp** | 上位系统注册 | id, name, apisetid, baseurl, myappid, ownerid, appownerid, secretkey, dynamic_func |
| **upappkey** | API密钥分配 | id, upappid, ownerid, orgid, apikey, secretkey |
| **uapiio** | API输入输出定义 | id, name, ... |
| **uptask** | 长任务追踪 | id, userid, executor_taskid, convert_func_name, status, start_timestamp, end_timestamp, response_data |
---
## Python API
### 模块初始化
`uapi/init.py` 中,通过 `load_uapi()` 将所有公开函数和类注册到 `ServerEnv`
```python
def load_uapi():
g = ServerEnv()
g.UpAppApi = UpAppApi
g.uapi_data = UAPIData()
g.get_deerer = get_deerer
g.deerer = deerer
g.get_callerid = get_callerid
g.sor_get_callerid = sor_get_callerid
g.sor_get_uapi_by_appname_apiname = sor_get_uapi_by_appname_apiname
g.bearer = bearer
g.check_uptask_status = check_uptask_status
g.get_my_uptasks = get_my_uptasks
g.uptask_feedback = uptask_feedback
g.uptask_started = uptask_started
```
其他模块的 `.dspy` 文件可通过 `globals()` 直接使用这些函数。
### UpAppApi 类uapi.py
**基于 UAPIData 缓存的 API 调用类。**
```python
# 在 .dspy 中使用
api = UpAppApi(request)
# 调用方式 1流式调用返回生成器
async for chunk in api(upappid, apiname, callerid, params={}):
# chunk 是 bytes
# 调用方式 2一次性获取全部响应
result = await api.call(upappid, apiname, callerid, params={})
# result 是 bytes
# 调用方式 3逐行流式处理自动过滤 chunk_match 前缀)
async for line in api.stream_linify(upappid, apiname, callerid, params={}):
# line 是 str已去除 chunk_match 前缀,经 response 模板渲染
```
**构造参数:**
- `request`: HTTP 请求对象(可选,用于获取运行时命名空间)
**工作流程:**
1. 通过 `UAPIData` 获取 API 定义(内存缓存,不直接查库)
2. 如果 API 定义了 `auth_apiname`,先执行认证 API`do_auth`),将结果注入 `self.env`
3. 通过 `get_userapikey()` 获取调用者的 API 密钥信息
4. 渲染 path/headers/data/params 模板,组装 HTTP 请求
5. 如果配置了 `dynamic_func_name`,执行动态函数
6. 通过 `StreamHttpClient` 发起 HTTP 请求并流式返回
```
### UAPIData 类apidata.py
**API 定义和密钥的内存缓存单例。**
```python
@SingletonDecorator
class UAPIData:
async def get_api(appid, apiname) # 获取 API 定义(带缓存)
async def get_userapikey(appid, callerid) # 获取用户 API 密钥
async def get_apiusers(appid, orgid=None) # 获取某 app 的所有授权用户
async def get_calluserid(appid, orgid=None) # 随机返回一个调用者 userid负载均衡
```
### 认证辅助函数
#### deerer — 自定义 Deerer 认证
```python
def deerer(myappid, apikey, secretkey):
t = time()
txt = f'{t}:{apikey}'
cyber = aes_encode_b64(secretkey, txt)
return f'Deerer {myappid}-:-{cyber}'
```
在 Header 模板中使用:`{{deerer(myappid, apikey, secretkey)}}`
```python
# 异步版本:自动查询密钥
d = await get_deerer(upappid, callerid) # 返回 "Deerer xxx-:-yyy" 去掉 "Deerer " 前缀的部分
```
#### bearer — 标准 Bearer Token 认证
```python
def bearer(apikey):
return f'Bearer {apikey}'
```
在 Header 模板中使用:`{{bearer(apikey)}}`
### 辅助查询函数
```python
# 通过 appname + apiname 查询 API不需要 upappid
api = await sor_get_uapi_by_appname_apiname(sor, appname, apiname)
# 获取某 org 的 callerid随机选取
callerid = await get_callerid(orgid)
callerid = await sor_get_callerid(sor, orgid)
```
### 异步长任务管理uptask.py
用于追踪远端异步任务的生命周期。
```python
# 创建任务记录
task_record_id = await uptask_started(taskid, userid, convert_func_name)
# convert_func_name: 注册函数名,用于将远端回调数据转换为标准格式
# 任务回调更新状态
await uptask_feedback(task_id, resp_data)
# resp_data 经 convert_func 处理后status 字段映射为 SUCCEEDED/FAILED
# 查询任务状态
result = await check_uptask_status(task_id)
# 返回 DictObject(status='SUCCEEDED'|'FAILED'|'started'|..., response_data=...)
# 查询某用户某业务日期的任务
tasks = await get_my_uptasks(userid, biz_date)
```
**convert_func 规范:**
```python
# 注册函数签名
async def my_convert(resp_data):
return {
'status': 'SUCCEEDED', # 或 'FAILED'
'identify_code': 'xxx',
# ... 其他需要保存的字段
}
```
---
## 在 .dspy 中的使用示例
### 示例 1调用外部 API
```python
# jump_in.dspy — 跳转到上位系统
userid = await get_user()
d = await get_deerer(params_kw.id, userid)
if d is None:
return UiError(title='跳转', message='当前用户没有上位系统apikey')
return {
"widgettype": "NewWindow",
"options": {
"name": "upappid",
"url": params_kw.baseurl + "/dapi/jumpin.dspy?deerer=" + quote(d)
}
}
```
### 示例 2处理异步任务回调
```python
# uptask_callback.dspy
if params_kw.task_id is None:
raise Exception('need a task_id')
try:
resp = await uptask_feedback(task_id, params_kw)
except Exception as e:
exception(f'{e}')
return json_response({'text': 'ok'})
```
### 示例 3独立脚本测试
```python
# test/t.py
import asyncio
from appPublic.jsonConfig import getConfig
from sqlor.dbpools import DBPools
from ahserver.serverenv import ServerEnv
from uapi.uapi import UpAppApi
def get_module_dbname(mn):
return 'sage'
async def main():
workdir = os.getcwd()
config = getConfig(workdir, {'workdir': workdir})
DBPools(config.databases)
env = ServerEnv()
env.get_module_dbname = get_module_dbname
api = UpAppApi()
params = {
'baseurl': 'https://qianfan.baidubce.com',
'model': 'deepseek-v3',
'prompt': '北京今天天气如何,适合跑步吗?',
}
upapiid = 'R47xUJay76dCCt1sLmWvE'
async for line in api.stream_linify(upapiid, '0', callerid, params=params):
print(line)
if __name__ == '__main__':
asyncio.new_event_loop().run_until_complete(main())
```
---
## JSON CRUD 定义
每个 `.json` 文件定义一个表的 bricks-framework CRUD 页面。
### upapp.json — 上位系统管理
-`ownerid`(登录用户所在组织)过滤
- `secretkey` 字段设为 `confidential_fields`(前端脱敏显示)
- 子表:`upappkey`APIKEY 列表)
- 工具栏:`jumpin` 按钮,跳转到 `/uapi/jump_in.dspy`
### upappkey.json — API 密钥管理
-`ownerid`/`orgid` 过滤
- `apikey``apipasswd` 为保密字段
- 编辑时 `upappid``ownerid``orgid` 不可修改
### uapi.json — API 定义
-`apisetid``name` 排序
- 工具栏:`api测试` 按钮,弹出测试窗口
### uapiset.json — API 集合管理
- 子表:`uapi`(定义该集合下的所有 API
### uapiio.json — API 输入输出定义
---
## 构建流程
```bash
# 在 json/ 目录下执行
cd ~/repos/uapi/json
bash build.sh
# 等价于:
xls2ui -m ../models -o ../wwwroot uapi *.json
```
该命令读取 `models/*.xlsx` 表定义和 `json/*.json` CRUD 配置,生成 `wwwroot/` 下的 `.ui` 前端文件。
**注意:** `.dspy` 文件是手写的,不会被 `xls2ui` 覆盖。
---
## 权限配置
`script/perms.json` 中定义 RBAC 权限:
```json
[
{
"path": "/uapi/upapp",
"perms": [
{"orgtype": "customer", "roles": ["operator"]},
{"orgtype": "owner", "roles": ["operator"]}
]
},
{
"path": "/uapi/jsonhttpapi",
"perms": [
{"orgtype": "customer", "roles": ["operator"]},
{"orgtype": "owner", "roles": ["operator"]}
]
},
{
"path": "/uapi/upappkey",
"perm": [
{"orgtype": "customer", "roles": ["operator"]},
{"orgtype": "owner", "roles": ["operator"]}
]
}
]
```
- `customer` 组织和 `owner` 组织的 `operator` 角色可访问上位系统和密钥管理页面
---
## 关键设计要点
1. **模板引擎渲染**path/headers/data/params/response 全部支持模板语法,可在配置中动态组装
2. **流式响应支持**:通过 `StreamHttpClient``stream_linify` 支持 SSE 场景(如 LLM API
3. **chunk_match 过滤**:自动去除流式响应中的前缀行(如 `data:`),只提取有效数据
4. **response 模板**:可定义响应数据转换模板,将上游 JSON 映射为前端需要的格式
5. **AES 加密密钥**`secretkey` 在数据库中 AES 加密存储,运行时通过 `password_decode()` 解密
6. **Deerer 认证**:自定义认证头,时间戳 + apikey AES 加密,防重放攻击
7. **动态函数扩展**`dynamic_func` 允许在 HTTP 请求前执行自定义逻辑(通过 RegisterFunction
---
## 依赖关系
```
uapi
├── sqlor # 数据库 ORM
├── apppublic # 工具库日志、AES、HTTP 客户端、模板引擎等)
├── ahserver # Web 服务器框架ServerEnv、password_decode 等)
└── aiohttp # 异步 HTTPrequirements.txt
```
---
## 开发注意事项
1. **dbname 获取**:必须通过 `get_serverenv('get_module_dbname')('uapi')` 动态获取,禁止硬编码
2. **sqlor 使用**`sor.R(tablename, ns)` 的 ns 字典同时包含过滤条件和选项
3. **密钥解密**:从数据库读取的 apikey/secretkey 必须通过 `password_decode()` 解密
4. **模板渲染异常**headers/body 模板渲染后需 json.loads 验证,渲染失败会抛出异常
5. **uptask 回调**convert_func 返回 None 时会抛异常,必须返回包含 status 字段的字典