diff --git a/b/cntoai/create_model_apikey.dspy b/b/cntoai/create_model_apikey.dspy index 5dd88ed..5068403 100644 --- a/b/cntoai/create_model_apikey.dspy +++ b/b/cntoai/create_model_apikey.dspy @@ -22,10 +22,22 @@ async def create_model_apikey(ns={}): already_sync_user_key = records[0]['opc_apikey'] already_sync_user_appid = records[0]['appid'] + + # domain 从数据库params表中获取到pname=cntoai_domain的pvalue值 + db = DBPools() + async with db.sqlorContext('kboss') as sor: + domain = await sor.R('params', {'pname': 'cntoai_domain'}) + if domain: + domain = domain[0]['pvalue'] + else: + debug(f"create_model_apikey未找到域名") + return { + 'status': False, + 'msg': '未找到域名' + } # 目标URL - base_url = 'https://ai.atvoe.com' - url = f"{base_url}/dapi/apply_apikey.dspy" + url = f"{domain}/dapi/apply_apikey.dspy" # 请求头 headers = { @@ -35,10 +47,11 @@ async def create_model_apikey(ns={}): # 请求体数据 payload = { - "appname": "cn_ai_user", - "description": "", + "appname": ns.get('appname'), + "description": ns.get('description'), } + # 正常返回的是 {'status': 'ok', 'data': {'id': 'HlEQmcbCA1dX0qjhffA_K', 'name': 'cn_ai_user', 'description': '', 'secretkey': 'QUZVcXg5V1p1STMybG5Ia4r9NHBpkeRw558aATmohvZ7GYptvg==', 'allowedips': None, 'orgid': 'KHtWKY2LENTU4hYYim1Ks'}} try: # 创建一个异步会话 result_sysnc = None @@ -49,7 +62,7 @@ async def create_model_apikey(ns={}): debug(f"create_model_apikey状态码: {response.status}") result_sysnc = await response.json() - if not result_sysnc.get('status') == 'success': + if not result_sysnc.get('status') == 'ok': debug(f"create_model_apikey创建模型apikey失败: {result_sysnc}") return { 'status': False, @@ -60,24 +73,22 @@ async def create_model_apikey(ns={}): async with db.sqlorContext('kboss') as sor: # user_api_keys表格 userid/opc_apikey # 首先判断apikey是否存在 - apikey = result_sysnc['data'][0].get('apikey') - appid = result_sysnc['data'][0].get('appid') - secretkey = result_sysnc['data'][0].get('secretkey') + remote_table_id = result_sysnc['data'].get('id') + name = result_sysnc['data'].get('name') + secretkey = result_sysnc['data'].get('secretkey') await sor.C('user_api_keys', { - 'userid': userid, - 'opc_apikey': apikey, - 'appid': appid, + 'userid': ns['userid'], + 'remote_table_id': remote_table_id, + 'name': name, + 'opc_apikey': 1, 'secretkey': secretkey, 'action': 'user_self_create', - 'expire_time': None, }) - - debug(f"sync_cn_ai_user用户{payload['user']['id']}同步成功") - return { - 'status': True, - 'msg': '用户同步成功' - } + return { + 'status': True, + 'msg': '创建模型apikey成功' + } except Exception as e: debug(f"sync_cn_ai_user{userid}同步用户失败: {e}") diff --git a/b/cntoai/get_model_apikey.dspy b/b/cntoai/get_model_apikey.dspy index 5fea137..30ba719 100644 --- a/b/cntoai/get_model_apikey.dspy +++ b/b/cntoai/get_model_apikey.dspy @@ -23,9 +23,21 @@ async def get_model_apikey(ns={}): already_sync_user_key = records[0]['opc_apikey'] already_sync_user_appid = records[0]['appid'] + # domain 从数据库params表中获取到pname=cntoai_domain的pvalue值 + db = DBPools() + async with db.sqlorContext('kboss') as sor: + domain = await sor.R('params', {'pname': 'cntoai_domain'}) + if domain: + domain = domain[0]['pvalue'] + else: + debug(f"get_model_apikey未找到域名") + return { + 'status': False, + 'msg': '未找到域名' + } + # 目标URL - base_url = 'https://ai.atvoe.com' - url = f"{base_url}/dapi/apply_apikey.dspy" + url = f"{domain}/dapi/downapps.dspy" # 请求头 headers = { @@ -43,7 +55,7 @@ async def get_model_apikey(ns={}): debug(f"get_model_apikey状态码: {response.status}") result_sysnc = await response.json() - if not result_sysnc.get('status') == 'success': + if not result_sysnc.get('status') == 'ok': debug(f"get_model_apikey获取模型apikey失败: {result_sysnc}") return { 'status': False, @@ -54,30 +66,33 @@ async def get_model_apikey(ns={}): async with db.sqlorContext('kboss') as sor: # user_api_keys表格 userid/opc_apikey # 首先判断apikey是否存在 - apikey = result_sysnc['data'][0].get('apikey') - appid = result_sysnc['data'][0].get('appid') - secretkey = result_sysnc['data'][0].get('secretkey') + apikeys = result_sysnc['data']['apikeys'] + # 遍历apikeys,如果apikey不存在,则创建, 如果存在则做更新 根据userid和remote_table_id判断 + for apikey_item in apikeys: + remote_table_id = apikey_item.get('id') + name = '' if not apikey_item.get('name') else apikey_item.get('name') + apikeyid = apikey_item.get('apikeyid') + exist_record = await sor.R('user_api_keys', {'userid': ns['userid'], 'remote_table_id': remote_table_id}) + if exist_record: + update_sql = f"UPDATE user_api_keys SET name = '{name}', opc_apikey = '{apikeyid}' WHERE userid = '{ns['userid']}' AND remote_table_id = '{remote_table_id}'" + await sor.sqlExe(update_sql, {}) + else: + await sor.C('user_api_keys', { + 'userid': ns['userid'], + 'remote_table_id': remote_table_id, + 'name': name, + 'opc_apikey': apikeyid, + 'action': 'user_self_create', + }) - await sor.C('user_api_keys', { - 'userid': userid, - 'opc_apikey': apikey, - 'appid': appid, - 'secretkey': secretkey, - 'action': 'user_self_create', - 'expire_time': None, - }) - - debug(f"sync_cn_ai_user用户{payload['user']['id']}同步成功") - return { - 'status': True, - 'msg': '用户同步成功' - } + result_sysnc['status'] = True + return result_sysnc except Exception as e: - debug(f"sync_cn_ai_user{userid}同步用户失败: {e}") + debug(f"get_model_apikey获取模型apikey失败: {e}") return { 'status': False, - 'msg': f"sync_cn_ai_user{userid}同步用户失败: {e}" + 'msg': f"get_model_apikey获取模型apikey失败: {e}" } diff --git a/b/cntoai/get_user_balance.dspy b/b/cntoai/get_user_balance.dspy index dfc3d55..16ed070 100644 --- a/b/cntoai/get_user_balance.dspy +++ b/b/cntoai/get_user_balance.dspy @@ -5,32 +5,33 @@ async def get_user_balance(ns={}): :param userid: 用户 ID :return: 账户余额(与 getCustomerBalance 返回值一致) """ + debug(ns) apikey = ns.get('apikey') userid = ns.get('userid') db = DBPools() async with db.sqlorContext('kboss') as sor: if not apikey: return { - 'status': False, + 'status': 'error', 'msg': 'apikey is required' } userid_li = await sor.R('user_api_keys', {'opc_apikey': apikey}) if not userid_li: return { - 'status': False, + 'status': 'error', 'msg': 'apikey无效,请联系管理员' } # userid = userid_li[0]['userid'] user = await sor.R('users', {'id': userid}) if not user: return { - 'status': False, + 'status': 'error', 'msg': '用户不存在' } orgid = await sor.R('organization', {'id': user[0]['orgid']}) balance = await getCustomerBalance(sor, orgid[0]['id']) return { - 'status': True, + 'status': 'ok', 'balance': balance } diff --git a/b/cntoai/model_management_add.dspy b/b/cntoai/model_management_add.dspy index 3f0a114..a9c0312 100644 --- a/b/cntoai/model_management_add.dspy +++ b/b/cntoai/model_management_add.dspy @@ -4,7 +4,7 @@ _MODEL_FIELDS = ( 'context_length', 'input_token_price', 'output_token_price', 'cache_hit_input_price', 'billing_method', 'billing_unit', 'capabilities', 'limitations', 'highlights', 'is_active', - 'description', 'listing_status', + 'description', 'listing_status', ) diff --git a/b/cntoai/model_management_customer_search.dspy b/b/cntoai/model_management_customer_search.dspy new file mode 100644 index 0000000..f8e066d --- /dev/null +++ b/b/cntoai/model_management_customer_search.dspy @@ -0,0 +1,94 @@ +def _escape(value): + if value is None: + return None + return str(value).replace("'", "''") + +# 客户侧可见字段(不含 listing_status、is_active 等运营字段) +_CUSTOMER_MODEL_COLUMNS = """ + id, llmid, provider, model_name, display_name, model_type, + context_length, input_token_price, output_token_price, + cache_hit_input_price, billing_method, billing_unit, + capabilities, limitations, highlights, description, sort_order +""" + + +def _customer_listed_conditions(ns): + """已上架且启用的模型;支持按厂商、模型类别筛选""" + conditions = ["listing_status = 1", "is_active = 1"] + if ns.get('provider'): + conditions.append("provider = '%s'" % _escape(ns.get('provider'))) + if ns.get('model_type'): + conditions.append("model_type = '%s'" % _escape(ns.get('model_type'))) + return ' AND '.join(conditions) + +async def model_management_customer_search(ns={}): + """ + 客户查看模型列表:仅已上架且启用的模型。 + + 可选参数: + provider (str) 厂商,精确匹配筛选 + model_type (str) 模型类别,精确匹配筛选 + current_page (int) 页码,默认 1 + page_size (int) 每页条数,默认 10 + + 返回 data: + provider_list 当前可见模型中的厂商列表(去重) + model_type_list 当前可见模型中的模型类别列表(去重) + filter_total 当前筛选条件下的模型数量 + model_list 模型列表 + page_size, current_page + + 调用示例见 model_management_customer_search.dspy + """ + page_size = int(ns.get('page_size', 1000)) + current_page = int(ns.get('current_page', 1)) + offset = (current_page - 1) * page_size + where_clause = _customer_listed_conditions(ns) + listed_base = "listing_status = 1 AND is_active = 1" + + db = DBPools() + async with db.sqlorContext('kboss') as sor: + try: + provider_sql = """ + SELECT DISTINCT provider FROM model_management + WHERE %s AND provider IS NOT NULL AND provider != '' + ORDER BY provider; + """ % listed_base + model_type_sql = """ + SELECT DISTINCT model_type FROM model_management + WHERE %s AND model_type IS NOT NULL AND model_type != '' + ORDER BY model_type; + """ % listed_base + + count_sql = """ + SELECT COUNT(*) AS total_count FROM model_management WHERE %s; + """ % where_clause + find_sql = """ + SELECT %s FROM model_management + WHERE %s + ORDER BY sort_order ASC + LIMIT %s OFFSET %s; + """ % (_CUSTOMER_MODEL_COLUMNS, where_clause, page_size, offset) + + provider_rows = await sor.sqlExe(provider_sql, {}) + model_type_rows = await sor.sqlExe(model_type_sql, {}) + filter_total = (await sor.sqlExe(count_sql, {}))[0]['total_count'] + model_list = await sor.sqlExe(find_sql, {}) + + return { + 'status': True, + 'msg': 'customer model search success', + 'data': { + 'provider_list': [r['provider'] for r in provider_rows], + 'model_type_list': [r['model_type'] for r in model_type_rows], + 'filter_total': filter_total, + 'page_size': page_size, + 'current_page': current_page, + 'model_list': model_list, + }, + } + except Exception as e: + return {'status': False, 'msg': 'customer model search failed, %s' % str(e)} + +ret = await model_management_customer_search(params_kw) +return ret \ No newline at end of file diff --git a/b/cntoai/model_management_move_down.dspy b/b/cntoai/model_management_move_down.dspy new file mode 100644 index 0000000..d0b9ac6 --- /dev/null +++ b/b/cntoai/model_management_move_down.dspy @@ -0,0 +1,67 @@ +def _escape(value): + if value is None: + return None + return str(value).replace("'", "''") + +async def model_management_move_down(ns={}): + """ + 下移:与排序上的下一条记录交换 sort_order(已在最后则提示) + + 必填参数: + id (int|str) 模型主键 + """ + model_id = ns.get('id') + if not model_id: + return {'status': False, 'msg': 'id is required'} + + db = DBPools() + async with db.sqlorContext('kboss') as sor: + try: + current_sql = """ + SELECT id, sort_order FROM model_management WHERE id = '%s' LIMIT 1; + """ % _escape(model_id) + current = await sor.sqlExe(current_sql, {}) + if not current: + return {'status': False, 'msg': 'model not found'} + + cur = current[0] + cur_order = int(cur.get('sort_order') or 0) + cur_id = int(cur.get('id')) + + next_sql = """ + SELECT id, sort_order FROM model_management + WHERE (sort_order > %s) OR (sort_order = %s AND id > %s) + ORDER BY sort_order ASC, id ASC + LIMIT 1; + """ % (cur_order, cur_order, cur_id) + next_row = await sor.sqlExe(next_sql, {}) + if not next_row: + return {'status': True, 'msg': 'already at bottom', 'data': {'sort_order': cur_order}} + + nxt = next_row[0] + nxt_order = int(nxt.get('sort_order') or 0) + nxt_id = _escape(nxt.get('id')) + + swap_cur_sql = """ + UPDATE model_management SET sort_order = %s WHERE id = '%s'; + """ % (nxt_order, _escape(model_id)) + swap_nxt_sql = """ + UPDATE model_management SET sort_order = %s WHERE id = '%s'; + """ % (cur_order, nxt_id) + await sor.sqlExe(swap_cur_sql, {}) + await sor.sqlExe(swap_nxt_sql, {}) + return { + 'status': True, + 'msg': 'move down success', + 'data': { + 'id': model_id, + 'sort_order': nxt_order, + 'swapped_with_id': nxt.get('id'), + }, + } + except Exception as e: + await sor.rollback() + return {'status': False, 'msg': 'move down failed, %s' % str(e)} + +ret = await model_management_move_down(params_kw) +return ret \ No newline at end of file diff --git a/b/cntoai/model_management_pin_top.dspy b/b/cntoai/model_management_pin_top.dspy new file mode 100644 index 0000000..3a47250 --- /dev/null +++ b/b/cntoai/model_management_pin_top.dspy @@ -0,0 +1,49 @@ +def _escape(value): + if value is None: + return None + return str(value).replace("'", "''") + +async def model_management_pin_top(ns={}): + """ + 置顶:将模型排到全局列表最前(sort_order 设为当前最小值 - 1) + + 必填参数: + id (int|str) 模型主键 + """ + model_id = ns.get('id') + if not model_id: + return {'status': False, 'msg': 'id is required'} + + db = DBPools() + async with db.sqlorContext('kboss') as sor: + try: + current_sql = """ + SELECT id, sort_order FROM model_management WHERE id = '%s' LIMIT 1; + """ % _escape(model_id) + current = await sor.sqlExe(current_sql, {}) + if not current: + return {'status': False, 'msg': 'model not found'} + + min_sql = "SELECT MIN(sort_order) AS min_order FROM model_management;" + min_order = int((await sor.sqlExe(min_sql, {}))[0].get('min_order') or 0) + current_order = int(current[0].get('sort_order') or 0) + + if current_order <= min_order: + return {'status': True, 'msg': 'already at top', 'data': {'sort_order': current_order}} + + new_order = min_order - 1 + update_sql = """ + UPDATE model_management SET sort_order = %s WHERE id = '%s'; + """ % (new_order, _escape(model_id)) + await sor.sqlExe(update_sql, {}) + return { + 'status': True, + 'msg': 'pin to top success', + 'data': {'id': model_id, 'sort_order': new_order}, + } + except Exception as e: + await sor.rollback() + return {'status': False, 'msg': 'pin to top failed, %s' % str(e)} + +ret = await model_management_pin_top(params_kw) +return ret \ No newline at end of file diff --git a/b/cntoai/model_management_search.dspy b/b/cntoai/model_management_search.dspy index 18ad880..6893ee2 100644 --- a/b/cntoai/model_management_search.dspy +++ b/b/cntoai/model_management_search.dspy @@ -4,7 +4,7 @@ _MODEL_FIELDS = ( 'context_length', 'input_token_price', 'output_token_price', 'cache_hit_input_price', 'billing_method', 'billing_unit', 'capabilities', 'limitations', 'highlights', 'is_active', - 'description', 'listing_status', + 'description', 'listing_status', 'sort_order', ) @@ -30,14 +30,15 @@ async def model_management_search(ns={}): """ import traceback - page_size = int(ns.get('page_size', 100)) + page_size = int(ns.get('page_size', 1000)) current_page = int(ns.get('current_page', 1)) offset = (current_page - 1) * page_size conditions = ['1=1'] if ns.get('model_name'): model_name = ns.get('model_name') - conditions.append("model_name LIKE '%s'" % model_name) + # 模糊查询 + conditions.append(f"model_name LIKE '%%%%{model_name}%%%%'") if ns.get('model_type'): conditions.append("model_type = '%s'" % _escape(ns.get('model_type'))) if ns.get('provider'): @@ -57,7 +58,7 @@ async def model_management_search(ns={}): count_sql = """SELECT COUNT(*) AS total_count FROM model_management WHERE %s;""" % where_clause filter_total = (await sor.sqlExe(count_sql, {}))[0]['total_count'] - find_sql = """SELECT * FROM model_management WHERE %s ORDER BY updated_at DESC LIMIT %s OFFSET %s;""" % (where_clause, page_size, offset) + find_sql = """SELECT * FROM model_management WHERE %s ORDER BY sort_order ASC LIMIT %s OFFSET %s;""" % (where_clause, page_size, offset) model_list = await sor.sqlExe(find_sql, {}) return { diff --git a/b/cntoai/process_user_billing.dspy b/b/cntoai/process_user_billing.dspy index 1036eaa..03964fd 100644 --- a/b/cntoai/process_user_billing.dspy +++ b/b/cntoai/process_user_billing.dspy @@ -26,7 +26,7 @@ async def _charge_order(sor, orderid, order_type='NEW'): """ order_rows = await sor.R('bz_order', {'id': orderid}) if not order_rows: - return {'status': False, 'msg': '订单不存在'} + return {'status': 'error', 'msg': '订单不存在'} order_row = order_rows[0] product_url = None @@ -39,7 +39,7 @@ async def _charge_order(sor, orderid, order_type='NEW'): if count - float(order_row['amount']) < 0: pricedifference = count - round(order_row['amount'], 2) return { - 'status': False, + 'status': 'error', 'msg': '账户余额不足', 'pricedifference': round(pricedifference, 2), } @@ -123,7 +123,7 @@ async def _charge_order(sor, orderid, order_type='NEW'): # await sor.C('customer_goods', nss) return {'status': True, 'msg': '支付成功'} except Exception as error: - return {'status': False, 'msg': str(error)} + return {'status': 'error', 'msg': str(error)} async def calc_price_by_saleprotocol(sor, org, product_id, supply_price, quantity=1): """ @@ -136,18 +136,18 @@ async def calc_price_by_saleprotocol(sor, org, product_id, supply_price, quantit :param quantity: 数量,默认 1 :return: dict 成功: status=True, amount(行总金额), price(折后单价), list_price, discount - 失败: status=False, msg + 失败: status='error', msg """ try: supply_price = abs(float(supply_price)) quantity = int(quantity) except (TypeError, ValueError): - return {'status': False, 'msg': 'supply_price / quantity 必须为有效数字'} + return {'status': 'error', 'msg': 'supply_price / quantity 必须为有效数字'} if supply_price <= 0: - return {'status': False, 'msg': 'supply_price 必须大于 0'} + return {'status': 'error', 'msg': 'supply_price 必须大于 0'} if quantity <= 0: - return {'status': False, 'msg': 'quantity 必须大于 0'} + return {'status': 'error', 'msg': 'quantity 必须大于 0'} saleprotocol_to_person = await sor.R( 'saleprotocol', @@ -193,7 +193,7 @@ async def calc_price_by_saleprotocol(sor, org, product_id, supply_price, quantit ) if not product_salemode: - return {'status': False, 'msg': '还未上线这个产品的协议配置'} + return {'status': 'error', 'msg': '还未上线这个产品的协议配置'} discount = product_salemode[0]['discount'] list_price = supply_price @@ -217,12 +217,25 @@ async def process_user_billing(ns={}): :param userid: 用户 ID :param providername: 厂商名称(写入 bz_order.source,并用于查 product) :param productname: 产品名称(写入 servicename,并用于查 product) - :param amount: 扣费金额;use_saleprotocol=False 时为最终扣费额; + :param amount: 扣费金额;use_saleprotocol='error' 时为最终扣费额; use_saleprotocol=True 时为供应价/目录价(折扣前单价),走协议算价 - :param use_saleprotocol: 是否启用 saleprotocol_pricing 协议折扣算价,默认 False 直接按 amount 扣费 + :param use_saleprotocol: 是否启用 saleprotocol_pricing 协议折扣算价,默认 'error' 直接按 amount 扣费 :param quantity: 仅 use_saleprotocol=True 时生效,数量默认 1 :return: dict,含 status、msg;成功时含 orderid、amount """ + # 存储输入值到usage表 + db = DBPools() + async with db.sqlorContext('kboss') as sor: + usage_ns = { + 'id': uuid(), + 'userid': ns.get('userid'), + 'apikey': ns.get('apikey'), + 'llmid': ns.get('llmid'), + 'original_price': ns.get('amount'), + 'usage_content': json.dumps(ns.get('usage')) if isinstance(ns.get('usage'), dict) else ns.get('usage') + } + await sor.C('model_usage', usage_ns) + apikey = ns.get('apikey') userid = ns.get('userid') providername = ns.get('providername') @@ -234,142 +247,151 @@ async def process_user_billing(ns={}): llmid = ns.get('llmid') if not llmid: return { - 'status': False, + 'status': 'error', 'msg': 'llmid必传' } try: amount = round(float(amount), 2) except (TypeError, ValueError): - return {'status': False, 'msg': 'amount 必须为有效数字'} + return {'status': 'error', 'msg': 'amount 必须为有效数字'} if amount <= 0: - return {'status': False, 'msg': 'amount 必须大于 0'} + return {'status': 'error', 'msg': 'amount 必须大于 0'} db = DBPools() async with db.sqlorContext('kboss') as sor: - product_li = await sor.R('product', {'providerpid': llmid, 'del_flg': '0'}) - if not product_li: - return { - 'status': False, - 'msg': '未找到对应产品,请确认' + try: + product_li = await sor.R('product', {'providerpid': llmid, 'del_flg': '0'}) + if not product_li: + return { + 'status': 'error', + 'msg': '未找到对应产品,请确认' + } + product = product_li[0] + productname = product['name'] + providerid = product['providerid'] + providername_list = await sor.R('organization', {'id': providerid}) + if not providername_list: + return { + 'status': 'error', + 'msg': '厂商不存在 %s' % providername + } + providername = providername_list[0]['orgname'] + + userid_li = await sor.R('user_api_keys', {'opc_apikey': apikey}) + if not userid_li: + return { + 'status': 'error', + 'msg': 'apikey无效,请联系管理员' + } + # userid = userid_li[0]['userid'] + + user_list = await sor.R('users', {'id': userid}) + if not user_list: + return {'status': 'error', 'msg': '用户不存在 %s' % userid} + + org_list = await sor.R('organization', {'id': user_list[0]['orgid']}) + if not org_list: + return {'status': 'error', 'msg': '用户所属机构不存在'} + + customerid = org_list[0]['id'] + # product = await _lookup_product(sor, providername, productname) + # if not product: + # return { + # 'status': 'error', + # 'msg': '未找到对应产品,请确认 providername/productname 与库中 provider、product 配置一致', + # } + + list_price = amount + unit_price = amount + discount = 1 + originalprice = amount + + if use_saleprotocol: + price_res = await calc_price_by_saleprotocol( + sor, org_list[0], product['id'], amount, quantity=quantity, + ) + if not price_res['status']: + return price_res + debug(price_res) + debug('list_price %s' % list_price) + amount = price_res['amount'] + list_price = price_res['list_price'] + unit_price = price_res['price'] + discount = price_res['discount'] + originalprice = list_price * quantity + + balance = await getCustomerBalance(sor, customerid) + if balance is None: + balance = 0 + if amount > balance: + return { + 'status': 'error', + 'msg': '账户余额不足', + 'pricedifference': round(balance - amount, 2), + } + + order_id = uuid() + now_str = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') + bz_ns = { + 'id': order_id, + 'order_status': '0', + 'business_op': 'BUY', + 'userid': userid, + 'customerid': customerid, + 'order_date': now_str, + 'source': providername, + 'amount': amount, + 'originalprice': round(originalprice, 2), + 'ordertype': 'prepay', + 'servicename': productname, } - product = product_li[0] - productname = product['name'] - providerid = product['providerid'] - providername_list = await sor.R('organization', {'id': providerid}) - if not providername_list: - return { - 'status': False, - 'msg': '厂商不存在 %s' % providername + await sor.C('bz_order', bz_ns) + + goods_ns = { + 'id': uuid(), + 'orderid': order_id, + 'productid': product['id'], + 'providerid': product['providerid'], + 'list_price': list_price, + 'discount': discount, + 'quantity': quantity if use_saleprotocol else 1, + 'price': unit_price, + 'amount': amount, + 'chargemode': 'prepay', + 'servicename': productname, + 'resourceids': '', + 'resourcestarttime': now_str, + 'resourceendtime': None, } - providername = providername_list[0]['orgname'] - - userid_li = await sor.R('user_api_keys', {'opc_apikey': apikey}) - if not userid_li: - return { - 'status': False, - 'msg': 'apikey无效,请联系管理员' + await sor.C('order_goods', goods_ns) + + charge_res = await _charge_order(sor, order_id, order_type='NEW') + if not charge_res['status']: + await sor.rollback() + return charge_res + + await sor.U('model_usage', {'id': usage_ns['id'], 'orderid': order_id, 'bill_status': 1}) + + result = { + 'status': 'ok', + 'msg': '扣费成功', + 'orderid': order_id, + 'amount': amount, + 'productid': product['id'], } - # userid = userid_li[0]['userid'] - - user_list = await sor.R('users', {'id': userid}) - if not user_list: - return {'status': False, 'msg': '用户不存在 %s' % userid} - - org_list = await sor.R('organization', {'id': user_list[0]['orgid']}) - if not org_list: - return {'status': False, 'msg': '用户所属机构不存在'} - - customerid = org_list[0]['id'] - # product = await _lookup_product(sor, providername, productname) - # if not product: - # return { - # 'status': False, - # 'msg': '未找到对应产品,请确认 providername/productname 与库中 provider、product 配置一致', - # } - - list_price = amount - unit_price = amount - discount = 1 - originalprice = amount - - if use_saleprotocol: - price_res = await calc_price_by_saleprotocol( - sor, org_list[0], product['id'], amount, quantity=quantity, - ) - if not price_res['status']: - return price_res - debug(price_res) - debug('list_price %s' % list_price) - amount = price_res['amount'] - list_price = price_res['list_price'] - unit_price = price_res['price'] - discount = price_res['discount'] - originalprice = list_price * quantity - - balance = await getCustomerBalance(sor, customerid) - if balance is None: - balance = 0 - if amount > balance: + if use_saleprotocol: + result['discount'] = discount + result['list_price'] = list_price + result['price'] = unit_price + return result + except Exception as e: + sor.rollback() return { - 'status': False, - 'msg': '账户余额不足', - 'pricedifference': round(balance - amount, 2), + 'status': 'error', + 'msg': str(e) } - - order_id = uuid() - now_str = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') - bz_ns = { - 'id': order_id, - 'order_status': '0', - 'business_op': 'BUY', - 'userid': userid, - 'customerid': customerid, - 'order_date': now_str, - 'source': providername, - 'amount': amount, - 'originalprice': round(originalprice, 2), - 'ordertype': 'prepay', - 'servicename': productname, - } - await sor.C('bz_order', bz_ns) - - goods_ns = { - 'id': uuid(), - 'orderid': order_id, - 'productid': product['id'], - 'providerid': product['providerid'], - 'list_price': list_price, - 'discount': discount, - 'quantity': quantity if use_saleprotocol else 1, - 'price': unit_price, - 'amount': amount, - 'chargemode': 'prepay', - 'servicename': productname, - 'resourceids': '', - 'resourcestarttime': now_str, - 'resourceendtime': None, - } - await sor.C('order_goods', goods_ns) - - charge_res = await _charge_order(sor, order_id, order_type='NEW') - if not charge_res['status']: - await sor.rollback() - return charge_res - - result = { - 'status': True, - 'msg': '扣费成功', - 'orderid': order_id, - 'amount': amount, - 'productid': product['id'], - } - if use_saleprotocol: - result['discount'] = discount - result['list_price'] = list_price - result['price'] = unit_price - return result ret = await process_user_billing(params_kw) return ret \ No newline at end of file diff --git a/b/cntoai/sync_cn_ai_user.dspy b/b/cntoai/sync_cn_ai_user.dspy index e199667..96e9181 100644 --- a/b/cntoai/sync_cn_ai_user.dspy +++ b/b/cntoai/sync_cn_ai_user.dspy @@ -23,8 +23,21 @@ async def sync_cn_ai_user(ns={}): already_sync_user_dappid = 'cndemo' # 目标URL - url = "https://ai.atvoe.com/rbac/usersync" - # url = 'https://ai.atvoe.com/tmp/env.dspy' + # domain 从数据库params表中获取到pname=cntoai_domain的pvalue值 + domain = None + db = DBPools() + async with db.sqlorContext('kboss') as sor: + domain = await sor.R('params', {'pname': 'cntoai_domain'}) + if domain: + domain = domain[0]['pvalue'] + else: + debug(f"sync_cn_ai_user未找到域名") + return { + 'status': False, + 'msg': '未找到域名' + } + + url = f"{domain}/rbac/usersync" # 请求头 headers = {