def _escape(value): if value is None: return None return str(value).replace("'", "''") def _parse_bool(value, default=True): if value is None or value == '': return default if isinstance(value, bool): return value return str(value).lower() in ('1', 'true', 'yes', 'on') def _parse_messages(ns): """解析历史消息:支持 list 或 JSON 字符串""" raw = ns.get('messages') if not raw: return [] if isinstance(raw, list): return raw if isinstance(raw, str): import json try: return json.loads(raw) except Exception: return [] return [] def build_user_content(ns): """ 构建单条 user 消息的 content,支持文本 / 图片 / 文档链接。 参数(可组合): message / text 文本 image_url 图片 URL image_base64 图片 base64(不含 data: 前缀) document_url 文档 URL(以 file 类型传给兼容接口) document_text 文档纯文本(拼入 text) """ text_parts = [] if ns.get('message'): text_parts.append(str(ns.get('message'))) if ns.get('text'): text_parts.append(str(ns.get('text'))) if ns.get('document_text'): text_parts.append(str(ns.get('document_text'))) parts = [] merged_text = '\n'.join([p for p in text_parts if p]).strip() if merged_text: parts.append({'type': 'text', 'text': merged_text}) if ns.get('image_url'): parts.append({ 'type': 'image_url', 'image_url': {'url': ns.get('image_url')}, }) if ns.get('image_base64'): mime = ns.get('image_mime') or 'image/jpeg' b64 = ns.get('image_base64') if not str(b64).startswith('data:'): b64 = 'data:%s;base64,%s' % (mime, b64) parts.append({ 'type': 'image_url', 'image_url': {'url': b64}, }) if ns.get('document_url'): parts.append({ 'type': 'file', 'file': {'file_url': ns.get('document_url')}, }) if not parts: return '' if len(parts) == 1 and parts[0]['type'] == 'text': return parts[0]['text'] return parts async def _resolve_chat_config(ns, sor): """解析 API 地址与 Bearer Token""" api_url = ns.get('api_url') api_key = ns.get('api_key') if not api_url and ns.get('model_id'): doc_rows = await sor.sqlExe( "SELECT api_url FROM model_api_doc WHERE model_id = '%s' LIMIT 1;" % _escape(ns.get('model_id')), {}, ) if doc_rows and doc_rows[0].get('api_url'): api_url = doc_rows[0]['api_url'] if not str(api_url).endswith('/chat/completions'): api_url = str(api_url).rstrip('/') + '/chat/completions' if not api_url: param_rows = await sor.R('params', {'pname': 'cntoai_llm_chat_url'}) if param_rows: api_url = param_rows[0]['pvalue'] else: domain_rows = await sor.R('params', {'pname': 'cntoai_domain'}) if domain_rows: api_url = domain_rows[0]['pvalue'].rstrip('/') + '/llmage/v1/chat/completions' else: api_url = 'https://ai.atvoe.com/llmage/v1/chat/completions' if not api_key: userid = ns.get('userid') or await get_user() if userid: action = ns.get('apikey_action') or 'user_self_create' keys = await sor.R('user_api_keys', {'userid': userid, 'action': action}) if not keys: keys = await sor.R('user_api_keys', {'userid': userid, 'action': 'sync'}) if keys: api_key = keys[0].get('opc_apikey') if not api_key: key_rows = await sor.R('params', {'pname': 'cntoai_llm_api_key'}) if key_rows: api_key = key_rows[0]['pvalue'] return api_url, api_key async def _read_stream_response(response): """解析 SSE 流式响应,汇总 assistant 文本""" import json chunks = [] buffer = '' async for raw in response.content: buffer += raw.decode('utf-8', errors='ignore') while '\n' in buffer: line, buffer = buffer.split('\n', 1) line = line.strip() if not line.startswith('data:'): continue data = line[5:].strip() if data == '[DONE]': return ''.join(chunks) try: payload = json.loads(data) choice = (payload.get('choices') or [{}])[0] delta = choice.get('delta') or {} piece = delta.get('content') or '' if piece: chunks.append(piece) except Exception: continue return ''.join(chunks) async def llm_chat_completions(ns={}): """ OpenAI 兼容 chat/completions(aiohttp)。 参数: model (str) 模型名,必填 message / text 当前用户文本 messages 历史消息 JSON 数组或 list,多轮对话 stream (bool) 是否流式,默认 True image_url / image_base64 图片 document_url / document_text 文档 api_url / api_key 可覆盖默认配置 model_id 从 model_api_doc 读取 api_url userid 用于查 user_api_keys """ import aiohttp import json import traceback model = ns.get('model') if not model: return {'status': False, 'msg': 'model is required'} stream = _parse_bool(ns.get('stream'), True) history = _parse_messages(ns) user_content = build_user_content(ns) if not user_content and not history: return {'status': False, 'msg': 'message is required'} messages = list(history) if user_content: messages.append({'role': 'user', 'content': user_content}) payload = { 'model': model, 'stream': stream, 'messages': messages, } db = DBPools() async with db.sqlorContext('kboss') as sor: try: api_url, api_key = await _resolve_chat_config(ns, sor) if not api_key: return {'status': False, 'msg': '未找到 API Key,请先创建或配置 cntoai_llm_api_key'} headers = { 'Content-Type': 'application/json', 'Authorization': 'Bearer %s' % api_key, } async with aiohttp.ClientSession() as session: async with session.post(api_url, headers=headers, json=payload) as response: if response.status != 200: err_text = await response.text() return { 'status': False, 'msg': '模型请求失败 HTTP %s: %s' % (response.status, err_text[:500]), } if stream: reply = await _read_stream_response(response) usage = {} else: body = await response.json() choice = (body.get('choices') or [{}])[0] msg = choice.get('message') or {} reply = msg.get('content') or '' usage = body.get('usage') or {} return { 'status': True, 'msg': 'chat success', 'data': { 'model': model, 'reply': reply, 'messages': messages + [{'role': 'assistant', 'content': reply}], 'usage': usage, 'stream': stream, }, } except Exception: return {'status': False, 'msg': 'chat failed, %s' % traceback.format_exc()} ret = await llm_chat_completions(params_kw) return ret