From ceb8fe28f34c4224b74a86caf486757f7bad0ef1 Mon Sep 17 00:00:00 2001 From: ping <1017253325@qq.com> Date: Fri, 29 May 2026 11:40:24 +0800 Subject: [PATCH] update --- b/bill/finance_order_report.dspy | 10 +- b/bill/finance_order_report_overview.dspy | 1239 +++++++++++++++++++++ 2 files changed, 1245 insertions(+), 4 deletions(-) create mode 100644 b/bill/finance_order_report_overview.dspy diff --git a/b/bill/finance_order_report.dspy b/b/bill/finance_order_report.dspy index 2bf333f..d9ede32 100644 --- a/b/bill/finance_order_report.dspy +++ b/b/bill/finance_order_report.dspy @@ -1069,10 +1069,10 @@ async def finance_billing_overview(ns=None): include_sub = _parse_bool(ns.get('include_sub_reseller_customers'), True) only_accounted = _parse_bool(ns.get('only_accounted'), False) try: - max_bills = int(ns.get('max_bills', 5000) or 5000) + max_bills = int(ns.get('max_bills', 50000) or 50000) except (TypeError, ValueError): - max_bills = 5000 - max_bills = max(100, min(max_bills, 20000)) + max_bills = 50000 + max_bills = max(100, min(max_bills, 200000)) db = DBPools() async with db.sqlorContext(DBNAME) as sor: @@ -1228,11 +1228,13 @@ async def finance_billing_overview(ns=None): return {'status': True, 'msg': 'ok', 'data': data} -_report = params_kw.get('report') or params_kw.get('api') or 'order_list' +# _report = params_kw.get('report') or params_kw.get('api') or 'order_list' +_report = None if _report in ('overview', 'billing_overview', 'finance_billing_overview'): ret = await finance_billing_overview(params_kw) elif _report in ('detail', 'order_detail'): ret = await finance_order_report_detail(params_kw) else: ret = await finance_order_report(params_kw) +return ret return ret \ No newline at end of file diff --git a/b/bill/finance_order_report_overview.dspy b/b/bill/finance_order_report_overview.dspy new file mode 100644 index 0000000..43af926 --- /dev/null +++ b/b/bill/finance_order_report_overview.dspy @@ -0,0 +1,1239 @@ +# -*- coding: utf-8 -*- +""" +管理人员财务订单报表接口。 + +视角:以 accounting_orgid(账本机构)为查看主体,展示客户订单的定价、利润、本级应付结算。 + +依赖运行环境:DBPools, sor.sqlExe, sor.R + +业务口径(已确认): + - product_salemode.providerid = provider.orgid + - 分销商不直接对供应商结算;仅本机构(parentid 为空)对供应商结算 + - 一级分销对本机构结算,二级分销对一级分销结算(即对直接上级结算) + - settle_upstream_amount:本级应付(本机构→供应商;分销→直接上级机构) + - profit_amount:本级账本「折扣收入」「底价收入」贷方合计 + - include_sub_reseller_customers:是否包含下级分销商发展的客户 +""" + +async def get_parent_orgid(sor, orgid): + sql = """select a.id from organization a, organization b +where b.parentid = a.id + and a.del_flg = '0' + and b.del_flg = '0' + and b.id = ${orgid}$""" + recs = await sor.sqlExe(sql, {'orgid':orgid}) + if len(recs) == 0: + return None + return recs[0]['id'] + +DBNAME = 'kboss' +RESELLER_ORG = '1' +OWNER_OGR = '0' +CORP_CUSTOMER = '2' +PERSONAL = '3' +PROVIDER = '4' +PUBLISHER = '5' +UNDERWRITER = '6' + +PARTY_OWNER = '本机构' +PARTY_CUSTOMER = '客户' +PARTY_RESELLER = '分销商' +PARTY_PROVIDER = '供应商' +PARTY_PUBLISHER = '算力券发行方' +PARTY_UNDERWRITER = '算力券承销方' + +DEBT = '借' +CREDIT = '贷' + +ACTNAME_BUY = '付费' +ACTNAME_RECHARGE = '充值' +ACTNAME_RECHARGE_ALIPAY = '支付宝充值' +ACTNAME_SETTLE = '结算' + +SALEMODE_DISCOUNT = '折扣' +SALEMODE_REBATE = '代付费' +SALEMODE_FLOORPRICE = '底价' + +ACTION_RECHARGE_ALIPAY = 'RECHARGE_ALIPAY' +ACTION_RECHARGE_ALIPAY_REVERSE = 'RECHARGE_ALIPAY_REVERSE' +ACTION_RECHARGE = 'RECHARGE' +ACTION_RECHARGE_REVERSE = 'RECHARGE_REVERSE' +ACTION_BUY = 'BUY' +ACTION_REVERSE_BUY = 'BUY_REVERSE' +ACTION_RENEW = 'RENEW' +ACTION_RENEW_REVERSE = 'RENEW_REVERSE' +ACTION_SETTLE = 'SETTLE' +ACTION_SETTLE_REVERSE = 'SETTLE_REVERSE' + +DBNAME = 'kboss' +CUSTOMER_ORG_TYPES = ('2', '3') + +SALEMODE_LABEL = { + '0': SALEMODE_DISCOUNT, + '1': SALEMODE_REBATE, + '2': SALEMODE_FLOORPRICE, +} + +INCOME_SUBJECTS = ('折扣收入', '底价收入') +PARENT_SETTLE_SUBJECT = '分销商存放资金' +SUPPLIER_SETTLE_PREFIX = '待结转' + +_SALEMODE_SQL_OWN = """ +SELECT a.salemode, a.settle_mode, b.discount, b.price +FROM saleprotocol a, product_salemode b +WHERE a.id = b.protocolid + AND a.bid_orgid = ${bid_orgid}$ + AND (b.productid = ${productid}$ OR b.productid = '*') + AND b.providerid = ${providerid}$ + AND a.start_date <= ${curdate}$ + AND a.end_date > ${curdate}$ + AND a.del_flg = '0' + AND b.del_flg = '0' +ORDER BY b.productid DESC +LIMIT 1 +""" + +_SALEMODE_SQL_OFFER = """ +SELECT a.salemode, a.settle_mode, b.discount, b.price +FROM saleprotocol a, product_salemode b +WHERE a.id = b.protocolid + AND a.offer_orgid = ${offer_orgid}$ + AND a.bid_orgid = ${bid_orgid}$ + AND (b.productid = ${productid}$ OR b.productid = '*') + AND b.providerid = ${providerid}$ + AND a.start_date <= ${curdate}$ + AND a.end_date > ${curdate}$ + AND a.del_flg = '0' + AND b.del_flg = '0' +ORDER BY b.productid DESC +LIMIT 1 +""" + + +def _round_money(v): + if v is None: + return None + return round(float(v), 2) + + +def _salemode_label(code): + if code is None: + return None + return SALEMODE_LABEL.get(str(code), str(code)) + + +def _parse_bool(v, default=False): + if v is None: + return default + if isinstance(v, bool): + return v + return str(v).strip().lower() in ('1', 'true', 'yes', 'on') + + +def _parse_page(ns, default_page=1, default_size=20, max_size=100): + """解析分页,避免 page_size=0 导致 LIMIT 0 无数据。""" + try: + page_size = int(ns.get('page_size', default_size) or default_size) + except (TypeError, ValueError): + page_size = default_size + try: + current_page = int(ns.get('current_page', default_page) or default_page) + except (TypeError, ValueError): + current_page = default_page + page_size = max(1, min(page_size, max_size)) + current_page = max(1, current_page) + offset = (current_page - 1) * page_size + return current_page, page_size, offset + + +def _sql_rows(result): + """统一 sqlExe 查询结果为 list[dict]。""" + if result is None: + return [] + if isinstance(result, list): + return result + if isinstance(result, dict): + return [result] + return list(result) + + +def _row_get(row, *keys, default=None): + """兼容不同驱动返回的大小写字段名。""" + if not row: + return default + for key in keys: + if key in row: + return row[key] + lower = key.lower() + for k, v in row.items(): + if k.lower() == lower: + return v + return default + + +async def _check_viewer(sor, accounting_orgid, userid=None): + if not userid: + return True, None + users = await sor.R('users', {'id': userid, 'del_flg': '0'}) + if not users: + return False, '用户不存在' + user_orgid = users[0].get('orgid') + if user_orgid == accounting_orgid: + return True, None + parent = await get_parent_orgid(sor, accounting_orgid) + if parent and user_orgid == parent: + return True, None + return False, '无权查看该机构财务数据' + + +async def _org_name(sor, orgid): + if not orgid: + return None + rows = await sor.R('organization', {'id': orgid, 'del_flg': '0'}) + return rows[0]['orgname'] if rows else None + + +async def _is_business_owner(sor, orgid): + rows = await sor.sqlExe( + "SELECT id FROM organization WHERE id=${id}$ AND parentid IS NULL AND del_flg='0'", + {'id': orgid}, + ) + return len(rows) > 0 + + +async def _collect_descendant_reseller_ids(sor, root_orgid): + """递归收集 root 下所有下级分销商 organization.id(org_type=1)。""" + found = [] + queue = [root_orgid] + seen = {root_orgid} + while queue: + pid = queue.pop(0) + rows = await sor.sqlExe( + """SELECT id FROM organization + WHERE parentid=${pid}$ AND org_type=${org_type}$ AND del_flg='0'""", + {'pid': pid, 'org_type': RESELLER_ORG}, + ) + for r in rows: + cid = r['id'] + if cid not in seen: + seen.add(cid) + found.append(cid) + queue.append(cid) + return found + + +async def _build_customer_scope_sql(sor, accounting_orgid, include_sub_reseller_customers): + """ + 客户范围 SQL 片段与参数。 + 直属:cust.parentid = accounting_orgid + 含下级分销:直属 + cust.parentid IN (下级分销商 id 列表) + """ + params = {'accounting_orgid': accounting_orgid} + if not include_sub_reseller_customers: + return "cust.parentid = ${accounting_orgid}$", params, [] + + reseller_ids = await _collect_descendant_reseller_ids(sor, accounting_orgid) + if not reseller_ids: + return "cust.parentid = ${accounting_orgid}$", params, [] + + in_keys = [] + for i, rid in enumerate(reseller_ids): + key = 'reseller_%d' % i + params[key] = rid + in_keys.append('${%s}$' % key) + in_sql = ', '.join(in_keys) + cond = "(cust.parentid = ${accounting_orgid}$ OR cust.parentid IN (%s))" % in_sql + return cond, params, reseller_ids + + +async def _customer_in_scope(customer_parentid, accounting_orgid, reseller_ids, include_sub): + if customer_parentid == accounting_orgid: + return True + if include_sub and customer_parentid in reseller_ids: + return True + return False + + +async def _fetch_salemode_row(sor, sql, params): + rows = await sor.sqlExe(sql, params) + return rows[0] if rows else None + + +async def _settle_upstream_meta(sor, accounting_orgid, provider_orgid): + """本级应付对象:本机构→供应商;分销→直接上级。""" + if await _is_business_owner(sor, accounting_orgid): + return { + 'settle_upstream_type': 'supplier', + 'settle_upstream_orgid': provider_orgid, + 'settle_upstream_orgname': await _org_name(sor, provider_orgid), + 'immediate_parent_orgid': None, + } + parent_orgid = await get_parent_orgid(sor, accounting_orgid) + return { + 'settle_upstream_type': 'parent_org', + 'settle_upstream_orgid': parent_orgid, + 'settle_upstream_orgname': await _org_name(sor, parent_orgid), + 'immediate_parent_orgid': parent_orgid, + } + + +async def _protocol_snapshot(sor, accounting_orgid, customerid, providerid, productid, bill_date, quantity): + curdate = bill_date + if hasattr(curdate, 'strftime'): + curdate = curdate.strftime('%Y-%m-%d') + elif curdate is not None: + curdate = str(curdate)[:10] + + qty = int(quantity or 1) + base = {'providerid': providerid, 'productid': productid, 'curdate': curdate} + + own = await _fetch_salemode_row( + sor, _SALEMODE_SQL_OWN, dict(base, bid_orgid=accounting_orgid), + ) + cust = await _fetch_salemode_row( + sor, _SALEMODE_SQL_OFFER, + dict(base, offer_orgid=accounting_orgid, bid_orgid=customerid), + ) + if not cust: + cust = await _fetch_salemode_row( + sor, _SALEMODE_SQL_OFFER, + dict(base, offer_orgid=accounting_orgid, bid_orgid='*'), + ) + + parent_orgid = await get_parent_orgid(sor, accounting_orgid) + reseller = None + if parent_orgid: + reseller = await _fetch_salemode_row( + sor, _SALEMODE_SQL_OFFER, + dict(base, offer_orgid=parent_orgid, bid_orgid=accounting_orgid), + ) + + return { + 'parent_orgid': parent_orgid, + 'parent_orgname': await _org_name(sor, parent_orgid), + 'own_salemode': own.get('salemode') if own else None, + 'own_discount': own.get('discount') if own else None, + 'own_floor_unit_price': own.get('price') if own else None, + 'customer_salemode': cust.get('salemode') if cust else None, + 'customer_discount': cust.get('discount') if cust else None, + 'customer_floor_unit_price': cust.get('price') if cust else None, + 'reseller_discount': reseller.get('discount') if reseller else None, + 'reseller_salemode': reseller.get('salemode') if reseller else None, + 'reseller_floor_unit_price': reseller.get('price') if reseller else None, + 'quantity': qty, + } + + +def _estimate_finance(catalog_amount, protocol, is_owner): + """未记账:估算本级利润与本级应付(不含代付费单独字段)。""" + catalog_amount = float(catalog_amount or 0) + qty = protocol.get('quantity') or 1 + profit = None + settle_upstream = None + + own_mode = protocol.get('own_salemode') + cust_mode = protocol.get('customer_salemode') + own_disc = protocol.get('own_discount') + cust_disc = protocol.get('customer_discount') + reseller_disc = protocol.get('reseller_discount') + + if own_mode == '0' or cust_mode == '0': + if own_disc is not None and cust_disc is not None: + profit = catalog_amount * (float(cust_disc) - float(own_disc)) + if is_owner: + if own_disc is not None: + settle_upstream = catalog_amount * float(own_disc) + elif reseller_disc is not None: + settle_upstream = catalog_amount * float(reseller_disc) + + elif own_mode == '2' or cust_mode == '2': + own_price = protocol.get('own_floor_unit_price') + cust_price = protocol.get('customer_floor_unit_price') + reseller_price = protocol.get('reseller_floor_unit_price') + if own_price is not None and cust_price is not None: + profit = (float(cust_price) - float(own_price)) * qty + if is_owner and own_price is not None: + settle_upstream = float(own_price) * qty + elif not is_owner and reseller_price is not None: + settle_upstream = float(reseller_price) * qty + elif not is_owner and cust_price is not None: + settle_upstream = float(cust_price) * qty + + return { + 'profit_amount': _round_money(profit), + 'settle_upstream_amount': _round_money(settle_upstream), + 'amount_source': 'estimated', + } + + +async def _bill_detail_rows(sor, billid): + """单表读 bill_detail,优先 sor.R(避免 sqlExe 多表问题)。""" + rows = await sor.R('bill_detail', {'billid': billid, 'del_flg': '0'}) + if rows: + return rows + return _sql_rows(await sor.sqlExe( + """SELECT accounting_orgid, subjectname, accounting_dir, + participantid, participanttype, amount + FROM bill_detail WHERE billid=${billid}$ AND del_flg='0'""", + {'billid': billid}, + )) + + +async def _finance_from_bill_detail(sor, billid, accounting_orgid, is_owner, parent_orgid): + """已记账:从 bill_detail 取本级利润与本级应付。""" + rows = await _bill_detail_rows(sor, billid) + profit = 0.0 + settle_upstream = 0.0 + legs = [] + + for r in rows: + legs.append({ + 'accounting_orgid': r['accounting_orgid'], + 'subjectname': r['subjectname'], + 'accounting_dir': r['accounting_dir'], + 'participanttype': r.get('participanttype'), + 'participantid': r.get('participantid'), + 'amount': _round_money(r['amount']), + }) + amt = float(r['amount'] or 0) + book = r['accounting_orgid'] + subj = r['subjectname'] or '' + direction = r['accounting_dir'] + + if book == accounting_orgid and subj in INCOME_SUBJECTS and direction == '贷': + profit += amt + + if is_owner: + if ( + book == accounting_orgid + and subj.startswith(SUPPLIER_SETTLE_PREFIX) + and direction == '贷' + ): + settle_upstream += amt + else: + if ( + parent_orgid + and book == parent_orgid + and subj == PARENT_SETTLE_SUBJECT + and direction == '借' + and r.get('participantid') == accounting_orgid + ): + settle_upstream += amt + + return { + 'profit_amount': _round_money(profit), + 'settle_upstream_amount': _round_money(settle_upstream) if settle_upstream else _round_money(0), + 'amount_source': 'bill_detail', + 'bill_detail_legs': legs, + } + + +async def _build_report_row(sor, row, accounting_orgid, is_owner): + bill_id = _row_get(row, 'bill_id') + customerid = _row_get(row, 'customerid') + providerid = _row_get(row, 'providerid') + productid = _row_get(row, 'productid') + bill_date = _row_get(row, 'bill_date') + quantity = _row_get(row, 'quantity') or 1 + catalog_amount = _round_money(_row_get(row, 'catalog_amount')) + customer_pay = _round_money(_row_get(row, 'customer_pay_amount')) + customer_parentid = _row_get(row, 'customer_parentid') + + protocol = await _protocol_snapshot( + sor, accounting_orgid, customerid, providerid, productid, bill_date, quantity, + ) + parent_orgid = protocol['parent_orgid'] + settle_meta = await _settle_upstream_meta(sor, accounting_orgid, providerid) + + bill_state = _row_get(row, 'bill_state') + if str(bill_state) == '1': + amounts = await _finance_from_bill_detail( + sor, bill_id, accounting_orgid, is_owner, parent_orgid, + ) + else: + amounts = _estimate_finance(catalog_amount, protocol, is_owner) + amounts['bill_detail_legs'] = [] + + product_name = _row_get(row, 'product_name') + if not product_name and productid: + prows = await sor.R('product', {'id': productid, 'del_flg': '0'}) + if prows: + product_name = prows[0].get('name') + provider_name = _row_get(row, 'provider_name') + if not provider_name and providerid: + prov_rows = await sor.sqlExe( + "SELECT name FROM provider WHERE orgid=${orgid}$ AND del_flg='0' LIMIT 1", + {'orgid': providerid}, + ) + if prov_rows: + provider_name = prov_rows[0].get('name') + + serving_reseller_id = None + serving_reseller_name = None + is_direct = customer_parentid == accounting_orgid + if not is_direct and customer_parentid: + serving_reseller_id = customer_parentid + serving_reseller_name = await _org_name(sor, customer_parentid) + + order_date = _row_get(row, 'order_date') + servicename = _row_get(row, 'servicename') + + return { + 'bill_id': bill_id, + 'order_id': _row_get(row, 'order_id'), + 'bill_date': str(bill_date)[:10] if bill_date else None, + 'order_date': str(order_date)[:19] if order_date else None, + 'business_op': _row_get(row, 'business_op'), + 'bill_state': bill_state, + 'accounted': str(bill_state) == '1', + 'customer': { + 'id': customerid, + 'name': _row_get(row, 'customer_name'), + 'parent_orgid': customer_parentid, + 'is_direct_customer': is_direct, + 'serving_reseller': { + 'id': serving_reseller_id, + 'name': serving_reseller_name, + } if serving_reseller_id else None, + }, + 'product': { + 'id': productid, + 'name': product_name or servicename, + 'servicename': servicename, + }, + 'provider': { + 'orgid': providerid, + 'name': provider_name, + }, + 'quantity': quantity, + 'pricing': { + 'catalog_amount': catalog_amount, + 'list_price_unit': _round_money(_row_get(row, 'list_price')), + 'order_discount': _round_money(_row_get(row, 'order_discount')) + if _row_get(row, 'order_discount') is not None + else None, + 'order_unit_price': _round_money(_row_get(row, 'order_unit_price')), + 'customer_pay_amount': customer_pay, + }, + 'viewer_org': { + 'id': accounting_orgid, + 'is_business_owner': is_owner, + 'immediate_parent': { + 'id': settle_meta.get('immediate_parent_orgid'), + 'name': await _org_name(sor, settle_meta.get('immediate_parent_orgid')), + } if settle_meta.get('immediate_parent_orgid') else None, + }, + 'protocol': { + 'parent_salemode': _salemode_label(protocol.get('own_salemode')), + 'parent_discount_to_us': _round_money(protocol.get('own_discount')), + 'parent_floor_unit_price': _round_money(protocol.get('own_floor_unit_price')), + 'our_salemode_to_customer': _salemode_label(protocol.get('customer_salemode')), + 'our_discount_to_customer': _round_money(protocol.get('customer_discount')), + 'our_floor_unit_price_to_customer': _round_money(protocol.get('customer_floor_unit_price')), + 'our_discount_as_reseller_to_parent': _round_money(protocol.get('reseller_discount')), + }, + 'finance': { + 'profit_amount': amounts['profit_amount'], + 'settle_upstream_amount': amounts['settle_upstream_amount'], + 'settle_upstream_type': settle_meta['settle_upstream_type'], + 'settle_upstream_target': { + 'id': settle_meta['settle_upstream_orgid'], + 'name': settle_meta['settle_upstream_orgname'], + }, + 'amount_source': amounts['amount_source'], + }, + 'bill_detail_legs': amounts.get('bill_detail_legs', []), + } + + +def _bill_core_from_clause(): + """统计/查 ID 用最小 JOIN(兼容无 ordergoodsid 字段的库)。""" + return """ +FROM bill b +INNER JOIN bz_order o ON b.orderid = o.id +INNER JOIN organization cust ON b.customerid = cust.id +""" + + +def _bill_ids_sql(where_sql): + """不在 SQL 里写 LIMIT:部分环境 sqlExe 对 LIMIT 会返回空列表。""" + return f""" +SELECT b.id AS bill_id +{_bill_core_from_clause()} +WHERE {where_sql} +ORDER BY b.bill_date DESC, b.create_at DESC +""" + + +def _sql_quote_bill_id(bill_id): + if bill_id is None: + return None + s = str(bill_id).strip() + if not s: + return None + for ch in s: + if not (ch.isalnum() or ch in ('_', '-')): + return None + return "'" + s.replace("'", "''") + "'" + + +async def _fetch_overview_bill_rows(sor, where_sql, params, max_bills): + """ + 概览取数:先用仅 bill_id 的 SQL(已验证可用),再 sor.R 组装明细。 + 多列 + JOIN 的 _bill_scope_list_sql 在本环境 sqlExe 会返回空列表。 + """ + id_rows = _sql_rows(await sor.sqlExe(_bill_ids_sql(where_sql), params)) + all_ids = [] + seen = set() + for r in id_rows: + bid = _normalize_bill_id(_row_get(r, 'bill_id', 'id', 'ID')) + if bid and bid not in seen: + seen.add(bid) + all_ids.append(bid) + + total_ids = len(all_ids) + truncated = total_ids > max_bills + if truncated: + all_ids = all_ids[:max_bills] + + rows = [] + for bid in all_ids: + row = await _fetch_bill_row_via_R(sor, bid) + if row: + rows.append(row) + + return { + 'total_ids': total_ids, + 'truncated': truncated, + 'id_query_len': len(id_rows), + 'rows': rows, + } + + +def _new_amount_bucket(): + return { + 'sales_total': 0.0, + 'profit_total': 0.0, + 'settle_upstream_total': 0.0, + 'bill_count': 0, + } + + +def _bucket_add(bucket, sales, profit, settle): + bucket['sales_total'] += float(sales or 0) + bucket['profit_total'] += float(profit or 0) + bucket['settle_upstream_total'] += float(settle or 0) + bucket['bill_count'] += 1 + + +def _bucket_round(bucket): + return { + 'sales_total': _round_money(bucket['sales_total']), + 'profit_total': _round_money(bucket['profit_total']), + 'settle_upstream_total': _round_money(bucket['settle_upstream_total']), + 'bill_count': bucket['bill_count'], + } + + +def _norm_org_id(orgid): + if orgid is None: + return None + s = str(orgid).strip() + return s if s else None + + +def _customer_segment(customer_parentid, accounting_orgid, reseller_id_set): + pid = _norm_org_id(customer_parentid) + acc = _norm_org_id(accounting_orgid) + if pid == acc: + return 'direct_customers' + if pid in reseller_id_set: + return 'from_sub_resellers' + return None + + +async def _bill_finance_amounts(sor, row, accounting_orgid, is_owner, parent_orgid): + """单笔:销售额、本级利润、本级应付上级/供应商。""" + sales = float(_row_get(row, 'customer_pay_amount') or 0) + bill_id = _row_get(row, 'bill_id') + bill_state = _row_get(row, 'bill_state') + quantity = int(_row_get(row, 'quantity') or 1) + + if str(bill_state) == '1': + fin = await _finance_from_bill_detail( + sor, bill_id, accounting_orgid, is_owner, parent_orgid, + ) + else: + catalog = float(_row_get(row, 'catalog_amount') or 0) + protocol = await _protocol_snapshot( + sor, + accounting_orgid, + _row_get(row, 'customerid'), + _row_get(row, 'providerid'), + _row_get(row, 'productid'), + _row_get(row, 'bill_date'), + quantity, + ) + fin = _estimate_finance(catalog, protocol, is_owner) + + profit = float(fin.get('profit_amount') or 0) + settle = float(fin.get('settle_upstream_amount') or 0) + return sales, profit, settle, fin.get('amount_source', 'unknown') + + +async def _provider_name_cache(sor, cache, provider_orgid): + if not provider_orgid: + return None + if provider_orgid in cache: + return cache[provider_orgid] + name = None + rows = await sor.sqlExe( + "SELECT name FROM provider WHERE orgid=${oid}$ AND del_flg='0' LIMIT 1", + {'oid': provider_orgid}, + ) + if rows: + name = rows[0].get('name') + cache[provider_orgid] = name + return name + + +async def _product_name_cache(sor, cache, productid): + if not productid: + return None + if productid in cache: + return cache[productid] + name = None + rows = await sor.R('product', {'id': productid, 'del_flg': '0'}) + if rows: + name = rows[0].get('name') + cache[productid] = name + return name + + +def _count_sql(where_sql): + return f""" +SELECT COUNT(*) AS total_count +{_bill_core_from_clause()} +WHERE {where_sql} +""" + + +def _normalize_bill_id(bill_id): + if bill_id is None: + return None + s = str(bill_id).strip() + return s if s else None + + +async def _fetch_bill_row_via_R(sor, bill_id): + """ + 用 sor.R 按表组装明细(与 mu_ban / process_user_billing 一致)。 + 已证实:本环境 sqlExe 多表 JOIN + WHERE b.id 会返回空,但 sor.R('bill') 可用。 + """ + bid = _normalize_bill_id(bill_id) + if not bid: + return None + + bills = await sor.R('bill', {'id': bid, 'del_flg': '0'}) + if not bills: + bills = await sor.R('bill', {'id': bid}) + if not bills: + return None + b = bills[0] + + o = {} + oid = b.get('orderid') + if oid: + orders = await sor.R('bz_order', {'id': oid, 'del_flg': '0'}) + if not orders: + orders = await sor.R('bz_order', {'id': oid}) + if orders: + o = orders[0] + + cust = {} + cid = b.get('customerid') + if cid: + custs = await sor.R('organization', {'id': cid, 'del_flg': '0'}) + if not custs: + custs = await sor.R('organization', {'id': cid}) + if custs: + cust = custs[0] + + og = {} + ogid = b.get('ordergoodsid') + if ogid: + ogs = await sor.R('order_goods', {'id': ogid, 'del_flg': '0'}) + if ogs: + og = ogs[0] + if not og and oid: + ogs = await sor.sqlExe( + """SELECT list_price, discount, price FROM order_goods + WHERE orderid=${oid}$ AND del_flg='0' LIMIT 1""", + {'oid': oid}, + ) + if ogs: + og = ogs[0] + + return { + 'bill_id': b.get('id'), + 'order_id': oid, + 'customerid': cid, + 'productid': b.get('productid'), + 'providerid': b.get('providerid'), + 'bill_date': b.get('bill_date'), + 'bill_state': b.get('bill_state'), + 'catalog_amount': b.get('provider_amt'), + 'customer_pay_amount': b.get('amount'), + 'quantity': b.get('quantity'), + 'order_date': o.get('order_date'), + 'business_op': o.get('business_op'), + 'servicename': o.get('servicename'), + 'customer_name': cust.get('orgname'), + 'customer_parentid': cust.get('parentid'), + 'list_price': og.get('list_price'), + 'order_discount': og.get('discount'), + 'order_unit_price': og.get('price'), + } + + +async def _fetch_bill_rows_by_ids(sor, bill_ids): + rows = [] + for bid in bill_ids: + one = await _fetch_bill_row_via_R(sor, bid) + if one: + rows.append(one) + return rows + + +async def _fetch_bill_rows_page(sor, where_sql, base_params, offset, page_size): + """ + 先查全部符合条件的 bill_id(通常数量可控),再在 Python 分页; + 明细用 IN 字面量或逐条查询,不用 IN (${bid_n}$) 占位符。 + """ + id_sql = _bill_ids_sql(where_sql) + id_rows = _sql_rows(await sor.sqlExe(id_sql, base_params)) + all_ids = [] + seen = set() + for r in id_rows: + bid = _normalize_bill_id(_row_get(r, 'bill_id', 'id', 'ID')) + if bid and bid not in seen: + seen.add(bid) + all_ids.append(bid) + total = len(all_ids) + page_ids = all_ids[offset: offset + page_size] + if not page_ids: + return total, [] + ordered = await _fetch_bill_rows_by_ids(sor, page_ids) + return total, ordered + + +async def _finance_report_debug(sor, where_sql, params, page_size, offset): + """debug=true 时返回,便于在服务器上对比哪一步 SQL 无数据。""" + dbg = {} + try: + c = _sql_rows(await sor.sqlExe(_count_sql(where_sql), params)) + dbg['count_total'] = int(_row_get(c[0], 'total_count', 0) or 0) if c else None + except Exception as e: + dbg['count_error'] = str(e) + try: + ids = _sql_rows(await sor.sqlExe(_bill_ids_sql(where_sql), params)) + dbg['ids_query_len'] = len(ids) + except Exception as e: + dbg['ids_error'] = str(e) + try: + lim_sql = _bill_ids_sql(where_sql).strip() + ' LIMIT %d OFFSET %d' % (page_size, offset) + lim = _sql_rows(await sor.sqlExe(lim_sql, params)) + dbg['ids_with_limit_len'] = len(lim) + except Exception as e: + dbg['ids_with_limit_error'] = str(e) + try: + page_ids = [] + id_rows = _sql_rows(await sor.sqlExe(_bill_ids_sql(where_sql), params)) + if id_rows: + dbg['first_id_row_keys'] = list(id_rows[0].keys()) + dbg['first_id_raw'] = _row_get(id_rows[0], 'bill_id', 'id', 'ID') + for r in id_rows[offset: offset + page_size]: + bid = _normalize_bill_id(_row_get(r, 'bill_id', 'id', 'ID')) + if bid: + page_ids.append(bid) + dbg['page_ids_len'] = len(page_ids) + if page_ids: + dbg['sample_page_id'] = page_ids[0] + rb = await sor.R('bill', {'id': page_ids[0], 'del_flg': '0'}) + dbg['sor_R_bill_count'] = len(rb) if rb else 0 + one = await _fetch_bill_row_via_R(sor, page_ids[0]) + dbg['sor_R_row_ok'] = one is not None + except Exception as e: + dbg['detail_fetch_error'] = str(e) + return dbg + + +async def finance_order_report(ns=None): + """ + 管理人员 — 客户订单财务列表(分页)。 + + 入参 ns: + accounting_orgid (必填) 账本机构 + include_sub_reseller_customers (可选) 默认 false;true 时含下级分销商的客户订单 + userid, start_date, end_date, customerid, productid, order_id, bill_state + current_page (默认1), page_size (默认20, 最大100) + + 返回 finance.settle_upstream_*: + - 本机构:应付供应商(待结转* 贷方,仅本机构账本) + - 分销商:应付直接上级(上级账本「分销商存放资金」借方,participant=本级) + """ + ns = ns or {} + accounting_orgid = ns.get('accounting_orgid') + if not accounting_orgid: + return {'status': False, 'msg': '缺少 accounting_orgid(当前账本机构)'} + + include_sub = _parse_bool(ns.get('include_sub_reseller_customers'), False) + current_page, page_size, offset = _parse_page(ns) + + db = DBPools() + async with db.sqlorContext(DBNAME) as sor: + ok, err = await _check_viewer(sor, accounting_orgid, ns.get('userid')) + if not ok: + return {'status': False, 'msg': err} + + scope_sql, scope_params, reseller_ids = await _build_customer_scope_sql( + sor, accounting_orgid, include_sub, + ) + is_owner = await _is_business_owner(sor, accounting_orgid) + + conditions = ["b.del_flg = '0'", "o.del_flg = '0'", scope_sql] + params = dict(scope_params) + + if ns.get('start_date'): + conditions.append('b.bill_date >= ${start_date}$') + params['start_date'] = ns['start_date'] + if ns.get('end_date'): + conditions.append('b.bill_date <= ${end_date}$') + params['end_date'] = ns['end_date'] + if ns.get('customerid'): + conditions.append('b.customerid = ${customerid}$') + params['customerid'] = ns['customerid'] + if ns.get('productid'): + conditions.append('b.productid = ${productid}$') + params['productid'] = ns['productid'] + if ns.get('order_id'): + conditions.append('b.orderid = ${order_id}$') + params['order_id'] = ns['order_id'] + if ns.get('bill_state') is not None: + conditions.append('b.bill_state = ${bill_state}$') + params['bill_state'] = str(ns['bill_state']) + + where_sql = ' AND '.join(conditions) + + total_count, rows = await _fetch_bill_rows_page( + sor, where_sql, params, offset, page_size, + ) + if total_count > 0 and offset >= total_count: + current_page = 1 + offset = 0 + total_count, rows = await _fetch_bill_rows_page( + sor, where_sql, params, offset, page_size, + ) + + debug_info = None + if _parse_bool(ns.get('debug'), False): + debug_info = await _finance_report_debug( + sor, where_sql, params, page_size, offset, + ) + debug_info['page_ids_fetched'] = len(rows) + + items = [] + sum_profit = 0.0 + sum_pay = 0.0 + sum_upstream = 0.0 + row_errors = [] + + for row in rows: + try: + item = await _build_report_row(sor, row, accounting_orgid, is_owner) + except Exception as exc: + row_errors.append({ + 'bill_id': _row_get(row, 'bill_id'), + 'error': str(exc), + }) + continue + items.append(item) + fin = item['finance'] + if fin.get('profit_amount') is not None: + sum_profit += fin['profit_amount'] + pay = item['pricing'].get('customer_pay_amount') + if pay is not None: + sum_pay += pay + up = fin.get('settle_upstream_amount') + if up is not None: + sum_upstream += up + + return { + 'status': True, + 'msg': 'ok', + 'data': { + 'accounting_orgid': accounting_orgid, + 'accounting_orgname': await _org_name(sor, accounting_orgid), + 'is_business_owner': is_owner, + 'customer_scope': { + 'include_sub_reseller_customers': include_sub, + 'descendant_reseller_ids': reseller_ids, + 'descendant_reseller_count': len(reseller_ids), + }, + 'total_count': total_count, + 'current_page': current_page, + 'page_size': page_size, + 'summary': { + 'customer_pay_total': _round_money(sum_pay), + 'profit_total': _round_money(sum_profit), + 'settle_upstream_total': _round_money(sum_upstream), + }, + 'items': items, + 'row_errors': row_errors, + 'list_row_count': len(rows), + 'debug': debug_info, + }, + } + + +async def finance_order_report_detail(ns=None): + """ + 管理人员 — 单笔账单财务明细。 + + 入参:accounting_orgid, bill_id, include_sub_reseller_customers(与列表一致,用于校验客户范围) + """ + ns = ns or {} + accounting_orgid = ns.get('accounting_orgid') + bill_id = ns.get('bill_id') + if not accounting_orgid or not bill_id: + return {'status': False, 'msg': '缺少 accounting_orgid 或 bill_id'} + + include_sub = _parse_bool(ns.get('include_sub_reseller_customers'), False) + + db = DBPools() + async with db.sqlorContext(DBNAME) as sor: + ok, err = await _check_viewer(sor, accounting_orgid, ns.get('userid')) + if not ok: + return {'status': False, 'msg': err} + + _, _, reseller_ids = await _build_customer_scope_sql(sor, accounting_orgid, include_sub) + is_owner = await _is_business_owner(sor, accounting_orgid) + + rows = await _fetch_bill_rows_by_ids(sor, [bill_id]) + if not rows: + return {'status': False, 'msg': '账单不存在'} + + row = rows[0] + if not await _customer_in_scope( + _row_get(row, 'customer_parentid'), + accounting_orgid, + reseller_ids, + include_sub, + ): + return {'status': False, 'msg': '该账单不在当前查询客户范围内'} + + detail = await _build_report_row(sor, row, accounting_orgid, is_owner) + detail['customer_scope'] = {'include_sub_reseller_customers': include_sub} + return {'status': True, 'msg': 'ok', 'data': detail} + + +async def finance_billing_overview(ns=None): + """ + 计费概览页 — 销售/利润/应付上级 汇总(单接口,建议概览页只调此接口)。 + + 入参 ns: + accounting_orgid (必填) 当前账本机构 + include_sub_reseller_customers (可选) 默认 true;是否含下级分销商客户带来的金额 + start_date / end_date (可选) 按 bill.bill_date 筛选 + bill_state (可选) 仅统计指定账单状态,默认不筛(含未记账则利润/结算为估算) + only_accounted (可选) true 时仅 bill_state=1(推荐概览页使用) + userid (可选) 权限 + max_bills (可选) 默认 5000,超出则截断并返回 truncated=true + + 口径(与订单明细接口一致): + sales_total = 客户实付合计 bill.amount + profit_total = 本级账本折扣收入+底价收入(bill_detail)或协议估算 + settle_upstream_total= 本机构→供应商待结转;分销→上级账本分销商存放资金借方 + + 分段: + direct_customers 直属客户(cust.parentid = accounting_orgid) + from_sub_resellers 下级分销商的客户(cust.parentid in 下级分销 org) + + 返回 by_provider_product:按供应商 orgid + 产品 id 拆分上述三项及合计。 + """ + ns = ns or {} + accounting_orgid = ns.get('accounting_orgid') + if not accounting_orgid: + return {'status': False, 'msg': '缺少 accounting_orgid(当前账本机构)'} + + include_sub = _parse_bool(ns.get('include_sub_reseller_customers'), True) + only_accounted = _parse_bool(ns.get('only_accounted'), False) + try: + max_bills = int(ns.get('max_bills', 50000) or 50000) + except (TypeError, ValueError): + max_bills = 50000 + max_bills = max(100, min(max_bills, 200000)) + + db = DBPools() + async with db.sqlorContext(DBNAME) as sor: + ok, err = await _check_viewer(sor, accounting_orgid, ns.get('userid')) + if not ok: + return {'status': False, 'msg': err} + + scope_sql, scope_params, reseller_ids = await _build_customer_scope_sql( + sor, accounting_orgid, include_sub, + ) + is_owner = await _is_business_owner(sor, accounting_orgid) + parent_orgid = await get_parent_orgid(sor, accounting_orgid) + + conditions = ["b.del_flg = '0'", "o.del_flg = '0'", scope_sql] + params = dict(scope_params) + if ns.get('start_date'): + conditions.append('b.bill_date >= ${start_date}$') + params['start_date'] = ns['start_date'] + if ns.get('end_date'): + conditions.append('b.bill_date <= ${end_date}$') + params['end_date'] = ns['end_date'] + if only_accounted: + conditions.append("b.bill_state = '1'") + elif ns.get('bill_state') is not None: + conditions.append('b.bill_state = ${bill_state}$') + params['bill_state'] = str(ns['bill_state']) + + where_sql = ' AND '.join(conditions) + fetch = await _fetch_overview_bill_rows(sor, where_sql, params, max_bills) + bill_rows = fetch['rows'] + truncated = fetch['truncated'] + reseller_id_set = {_norm_org_id(r) for r in reseller_ids if _norm_org_id(r)} + + totals = { + 'direct_customers': _new_amount_bucket(), + 'from_sub_resellers': _new_amount_bucket(), + 'grand_total': _new_amount_bucket(), + } + by_pp = {} + provider_names = {} + product_names = {} + skipped = 0 + errors = [] + + for row in bill_rows: + seg = _customer_segment( + _row_get(row, 'customer_parentid'), + accounting_orgid, + reseller_id_set, + ) + if seg is None: + skipped += 1 + continue + try: + sales, profit, settle, _src = await _bill_finance_amounts( + sor, row, accounting_orgid, is_owner, parent_orgid, + ) + except Exception as exc: + errors.append({ + 'bill_id': _row_get(row, 'bill_id'), + 'error': str(exc), + }) + continue + + _bucket_add(totals[seg], sales, profit, settle) + _bucket_add(totals['grand_total'], sales, profit, settle) + + pid = _row_get(row, 'providerid') or '' + prid = _row_get(row, 'productid') or '' + key = (pid, prid) + if key not in by_pp: + by_pp[key] = { + 'provider_orgid': pid, + 'product_id': prid, + 'direct_customers': _new_amount_bucket(), + 'from_sub_resellers': _new_amount_bucket(), + 'total': _new_amount_bucket(), + } + _bucket_add(by_pp[key][seg], sales, profit, settle) + _bucket_add(by_pp[key]['total'], sales, profit, settle) + + breakdown = [] + for (pid, prid), node in by_pp.items(): + breakdown.append({ + 'provider': { + 'orgid': pid, + 'name': await _provider_name_cache(sor, provider_names, pid), + }, + 'product': { + 'id': prid, + 'name': await _product_name_cache(sor, product_names, prid), + }, + 'direct_customers': _bucket_round(node['direct_customers']), + 'from_sub_resellers': _bucket_round(node['from_sub_resellers']), + 'total': _bucket_round(node['total']), + }) + breakdown.sort(key=lambda x: float(x['total']['sales_total'] or 0), reverse=True) + + settle_target = await _settle_upstream_meta(sor, accounting_orgid, None) + if is_owner: + settle_label = '应付供应商' + else: + settle_label = '应付上级机构' + + data = { + 'accounting_orgid': accounting_orgid, + 'accounting_orgname': await _org_name(sor, accounting_orgid), + 'is_business_owner': is_owner, + 'settle_upstream_label': settle_label, + 'settle_upstream_target': { + 'type': settle_target['settle_upstream_type'], + 'orgid': settle_target['settle_upstream_orgid'], + 'name': settle_target['settle_upstream_orgname'], + }, + 'period': { + 'start_date': ns.get('start_date'), + 'end_date': ns.get('end_date'), + }, + 'customer_scope': { + 'include_sub_reseller_customers': include_sub, + 'descendant_reseller_ids': reseller_ids, + 'descendant_reseller_count': len(reseller_ids), + }, + 'filters': { + 'only_accounted': only_accounted, + 'bill_state': ns.get('bill_state'), + }, + 'bill_count': len(bill_rows), + 'bill_id_count': fetch['total_ids'], + 'id_query_len': fetch['id_query_len'], + 'truncated': truncated, + 'max_bills': max_bills, + 'skipped_out_of_scope': skipped, + 'totals': { + 'direct_customers': _bucket_round(totals['direct_customers']), + 'from_sub_resellers': _bucket_round(totals['from_sub_resellers']), + 'grand_total': _bucket_round(totals['grand_total']), + }, + 'by_provider_product': breakdown, + 'errors': errors, + } + if _parse_bool(ns.get('debug'), False) or ( + data['totals']['grand_total']['bill_count'] == 0 + and fetch['id_query_len'] > 0 + ): + data['debug'] = { + 'id_query_len': fetch['id_query_len'], + 'bill_id_count': fetch['total_ids'], + 'overview_rows_len': len(bill_rows), + 'skipped_out_of_scope': skipped, + 'sample_parentid': _row_get(bill_rows[0], 'customer_parentid') if bill_rows else None, + } + return {'status': True, 'msg': 'ok', 'data': data} + + +# _report = params_kw.get('report') or params_kw.get('api') or 'order_list' +_report = 'overview' +if _report in ('overview', 'billing_overview', 'finance_billing_overview'): + ret = await finance_billing_overview(params_kw) +elif _report in ('detail', 'order_detail'): + ret = await finance_order_report_detail(params_kw) +else: + ret = await finance_order_report(params_kw) +return ret \ No newline at end of file