# `StreamLlmProxy` 与 LLM 代理类技术文档 本项目提供了一套用于代理调用大语言模型(LLM)API 的异步 Python 类,支持流式和同步请求,并具备参数转换、身份认证、响应处理等功能。该模块适用于构建灵活的 LLM 网关服务。 --- ## 模块依赖 ```python import re import base64 import json from traceback import format_exc from aiohttp import web from appPublic.dictObject import DictObject from appPublic.log import debug, info, exception, error from appPublic.httpclient import HttpClient, RESPONSE_TEXT, RESPONSE_JSON, RESPONSE_BIN, RESPONSE_FILE, RESPONSE_STREAM from appPublic.registerfunction import RegisterFunction from appPublic.argsConvert import ArgsConvert ``` > **说明**: > - 使用 `aiohttp` 实现异步 Web 响应流。 > - `DictObject` 提供字典属性访问语法(如 `obj.key`)。 > - `HttpClient` 是封装的异步 HTTP 客户端。 > - `ArgsConvert` 支持模板字符串替换(类似 `${var}`)。 > - 日志使用 `appPublic.log` 统一输出。 --- ## 工具函数 ### `encode_imagefile(fn)` 将本地图片文件编码为 Base64 字符串,常用于向 LLM API 发送图像输入。 #### 参数: - `fn` (str): 图像文件路径。 #### 返回值: - (str): Base64 编码后的 UTF-8 字符串。 #### 示例: ```python img_b64 = encode_imagefile("example.jpg") ``` --- ## 核心类:`StreamLlmProxy` 一个通用的 LLM 接口代理类,支持流式响应处理。 ### 初始化:`__init__(self, processor, desc)` #### 参数: - `processor`: 包含运行环境上下文的对象(需有 `run_ns` 属性)。 - `desc` (dict-like): 描述目标 LLM API 的配置对象,必须包含 `.name` 字段。 #### 属性初始化: | 属性 | 类型 | 说明 | |------|------|------| | `name` | str | API 名称 | | `processor` | object | 上下文处理器 | | `auth_api` | dict/None | 认证接口定义 | | `desc` | object | 原始描述对象 | | `api_name` | str | 同 `name` | | `data` | DictObject | 存储用户级临时数据(如 token) | | `ac` | ArgsConvert | 模板参数解析器 | > ⚠️ 断言 `desc.name` 必须存在。 --- ### 方法列表 #### `line_chunk_match(l)` 对流式返回的每一行进行正则匹配提取有效 JSON 内容。 ##### 参数: - `l` (str): 输入文本行。 ##### 返回值: - 匹配到的组或原字符串。 ##### 配置来源: 通过 `self.api.chunk_match` 正则表达式控制,例如: ```json "chunk_match": "data: (.*)" ``` --- #### `write_chunk(ll)` 将单条消息写入 HTTP 流响应体中,支持过滤与字段映射。 ##### 参数: - `ll` (str): 待处理的消息字符串。 ##### 功能流程: 1. 跳过 `[DONE]` 标记; 2. 使用 `line_chunk_match` 提取内容; 3. 解析为 `DictObject`; 4. 根据 `api.resp` 映射输出字段; 5. 可选地根据 `chunk_filter` 条件清空某字段; 6. 序列化为 JSON 并写入流。 ##### 异常处理: 捕获所有异常并记录堆栈日志。 --- #### `stream_handle(chunk)` 处理来自 `HttpClient` 的原始字节流,按换行分割并逐行调用 `write_chunk`。 ##### 参数: - `chunk` (bytes): 原始响应片段。 ##### 特性: - 自动拼接跨块不完整行(使用 `self.remain_str` 缓存尾部未完成部分)。 --- #### `get_apikey(apiname)` 从运行环境中获取当前用户的 API Key。 ##### 参数: - `apiname` (str): 目标 API 名称。 ##### 行为: 调用 `processor.run_ns.get_llm_user_apikey(apiname, user)` 获取密钥。 > 若函数未注册,则抛出异常。 --- #### `get_apidata(parts, params={})` 根据配置动态生成请求参数(headers/data/params),支持模板变量替换与自定义转换器。 ##### 参数: - `parts`: 参数定义列表,每项结构如下: ```python { "name": "field_name", "value": "template string with ${user}", "convertor": "optional_function_name" } ``` - `params`: 外部传入参数,优先级高于 `self.data`。 ##### 返回值: - (dict): 构造好的参数字典。 ##### 转换机制: - 使用 `ArgsConvert('${', '}')` 替换 `${key}` 模板; - 若指定了 `convertor`,通过 `RegisterFunction().exe()` 执行转换逻辑。 --- #### `do_auth(request)` 执行前置认证流程(如获取 access_token),通常用于需要 OAuth 或 JWT 的场景。 ##### 流程: 1. 检查是否已认证(避免重复请求); 2. 调用 `get_apikey()` 获取基础凭证; 3. 构造认证请求(URL、method、headers、data); 4. 发起请求并解析响应; 5. 将关键字段保存至 `self.data`; 6. 设置 `authed=True` 并持久化存储。 ##### 数据持久化: 通过 `set_data(key, value)` 存储于应用全局状态。 --- #### `data_key(apiname)` 生成基于用户的身份数据键名。 ##### 规则: ``` {apiname}_a_{username} ``` 若用户为空,默认为 `'anonymous'`。 --- #### `set_data(apiname, data)` / `get_data(apiname)` 封装了应用级别的数据读写操作,基于 `aiohttp.web.Application` 的共享状态。 > 利用 `request.app` 实现跨请求的数据缓存(如 access_token 缓存)。 --- #### `__call__(request, params)` 主入口方法,处理客户端请求并转发至后端 LLM 接口。 ##### 参数: - `request`: aiohttp 请求对象; - `params`: 用户请求参数(DictObject); ##### 关键行为: - 支持流式 (`stream=True`) 和非流模式; - 准备 `StreamResponse`; - 解析 URL、method、headers、data、params; - 使用 `HttpClient` 发起异步请求; - 注册 `stream_func=self.stream_handle` 处理流数据; - 最终返回 `web.StreamResponse` 对象。 ##### 日志调试: 打印完整的请求信息(URL、参数等)便于排查问题。 --- #### `datalize(dic, data={})` 模板变量填充工具函数。 ##### 参数: - `dic`: 包含 `${...}` 模板的字典或字符串; - `data`: 补充变量源。 ##### 优先级: `data` > `self.data` ##### 返回值: 替换后的结果。 --- ## 派生类 ### `SyncLlmProxy(StreamLlmProxy)` 同步版本代理,直接返回完整 JSON 响应。 #### 差异点: - 不使用流式响应; - `response_type=RESPONSE_JSON`; - 返回普通字典而非 `StreamResponse`; - 结果通过 `convert_resp(resp)` 进行字段映射。 #### `convert_resp(resp)` 将原始响应按 `api.resp` 配置投影成标准格式。 ##### 示例配置: ```json "resp": [ {"name": "content", "value": "choices.0.message.content"}, {"name": "finish_reason", "value": "choices.0.finish_reason"} ] ``` --- ### `AsyncLlmProxy(StreamLlmProxy)` 异步但非流式代理,等待完整响应后再返回。 #### 特性: - 仍准备 `StreamResponse`; - 实际以 `RESPONSE_JSON` 获取完整数据; - 再次尝试写入残留字符串(`remain_str`); - 返回 `StreamResponse`(可能无实际流内容); > ❗ 注意:此设计可能存在语义歧义,建议明确区分“异步获取”与“流式传输”。 --- ## 配置结构示例(JSON Schema) ```json { "name": "openai_gpt4", "auth": { "url": "https://api.openai.com/v1/token", "method": "POST", "headers": [ {"name": "Authorization", "value": "Bearer ${api_key}"} ], "data": {}, "set_data": [ {"name": "access_token", "field": "token"} ] }, "chat": { "url": "https://api.openai.com/v1/chat/completions", "method": "POST", "need_auth": true, "headers": [ {"name": "Authorization", "value": "Bearer ${access_token}"}, {"name": "Content-Type", "value": "application/json"} ], "data": [ {"name": "model", "value": "${model}"}, {"name": "messages", "value": "${messages}"}, {"name": "stream", "value": "${stream}"} ], "resp": [ {"name": "text", "value": "choices.0.delta.content"}, {"name": "done", "value": "choices.0.finish_reason"} ], "chunk_match": "data: (.*)", "chunk_filter": { "name": "choices.0.delta.role", "value": "system", "op": "!=", "field": "text" } } } ``` --- ## 使用场景 | 场景 | 推荐类 | |------|--------| | 实时聊天界面(SSE) | `StreamLlmProxy` | | 获取完整回复(CLI 工具) | `SyncLlmProxy` | | 异步任务调度 | `AsyncLlmProxy` | --- ## 日志级别说明 | 级别 | 用途 | |------|------| | `debug()` | 请求详情、内部变量 | | `info()` | 正常流程提示 | | `exception()` | 异常 + 堆栈 | | `error()` | 严重错误 | --- ## 注意事项 1. **安全性**:敏感信息(如 API Key)应在 `get_llm_user_apikey` 中安全获取,不应硬编码。 2. **性能**:`stream_handle` 中避免阻塞操作,确保高吞吐。 3. **兼容性**:不同 LLM 提供商的流格式差异较大,需正确设置 `chunk_match`。 4. **缓存策略**:`set_data/get_data` 基于内存,重启丢失,可扩展至 Redis。 --- ## 总结 该组件实现了高度可配置化的 LLM API 代理层,支持: ✅ 多种调用模式(流式/同步) ✅ 动态参数注入(模板引擎) ✅ 自定义响应映射 ✅ 认证流程自动化 ✅ 插件式转换函数 适合集成进企业级 AI 网关系统,作为统一接入点。