# -*- coding: utf-8 -*- """ 管理人员财务订单报表接口。 视角:以 accounting_orgid(账本机构)为查看主体,展示客户订单的定价、利润、本级应付结算。 依赖运行环境:DBPools, sor.sqlExe, sor.R 业务口径(已确认): - product_salemode.providerid = provider.orgid - 分销商不直接对供应商结算;仅本机构(parentid 为空)对供应商结算 - 一级分销对本机构结算,二级分销对一级分销结算(即对直接上级结算) - settle_upstream_amount:本级应付(本机构→供应商;分销→直接上级机构) - profit_amount:本级账本「折扣收入」「底价收入」贷方合计 - include_sub_reseller_customers:是否包含下级分销商发展的客户 """ async def get_parent_orgid(sor, orgid): sql = """select a.id from organization a, organization b where b.parentid = a.id and a.del_flg = '0' and b.del_flg = '0' and b.id = ${orgid}$""" recs = await sor.sqlExe(sql, {'orgid':orgid}) if len(recs) == 0: return None return recs[0]['id'] DBNAME = 'kboss' RESELLER_ORG = '1' OWNER_OGR = '0' CORP_CUSTOMER = '2' PERSONAL = '3' PROVIDER = '4' PUBLISHER = '5' UNDERWRITER = '6' PARTY_OWNER = '本机构' PARTY_CUSTOMER = '客户' PARTY_RESELLER = '分销商' PARTY_PROVIDER = '供应商' PARTY_PUBLISHER = '算力券发行方' PARTY_UNDERWRITER = '算力券承销方' DEBT = '借' CREDIT = '贷' ACTNAME_BUY = '付费' ACTNAME_RECHARGE = '充值' ACTNAME_RECHARGE_ALIPAY = '支付宝充值' ACTNAME_SETTLE = '结算' SALEMODE_DISCOUNT = '折扣' SALEMODE_REBATE = '代付费' SALEMODE_FLOORPRICE = '底价' ACTION_RECHARGE_ALIPAY = 'RECHARGE_ALIPAY' ACTION_RECHARGE_ALIPAY_REVERSE = 'RECHARGE_ALIPAY_REVERSE' ACTION_RECHARGE = 'RECHARGE' ACTION_RECHARGE_REVERSE = 'RECHARGE_REVERSE' ACTION_BUY = 'BUY' ACTION_REVERSE_BUY = 'BUY_REVERSE' ACTION_RENEW = 'RENEW' ACTION_RENEW_REVERSE = 'RENEW_REVERSE' ACTION_SETTLE = 'SETTLE' ACTION_SETTLE_REVERSE = 'SETTLE_REVERSE' DBNAME = 'kboss' CUSTOMER_ORG_TYPES = ('2', '3') SALEMODE_LABEL = { '0': SALEMODE_DISCOUNT, '1': SALEMODE_REBATE, '2': SALEMODE_FLOORPRICE, } INCOME_SUBJECTS = ('折扣收入', '底价收入') SUPPLIER_SETTLE_PREFIX = '待结转' _SALEMODE_SQL_OWN = """ SELECT a.salemode, a.settle_mode, b.discount, b.price FROM saleprotocol a, product_salemode b WHERE a.id = b.protocolid AND a.bid_orgid = ${bid_orgid}$ AND (b.productid = ${productid}$ OR b.productid = '*') AND b.providerid = ${providerid}$ AND a.start_date <= ${curdate}$ AND a.end_date > ${curdate}$ AND a.del_flg = '0' AND b.del_flg = '0' ORDER BY b.productid DESC LIMIT 1 """ _SALEMODE_SQL_OFFER = """ SELECT a.salemode, a.settle_mode, b.discount, b.price FROM saleprotocol a, product_salemode b WHERE a.id = b.protocolid AND a.offer_orgid = ${offer_orgid}$ AND a.bid_orgid = ${bid_orgid}$ AND (b.productid = ${productid}$ OR b.productid = '*') AND b.providerid = ${providerid}$ AND a.start_date <= ${curdate}$ AND a.end_date > ${curdate}$ AND a.del_flg = '0' AND b.del_flg = '0' ORDER BY b.productid DESC LIMIT 1 """ def _round_money(v): if v is None: return None return round(float(v), 2) def _is_reverse_op(business_op): """退订/红冲类操作(BUY_REVERSE 等 *_REVERSE)。 数据中退订账单的 bill_detail 方向与正常单一致(均为贷),仅靠 business_op 区分。 退订口径:销售额、利润取反(减少);上游结算/应付供应商仍正向累加(增加)。 """ return str(business_op or '').upper().endswith('_REVERSE') def _salemode_label(code): if code is None: return None return SALEMODE_LABEL.get(str(code), str(code)) def _parse_bool(v, default=False): if v is None: return default if isinstance(v, bool): return v return str(v).strip().lower() in ('1', 'true', 'yes', 'on') def _parse_page(ns, default_page=1, default_size=20, max_size=100): """解析分页,避免 page_size=0 导致 LIMIT 0 无数据。""" try: page_size = int(ns.get('page_size', default_size) or default_size) except (TypeError, ValueError): page_size = default_size try: current_page = int(ns.get('current_page', default_page) or default_page) except (TypeError, ValueError): current_page = default_page page_size = max(1, min(page_size, max_size)) current_page = max(1, current_page) offset = (current_page - 1) * page_size return current_page, page_size, offset def _sql_rows(result): """统一 sqlExe 查询结果为 list[dict]。""" if result is None: return [] if isinstance(result, list): return result if isinstance(result, dict): return [result] return list(result) def _row_get(row, *keys, default=None): """兼容不同驱动返回的大小写字段名。""" if not row: return default for key in keys: if key in row: return row[key] lower = key.lower() for k, v in row.items(): if k.lower() == lower: return v return default async def _check_viewer(sor, accounting_orgid, userid=None): if not userid: return True, None users = await sor.R('users', {'id': userid, 'del_flg': '0'}) if not users: return False, '用户不存在' user_orgid = users[0].get('orgid') if user_orgid == accounting_orgid: return True, None parent = await get_parent_orgid(sor, accounting_orgid) if parent and user_orgid == parent: return True, None return False, '无权查看该机构财务数据' async def _org_name(sor, orgid): if not orgid: return None rows = await sor.R('organization', {'id': orgid, 'del_flg': '0'}) return rows[0]['orgname'] if rows else None async def _is_business_owner(sor, orgid): # 业主机构 parentid 为空串(非 NULL),org_type='0';两者都视为顶级机构。 rows = await sor.sqlExe( """SELECT id FROM organization WHERE id=${id}$ AND del_flg='0' AND ((parentid IS NULL OR parentid='') OR org_type=${owner_type}$)""", {'id': orgid, 'owner_type': OWNER_OGR}, ) return len(rows) > 0 async def _collect_descendant_reseller_ids(sor, root_orgid): """递归收集 root 下所有下级分销商 organization.id(org_type=1)。""" found = [] queue = [root_orgid] seen = {root_orgid} while queue: pid = queue.pop(0) rows = await sor.sqlExe( """SELECT id FROM organization WHERE parentid=${pid}$ AND org_type=${org_type}$ AND del_flg='0'""", {'pid': pid, 'org_type': RESELLER_ORG}, ) for r in rows: cid = r['id'] if cid not in seen: seen.add(cid) found.append(cid) queue.append(cid) return found async def _build_customer_scope_sql(sor, accounting_orgid, include_sub_reseller_customers): """ 客户范围 SQL 片段与参数。 直属:cust.parentid = accounting_orgid 含下级分销:直属 + cust.parentid IN (下级分销商 id 列表) """ params = {'accounting_orgid': accounting_orgid} if not include_sub_reseller_customers: return "cust.parentid = ${accounting_orgid}$", params, [] reseller_ids = await _collect_descendant_reseller_ids(sor, accounting_orgid) if not reseller_ids: return "cust.parentid = ${accounting_orgid}$", params, [] in_keys = [] for i, rid in enumerate(reseller_ids): key = 'reseller_%d' % i params[key] = rid in_keys.append('${%s}$' % key) in_sql = ', '.join(in_keys) cond = "(cust.parentid = ${accounting_orgid}$ OR cust.parentid IN (%s))" % in_sql return cond, params, reseller_ids async def _customer_in_scope(customer_parentid, accounting_orgid, reseller_ids, include_sub): if customer_parentid == accounting_orgid: return True if include_sub and customer_parentid in reseller_ids: return True return False async def _fetch_salemode_row(sor, sql, params): rows = await sor.sqlExe(sql, params) return rows[0] if rows else None async def _settle_upstream_meta(sor, accounting_orgid, provider_orgid): """本级应付对象:本机构→供应商;分销→直接上级。""" if await _is_business_owner(sor, accounting_orgid): return { 'settle_upstream_type': 'supplier', 'settle_upstream_orgid': provider_orgid, 'settle_upstream_orgname': await _org_name(sor, provider_orgid), 'immediate_parent_orgid': None, } parent_orgid = await get_parent_orgid(sor, accounting_orgid) return { 'settle_upstream_type': 'parent_org', 'settle_upstream_orgid': parent_orgid, 'settle_upstream_orgname': await _org_name(sor, parent_orgid), 'immediate_parent_orgid': parent_orgid, } async def _protocol_snapshot(sor, accounting_orgid, customerid, providerid, productid, bill_date, quantity): curdate = bill_date if hasattr(curdate, 'strftime'): curdate = curdate.strftime('%Y-%m-%d') elif curdate is not None: curdate = str(curdate)[:10] qty = int(quantity or 1) base = {'providerid': providerid, 'productid': productid, 'curdate': curdate} own = await _fetch_salemode_row( sor, _SALEMODE_SQL_OWN, dict(base, bid_orgid=accounting_orgid), ) cust = await _fetch_salemode_row( sor, _SALEMODE_SQL_OFFER, dict(base, offer_orgid=accounting_orgid, bid_orgid=customerid), ) if not cust: cust = await _fetch_salemode_row( sor, _SALEMODE_SQL_OFFER, dict(base, offer_orgid=accounting_orgid, bid_orgid='*'), ) parent_orgid = await get_parent_orgid(sor, accounting_orgid) reseller = None if parent_orgid: reseller = await _fetch_salemode_row( sor, _SALEMODE_SQL_OFFER, dict(base, offer_orgid=parent_orgid, bid_orgid=accounting_orgid), ) return { 'parent_orgid': parent_orgid, 'parent_orgname': await _org_name(sor, parent_orgid), 'own_salemode': own.get('salemode') if own else None, 'own_discount': own.get('discount') if own else None, 'own_floor_unit_price': own.get('price') if own else None, 'customer_salemode': cust.get('salemode') if cust else None, 'customer_discount': cust.get('discount') if cust else None, 'customer_floor_unit_price': cust.get('price') if cust else None, 'reseller_discount': reseller.get('discount') if reseller else None, 'reseller_salemode': reseller.get('salemode') if reseller else None, 'reseller_floor_unit_price': reseller.get('price') if reseller else None, 'quantity': qty, } def _estimate_finance(catalog_amount, protocol, is_reverse=False): """未记账:估算本级利润与本级应付(本级成本)。 与已记账口径一致,本级应付取「本级作为采购方(bid)的协议」即 own_*, 对业主机构与分销商统一(own_discount 即上级给本级的折扣/底价)。 退订(is_reverse):利润取反(减少);上游结算仍正向(增加)。 """ catalog_amount = float(catalog_amount or 0) qty = protocol.get('quantity') or 1 profit = None settle_upstream = None own_mode = protocol.get('own_salemode') cust_mode = protocol.get('customer_salemode') own_disc = protocol.get('own_discount') cust_disc = protocol.get('customer_discount') if own_mode == '0' or cust_mode == '0': if own_disc is not None and cust_disc is not None: profit = catalog_amount * (float(cust_disc) - float(own_disc)) if own_disc is not None: settle_upstream = catalog_amount * float(own_disc) elif own_mode == '2' or cust_mode == '2': own_price = protocol.get('own_floor_unit_price') cust_price = protocol.get('customer_floor_unit_price') if own_price is not None and cust_price is not None: profit = (float(cust_price) - float(own_price)) * qty if own_price is not None: settle_upstream = float(own_price) * qty if is_reverse and profit is not None: profit = -profit return { 'profit_amount': _round_money(profit), 'settle_upstream_amount': _round_money(settle_upstream), 'amount_source': 'estimated', } async def _bill_detail_rows(sor, billid): """单表读 bill_detail,优先 sor.R(避免 sqlExe 多表问题)。""" rows = await sor.R('bill_detail', {'billid': billid, 'del_flg': '0'}) if rows: return rows return _sql_rows(await sor.sqlExe( """SELECT accounting_orgid, subjectname, accounting_dir, participantid, participanttype, amount FROM bill_detail WHERE billid=${billid}$ AND del_flg='0'""", {'billid': billid}, )) async def _finance_from_bill_detail(sor, billid, accounting_orgid, is_reverse=False): """已记账:从 bill_detail 取本级利润与本级应付。 统一口径(业主机构与分销商一致): - profit_amount = 本级账本「折扣收入/底价收入」贷方合计 - settle_upstream = 本级账本「待结转*销售收入」贷方合计(本级成本/本级应付上级) 退订(is_reverse):利润取反(减少);上游结算仍正向累加(增加)。 """ rows = await _bill_detail_rows(sor, billid) profit = 0.0 settle_upstream = 0.0 legs = [] for r in rows: legs.append({ 'accounting_orgid': r['accounting_orgid'], 'subjectname': r['subjectname'], 'accounting_dir': r['accounting_dir'], 'participanttype': r.get('participanttype'), 'participantid': r.get('participantid'), 'amount': _round_money(r['amount']), }) amt = float(r['amount'] or 0) book = r['accounting_orgid'] subj = r['subjectname'] or '' direction = r['accounting_dir'] if book == accounting_orgid and direction == '贷': if subj in INCOME_SUBJECTS: profit += amt elif subj.startswith(SUPPLIER_SETTLE_PREFIX): settle_upstream += amt if is_reverse: profit = -profit return { 'profit_amount': _round_money(profit), 'settle_upstream_amount': _round_money(settle_upstream) if settle_upstream else _round_money(0), 'amount_source': 'bill_detail', 'bill_detail_legs': legs, } async def _build_report_row(sor, row, accounting_orgid, is_owner): bill_id = _row_get(row, 'bill_id') customerid = _row_get(row, 'customerid') providerid = _row_get(row, 'providerid') productid = _row_get(row, 'productid') bill_date = _row_get(row, 'bill_date') quantity = _row_get(row, 'quantity') or 1 catalog_amount = _round_money(_row_get(row, 'catalog_amount')) customer_pay = _round_money(_row_get(row, 'customer_pay_amount')) customer_parentid = _row_get(row, 'customer_parentid') is_reverse = _is_reverse_op(_row_get(row, 'business_op')) if is_reverse and customer_pay is not None: # 退订:客户实付(销售额)取负,使汇总相应减少 customer_pay = _round_money(-customer_pay) protocol = await _protocol_snapshot( sor, accounting_orgid, customerid, providerid, productid, bill_date, quantity, ) settle_meta = await _settle_upstream_meta(sor, accounting_orgid, providerid) bill_state = _row_get(row, 'bill_state') if str(bill_state) == '1': amounts = await _finance_from_bill_detail(sor, bill_id, accounting_orgid, is_reverse) else: amounts = _estimate_finance(catalog_amount, protocol, is_reverse) amounts['bill_detail_legs'] = [] product_name = _row_get(row, 'product_name') if not product_name and productid: prows = await sor.R('product', {'id': productid, 'del_flg': '0'}) if prows: product_name = prows[0].get('name') provider_name = _row_get(row, 'provider_name') if not provider_name and providerid: prov_rows = await sor.sqlExe( "SELECT name FROM provider WHERE orgid=${orgid}$ AND del_flg='0' LIMIT 1", {'orgid': providerid}, ) if prov_rows: provider_name = prov_rows[0].get('name') serving_reseller_id = None serving_reseller_name = None is_direct = customer_parentid == accounting_orgid if not is_direct and customer_parentid: serving_reseller_id = customer_parentid serving_reseller_name = await _org_name(sor, customer_parentid) order_date = _row_get(row, 'order_date') servicename = _row_get(row, 'servicename') return { 'bill_id': bill_id, 'order_id': _row_get(row, 'order_id'), 'bill_date': str(bill_date)[:10] if bill_date else None, 'order_date': str(order_date)[:19] if order_date else None, 'business_op': _row_get(row, 'business_op'), 'bill_state': bill_state, 'accounted': str(bill_state) == '1', 'customer': { 'id': customerid, 'name': _row_get(row, 'customer_name'), 'parent_orgid': customer_parentid, 'is_direct_customer': is_direct, 'serving_reseller': { 'id': serving_reseller_id, 'name': serving_reseller_name, } if serving_reseller_id else None, }, 'product': { 'id': productid, 'name': product_name or servicename, 'servicename': servicename, }, 'provider': { 'orgid': providerid, 'name': provider_name, }, 'quantity': quantity, 'pricing': { 'catalog_amount': catalog_amount, 'list_price_unit': _round_money(_row_get(row, 'list_price')), 'order_discount': _round_money(_row_get(row, 'order_discount')) if _row_get(row, 'order_discount') is not None else None, 'order_unit_price': _round_money(_row_get(row, 'order_unit_price')), 'customer_pay_amount': customer_pay, }, 'viewer_org': { 'id': accounting_orgid, 'is_business_owner': is_owner, 'immediate_parent': { 'id': settle_meta.get('immediate_parent_orgid'), 'name': await _org_name(sor, settle_meta.get('immediate_parent_orgid')), } if settle_meta.get('immediate_parent_orgid') else None, }, 'protocol': { 'parent_salemode': _salemode_label(protocol.get('own_salemode')), 'parent_discount_to_us': _round_money(protocol.get('own_discount')), 'parent_floor_unit_price': _round_money(protocol.get('own_floor_unit_price')), 'our_salemode_to_customer': _salemode_label(protocol.get('customer_salemode')), 'our_discount_to_customer': _round_money(protocol.get('customer_discount')), 'our_floor_unit_price_to_customer': _round_money(protocol.get('customer_floor_unit_price')), 'our_discount_as_reseller_to_parent': _round_money(protocol.get('reseller_discount')), }, 'finance': { 'profit_amount': amounts['profit_amount'], 'settle_upstream_amount': amounts['settle_upstream_amount'], 'settle_upstream_type': settle_meta['settle_upstream_type'], 'settle_upstream_target': { 'id': settle_meta['settle_upstream_orgid'], 'name': settle_meta['settle_upstream_orgname'], }, 'amount_source': amounts['amount_source'], }, 'bill_detail_legs': amounts.get('bill_detail_legs', []), } def _bill_core_from_clause(): """统计/查 ID 用最小 JOIN(兼容无 ordergoodsid 字段的库)。""" return """ FROM bill b INNER JOIN bz_order o ON b.orderid = o.id INNER JOIN organization cust ON b.customerid = cust.id """ def _bill_ids_sql(where_sql): """不在 SQL 里写 LIMIT:部分环境 sqlExe 对 LIMIT 会返回空列表。""" return f""" SELECT b.id AS bill_id {_bill_core_from_clause()} WHERE {where_sql} ORDER BY b.bill_date DESC, b.create_at DESC """ def _sql_quote_bill_id(bill_id): if bill_id is None: return None s = str(bill_id).strip() if not s: return None for ch in s: if not (ch.isalnum() or ch in ('_', '-')): return None return "'" + s.replace("'", "''") + "'" async def _fetch_overview_bill_rows(sor, where_sql, params, max_bills): """ 概览取数:先用仅 bill_id 的 SQL(已验证可用),再 sor.R 组装明细。 多列 + JOIN 的 _bill_scope_list_sql 在本环境 sqlExe 会返回空列表。 """ id_rows = _sql_rows(await sor.sqlExe(_bill_ids_sql(where_sql), params)) all_ids = [] seen = set() for r in id_rows: bid = _normalize_bill_id(_row_get(r, 'bill_id', 'id', 'ID')) if bid and bid not in seen: seen.add(bid) all_ids.append(bid) total_ids = len(all_ids) truncated = total_ids > max_bills if truncated: all_ids = all_ids[:max_bills] rows = [] for bid in all_ids: row = await _fetch_bill_row_via_R(sor, bid) if row: rows.append(row) return { 'total_ids': total_ids, 'truncated': truncated, 'id_query_len': len(id_rows), 'rows': rows, } def _new_amount_bucket(): return { 'sales_total': 0.0, 'profit_total': 0.0, 'settle_upstream_total': 0.0, 'bill_count': 0, } def _bucket_add(bucket, sales, profit, settle): bucket['sales_total'] += float(sales or 0) bucket['profit_total'] += float(profit or 0) bucket['settle_upstream_total'] += float(settle or 0) bucket['bill_count'] += 1 def _bucket_round(bucket): return { 'sales_total': _round_money(bucket['sales_total']), 'profit_total': _round_money(bucket['profit_total']), 'settle_upstream_total': _round_money(bucket['settle_upstream_total']), 'bill_count': bucket['bill_count'], } def _norm_org_id(orgid): if orgid is None: return None s = str(orgid).strip() return s if s else None def _customer_segment(customer_parentid, accounting_orgid, reseller_id_set): pid = _norm_org_id(customer_parentid) acc = _norm_org_id(accounting_orgid) if pid == acc: return 'direct_customers' if pid in reseller_id_set: return 'from_sub_resellers' return None async def _bill_finance_amounts(sor, row, accounting_orgid): """单笔:销售额、本级利润、本级应付上级/供应商。 退订(BUY_REVERSE 等):销售额、利润取负(减少);应付上级仍正向(增加)。 """ sales = float(_row_get(row, 'customer_pay_amount') or 0) bill_id = _row_get(row, 'bill_id') bill_state = _row_get(row, 'bill_state') quantity = int(_row_get(row, 'quantity') or 1) is_reverse = _is_reverse_op(_row_get(row, 'business_op')) if is_reverse: sales = -sales if str(bill_state) == '1': fin = await _finance_from_bill_detail(sor, bill_id, accounting_orgid, is_reverse) else: catalog = float(_row_get(row, 'catalog_amount') or 0) protocol = await _protocol_snapshot( sor, accounting_orgid, _row_get(row, 'customerid'), _row_get(row, 'providerid'), _row_get(row, 'productid'), _row_get(row, 'bill_date'), quantity, ) fin = _estimate_finance(catalog, protocol, is_reverse) profit = float(fin.get('profit_amount') or 0) settle = float(fin.get('settle_upstream_amount') or 0) return sales, profit, settle, fin.get('amount_source', 'unknown') async def _provider_name_cache(sor, cache, provider_orgid): if not provider_orgid: return None if provider_orgid in cache: return cache[provider_orgid] name = None rows = await sor.sqlExe( "SELECT name FROM provider WHERE orgid=${oid}$ AND del_flg='0' LIMIT 1", {'oid': provider_orgid}, ) if rows: name = rows[0].get('name') cache[provider_orgid] = name return name async def _product_name_cache(sor, cache, productid): if not productid: return None if productid in cache: return cache[productid] name = None rows = await sor.R('product', {'id': productid, 'del_flg': '0'}) if rows: name = rows[0].get('name') cache[productid] = name return name def _count_sql(where_sql): return f""" SELECT COUNT(*) AS total_count {_bill_core_from_clause()} WHERE {where_sql} """ def _normalize_bill_id(bill_id): if bill_id is None: return None s = str(bill_id).strip() return s if s else None async def _fetch_bill_row_via_R(sor, bill_id): """ 用 sor.R 按表组装明细(与 mu_ban / process_user_billing 一致)。 已证实:本环境 sqlExe 多表 JOIN + WHERE b.id 会返回空,但 sor.R('bill') 可用。 """ bid = _normalize_bill_id(bill_id) if not bid: return None bills = await sor.R('bill', {'id': bid, 'del_flg': '0'}) if not bills: bills = await sor.R('bill', {'id': bid}) if not bills: return None b = bills[0] o = {} oid = b.get('orderid') if oid: orders = await sor.R('bz_order', {'id': oid, 'del_flg': '0'}) if not orders: orders = await sor.R('bz_order', {'id': oid}) if orders: o = orders[0] cust = {} cid = b.get('customerid') if cid: custs = await sor.R('organization', {'id': cid, 'del_flg': '0'}) if not custs: custs = await sor.R('organization', {'id': cid}) if custs: cust = custs[0] og = {} ogid = b.get('ordergoodsid') if ogid: ogs = await sor.R('order_goods', {'id': ogid, 'del_flg': '0'}) if ogs: og = ogs[0] if not og and oid: ogs = await sor.sqlExe( """SELECT list_price, discount, price FROM order_goods WHERE orderid=${oid}$ AND del_flg='0' LIMIT 1""", {'oid': oid}, ) if ogs: og = ogs[0] return { 'bill_id': b.get('id'), 'order_id': oid, 'customerid': cid, 'productid': b.get('productid'), 'providerid': b.get('providerid'), 'bill_date': b.get('bill_date'), 'bill_state': b.get('bill_state'), 'catalog_amount': b.get('provider_amt'), 'customer_pay_amount': b.get('amount'), 'quantity': b.get('quantity'), 'order_date': o.get('order_date'), 'business_op': b.get('business_op') or o.get('business_op'), 'servicename': o.get('servicename'), 'customer_name': cust.get('orgname'), 'customer_parentid': cust.get('parentid'), 'list_price': og.get('list_price'), 'order_discount': og.get('discount'), 'order_unit_price': og.get('price'), } async def _fetch_bill_rows_by_ids(sor, bill_ids): rows = [] for bid in bill_ids: one = await _fetch_bill_row_via_R(sor, bid) if one: rows.append(one) return rows async def _fetch_bill_rows_page(sor, where_sql, base_params, offset, page_size): """ 先查全部符合条件的 bill_id(通常数量可控),再在 Python 分页; 明细用 IN 字面量或逐条查询,不用 IN (${bid_n}$) 占位符。 """ id_sql = _bill_ids_sql(where_sql) id_rows = _sql_rows(await sor.sqlExe(id_sql, base_params)) all_ids = [] seen = set() for r in id_rows: bid = _normalize_bill_id(_row_get(r, 'bill_id', 'id', 'ID')) if bid and bid not in seen: seen.add(bid) all_ids.append(bid) total = len(all_ids) page_ids = all_ids[offset: offset + page_size] if not page_ids: return total, [] ordered = await _fetch_bill_rows_by_ids(sor, page_ids) return total, ordered async def _finance_report_debug(sor, where_sql, params, page_size, offset): """debug=true 时返回,便于在服务器上对比哪一步 SQL 无数据。""" dbg = {} try: c = _sql_rows(await sor.sqlExe(_count_sql(where_sql), params)) dbg['count_total'] = int(_row_get(c[0], 'total_count', 0) or 0) if c else None except Exception as e: dbg['count_error'] = str(e) try: ids = _sql_rows(await sor.sqlExe(_bill_ids_sql(where_sql), params)) dbg['ids_query_len'] = len(ids) except Exception as e: dbg['ids_error'] = str(e) try: lim_sql = _bill_ids_sql(where_sql).strip() + ' LIMIT %d OFFSET %d' % (page_size, offset) lim = _sql_rows(await sor.sqlExe(lim_sql, params)) dbg['ids_with_limit_len'] = len(lim) except Exception as e: dbg['ids_with_limit_error'] = str(e) try: page_ids = [] id_rows = _sql_rows(await sor.sqlExe(_bill_ids_sql(where_sql), params)) if id_rows: dbg['first_id_row_keys'] = list(id_rows[0].keys()) dbg['first_id_raw'] = _row_get(id_rows[0], 'bill_id', 'id', 'ID') for r in id_rows[offset: offset + page_size]: bid = _normalize_bill_id(_row_get(r, 'bill_id', 'id', 'ID')) if bid: page_ids.append(bid) dbg['page_ids_len'] = len(page_ids) if page_ids: dbg['sample_page_id'] = page_ids[0] rb = await sor.R('bill', {'id': page_ids[0], 'del_flg': '0'}) dbg['sor_R_bill_count'] = len(rb) if rb else 0 one = await _fetch_bill_row_via_R(sor, page_ids[0]) dbg['sor_R_row_ok'] = one is not None except Exception as e: dbg['detail_fetch_error'] = str(e) return dbg async def finance_order_report(ns=None): """ 管理人员 — 客户订单财务列表(分页)。 入参 ns: accounting_orgid (必填) 账本机构 include_sub_reseller_customers (可选) 默认 false;true 时含下级分销商的客户订单 userid, start_date, end_date, customerid, productid, order_id, bill_state current_page (默认1), page_size (默认20, 最大100) 返回 finance.settle_upstream_*(统一口径,取本级账本「待结转*销售收入」贷方): - 本机构:应付供应商 - 分销商:应付直接上级机构(金额=本级成本,与本级账本待结转一致) """ ns = ns or {} accounting_orgid = ns.get('accounting_orgid') if not accounting_orgid: return {'status': False, 'msg': '缺少 accounting_orgid(当前账本机构)'} include_sub = _parse_bool(ns.get('include_sub_reseller_customers'), False) current_page, page_size, offset = _parse_page(ns) db = DBPools() async with db.sqlorContext(DBNAME) as sor: ok, err = await _check_viewer(sor, accounting_orgid, ns.get('userid')) if not ok: return {'status': False, 'msg': err} scope_sql, scope_params, reseller_ids = await _build_customer_scope_sql( sor, accounting_orgid, include_sub, ) is_owner = await _is_business_owner(sor, accounting_orgid) conditions = ["b.del_flg = '0'", "o.del_flg = '0'", scope_sql] params = dict(scope_params) if ns.get('start_date'): conditions.append('b.bill_date >= ${start_date}$') params['start_date'] = ns['start_date'] if ns.get('end_date'): conditions.append('b.bill_date <= ${end_date}$') params['end_date'] = ns['end_date'] if ns.get('customerid'): conditions.append('b.customerid = ${customerid}$') params['customerid'] = ns['customerid'] if ns.get('productid'): conditions.append('b.productid = ${productid}$') params['productid'] = ns['productid'] if ns.get('order_id'): conditions.append('b.orderid = ${order_id}$') params['order_id'] = ns['order_id'] if ns.get('bill_state') is not None: conditions.append('b.bill_state = ${bill_state}$') params['bill_state'] = str(ns['bill_state']) where_sql = ' AND '.join(conditions) total_count, rows = await _fetch_bill_rows_page( sor, where_sql, params, offset, page_size, ) if total_count > 0 and offset >= total_count: current_page = 1 offset = 0 total_count, rows = await _fetch_bill_rows_page( sor, where_sql, params, offset, page_size, ) debug_info = None if _parse_bool(ns.get('debug'), False): debug_info = await _finance_report_debug( sor, where_sql, params, page_size, offset, ) debug_info['page_ids_fetched'] = len(rows) items = [] sum_profit = 0.0 sum_pay = 0.0 sum_upstream = 0.0 row_errors = [] for row in rows: try: item = await _build_report_row(sor, row, accounting_orgid, is_owner) except Exception as exc: row_errors.append({ 'bill_id': _row_get(row, 'bill_id'), 'error': str(exc), }) continue items.append(item) fin = item['finance'] if fin.get('profit_amount') is not None: sum_profit += fin['profit_amount'] pay = item['pricing'].get('customer_pay_amount') if pay is not None: sum_pay += pay up = fin.get('settle_upstream_amount') if up is not None: sum_upstream += up return { 'status': True, 'msg': 'ok', 'data': { 'accounting_orgid': accounting_orgid, 'accounting_orgname': await _org_name(sor, accounting_orgid), 'is_business_owner': is_owner, 'customer_scope': { 'include_sub_reseller_customers': include_sub, 'descendant_reseller_ids': reseller_ids, 'descendant_reseller_count': len(reseller_ids), }, 'total_count': total_count, 'current_page': current_page, 'page_size': page_size, 'summary': { 'customer_pay_total': _round_money(sum_pay), 'profit_total': _round_money(sum_profit), 'settle_upstream_total': _round_money(sum_upstream), }, 'items': items, 'row_errors': row_errors, 'list_row_count': len(rows), 'debug': debug_info, }, } async def finance_order_report_detail(ns=None): """ 管理人员 — 单笔账单财务明细。 入参:accounting_orgid, bill_id, include_sub_reseller_customers(与列表一致,用于校验客户范围) """ ns = ns or {} accounting_orgid = ns.get('accounting_orgid') bill_id = ns.get('bill_id') if not accounting_orgid or not bill_id: return {'status': False, 'msg': '缺少 accounting_orgid 或 bill_id'} include_sub = _parse_bool(ns.get('include_sub_reseller_customers'), False) db = DBPools() async with db.sqlorContext(DBNAME) as sor: ok, err = await _check_viewer(sor, accounting_orgid, ns.get('userid')) if not ok: return {'status': False, 'msg': err} _, _, reseller_ids = await _build_customer_scope_sql(sor, accounting_orgid, include_sub) is_owner = await _is_business_owner(sor, accounting_orgid) rows = await _fetch_bill_rows_by_ids(sor, [bill_id]) if not rows: return {'status': False, 'msg': '账单不存在'} row = rows[0] if not await _customer_in_scope( _row_get(row, 'customer_parentid'), accounting_orgid, reseller_ids, include_sub, ): return {'status': False, 'msg': '该账单不在当前查询客户范围内'} detail = await _build_report_row(sor, row, accounting_orgid, is_owner) detail['customer_scope'] = {'include_sub_reseller_customers': include_sub} return {'status': True, 'msg': 'ok', 'data': detail} async def finance_billing_overview(ns=None): """ 计费概览页 — 销售/利润/应付上级 汇总(单接口,建议概览页只调此接口)。 入参 ns: accounting_orgid (必填) 当前账本机构 include_sub_reseller_customers (可选) 默认 true;是否含下级分销商客户带来的金额 start_date / end_date (可选) 按 bill.bill_date 筛选 bill_state (可选) 仅统计指定账单状态,默认不筛(含未记账则利润/结算为估算) only_accounted (可选) true 时仅 bill_state=1(推荐概览页使用) userid (可选) 权限 max_bills (可选) 默认 5000,超出则截断并返回 truncated=true 口径(与订单明细接口一致): sales_total = 客户实付合计 bill.amount profit_total = 本级账本折扣收入+底价收入(bill_detail)或协议估算 settle_upstream_total= 本级账本「待结转*销售收入」贷方合计(本级成本/应付上级,业主与分销统一) 分段: direct_customers 直属客户(cust.parentid = accounting_orgid) from_sub_resellers 下级分销商的客户(cust.parentid in 下级分销 org) 返回 by_provider_product:按供应商 orgid + 产品 id 拆分上述三项及合计。 """ ns = ns or {} accounting_orgid = ns.get('accounting_orgid') if not accounting_orgid: return {'status': False, 'msg': '缺少 accounting_orgid(当前账本机构)'} include_sub = _parse_bool(ns.get('include_sub_reseller_customers'), True) only_accounted = _parse_bool(ns.get('only_accounted'), False) try: max_bills = int(ns.get('max_bills', 50000) or 50000) except (TypeError, ValueError): max_bills = 50000 max_bills = max(100, min(max_bills, 200000)) db = DBPools() async with db.sqlorContext(DBNAME) as sor: ok, err = await _check_viewer(sor, accounting_orgid, ns.get('userid')) if not ok: return {'status': False, 'msg': err} scope_sql, scope_params, reseller_ids = await _build_customer_scope_sql( sor, accounting_orgid, include_sub, ) is_owner = await _is_business_owner(sor, accounting_orgid) conditions = ["b.del_flg = '0'", "o.del_flg = '0'", scope_sql] params = dict(scope_params) if ns.get('start_date'): conditions.append('b.bill_date >= ${start_date}$') params['start_date'] = ns['start_date'] if ns.get('end_date'): conditions.append('b.bill_date <= ${end_date}$') params['end_date'] = ns['end_date'] if only_accounted: conditions.append("b.bill_state = '1'") elif ns.get('bill_state') is not None: conditions.append('b.bill_state = ${bill_state}$') params['bill_state'] = str(ns['bill_state']) where_sql = ' AND '.join(conditions) fetch = await _fetch_overview_bill_rows(sor, where_sql, params, max_bills) bill_rows = fetch['rows'] truncated = fetch['truncated'] reseller_id_set = {_norm_org_id(r) for r in reseller_ids if _norm_org_id(r)} totals = { 'direct_customers': _new_amount_bucket(), 'from_sub_resellers': _new_amount_bucket(), 'grand_total': _new_amount_bucket(), } by_pp = {} provider_names = {} product_names = {} skipped = 0 errors = [] for row in bill_rows: seg = _customer_segment( _row_get(row, 'customer_parentid'), accounting_orgid, reseller_id_set, ) if seg is None: skipped += 1 continue try: sales, profit, settle, _src = await _bill_finance_amounts( sor, row, accounting_orgid, ) except Exception as exc: errors.append({ 'bill_id': _row_get(row, 'bill_id'), 'error': str(exc), }) continue _bucket_add(totals[seg], sales, profit, settle) _bucket_add(totals['grand_total'], sales, profit, settle) pid = _row_get(row, 'providerid') or '' prid = _row_get(row, 'productid') or '' key = (pid, prid) if key not in by_pp: by_pp[key] = { 'provider_orgid': pid, 'product_id': prid, 'direct_customers': _new_amount_bucket(), 'from_sub_resellers': _new_amount_bucket(), 'total': _new_amount_bucket(), } _bucket_add(by_pp[key][seg], sales, profit, settle) _bucket_add(by_pp[key]['total'], sales, profit, settle) breakdown = [] for (pid, prid), node in by_pp.items(): breakdown.append({ 'provider': { 'orgid': pid, 'name': await _provider_name_cache(sor, provider_names, pid), }, 'product': { 'id': prid, 'name': await _product_name_cache(sor, product_names, prid), }, 'direct_customers': _bucket_round(node['direct_customers']), 'from_sub_resellers': _bucket_round(node['from_sub_resellers']), 'total': _bucket_round(node['total']), }) breakdown.sort(key=lambda x: float(x['total']['sales_total'] or 0), reverse=True) settle_target = await _settle_upstream_meta(sor, accounting_orgid, None) if is_owner: settle_label = '应付供应商' else: settle_label = '应付上级机构' data = { 'accounting_orgid': accounting_orgid, 'accounting_orgname': await _org_name(sor, accounting_orgid), 'is_business_owner': is_owner, 'settle_upstream_label': settle_label, 'settle_upstream_target': { 'type': settle_target['settle_upstream_type'], 'orgid': settle_target['settle_upstream_orgid'], 'name': settle_target['settle_upstream_orgname'], }, 'period': { 'start_date': ns.get('start_date'), 'end_date': ns.get('end_date'), }, 'customer_scope': { 'include_sub_reseller_customers': include_sub, 'descendant_reseller_ids': reseller_ids, 'descendant_reseller_count': len(reseller_ids), }, 'filters': { 'only_accounted': only_accounted, 'bill_state': ns.get('bill_state'), }, 'bill_count': len(bill_rows), 'bill_id_count': fetch['total_ids'], 'id_query_len': fetch['id_query_len'], 'truncated': truncated, 'max_bills': max_bills, 'skipped_out_of_scope': skipped, 'totals': { 'direct_customers': _bucket_round(totals['direct_customers']), 'from_sub_resellers': _bucket_round(totals['from_sub_resellers']), 'grand_total': _bucket_round(totals['grand_total']), }, 'by_provider_product': breakdown, 'errors': errors, } if _parse_bool(ns.get('debug'), False) or ( data['totals']['grand_total']['bill_count'] == 0 and fetch['id_query_len'] > 0 ): data['debug'] = { 'id_query_len': fetch['id_query_len'], 'bill_id_count': fetch['total_ids'], 'overview_rows_len': len(bill_rows), 'skipped_out_of_scope': skipped, 'sample_parentid': _row_get(bill_rows[0], 'customer_parentid') if bill_rows else None, } return {'status': True, 'msg': 'ok', 'data': data} # _report = params_kw.get('report') or params_kw.get('api') or 'order_list' _report = 'overview' if _report in ('overview', 'billing_overview', 'finance_billing_overview'): ret = await finance_billing_overview(params_kw) elif _report in ('detail', 'order_detail'): ret = await finance_order_report_detail(params_kw) else: ret = await finance_order_report(params_kw) return ret