From 7124c0bf62365ef7581f9b4bccc2f877ce1b59de Mon Sep 17 00:00:00 2001 From: ping <1017253325@qq.com> Date: Mon, 25 May 2026 01:04:16 +0800 Subject: [PATCH] update cntoai --- b/account/open_all_provider_acc.dspy | 8 + b/ali/jiajie_sync_user.dspy | 5 +- b/bill/finance_order_report.dspy | 1238 +++++++++++++++++++++++ b/bill/finance_order_report1.dspy | 887 ++++++++++++++++ b/bill/finance_order_report_detail.dspy | 1231 ++++++++++++++++++++++ b/bz_order/cumulative_order.dspy | 2 + b/cntoai/chat_send_stream.dspy | 13 +- b/cntoai/model_usage_user_report.dspy | 512 ++++++++++ b/cntoai/process_user_billing.dspy | 9 +- b/customer/getNoinvitationcode.dspy | 9 + b/user/logintype.dspy | 3 +- 11 files changed, 3908 insertions(+), 9 deletions(-) create mode 100644 b/account/open_all_provider_acc.dspy create mode 100644 b/bill/finance_order_report.dspy create mode 100644 b/bill/finance_order_report1.dspy create mode 100644 b/bill/finance_order_report_detail.dspy create mode 100644 b/cntoai/model_usage_user_report.dspy diff --git a/b/account/open_all_provider_acc.dspy b/b/account/open_all_provider_acc.dspy new file mode 100644 index 0000000..987de80 --- /dev/null +++ b/b/account/open_all_provider_acc.dspy @@ -0,0 +1,8 @@ +async def open_all_provider_acc(ns): + db = DBPools() + async with db.sqlorContext('kboss') as sor: + accounting_orgid = ns.get('accounting_orgid') + await openAllProviderAccounts(sor, accounting_orgid) + return {'code':200} +ret = await open_all_provider_acc(params_kw) +return ret diff --git a/b/ali/jiajie_sync_user.dspy b/b/ali/jiajie_sync_user.dspy index 28b0e94..53046c3 100644 --- a/b/ali/jiajie_sync_user.dspy +++ b/b/ali/jiajie_sync_user.dspy @@ -26,9 +26,12 @@ async def jiajie_sync_user(ns={}): } userinfos = await sor.R('users', {'id': userid}) phone = userinfos[0]['mobile'] - email = userinfos[0]['email'] + # email = userinfos[0]['email'] name = userinfos[0]['username'] + # 生成随机邮箱 + email = f"{userid}@opencomputing.cn" + # 构造请求数据 筛选条件: 手机号/邮箱 request_data = { "password": "weishijiajiekyy", diff --git a/b/bill/finance_order_report.dspy b/b/bill/finance_order_report.dspy new file mode 100644 index 0000000..2bf333f --- /dev/null +++ b/b/bill/finance_order_report.dspy @@ -0,0 +1,1238 @@ +# -*- coding: utf-8 -*- +""" +管理人员财务订单报表接口。 + +视角:以 accounting_orgid(账本机构)为查看主体,展示客户订单的定价、利润、本级应付结算。 + +依赖运行环境:DBPools, sor.sqlExe, sor.R + +业务口径(已确认): + - product_salemode.providerid = provider.orgid + - 分销商不直接对供应商结算;仅本机构(parentid 为空)对供应商结算 + - 一级分销对本机构结算,二级分销对一级分销结算(即对直接上级结算) + - settle_upstream_amount:本级应付(本机构→供应商;分销→直接上级机构) + - profit_amount:本级账本「折扣收入」「底价收入」贷方合计 + - include_sub_reseller_customers:是否包含下级分销商发展的客户 +""" + +async def get_parent_orgid(sor, orgid): + sql = """select a.id from organization a, organization b +where b.parentid = a.id + and a.del_flg = '0' + and b.del_flg = '0' + and b.id = ${orgid}$""" + recs = await sor.sqlExe(sql, {'orgid':orgid}) + if len(recs) == 0: + return None + return recs[0]['id'] + +DBNAME = 'kboss' +RESELLER_ORG = '1' +OWNER_OGR = '0' +CORP_CUSTOMER = '2' +PERSONAL = '3' +PROVIDER = '4' +PUBLISHER = '5' +UNDERWRITER = '6' + +PARTY_OWNER = '本机构' +PARTY_CUSTOMER = '客户' +PARTY_RESELLER = '分销商' +PARTY_PROVIDER = '供应商' +PARTY_PUBLISHER = '算力券发行方' +PARTY_UNDERWRITER = '算力券承销方' + +DEBT = '借' +CREDIT = '贷' + +ACTNAME_BUY = '付费' +ACTNAME_RECHARGE = '充值' +ACTNAME_RECHARGE_ALIPAY = '支付宝充值' +ACTNAME_SETTLE = '结算' + +SALEMODE_DISCOUNT = '折扣' +SALEMODE_REBATE = '代付费' +SALEMODE_FLOORPRICE = '底价' + +ACTION_RECHARGE_ALIPAY = 'RECHARGE_ALIPAY' +ACTION_RECHARGE_ALIPAY_REVERSE = 'RECHARGE_ALIPAY_REVERSE' +ACTION_RECHARGE = 'RECHARGE' +ACTION_RECHARGE_REVERSE = 'RECHARGE_REVERSE' +ACTION_BUY = 'BUY' +ACTION_REVERSE_BUY = 'BUY_REVERSE' +ACTION_RENEW = 'RENEW' +ACTION_RENEW_REVERSE = 'RENEW_REVERSE' +ACTION_SETTLE = 'SETTLE' +ACTION_SETTLE_REVERSE = 'SETTLE_REVERSE' + +DBNAME = 'kboss' +CUSTOMER_ORG_TYPES = ('2', '3') + +SALEMODE_LABEL = { + '0': SALEMODE_DISCOUNT, + '1': SALEMODE_REBATE, + '2': SALEMODE_FLOORPRICE, +} + +INCOME_SUBJECTS = ('折扣收入', '底价收入') +PARENT_SETTLE_SUBJECT = '分销商存放资金' +SUPPLIER_SETTLE_PREFIX = '待结转' + +_SALEMODE_SQL_OWN = """ +SELECT a.salemode, a.settle_mode, b.discount, b.price +FROM saleprotocol a, product_salemode b +WHERE a.id = b.protocolid + AND a.bid_orgid = ${bid_orgid}$ + AND (b.productid = ${productid}$ OR b.productid = '*') + AND b.providerid = ${providerid}$ + AND a.start_date <= ${curdate}$ + AND a.end_date > ${curdate}$ + AND a.del_flg = '0' + AND b.del_flg = '0' +ORDER BY b.productid DESC +LIMIT 1 +""" + +_SALEMODE_SQL_OFFER = """ +SELECT a.salemode, a.settle_mode, b.discount, b.price +FROM saleprotocol a, product_salemode b +WHERE a.id = b.protocolid + AND a.offer_orgid = ${offer_orgid}$ + AND a.bid_orgid = ${bid_orgid}$ + AND (b.productid = ${productid}$ OR b.productid = '*') + AND b.providerid = ${providerid}$ + AND a.start_date <= ${curdate}$ + AND a.end_date > ${curdate}$ + AND a.del_flg = '0' + AND b.del_flg = '0' +ORDER BY b.productid DESC +LIMIT 1 +""" + + +def _round_money(v): + if v is None: + return None + return round(float(v), 2) + + +def _salemode_label(code): + if code is None: + return None + return SALEMODE_LABEL.get(str(code), str(code)) + + +def _parse_bool(v, default=False): + if v is None: + return default + if isinstance(v, bool): + return v + return str(v).strip().lower() in ('1', 'true', 'yes', 'on') + + +def _parse_page(ns, default_page=1, default_size=20, max_size=100): + """解析分页,避免 page_size=0 导致 LIMIT 0 无数据。""" + try: + page_size = int(ns.get('page_size', default_size) or default_size) + except (TypeError, ValueError): + page_size = default_size + try: + current_page = int(ns.get('current_page', default_page) or default_page) + except (TypeError, ValueError): + current_page = default_page + page_size = max(1, min(page_size, max_size)) + current_page = max(1, current_page) + offset = (current_page - 1) * page_size + return current_page, page_size, offset + + +def _sql_rows(result): + """统一 sqlExe 查询结果为 list[dict]。""" + if result is None: + return [] + if isinstance(result, list): + return result + if isinstance(result, dict): + return [result] + return list(result) + + +def _row_get(row, *keys, default=None): + """兼容不同驱动返回的大小写字段名。""" + if not row: + return default + for key in keys: + if key in row: + return row[key] + lower = key.lower() + for k, v in row.items(): + if k.lower() == lower: + return v + return default + + +async def _check_viewer(sor, accounting_orgid, userid=None): + if not userid: + return True, None + users = await sor.R('users', {'id': userid, 'del_flg': '0'}) + if not users: + return False, '用户不存在' + user_orgid = users[0].get('orgid') + if user_orgid == accounting_orgid: + return True, None + parent = await get_parent_orgid(sor, accounting_orgid) + if parent and user_orgid == parent: + return True, None + return False, '无权查看该机构财务数据' + + +async def _org_name(sor, orgid): + if not orgid: + return None + rows = await sor.R('organization', {'id': orgid, 'del_flg': '0'}) + return rows[0]['orgname'] if rows else None + + +async def _is_business_owner(sor, orgid): + rows = await sor.sqlExe( + "SELECT id FROM organization WHERE id=${id}$ AND parentid IS NULL AND del_flg='0'", + {'id': orgid}, + ) + return len(rows) > 0 + + +async def _collect_descendant_reseller_ids(sor, root_orgid): + """递归收集 root 下所有下级分销商 organization.id(org_type=1)。""" + found = [] + queue = [root_orgid] + seen = {root_orgid} + while queue: + pid = queue.pop(0) + rows = await sor.sqlExe( + """SELECT id FROM organization + WHERE parentid=${pid}$ AND org_type=${org_type}$ AND del_flg='0'""", + {'pid': pid, 'org_type': RESELLER_ORG}, + ) + for r in rows: + cid = r['id'] + if cid not in seen: + seen.add(cid) + found.append(cid) + queue.append(cid) + return found + + +async def _build_customer_scope_sql(sor, accounting_orgid, include_sub_reseller_customers): + """ + 客户范围 SQL 片段与参数。 + 直属:cust.parentid = accounting_orgid + 含下级分销:直属 + cust.parentid IN (下级分销商 id 列表) + """ + params = {'accounting_orgid': accounting_orgid} + if not include_sub_reseller_customers: + return "cust.parentid = ${accounting_orgid}$", params, [] + + reseller_ids = await _collect_descendant_reseller_ids(sor, accounting_orgid) + if not reseller_ids: + return "cust.parentid = ${accounting_orgid}$", params, [] + + in_keys = [] + for i, rid in enumerate(reseller_ids): + key = 'reseller_%d' % i + params[key] = rid + in_keys.append('${%s}$' % key) + in_sql = ', '.join(in_keys) + cond = "(cust.parentid = ${accounting_orgid}$ OR cust.parentid IN (%s))" % in_sql + return cond, params, reseller_ids + + +async def _customer_in_scope(customer_parentid, accounting_orgid, reseller_ids, include_sub): + if customer_parentid == accounting_orgid: + return True + if include_sub and customer_parentid in reseller_ids: + return True + return False + + +async def _fetch_salemode_row(sor, sql, params): + rows = await sor.sqlExe(sql, params) + return rows[0] if rows else None + + +async def _settle_upstream_meta(sor, accounting_orgid, provider_orgid): + """本级应付对象:本机构→供应商;分销→直接上级。""" + if await _is_business_owner(sor, accounting_orgid): + return { + 'settle_upstream_type': 'supplier', + 'settle_upstream_orgid': provider_orgid, + 'settle_upstream_orgname': await _org_name(sor, provider_orgid), + 'immediate_parent_orgid': None, + } + parent_orgid = await get_parent_orgid(sor, accounting_orgid) + return { + 'settle_upstream_type': 'parent_org', + 'settle_upstream_orgid': parent_orgid, + 'settle_upstream_orgname': await _org_name(sor, parent_orgid), + 'immediate_parent_orgid': parent_orgid, + } + + +async def _protocol_snapshot(sor, accounting_orgid, customerid, providerid, productid, bill_date, quantity): + curdate = bill_date + if hasattr(curdate, 'strftime'): + curdate = curdate.strftime('%Y-%m-%d') + elif curdate is not None: + curdate = str(curdate)[:10] + + qty = int(quantity or 1) + base = {'providerid': providerid, 'productid': productid, 'curdate': curdate} + + own = await _fetch_salemode_row( + sor, _SALEMODE_SQL_OWN, dict(base, bid_orgid=accounting_orgid), + ) + cust = await _fetch_salemode_row( + sor, _SALEMODE_SQL_OFFER, + dict(base, offer_orgid=accounting_orgid, bid_orgid=customerid), + ) + if not cust: + cust = await _fetch_salemode_row( + sor, _SALEMODE_SQL_OFFER, + dict(base, offer_orgid=accounting_orgid, bid_orgid='*'), + ) + + parent_orgid = await get_parent_orgid(sor, accounting_orgid) + reseller = None + if parent_orgid: + reseller = await _fetch_salemode_row( + sor, _SALEMODE_SQL_OFFER, + dict(base, offer_orgid=parent_orgid, bid_orgid=accounting_orgid), + ) + + return { + 'parent_orgid': parent_orgid, + 'parent_orgname': await _org_name(sor, parent_orgid), + 'own_salemode': own.get('salemode') if own else None, + 'own_discount': own.get('discount') if own else None, + 'own_floor_unit_price': own.get('price') if own else None, + 'customer_salemode': cust.get('salemode') if cust else None, + 'customer_discount': cust.get('discount') if cust else None, + 'customer_floor_unit_price': cust.get('price') if cust else None, + 'reseller_discount': reseller.get('discount') if reseller else None, + 'reseller_salemode': reseller.get('salemode') if reseller else None, + 'reseller_floor_unit_price': reseller.get('price') if reseller else None, + 'quantity': qty, + } + + +def _estimate_finance(catalog_amount, protocol, is_owner): + """未记账:估算本级利润与本级应付(不含代付费单独字段)。""" + catalog_amount = float(catalog_amount or 0) + qty = protocol.get('quantity') or 1 + profit = None + settle_upstream = None + + own_mode = protocol.get('own_salemode') + cust_mode = protocol.get('customer_salemode') + own_disc = protocol.get('own_discount') + cust_disc = protocol.get('customer_discount') + reseller_disc = protocol.get('reseller_discount') + + if own_mode == '0' or cust_mode == '0': + if own_disc is not None and cust_disc is not None: + profit = catalog_amount * (float(cust_disc) - float(own_disc)) + if is_owner: + if own_disc is not None: + settle_upstream = catalog_amount * float(own_disc) + elif reseller_disc is not None: + settle_upstream = catalog_amount * float(reseller_disc) + + elif own_mode == '2' or cust_mode == '2': + own_price = protocol.get('own_floor_unit_price') + cust_price = protocol.get('customer_floor_unit_price') + reseller_price = protocol.get('reseller_floor_unit_price') + if own_price is not None and cust_price is not None: + profit = (float(cust_price) - float(own_price)) * qty + if is_owner and own_price is not None: + settle_upstream = float(own_price) * qty + elif not is_owner and reseller_price is not None: + settle_upstream = float(reseller_price) * qty + elif not is_owner and cust_price is not None: + settle_upstream = float(cust_price) * qty + + return { + 'profit_amount': _round_money(profit), + 'settle_upstream_amount': _round_money(settle_upstream), + 'amount_source': 'estimated', + } + + +async def _bill_detail_rows(sor, billid): + """单表读 bill_detail,优先 sor.R(避免 sqlExe 多表问题)。""" + rows = await sor.R('bill_detail', {'billid': billid, 'del_flg': '0'}) + if rows: + return rows + return _sql_rows(await sor.sqlExe( + """SELECT accounting_orgid, subjectname, accounting_dir, + participantid, participanttype, amount + FROM bill_detail WHERE billid=${billid}$ AND del_flg='0'""", + {'billid': billid}, + )) + + +async def _finance_from_bill_detail(sor, billid, accounting_orgid, is_owner, parent_orgid): + """已记账:从 bill_detail 取本级利润与本级应付。""" + rows = await _bill_detail_rows(sor, billid) + profit = 0.0 + settle_upstream = 0.0 + legs = [] + + for r in rows: + legs.append({ + 'accounting_orgid': r['accounting_orgid'], + 'subjectname': r['subjectname'], + 'accounting_dir': r['accounting_dir'], + 'participanttype': r.get('participanttype'), + 'participantid': r.get('participantid'), + 'amount': _round_money(r['amount']), + }) + amt = float(r['amount'] or 0) + book = r['accounting_orgid'] + subj = r['subjectname'] or '' + direction = r['accounting_dir'] + + if book == accounting_orgid and subj in INCOME_SUBJECTS and direction == '贷': + profit += amt + + if is_owner: + if ( + book == accounting_orgid + and subj.startswith(SUPPLIER_SETTLE_PREFIX) + and direction == '贷' + ): + settle_upstream += amt + else: + if ( + parent_orgid + and book == parent_orgid + and subj == PARENT_SETTLE_SUBJECT + and direction == '借' + and r.get('participantid') == accounting_orgid + ): + settle_upstream += amt + + return { + 'profit_amount': _round_money(profit), + 'settle_upstream_amount': _round_money(settle_upstream) if settle_upstream else _round_money(0), + 'amount_source': 'bill_detail', + 'bill_detail_legs': legs, + } + + +async def _build_report_row(sor, row, accounting_orgid, is_owner): + bill_id = _row_get(row, 'bill_id') + customerid = _row_get(row, 'customerid') + providerid = _row_get(row, 'providerid') + productid = _row_get(row, 'productid') + bill_date = _row_get(row, 'bill_date') + quantity = _row_get(row, 'quantity') or 1 + catalog_amount = _round_money(_row_get(row, 'catalog_amount')) + customer_pay = _round_money(_row_get(row, 'customer_pay_amount')) + customer_parentid = _row_get(row, 'customer_parentid') + + protocol = await _protocol_snapshot( + sor, accounting_orgid, customerid, providerid, productid, bill_date, quantity, + ) + parent_orgid = protocol['parent_orgid'] + settle_meta = await _settle_upstream_meta(sor, accounting_orgid, providerid) + + bill_state = _row_get(row, 'bill_state') + if str(bill_state) == '1': + amounts = await _finance_from_bill_detail( + sor, bill_id, accounting_orgid, is_owner, parent_orgid, + ) + else: + amounts = _estimate_finance(catalog_amount, protocol, is_owner) + amounts['bill_detail_legs'] = [] + + product_name = _row_get(row, 'product_name') + if not product_name and productid: + prows = await sor.R('product', {'id': productid, 'del_flg': '0'}) + if prows: + product_name = prows[0].get('name') + provider_name = _row_get(row, 'provider_name') + if not provider_name and providerid: + prov_rows = await sor.sqlExe( + "SELECT name FROM provider WHERE orgid=${orgid}$ AND del_flg='0' LIMIT 1", + {'orgid': providerid}, + ) + if prov_rows: + provider_name = prov_rows[0].get('name') + + serving_reseller_id = None + serving_reseller_name = None + is_direct = customer_parentid == accounting_orgid + if not is_direct and customer_parentid: + serving_reseller_id = customer_parentid + serving_reseller_name = await _org_name(sor, customer_parentid) + + order_date = _row_get(row, 'order_date') + servicename = _row_get(row, 'servicename') + + return { + 'bill_id': bill_id, + 'order_id': _row_get(row, 'order_id'), + 'bill_date': str(bill_date)[:10] if bill_date else None, + 'order_date': str(order_date)[:19] if order_date else None, + 'business_op': _row_get(row, 'business_op'), + 'bill_state': bill_state, + 'accounted': str(bill_state) == '1', + 'customer': { + 'id': customerid, + 'name': _row_get(row, 'customer_name'), + 'parent_orgid': customer_parentid, + 'is_direct_customer': is_direct, + 'serving_reseller': { + 'id': serving_reseller_id, + 'name': serving_reseller_name, + } if serving_reseller_id else None, + }, + 'product': { + 'id': productid, + 'name': product_name or servicename, + 'servicename': servicename, + }, + 'provider': { + 'orgid': providerid, + 'name': provider_name, + }, + 'quantity': quantity, + 'pricing': { + 'catalog_amount': catalog_amount, + 'list_price_unit': _round_money(_row_get(row, 'list_price')), + 'order_discount': _round_money(_row_get(row, 'order_discount')) + if _row_get(row, 'order_discount') is not None + else None, + 'order_unit_price': _round_money(_row_get(row, 'order_unit_price')), + 'customer_pay_amount': customer_pay, + }, + 'viewer_org': { + 'id': accounting_orgid, + 'is_business_owner': is_owner, + 'immediate_parent': { + 'id': settle_meta.get('immediate_parent_orgid'), + 'name': await _org_name(sor, settle_meta.get('immediate_parent_orgid')), + } if settle_meta.get('immediate_parent_orgid') else None, + }, + 'protocol': { + 'parent_salemode': _salemode_label(protocol.get('own_salemode')), + 'parent_discount_to_us': _round_money(protocol.get('own_discount')), + 'parent_floor_unit_price': _round_money(protocol.get('own_floor_unit_price')), + 'our_salemode_to_customer': _salemode_label(protocol.get('customer_salemode')), + 'our_discount_to_customer': _round_money(protocol.get('customer_discount')), + 'our_floor_unit_price_to_customer': _round_money(protocol.get('customer_floor_unit_price')), + 'our_discount_as_reseller_to_parent': _round_money(protocol.get('reseller_discount')), + }, + 'finance': { + 'profit_amount': amounts['profit_amount'], + 'settle_upstream_amount': amounts['settle_upstream_amount'], + 'settle_upstream_type': settle_meta['settle_upstream_type'], + 'settle_upstream_target': { + 'id': settle_meta['settle_upstream_orgid'], + 'name': settle_meta['settle_upstream_orgname'], + }, + 'amount_source': amounts['amount_source'], + }, + 'bill_detail_legs': amounts.get('bill_detail_legs', []), + } + + +def _bill_core_from_clause(): + """统计/查 ID 用最小 JOIN(兼容无 ordergoodsid 字段的库)。""" + return """ +FROM bill b +INNER JOIN bz_order o ON b.orderid = o.id +INNER JOIN organization cust ON b.customerid = cust.id +""" + + +def _bill_ids_sql(where_sql): + """不在 SQL 里写 LIMIT:部分环境 sqlExe 对 LIMIT 会返回空列表。""" + return f""" +SELECT b.id AS bill_id +{_bill_core_from_clause()} +WHERE {where_sql} +ORDER BY b.bill_date DESC, b.create_at DESC +""" + + +def _sql_quote_bill_id(bill_id): + if bill_id is None: + return None + s = str(bill_id).strip() + if not s: + return None + for ch in s: + if not (ch.isalnum() or ch in ('_', '-')): + return None + return "'" + s.replace("'", "''") + "'" + + +async def _fetch_overview_bill_rows(sor, where_sql, params, max_bills): + """ + 概览取数:先用仅 bill_id 的 SQL(已验证可用),再 sor.R 组装明细。 + 多列 + JOIN 的 _bill_scope_list_sql 在本环境 sqlExe 会返回空列表。 + """ + id_rows = _sql_rows(await sor.sqlExe(_bill_ids_sql(where_sql), params)) + all_ids = [] + seen = set() + for r in id_rows: + bid = _normalize_bill_id(_row_get(r, 'bill_id', 'id', 'ID')) + if bid and bid not in seen: + seen.add(bid) + all_ids.append(bid) + + total_ids = len(all_ids) + truncated = total_ids > max_bills + if truncated: + all_ids = all_ids[:max_bills] + + rows = [] + for bid in all_ids: + row = await _fetch_bill_row_via_R(sor, bid) + if row: + rows.append(row) + + return { + 'total_ids': total_ids, + 'truncated': truncated, + 'id_query_len': len(id_rows), + 'rows': rows, + } + + +def _new_amount_bucket(): + return { + 'sales_total': 0.0, + 'profit_total': 0.0, + 'settle_upstream_total': 0.0, + 'bill_count': 0, + } + + +def _bucket_add(bucket, sales, profit, settle): + bucket['sales_total'] += float(sales or 0) + bucket['profit_total'] += float(profit or 0) + bucket['settle_upstream_total'] += float(settle or 0) + bucket['bill_count'] += 1 + + +def _bucket_round(bucket): + return { + 'sales_total': _round_money(bucket['sales_total']), + 'profit_total': _round_money(bucket['profit_total']), + 'settle_upstream_total': _round_money(bucket['settle_upstream_total']), + 'bill_count': bucket['bill_count'], + } + + +def _norm_org_id(orgid): + if orgid is None: + return None + s = str(orgid).strip() + return s if s else None + + +def _customer_segment(customer_parentid, accounting_orgid, reseller_id_set): + pid = _norm_org_id(customer_parentid) + acc = _norm_org_id(accounting_orgid) + if pid == acc: + return 'direct_customers' + if pid in reseller_id_set: + return 'from_sub_resellers' + return None + + +async def _bill_finance_amounts(sor, row, accounting_orgid, is_owner, parent_orgid): + """单笔:销售额、本级利润、本级应付上级/供应商。""" + sales = float(_row_get(row, 'customer_pay_amount') or 0) + bill_id = _row_get(row, 'bill_id') + bill_state = _row_get(row, 'bill_state') + quantity = int(_row_get(row, 'quantity') or 1) + + if str(bill_state) == '1': + fin = await _finance_from_bill_detail( + sor, bill_id, accounting_orgid, is_owner, parent_orgid, + ) + else: + catalog = float(_row_get(row, 'catalog_amount') or 0) + protocol = await _protocol_snapshot( + sor, + accounting_orgid, + _row_get(row, 'customerid'), + _row_get(row, 'providerid'), + _row_get(row, 'productid'), + _row_get(row, 'bill_date'), + quantity, + ) + fin = _estimate_finance(catalog, protocol, is_owner) + + profit = float(fin.get('profit_amount') or 0) + settle = float(fin.get('settle_upstream_amount') or 0) + return sales, profit, settle, fin.get('amount_source', 'unknown') + + +async def _provider_name_cache(sor, cache, provider_orgid): + if not provider_orgid: + return None + if provider_orgid in cache: + return cache[provider_orgid] + name = None + rows = await sor.sqlExe( + "SELECT name FROM provider WHERE orgid=${oid}$ AND del_flg='0' LIMIT 1", + {'oid': provider_orgid}, + ) + if rows: + name = rows[0].get('name') + cache[provider_orgid] = name + return name + + +async def _product_name_cache(sor, cache, productid): + if not productid: + return None + if productid in cache: + return cache[productid] + name = None + rows = await sor.R('product', {'id': productid, 'del_flg': '0'}) + if rows: + name = rows[0].get('name') + cache[productid] = name + return name + + +def _count_sql(where_sql): + return f""" +SELECT COUNT(*) AS total_count +{_bill_core_from_clause()} +WHERE {where_sql} +""" + + +def _normalize_bill_id(bill_id): + if bill_id is None: + return None + s = str(bill_id).strip() + return s if s else None + + +async def _fetch_bill_row_via_R(sor, bill_id): + """ + 用 sor.R 按表组装明细(与 mu_ban / process_user_billing 一致)。 + 已证实:本环境 sqlExe 多表 JOIN + WHERE b.id 会返回空,但 sor.R('bill') 可用。 + """ + bid = _normalize_bill_id(bill_id) + if not bid: + return None + + bills = await sor.R('bill', {'id': bid, 'del_flg': '0'}) + if not bills: + bills = await sor.R('bill', {'id': bid}) + if not bills: + return None + b = bills[0] + + o = {} + oid = b.get('orderid') + if oid: + orders = await sor.R('bz_order', {'id': oid, 'del_flg': '0'}) + if not orders: + orders = await sor.R('bz_order', {'id': oid}) + if orders: + o = orders[0] + + cust = {} + cid = b.get('customerid') + if cid: + custs = await sor.R('organization', {'id': cid, 'del_flg': '0'}) + if not custs: + custs = await sor.R('organization', {'id': cid}) + if custs: + cust = custs[0] + + og = {} + ogid = b.get('ordergoodsid') + if ogid: + ogs = await sor.R('order_goods', {'id': ogid, 'del_flg': '0'}) + if ogs: + og = ogs[0] + if not og and oid: + ogs = await sor.sqlExe( + """SELECT list_price, discount, price FROM order_goods + WHERE orderid=${oid}$ AND del_flg='0' LIMIT 1""", + {'oid': oid}, + ) + if ogs: + og = ogs[0] + + return { + 'bill_id': b.get('id'), + 'order_id': oid, + 'customerid': cid, + 'productid': b.get('productid'), + 'providerid': b.get('providerid'), + 'bill_date': b.get('bill_date'), + 'bill_state': b.get('bill_state'), + 'catalog_amount': b.get('provider_amt'), + 'customer_pay_amount': b.get('amount'), + 'quantity': b.get('quantity'), + 'order_date': o.get('order_date'), + 'business_op': o.get('business_op'), + 'servicename': o.get('servicename'), + 'customer_name': cust.get('orgname'), + 'customer_parentid': cust.get('parentid'), + 'list_price': og.get('list_price'), + 'order_discount': og.get('discount'), + 'order_unit_price': og.get('price'), + } + + +async def _fetch_bill_rows_by_ids(sor, bill_ids): + rows = [] + for bid in bill_ids: + one = await _fetch_bill_row_via_R(sor, bid) + if one: + rows.append(one) + return rows + + +async def _fetch_bill_rows_page(sor, where_sql, base_params, offset, page_size): + """ + 先查全部符合条件的 bill_id(通常数量可控),再在 Python 分页; + 明细用 IN 字面量或逐条查询,不用 IN (${bid_n}$) 占位符。 + """ + id_sql = _bill_ids_sql(where_sql) + id_rows = _sql_rows(await sor.sqlExe(id_sql, base_params)) + all_ids = [] + seen = set() + for r in id_rows: + bid = _normalize_bill_id(_row_get(r, 'bill_id', 'id', 'ID')) + if bid and bid not in seen: + seen.add(bid) + all_ids.append(bid) + total = len(all_ids) + page_ids = all_ids[offset: offset + page_size] + if not page_ids: + return total, [] + ordered = await _fetch_bill_rows_by_ids(sor, page_ids) + return total, ordered + + +async def _finance_report_debug(sor, where_sql, params, page_size, offset): + """debug=true 时返回,便于在服务器上对比哪一步 SQL 无数据。""" + dbg = {} + try: + c = _sql_rows(await sor.sqlExe(_count_sql(where_sql), params)) + dbg['count_total'] = int(_row_get(c[0], 'total_count', 0) or 0) if c else None + except Exception as e: + dbg['count_error'] = str(e) + try: + ids = _sql_rows(await sor.sqlExe(_bill_ids_sql(where_sql), params)) + dbg['ids_query_len'] = len(ids) + except Exception as e: + dbg['ids_error'] = str(e) + try: + lim_sql = _bill_ids_sql(where_sql).strip() + ' LIMIT %d OFFSET %d' % (page_size, offset) + lim = _sql_rows(await sor.sqlExe(lim_sql, params)) + dbg['ids_with_limit_len'] = len(lim) + except Exception as e: + dbg['ids_with_limit_error'] = str(e) + try: + page_ids = [] + id_rows = _sql_rows(await sor.sqlExe(_bill_ids_sql(where_sql), params)) + if id_rows: + dbg['first_id_row_keys'] = list(id_rows[0].keys()) + dbg['first_id_raw'] = _row_get(id_rows[0], 'bill_id', 'id', 'ID') + for r in id_rows[offset: offset + page_size]: + bid = _normalize_bill_id(_row_get(r, 'bill_id', 'id', 'ID')) + if bid: + page_ids.append(bid) + dbg['page_ids_len'] = len(page_ids) + if page_ids: + dbg['sample_page_id'] = page_ids[0] + rb = await sor.R('bill', {'id': page_ids[0], 'del_flg': '0'}) + dbg['sor_R_bill_count'] = len(rb) if rb else 0 + one = await _fetch_bill_row_via_R(sor, page_ids[0]) + dbg['sor_R_row_ok'] = one is not None + except Exception as e: + dbg['detail_fetch_error'] = str(e) + return dbg + + +async def finance_order_report(ns=None): + """ + 管理人员 — 客户订单财务列表(分页)。 + + 入参 ns: + accounting_orgid (必填) 账本机构 + include_sub_reseller_customers (可选) 默认 false;true 时含下级分销商的客户订单 + userid, start_date, end_date, customerid, productid, order_id, bill_state + current_page (默认1), page_size (默认20, 最大100) + + 返回 finance.settle_upstream_*: + - 本机构:应付供应商(待结转* 贷方,仅本机构账本) + - 分销商:应付直接上级(上级账本「分销商存放资金」借方,participant=本级) + """ + ns = ns or {} + accounting_orgid = ns.get('accounting_orgid') + if not accounting_orgid: + return {'status': False, 'msg': '缺少 accounting_orgid(当前账本机构)'} + + include_sub = _parse_bool(ns.get('include_sub_reseller_customers'), False) + current_page, page_size, offset = _parse_page(ns) + + db = DBPools() + async with db.sqlorContext(DBNAME) as sor: + ok, err = await _check_viewer(sor, accounting_orgid, ns.get('userid')) + if not ok: + return {'status': False, 'msg': err} + + scope_sql, scope_params, reseller_ids = await _build_customer_scope_sql( + sor, accounting_orgid, include_sub, + ) + is_owner = await _is_business_owner(sor, accounting_orgid) + + conditions = ["b.del_flg = '0'", "o.del_flg = '0'", scope_sql] + params = dict(scope_params) + + if ns.get('start_date'): + conditions.append('b.bill_date >= ${start_date}$') + params['start_date'] = ns['start_date'] + if ns.get('end_date'): + conditions.append('b.bill_date <= ${end_date}$') + params['end_date'] = ns['end_date'] + if ns.get('customerid'): + conditions.append('b.customerid = ${customerid}$') + params['customerid'] = ns['customerid'] + if ns.get('productid'): + conditions.append('b.productid = ${productid}$') + params['productid'] = ns['productid'] + if ns.get('order_id'): + conditions.append('b.orderid = ${order_id}$') + params['order_id'] = ns['order_id'] + if ns.get('bill_state') is not None: + conditions.append('b.bill_state = ${bill_state}$') + params['bill_state'] = str(ns['bill_state']) + + where_sql = ' AND '.join(conditions) + + total_count, rows = await _fetch_bill_rows_page( + sor, where_sql, params, offset, page_size, + ) + if total_count > 0 and offset >= total_count: + current_page = 1 + offset = 0 + total_count, rows = await _fetch_bill_rows_page( + sor, where_sql, params, offset, page_size, + ) + + debug_info = None + if _parse_bool(ns.get('debug'), False): + debug_info = await _finance_report_debug( + sor, where_sql, params, page_size, offset, + ) + debug_info['page_ids_fetched'] = len(rows) + + items = [] + sum_profit = 0.0 + sum_pay = 0.0 + sum_upstream = 0.0 + row_errors = [] + + for row in rows: + try: + item = await _build_report_row(sor, row, accounting_orgid, is_owner) + except Exception as exc: + row_errors.append({ + 'bill_id': _row_get(row, 'bill_id'), + 'error': str(exc), + }) + continue + items.append(item) + fin = item['finance'] + if fin.get('profit_amount') is not None: + sum_profit += fin['profit_amount'] + pay = item['pricing'].get('customer_pay_amount') + if pay is not None: + sum_pay += pay + up = fin.get('settle_upstream_amount') + if up is not None: + sum_upstream += up + + return { + 'status': True, + 'msg': 'ok', + 'data': { + 'accounting_orgid': accounting_orgid, + 'accounting_orgname': await _org_name(sor, accounting_orgid), + 'is_business_owner': is_owner, + 'customer_scope': { + 'include_sub_reseller_customers': include_sub, + 'descendant_reseller_ids': reseller_ids, + 'descendant_reseller_count': len(reseller_ids), + }, + 'total_count': total_count, + 'current_page': current_page, + 'page_size': page_size, + 'summary': { + 'customer_pay_total': _round_money(sum_pay), + 'profit_total': _round_money(sum_profit), + 'settle_upstream_total': _round_money(sum_upstream), + }, + 'items': items, + 'row_errors': row_errors, + 'list_row_count': len(rows), + 'debug': debug_info, + }, + } + + +async def finance_order_report_detail(ns=None): + """ + 管理人员 — 单笔账单财务明细。 + + 入参:accounting_orgid, bill_id, include_sub_reseller_customers(与列表一致,用于校验客户范围) + """ + ns = ns or {} + accounting_orgid = ns.get('accounting_orgid') + bill_id = ns.get('bill_id') + if not accounting_orgid or not bill_id: + return {'status': False, 'msg': '缺少 accounting_orgid 或 bill_id'} + + include_sub = _parse_bool(ns.get('include_sub_reseller_customers'), False) + + db = DBPools() + async with db.sqlorContext(DBNAME) as sor: + ok, err = await _check_viewer(sor, accounting_orgid, ns.get('userid')) + if not ok: + return {'status': False, 'msg': err} + + _, _, reseller_ids = await _build_customer_scope_sql(sor, accounting_orgid, include_sub) + is_owner = await _is_business_owner(sor, accounting_orgid) + + rows = await _fetch_bill_rows_by_ids(sor, [bill_id]) + if not rows: + return {'status': False, 'msg': '账单不存在'} + + row = rows[0] + if not await _customer_in_scope( + _row_get(row, 'customer_parentid'), + accounting_orgid, + reseller_ids, + include_sub, + ): + return {'status': False, 'msg': '该账单不在当前查询客户范围内'} + + detail = await _build_report_row(sor, row, accounting_orgid, is_owner) + detail['customer_scope'] = {'include_sub_reseller_customers': include_sub} + return {'status': True, 'msg': 'ok', 'data': detail} + + +async def finance_billing_overview(ns=None): + """ + 计费概览页 — 销售/利润/应付上级 汇总(单接口,建议概览页只调此接口)。 + + 入参 ns: + accounting_orgid (必填) 当前账本机构 + include_sub_reseller_customers (可选) 默认 true;是否含下级分销商客户带来的金额 + start_date / end_date (可选) 按 bill.bill_date 筛选 + bill_state (可选) 仅统计指定账单状态,默认不筛(含未记账则利润/结算为估算) + only_accounted (可选) true 时仅 bill_state=1(推荐概览页使用) + userid (可选) 权限 + max_bills (可选) 默认 5000,超出则截断并返回 truncated=true + + 口径(与订单明细接口一致): + sales_total = 客户实付合计 bill.amount + profit_total = 本级账本折扣收入+底价收入(bill_detail)或协议估算 + settle_upstream_total= 本机构→供应商待结转;分销→上级账本分销商存放资金借方 + + 分段: + direct_customers 直属客户(cust.parentid = accounting_orgid) + from_sub_resellers 下级分销商的客户(cust.parentid in 下级分销 org) + + 返回 by_provider_product:按供应商 orgid + 产品 id 拆分上述三项及合计。 + """ + ns = ns or {} + accounting_orgid = ns.get('accounting_orgid') + if not accounting_orgid: + return {'status': False, 'msg': '缺少 accounting_orgid(当前账本机构)'} + + include_sub = _parse_bool(ns.get('include_sub_reseller_customers'), True) + only_accounted = _parse_bool(ns.get('only_accounted'), False) + try: + max_bills = int(ns.get('max_bills', 5000) or 5000) + except (TypeError, ValueError): + max_bills = 5000 + max_bills = max(100, min(max_bills, 20000)) + + db = DBPools() + async with db.sqlorContext(DBNAME) as sor: + ok, err = await _check_viewer(sor, accounting_orgid, ns.get('userid')) + if not ok: + return {'status': False, 'msg': err} + + scope_sql, scope_params, reseller_ids = await _build_customer_scope_sql( + sor, accounting_orgid, include_sub, + ) + is_owner = await _is_business_owner(sor, accounting_orgid) + parent_orgid = await get_parent_orgid(sor, accounting_orgid) + + conditions = ["b.del_flg = '0'", "o.del_flg = '0'", scope_sql] + params = dict(scope_params) + if ns.get('start_date'): + conditions.append('b.bill_date >= ${start_date}$') + params['start_date'] = ns['start_date'] + if ns.get('end_date'): + conditions.append('b.bill_date <= ${end_date}$') + params['end_date'] = ns['end_date'] + if only_accounted: + conditions.append("b.bill_state = '1'") + elif ns.get('bill_state') is not None: + conditions.append('b.bill_state = ${bill_state}$') + params['bill_state'] = str(ns['bill_state']) + + where_sql = ' AND '.join(conditions) + fetch = await _fetch_overview_bill_rows(sor, where_sql, params, max_bills) + bill_rows = fetch['rows'] + truncated = fetch['truncated'] + reseller_id_set = {_norm_org_id(r) for r in reseller_ids if _norm_org_id(r)} + + totals = { + 'direct_customers': _new_amount_bucket(), + 'from_sub_resellers': _new_amount_bucket(), + 'grand_total': _new_amount_bucket(), + } + by_pp = {} + provider_names = {} + product_names = {} + skipped = 0 + errors = [] + + for row in bill_rows: + seg = _customer_segment( + _row_get(row, 'customer_parentid'), + accounting_orgid, + reseller_id_set, + ) + if seg is None: + skipped += 1 + continue + try: + sales, profit, settle, _src = await _bill_finance_amounts( + sor, row, accounting_orgid, is_owner, parent_orgid, + ) + except Exception as exc: + errors.append({ + 'bill_id': _row_get(row, 'bill_id'), + 'error': str(exc), + }) + continue + + _bucket_add(totals[seg], sales, profit, settle) + _bucket_add(totals['grand_total'], sales, profit, settle) + + pid = _row_get(row, 'providerid') or '' + prid = _row_get(row, 'productid') or '' + key = (pid, prid) + if key not in by_pp: + by_pp[key] = { + 'provider_orgid': pid, + 'product_id': prid, + 'direct_customers': _new_amount_bucket(), + 'from_sub_resellers': _new_amount_bucket(), + 'total': _new_amount_bucket(), + } + _bucket_add(by_pp[key][seg], sales, profit, settle) + _bucket_add(by_pp[key]['total'], sales, profit, settle) + + breakdown = [] + for (pid, prid), node in by_pp.items(): + breakdown.append({ + 'provider': { + 'orgid': pid, + 'name': await _provider_name_cache(sor, provider_names, pid), + }, + 'product': { + 'id': prid, + 'name': await _product_name_cache(sor, product_names, prid), + }, + 'direct_customers': _bucket_round(node['direct_customers']), + 'from_sub_resellers': _bucket_round(node['from_sub_resellers']), + 'total': _bucket_round(node['total']), + }) + breakdown.sort(key=lambda x: float(x['total']['sales_total'] or 0), reverse=True) + + settle_target = await _settle_upstream_meta(sor, accounting_orgid, None) + if is_owner: + settle_label = '应付供应商' + else: + settle_label = '应付上级机构' + + data = { + 'accounting_orgid': accounting_orgid, + 'accounting_orgname': await _org_name(sor, accounting_orgid), + 'is_business_owner': is_owner, + 'settle_upstream_label': settle_label, + 'settle_upstream_target': { + 'type': settle_target['settle_upstream_type'], + 'orgid': settle_target['settle_upstream_orgid'], + 'name': settle_target['settle_upstream_orgname'], + }, + 'period': { + 'start_date': ns.get('start_date'), + 'end_date': ns.get('end_date'), + }, + 'customer_scope': { + 'include_sub_reseller_customers': include_sub, + 'descendant_reseller_ids': reseller_ids, + 'descendant_reseller_count': len(reseller_ids), + }, + 'filters': { + 'only_accounted': only_accounted, + 'bill_state': ns.get('bill_state'), + }, + 'bill_count': len(bill_rows), + 'bill_id_count': fetch['total_ids'], + 'id_query_len': fetch['id_query_len'], + 'truncated': truncated, + 'max_bills': max_bills, + 'skipped_out_of_scope': skipped, + 'totals': { + 'direct_customers': _bucket_round(totals['direct_customers']), + 'from_sub_resellers': _bucket_round(totals['from_sub_resellers']), + 'grand_total': _bucket_round(totals['grand_total']), + }, + 'by_provider_product': breakdown, + 'errors': errors, + } + if _parse_bool(ns.get('debug'), False) or ( + data['totals']['grand_total']['bill_count'] == 0 + and fetch['id_query_len'] > 0 + ): + data['debug'] = { + 'id_query_len': fetch['id_query_len'], + 'bill_id_count': fetch['total_ids'], + 'overview_rows_len': len(bill_rows), + 'skipped_out_of_scope': skipped, + 'sample_parentid': _row_get(bill_rows[0], 'customer_parentid') if bill_rows else None, + } + return {'status': True, 'msg': 'ok', 'data': data} + + +_report = params_kw.get('report') or params_kw.get('api') or 'order_list' +if _report in ('overview', 'billing_overview', 'finance_billing_overview'): + ret = await finance_billing_overview(params_kw) +elif _report in ('detail', 'order_detail'): + ret = await finance_order_report_detail(params_kw) +else: + ret = await finance_order_report(params_kw) +return ret \ No newline at end of file diff --git a/b/bill/finance_order_report1.dspy b/b/bill/finance_order_report1.dspy new file mode 100644 index 0000000..5af70f2 --- /dev/null +++ b/b/bill/finance_order_report1.dspy @@ -0,0 +1,887 @@ +# -*- coding: utf-8 -*- +""" +管理人员财务订单报表接口。 + +视角:以 accounting_orgid(账本机构)为查看主体,展示客户订单的定价、利润、本级应付结算。 + +依赖运行环境:DBPools, sor.sqlExe, sor.R + +业务口径(已确认): + - product_salemode.providerid = provider.orgid + - 分销商不直接对供应商结算;仅本机构(parentid 为空)对供应商结算 + - 一级分销对本机构结算,二级分销对一级分销结算(即对直接上级结算) + - settle_upstream_amount:本级应付(本机构→供应商;分销→直接上级机构) + - profit_amount:本级账本「折扣收入」「底价收入」贷方合计 + - include_sub_reseller_customers:是否包含下级分销商发展的客户 +""" + +async def get_parent_orgid(sor, orgid): + sql = """select a.id from organization a, organization b +where b.parentid = a.id + and a.del_flg = '0' + and b.del_flg = '0' + and b.id = ${orgid}$""" + recs = await sor.sqlExe(sql, {'orgid':orgid}) + if len(recs) == 0: + return None + return recs[0]['id'] + +DBNAME = 'kboss' +RESELLER_ORG = '1' +OWNER_OGR = '0' +CORP_CUSTOMER = '2' +PERSONAL = '3' +PROVIDER = '4' +PUBLISHER = '5' +UNDERWRITER = '6' + +PARTY_OWNER = '本机构' +PARTY_CUSTOMER = '客户' +PARTY_RESELLER = '分销商' +PARTY_PROVIDER = '供应商' +PARTY_PUBLISHER = '算力券发行方' +PARTY_UNDERWRITER = '算力券承销方' + +DEBT = '借' +CREDIT = '贷' + +ACTNAME_BUY = '付费' +ACTNAME_RECHARGE = '充值' +ACTNAME_RECHARGE_ALIPAY = '支付宝充值' +ACTNAME_SETTLE = '结算' + +SALEMODE_DISCOUNT = '折扣' +SALEMODE_REBATE = '代付费' +SALEMODE_FLOORPRICE = '底价' + +ACTION_RECHARGE_ALIPAY = 'RECHARGE_ALIPAY' +ACTION_RECHARGE_ALIPAY_REVERSE = 'RECHARGE_ALIPAY_REVERSE' +ACTION_RECHARGE = 'RECHARGE' +ACTION_RECHARGE_REVERSE = 'RECHARGE_REVERSE' +ACTION_BUY = 'BUY' +ACTION_REVERSE_BUY = 'BUY_REVERSE' +ACTION_RENEW = 'RENEW' +ACTION_RENEW_REVERSE = 'RENEW_REVERSE' +ACTION_SETTLE = 'SETTLE' +ACTION_SETTLE_REVERSE = 'SETTLE_REVERSE' + +DBNAME = 'kboss' +CUSTOMER_ORG_TYPES = ('2', '3') + +SALEMODE_LABEL = { + '0': SALEMODE_DISCOUNT, + '1': SALEMODE_REBATE, + '2': SALEMODE_FLOORPRICE, +} + +INCOME_SUBJECTS = ('折扣收入', '底价收入') +PARENT_SETTLE_SUBJECT = '分销商存放资金' +SUPPLIER_SETTLE_PREFIX = '待结转' + +_SALEMODE_SQL_OWN = """ +SELECT a.salemode, a.settle_mode, b.discount, b.price +FROM saleprotocol a, product_salemode b +WHERE a.id = b.protocolid + AND a.bid_orgid = ${bid_orgid}$ + AND (b.productid = ${productid}$ OR b.productid = '*') + AND b.providerid = ${providerid}$ + AND a.start_date <= ${curdate}$ + AND a.end_date > ${curdate}$ + AND a.del_flg = '0' + AND b.del_flg = '0' +ORDER BY b.productid DESC +LIMIT 1 +""" + +_SALEMODE_SQL_OFFER = """ +SELECT a.salemode, a.settle_mode, b.discount, b.price +FROM saleprotocol a, product_salemode b +WHERE a.id = b.protocolid + AND a.offer_orgid = ${offer_orgid}$ + AND a.bid_orgid = ${bid_orgid}$ + AND (b.productid = ${productid}$ OR b.productid = '*') + AND b.providerid = ${providerid}$ + AND a.start_date <= ${curdate}$ + AND a.end_date > ${curdate}$ + AND a.del_flg = '0' + AND b.del_flg = '0' +ORDER BY b.productid DESC +LIMIT 1 +""" + + +def _round_money(v): + if v is None: + return None + return round(float(v), 2) + + +def _salemode_label(code): + if code is None: + return None + return SALEMODE_LABEL.get(str(code), str(code)) + + +def _parse_bool(v, default=False): + if v is None: + return default + if isinstance(v, bool): + return v + return str(v).strip().lower() in ('1', 'true', 'yes', 'on') + + +def _parse_page(ns, default_page=1, default_size=20, max_size=100): + """解析分页,避免 page_size=0 导致 LIMIT 0 无数据。""" + try: + page_size = int(ns.get('page_size', default_size) or default_size) + except (TypeError, ValueError): + page_size = default_size + try: + current_page = int(ns.get('current_page', default_page) or default_page) + except (TypeError, ValueError): + current_page = default_page + page_size = max(1, min(page_size, max_size)) + current_page = max(1, current_page) + offset = (current_page - 1) * page_size + return current_page, page_size, offset + + +def _sql_rows(result): + """统一 sqlExe 查询结果为 list[dict]。""" + if result is None: + return [] + if isinstance(result, list): + return result + if isinstance(result, dict): + return [result] + return list(result) + + +def _row_get(row, *keys, default=None): + """兼容不同驱动返回的大小写字段名。""" + if not row: + return default + for key in keys: + if key in row: + return row[key] + lower = key.lower() + for k, v in row.items(): + if k.lower() == lower: + return v + return default + + +async def _check_viewer(sor, accounting_orgid, userid=None): + if not userid: + return True, None + users = await sor.R('users', {'id': userid, 'del_flg': '0'}) + if not users: + return False, '用户不存在' + user_orgid = users[0].get('orgid') + if user_orgid == accounting_orgid: + return True, None + parent = await get_parent_orgid(sor, accounting_orgid) + if parent and user_orgid == parent: + return True, None + return False, '无权查看该机构财务数据' + + +async def _org_name(sor, orgid): + if not orgid: + return None + rows = await sor.R('organization', {'id': orgid, 'del_flg': '0'}) + return rows[0]['orgname'] if rows else None + + +async def _is_business_owner(sor, orgid): + rows = await sor.sqlExe( + "SELECT id FROM organization WHERE id=${id}$ AND parentid IS NULL AND del_flg='0'", + {'id': orgid}, + ) + return len(rows) > 0 + + +async def _collect_descendant_reseller_ids(sor, root_orgid): + """递归收集 root 下所有下级分销商 organization.id(org_type=1)。""" + found = [] + queue = [root_orgid] + seen = {root_orgid} + while queue: + pid = queue.pop(0) + rows = await sor.sqlExe( + """SELECT id FROM organization + WHERE parentid=${pid}$ AND org_type=${org_type}$ AND del_flg='0'""", + {'pid': pid, 'org_type': RESELLER_ORG}, + ) + for r in rows: + cid = r['id'] + if cid not in seen: + seen.add(cid) + found.append(cid) + queue.append(cid) + return found + + +async def _build_customer_scope_sql(sor, accounting_orgid, include_sub_reseller_customers): + """ + 客户范围 SQL 片段与参数。 + 直属:cust.parentid = accounting_orgid + 含下级分销:直属 + cust.parentid IN (下级分销商 id 列表) + """ + params = {'accounting_orgid': accounting_orgid} + if not include_sub_reseller_customers: + return "cust.parentid = ${accounting_orgid}$", params, [] + + reseller_ids = await _collect_descendant_reseller_ids(sor, accounting_orgid) + if not reseller_ids: + return "cust.parentid = ${accounting_orgid}$", params, [] + + in_keys = [] + for i, rid in enumerate(reseller_ids): + key = 'reseller_%d' % i + params[key] = rid + in_keys.append('${%s}$' % key) + in_sql = ', '.join(in_keys) + cond = "(cust.parentid = ${accounting_orgid}$ OR cust.parentid IN (%s))" % in_sql + return cond, params, reseller_ids + + +async def _customer_in_scope(customer_parentid, accounting_orgid, reseller_ids, include_sub): + if customer_parentid == accounting_orgid: + return True + if include_sub and customer_parentid in reseller_ids: + return True + return False + + +async def _fetch_salemode_row(sor, sql, params): + rows = await sor.sqlExe(sql, params) + return rows[0] if rows else None + + +async def _settle_upstream_meta(sor, accounting_orgid, provider_orgid): + """本级应付对象:本机构→供应商;分销→直接上级。""" + if await _is_business_owner(sor, accounting_orgid): + return { + 'settle_upstream_type': 'supplier', + 'settle_upstream_orgid': provider_orgid, + 'settle_upstream_orgname': await _org_name(sor, provider_orgid), + 'immediate_parent_orgid': None, + } + parent_orgid = await get_parent_orgid(sor, accounting_orgid) + return { + 'settle_upstream_type': 'parent_org', + 'settle_upstream_orgid': parent_orgid, + 'settle_upstream_orgname': await _org_name(sor, parent_orgid), + 'immediate_parent_orgid': parent_orgid, + } + + +async def _protocol_snapshot(sor, accounting_orgid, customerid, providerid, productid, bill_date, quantity): + curdate = bill_date + if hasattr(curdate, 'strftime'): + curdate = curdate.strftime('%Y-%m-%d') + elif curdate is not None: + curdate = str(curdate)[:10] + + qty = int(quantity or 1) + base = {'providerid': providerid, 'productid': productid, 'curdate': curdate} + + own = await _fetch_salemode_row( + sor, _SALEMODE_SQL_OWN, dict(base, bid_orgid=accounting_orgid), + ) + cust = await _fetch_salemode_row( + sor, _SALEMODE_SQL_OFFER, + dict(base, offer_orgid=accounting_orgid, bid_orgid=customerid), + ) + if not cust: + cust = await _fetch_salemode_row( + sor, _SALEMODE_SQL_OFFER, + dict(base, offer_orgid=accounting_orgid, bid_orgid='*'), + ) + + parent_orgid = await get_parent_orgid(sor, accounting_orgid) + reseller = None + if parent_orgid: + reseller = await _fetch_salemode_row( + sor, _SALEMODE_SQL_OFFER, + dict(base, offer_orgid=parent_orgid, bid_orgid=accounting_orgid), + ) + + return { + 'parent_orgid': parent_orgid, + 'parent_orgname': await _org_name(sor, parent_orgid), + 'own_salemode': own.get('salemode') if own else None, + 'own_discount': own.get('discount') if own else None, + 'own_floor_unit_price': own.get('price') if own else None, + 'customer_salemode': cust.get('salemode') if cust else None, + 'customer_discount': cust.get('discount') if cust else None, + 'customer_floor_unit_price': cust.get('price') if cust else None, + 'reseller_discount': reseller.get('discount') if reseller else None, + 'reseller_salemode': reseller.get('salemode') if reseller else None, + 'reseller_floor_unit_price': reseller.get('price') if reseller else None, + 'quantity': qty, + } + + +def _estimate_finance(catalog_amount, protocol, is_owner): + """未记账:估算本级利润与本级应付(不含代付费单独字段)。""" + catalog_amount = float(catalog_amount or 0) + qty = protocol.get('quantity') or 1 + profit = None + settle_upstream = None + + own_mode = protocol.get('own_salemode') + cust_mode = protocol.get('customer_salemode') + own_disc = protocol.get('own_discount') + cust_disc = protocol.get('customer_discount') + reseller_disc = protocol.get('reseller_discount') + + if own_mode == '0' or cust_mode == '0': + if own_disc is not None and cust_disc is not None: + profit = catalog_amount * (float(cust_disc) - float(own_disc)) + if is_owner: + if own_disc is not None: + settle_upstream = catalog_amount * float(own_disc) + elif reseller_disc is not None: + settle_upstream = catalog_amount * float(reseller_disc) + + elif own_mode == '2' or cust_mode == '2': + own_price = protocol.get('own_floor_unit_price') + cust_price = protocol.get('customer_floor_unit_price') + reseller_price = protocol.get('reseller_floor_unit_price') + if own_price is not None and cust_price is not None: + profit = (float(cust_price) - float(own_price)) * qty + if is_owner and own_price is not None: + settle_upstream = float(own_price) * qty + elif not is_owner and reseller_price is not None: + settle_upstream = float(reseller_price) * qty + elif not is_owner and cust_price is not None: + settle_upstream = float(cust_price) * qty + + return { + 'profit_amount': _round_money(profit), + 'settle_upstream_amount': _round_money(settle_upstream), + 'amount_source': 'estimated', + } + + +async def _finance_from_bill_detail(sor, billid, accounting_orgid, is_owner, parent_orgid): + """已记账:从 bill_detail 取本级利润与本级应付。""" + sql = """ +SELECT accounting_orgid, subjectname, accounting_dir, participantid, participanttype, amount +FROM bill_detail WHERE billid = ${billid}$ AND del_flg = '0' +""" + rows = await sor.sqlExe(sql, {'billid': billid}) + profit = 0.0 + settle_upstream = 0.0 + legs = [] + + for r in rows: + legs.append({ + 'accounting_orgid': r['accounting_orgid'], + 'subjectname': r['subjectname'], + 'accounting_dir': r['accounting_dir'], + 'participanttype': r.get('participanttype'), + 'participantid': r.get('participantid'), + 'amount': _round_money(r['amount']), + }) + amt = float(r['amount'] or 0) + book = r['accounting_orgid'] + subj = r['subjectname'] or '' + direction = r['accounting_dir'] + + if book == accounting_orgid and subj in INCOME_SUBJECTS and direction == '贷': + profit += amt + + if is_owner: + if ( + book == accounting_orgid + and subj.startswith(SUPPLIER_SETTLE_PREFIX) + and direction == '贷' + ): + settle_upstream += amt + else: + if ( + parent_orgid + and book == parent_orgid + and subj == PARENT_SETTLE_SUBJECT + and direction == '借' + and r.get('participantid') == accounting_orgid + ): + settle_upstream += amt + + return { + 'profit_amount': _round_money(profit), + 'settle_upstream_amount': _round_money(settle_upstream) if settle_upstream else _round_money(0), + 'amount_source': 'bill_detail', + 'bill_detail_legs': legs, + } + + +async def _build_report_row(sor, row, accounting_orgid, is_owner): + bill_id = _row_get(row, 'bill_id') + customerid = _row_get(row, 'customerid') + providerid = _row_get(row, 'providerid') + productid = _row_get(row, 'productid') + bill_date = _row_get(row, 'bill_date') + quantity = _row_get(row, 'quantity') or 1 + catalog_amount = _round_money(_row_get(row, 'catalog_amount')) + customer_pay = _round_money(_row_get(row, 'customer_pay_amount')) + customer_parentid = _row_get(row, 'customer_parentid') + + protocol = await _protocol_snapshot( + sor, accounting_orgid, customerid, providerid, productid, bill_date, quantity, + ) + parent_orgid = protocol['parent_orgid'] + settle_meta = await _settle_upstream_meta(sor, accounting_orgid, providerid) + + bill_state = _row_get(row, 'bill_state') + if str(bill_state) == '1': + amounts = await _finance_from_bill_detail( + sor, bill_id, accounting_orgid, is_owner, parent_orgid, + ) + else: + amounts = _estimate_finance(catalog_amount, protocol, is_owner) + amounts['bill_detail_legs'] = [] + + product_name = _row_get(row, 'product_name') + if not product_name and productid: + prows = await sor.R('product', {'id': productid, 'del_flg': '0'}) + if prows: + product_name = prows[0].get('name') + provider_name = _row_get(row, 'provider_name') + if not provider_name and providerid: + prov_rows = await sor.sqlExe( + "SELECT name FROM provider WHERE orgid=${orgid}$ AND del_flg='0' LIMIT 1", + {'orgid': providerid}, + ) + if prov_rows: + provider_name = prov_rows[0].get('name') + + serving_reseller_id = None + serving_reseller_name = None + is_direct = customer_parentid == accounting_orgid + if not is_direct and customer_parentid: + serving_reseller_id = customer_parentid + serving_reseller_name = await _org_name(sor, customer_parentid) + + order_date = _row_get(row, 'order_date') + servicename = _row_get(row, 'servicename') + + return { + 'bill_id': bill_id, + 'order_id': _row_get(row, 'order_id'), + 'bill_date': str(bill_date)[:10] if bill_date else None, + 'order_date': str(order_date)[:19] if order_date else None, + 'business_op': _row_get(row, 'business_op'), + 'bill_state': bill_state, + 'accounted': str(bill_state) == '1', + 'customer': { + 'id': customerid, + 'name': _row_get(row, 'customer_name'), + 'parent_orgid': customer_parentid, + 'is_direct_customer': is_direct, + 'serving_reseller': { + 'id': serving_reseller_id, + 'name': serving_reseller_name, + } if serving_reseller_id else None, + }, + 'product': { + 'id': productid, + 'name': product_name or servicename, + 'servicename': servicename, + }, + 'provider': { + 'orgid': providerid, + 'name': provider_name, + }, + 'quantity': quantity, + 'pricing': { + 'catalog_amount': catalog_amount, + 'list_price_unit': _round_money(_row_get(row, 'list_price')), + 'order_discount': _round_money(_row_get(row, 'order_discount')) + if _row_get(row, 'order_discount') is not None + else None, + 'order_unit_price': _round_money(_row_get(row, 'order_unit_price')), + 'customer_pay_amount': customer_pay, + }, + 'viewer_org': { + 'id': accounting_orgid, + 'is_business_owner': is_owner, + 'immediate_parent': { + 'id': settle_meta.get('immediate_parent_orgid'), + 'name': await _org_name(sor, settle_meta.get('immediate_parent_orgid')), + } if settle_meta.get('immediate_parent_orgid') else None, + }, + 'protocol': { + 'parent_salemode': _salemode_label(protocol.get('own_salemode')), + 'parent_discount_to_us': _round_money(protocol.get('own_discount')), + 'parent_floor_unit_price': _round_money(protocol.get('own_floor_unit_price')), + 'our_salemode_to_customer': _salemode_label(protocol.get('customer_salemode')), + 'our_discount_to_customer': _round_money(protocol.get('customer_discount')), + 'our_floor_unit_price_to_customer': _round_money(protocol.get('customer_floor_unit_price')), + 'our_discount_as_reseller_to_parent': _round_money(protocol.get('reseller_discount')), + }, + 'finance': { + 'profit_amount': amounts['profit_amount'], + 'settle_upstream_amount': amounts['settle_upstream_amount'], + 'settle_upstream_type': settle_meta['settle_upstream_type'], + 'settle_upstream_target': { + 'id': settle_meta['settle_upstream_orgid'], + 'name': settle_meta['settle_upstream_orgname'], + }, + 'amount_source': amounts['amount_source'], + }, + 'bill_detail_legs': amounts.get('bill_detail_legs', []), + } + + +def _bill_core_from_clause(): + """统计/查 ID 用最小 JOIN(兼容无 ordergoodsid 字段的库)。""" + return """ +FROM bill b +INNER JOIN bz_order o ON b.orderid = o.id +INNER JOIN organization cust ON b.customerid = cust.id +""" + + +def _bill_ids_sql(where_sql): + """不在 SQL 里写 LIMIT:部分环境 sqlExe 对 LIMIT 会返回空列表。""" + return f""" +SELECT b.id AS bill_id +{_bill_core_from_clause()} +WHERE {where_sql} +ORDER BY b.bill_date DESC, b.create_at DESC +""" + + +def _count_sql(where_sql): + return f""" +SELECT COUNT(*) AS total_count +{_bill_core_from_clause()} +WHERE {where_sql} +""" + + +def _normalize_bill_id(bill_id): + if bill_id is None: + return None + s = str(bill_id).strip() + return s if s else None + + +async def _fetch_bill_row_via_R(sor, bill_id): + """ + 用 sor.R 按表组装明细(与 mu_ban / process_user_billing 一致)。 + 已证实:本环境 sqlExe 多表 JOIN + WHERE b.id 会返回空,但 sor.R('bill') 可用。 + """ + bid = _normalize_bill_id(bill_id) + if not bid: + return None + + bills = await sor.R('bill', {'id': bid, 'del_flg': '0'}) + if not bills: + bills = await sor.R('bill', {'id': bid}) + if not bills: + return None + b = bills[0] + + o = {} + oid = b.get('orderid') + if oid: + orders = await sor.R('bz_order', {'id': oid, 'del_flg': '0'}) + if not orders: + orders = await sor.R('bz_order', {'id': oid}) + if orders: + o = orders[0] + + cust = {} + cid = b.get('customerid') + if cid: + custs = await sor.R('organization', {'id': cid, 'del_flg': '0'}) + if not custs: + custs = await sor.R('organization', {'id': cid}) + if custs: + cust = custs[0] + + og = {} + ogid = b.get('ordergoodsid') + if ogid: + ogs = await sor.R('order_goods', {'id': ogid, 'del_flg': '0'}) + if ogs: + og = ogs[0] + if not og and oid: + ogs = await sor.sqlExe( + """SELECT list_price, discount, price FROM order_goods + WHERE orderid=${oid}$ AND del_flg='0' LIMIT 1""", + {'oid': oid}, + ) + if ogs: + og = ogs[0] + + return { + 'bill_id': b.get('id'), + 'order_id': oid, + 'customerid': cid, + 'productid': b.get('productid'), + 'providerid': b.get('providerid'), + 'bill_date': b.get('bill_date'), + 'bill_state': b.get('bill_state'), + 'catalog_amount': b.get('provider_amt'), + 'customer_pay_amount': b.get('amount'), + 'quantity': b.get('quantity'), + 'order_date': o.get('order_date'), + 'business_op': o.get('business_op'), + 'servicename': o.get('servicename'), + 'customer_name': cust.get('orgname'), + 'customer_parentid': cust.get('parentid'), + 'list_price': og.get('list_price'), + 'order_discount': og.get('discount'), + 'order_unit_price': og.get('price'), + } + + +async def _fetch_bill_rows_by_ids(sor, bill_ids): + rows = [] + for bid in bill_ids: + one = await _fetch_bill_row_via_R(sor, bid) + if one: + rows.append(one) + return rows + + +async def _fetch_bill_rows_page(sor, where_sql, base_params, offset, page_size): + """ + 先查全部符合条件的 bill_id(通常数量可控),再在 Python 分页; + 明细用 IN 字面量或逐条查询,不用 IN (${bid_n}$) 占位符。 + """ + id_sql = _bill_ids_sql(where_sql) + id_rows = _sql_rows(await sor.sqlExe(id_sql, base_params)) + all_ids = [] + seen = set() + for r in id_rows: + bid = _normalize_bill_id(_row_get(r, 'bill_id', 'id', 'ID')) + if bid and bid not in seen: + seen.add(bid) + all_ids.append(bid) + total = len(all_ids) + page_ids = all_ids[offset: offset + page_size] + if not page_ids: + return total, [] + ordered = await _fetch_bill_rows_by_ids(sor, page_ids) + return total, ordered + + +async def _finance_report_debug(sor, where_sql, params, page_size, offset): + """debug=true 时返回,便于在服务器上对比哪一步 SQL 无数据。""" + dbg = {} + try: + c = _sql_rows(await sor.sqlExe(_count_sql(where_sql), params)) + dbg['count_total'] = int(_row_get(c[0], 'total_count', 0) or 0) if c else None + except Exception as e: + dbg['count_error'] = str(e) + try: + ids = _sql_rows(await sor.sqlExe(_bill_ids_sql(where_sql), params)) + dbg['ids_query_len'] = len(ids) + except Exception as e: + dbg['ids_error'] = str(e) + try: + lim_sql = _bill_ids_sql(where_sql).strip() + ' LIMIT %d OFFSET %d' % (page_size, offset) + lim = _sql_rows(await sor.sqlExe(lim_sql, params)) + dbg['ids_with_limit_len'] = len(lim) + except Exception as e: + dbg['ids_with_limit_error'] = str(e) + try: + page_ids = [] + id_rows = _sql_rows(await sor.sqlExe(_bill_ids_sql(where_sql), params)) + if id_rows: + dbg['first_id_row_keys'] = list(id_rows[0].keys()) + dbg['first_id_raw'] = _row_get(id_rows[0], 'bill_id', 'id', 'ID') + for r in id_rows[offset: offset + page_size]: + bid = _normalize_bill_id(_row_get(r, 'bill_id', 'id', 'ID')) + if bid: + page_ids.append(bid) + dbg['page_ids_len'] = len(page_ids) + if page_ids: + dbg['sample_page_id'] = page_ids[0] + rb = await sor.R('bill', {'id': page_ids[0], 'del_flg': '0'}) + dbg['sor_R_bill_count'] = len(rb) if rb else 0 + one = await _fetch_bill_row_via_R(sor, page_ids[0]) + dbg['sor_R_row_ok'] = one is not None + except Exception as e: + dbg['detail_fetch_error'] = str(e) + return dbg + + +async def finance_order_report(ns=None): + """ + 管理人员 — 客户订单财务列表(分页)。 + + 入参 ns: + accounting_orgid (必填) 账本机构 + include_sub_reseller_customers (可选) 默认 false;true 时含下级分销商的客户订单 + userid, start_date, end_date, customerid, productid, order_id, bill_state + current_page (默认1), page_size (默认20, 最大100) + + 返回 finance.settle_upstream_*: + - 本机构:应付供应商(待结转* 贷方,仅本机构账本) + - 分销商:应付直接上级(上级账本「分销商存放资金」借方,participant=本级) + """ + ns = ns or {} + accounting_orgid = ns.get('accounting_orgid') + if not accounting_orgid: + return {'status': False, 'msg': '缺少 accounting_orgid(当前账本机构)'} + + include_sub = _parse_bool(ns.get('include_sub_reseller_customers'), False) + current_page, page_size, offset = _parse_page(ns) + + db = DBPools() + async with db.sqlorContext(DBNAME) as sor: + ok, err = await _check_viewer(sor, accounting_orgid, ns.get('userid')) + if not ok: + return {'status': False, 'msg': err} + + scope_sql, scope_params, reseller_ids = await _build_customer_scope_sql( + sor, accounting_orgid, include_sub, + ) + is_owner = await _is_business_owner(sor, accounting_orgid) + + conditions = ["b.del_flg = '0'", "o.del_flg = '0'", scope_sql] + params = dict(scope_params) + + if ns.get('start_date'): + conditions.append('b.bill_date >= ${start_date}$') + params['start_date'] = ns['start_date'] + if ns.get('end_date'): + conditions.append('b.bill_date <= ${end_date}$') + params['end_date'] = ns['end_date'] + if ns.get('customerid'): + conditions.append('b.customerid = ${customerid}$') + params['customerid'] = ns['customerid'] + if ns.get('productid'): + conditions.append('b.productid = ${productid}$') + params['productid'] = ns['productid'] + if ns.get('order_id'): + conditions.append('b.orderid = ${order_id}$') + params['order_id'] = ns['order_id'] + if ns.get('bill_state') is not None: + conditions.append('b.bill_state = ${bill_state}$') + params['bill_state'] = str(ns['bill_state']) + + where_sql = ' AND '.join(conditions) + + total_count, rows = await _fetch_bill_rows_page( + sor, where_sql, params, offset, page_size, + ) + if total_count > 0 and offset >= total_count: + current_page = 1 + offset = 0 + total_count, rows = await _fetch_bill_rows_page( + sor, where_sql, params, offset, page_size, + ) + + debug_info = None + if _parse_bool(ns.get('debug'), False): + debug_info = await _finance_report_debug( + sor, where_sql, params, page_size, offset, + ) + debug_info['page_ids_fetched'] = len(rows) + + items = [] + sum_profit = 0.0 + sum_pay = 0.0 + sum_upstream = 0.0 + row_errors = [] + + for row in rows: + try: + item = await _build_report_row(sor, row, accounting_orgid, is_owner) + except Exception as exc: + row_errors.append({ + 'bill_id': _row_get(row, 'bill_id'), + 'error': str(exc), + }) + continue + items.append(item) + fin = item['finance'] + if fin.get('profit_amount') is not None: + sum_profit += fin['profit_amount'] + pay = item['pricing'].get('customer_pay_amount') + if pay is not None: + sum_pay += pay + up = fin.get('settle_upstream_amount') + if up is not None: + sum_upstream += up + + return { + 'status': True, + 'msg': 'ok', + 'data': { + 'accounting_orgid': accounting_orgid, + 'accounting_orgname': await _org_name(sor, accounting_orgid), + 'is_business_owner': is_owner, + 'customer_scope': { + 'include_sub_reseller_customers': include_sub, + 'descendant_reseller_ids': reseller_ids, + 'descendant_reseller_count': len(reseller_ids), + }, + 'total_count': total_count, + 'current_page': current_page, + 'page_size': page_size, + 'summary': { + 'customer_pay_total': _round_money(sum_pay), + 'profit_total': _round_money(sum_profit), + 'settle_upstream_total': _round_money(sum_upstream), + }, + 'items': items, + 'row_errors': row_errors, + 'list_row_count': len(rows), + 'debug': debug_info, + }, + } + + +async def finance_order_report_detail(ns=None): + """ + 管理人员 — 单笔账单财务明细。 + + 入参:accounting_orgid, bill_id, include_sub_reseller_customers(与列表一致,用于校验客户范围) + """ + ns = ns or {} + accounting_orgid = ns.get('accounting_orgid') + bill_id = ns.get('bill_id') + if not accounting_orgid or not bill_id: + return {'status': False, 'msg': '缺少 accounting_orgid 或 bill_id'} + + include_sub = _parse_bool(ns.get('include_sub_reseller_customers'), False) + + db = DBPools() + async with db.sqlorContext(DBNAME) as sor: + ok, err = await _check_viewer(sor, accounting_orgid, ns.get('userid')) + if not ok: + return {'status': False, 'msg': err} + + _, _, reseller_ids = await _build_customer_scope_sql(sor, accounting_orgid, include_sub) + is_owner = await _is_business_owner(sor, accounting_orgid) + + rows = await _fetch_bill_rows_by_ids(sor, [bill_id]) + if not rows: + return {'status': False, 'msg': '账单不存在'} + + row = rows[0] + if not await _customer_in_scope( + _row_get(row, 'customer_parentid'), + accounting_orgid, + reseller_ids, + include_sub, + ): + return {'status': False, 'msg': '该账单不在当前查询客户范围内'} + + detail = await _build_report_row(sor, row, accounting_orgid, is_owner) + detail['customer_scope'] = {'include_sub_reseller_customers': include_sub} + return {'status': True, 'msg': 'ok', 'data': detail} + + +ret = await finance_order_report(params_kw) +return ret \ No newline at end of file diff --git a/b/bill/finance_order_report_detail.dspy b/b/bill/finance_order_report_detail.dspy new file mode 100644 index 0000000..b4f366e --- /dev/null +++ b/b/bill/finance_order_report_detail.dspy @@ -0,0 +1,1231 @@ +# -*- coding: utf-8 -*- +""" +管理人员财务订单报表接口。 + +视角:以 accounting_orgid(账本机构)为查看主体,展示客户订单的定价、利润、本级应付结算。 + +依赖运行环境:DBPools, sor.sqlExe, sor.R + +业务口径(已确认): + - product_salemode.providerid = provider.orgid + - 分销商不直接对供应商结算;仅本机构(parentid 为空)对供应商结算 + - 一级分销对本机构结算,二级分销对一级分销结算(即对直接上级结算) + - settle_upstream_amount:本级应付(本机构→供应商;分销→直接上级机构) + - profit_amount:本级账本「折扣收入」「底价收入」贷方合计 + - include_sub_reseller_customers:是否包含下级分销商发展的客户 +""" + +async def get_parent_orgid(sor, orgid): + sql = """select a.id from organization a, organization b +where b.parentid = a.id + and a.del_flg = '0' + and b.del_flg = '0' + and b.id = ${orgid}$""" + recs = await sor.sqlExe(sql, {'orgid':orgid}) + if len(recs) == 0: + return None + return recs[0]['id'] + +DBNAME = 'kboss' +RESELLER_ORG = '1' +OWNER_OGR = '0' +CORP_CUSTOMER = '2' +PERSONAL = '3' +PROVIDER = '4' +PUBLISHER = '5' +UNDERWRITER = '6' + +PARTY_OWNER = '本机构' +PARTY_CUSTOMER = '客户' +PARTY_RESELLER = '分销商' +PARTY_PROVIDER = '供应商' +PARTY_PUBLISHER = '算力券发行方' +PARTY_UNDERWRITER = '算力券承销方' + +DEBT = '借' +CREDIT = '贷' + +ACTNAME_BUY = '付费' +ACTNAME_RECHARGE = '充值' +ACTNAME_RECHARGE_ALIPAY = '支付宝充值' +ACTNAME_SETTLE = '结算' + +SALEMODE_DISCOUNT = '折扣' +SALEMODE_REBATE = '代付费' +SALEMODE_FLOORPRICE = '底价' + +ACTION_RECHARGE_ALIPAY = 'RECHARGE_ALIPAY' +ACTION_RECHARGE_ALIPAY_REVERSE = 'RECHARGE_ALIPAY_REVERSE' +ACTION_RECHARGE = 'RECHARGE' +ACTION_RECHARGE_REVERSE = 'RECHARGE_REVERSE' +ACTION_BUY = 'BUY' +ACTION_REVERSE_BUY = 'BUY_REVERSE' +ACTION_RENEW = 'RENEW' +ACTION_RENEW_REVERSE = 'RENEW_REVERSE' +ACTION_SETTLE = 'SETTLE' +ACTION_SETTLE_REVERSE = 'SETTLE_REVERSE' + +DBNAME = 'kboss' +CUSTOMER_ORG_TYPES = ('2', '3') + +SALEMODE_LABEL = { + '0': SALEMODE_DISCOUNT, + '1': SALEMODE_REBATE, + '2': SALEMODE_FLOORPRICE, +} + +INCOME_SUBJECTS = ('折扣收入', '底价收入') +PARENT_SETTLE_SUBJECT = '分销商存放资金' +SUPPLIER_SETTLE_PREFIX = '待结转' + +_SALEMODE_SQL_OWN = """ +SELECT a.salemode, a.settle_mode, b.discount, b.price +FROM saleprotocol a, product_salemode b +WHERE a.id = b.protocolid + AND a.bid_orgid = ${bid_orgid}$ + AND (b.productid = ${productid}$ OR b.productid = '*') + AND b.providerid = ${providerid}$ + AND a.start_date <= ${curdate}$ + AND a.end_date > ${curdate}$ + AND a.del_flg = '0' + AND b.del_flg = '0' +ORDER BY b.productid DESC +LIMIT 1 +""" + +_SALEMODE_SQL_OFFER = """ +SELECT a.salemode, a.settle_mode, b.discount, b.price +FROM saleprotocol a, product_salemode b +WHERE a.id = b.protocolid + AND a.offer_orgid = ${offer_orgid}$ + AND a.bid_orgid = ${bid_orgid}$ + AND (b.productid = ${productid}$ OR b.productid = '*') + AND b.providerid = ${providerid}$ + AND a.start_date <= ${curdate}$ + AND a.end_date > ${curdate}$ + AND a.del_flg = '0' + AND b.del_flg = '0' +ORDER BY b.productid DESC +LIMIT 1 +""" + + +def _round_money(v): + if v is None: + return None + return round(float(v), 2) + + +def _salemode_label(code): + if code is None: + return None + return SALEMODE_LABEL.get(str(code), str(code)) + + +def _parse_bool(v, default=False): + if v is None: + return default + if isinstance(v, bool): + return v + return str(v).strip().lower() in ('1', 'true', 'yes', 'on') + + +def _parse_page(ns, default_page=1, default_size=20, max_size=100): + """解析分页,避免 page_size=0 导致 LIMIT 0 无数据。""" + try: + page_size = int(ns.get('page_size', default_size) or default_size) + except (TypeError, ValueError): + page_size = default_size + try: + current_page = int(ns.get('current_page', default_page) or default_page) + except (TypeError, ValueError): + current_page = default_page + page_size = max(1, min(page_size, max_size)) + current_page = max(1, current_page) + offset = (current_page - 1) * page_size + return current_page, page_size, offset + + +def _sql_rows(result): + """统一 sqlExe 查询结果为 list[dict]。""" + if result is None: + return [] + if isinstance(result, list): + return result + if isinstance(result, dict): + return [result] + return list(result) + + +def _row_get(row, *keys, default=None): + """兼容不同驱动返回的大小写字段名。""" + if not row: + return default + for key in keys: + if key in row: + return row[key] + lower = key.lower() + for k, v in row.items(): + if k.lower() == lower: + return v + return default + + +async def _check_viewer(sor, accounting_orgid, userid=None): + if not userid: + return True, None + users = await sor.R('users', {'id': userid, 'del_flg': '0'}) + if not users: + return False, '用户不存在' + user_orgid = users[0].get('orgid') + if user_orgid == accounting_orgid: + return True, None + parent = await get_parent_orgid(sor, accounting_orgid) + if parent and user_orgid == parent: + return True, None + return False, '无权查看该机构财务数据' + + +async def _org_name(sor, orgid): + if not orgid: + return None + rows = await sor.R('organization', {'id': orgid, 'del_flg': '0'}) + return rows[0]['orgname'] if rows else None + + +async def _is_business_owner(sor, orgid): + rows = await sor.sqlExe( + "SELECT id FROM organization WHERE id=${id}$ AND parentid IS NULL AND del_flg='0'", + {'id': orgid}, + ) + return len(rows) > 0 + + +async def _collect_descendant_reseller_ids(sor, root_orgid): + """递归收集 root 下所有下级分销商 organization.id(org_type=1)。""" + found = [] + queue = [root_orgid] + seen = {root_orgid} + while queue: + pid = queue.pop(0) + rows = await sor.sqlExe( + """SELECT id FROM organization + WHERE parentid=${pid}$ AND org_type=${org_type}$ AND del_flg='0'""", + {'pid': pid, 'org_type': RESELLER_ORG}, + ) + for r in rows: + cid = r['id'] + if cid not in seen: + seen.add(cid) + found.append(cid) + queue.append(cid) + return found + + +async def _build_customer_scope_sql(sor, accounting_orgid, include_sub_reseller_customers): + """ + 客户范围 SQL 片段与参数。 + 直属:cust.parentid = accounting_orgid + 含下级分销:直属 + cust.parentid IN (下级分销商 id 列表) + """ + params = {'accounting_orgid': accounting_orgid} + if not include_sub_reseller_customers: + return "cust.parentid = ${accounting_orgid}$", params, [] + + reseller_ids = await _collect_descendant_reseller_ids(sor, accounting_orgid) + if not reseller_ids: + return "cust.parentid = ${accounting_orgid}$", params, [] + + in_keys = [] + for i, rid in enumerate(reseller_ids): + key = 'reseller_%d' % i + params[key] = rid + in_keys.append('${%s}$' % key) + in_sql = ', '.join(in_keys) + cond = "(cust.parentid = ${accounting_orgid}$ OR cust.parentid IN (%s))" % in_sql + return cond, params, reseller_ids + + +async def _customer_in_scope(customer_parentid, accounting_orgid, reseller_ids, include_sub): + if customer_parentid == accounting_orgid: + return True + if include_sub and customer_parentid in reseller_ids: + return True + return False + + +async def _fetch_salemode_row(sor, sql, params): + rows = await sor.sqlExe(sql, params) + return rows[0] if rows else None + + +async def _settle_upstream_meta(sor, accounting_orgid, provider_orgid): + """本级应付对象:本机构→供应商;分销→直接上级。""" + if await _is_business_owner(sor, accounting_orgid): + return { + 'settle_upstream_type': 'supplier', + 'settle_upstream_orgid': provider_orgid, + 'settle_upstream_orgname': await _org_name(sor, provider_orgid), + 'immediate_parent_orgid': None, + } + parent_orgid = await get_parent_orgid(sor, accounting_orgid) + return { + 'settle_upstream_type': 'parent_org', + 'settle_upstream_orgid': parent_orgid, + 'settle_upstream_orgname': await _org_name(sor, parent_orgid), + 'immediate_parent_orgid': parent_orgid, + } + + +async def _protocol_snapshot(sor, accounting_orgid, customerid, providerid, productid, bill_date, quantity): + curdate = bill_date + if hasattr(curdate, 'strftime'): + curdate = curdate.strftime('%Y-%m-%d') + elif curdate is not None: + curdate = str(curdate)[:10] + + qty = int(quantity or 1) + base = {'providerid': providerid, 'productid': productid, 'curdate': curdate} + + own = await _fetch_salemode_row( + sor, _SALEMODE_SQL_OWN, dict(base, bid_orgid=accounting_orgid), + ) + cust = await _fetch_salemode_row( + sor, _SALEMODE_SQL_OFFER, + dict(base, offer_orgid=accounting_orgid, bid_orgid=customerid), + ) + if not cust: + cust = await _fetch_salemode_row( + sor, _SALEMODE_SQL_OFFER, + dict(base, offer_orgid=accounting_orgid, bid_orgid='*'), + ) + + parent_orgid = await get_parent_orgid(sor, accounting_orgid) + reseller = None + if parent_orgid: + reseller = await _fetch_salemode_row( + sor, _SALEMODE_SQL_OFFER, + dict(base, offer_orgid=parent_orgid, bid_orgid=accounting_orgid), + ) + + return { + 'parent_orgid': parent_orgid, + 'parent_orgname': await _org_name(sor, parent_orgid), + 'own_salemode': own.get('salemode') if own else None, + 'own_discount': own.get('discount') if own else None, + 'own_floor_unit_price': own.get('price') if own else None, + 'customer_salemode': cust.get('salemode') if cust else None, + 'customer_discount': cust.get('discount') if cust else None, + 'customer_floor_unit_price': cust.get('price') if cust else None, + 'reseller_discount': reseller.get('discount') if reseller else None, + 'reseller_salemode': reseller.get('salemode') if reseller else None, + 'reseller_floor_unit_price': reseller.get('price') if reseller else None, + 'quantity': qty, + } + + +def _estimate_finance(catalog_amount, protocol, is_owner): + """未记账:估算本级利润与本级应付(不含代付费单独字段)。""" + catalog_amount = float(catalog_amount or 0) + qty = protocol.get('quantity') or 1 + profit = None + settle_upstream = None + + own_mode = protocol.get('own_salemode') + cust_mode = protocol.get('customer_salemode') + own_disc = protocol.get('own_discount') + cust_disc = protocol.get('customer_discount') + reseller_disc = protocol.get('reseller_discount') + + if own_mode == '0' or cust_mode == '0': + if own_disc is not None and cust_disc is not None: + profit = catalog_amount * (float(cust_disc) - float(own_disc)) + if is_owner: + if own_disc is not None: + settle_upstream = catalog_amount * float(own_disc) + elif reseller_disc is not None: + settle_upstream = catalog_amount * float(reseller_disc) + + elif own_mode == '2' or cust_mode == '2': + own_price = protocol.get('own_floor_unit_price') + cust_price = protocol.get('customer_floor_unit_price') + reseller_price = protocol.get('reseller_floor_unit_price') + if own_price is not None and cust_price is not None: + profit = (float(cust_price) - float(own_price)) * qty + if is_owner and own_price is not None: + settle_upstream = float(own_price) * qty + elif not is_owner and reseller_price is not None: + settle_upstream = float(reseller_price) * qty + elif not is_owner and cust_price is not None: + settle_upstream = float(cust_price) * qty + + return { + 'profit_amount': _round_money(profit), + 'settle_upstream_amount': _round_money(settle_upstream), + 'amount_source': 'estimated', + } + + +async def _bill_detail_rows(sor, billid): + """单表读 bill_detail,优先 sor.R(避免 sqlExe 多表问题)。""" + rows = await sor.R('bill_detail', {'billid': billid, 'del_flg': '0'}) + if rows: + return rows + return _sql_rows(await sor.sqlExe( + """SELECT accounting_orgid, subjectname, accounting_dir, + participantid, participanttype, amount + FROM bill_detail WHERE billid=${billid}$ AND del_flg='0'""", + {'billid': billid}, + )) + + +async def _finance_from_bill_detail(sor, billid, accounting_orgid, is_owner, parent_orgid): + """已记账:从 bill_detail 取本级利润与本级应付。""" + rows = await _bill_detail_rows(sor, billid) + profit = 0.0 + settle_upstream = 0.0 + legs = [] + + for r in rows: + legs.append({ + 'accounting_orgid': r['accounting_orgid'], + 'subjectname': r['subjectname'], + 'accounting_dir': r['accounting_dir'], + 'participanttype': r.get('participanttype'), + 'participantid': r.get('participantid'), + 'amount': _round_money(r['amount']), + }) + amt = float(r['amount'] or 0) + book = r['accounting_orgid'] + subj = r['subjectname'] or '' + direction = r['accounting_dir'] + + if book == accounting_orgid and subj in INCOME_SUBJECTS and direction == '贷': + profit += amt + + if is_owner: + if ( + book == accounting_orgid + and subj.startswith(SUPPLIER_SETTLE_PREFIX) + and direction == '贷' + ): + settle_upstream += amt + else: + if ( + parent_orgid + and book == parent_orgid + and subj == PARENT_SETTLE_SUBJECT + and direction == '借' + and r.get('participantid') == accounting_orgid + ): + settle_upstream += amt + + return { + 'profit_amount': _round_money(profit), + 'settle_upstream_amount': _round_money(settle_upstream) if settle_upstream else _round_money(0), + 'amount_source': 'bill_detail', + 'bill_detail_legs': legs, + } + + +async def _build_report_row(sor, row, accounting_orgid, is_owner): + bill_id = _row_get(row, 'bill_id') + customerid = _row_get(row, 'customerid') + providerid = _row_get(row, 'providerid') + productid = _row_get(row, 'productid') + bill_date = _row_get(row, 'bill_date') + quantity = _row_get(row, 'quantity') or 1 + catalog_amount = _round_money(_row_get(row, 'catalog_amount')) + customer_pay = _round_money(_row_get(row, 'customer_pay_amount')) + customer_parentid = _row_get(row, 'customer_parentid') + + protocol = await _protocol_snapshot( + sor, accounting_orgid, customerid, providerid, productid, bill_date, quantity, + ) + parent_orgid = protocol['parent_orgid'] + settle_meta = await _settle_upstream_meta(sor, accounting_orgid, providerid) + + bill_state = _row_get(row, 'bill_state') + if str(bill_state) == '1': + amounts = await _finance_from_bill_detail( + sor, bill_id, accounting_orgid, is_owner, parent_orgid, + ) + else: + amounts = _estimate_finance(catalog_amount, protocol, is_owner) + amounts['bill_detail_legs'] = [] + + product_name = _row_get(row, 'product_name') + if not product_name and productid: + prows = await sor.R('product', {'id': productid, 'del_flg': '0'}) + if prows: + product_name = prows[0].get('name') + provider_name = _row_get(row, 'provider_name') + if not provider_name and providerid: + prov_rows = await sor.sqlExe( + "SELECT name FROM provider WHERE orgid=${orgid}$ AND del_flg='0' LIMIT 1", + {'orgid': providerid}, + ) + if prov_rows: + provider_name = prov_rows[0].get('name') + + serving_reseller_id = None + serving_reseller_name = None + is_direct = customer_parentid == accounting_orgid + if not is_direct and customer_parentid: + serving_reseller_id = customer_parentid + serving_reseller_name = await _org_name(sor, customer_parentid) + + order_date = _row_get(row, 'order_date') + servicename = _row_get(row, 'servicename') + + return { + 'bill_id': bill_id, + 'order_id': _row_get(row, 'order_id'), + 'bill_date': str(bill_date)[:10] if bill_date else None, + 'order_date': str(order_date)[:19] if order_date else None, + 'business_op': _row_get(row, 'business_op'), + 'bill_state': bill_state, + 'accounted': str(bill_state) == '1', + 'customer': { + 'id': customerid, + 'name': _row_get(row, 'customer_name'), + 'parent_orgid': customer_parentid, + 'is_direct_customer': is_direct, + 'serving_reseller': { + 'id': serving_reseller_id, + 'name': serving_reseller_name, + } if serving_reseller_id else None, + }, + 'product': { + 'id': productid, + 'name': product_name or servicename, + 'servicename': servicename, + }, + 'provider': { + 'orgid': providerid, + 'name': provider_name, + }, + 'quantity': quantity, + 'pricing': { + 'catalog_amount': catalog_amount, + 'list_price_unit': _round_money(_row_get(row, 'list_price')), + 'order_discount': _round_money(_row_get(row, 'order_discount')) + if _row_get(row, 'order_discount') is not None + else None, + 'order_unit_price': _round_money(_row_get(row, 'order_unit_price')), + 'customer_pay_amount': customer_pay, + }, + 'viewer_org': { + 'id': accounting_orgid, + 'is_business_owner': is_owner, + 'immediate_parent': { + 'id': settle_meta.get('immediate_parent_orgid'), + 'name': await _org_name(sor, settle_meta.get('immediate_parent_orgid')), + } if settle_meta.get('immediate_parent_orgid') else None, + }, + 'protocol': { + 'parent_salemode': _salemode_label(protocol.get('own_salemode')), + 'parent_discount_to_us': _round_money(protocol.get('own_discount')), + 'parent_floor_unit_price': _round_money(protocol.get('own_floor_unit_price')), + 'our_salemode_to_customer': _salemode_label(protocol.get('customer_salemode')), + 'our_discount_to_customer': _round_money(protocol.get('customer_discount')), + 'our_floor_unit_price_to_customer': _round_money(protocol.get('customer_floor_unit_price')), + 'our_discount_as_reseller_to_parent': _round_money(protocol.get('reseller_discount')), + }, + 'finance': { + 'profit_amount': amounts['profit_amount'], + 'settle_upstream_amount': amounts['settle_upstream_amount'], + 'settle_upstream_type': settle_meta['settle_upstream_type'], + 'settle_upstream_target': { + 'id': settle_meta['settle_upstream_orgid'], + 'name': settle_meta['settle_upstream_orgname'], + }, + 'amount_source': amounts['amount_source'], + }, + 'bill_detail_legs': amounts.get('bill_detail_legs', []), + } + + +def _bill_core_from_clause(): + """统计/查 ID 用最小 JOIN(兼容无 ordergoodsid 字段的库)。""" + return """ +FROM bill b +INNER JOIN bz_order o ON b.orderid = o.id +INNER JOIN organization cust ON b.customerid = cust.id +""" + + +def _bill_ids_sql(where_sql): + """不在 SQL 里写 LIMIT:部分环境 sqlExe 对 LIMIT 会返回空列表。""" + return f""" +SELECT b.id AS bill_id +{_bill_core_from_clause()} +WHERE {where_sql} +ORDER BY b.bill_date DESC, b.create_at DESC +""" + + +def _sql_quote_bill_id(bill_id): + if bill_id is None: + return None + s = str(bill_id).strip() + if not s: + return None + for ch in s: + if not (ch.isalnum() or ch in ('_', '-')): + return None + return "'" + s.replace("'", "''") + "'" + + +async def _fetch_overview_bill_rows(sor, where_sql, params, max_bills): + """ + 概览取数:先用仅 bill_id 的 SQL(已验证可用),再 sor.R 组装明细。 + 多列 + JOIN 的 _bill_scope_list_sql 在本环境 sqlExe 会返回空列表。 + """ + id_rows = _sql_rows(await sor.sqlExe(_bill_ids_sql(where_sql), params)) + all_ids = [] + seen = set() + for r in id_rows: + bid = _normalize_bill_id(_row_get(r, 'bill_id', 'id', 'ID')) + if bid and bid not in seen: + seen.add(bid) + all_ids.append(bid) + + total_ids = len(all_ids) + truncated = total_ids > max_bills + if truncated: + all_ids = all_ids[:max_bills] + + rows = [] + for bid in all_ids: + row = await _fetch_bill_row_via_R(sor, bid) + if row: + rows.append(row) + + return { + 'total_ids': total_ids, + 'truncated': truncated, + 'id_query_len': len(id_rows), + 'rows': rows, + } + + +def _new_amount_bucket(): + return { + 'sales_total': 0.0, + 'profit_total': 0.0, + 'settle_upstream_total': 0.0, + 'bill_count': 0, + } + + +def _bucket_add(bucket, sales, profit, settle): + bucket['sales_total'] += float(sales or 0) + bucket['profit_total'] += float(profit or 0) + bucket['settle_upstream_total'] += float(settle or 0) + bucket['bill_count'] += 1 + + +def _bucket_round(bucket): + return { + 'sales_total': _round_money(bucket['sales_total']), + 'profit_total': _round_money(bucket['profit_total']), + 'settle_upstream_total': _round_money(bucket['settle_upstream_total']), + 'bill_count': bucket['bill_count'], + } + + +def _norm_org_id(orgid): + if orgid is None: + return None + s = str(orgid).strip() + return s if s else None + + +def _customer_segment(customer_parentid, accounting_orgid, reseller_id_set): + pid = _norm_org_id(customer_parentid) + acc = _norm_org_id(accounting_orgid) + if pid == acc: + return 'direct_customers' + if pid in reseller_id_set: + return 'from_sub_resellers' + return None + + +async def _bill_finance_amounts(sor, row, accounting_orgid, is_owner, parent_orgid): + """单笔:销售额、本级利润、本级应付上级/供应商。""" + sales = float(_row_get(row, 'customer_pay_amount') or 0) + bill_id = _row_get(row, 'bill_id') + bill_state = _row_get(row, 'bill_state') + quantity = int(_row_get(row, 'quantity') or 1) + + if str(bill_state) == '1': + fin = await _finance_from_bill_detail( + sor, bill_id, accounting_orgid, is_owner, parent_orgid, + ) + else: + catalog = float(_row_get(row, 'catalog_amount') or 0) + protocol = await _protocol_snapshot( + sor, + accounting_orgid, + _row_get(row, 'customerid'), + _row_get(row, 'providerid'), + _row_get(row, 'productid'), + _row_get(row, 'bill_date'), + quantity, + ) + fin = _estimate_finance(catalog, protocol, is_owner) + + profit = float(fin.get('profit_amount') or 0) + settle = float(fin.get('settle_upstream_amount') or 0) + return sales, profit, settle, fin.get('amount_source', 'unknown') + + +async def _provider_name_cache(sor, cache, provider_orgid): + if not provider_orgid: + return None + if provider_orgid in cache: + return cache[provider_orgid] + name = None + rows = await sor.sqlExe( + "SELECT name FROM provider WHERE orgid=${oid}$ AND del_flg='0' LIMIT 1", + {'oid': provider_orgid}, + ) + if rows: + name = rows[0].get('name') + cache[provider_orgid] = name + return name + + +async def _product_name_cache(sor, cache, productid): + if not productid: + return None + if productid in cache: + return cache[productid] + name = None + rows = await sor.R('product', {'id': productid, 'del_flg': '0'}) + if rows: + name = rows[0].get('name') + cache[productid] = name + return name + + +def _count_sql(where_sql): + return f""" +SELECT COUNT(*) AS total_count +{_bill_core_from_clause()} +WHERE {where_sql} +""" + + +def _normalize_bill_id(bill_id): + if bill_id is None: + return None + s = str(bill_id).strip() + return s if s else None + + +async def _fetch_bill_row_via_R(sor, bill_id): + """ + 用 sor.R 按表组装明细(与 mu_ban / process_user_billing 一致)。 + 已证实:本环境 sqlExe 多表 JOIN + WHERE b.id 会返回空,但 sor.R('bill') 可用。 + """ + bid = _normalize_bill_id(bill_id) + if not bid: + return None + + bills = await sor.R('bill', {'id': bid, 'del_flg': '0'}) + if not bills: + bills = await sor.R('bill', {'id': bid}) + if not bills: + return None + b = bills[0] + + o = {} + oid = b.get('orderid') + if oid: + orders = await sor.R('bz_order', {'id': oid, 'del_flg': '0'}) + if not orders: + orders = await sor.R('bz_order', {'id': oid}) + if orders: + o = orders[0] + + cust = {} + cid = b.get('customerid') + if cid: + custs = await sor.R('organization', {'id': cid, 'del_flg': '0'}) + if not custs: + custs = await sor.R('organization', {'id': cid}) + if custs: + cust = custs[0] + + og = {} + ogid = b.get('ordergoodsid') + if ogid: + ogs = await sor.R('order_goods', {'id': ogid, 'del_flg': '0'}) + if ogs: + og = ogs[0] + if not og and oid: + ogs = await sor.sqlExe( + """SELECT list_price, discount, price FROM order_goods + WHERE orderid=${oid}$ AND del_flg='0' LIMIT 1""", + {'oid': oid}, + ) + if ogs: + og = ogs[0] + + return { + 'bill_id': b.get('id'), + 'order_id': oid, + 'customerid': cid, + 'productid': b.get('productid'), + 'providerid': b.get('providerid'), + 'bill_date': b.get('bill_date'), + 'bill_state': b.get('bill_state'), + 'catalog_amount': b.get('provider_amt'), + 'customer_pay_amount': b.get('amount'), + 'quantity': b.get('quantity'), + 'order_date': o.get('order_date'), + 'business_op': o.get('business_op'), + 'servicename': o.get('servicename'), + 'customer_name': cust.get('orgname'), + 'customer_parentid': cust.get('parentid'), + 'list_price': og.get('list_price'), + 'order_discount': og.get('discount'), + 'order_unit_price': og.get('price'), + } + + +async def _fetch_bill_rows_by_ids(sor, bill_ids): + rows = [] + for bid in bill_ids: + one = await _fetch_bill_row_via_R(sor, bid) + if one: + rows.append(one) + return rows + + +async def _fetch_bill_rows_page(sor, where_sql, base_params, offset, page_size): + """ + 先查全部符合条件的 bill_id(通常数量可控),再在 Python 分页; + 明细用 IN 字面量或逐条查询,不用 IN (${bid_n}$) 占位符。 + """ + id_sql = _bill_ids_sql(where_sql) + id_rows = _sql_rows(await sor.sqlExe(id_sql, base_params)) + all_ids = [] + seen = set() + for r in id_rows: + bid = _normalize_bill_id(_row_get(r, 'bill_id', 'id', 'ID')) + if bid and bid not in seen: + seen.add(bid) + all_ids.append(bid) + total = len(all_ids) + page_ids = all_ids[offset: offset + page_size] + if not page_ids: + return total, [] + ordered = await _fetch_bill_rows_by_ids(sor, page_ids) + return total, ordered + + +async def _finance_report_debug(sor, where_sql, params, page_size, offset): + """debug=true 时返回,便于在服务器上对比哪一步 SQL 无数据。""" + dbg = {} + try: + c = _sql_rows(await sor.sqlExe(_count_sql(where_sql), params)) + dbg['count_total'] = int(_row_get(c[0], 'total_count', 0) or 0) if c else None + except Exception as e: + dbg['count_error'] = str(e) + try: + ids = _sql_rows(await sor.sqlExe(_bill_ids_sql(where_sql), params)) + dbg['ids_query_len'] = len(ids) + except Exception as e: + dbg['ids_error'] = str(e) + try: + lim_sql = _bill_ids_sql(where_sql).strip() + ' LIMIT %d OFFSET %d' % (page_size, offset) + lim = _sql_rows(await sor.sqlExe(lim_sql, params)) + dbg['ids_with_limit_len'] = len(lim) + except Exception as e: + dbg['ids_with_limit_error'] = str(e) + try: + page_ids = [] + id_rows = _sql_rows(await sor.sqlExe(_bill_ids_sql(where_sql), params)) + if id_rows: + dbg['first_id_row_keys'] = list(id_rows[0].keys()) + dbg['first_id_raw'] = _row_get(id_rows[0], 'bill_id', 'id', 'ID') + for r in id_rows[offset: offset + page_size]: + bid = _normalize_bill_id(_row_get(r, 'bill_id', 'id', 'ID')) + if bid: + page_ids.append(bid) + dbg['page_ids_len'] = len(page_ids) + if page_ids: + dbg['sample_page_id'] = page_ids[0] + rb = await sor.R('bill', {'id': page_ids[0], 'del_flg': '0'}) + dbg['sor_R_bill_count'] = len(rb) if rb else 0 + one = await _fetch_bill_row_via_R(sor, page_ids[0]) + dbg['sor_R_row_ok'] = one is not None + except Exception as e: + dbg['detail_fetch_error'] = str(e) + return dbg + + +async def finance_order_report(ns=None): + """ + 管理人员 — 客户订单财务列表(分页)。 + + 入参 ns: + accounting_orgid (必填) 账本机构 + include_sub_reseller_customers (可选) 默认 false;true 时含下级分销商的客户订单 + userid, start_date, end_date, customerid, productid, order_id, bill_state + current_page (默认1), page_size (默认20, 最大100) + + 返回 finance.settle_upstream_*: + - 本机构:应付供应商(待结转* 贷方,仅本机构账本) + - 分销商:应付直接上级(上级账本「分销商存放资金」借方,participant=本级) + """ + ns = ns or {} + accounting_orgid = ns.get('accounting_orgid') + if not accounting_orgid: + return {'status': False, 'msg': '缺少 accounting_orgid(当前账本机构)'} + + include_sub = _parse_bool(ns.get('include_sub_reseller_customers'), False) + current_page, page_size, offset = _parse_page(ns) + + db = DBPools() + async with db.sqlorContext(DBNAME) as sor: + ok, err = await _check_viewer(sor, accounting_orgid, ns.get('userid')) + if not ok: + return {'status': False, 'msg': err} + + scope_sql, scope_params, reseller_ids = await _build_customer_scope_sql( + sor, accounting_orgid, include_sub, + ) + is_owner = await _is_business_owner(sor, accounting_orgid) + + conditions = ["b.del_flg = '0'", "o.del_flg = '0'", scope_sql] + params = dict(scope_params) + + if ns.get('start_date'): + conditions.append('b.bill_date >= ${start_date}$') + params['start_date'] = ns['start_date'] + if ns.get('end_date'): + conditions.append('b.bill_date <= ${end_date}$') + params['end_date'] = ns['end_date'] + if ns.get('customerid'): + conditions.append('b.customerid = ${customerid}$') + params['customerid'] = ns['customerid'] + if ns.get('productid'): + conditions.append('b.productid = ${productid}$') + params['productid'] = ns['productid'] + if ns.get('order_id'): + conditions.append('b.orderid = ${order_id}$') + params['order_id'] = ns['order_id'] + if ns.get('bill_state') is not None: + conditions.append('b.bill_state = ${bill_state}$') + params['bill_state'] = str(ns['bill_state']) + + where_sql = ' AND '.join(conditions) + + total_count, rows = await _fetch_bill_rows_page( + sor, where_sql, params, offset, page_size, + ) + if total_count > 0 and offset >= total_count: + current_page = 1 + offset = 0 + total_count, rows = await _fetch_bill_rows_page( + sor, where_sql, params, offset, page_size, + ) + + debug_info = None + if _parse_bool(ns.get('debug'), False): + debug_info = await _finance_report_debug( + sor, where_sql, params, page_size, offset, + ) + debug_info['page_ids_fetched'] = len(rows) + + items = [] + sum_profit = 0.0 + sum_pay = 0.0 + sum_upstream = 0.0 + row_errors = [] + + for row in rows: + try: + item = await _build_report_row(sor, row, accounting_orgid, is_owner) + except Exception as exc: + row_errors.append({ + 'bill_id': _row_get(row, 'bill_id'), + 'error': str(exc), + }) + continue + items.append(item) + fin = item['finance'] + if fin.get('profit_amount') is not None: + sum_profit += fin['profit_amount'] + pay = item['pricing'].get('customer_pay_amount') + if pay is not None: + sum_pay += pay + up = fin.get('settle_upstream_amount') + if up is not None: + sum_upstream += up + + return { + 'status': True, + 'msg': 'ok', + 'data': { + 'accounting_orgid': accounting_orgid, + 'accounting_orgname': await _org_name(sor, accounting_orgid), + 'is_business_owner': is_owner, + 'customer_scope': { + 'include_sub_reseller_customers': include_sub, + 'descendant_reseller_ids': reseller_ids, + 'descendant_reseller_count': len(reseller_ids), + }, + 'total_count': total_count, + 'current_page': current_page, + 'page_size': page_size, + 'summary': { + 'customer_pay_total': _round_money(sum_pay), + 'profit_total': _round_money(sum_profit), + 'settle_upstream_total': _round_money(sum_upstream), + }, + 'items': items, + 'row_errors': row_errors, + 'list_row_count': len(rows), + 'debug': debug_info, + }, + } + + +async def finance_order_report_detail(ns=None): + """ + 管理人员 — 单笔账单财务明细。 + + 入参:accounting_orgid, bill_id, include_sub_reseller_customers(与列表一致,用于校验客户范围) + """ + ns = ns or {} + accounting_orgid = ns.get('accounting_orgid') + bill_id = ns.get('bill_id') + if not accounting_orgid or not bill_id: + return {'status': False, 'msg': '缺少 accounting_orgid 或 bill_id'} + + include_sub = _parse_bool(ns.get('include_sub_reseller_customers'), False) + + db = DBPools() + async with db.sqlorContext(DBNAME) as sor: + ok, err = await _check_viewer(sor, accounting_orgid, ns.get('userid')) + if not ok: + return {'status': False, 'msg': err} + + _, _, reseller_ids = await _build_customer_scope_sql(sor, accounting_orgid, include_sub) + is_owner = await _is_business_owner(sor, accounting_orgid) + + rows = await _fetch_bill_rows_by_ids(sor, [bill_id]) + if not rows: + return {'status': False, 'msg': '账单不存在'} + + row = rows[0] + if not await _customer_in_scope( + _row_get(row, 'customer_parentid'), + accounting_orgid, + reseller_ids, + include_sub, + ): + return {'status': False, 'msg': '该账单不在当前查询客户范围内'} + + detail = await _build_report_row(sor, row, accounting_orgid, is_owner) + detail['customer_scope'] = {'include_sub_reseller_customers': include_sub} + return {'status': True, 'msg': 'ok', 'data': detail} + + +async def finance_billing_overview(ns=None): + """ + 计费概览页 — 销售/利润/应付上级 汇总(单接口,建议概览页只调此接口)。 + + 入参 ns: + accounting_orgid (必填) 当前账本机构 + include_sub_reseller_customers (可选) 默认 true;是否含下级分销商客户带来的金额 + start_date / end_date (可选) 按 bill.bill_date 筛选 + bill_state (可选) 仅统计指定账单状态,默认不筛(含未记账则利润/结算为估算) + only_accounted (可选) true 时仅 bill_state=1(推荐概览页使用) + userid (可选) 权限 + max_bills (可选) 默认 5000,超出则截断并返回 truncated=true + + 口径(与订单明细接口一致): + sales_total = 客户实付合计 bill.amount + profit_total = 本级账本折扣收入+底价收入(bill_detail)或协议估算 + settle_upstream_total= 本机构→供应商待结转;分销→上级账本分销商存放资金借方 + + 分段: + direct_customers 直属客户(cust.parentid = accounting_orgid) + from_sub_resellers 下级分销商的客户(cust.parentid in 下级分销 org) + + 返回 by_provider_product:按供应商 orgid + 产品 id 拆分上述三项及合计。 + """ + ns = ns or {} + accounting_orgid = ns.get('accounting_orgid') + if not accounting_orgid: + return {'status': False, 'msg': '缺少 accounting_orgid(当前账本机构)'} + + include_sub = _parse_bool(ns.get('include_sub_reseller_customers'), True) + only_accounted = _parse_bool(ns.get('only_accounted'), False) + try: + max_bills = int(ns.get('max_bills', 5000) or 5000) + except (TypeError, ValueError): + max_bills = 5000 + max_bills = max(100, min(max_bills, 20000)) + + db = DBPools() + async with db.sqlorContext(DBNAME) as sor: + ok, err = await _check_viewer(sor, accounting_orgid, ns.get('userid')) + if not ok: + return {'status': False, 'msg': err} + + scope_sql, scope_params, reseller_ids = await _build_customer_scope_sql( + sor, accounting_orgid, include_sub, + ) + is_owner = await _is_business_owner(sor, accounting_orgid) + parent_orgid = await get_parent_orgid(sor, accounting_orgid) + + conditions = ["b.del_flg = '0'", "o.del_flg = '0'", scope_sql] + params = dict(scope_params) + if ns.get('start_date'): + conditions.append('b.bill_date >= ${start_date}$') + params['start_date'] = ns['start_date'] + if ns.get('end_date'): + conditions.append('b.bill_date <= ${end_date}$') + params['end_date'] = ns['end_date'] + if only_accounted: + conditions.append("b.bill_state = '1'") + elif ns.get('bill_state') is not None: + conditions.append('b.bill_state = ${bill_state}$') + params['bill_state'] = str(ns['bill_state']) + + where_sql = ' AND '.join(conditions) + fetch = await _fetch_overview_bill_rows(sor, where_sql, params, max_bills) + bill_rows = fetch['rows'] + truncated = fetch['truncated'] + reseller_id_set = {_norm_org_id(r) for r in reseller_ids if _norm_org_id(r)} + + totals = { + 'direct_customers': _new_amount_bucket(), + 'from_sub_resellers': _new_amount_bucket(), + 'grand_total': _new_amount_bucket(), + } + by_pp = {} + provider_names = {} + product_names = {} + skipped = 0 + errors = [] + + for row in bill_rows: + seg = _customer_segment( + _row_get(row, 'customer_parentid'), + accounting_orgid, + reseller_id_set, + ) + if seg is None: + skipped += 1 + continue + try: + sales, profit, settle, _src = await _bill_finance_amounts( + sor, row, accounting_orgid, is_owner, parent_orgid, + ) + except Exception as exc: + errors.append({ + 'bill_id': _row_get(row, 'bill_id'), + 'error': str(exc), + }) + continue + + _bucket_add(totals[seg], sales, profit, settle) + _bucket_add(totals['grand_total'], sales, profit, settle) + + pid = _row_get(row, 'providerid') or '' + prid = _row_get(row, 'productid') or '' + key = (pid, prid) + if key not in by_pp: + by_pp[key] = { + 'provider_orgid': pid, + 'product_id': prid, + 'direct_customers': _new_amount_bucket(), + 'from_sub_resellers': _new_amount_bucket(), + 'total': _new_amount_bucket(), + } + _bucket_add(by_pp[key][seg], sales, profit, settle) + _bucket_add(by_pp[key]['total'], sales, profit, settle) + + breakdown = [] + for (pid, prid), node in by_pp.items(): + breakdown.append({ + 'provider': { + 'orgid': pid, + 'name': await _provider_name_cache(sor, provider_names, pid), + }, + 'product': { + 'id': prid, + 'name': await _product_name_cache(sor, product_names, prid), + }, + 'direct_customers': _bucket_round(node['direct_customers']), + 'from_sub_resellers': _bucket_round(node['from_sub_resellers']), + 'total': _bucket_round(node['total']), + }) + breakdown.sort(key=lambda x: float(x['total']['sales_total'] or 0), reverse=True) + + settle_target = await _settle_upstream_meta(sor, accounting_orgid, None) + if is_owner: + settle_label = '应付供应商' + else: + settle_label = '应付上级机构' + + data = { + 'accounting_orgid': accounting_orgid, + 'accounting_orgname': await _org_name(sor, accounting_orgid), + 'is_business_owner': is_owner, + 'settle_upstream_label': settle_label, + 'settle_upstream_target': { + 'type': settle_target['settle_upstream_type'], + 'orgid': settle_target['settle_upstream_orgid'], + 'name': settle_target['settle_upstream_orgname'], + }, + 'period': { + 'start_date': ns.get('start_date'), + 'end_date': ns.get('end_date'), + }, + 'customer_scope': { + 'include_sub_reseller_customers': include_sub, + 'descendant_reseller_ids': reseller_ids, + 'descendant_reseller_count': len(reseller_ids), + }, + 'filters': { + 'only_accounted': only_accounted, + 'bill_state': ns.get('bill_state'), + }, + 'bill_count': len(bill_rows), + 'bill_id_count': fetch['total_ids'], + 'id_query_len': fetch['id_query_len'], + 'truncated': truncated, + 'max_bills': max_bills, + 'skipped_out_of_scope': skipped, + 'totals': { + 'direct_customers': _bucket_round(totals['direct_customers']), + 'from_sub_resellers': _bucket_round(totals['from_sub_resellers']), + 'grand_total': _bucket_round(totals['grand_total']), + }, + 'by_provider_product': breakdown, + 'errors': errors, + } + if _parse_bool(ns.get('debug'), False) or ( + data['totals']['grand_total']['bill_count'] == 0 + and fetch['id_query_len'] > 0 + ): + data['debug'] = { + 'id_query_len': fetch['id_query_len'], + 'bill_id_count': fetch['total_ids'], + 'overview_rows_len': len(bill_rows), + 'skipped_out_of_scope': skipped, + 'sample_parentid': _row_get(bill_rows[0], 'customer_parentid') if bill_rows else None, + } + return {'status': True, 'msg': 'ok', 'data': data} + +ret = await finance_order_report_detail(params_kw) +return ret \ No newline at end of file diff --git a/b/bz_order/cumulative_order.dspy b/b/bz_order/cumulative_order.dspy index bd20114..bb838b0 100644 --- a/b/bz_order/cumulative_order.dspy +++ b/b/bz_order/cumulative_order.dspy @@ -13,6 +13,7 @@ async def cumulative_order(ns={}): # 统计全部 累计支付金额和累计优惠金额 不包含各种筛选条件 # 累计支付金额=BUY+RENEW-BUY_REVERSE并且对应order_status=1是实际支付金额 累计优惠金额=BUY+RENEW-BUY_REVERSE并且对应order_status=1是优惠金额 + # 不包含大模型订单 total_amount_sql = """ SELECT COALESCE(SUM( @@ -33,6 +34,7 @@ async def cumulative_order(ns={}): JOIN bz_order bo ON og.orderid = bo.id WHERE og.del_flg = '0' AND bo.del_flg = '0' + AND og.is_big_model = 0 AND bo.customerid = ${customerid}$ """ total_amount_result = await sor.sqlExe(total_amount_sql, {'customerid': customerid}) diff --git a/b/cntoai/chat_send_stream.dspy b/b/cntoai/chat_send_stream.dspy index b98d116..bae739b 100644 --- a/b/cntoai/chat_send_stream.dspy +++ b/b/cntoai/chat_send_stream.dspy @@ -74,8 +74,13 @@ async def _load_session_messages(sor, session_id): async def _resolve_chat_config(ns, sor): # api_url = ns.get('api_url') # api_key = ns.get('api_key') - api_url = 'https://api.deepseek.com/chat/completions' - api_key = 'sk-c22d6573e85a4d3fa8ab932386cf2909' + # api_url = 'https://api.deepseek.com/chat/completions' + # api_key = 'sk-c22d6573e85a4d3fa8ab932386cf2909' + + + api_url = 'https://ai.atvoe.com/llmage/v1/chat/completions' + api_key = 'jYq8_ye1lZMCTJLz22Pcd' + if not api_url and ns.get('model_id'): doc_rows = await sor.sqlExe( "SELECT api_url FROM model_api_doc WHERE model_id = '%s' LIMIT 1;" @@ -199,7 +204,9 @@ async def inference_generator(request, params_kw=None, **kw): ns = params_kw or {} # model = ns.get('model') - model = 'deepseek-v4-pro' + # model = 'deepseek-v4-pro' + model = 'qwen3.6-plus' + if not model: yield _sse_event({'type': 'error', 'msg': 'model is required'}) yield 'data: [DONE]\n\n' diff --git a/b/cntoai/model_usage_user_report.dspy b/b/cntoai/model_usage_user_report.dspy new file mode 100644 index 0000000..befac12 --- /dev/null +++ b/b/cntoai/model_usage_user_report.dspy @@ -0,0 +1,512 @@ +def _escape(value): + if value is None: + return None + return str(value).replace("'", "''") + + +def _parse_usage_content(raw): + if not raw: + return {} + if isinstance(raw, dict): + return raw + try: + return json.loads(raw) + except (TypeError, ValueError): + return {} + + +def _resolve_time_range(ns): + """ + 解析时间范围。 + + 优先使用 start_time + end_time;否则按 range 快捷窗口: + hour -> 最近 1 小时 + day -> 最近 1 天 + week -> 最近 7 天 + 默认 day。 + """ + start_time = ns.get('start_time') + end_time = ns.get('end_time') + range_type = (ns.get('range') or ns.get('range_type') or 'day').lower() + + now = datetime.datetime.now() + if start_time and end_time: + start_dt = _parse_datetime(start_time) + end_dt = _parse_datetime(end_time, end_of_day=True) + if not start_dt or not end_dt: + return None, None, 'start_time / end_time 格式无效,请使用 YYYY-MM-DD 或 YYYY-MM-DD HH:MM:SS' + if start_dt > end_dt: + return None, None, 'start_time 不能晚于 end_time' + return start_dt, end_dt, None + + delta_map = { + 'hour': datetime.timedelta(hours=1), + 'day': datetime.timedelta(days=1), + 'week': datetime.timedelta(weeks=1), + } + if range_type not in delta_map: + return None, None, 'range 仅支持 hour / day / week' + + end_dt = now + start_dt = now - delta_map[range_type] + return start_dt, end_dt, None + + +def _parse_datetime(value, end_of_day=False): + if not value: + return None + text = str(value).strip() + for fmt in ('%Y-%m-%d %H:%M:%S', '%Y-%m-%d'): + try: + dt = datetime.datetime.strptime(text, fmt) + if fmt == '%Y-%m-%d' and end_of_day: + dt = dt.replace(hour=23, minute=59, second=59) + return dt + except ValueError: + continue + return None + + +def _format_datetime(dt): + return dt.strftime('%Y-%m-%d %H:%M:%S') + + +def _group_key(dt, group_by): + if group_by == 'hour': + return dt.strftime('%Y-%m-%d %H:00:00') + if group_by == 'week': + monday = dt - datetime.timedelta(days=dt.weekday()) + return monday.strftime('%Y-%m-%d') + return dt.strftime('%Y-%m-%d') + + +def _normalize_usage_row(row, bill_amount_map=None): + usage = _parse_usage_content(row.get('usage_content')) + orderid = row.get('orderid') + amount = float(row.get('original_price') or 0) + if bill_amount_map and orderid and orderid in bill_amount_map: + amount = float(bill_amount_map[orderid] or amount) + + return { + 'id': row.get('id'), + 'userid': row.get('userid'), + 'llmid': row.get('llmid'), + 'model': usage.get('model') or row.get('model_name') or row.get('llm_model'), + 'prompt_tokens': int(usage.get('prompt_tokens') or 0), + 'completion_tokens': int(usage.get('completion_tokens') or 0), + 'total_tokens': int(usage.get('total_tokens') or 0), + 'amount': round(amount, 8), + 'bill_status': row.get('bill_status'), + 'orderid': orderid, + 'usage_time': row.get('created_at'), + } + + +async def _fetch_bill_amount_map(sor, order_ids): + if not order_ids: + return {} + result = {} + for orderid in set(order_ids): + if not orderid: + continue + rows = await sor.R('bill', {'orderid': orderid, 'del_flg': '0'}) + if rows: + result[orderid] = sum(float(r.get('amount') or 0) for r in rows) + return result + + +async def _fetch_llm_model_map(sor, llm_ids): + result = {} + for llmid in set(llm_ids): + if not llmid: + continue + rows = await sor.R('llm', {'id': llmid}) + if rows: + result[llmid] = rows[0].get('model') + return result + + +def _build_usage_where(userid=None, userids=None, start_dt=None, end_dt=None, model_filter=None): + """构建 model_usage 单表查询条件(不使用 JOIN,兼容 sqlExe 环境)。""" + conditions = [] + if userid: + conditions.append("userid = '%s'" % _escape(userid)) + elif userids: + escaped = ["'%s'" % _escape(uid) for uid in userids if uid] + if not escaped: + conditions.append('1 = 0') + else: + conditions.append('userid IN (%s)' % ','.join(escaped)) + if start_dt and end_dt: + conditions.append( + "created_at BETWEEN '%s' AND '%s'" + % (_format_datetime(start_dt), _format_datetime(end_dt)) + ) + if model_filter: + conditions.append( + "JSON_UNQUOTE(JSON_EXTRACT(usage_content, '$.model')) = '%s'" + % _escape(model_filter) + ) + return conditions + + +async def _query_model_usage_rows(sor, conditions, limit=None, offset=None): + where_clause = ' AND '.join(conditions) if conditions else '1 = 1' + sql = """ + SELECT id, userid, llmid, original_price, orderid, bill_status, usage_content, created_at + FROM model_usage + WHERE %s + ORDER BY created_at DESC + """ % where_clause + if limit is not None: + sql += ' LIMIT %d OFFSET %d' % (int(limit), int(offset or 0)) + return await sor.sqlExe(sql, {}) + + +async def _count_model_usage(sor, conditions): + where_clause = ' AND '.join(conditions) if conditions else '1 = 1' + sql = 'SELECT COUNT(*) AS total_count FROM model_usage WHERE %s' % where_clause + return (await sor.sqlExe(sql, {}))[0]['total_count'] + + +async def _enrich_usage_rows(sor, rows): + order_ids = [row.get('orderid') for row in rows if row.get('orderid')] + llm_ids = [row.get('llmid') for row in rows if row.get('llmid')] + bill_amount_map = await _fetch_bill_amount_map(sor, order_ids) + llm_model_map = await _fetch_llm_model_map(sor, llm_ids) + items = [] + for row in rows: + enriched = dict(row) + enriched['llm_model'] = llm_model_map.get(row.get('llmid')) + items.append(_normalize_usage_row(enriched, bill_amount_map)) + return items + + +async def _fetch_customer_users(sor, orgid, customerid=None): + """获取机构下客户及其用户映射。""" + org_rows = await sor.R('organization', {'parentid': orgid, 'del_flg': '0'}) + if customerid: + org_rows = [row for row in org_rows if row.get('id') == customerid] + org_map = {row['id']: row for row in org_rows} + + user_map = {} + for oid in org_map: + user_rows = await sor.R('users', {'orgid': oid, 'del_flg': '0'}) + for user in user_rows: + user_map[user['id']] = user + return org_map, user_map + + +def _aggregate_admin_summary(items, user_map, org_map): + buckets = {} + for item in items: + user = user_map.get(item.get('userid'), {}) + org = org_map.get(user.get('orgid'), {}) + model = item.get('model') or 'unknown' + key = (org.get('id'), item.get('userid'), model) + if key not in buckets: + buckets[key] = { + 'customerid': org.get('id'), + 'customer_name': org.get('orgname'), + 'userid': item.get('userid'), + 'username': user.get('username'), + 'user_name': user.get('name'), + 'model': model, + 'prompt_tokens': 0, + 'completion_tokens': 0, + 'total_tokens': 0, + 'amount': 0.0, + 'request_count': 0, + 'first_usage_time': item.get('usage_time'), + 'last_usage_time': item.get('usage_time'), + } + bucket = buckets[key] + bucket['prompt_tokens'] += item.get('prompt_tokens') or 0 + bucket['completion_tokens'] += item.get('completion_tokens') or 0 + bucket['total_tokens'] += item.get('total_tokens') or 0 + bucket['amount'] = round(bucket['amount'] + float(item.get('amount') or 0), 8) + bucket['request_count'] += 1 + usage_time = item.get('usage_time') + if usage_time: + bucket['last_usage_time'] = usage_time + if ( + not bucket.get('first_usage_time') + or str(usage_time) < str(bucket['first_usage_time']) + ): + bucket['first_usage_time'] = usage_time + return sorted(buckets.values(), key=lambda x: x['amount'], reverse=True) + + +def _aggregate_items(items, group_by=None): + if not group_by: + return items + + buckets = {} + for item in items: + usage_time = item.get('usage_time') + if isinstance(usage_time, str): + dt = _parse_datetime(usage_time) + else: + dt = usage_time + if not dt: + key = 'unknown' + else: + key = _group_key(dt, group_by) + + if key not in buckets: + buckets[key] = { + 'period': key, + 'model': item.get('model'), + 'prompt_tokens': 0, + 'completion_tokens': 0, + 'total_tokens': 0, + 'amount': 0.0, + 'request_count': 0, + } + bucket = buckets[key] + bucket['prompt_tokens'] += item.get('prompt_tokens') or 0 + bucket['completion_tokens'] += item.get('completion_tokens') or 0 + bucket['total_tokens'] += item.get('total_tokens') or 0 + bucket['amount'] = round(bucket['amount'] + float(item.get('amount') or 0), 8) + bucket['request_count'] += 1 + + return sorted(buckets.values(), key=lambda x: x['period'], reverse=True) + + +def _summarize(items): + return { + 'request_count': len(items), + 'prompt_tokens': sum(i.get('prompt_tokens') or 0 for i in items), + 'completion_tokens': sum(i.get('completion_tokens') or 0 for i in items), + 'total_tokens': sum(i.get('total_tokens') or 0 for i in items), + 'amount': round(sum(float(i.get('amount') or 0) for i in items), 8), + } + + +async def model_usage_user_report(ns={}): + """ + 用户查看自己的模型使用记录。 + + 参数: + userid 可选,默认当前登录用户 + start_time 开始时间,格式 YYYY-MM-DD 或 YYYY-MM-DD HH:MM:SS + end_time 结束时间 + range 快捷范围:hour / day / week(未传 start/end 时生效,默认 day) + model 按模型标识筛选(usage_content.model) + group_by 聚合粒度:hour / day / week,不传则返回明细 + current_page 页码,默认 1 + page_size 每页条数,默认 20 + + 返回字段: + model, prompt_tokens, completion_tokens, total_tokens, amount, usage_time + """ + if ns.get('userid'): + userid = ns.get('userid') + else: + userid = await get_user() + if not userid: + server_error(401) + + start_dt, end_dt, err = _resolve_time_range(ns) + if err: + return {'status': False, 'msg': err} + + model_filter = ns.get('model') + group_by = (ns.get('group_by') or '').lower() or None + if group_by and group_by not in ('hour', 'day', 'week'): + return {'status': False, 'msg': 'group_by 仅支持 hour / day / week'} + + page_size = int(ns.get('page_size', 20)) + current_page = int(ns.get('current_page', 1)) + offset = (current_page - 1) * page_size + + db = DBPools() + async with db.sqlorContext('kboss') as sor: + try: + user_rows = await sor.R('users', {'id': userid, 'del_flg': '0'}) + if not user_rows: + return {'status': False, 'msg': '用户不存在'} + + conditions = _build_usage_where( + userid=userid, + start_dt=start_dt, + end_dt=end_dt, + model_filter=model_filter, + ) + total_count = await _count_model_usage(sor, conditions) + + if group_by: + all_rows = await _query_model_usage_rows(sor, conditions) + all_items = await _enrich_usage_rows(sor, all_rows) + grouped = _aggregate_items(all_items, group_by) + return { + 'status': True, + 'msg': '查询成功', + 'data': { + 'userid': userid, + 'start_time': _format_datetime(start_dt), + 'end_time': _format_datetime(end_dt), + 'summary': _summarize(all_items), + 'group_by': group_by, + 'groups': grouped, + }, + } + + all_rows = await _query_model_usage_rows(sor, conditions) + all_items = await _enrich_usage_rows(sor, all_rows) + page_rows = await _query_model_usage_rows( + sor, conditions, limit=page_size, offset=offset, + ) + items = await _enrich_usage_rows(sor, page_rows) + + return { + 'status': True, + 'msg': '查询成功', + 'data': { + 'userid': userid, + 'start_time': _format_datetime(start_dt), + 'end_time': _format_datetime(end_dt), + 'summary': _summarize(all_items), + 'total_count': total_count, + 'current_page': current_page, + 'page_size': page_size, + 'items': items, + }, + } + except Exception as e: + return {'status': False, 'msg': '查询失败, %s' % str(e)} + + +async def model_usage_admin_report(ns={}): + """ + 管理员查看当前机构下所有客户的模型使用汇总。 + + 参数: + userid 可选,默认当前登录用户(须为机构管理员) + start_time 开始时间 + end_time 结束时间 + range 快捷范围:hour / day / week(未传 start/end 时生效,默认 day) + customerid 可选,按客户机构 id 筛选 + model 可选,按模型标识筛选 + group_by 聚合粒度:hour / day / week;不传则按客户+模型汇总 + current_page 页码,默认 1 + page_size 每页条数,默认 20 + + 返回字段: + customerid, customer_name, userid, username, model, + prompt_tokens, completion_tokens, total_tokens, amount, usage_time + """ + if ns.get('userid'): + userid = ns.get('userid') + else: + userid = await get_user() + if not userid: + server_error(401) + + start_dt, end_dt, err = _resolve_time_range(ns) + if err: + return {'status': False, 'msg': err} + + customerid = ns.get('customerid') + model_filter = ns.get('model') + group_by = (ns.get('group_by') or '').lower() or None + if group_by and group_by not in ('hour', 'day', 'week'): + return {'status': False, 'msg': 'group_by 仅支持 hour / day / week'} + + page_size = int(ns.get('page_size', 20)) + current_page = int(ns.get('current_page', 1)) + offset = (current_page - 1) * page_size + + db = DBPools() + async with db.sqlorContext('kboss') as sor: + try: + user_rows = await sor.R('users', {'id': userid, 'del_flg': '0'}) + if not user_rows: + return {'status': False, 'msg': '用户不存在'} + + orgid = user_rows[0].get('orgid') + user_role = await get_user_role({'userid': userid, 'sor': sor}) + if user_role not in ('管理员', '运营', '运营管理员'): + return {'status': False, 'msg': '无权限,仅机构管理员可查看'} + + org_map, user_map = await _fetch_customer_users(sor, orgid, customerid) + user_ids = list(user_map.keys()) + if not user_ids: + empty_data = { + 'orgid': orgid, + 'start_time': _format_datetime(start_dt), + 'end_time': _format_datetime(end_dt), + 'summary': _summarize([]), + 'total_count': 0, + 'current_page': current_page, + 'page_size': page_size, + 'items': [], + } + if group_by: + empty_data = { + 'orgid': orgid, + 'start_time': _format_datetime(start_dt), + 'end_time': _format_datetime(end_dt), + 'summary': _summarize([]), + 'group_by': group_by, + 'groups': [], + } + return {'status': True, 'msg': '查询成功', 'data': empty_data} + + conditions = _build_usage_where( + userids=user_ids, + start_dt=start_dt, + end_dt=end_dt, + model_filter=model_filter, + ) + all_rows = await _query_model_usage_rows(sor, conditions) + all_items = await _enrich_usage_rows(sor, all_rows) + + if group_by: + enriched_items = [] + for item in all_items: + user = user_map.get(item.get('userid'), {}) + org = org_map.get(user.get('orgid'), {}) + enriched = dict(item) + enriched['customerid'] = org.get('id') + enriched['customer_name'] = org.get('orgname') + enriched['username'] = user.get('username') + enriched['user_name'] = user.get('name') + enriched_items.append(enriched) + grouped = _aggregate_items(enriched_items, group_by) + return { + 'status': True, + 'msg': '查询成功', + 'data': { + 'orgid': orgid, + 'start_time': _format_datetime(start_dt), + 'end_time': _format_datetime(end_dt), + 'summary': _summarize(all_items), + 'group_by': group_by, + 'groups': grouped, + }, + } + + summary_items = _aggregate_admin_summary(all_items, user_map, org_map) + total_count = len(summary_items) + page_items = summary_items[offset:offset + page_size] + + return { + 'status': True, + 'msg': '查询成功', + 'data': { + 'orgid': orgid, + 'start_time': _format_datetime(start_dt), + 'end_time': _format_datetime(end_dt), + 'summary': _summarize(all_items), + 'total_count': total_count, + 'current_page': current_page, + 'page_size': page_size, + 'items': page_items, + }, + } + except Exception as e: + return {'status': False, 'msg': '查询失败, %s' % str(e)} + +ret = await model_usage_user_report(params_kw) +return ret \ No newline at end of file diff --git a/b/cntoai/process_user_billing.dspy b/b/cntoai/process_user_billing.dspy index 35e9ed5..2dc2e17 100644 --- a/b/cntoai/process_user_billing.dspy +++ b/b/cntoai/process_user_billing.dspy @@ -231,9 +231,10 @@ async def process_user_billing(ns={}): :param quantity: 仅 use_saleprotocol=True 时生效,数量默认 1 :return: dict,含 status、msg;成功时含 orderid、amount """ + debug(f"process_user_billing 传递参数: {ns}") # 存储输入值到usage表 db = DBPools() - async with db.sqlorContext('kboss') as sor: + async with db.sqlorContext('kboss') as sor_init: usage_ns = { 'id': uuid(), 'userid': ns.get('userid'), @@ -242,7 +243,7 @@ async def process_user_billing(ns={}): 'original_price': ns.get('amount'), 'usage_content': json.dumps(ns.get('usage')) if isinstance(ns.get('usage'), dict) else ns.get('usage') } - await sor.C('model_usage', usage_ns) + await sor_init.C('model_usage', usage_ns) apikey = ns.get('apikey') userid = ns.get('userid') @@ -328,7 +329,7 @@ async def process_user_billing(ns={}): price_res = await calc_price_by_saleprotocol( sor, org_list[0], product['id'], amount, quantity=quantity, ) - if not price_res['status']: + if price_res['status'] == 'error': return price_res debug(price_res) debug('list_price %s' % list_price) @@ -386,7 +387,7 @@ async def process_user_billing(ns={}): await sor.C('order_goods', goods_ns) charge_res = await _charge_order(sor, order_id, order_type='NEW') - if not charge_res['status']: + if charge_res['status'] == 'error': await sor.rollback() return charge_res diff --git a/b/customer/getNoinvitationcode.dspy b/b/customer/getNoinvitationcode.dspy index c354338..f690e8c 100644 --- a/b/customer/getNoinvitationcode.dspy +++ b/b/customer/getNoinvitationcode.dspy @@ -3,6 +3,14 @@ async def getNoinvitationcode(ns={}): try: db = DBPools() async with db.sqlorContext('kboss') as sor: + userid = ns.get('userid') + if not userid: + userid = await get_user() + userinfo_li = await sor.R('users', {'id': userid}) + if userinfo_li: + orgid = userinfo_li[0]['orgid'] + ns['orgid'] = orgid + # 获取组织信息 orgs = await sor.R('organization', {"org_type": '0'}) if not orgs: @@ -69,6 +77,7 @@ async def getNoinvitationcode(ns={}): data_sql = data_sql % tuple(params) # 执行数据查询 + debug(f"getNoinvitationcode 查询数据: {data_sql}") result = await sor.sqlExe(data_sql, {}) # 返回结果,包含分页信息 diff --git a/b/user/logintype.dspy b/b/user/logintype.dspy index ec75082..da8958d 100644 --- a/b/user/logintype.dspy +++ b/b/user/logintype.dspy @@ -140,7 +140,8 @@ async def logintype(ns): async with db.sqlorContext('kboss') as sor: domain_name = ns.get('domain_name') - if domain_name in ['www.opencomputing.cn', 'dev.opencomputing.cn', 'localhost:9527'] and ns.get('username') not in ['开元云(北京)科技有限公司', 'admin', 'kyy_root', 'kyy_kaiyuan', 'kyacloud', 'kyy_运营', 'kyy_销售', 'kyy_财务', '测试用户', 'kycloud']: + # if domain_name in ['www.opencomputing.cn', 'dev.opencomputing.cn', 'localhost:9527', 'www.ncmatch.cn'] and ns.get('username') not in ['开元云(北京)科技有限公司', 'admin', 'kyy_root', 'kyy_kaiyuan', 'kyacloud', 'kyy_运营', 'kyy_销售', 'kyy_财务', '测试用户', 'kycloud']: + if domain_name in ['www.opencomputing.cn', 'dev.opencomputing.cn', 'localhost:9527', 'www.ncmatch.cn'] and not ns.get('username'): # 登录失败次数限制 login_allowed = await check_login_allowed(ns.get('username'))