From 5ba835082baa56610162d8abb50a6cebb0a057b9 Mon Sep 17 00:00:00 2001 From: ping <1017253325@qq.com> Date: Fri, 22 May 2026 11:07:29 +0800 Subject: [PATCH 1/5] update --- b/cntoai/model_management_search.dspy | 23 +++++++++++++++++++---- b/product/get_firstpage_product_tree.dspy | 9 ++++++++- 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/b/cntoai/model_management_search.dspy b/b/cntoai/model_management_search.dspy index 6893ee2..2547234 100644 --- a/b/cntoai/model_management_search.dspy +++ b/b/cntoai/model_management_search.dspy @@ -26,7 +26,7 @@ def _build_model_dict(ns, include_listing_status=False): async def model_management_search(ns={}): """ 分页查询模型列表,支持按 model_name / model_type / provider 筛选。 - 返回模型总数、待上架总数、已上架总数。 + 返回模型总数、待上架总数、已上架总数,以及厂商列表、模型类型列表。 """ import traceback @@ -35,10 +35,10 @@ async def model_management_search(ns={}): offset = (current_page - 1) * page_size conditions = ['1=1'] - if ns.get('model_name'): - model_name = ns.get('model_name') + if ns.get('display_name'): + display_name = ns.get('display_name') # 模糊查询 - conditions.append(f"model_name LIKE '%%%%{model_name}%%%%'") + conditions.append(f"display_name LIKE '%%%%{display_name}%%%%'") if ns.get('model_type'): conditions.append("model_type = '%s'" % _escape(ns.get('model_type'))) if ns.get('provider'): @@ -55,10 +55,23 @@ async def model_management_search(ns={}): stats_li = await sor.sqlExe(stats_sql, {}) stats = stats_li[0] if stats_li else {} + provider_sql = """ + SELECT DISTINCT provider FROM model_management + WHERE provider IS NOT NULL AND provider != '' + ORDER BY provider; + """ + model_type_sql = """ + SELECT DISTINCT model_type FROM model_management + WHERE model_type IS NOT NULL AND model_type != '' + ORDER BY model_type; + """ + count_sql = """SELECT COUNT(*) AS total_count FROM model_management WHERE %s;""" % where_clause filter_total = (await sor.sqlExe(count_sql, {}))[0]['total_count'] find_sql = """SELECT * FROM model_management WHERE %s ORDER BY sort_order ASC LIMIT %s OFFSET %s;""" % (where_clause, page_size, offset) + provider_rows = await sor.sqlExe(provider_sql, {}) + model_type_rows = await sor.sqlExe(model_type_sql, {}) model_list = await sor.sqlExe(find_sql, {}) return { @@ -68,6 +81,8 @@ async def model_management_search(ns={}): 'total_count': stats.get('total_count', 0), 'pending_count': int(stats.get('pending_count') or 0), 'listed_count': int(stats.get('listed_count') or 0), + 'provider_list': [r['provider'] for r in provider_rows], + 'model_type_list': [r['model_type'] for r in model_type_rows], 'filter_total': filter_total, 'page_size': page_size, 'current_page': current_page, diff --git a/b/product/get_firstpage_product_tree.dspy b/b/product/get_firstpage_product_tree.dspy index 2e899fc..b7fab08 100644 --- a/b/product/get_firstpage_product_tree.dspy +++ b/b/product/get_firstpage_product_tree.dspy @@ -539,6 +539,12 @@ jiajie_ali_products = [ ] async def get_firstpage_product_tree(ns={}): + token_market = await path_call('../cntoai/model_management_customer_search.dspy', ns) + # if token_market.get('status'): + # token_market = token_market.get('data') + # else: + # token_market = None + data = { "product_service": [ { @@ -590,7 +596,8 @@ async def get_firstpage_product_tree(ns={}): # }, # ], }, - ] + ], + 'token_market': token_market }, { 'id': "2", 'firTitle': "元境", 'secMenu': [ From 2e34cc4d8226ed862caa10809d89e4fb454bf1c3 Mon Sep 17 00:00:00 2001 From: ping <1017253325@qq.com> Date: Fri, 22 May 2026 19:18:37 +0800 Subject: [PATCH 2/5] update --- b/cntoai/chat_send.dspy | 160 +++++++++++++++++++ b/cntoai/chat_session_delete.dspy | 39 +++++ b/cntoai/chat_session_list.dspy | 50 ++++++ b/cntoai/chat_session_messages.dspy | 66 ++++++++ b/cntoai/chat_tables.sql | 23 +++ b/cntoai/create_model_apikey.dspy | 4 +- b/cntoai/get_model_api_doc.dspy | 53 ++++++ b/cntoai/get_model_apikey.dspy | 157 +++++++++--------- b/cntoai/get_user_balance.dspy | 29 ++-- b/cntoai/llm_chat_completions.dspy | 239 ++++++++++++++++++++++++++++ b/cntoai/process_user_billing.dspy | 40 +++-- b/cntoai/sync_cn_ai_user.dspy | 21 ++- b/customer/forgotPassword.dspy | 70 ++++++++ b/customer/registerUser.dspy | 41 ++++- b/user/mobilecode.dspy | 12 ++ 15 files changed, 894 insertions(+), 110 deletions(-) create mode 100644 b/cntoai/chat_send.dspy create mode 100644 b/cntoai/chat_session_delete.dspy create mode 100644 b/cntoai/chat_session_list.dspy create mode 100644 b/cntoai/chat_session_messages.dspy create mode 100644 b/cntoai/chat_tables.sql create mode 100644 b/cntoai/get_model_api_doc.dspy create mode 100644 b/cntoai/llm_chat_completions.dspy create mode 100644 b/customer/forgotPassword.dspy diff --git a/b/cntoai/chat_send.dspy b/b/cntoai/chat_send.dspy new file mode 100644 index 0000000..0611dfb --- /dev/null +++ b/b/cntoai/chat_send.dspy @@ -0,0 +1,160 @@ +def _escape(value): + if value is None: + return None + return str(value).replace("'", "''") + + +def _title_from_message(ns): + text = ns.get('message') or ns.get('text') or '' + text = str(text).strip().replace('\n', ' ') + if not text: + return '新对话' + return text[:30] + ('...' if len(text) > 30 else '') + + +def _build_user_content(ns): + text_parts = [] + if ns.get('message'): + text_parts.append(str(ns.get('message'))) + if ns.get('text'): + text_parts.append(str(ns.get('text'))) + if ns.get('document_text'): + text_parts.append(str(ns.get('document_text'))) + + parts = [] + merged_text = '\n'.join([p for p in text_parts if p]).strip() + if merged_text: + parts.append({'type': 'text', 'text': merged_text}) + if ns.get('image_url'): + parts.append({'type': 'image_url', 'image_url': {'url': ns.get('image_url')}}) + if ns.get('image_base64'): + mime = ns.get('image_mime') or 'image/jpeg' + b64 = ns.get('image_base64') + if not str(b64).startswith('data:'): + b64 = 'data:%s;base64,%s' % (mime, b64) + parts.append({'type': 'image_url', 'image_url': {'url': b64}}) + if ns.get('document_url'): + parts.append({'type': 'file', 'file': {'file_url': ns.get('document_url')}}) + if not parts: + return '' + if len(parts) == 1 and parts[0]['type'] == 'text': + return parts[0]['text'] + return parts + + +async def _load_session_messages(sor, session_id): + sql = """ + SELECT role, content, content_type + FROM chat_message + WHERE session_id = '%s' + ORDER BY created_at ASC; + """ % _escape(session_id) + rows = await sor.sqlExe(sql, {}) + messages = [] + for row in rows: + content = row.get('content') or '' + if row.get('content_type') == 'mixed': + import json + try: + content = json.loads(content) + except Exception: + pass + messages.append({'role': row['role'], 'content': content}) + return messages + + +async def chat_send(ns={}): + """ + 发送消息并保存多轮对话(需先执行 chat_tables.sql)。 + + 参数:model, message, stream(默认true), session_id, + image_url, image_base64, document_url, document_text + """ + import json + import traceback + + model = ns.get('model') + if not model: + return {'status': False, 'msg': 'model is required'} + + userid = ns.get('userid') or await get_user() + if not userid: + return {'status': False, 'msg': '未找到用户'} + + user_content = _build_user_content(ns) + if not user_content: + return {'status': False, 'msg': '请输入文本,或提供图片/文档参数'} + + content_type = 'mixed' if isinstance(user_content, list) else 'text' + store_content = json.dumps(user_content, ensure_ascii=False) if content_type == 'mixed' else str(user_content) + + db = DBPools() + async with db.sqlorContext('kboss') as sor: + try: + session_id = ns.get('session_id') + if not session_id: + session_id = uuid() + await sor.C('chat_session', { + 'id': session_id, + 'userid': userid, + 'model': model, + 'title': _title_from_message(ns), + }) + else: + sessions = await sor.R('chat_session', {'id': session_id, 'userid': userid}) + if not sessions: + return {'status': False, 'msg': '会话不存在'} + + await sor.C('chat_message', { + 'id': uuid(), + 'session_id': session_id, + 'role': 'user', + 'content': store_content, + 'content_type': content_type, + }) + + history = await _load_session_messages(sor, session_id) + stream_val = ns.get('stream', True) + if isinstance(stream_val, str): + stream_val = stream_val.lower() in ('1', 'true', 'yes', 'on') + chat_result = await path_call('llm_chat_completions.dspy', { + 'model': model, + 'messages': history, + 'stream': stream_val, + 'userid': userid, + 'api_url': ns.get('api_url'), + 'api_key': ns.get('api_key'), + 'model_id': ns.get('model_id'), + }) + if not chat_result.get('status'): + return chat_result + + reply = chat_result['data']['reply'] + await sor.C('chat_message', { + 'id': uuid(), + 'session_id': session_id, + 'role': 'assistant', + 'content': reply, + 'content_type': 'text', + }) + await sor.sqlExe( + "UPDATE chat_session SET updated_at = NOW() WHERE id = '%s';" + % _escape(session_id), + {}, + ) + + return { + 'status': True, + 'msg': 'send success', + 'data': { + 'session_id': session_id, + 'reply': reply, + 'model': model, + }, + } + except Exception: + return {'status': False, 'msg': 'send failed, %s' % traceback.format_exc()} + + +ret = await chat_send(params_kw) +return ret diff --git a/b/cntoai/chat_session_delete.dspy b/b/cntoai/chat_session_delete.dspy new file mode 100644 index 0000000..bb12134 --- /dev/null +++ b/b/cntoai/chat_session_delete.dspy @@ -0,0 +1,39 @@ +def _escape(value): + if value is None: + return None + return str(value).replace("'", "''") + + +async def chat_session_delete(ns={}): + """删除会话及其全部消息""" + session_id = ns.get('session_id') + if not session_id: + return {'status': False, 'msg': 'session_id is required'} + + userid = ns.get('userid') or await get_user() + if not userid: + return {'status': False, 'msg': '未找到用户'} + + db = DBPools() + async with db.sqlorContext('kboss') as sor: + try: + sessions = await sor.R('chat_session', {'id': session_id, 'userid': userid}) + if not sessions: + return {'status': False, 'msg': '会话不存在'} + + await sor.sqlExe( + "DELETE FROM chat_message WHERE session_id = '%s';" % _escape(session_id), + {}, + ) + await sor.sqlExe( + "DELETE FROM chat_session WHERE id = '%s' AND userid = '%s';" + % (_escape(session_id), _escape(userid)), + {}, + ) + return {'status': True, 'msg': 'delete success'} + except Exception as e: + return {'status': False, 'msg': 'delete failed, %s' % str(e)} + + +ret = await chat_session_delete(params_kw) +return ret diff --git a/b/cntoai/chat_session_list.dspy b/b/cntoai/chat_session_list.dspy new file mode 100644 index 0000000..c814ef6 --- /dev/null +++ b/b/cntoai/chat_session_list.dspy @@ -0,0 +1,50 @@ +def _escape(value): + if value is None: + return None + return str(value).replace("'", "''") + + +async def chat_session_list(ns={}): + """当前用户的对话会话列表(左侧栏历史)""" + userid = ns.get('userid') or await get_user() + if not userid: + return {'status': False, 'msg': '未找到用户'} + + page_size = int(ns.get('page_size', 50)) + current_page = int(ns.get('current_page', 1)) + offset = (current_page - 1) * page_size + + db = DBPools() + async with db.sqlorContext('kboss') as sor: + try: + count_sql = """ + SELECT COUNT(*) AS total_count FROM chat_session + WHERE userid = '%s'; + """ % _escape(userid) + total = (await sor.sqlExe(count_sql, {}))[0]['total_count'] + + find_sql = """ + SELECT id, model, title, created_at, updated_at + FROM chat_session + WHERE userid = '%s' + ORDER BY updated_at DESC + LIMIT %s OFFSET %s; + """ % (_escape(userid), page_size, offset) + sessions = await sor.sqlExe(find_sql, {}) + + return { + 'status': True, + 'msg': 'list success', + 'data': { + 'total_count': total, + 'page_size': page_size, + 'current_page': current_page, + 'sessions': sessions, + }, + } + except Exception as e: + return {'status': False, 'msg': 'list failed, %s' % str(e)} + + +ret = await chat_session_list(params_kw) +return ret diff --git a/b/cntoai/chat_session_messages.dspy b/b/cntoai/chat_session_messages.dspy new file mode 100644 index 0000000..d702f8d --- /dev/null +++ b/b/cntoai/chat_session_messages.dspy @@ -0,0 +1,66 @@ +def _escape(value): + if value is None: + return None + return str(value).replace("'", "''") + + +async def chat_session_messages(ns={}): + """获取某次会话的全部消息""" + session_id = ns.get('session_id') + if not session_id: + return {'status': False, 'msg': 'session_id is required'} + + userid = ns.get('userid') or await get_user() + if not userid: + return {'status': False, 'msg': '未找到用户'} + + db = DBPools() + async with db.sqlorContext('kboss') as sor: + try: + sessions = await sor.R('chat_session', {'id': session_id, 'userid': userid}) + if not sessions: + return {'status': False, 'msg': '会话不存在'} + + sql = """ + SELECT id, role, content, content_type, created_at + FROM chat_message + WHERE session_id = '%s' + ORDER BY created_at ASC; + """ % _escape(session_id) + rows = await sor.sqlExe(sql, {}) + + messages = [] + for row in rows: + content = row.get('content') or '' + if row.get('content_type') == 'mixed': + import json + try: + content = json.loads(content) + except Exception: + pass + if isinstance(content, list): + text_parts = [p.get('text', '') for p in content if p.get('type') == 'text'] + display = '\n'.join([t for t in text_parts if t]) or '[多媒体消息]' + else: + display = content + messages.append({ + 'id': row['id'], + 'role': row['role'], + 'content': display, + 'created_at': row.get('created_at'), + }) + + return { + 'status': True, + 'msg': 'get messages success', + 'data': { + 'session': sessions[0], + 'messages': messages, + }, + } + except Exception as e: + return {'status': False, 'msg': 'get messages failed, %s' % str(e)} + + +ret = await chat_session_messages(params_kw) +return ret diff --git a/b/cntoai/chat_tables.sql b/b/cntoai/chat_tables.sql new file mode 100644 index 0000000..f84777e --- /dev/null +++ b/b/cntoai/chat_tables.sql @@ -0,0 +1,23 @@ +-- 多轮对话:请先执行本脚本创建表后再使用 chat_send / chat_session_* 接口 + +CREATE TABLE IF NOT EXISTS `chat_session` ( + `id` varchar(64) NOT NULL COMMENT '会话ID', + `userid` varchar(64) NOT NULL COMMENT '用户ID', + `model` varchar(128) NOT NULL COMMENT '模型名称', + `title` varchar(255) DEFAULT NULL COMMENT '会话标题(首条问题摘要)', + `created_at` datetime DEFAULT current_timestamp() COMMENT '创建时间', + `updated_at` datetime DEFAULT current_timestamp() ON UPDATE current_timestamp() COMMENT '更新时间', + PRIMARY KEY (`id`), + KEY `idx_userid_updated` (`userid`, `updated_at`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='模型对话会话'; + +CREATE TABLE IF NOT EXISTS `chat_message` ( + `id` varchar(64) NOT NULL COMMENT '消息ID', + `session_id` varchar(64) NOT NULL COMMENT '会话ID', + `role` varchar(32) NOT NULL COMMENT '角色: user / assistant / system', + `content` mediumtext COMMENT '消息内容(纯文本或JSON)', + `content_type` varchar(32) DEFAULT 'text' COMMENT 'text / mixed', + `created_at` datetime DEFAULT current_timestamp() COMMENT '创建时间', + PRIMARY KEY (`id`), + KEY `idx_session_id` (`session_id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='模型对话消息'; diff --git a/b/cntoai/create_model_apikey.dspy b/b/cntoai/create_model_apikey.dspy index 5068403..9969df8 100644 --- a/b/cntoai/create_model_apikey.dspy +++ b/b/cntoai/create_model_apikey.dspy @@ -60,6 +60,7 @@ async def create_model_apikey(ns={}): async with session.post(url, headers=headers, data=json.dumps(payload)) as response: # 打印响应状态码 debug(f"create_model_apikey状态码: {response.status}") + debug(f"create_model_apikey响应: {await response.text()}") result_sysnc = await response.json() if not result_sysnc.get('status') == 'ok': @@ -76,12 +77,13 @@ async def create_model_apikey(ns={}): remote_table_id = result_sysnc['data'].get('id') name = result_sysnc['data'].get('name') secretkey = result_sysnc['data'].get('secretkey') + apikey = result_sysnc['data'].get('apikey') await sor.C('user_api_keys', { 'userid': ns['userid'], 'remote_table_id': remote_table_id, 'name': name, - 'opc_apikey': 1, + 'opc_apikey': apikey, 'secretkey': secretkey, 'action': 'user_self_create', }) diff --git a/b/cntoai/get_model_api_doc.dspy b/b/cntoai/get_model_api_doc.dspy new file mode 100644 index 0000000..b0a1ebd --- /dev/null +++ b/b/cntoai/get_model_api_doc.dspy @@ -0,0 +1,53 @@ +def _escape(value): + if value is None: + return None + return str(value).replace("'", "''") + + +async def get_model_api_doc(ns={}): + """ + 根据 model_id 查询模型 API 文档。 + + 参数: + model_id (str) 模型ID,必填 + + 返回 data 字段: + id, model_id, curl_code, python_code, created_at, updated_at + """ + model_id = ns.get('id') + if not model_id: + return {'status': False, 'msg': 'model id is required'} + + db = DBPools() + async with db.sqlorContext('kboss') as sor: + try: + # 通过model_id从model_management表中查询model_name + model_name_sql = """ + SELECT model_name FROM model_management WHERE id = '%s' LIMIT 1; + """ % _escape(model_id) + model_name = await sor.sqlExe(model_name_sql, {}) + if not model_name: + return {'status': False, 'msg': 'model not found'} + model_name = model_name[0]['model_name'] + + find_sql = """ + SELECT id, api_url, model_id, curl_code, python_code, created_at, updated_at + FROM model_api_doc + WHERE model_id = '%s' + LIMIT 1; + """ % _escape(model_id) + result = await sor.sqlExe(find_sql, {}) + if not result: + return {'status': False, 'msg': 'api doc not found'} + result[0]['model_name'] = model_name + return { + 'status': True, + 'msg': 'get model api doc success', + 'data': result[0], + } + except Exception as e: + return {'status': False, 'msg': 'get model api doc failed, %s' % str(e)} + + +ret = await get_model_api_doc(params_kw) +return ret diff --git a/b/cntoai/get_model_apikey.dspy b/b/cntoai/get_model_apikey.dspy index 30ba719..9dda40e 100644 --- a/b/cntoai/get_model_apikey.dspy +++ b/b/cntoai/get_model_apikey.dspy @@ -10,90 +10,99 @@ async def get_model_apikey(ns={}): 'msg': '未找到用户' } + action = ns.get('action') + if not action: + action = 'user_self_create' + # 通过userid从user_api_keys表中查询opc_apikey db = DBPools() async with db.sqlorContext('kboss') as sor: - records = await sor.R('user_api_keys', {'userid': ns['userid'], 'action': 'sync'}) + records = await sor.R('user_api_keys', {'userid': ns['userid'], 'action': action}) if not records: return { 'status': False, - 'msg': '未找到用户opc_apikey' + 'msg': 'apikey不存在' } - - already_sync_user_key = records[0]['opc_apikey'] - already_sync_user_appid = records[0]['appid'] - - # domain 从数据库params表中获取到pname=cntoai_domain的pvalue值 - db = DBPools() - async with db.sqlorContext('kboss') as sor: - domain = await sor.R('params', {'pname': 'cntoai_domain'}) - if domain: - domain = domain[0]['pvalue'] - else: - debug(f"get_model_apikey未找到域名") - return { - 'status': False, - 'msg': '未找到域名' - } - - # 目标URL - url = f"{domain}/dapi/downapps.dspy" - # 请求头 - headers = { - "Content-Type": "application/json", - "Authorization": "Bearer %s" % already_sync_user_key + return { + 'status': True, + 'msg': '获取模型apikey成功', + 'data': records } - - try: - # 创建一个异步会话 - result_sysnc = None - async with aiohttp.ClientSession() as session: - # 发送GET请求 - async with session.get(url, headers=headers) as response: - # 打印响应状态码 - debug(f"get_model_apikey状态码: {response.status}") - result_sysnc = await response.json() - - if not result_sysnc.get('status') == 'ok': - debug(f"get_model_apikey获取模型apikey失败: {result_sysnc}") - return { - 'status': False, - 'msg': f"获取模型apikey失败: {result_sysnc}" - } - - db = DBPools() - async with db.sqlorContext('kboss') as sor: - # user_api_keys表格 userid/opc_apikey - # 首先判断apikey是否存在 - apikeys = result_sysnc['data']['apikeys'] - # 遍历apikeys,如果apikey不存在,则创建, 如果存在则做更新 根据userid和remote_table_id判断 - for apikey_item in apikeys: - remote_table_id = apikey_item.get('id') - name = '' if not apikey_item.get('name') else apikey_item.get('name') - apikeyid = apikey_item.get('apikeyid') - exist_record = await sor.R('user_api_keys', {'userid': ns['userid'], 'remote_table_id': remote_table_id}) - if exist_record: - update_sql = f"UPDATE user_api_keys SET name = '{name}', opc_apikey = '{apikeyid}' WHERE userid = '{ns['userid']}' AND remote_table_id = '{remote_table_id}'" - await sor.sqlExe(update_sql, {}) - else: - await sor.C('user_api_keys', { - 'userid': ns['userid'], - 'remote_table_id': remote_table_id, - 'name': name, - 'opc_apikey': apikeyid, - 'action': 'user_self_create', - }) - - result_sysnc['status'] = True - return result_sysnc + # already_sync_user_key = records[0]['opc_apikey'] + # already_sync_user_appid = records[0]['appid'] - except Exception as e: - debug(f"get_model_apikey获取模型apikey失败: {e}") - return { - 'status': False, - 'msg': f"get_model_apikey获取模型apikey失败: {e}" - } + # # domain 从数据库params表中获取到pname=cntoai_domain的pvalue值 + # db = DBPools() + # async with db.sqlorContext('kboss') as sor: + # domain = await sor.R('params', {'pname': 'cntoai_domain'}) + # if domain: + # domain = domain[0]['pvalue'] + # else: + # debug(f"get_model_apikey未找到域名") + # return { + # 'status': False, + # 'msg': '未找到域名' + # } + + # # 目标URL + # url = f"{domain}/dapi/downapps.dspy" + + # # 请求头 + # headers = { + # "Content-Type": "application/json", + # "Authorization": "Bearer %s" % already_sync_user_key + # } + + # try: + # # 创建一个异步会话 + # result_sysnc = None + # async with aiohttp.ClientSession() as session: + # # 发送GET请求 + # async with session.get(url, headers=headers) as response: + # # 打印响应状态码 + # debug(f"get_model_apikey状态码: {response.status}") + # result_sysnc = await response.json() + + # if not result_sysnc.get('status') == 'ok': + # debug(f"get_model_apikey获取模型apikey失败: {result_sysnc}") + # return { + # 'status': False, + # 'msg': f"获取模型apikey失败: {result_sysnc}" + # } + + # db = DBPools() + # async with db.sqlorContext('kboss') as sor: + # # user_api_keys表格 userid/opc_apikey + # # 首先判断apikey是否存在 + # apikeys = result_sysnc['data']['apikeys'] + # # 遍历apikeys,如果apikey不存在,则创建, 如果存在则做更新 根据userid和remote_table_id判断 + # for apikey_item in apikeys: + # remote_table_id = apikey_item.get('id') + # name = '' if not apikey_item.get('name') else apikey_item.get('name') + # apikeyid = apikey_item.get('apikeyid') + # exist_record = await sor.R('user_api_keys', {'userid': ns['userid'], 'remote_table_id': remote_table_id}) + # if exist_record: + # update_sql = f"UPDATE user_api_keys SET name = '{name}', opc_apikey = '{apikeyid}' WHERE userid = '{ns['userid']}' AND remote_table_id = '{remote_table_id}'" + # await sor.sqlExe(update_sql, {}) + # else: + # await sor.C('user_api_keys', { + # 'userid': ns['userid'], + # 'remote_table_id': remote_table_id, + # 'name': name, + # 'opc_apikey': apikeyid, + # 'action': 'user_self_create', + # }) + + # result_sysnc['status'] = True + # return result_sysnc + + # except Exception as e: + # debug(f"get_model_apikey获取模型apikey失败: {e}") + # return { + # 'status': False, + # 'msg': f"get_model_apikey获取模型apikey失败: {e}" + # } ret = await get_model_apikey(params_kw) diff --git a/b/cntoai/get_user_balance.dspy b/b/cntoai/get_user_balance.dspy index 16ed070..53c6ce8 100644 --- a/b/cntoai/get_user_balance.dspy +++ b/b/cntoai/get_user_balance.dspy @@ -6,22 +6,27 @@ async def get_user_balance(ns={}): :return: 账户余额(与 getCustomerBalance 返回值一致) """ debug(ns) - apikey = ns.get('apikey') + # apikey = ns.get('apikey') userid = ns.get('userid') db = DBPools() async with db.sqlorContext('kboss') as sor: - if not apikey: - return { - 'status': 'error', - 'msg': 'apikey is required' - } - userid_li = await sor.R('user_api_keys', {'opc_apikey': apikey}) - if not userid_li: - return { - 'status': 'error', - 'msg': 'apikey无效,请联系管理员' - } + # if not apikey: + # return { + # 'status': 'error', + # 'msg': 'apikey is required' + # } + # userid_li = await sor.R('user_api_keys', {'opc_apikey': apikey}) + # if not userid_li: + # return { + # 'status': 'error', + # 'msg': 'apikey无效,请联系管理员' + # } # userid = userid_li[0]['userid'] + if not userid: + return { + 'status': 'error', + 'msg': 'userid is required' + } user = await sor.R('users', {'id': userid}) if not user: return { diff --git a/b/cntoai/llm_chat_completions.dspy b/b/cntoai/llm_chat_completions.dspy new file mode 100644 index 0000000..d910986 --- /dev/null +++ b/b/cntoai/llm_chat_completions.dspy @@ -0,0 +1,239 @@ +def _escape(value): + if value is None: + return None + return str(value).replace("'", "''") + + +def _parse_bool(value, default=True): + if value is None or value == '': + return default + if isinstance(value, bool): + return value + return str(value).lower() in ('1', 'true', 'yes', 'on') + + +def _parse_messages(ns): + """解析历史消息:支持 list 或 JSON 字符串""" + raw = ns.get('messages') + if not raw: + return [] + if isinstance(raw, list): + return raw + if isinstance(raw, str): + import json + try: + return json.loads(raw) + except Exception: + return [] + return [] + + +def build_user_content(ns): + """ + 构建单条 user 消息的 content,支持文本 / 图片 / 文档链接。 + + 参数(可组合): + message / text 文本 + image_url 图片 URL + image_base64 图片 base64(不含 data: 前缀) + document_url 文档 URL(以 file 类型传给兼容接口) + document_text 文档纯文本(拼入 text) + """ + text_parts = [] + if ns.get('message'): + text_parts.append(str(ns.get('message'))) + if ns.get('text'): + text_parts.append(str(ns.get('text'))) + if ns.get('document_text'): + text_parts.append(str(ns.get('document_text'))) + + parts = [] + merged_text = '\n'.join([p for p in text_parts if p]).strip() + if merged_text: + parts.append({'type': 'text', 'text': merged_text}) + + if ns.get('image_url'): + parts.append({ + 'type': 'image_url', + 'image_url': {'url': ns.get('image_url')}, + }) + if ns.get('image_base64'): + mime = ns.get('image_mime') or 'image/jpeg' + b64 = ns.get('image_base64') + if not str(b64).startswith('data:'): + b64 = 'data:%s;base64,%s' % (mime, b64) + parts.append({ + 'type': 'image_url', + 'image_url': {'url': b64}, + }) + if ns.get('document_url'): + parts.append({ + 'type': 'file', + 'file': {'file_url': ns.get('document_url')}, + }) + + if not parts: + return '' + if len(parts) == 1 and parts[0]['type'] == 'text': + return parts[0]['text'] + return parts + + +async def _resolve_chat_config(ns, sor): + """解析 API 地址与 Bearer Token""" + api_url = ns.get('api_url') + api_key = ns.get('api_key') + + if not api_url and ns.get('model_id'): + doc_rows = await sor.sqlExe( + "SELECT api_url FROM model_api_doc WHERE model_id = '%s' LIMIT 1;" + % _escape(ns.get('model_id')), + {}, + ) + if doc_rows and doc_rows[0].get('api_url'): + api_url = doc_rows[0]['api_url'] + if not str(api_url).endswith('/chat/completions'): + api_url = str(api_url).rstrip('/') + '/chat/completions' + + if not api_url: + param_rows = await sor.R('params', {'pname': 'cntoai_llm_chat_url'}) + if param_rows: + api_url = param_rows[0]['pvalue'] + else: + domain_rows = await sor.R('params', {'pname': 'cntoai_domain'}) + if domain_rows: + api_url = domain_rows[0]['pvalue'].rstrip('/') + '/llmage/v1/chat/completions' + else: + api_url = 'https://ai.atvoe.com/llmage/v1/chat/completions' + + if not api_key: + userid = ns.get('userid') or await get_user() + if userid: + action = ns.get('apikey_action') or 'user_self_create' + keys = await sor.R('user_api_keys', {'userid': userid, 'action': action}) + if not keys: + keys = await sor.R('user_api_keys', {'userid': userid, 'action': 'sync'}) + if keys: + api_key = keys[0].get('opc_apikey') + if not api_key: + key_rows = await sor.R('params', {'pname': 'cntoai_llm_api_key'}) + if key_rows: + api_key = key_rows[0]['pvalue'] + + return api_url, api_key + + +async def _read_stream_response(response): + """解析 SSE 流式响应,汇总 assistant 文本""" + import json + chunks = [] + buffer = '' + async for raw in response.content: + buffer += raw.decode('utf-8', errors='ignore') + while '\n' in buffer: + line, buffer = buffer.split('\n', 1) + line = line.strip() + if not line.startswith('data:'): + continue + data = line[5:].strip() + if data == '[DONE]': + return ''.join(chunks) + try: + payload = json.loads(data) + choice = (payload.get('choices') or [{}])[0] + delta = choice.get('delta') or {} + piece = delta.get('content') or '' + if piece: + chunks.append(piece) + except Exception: + continue + return ''.join(chunks) + + +async def llm_chat_completions(ns={}): + """ + OpenAI 兼容 chat/completions(aiohttp)。 + + 参数: + model (str) 模型名,必填 + message / text 当前用户文本 + messages 历史消息 JSON 数组或 list,多轮对话 + stream (bool) 是否流式,默认 True + image_url / image_base64 图片 + document_url / document_text 文档 + api_url / api_key 可覆盖默认配置 + model_id 从 model_api_doc 读取 api_url + userid 用于查 user_api_keys + """ + import aiohttp + import json + import traceback + + model = ns.get('model') + if not model: + return {'status': False, 'msg': 'model is required'} + + stream = _parse_bool(ns.get('stream'), True) + history = _parse_messages(ns) + user_content = build_user_content(ns) + if not user_content and not history: + return {'status': False, 'msg': 'message is required'} + + messages = list(history) + if user_content: + messages.append({'role': 'user', 'content': user_content}) + + payload = { + 'model': model, + 'stream': stream, + 'messages': messages, + } + + db = DBPools() + async with db.sqlorContext('kboss') as sor: + try: + api_url, api_key = await _resolve_chat_config(ns, sor) + if not api_key: + return {'status': False, 'msg': '未找到 API Key,请先创建或配置 cntoai_llm_api_key'} + + headers = { + 'Content-Type': 'application/json', + 'Authorization': 'Bearer %s' % api_key, + } + + async with aiohttp.ClientSession() as session: + async with session.post(api_url, headers=headers, json=payload) as response: + if response.status != 200: + err_text = await response.text() + return { + 'status': False, + 'msg': '模型请求失败 HTTP %s: %s' % (response.status, err_text[:500]), + } + + if stream: + reply = await _read_stream_response(response) + usage = {} + else: + body = await response.json() + choice = (body.get('choices') or [{}])[0] + msg = choice.get('message') or {} + reply = msg.get('content') or '' + usage = body.get('usage') or {} + + return { + 'status': True, + 'msg': 'chat success', + 'data': { + 'model': model, + 'reply': reply, + 'messages': messages + [{'role': 'assistant', 'content': reply}], + 'usage': usage, + 'stream': stream, + }, + } + except Exception: + return {'status': False, 'msg': 'chat failed, %s' % traceback.format_exc()} + + +ret = await llm_chat_completions(params_kw) +return ret diff --git a/b/cntoai/process_user_billing.dspy b/b/cntoai/process_user_billing.dspy index 03964fd..9204e7c 100644 --- a/b/cntoai/process_user_billing.dspy +++ b/b/cntoai/process_user_billing.dspy @@ -26,6 +26,7 @@ async def _charge_order(sor, orderid, order_type='NEW'): """ order_rows = await sor.R('bz_order', {'id': orderid}) if not order_rows: + debug(f"订单不存在") return {'status': 'error', 'msg': '订单不存在'} order_row = order_rows[0] @@ -38,10 +39,11 @@ async def _charge_order(sor, orderid, order_type='NEW'): count = 0 if count - float(order_row['amount']) < 0: pricedifference = count - round(order_row['amount'], 2) + debug(f"账户余额不足,订单金额: {order_row['amount']}, 账户余额: {count}, 差额: {pricedifference}") return { 'status': 'error', 'msg': '账户余额不足', - 'pricedifference': round(pricedifference, 2), + 'pricedifference': round(pricedifference, 10), } await order2bill(orderid, sor) @@ -121,8 +123,10 @@ async def _charge_order(sor, orderid, order_type='NEW'): # ) # await sor.C('customer_goods', nss) + debug(f"支付成功") return {'status': True, 'msg': '支付成功'} except Exception as error: + debug(f"支付失败: {error}") return {'status': 'error', 'msg': str(error)} async def calc_price_by_saleprotocol(sor, org, product_id, supply_price, quantity=1): @@ -142,11 +146,14 @@ async def calc_price_by_saleprotocol(sor, org, product_id, supply_price, quantit supply_price = abs(float(supply_price)) quantity = int(quantity) except (TypeError, ValueError): + debug(f"calc_price_by_saleprotocol supply_price / quantity 必须为有效数字") return {'status': 'error', 'msg': 'supply_price / quantity 必须为有效数字'} if supply_price <= 0: + debug(f"calc_price_by_saleprotocol supply_price 必须大于 0") return {'status': 'error', 'msg': 'supply_price 必须大于 0'} if quantity <= 0: + debug(f"calc_price_by_saleprotocol quantity 必须大于 0") return {'status': 'error', 'msg': 'quantity 必须大于 0'} saleprotocol_to_person = await sor.R( @@ -193,12 +200,13 @@ async def calc_price_by_saleprotocol(sor, org, product_id, supply_price, quantit ) if not product_salemode: + debug(f"calc_price_by_saleprotocol 还未上线这个产品的协议配置") return {'status': 'error', 'msg': '还未上线这个产品的协议配置'} discount = product_salemode[0]['discount'] list_price = supply_price - price = abs(round(list_price * discount, 2)) - amount = abs(round(price * quantity, 2)) + price = abs(round(list_price * discount, 12)) + amount = abs(round(price * quantity, 12)) return { 'status': True, @@ -246,17 +254,20 @@ async def process_user_billing(ns={}): llmid = ns.get('llmid') if not llmid: + debug(f"{userid} process_user_billing llmid必传") return { 'status': 'error', 'msg': 'llmid必传' } try: - amount = round(float(amount), 2) + amount = round(float(amount), 12) except (TypeError, ValueError): + debug(f"{userid} process_user_billing amount 必须为有效数字") return {'status': 'error', 'msg': 'amount 必须为有效数字'} if amount <= 0: + debug(f"{userid} process_user_billing amount 必须大于 0") return {'status': 'error', 'msg': 'amount 必须大于 0'} db = DBPools() @@ -264,6 +275,7 @@ async def process_user_billing(ns={}): try: product_li = await sor.R('product', {'providerpid': llmid, 'del_flg': '0'}) if not product_li: + debug(f"{userid} process_user_billing 未找到对应产品,请确认") return { 'status': 'error', 'msg': '未找到对应产品,请确认' @@ -273,26 +285,30 @@ async def process_user_billing(ns={}): providerid = product['providerid'] providername_list = await sor.R('organization', {'id': providerid}) if not providername_list: + debug(f"{userid} process_user_billing 厂商不存在 %s" % providername) return { 'status': 'error', 'msg': '厂商不存在 %s' % providername } providername = providername_list[0]['orgname'] - userid_li = await sor.R('user_api_keys', {'opc_apikey': apikey}) - if not userid_li: - return { - 'status': 'error', - 'msg': 'apikey无效,请联系管理员' - } + # userid_li = await sor.R('user_api_keys', {'opc_apikey': apikey}) + # if not userid_li: + # debug(f"{userid} process_user_billing apikey无效,请联系管理员") + # return { + # 'status': 'error', + # 'msg': 'apikey无效,请联系管理员' + # } # userid = userid_li[0]['userid'] user_list = await sor.R('users', {'id': userid}) if not user_list: + debug(f"{userid} process_user_billing 用户不存在 %s" % userid) return {'status': 'error', 'msg': '用户不存在 %s' % userid} org_list = await sor.R('organization', {'id': user_list[0]['orgid']}) if not org_list: + debug(f"{userid} process_user_billing 用户所属机构不存在") return {'status': 'error', 'msg': '用户所属机构不存在'} customerid = org_list[0]['id'] @@ -329,7 +345,7 @@ async def process_user_billing(ns={}): return { 'status': 'error', 'msg': '账户余额不足', - 'pricedifference': round(balance - amount, 2), + 'pricedifference': round(balance - amount, 12), } order_id = uuid() @@ -343,7 +359,7 @@ async def process_user_billing(ns={}): 'order_date': now_str, 'source': providername, 'amount': amount, - 'originalprice': round(originalprice, 2), + 'originalprice': round(originalprice, 12), 'ordertype': 'prepay', 'servicename': productname, } diff --git a/b/cntoai/sync_cn_ai_user.dspy b/b/cntoai/sync_cn_ai_user.dspy index 96e9181..f5ab279 100644 --- a/b/cntoai/sync_cn_ai_user.dspy +++ b/b/cntoai/sync_cn_ai_user.dspy @@ -19,9 +19,6 @@ async def sync_cn_ai_user(ns={}): email = user_info[0]['email'] debug(f"sync_cn_ai_user同步用户: {userid}, {orgid}, {username}, {name}, {email}") - already_sync_user_key = '2i68AZ81di_q5f8AySDrJ' - already_sync_user_dappid = 'cndemo' - # 目标URL # domain 从数据库params表中获取到pname=cntoai_domain的pvalue值 domain = None @@ -36,6 +33,24 @@ async def sync_cn_ai_user(ns={}): 'status': False, 'msg': '未找到域名' } + already_sync_user_key = await sor.R('params', {'pname': 'cntoai_already_sync_user_key'}) + if already_sync_user_key: + already_sync_user_key = already_sync_user_key[0]['pvalue'] + else: + debug(f"sync_cn_ai_user未找到已同步用户key") + return { + 'status': False, + 'msg': '未找到已同步用户key' + } + already_sync_user_dappid = await sor.R('params', {'pname': 'cntoai_already_sync_user_dappid'}) + if already_sync_user_dappid: + already_sync_user_dappid = already_sync_user_dappid[0]['pvalue'] + else: + debug(f"sync_cn_ai_user未找到已同步用户dappid") + return { + 'status': False, + 'msg': '未找到已同步用户dappid' + } url = f"{domain}/rbac/usersync" diff --git a/b/customer/forgotPassword.dspy b/b/customer/forgotPassword.dspy new file mode 100644 index 0000000..b12e8b5 --- /dev/null +++ b/b/customer/forgotPassword.dspy @@ -0,0 +1,70 @@ +async def forgotPassword(ns): + """ + 忘记密码:校验短信验证码后重置密码。 + + 参数: + id (str) 用户ID(找回验证码接口返回的 userid) + password (str) 新密码 + codeid (str) 验证码ID + vcode (str) 验证码 + + 也可传 mobile 或 username 定位用户(未传 id 时)。 + """ + import re + import traceback + + if not ns.get('password'): + return {'status': False, 'msg': '新密码不能为空'} + if len(ns.get('password')) < 8 or not re.search(r'[a-zA-Z]', ns.get('password')) or not re.search(r'[0-9]', ns.get('password')): + return {'status': False, 'msg': '密码至少8位,包含大小写字母、特殊字符、数字'} + if not ns.get('codeid'): + return {'status': False, 'msg': '验证码ID不能为空'} + if not ns.get('vcode'): + return {'status': False, 'msg': '验证码不能为空'} + + db = DBPools() + async with db.sqlorContext('kboss') as sor: + try: + code = await sor.R('validatecode', {'id': ns.get('codeid'), 'vcode': ns.get('vcode')}) + if code: + create_at = code[0]['create_at'] + now = datetime.datetime.now() + create_at_dt = datetime.datetime.strptime(create_at, "%Y-%m-%d %H:%M:%S") + if (now - create_at_dt).seconds > 500: + return {'status': False, 'msg': '验证码过期'} + else: + return {'status': False, 'msg': '验证码不正确'} + + user = None + if ns.get('id'): + users = await sor.R('users', {'id': ns.get('id'), 'del_flg': '0'}) + if users: + user = users[0] + elif ns.get('mobile'): + users = await sor.R('users', {'mobile': ns.get('mobile'), 'del_flg': '0'}) + if users: + user = users[0] + elif ns.get('username'): + users = await sor.R('users', {'username': ns.get('username'), 'del_flg': '0'}) + if not users: + users = await sor.R('users', {'mobile': ns.get('username'), 'del_flg': '0'}) + if users: + user = users[0] + else: + return {'status': False, 'msg': '用户标识不能为空'} + + if not user: + return {'status': False, 'msg': '用户不存在'} + + new_password = password_encode(ns['password']) + update_sql = """UPDATE users SET password = '%s' WHERE id = '%s';""" % (new_password, user['id']) + await sor.sqlExe(update_sql, {}) + return {'status': True, 'msg': '密码重置成功'} + except Exception as error: + debug(f"forgotPassword 错误: {error}") + debug(f"forgotPassword 错误堆栈: {traceback.format_exc()}") + return {'status': False, 'msg': '密码重置失败, %s' % str(error)} + + +ret = await forgotPassword(params_kw) +return ret diff --git a/b/customer/registerUser.dspy b/b/customer/registerUser.dspy index 0118a5a..7fe3897 100644 --- a/b/customer/registerUser.dspy +++ b/b/customer/registerUser.dspy @@ -1,14 +1,40 @@ async def sync_cn_ai_user(userid=None, orgid=None, username=None, name=None, email=None): import aiohttp - debug(f"sync_cn_ai_user同步用户: {userid}, {orgid}, {username}, {name}, {email}") - already_sync_user_key = '2i68AZ81di_q5f8AySDrJ' - already_sync_user_dappid = 'cndemo' - - # 目标URL - url = "https://ai.atvoe.com/rbac/usersync" - # url = 'https://ai.atvoe.com/tmp/env.dspy' + # domain 从数据库params表中获取到pname=cntoai_domain的pvalue值 + domain = None + db = DBPools() + async with db.sqlorContext('kboss') as sor: + domain = await sor.R('params', {'pname': 'cntoai_domain'}) + if domain: + domain = domain[0]['pvalue'] + else: + debug(f"sync_cn_ai_user未找到域名") + return { + 'status': False, + 'msg': '未找到域名' + } + already_sync_user_key = await sor.R('params', {'pname': 'cntoai_already_sync_user_key'}) + if already_sync_user_key: + already_sync_user_key = already_sync_user_key[0]['pvalue'] + else: + debug(f"sync_cn_ai_user未找到已同步用户key") + return { + 'status': False, + 'msg': '未找到已同步用户key' + } + already_sync_user_dappid = await sor.R('params', {'pname': 'cntoai_already_sync_user_dappid'}) + if already_sync_user_dappid: + already_sync_user_dappid = already_sync_user_dappid[0]['pvalue'] + else: + debug(f"sync_cn_ai_user未找到已同步用户dappid") + return { + 'status': False, + 'msg': '未找到已同步用户dappid' + } + + url = f"{domain}/rbac/usersync" # 请求头 headers = { @@ -82,7 +108,6 @@ async def sync_cn_ai_user(userid=None, orgid=None, username=None, name=None, ema 'msg': f"sync_cn_ai_user{userid}同步用户失败: {e}" } - async def registerUser(ns): """ 用户注册 diff --git a/b/user/mobilecode.dspy b/b/user/mobilecode.dspy index 5c23f9a..3bb2a31 100644 --- a/b/user/mobilecode.dspy +++ b/b/user/mobilecode.dspy @@ -112,6 +112,18 @@ async def mobilecode(ns): return {'status': False, 'msg': '发送失败'} else: return {'status': False, 'action': 'redirect', 'msg': '用户未注册, 请到注册页面注册'} + + # 忘记密码逻辑:检查手机号是否存在 + elif action_type == 'forgotpassword': + if len(userreacs) >= 1: + code = await generate_vcode() + nss = await send_vcode(userreacs[0]['mobile'], '用户注册登录验证', {'SMSvCode': code.get('vcode')}) + if nss['status']: + return {'status': True, 'msg': '验证码发送成功', 'codeid': code.get('id')} + else: + return {'status': False, 'msg': '发送失败'} + else: + return {'status': False, 'action': 'redirect', 'msg': '用户未注册, 请到注册页面注册'} # 原有逻辑:如果没有指定action_type,保持原有逻辑 else: From a0ff6ba2c5ef8007ec39f4eef973c8d85cb635dc Mon Sep 17 00:00:00 2001 From: ping <1017253325@qq.com> Date: Sat, 23 May 2026 09:27:03 +0800 Subject: [PATCH 3/5] update --- b/cntoai/chat.html | 727 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 727 insertions(+) create mode 100644 b/cntoai/chat.html diff --git a/b/cntoai/chat.html b/b/cntoai/chat.html new file mode 100644 index 0000000..9dae92e --- /dev/null +++ b/b/cntoai/chat.html @@ -0,0 +1,727 @@ + + + + + + 模型对话测试 · cntoai + + + +
+ + +
+
+ 测试说明 + 左侧可手动填写 api_urlapi_keyuserid 直接联调;未填 userid 时持久化接口会失败。Dspy 网关默认 https://dev.opencomputing.cn,路径为 /cntoai/*.dspy。须已执行 chat_tables.sql。 +
+ +
+
+ + +
+
+ + +
+ + +
+ +
+
+ +
+ + +
+
+ + Ctrl+Enter 发送 +
+ +
+
+
+
+ + + + From f60c761735a739f2bdc3732494d4d7c009fe6a0d Mon Sep 17 00:00:00 2001 From: ping <1017253325@qq.com> Date: Sat, 23 May 2026 11:26:30 +0800 Subject: [PATCH 4/5] update --- b/cntoai/chat_send.dspy | 5 +- b/cntoai/chat_session_list.dspy | 4 +- b/cntoai/llm_chat_completions.dspy | 7 +- b/cntoai/test_chat.py | 262 +++++++++++++++++++++++++++++ 4 files changed, 272 insertions(+), 6 deletions(-) create mode 100644 b/cntoai/test_chat.py diff --git a/b/cntoai/chat_send.dspy b/b/cntoai/chat_send.dspy index 0611dfb..ba93073 100644 --- a/b/cntoai/chat_send.dspy +++ b/b/cntoai/chat_send.dspy @@ -72,8 +72,9 @@ async def chat_send(ns={}): """ import json import traceback - - model = ns.get('model') + + # model = ns.get('model') + model = 'deepseek-v4-pro' if not model: return {'status': False, 'msg': 'model is required'} diff --git a/b/cntoai/chat_session_list.dspy b/b/cntoai/chat_session_list.dspy index c814ef6..e4bddfc 100644 --- a/b/cntoai/chat_session_list.dspy +++ b/b/cntoai/chat_session_list.dspy @@ -10,8 +10,8 @@ async def chat_session_list(ns={}): if not userid: return {'status': False, 'msg': '未找到用户'} - page_size = int(ns.get('page_size', 50)) - current_page = int(ns.get('current_page', 1)) + page_size = int(ns.get('page_size')) if ns.get('page_size') else 100 + current_page = int(ns.get('current_page')) if ns.get('current_page') else 1 offset = (current_page - 1) * page_size db = DBPools() diff --git a/b/cntoai/llm_chat_completions.dspy b/b/cntoai/llm_chat_completions.dspy index d910986..6303999 100644 --- a/b/cntoai/llm_chat_completions.dspy +++ b/b/cntoai/llm_chat_completions.dspy @@ -81,8 +81,11 @@ def build_user_content(ns): async def _resolve_chat_config(ns, sor): """解析 API 地址与 Bearer Token""" - api_url = ns.get('api_url') - api_key = ns.get('api_key') + api_url = 'https://api.deepseek.com/chat/completions' + api_key = 'sk-c22d6573e85a4d3fa8ab932386cf2909' + + # api_url = ns.get('api_url') + # api_key = ns.get('api_key') if not api_url and ns.get('model_id'): doc_rows = await sor.sqlExe( diff --git a/b/cntoai/test_chat.py b/b/cntoai/test_chat.py new file mode 100644 index 0000000..3432eac --- /dev/null +++ b/b/cntoai/test_chat.py @@ -0,0 +1,262 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +cntoai 对话相关接口联调测试(dev.opencomputing.cn) + +用法: + pip install requests + set CNTOAI_USERID=你的用户id + set CNTOAI_API_KEY=你的api_key + python test_chat.py + +可选环境变量: + CNTOAI_BASE_URL 默认 https://dev.opencomputing.cn + CNTOAI_MODEL 默认 qwen3.6-plus + CNTOAI_LLM_API_URL 默认 https://ai.atvoe.com/llmage/v1/chat/completions + CNTOAI_COOKIE 浏览器 Cookie(未传 userid 时用于鉴权) + +单测: + python test_chat.py --only models + python test_chat.py --only completions + python test_chat.py --only send + python test_chat.py --only session +""" + +from __future__ import annotations + +import argparse +import json +import os +import sys +import time +from typing import Any, Dict, Optional, Tuple + +try: + import requests +except ImportError: + print("请先安装依赖: pip install requests") + sys.exit(1) + + +BASE_URL = "https://dev.opencomputing.cn" +USERID = "hSqZuekZ1yKmhKmCN9UAK" +API_KEY = "sk-c22d6573e85a4d3fa8ab932386cf2909" +# API_URL = "https://ai.atvoe.com/llmage/v1/chat/completions" +API_URL = "https://api.deepseek.com/chat/completions" +# MODEL = "qwen3.6-plus" +MODEL = "deepseek-v4-pro" +COOKIE = "".strip() +TIMEOUT = int(120) + + +class ChatApiClient: + def __init__(self, base_url: str = BASE_URL): + self.base_url = base_url.rstrip("/") + self.session = requests.Session() + self.session.headers.update({"Accept": "application/json"}) + if COOKIE: + self.session.headers["Cookie"] = COOKIE + + def _url(self, path: str) -> str: + path = path if path.startswith("/") else f"/{path}" + return f"{self.base_url}{path}" + + def _auth(self, extra: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: + params: Dict[str, Any] = {} + if USERID: + params["userid"] = USERID + if API_KEY: + params["api_key"] = API_KEY + if API_URL: + params["api_url"] = API_URL + if extra: + params.update(extra) + return params + + def get(self, path: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: + resp = self.session.get(self._url(path), params=self._auth(params), timeout=TIMEOUT) + return self._parse(resp) + + def post(self, path: str, data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: + resp = self.session.post( + self._url(path), + json=self._auth(data), + headers={"Content-Type": "application/json"}, + timeout=TIMEOUT, + ) + return self._parse(resp) + + @staticmethod + def _parse(resp: requests.Response) -> Dict[str, Any]: + print(f" HTTP {resp.status_code} {resp.url[:120]}...") + try: + return resp.json() + except Exception: + return {"status": False, "msg": f"非 JSON 响应: {resp.text[:300]}"} + + +def ok(name: str, data: Dict[str, Any]) -> bool: + passed = data.get("status") is True + tag = "PASS" if passed else "FAIL" + print(f"\n[{tag}] {name}") + print(json.dumps(data, ensure_ascii=False, indent=2)[:2000]) + if not passed: + print(f" -> {data.get('msg', 'unknown error')}") + return passed + + +def test_model_list(client: ChatApiClient) -> bool: + print("\n=== GET /cntoai/model_management_customer_search.dspy ===") + data = client.get("/cntoai/model_management_customer_search.dspy", { + "page_size": 20, + "current_page": 1, + }) + if ok("模型列表", data) and data.get("data"): + models = data["data"].get("model_list") or [] + print(f" 共 {len(models)} 个模型") + if models: + m0 = models[0] + print(f" 首个: {m0.get('model_name')} / {m0.get('display_name')}") + return data.get("status") is True + + +def test_llm_chat_completions(client: ChatApiClient) -> bool: + print("\n=== POST /cntoai/llm_chat_completions.dspy ===") + data = client.post("/cntoai/llm_chat_completions.dspy", { + "model": MODEL, + "message": "用一句话介绍你自己", + "stream": True, + }) + if ok("直连模型", data) and data.get("data"): + print(f" 回复摘要: {(data['data'].get('reply') or '')[:200]}") + return data.get("status") is True + + +def test_chat_send( + client: ChatApiClient, + session_id: Optional[str] = None, +) -> Tuple[bool, Optional[str]]: + print("\n=== POST /cntoai/chat_send.dspy ===") + payload: Dict[str, Any] = { + "model": MODEL, + "message": ( + "你好,这是 test_chat.py 自动化测试" + if not session_id + else "继续,用一句话回复我" + ), + "stream": True, + } + if session_id: + payload["session_id"] = session_id + data = client.post("/cntoai/chat_send.dspy", payload) + if ok("发送消息", data) and data.get("data"): + sid = data["data"].get("session_id") + print(f" session_id: {sid}") + print(f" 回复摘要: {(data['data'].get('reply') or '')[:200]}") + return True, sid + return False, session_id + + +def test_chat_session_list(client: ChatApiClient) -> bool: + print("\n=== GET /cntoai/chat_session_list.dspy ===") + data = client.get("/cntoai/chat_session_list.dspy", {"page_size": 10}) + if ok("会话列表", data) and data.get("data"): + sessions = data["data"].get("sessions") or [] + print(f" 共 {data['data'].get('total_count', len(sessions))} 条会话") + for s in sessions[:3]: + print(f" - {s.get('id')} | {s.get('title')}") + return data.get("status") is True + + +def test_chat_session_messages(client: ChatApiClient, session_id: str) -> bool: + print("\n=== GET /cntoai/chat_session_messages.dspy ===") + data = client.get("/cntoai/chat_session_messages.dspy", {"session_id": session_id}) + if ok("会话消息", data) and data.get("data"): + msgs = data["data"].get("messages") or [] + print(f" 消息数: {len(msgs)}") + for m in msgs: + print(f" [{m.get('role')}] {str(m.get('content') or '')[:80]}") + return data.get("status") is True + + +def test_chat_session_delete(client: ChatApiClient, session_id: str) -> bool: + print("\n=== GET /cntoai/chat_session_delete.dspy ===") + data = client.get("/cntoai/chat_session_delete.dspy", {"session_id": session_id}) + return ok("删除会话", data) + + +def check_config(require_userid: bool = True) -> bool: + print("配置:") + print(f" BASE_URL = {BASE_URL}") + print(f" MODEL = {MODEL}") + print(f" API_URL = {API_URL or '(走服务端配置)'}") + print(f" USERID = {USERID or '(未设置)'}") + print(f" API_KEY = {'已设置' if API_KEY else '(未设置)'}") + print(f" COOKIE = {'已设置' if COOKIE else '(未设置)'}") + + if require_userid and not USERID and not COOKIE: + print("\n错误: 持久化接口需要 CNTOAI_USERID 或 CNTOAI_COOKIE") + return False + if not API_KEY: + print("\n警告: 未设置 CNTOAI_API_KEY,将依赖服务端 Key") + return True + + +def main() -> int: + parser = argparse.ArgumentParser(description="cntoai chat API 联调测试") + parser.add_argument( + "--only", + choices=["models", "completions", "send", "session", "delete", "all"], + default="all", + ) + parser.add_argument("--keep-session", action="store_true", help="不删除测试会话") + parser.add_argument("--base-url", default=BASE_URL) + args = parser.parse_args() + + client = ChatApiClient(base_url=args.base_url) + results = [] + session_id: Optional[str] = None + + if args.only in ("all", "models"): + results.append(("models", test_model_list(client))) + + if args.only in ("all", "completions"): + if check_config(require_userid=False): + results.append(("completions", test_llm_chat_completions(client))) + else: + results.append(("completions", False)) + + if args.only in ("all", "send", "session", "delete"): + if not check_config(require_userid=True): + return 1 + + if args.only in ("all", "send"): + passed, session_id = test_chat_send(client) + results.append(("send_1", passed)) + if passed and session_id: + time.sleep(1) + passed2, session_id = test_chat_send(client, session_id=session_id) + results.append(("send_2_multiturn", passed2)) + + if args.only in ("all", "session") and session_id: + results.append(("session_list", test_chat_session_list(client))) + results.append(("session_messages", test_chat_session_messages(client, session_id))) + elif args.only == "session": + results.append(("session_list", test_chat_session_list(client))) + + if args.only in ("all", "delete") and session_id and not args.keep_session: + results.append(("delete", test_chat_session_delete(client, session_id))) + elif session_id and args.keep_session: + print(f"\n保留测试会话: {session_id}") + + print("\n" + "=" * 50) + print("汇总:") + failed = sum(1 for _, p in results if not p) + for name, passed in results: + print(f" {'OK' if passed else 'FAIL'} {name}") + print("=" * 50) + return 1 if failed else 0 + + +if __name__ == "__main__": + sys.exit(main()) From 7bb029a66e21ad850a5909d7bc766d94f3333eb2 Mon Sep 17 00:00:00 2001 From: ping <1017253325@qq.com> Date: Sat, 23 May 2026 14:22:30 +0800 Subject: [PATCH 5/5] update --- b/cntoai/chat_send.dspy | 26 +- b/cntoai/chat_send_stream.dspy | 311 ++++++++++++++++++++++ b/cntoai/llm_chat_completions.dspy | 57 +++- b/cntoai/llm_chat_completions_stream.dspy | 241 +++++++++++++++++ b/cntoai/test_demo.py | 247 +++++++++++++++++ 5 files changed, 869 insertions(+), 13 deletions(-) create mode 100644 b/cntoai/chat_send_stream.dspy create mode 100644 b/cntoai/llm_chat_completions_stream.dspy create mode 100644 b/cntoai/test_demo.py diff --git a/b/cntoai/chat_send.dspy b/b/cntoai/chat_send.dspy index ba93073..60059c1 100644 --- a/b/cntoai/chat_send.dspy +++ b/b/cntoai/chat_send.dspy @@ -4,6 +4,14 @@ def _escape(value): return str(value).replace("'", "''") +def _parse_bool(value, default=True): + if value is None or value == '': + return default + if isinstance(value, bool): + return value + return str(value).lower() in ('1', 'true', 'yes', 'on') + + def _title_from_message(ns): text = ns.get('message') or ns.get('text') or '' text = str(text).strip().replace('\n', ' ') @@ -68,11 +76,15 @@ async def chat_send(ns={}): 发送消息并保存多轮对话(需先执行 chat_tables.sql)。 参数:model, message, stream(默认true), session_id, - image_url, image_base64, document_url, document_text + image_url, image_base64, document_url, document_text, + with_chunks(true时返回上游 SSE 分片列表,便于确认流式) + + 说明:本接口(chat_send.dspy)为 JSON 一次性返回。 + 需要浏览器端实时流式请调用 chat_send_stream.dspy(SSE)。 """ import json import traceback - + # model = ns.get('model') model = 'deepseek-v4-pro' if not model: @@ -115,9 +127,7 @@ async def chat_send(ns={}): }) history = await _load_session_messages(sor, session_id) - stream_val = ns.get('stream', True) - if isinstance(stream_val, str): - stream_val = stream_val.lower() in ('1', 'true', 'yes', 'on') + stream_val = _parse_bool(ns.get('stream'), True) chat_result = await path_call('llm_chat_completions.dspy', { 'model': model, 'messages': history, @@ -126,11 +136,14 @@ async def chat_send(ns={}): 'api_url': ns.get('api_url'), 'api_key': ns.get('api_key'), 'model_id': ns.get('model_id'), + 'with_chunks': ns.get('with_chunks', True), }) if not chat_result.get('status'): return chat_result reply = chat_result['data']['reply'] + chunks = chat_result['data'].get('chunks') or [] + chunk_count = chat_result['data'].get('chunk_count', 0) await sor.C('chat_message', { 'id': uuid(), 'session_id': session_id, @@ -151,6 +164,9 @@ async def chat_send(ns={}): 'session_id': session_id, 'reply': reply, 'model': model, + 'stream': stream_val, + 'chunk_count': chunk_count, + 'chunks': chunks if ns.get('with_chunks', True) else None, }, } except Exception: diff --git a/b/cntoai/chat_send_stream.dspy b/b/cntoai/chat_send_stream.dspy new file mode 100644 index 0000000..b98d116 --- /dev/null +++ b/b/cntoai/chat_send_stream.dspy @@ -0,0 +1,311 @@ +def _escape(value): + if value is None: + return None + return str(value).replace("'", "''") + + +def _parse_bool(value, default=True): + if value is None or value == '': + return default + if isinstance(value, bool): + return value + return str(value).lower() in ('1', 'true', 'yes', 'on') + + +def _title_from_message(ns): + text = ns.get('message') or ns.get('text') or '' + text = str(text).strip().replace('\n', ' ') + if not text: + return '新对话' + return text[:30] + ('...' if len(text) > 30 else '') + + +def _build_user_content(ns): + text_parts = [] + if ns.get('message'): + text_parts.append(str(ns.get('message'))) + if ns.get('text'): + text_parts.append(str(ns.get('text'))) + if ns.get('document_text'): + text_parts.append(str(ns.get('document_text'))) + + parts = [] + merged_text = '\n'.join([p for p in text_parts if p]).strip() + if merged_text: + parts.append({'type': 'text', 'text': merged_text}) + if ns.get('image_url'): + parts.append({'type': 'image_url', 'image_url': {'url': ns.get('image_url')}}) + if ns.get('image_base64'): + mime = ns.get('image_mime') or 'image/jpeg' + b64 = ns.get('image_base64') + if not str(b64).startswith('data:'): + b64 = 'data:%s;base64,%s' % (mime, b64) + parts.append({'type': 'image_url', 'image_url': {'url': b64}}) + if ns.get('document_url'): + parts.append({'type': 'file', 'file': {'file_url': ns.get('document_url')}}) + if not parts: + return '' + if len(parts) == 1 and parts[0]['type'] == 'text': + return parts[0]['text'] + return parts + + +async def _load_session_messages(sor, session_id): + sql = """ + SELECT role, content, content_type + FROM chat_message + WHERE session_id = '%s' + ORDER BY created_at ASC; + """ % _escape(session_id) + rows = await sor.sqlExe(sql, {}) + messages = [] + for row in rows: + content = row.get('content') or '' + if row.get('content_type') == 'mixed': + import json + try: + content = json.loads(content) + except Exception: + pass + messages.append({'role': row['role'], 'content': content}) + return messages + + +async def _resolve_chat_config(ns, sor): + # api_url = ns.get('api_url') + # api_key = ns.get('api_key') + api_url = 'https://api.deepseek.com/chat/completions' + api_key = 'sk-c22d6573e85a4d3fa8ab932386cf2909' + if not api_url and ns.get('model_id'): + doc_rows = await sor.sqlExe( + "SELECT api_url FROM model_api_doc WHERE model_id = '%s' LIMIT 1;" + % _escape(ns.get('model_id')), + {}, + ) + if doc_rows and doc_rows[0].get('api_url'): + api_url = doc_rows[0]['api_url'] + if not str(api_url).endswith('/chat/completions'): + api_url = str(api_url).rstrip('/') + '/chat/completions' + if not api_url: + param_rows = await sor.R('params', {'pname': 'cntoai_llm_chat_url'}) + if param_rows: + api_url = param_rows[0]['pvalue'] + else: + domain_rows = await sor.R('params', {'pname': 'cntoai_domain'}) + if domain_rows: + api_url = domain_rows[0]['pvalue'].rstrip('/') + '/llmage/v1/chat/completions' + else: + api_url = 'https://ai.atvoe.com/llmage/v1/chat/completions' + if not api_key: + userid = ns.get('userid') or await get_user() + if userid: + action = ns.get('apikey_action') or 'user_self_create' + keys = await sor.R('user_api_keys', {'userid': userid, 'action': action}) + if not keys: + keys = await sor.R('user_api_keys', {'userid': userid, 'action': 'sync'}) + if keys: + api_key = keys[0].get('opc_apikey') + if not api_key: + key_rows = await sor.R('params', {'pname': 'cntoai_llm_api_key'}) + if key_rows: + api_key = key_rows[0]['pvalue'] + return api_url, api_key + + +def _extract_stream_piece(payload): + choice = (payload.get('choices') or [{}])[0] + delta = choice.get('delta') or {} + message = choice.get('message') or {} + piece = ( + delta.get('content') + or delta.get('reasoning_content') + or message.get('content') + or choice.get('text') + or payload.get('content') + or '' + ) + if piece is None: + return '' + return str(piece) + + +def _sse_event(obj): + import json + return 'data: %s\n\n' % json.dumps(obj, ensure_ascii=False) + + +async def _iter_upstream_stream(api_url, api_key, payload): + import aiohttp + import json + + headers = { + 'Content-Type': 'application/json', + 'Authorization': 'Bearer %s' % api_key, + } + payload = dict(payload) + payload['stream'] = True + + buffer = '' + async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=600)) as session: + async with session.post(api_url, headers=headers, json=payload) as response: + if response.status != 200: + err_text = await response.text() + yield {'type': 'error', 'msg': 'HTTP %s: %s' % (response.status, err_text[:500])} + return + + async for raw in response.content: + buffer += raw.decode('utf-8', errors='ignore') + while '\n' in buffer: + line, buffer = buffer.split('\n', 1) + line = line.strip() + if not line or line.startswith(':') or not line.startswith('data:'): + continue + data = line[5:].strip() + if data == '[DONE]': + return + try: + payload_obj = json.loads(data) + piece = _extract_stream_piece(payload_obj) + if piece: + yield {'type': 'content', 'content': piece} + except Exception: + continue + + tail = buffer.strip() + if tail: + try: + body = json.loads(tail) + choice = (body.get('choices') or [{}])[0] + msg = choice.get('message') or {} + piece = msg.get('content') or choice.get('text') or '' + if piece: + yield {'type': 'content', 'content': str(piece)} + except Exception: + pass + + +async def inference_generator(request, params_kw=None, **kw): + """ + 流式 chat_send:先存 user 消息,SSE 推送 assistant 片段,结束后存库。 + + SSE 事件: + {"type":"meta","session_id":"...","model":"..."} + {"type":"content","content":"片段"} + {"type":"done","session_id":"...","reply":"完整文本","model":"..."} + {"type":"error","msg":"..."} + """ + import json + import traceback + + ns = params_kw or {} + # model = ns.get('model') + model = 'deepseek-v4-pro' + if not model: + yield _sse_event({'type': 'error', 'msg': 'model is required'}) + yield 'data: [DONE]\n\n' + return + + userid = ns.get('userid') or await get_user() + if not userid: + yield _sse_event({'type': 'error', 'msg': '未找到用户'}) + yield 'data: [DONE]\n\n' + return + + user_content = _build_user_content(ns) + if not user_content: + yield _sse_event({'type': 'error', 'msg': '请输入文本,或提供图片/文档参数'}) + yield 'data: [DONE]\n\n' + return + + content_type = 'mixed' if isinstance(user_content, list) else 'text' + store_content = json.dumps(user_content, ensure_ascii=False) if content_type == 'mixed' else str(user_content) + + db = DBPools() + async with db.sqlorContext('kboss') as sor: + try: + session_id = ns.get('session_id') + if not session_id: + session_id = uuid() + await sor.C('chat_session', { + 'id': session_id, + 'userid': userid, + 'model': model, + 'title': _title_from_message(ns), + }) + else: + sessions = await sor.R('chat_session', {'id': session_id, 'userid': userid}) + if not sessions: + yield _sse_event({'type': 'error', 'msg': '会话不存在'}) + yield 'data: [DONE]\n\n' + return + + await sor.C('chat_message', { + 'id': uuid(), + 'session_id': session_id, + 'role': 'user', + 'content': store_content, + 'content_type': content_type, + }) + + history = await _load_session_messages(sor, session_id) + api_url, api_key = await _resolve_chat_config(ns, sor) + if not api_key: + yield _sse_event({'type': 'error', 'msg': '未找到 API Key'}) + yield 'data: [DONE]\n\n' + return + + yield _sse_event({ + 'type': 'meta', + 'session_id': session_id, + 'model': model, + 'stream': True, + }) + + parts = [] + async for evt in _iter_upstream_stream(api_url, api_key, { + 'model': model, + 'messages': history, + }): + if evt.get('type') == 'error': + yield _sse_event(evt) + yield 'data: [DONE]\n\n' + return + if evt.get('type') == 'content': + parts.append(evt['content']) + yield _sse_event(evt) + + reply = ''.join(parts) + await sor.C('chat_message', { + 'id': uuid(), + 'session_id': session_id, + 'role': 'assistant', + 'content': reply, + 'content_type': 'text', + }) + await sor.sqlExe( + "UPDATE chat_session SET updated_at = NOW() WHERE id = '%s';" + % _escape(session_id), + {}, + ) + + yield _sse_event({ + 'type': 'done', + 'session_id': session_id, + 'reply': reply, + 'model': model, + }) + yield 'data: [DONE]\n\n' + except Exception: + yield _sse_event({'type': 'error', 'msg': traceback.format_exc()}) + yield 'data: [DONE]\n\n' + + +async def inference(request, *args, params_kw=None, **kw): + from functools import partial + env = request._run_ns.copy() + f = partial(inference_generator, request, params_kw=params_kw, **kw) + return await env.stream_response(request, f, content_type='text/event-stream') + + +ret = await inference(request, params_kw=params_kw) +return ret diff --git a/b/cntoai/llm_chat_completions.dspy b/b/cntoai/llm_chat_completions.dspy index 6303999..980be9a 100644 --- a/b/cntoai/llm_chat_completions.dspy +++ b/b/cntoai/llm_chat_completions.dspy @@ -81,6 +81,7 @@ def build_user_content(ns): async def _resolve_chat_config(ns, sor): """解析 API 地址与 Bearer Token""" + api_url = 'https://api.deepseek.com/chat/completions' api_key = 'sk-c22d6573e85a4d3fa8ab932386cf2909' @@ -126,8 +127,26 @@ async def _resolve_chat_config(ns, sor): return api_url, api_key +def _extract_stream_piece(payload): + """从 SSE chunk 中提取文本(兼容 OpenAI / Qwen 等格式)""" + choice = (payload.get('choices') or [{}])[0] + delta = choice.get('delta') or {} + message = choice.get('message') or {} + piece = ( + delta.get('content') + or delta.get('reasoning_content') + or message.get('content') + or choice.get('text') + or payload.get('content') + or '' + ) + if piece is None: + return '' + return str(piece) + + async def _read_stream_response(response): - """解析 SSE 流式响应,汇总 assistant 文本""" + """解析 SSE 流式响应;若上游未按 SSE 返回则回退解析整段 JSON""" import json chunks = [] buffer = '' @@ -136,21 +155,38 @@ async def _read_stream_response(response): while '\n' in buffer: line, buffer = buffer.split('\n', 1) line = line.strip() + if not line or line.startswith(':'): + continue if not line.startswith('data:'): continue data = line[5:].strip() if data == '[DONE]': - return ''.join(chunks) + return ''.join(chunks), chunks try: payload = json.loads(data) - choice = (payload.get('choices') or [{}])[0] - delta = choice.get('delta') or {} - piece = delta.get('content') or '' + piece = _extract_stream_piece(payload) if piece: chunks.append(piece) except Exception: continue - return ''.join(chunks) + + reply = ''.join(chunks) + if reply: + return reply, chunks + + # 上游可能忽略 stream=true,直接返回完整 JSON + tail = buffer.strip() + if tail: + try: + body = json.loads(tail) + choice = (body.get('choices') or [{}])[0] + msg = choice.get('message') or {} + reply = msg.get('content') or choice.get('text') or '' + if reply: + return str(reply), [str(reply)] + except Exception: + pass + return reply, chunks async def llm_chat_completions(ns={}): @@ -204,7 +240,9 @@ async def llm_chat_completions(ns={}): 'Authorization': 'Bearer %s' % api_key, } - async with aiohttp.ClientSession() as session: + async with aiohttp.ClientSession( + timeout=aiohttp.ClientTimeout(total=600), + ) as session: async with session.post(api_url, headers=headers, json=payload) as response: if response.status != 200: err_text = await response.text() @@ -213,8 +251,9 @@ async def llm_chat_completions(ns={}): 'msg': '模型请求失败 HTTP %s: %s' % (response.status, err_text[:500]), } + stream_chunks = [] if stream: - reply = await _read_stream_response(response) + reply, stream_chunks = await _read_stream_response(response) usage = {} else: body = await response.json() @@ -232,6 +271,8 @@ async def llm_chat_completions(ns={}): 'messages': messages + [{'role': 'assistant', 'content': reply}], 'usage': usage, 'stream': stream, + 'chunk_count': len(stream_chunks), + 'chunks': stream_chunks if ns.get('with_chunks') else None, }, } except Exception: diff --git a/b/cntoai/llm_chat_completions_stream.dspy b/b/cntoai/llm_chat_completions_stream.dspy new file mode 100644 index 0000000..24600ff --- /dev/null +++ b/b/cntoai/llm_chat_completions_stream.dspy @@ -0,0 +1,241 @@ +def _escape(value): + if value is None: + return None + return str(value).replace("'", "''") + + +def _parse_bool(value, default=True): + if value is None or value == '': + return default + if isinstance(value, bool): + return value + return str(value).lower() in ('1', 'true', 'yes', 'on') + + +def _parse_messages(ns): + raw = ns.get('messages') + if not raw: + return [] + if isinstance(raw, list): + return raw + if isinstance(raw, str): + import json + try: + return json.loads(raw) + except Exception: + return [] + return [] + + +def build_user_content(ns): + text_parts = [] + if ns.get('message'): + text_parts.append(str(ns.get('message'))) + if ns.get('text'): + text_parts.append(str(ns.get('text'))) + if ns.get('document_text'): + text_parts.append(str(ns.get('document_text'))) + + parts = [] + merged_text = '\n'.join([p for p in text_parts if p]).strip() + if merged_text: + parts.append({'type': 'text', 'text': merged_text}) + if ns.get('image_url'): + parts.append({'type': 'image_url', 'image_url': {'url': ns.get('image_url')}}) + if ns.get('image_base64'): + mime = ns.get('image_mime') or 'image/jpeg' + b64 = ns.get('image_base64') + if not str(b64).startswith('data:'): + b64 = 'data:%s;base64,%s' % (mime, b64) + parts.append({'type': 'image_url', 'image_url': {'url': b64}}) + if ns.get('document_url'): + parts.append({'type': 'file', 'file': {'file_url': ns.get('document_url')}}) + if not parts: + return '' + if len(parts) == 1 and parts[0]['type'] == 'text': + return parts[0]['text'] + return parts + + +async def _resolve_chat_config(ns, sor): + api_url = ns.get('api_url') + api_key = ns.get('api_key') + if not api_url and ns.get('model_id'): + doc_rows = await sor.sqlExe( + "SELECT api_url FROM model_api_doc WHERE model_id = '%s' LIMIT 1;" + % _escape(ns.get('model_id')), + {}, + ) + if doc_rows and doc_rows[0].get('api_url'): + api_url = doc_rows[0]['api_url'] + if not str(api_url).endswith('/chat/completions'): + api_url = str(api_url).rstrip('/') + '/chat/completions' + if not api_url: + param_rows = await sor.R('params', {'pname': 'cntoai_llm_chat_url'}) + if param_rows: + api_url = param_rows[0]['pvalue'] + else: + domain_rows = await sor.R('params', {'pname': 'cntoai_domain'}) + if domain_rows: + api_url = domain_rows[0]['pvalue'].rstrip('/') + '/llmage/v1/chat/completions' + else: + api_url = 'https://ai.atvoe.com/llmage/v1/chat/completions' + if not api_key: + userid = ns.get('userid') or await get_user() + if userid: + action = ns.get('apikey_action') or 'user_self_create' + keys = await sor.R('user_api_keys', {'userid': userid, 'action': action}) + if not keys: + keys = await sor.R('user_api_keys', {'userid': userid, 'action': 'sync'}) + if keys: + api_key = keys[0].get('opc_apikey') + if not api_key: + key_rows = await sor.R('params', {'pname': 'cntoai_llm_api_key'}) + if key_rows: + api_key = key_rows[0]['pvalue'] + return api_url, api_key + + +def _extract_stream_piece(payload): + choice = (payload.get('choices') or [{}])[0] + delta = choice.get('delta') or {} + message = choice.get('message') or {} + piece = ( + delta.get('content') + or delta.get('reasoning_content') + or message.get('content') + or choice.get('text') + or payload.get('content') + or '' + ) + if piece is None: + return '' + return str(piece) + + +def _sse_event(obj): + import json + return 'data: %s\n\n' % json.dumps(obj, ensure_ascii=False) + + +async def _iter_upstream_stream(api_url, api_key, payload): + """向上游发起流式请求,逐片 yield 文本""" + import aiohttp + import json + + headers = { + 'Content-Type': 'application/json', + 'Authorization': 'Bearer %s' % api_key, + } + payload = dict(payload) + payload['stream'] = True + + buffer = '' + async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=600)) as session: + async with session.post(api_url, headers=headers, json=payload) as response: + if response.status != 200: + err_text = await response.text() + yield {'type': 'error', 'msg': 'HTTP %s: %s' % (response.status, err_text[:500])} + return + + async for raw in response.content: + buffer += raw.decode('utf-8', errors='ignore') + while '\n' in buffer: + line, buffer = buffer.split('\n', 1) + line = line.strip() + if not line or line.startswith(':') or not line.startswith('data:'): + continue + data = line[5:].strip() + if data == '[DONE]': + return + try: + payload_obj = json.loads(data) + piece = _extract_stream_piece(payload_obj) + if piece: + yield {'type': 'content', 'content': piece} + except Exception: + continue + + tail = buffer.strip() + if tail: + try: + body = json.loads(tail) + choice = (body.get('choices') or [{}])[0] + msg = choice.get('message') or {} + piece = msg.get('content') or choice.get('text') or '' + if piece: + yield {'type': 'content', 'content': str(piece)} + except Exception: + pass + + +async def inference_generator(request, params_kw=None, **kw): + """ + SSE 流式输出,事件格式: + {"type":"meta","model":"..."} + {"type":"content","content":"片段"} + {"type":"done","reply":"完整文本"} + {"type":"error","msg":"..."} + 结束:data: [DONE] + """ + import traceback + + ns = params_kw or {} + model = ns.get('model') + if not model: + yield _sse_event({'type': 'error', 'msg': 'model is required'}) + yield 'data: [DONE]\n\n' + return + + history = _parse_messages(ns) + user_content = build_user_content(ns) + if not user_content and not history: + yield _sse_event({'type': 'error', 'msg': 'message is required'}) + yield 'data: [DONE]\n\n' + return + + messages = list(history) + if user_content: + messages.append({'role': 'user', 'content': user_content}) + + db = DBPools() + async with db.sqlorContext('kboss') as sor: + try: + api_url, api_key = await _resolve_chat_config(ns, sor) + if not api_key: + yield _sse_event({'type': 'error', 'msg': '未找到 API Key'}) + yield 'data: [DONE]\n\n' + return + + yield _sse_event({'type': 'meta', 'model': model, 'stream': True}) + + parts = [] + async for evt in _iter_upstream_stream(api_url, api_key, { + 'model': model, + 'messages': messages, + }): + if evt.get('type') == 'error': + yield _sse_event(evt) + yield 'data: [DONE]\n\n' + return + if evt.get('type') == 'content': + parts.append(evt['content']) + yield _sse_event(evt) + + reply = ''.join(parts) + yield _sse_event({'type': 'done', 'reply': reply, 'model': model}) + yield 'data: [DONE]\n\n' + except Exception: + yield _sse_event({'type': 'error', 'msg': traceback.format_exc()}) + yield 'data: [DONE]\n\n' + + +async def inference(request, *args, params_kw=None, **kw): + from functools import partial + env = request._run_ns.copy() + f = partial(inference_generator, request, params_kw=params_kw, **kw) + return await env.stream_response(request, f, content_type='text/event-stream') + + +ret = await inference(request, params_kw=params_kw) +return ret diff --git a/b/cntoai/test_demo.py b/b/cntoai/test_demo.py new file mode 100644 index 0000000..b04e887 --- /dev/null +++ b/b/cntoai/test_demo.py @@ -0,0 +1,247 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +chat_send_stream.dspy SSE 流式接口测试 + +用法: + pip install requests + python test_demo.py + +环境变量(可选,覆盖下方默认值): + CNTOAI_BASE_URL / CNTOAI_USERID / CNTOAI_API_KEY + CNTOAI_MODEL / CNTOAI_LLM_API_URL / CNTOAI_MESSAGE +""" + +from __future__ import annotations + +import argparse +import json +import os +import sys +from typing import Any, Dict, Generator, List, Optional + +try: + import requests +except ImportError: + print("请先安装: pip install requests") + sys.exit(1) + + +BASE_URL = os.environ.get("CNTOAI_BASE_URL", "https://dev.opencomputing.cn").rstrip("/") +USERID = os.environ.get("CNTOAI_USERID", "hSqZuekZ1yKmhKmCN9UAK").strip() +API_KEY = os.environ.get("CNTOAI_API_KEY", "sk-c22d6573e85a4d3fa8ab932386cf2909").strip() +API_URL = os.environ.get("CNTOAI_LLM_API_URL", "https://api.deepseek.com/v1/chat/completions").strip() +MODEL = os.environ.get("CNTOAI_MODEL", "deepseek-chat").strip() +MESSAGE = os.environ.get("CNTOAI_MESSAGE", "你好,请用三句话介绍你自己").strip() +TIMEOUT = int(os.environ.get("CNTOAI_TIMEOUT", "300")) + +STREAM_PATH = "/cntoai/chat_send_stream.dspy" + + +def build_payload(session_id: Optional[str] = None, message: Optional[str] = None) -> Dict[str, Any]: + payload: Dict[str, Any] = { + "model": MODEL, + "message": message or MESSAGE, + "userid": USERID, + "api_key": API_KEY, + "api_url": API_URL, + } + if session_id: + payload["session_id"] = session_id + return payload + + +def parse_sse_text(text: str) -> List[Dict[str, Any]]: + events: List[Dict[str, Any]] = [] + for line in text.splitlines(): + line = line.strip() + if not line.startswith("data:"): + continue + data = line[5:].strip() + if data == "[DONE]": + break + try: + events.append(json.loads(data)) + except json.JSONDecodeError: + print(f"[warn] 无法解析: {line[:200]}") + return events + + +def parse_sse_stream(response: requests.Response) -> Generator[Dict[str, Any], None, None]: + """ + 按字节缓冲解析 SSE。 + 勿用 iter_lines(decode_unicode=True):TCP 分块可能截断 UTF-8 多字节字符,导致乱码和 JSON 解析失败。 + """ + buffer = b"" + for chunk in response.iter_content(chunk_size=4096): + if not chunk: + continue + buffer += chunk + while b"\n" in buffer: + line_bytes, buffer = buffer.split(b"\n", 1) + if not line_bytes.strip(): + continue + line = line_bytes.decode("utf-8").strip() + if not line.startswith("data:"): + continue + data = line[5:].strip() + if data == "[DONE]": + return + try: + yield json.loads(data) + except json.JSONDecodeError: + print(f"\n[warn] JSON 解析失败: {line[:120]}...") + + tail = buffer.strip() + if tail: + line = tail.decode("utf-8", errors="replace").strip() + if line.startswith("data:"): + data = line[5:].strip() + if data and data != "[DONE]": + try: + yield json.loads(data) + except json.JSONDecodeError: + pass + + +def diagnose_empty_response(resp: requests.Response) -> None: + ctype = resp.headers.get("Content-Type", "") + body = resp.content or b"" + print("\n[诊断] 响应体为空或无可解析 SSE") + print(f" Content-Type : {ctype}") + print(f" body 长度 : {len(body)}") + if body: + print(f" body 前 500B : {body[:500]!r}") + if "text/html" in ctype and len(body) == 0: + print("\n 可能原因: chat_send_stream.dspy 未执行 inference 入口。") + print(" 请确认文件末尾包含:") + print(" ret = await inference(request, params_kw=params_kw)") + print(" return ret") + print(" 并重新部署到 dev 后再测。") + + +def test_chat_send_stream(session_id: Optional[str] = None, message: Optional[str] = None) -> Optional[str]: + url = BASE_URL + STREAM_PATH + payload = build_payload(session_id=session_id, message=message) + + print("=" * 60) + print("chat_send_stream.dspy 流式测试") + print(f" URL : {url}") + print(f" MODEL : {MODEL}") + print(f" USERID : {USERID}") + print(f" API_URL : {API_URL}") + print(f" message : {payload.get('message')}") + if session_id: + print(f" session : {session_id}") + print("=" * 60) + + if not USERID: + print("错误: 请设置 CNTOAI_USERID") + return None + + resp = requests.post( + url, + json=payload, + headers={ + "Accept": "text/event-stream", + "Content-Type": "application/json", + }, + stream=True, + timeout=TIMEOUT, + ) + + ctype = resp.headers.get("Content-Type", "") + print(f"\nHTTP {resp.status_code} Content-Type: {ctype}\n") + + if resp.status_code != 200: + print(resp.text[:500]) + return None + + if "text/event-stream" not in ctype: + raw = resp.content + diagnose_empty_response(resp) + if raw: + for evt in parse_sse_text(raw.decode("utf-8", errors="ignore")): + print("[parsed]", evt) + return None + + session_out: Optional[str] = session_id + full_reply: List[str] = [] + has_content = False + event_count = 0 + + print("--- 流式输出 ---") + for evt in parse_sse_stream(resp): + event_count += 1 + etype = evt.get("type") + + if etype == "meta": + session_out = evt.get("session_id") or session_out + print(f"[meta] session_id={session_out} model={evt.get('model')}") + continue + + if etype == "content": + piece = evt.get("content") or "" + has_content = True + full_reply.append(piece) + print(piece, end="", flush=True) + continue + + if etype == "done": + session_out = evt.get("session_id") or session_out + reply = evt.get("reply") or "" + print(f"\n\n[done] session_id={session_out}") + print(f"[done] reply 长度={len(reply)}") + if reply and not has_content: + print(reply) + continue + + if etype == "error": + print(f"\n[error] {evt.get('msg')}") + return session_out + + print(f"\n[unknown] {evt}") + + print("\n--- 结束 ---") + if event_count == 0: + diagnose_empty_response(resp) + elif full_reply: + joined = "".join(full_reply) + print(f"拼接回复({len(joined)}字): {joined[:300]}...") + return session_out + + +def main() -> int: + if sys.platform == "win32": + try: + sys.stdout.reconfigure(encoding="utf-8") + except Exception: + pass + + parser = argparse.ArgumentParser(description="chat_send_stream.dspy SSE 测试") + parser.add_argument("--session-id", help="续聊会话 ID") + parser.add_argument("--message", "-m", help="覆盖默认 message") + parser.add_argument("--twice", action="store_true", help="同一会话连发两条") + args = parser.parse_args() + + sid = test_chat_send_stream(session_id=args.session_id, message=args.message) + if sid is None: + return 1 + + if args.twice and sid: + print("\n" + "=" * 60) + print("第二轮(多轮续聊)") + sid2 = test_chat_send_stream( + session_id=sid, + message=args.message or "继续,用一句话总结上面内容", + ) + if sid2 is None: + return 1 + + if sid: + print(f"\n提示: 续聊 python test_demo.py --session-id {sid}") + return 0 + + +if __name__ == "__main__": + sys.exit(main())