kboss/b/bill/finance_order_report.dspy
2026-05-29 11:40:24 +08:00

1240 lines
44 KiB
Plaintext
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
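"""
Financial order report endpoints for management staff.

Perspective: ``accounting_orgid`` (the ledger organization) is the viewing
subject; the report shows pricing, profit and this level's payable settlement
for customer orders.
Runtime dependencies: DBPools, sor.sqlExe, sor.R
Business rules (confirmed):
- product_salemode.providerid = provider.orgid
- Resellers do not settle with suppliers directly; only the business owner
  (parentid is NULL) settles with suppliers.
- A first-level reseller settles with the business owner and a second-level
  reseller settles with the first-level reseller (i.e. always with the direct
  parent).
- settle_upstream_amount: this level's payable (business owner -> supplier;
  reseller -> direct parent organization).
- profit_amount: sum of the credit entries of the "折扣收入" and "底价收入"
  subjects on this level's ledger.
- include_sub_reseller_customers: whether customers acquired by descendant
  resellers are included.
"""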
async def get_parent_orgid(sor, orgid):
    """Return the id of the direct parent organization of ``orgid``.

    The query joins ``organization`` to itself and only matches when both the
    child row and the parent row have ``del_flg = '0'``.

    Args:
        sor: Database context exposing an awaitable ``sqlExe(sql, params)``.
        orgid: Id of the child organization.

    Returns:
        The ``id`` of the first matching parent row, or ``None`` when the
        query returns no rows.
    """
    query = """select a.id from organization a, organization b
where b.parentid = a.id
and a.del_flg = '0'
and b.del_flg = '0'
and b.id = ${orgid}$"""
    matches = await sor.sqlExe(query, {'orgid': orgid})
    return matches[0]['id'] if len(matches) > 0 else None
# Name of the database pool opened through DBPools().sqlorContext(...).
# (The original file assigned this constant twice; one definition is enough.)
DBNAME = 'kboss'

# Organization type codes. RESELLER_ORG is matched against organization.org_type
# in this file; the others are presumably codes of the same column - TODO confirm.
RESELLER_ORG = '1'
# NOTE(review): "OGR" looks like a typo for "ORG"; the name is kept unchanged
# because code outside this file may reference it.
OWNER_OGR = '0'
CORP_CUSTOMER = '2'
PERSONAL = '3'
PROVIDER = '4'
PUBLISHER = '5'
UNDERWRITER = '6'

# Participant labels (runtime strings, must stay in Chinese).
PARTY_OWNER = '本机构'
PARTY_CUSTOMER = '客户'
PARTY_RESELLER = '分销商'
PARTY_PROVIDER = '供应商'
PARTY_PUBLISHER = '算力券发行方'
PARTY_UNDERWRITER = '算力券承销方'

# Accounting directions as stored in bill_detail.accounting_dir.
DEBT = '借'
CREDIT = '贷'

# Accounting action display names.
ACTNAME_BUY = '付费'
ACTNAME_RECHARGE = '充值'
ACTNAME_RECHARGE_ALIPAY = '支付宝充值'
ACTNAME_SETTLE = '结算'

# Sale-mode display labels.
SALEMODE_DISCOUNT = '折扣'
SALEMODE_REBATE = '代付费'
SALEMODE_FLOORPRICE = '底价'

# Accounting action codes.
ACTION_RECHARGE_ALIPAY = 'RECHARGE_ALIPAY'
ACTION_RECHARGE_ALIPAY_REVERSE = 'RECHARGE_ALIPAY_REVERSE'
ACTION_RECHARGE = 'RECHARGE'
ACTION_RECHARGE_REVERSE = 'RECHARGE_REVERSE'
ACTION_BUY = 'BUY'
ACTION_REVERSE_BUY = 'BUY_REVERSE'
ACTION_RENEW = 'RENEW'
ACTION_RENEW_REVERSE = 'RENEW_REVERSE'
ACTION_SETTLE = 'SETTLE'
ACTION_SETTLE_REVERSE = 'SETTLE_REVERSE'

# Organization types treated as customers (corporate and personal).
CUSTOMER_ORG_TYPES = (CORP_CUSTOMER, PERSONAL)

# saleprotocol.salemode code -> display label.
SALEMODE_LABEL = {
    '0': SALEMODE_DISCOUNT,
    '1': SALEMODE_REBATE,
    '2': SALEMODE_FLOORPRICE,
}

# Ledger subjects whose credit entries count as this level's profit.
INCOME_SUBJECTS = ('折扣收入', '底价收入')
# Subject on the parent's ledger whose debit entries are the reseller's payable.
PARENT_SETTLE_SUBJECT = '分销商存放资金'
# Subject-name prefix whose credit entries are the business owner's payable
# to the supplier.
SUPPLIER_SETTLE_PREFIX = '待结转'
# Protocol terms (salemode, settle_mode, discount, price) under which
# ${bid_orgid}$ is the bidding party for a product of ${providerid}$, whichever
# organization made the offer. Only protocols active on ${curdate}$
# (start_date <= curdate < end_date) and not soft-deleted are considered.
# A row matches either the exact product id or the '*' wildcard; ORDER BY
# b.productid DESC with LIMIT 1 keeps a single row - presumably intended to
# prefer the exact product over '*'; confirm against the database collation.
_SALEMODE_SQL_OWN = """
SELECT a.salemode, a.settle_mode, b.discount, b.price
FROM saleprotocol a, product_salemode b
WHERE a.id = b.protocolid
AND a.bid_orgid = ${bid_orgid}$
AND (b.productid = ${productid}$ OR b.productid = '*')
AND b.providerid = ${providerid}$
AND a.start_date <= ${curdate}$
AND a.end_date > ${curdate}$
AND a.del_flg = '0'
AND b.del_flg = '0'
ORDER BY b.productid DESC
LIMIT 1
"""
# Same lookup as _SALEMODE_SQL_OWN, additionally restricted to protocols
# offered by ${offer_orgid}$ to ${bid_orgid}$.
_SALEMODE_SQL_OFFER = """
SELECT a.salemode, a.settle_mode, b.discount, b.price
FROM saleprotocol a, product_salemode b
WHERE a.id = b.protocolid
AND a.offer_orgid = ${offer_orgid}$
AND a.bid_orgid = ${bid_orgid}$
AND (b.productid = ${productid}$ OR b.productid = '*')
AND b.providerid = ${providerid}$
AND a.start_date <= ${curdate}$
AND a.end_date > ${curdate}$
AND a.del_flg = '0'
AND b.del_flg = '0'
ORDER BY b.productid DESC
LIMIT 1
"""
def _round_money(v):
    """Convert ``v`` to ``float`` and round it to two decimals.

    ``None`` is passed through unchanged so that "no amount" stays
    distinguishable from a zero amount.
    """
    return None if v is None else round(float(v), 2)
def _salemode_label(code):
    """Translate a sale-mode code into its display label.

    The code is stringified before the lookup in ``SALEMODE_LABEL``; unknown
    codes are returned as their string form and ``None`` stays ``None``.
    """
    if code is None:
        return None
    text = str(code)
    return SALEMODE_LABEL.get(text, text)
def _parse_bool(v, default=False):
    """Interpret a request parameter as a boolean.

    ``None`` yields ``default``, real booleans are returned as-is, and anything
    else is true only when its trimmed, lower-cased string form is one of
    ``'1'``, ``'true'``, ``'yes'`` or ``'on'``.
    """
    if v is None:
        return default
    if isinstance(v, bool):
        return v
    truthy_words = ('1', 'true', 'yes', 'on')
    normalized = str(v).strip().lower()
    return normalized in truthy_words
def _parse_page(ns, default_page=1, default_size=20, max_size=100):
    """Read paging parameters from ``ns`` and clamp them to sane values.

    ``page_size`` and ``current_page`` fall back to their defaults when they
    are missing, falsy or not convertible to ``int``. The page size is then
    clamped to ``1..max_size`` (so it can never produce ``LIMIT 0``) and the
    page number to ``>= 1``.

    Returns:
        Tuple ``(current_page, page_size, offset)``.
    """
    def _int_or(key, fallback):
        # A falsy raw value (0, '', None) is replaced by the fallback first.
        try:
            return int(ns.get(key, fallback) or fallback)
        except (TypeError, ValueError):
            return fallback

    size = _int_or('page_size', default_size)
    page = _int_or('current_page', default_page)
    size = min(size, max_size)
    if size < 1:
        size = 1
    if page < 1:
        page = 1
    return page, size, (page - 1) * size
def _sql_rows(result):
    """Normalize a ``sqlExe`` result into a list of rows.

    ``None`` becomes an empty list, a list is returned unchanged, a single
    dict is wrapped into a one-element list and any other iterable is
    materialized with ``list()``.
    """
    if result is None:
        return []
    if isinstance(result, list):
        return result
    return [result] if isinstance(result, dict) else list(result)
def _row_get(row, *keys, default=None):
    """Fetch the first available column from ``row``, ignoring key case.

    Each candidate in ``keys`` is tried in order: first as an exact key, then
    case-insensitively (different drivers return column names in different
    cases). When several row keys differ only by case, the first one in
    iteration order wins.

    Non-string candidates and non-string row keys are only matched exactly;
    they no longer raise ``AttributeError`` on ``.lower()``.

    Args:
        row: Mapping of column name to value; a falsy row yields ``default``.
        *keys: Candidate column names, most preferred first.
        default: Keyword-only value returned when nothing matches.

    Returns:
        The matched value (which may itself be falsy) or ``default``.
    """
    if not row:
        return default
    folded = None  # lower-cased view of the row, built at most once
    for key in keys:
        if key in row:
            return row[key]
        if not isinstance(key, str):
            continue
        if folded is None:
            folded = {}
            for name, value in row.items():
                if isinstance(name, str):
                    folded.setdefault(name.lower(), value)
        lowered = key.lower()
        if lowered in folded:
            return folded[lowered]
    return default
async def _check_viewer(sor, accounting_orgid, userid=None):
    """Check whether ``userid`` may view the ledger of ``accounting_orgid``.

    Access is granted when the user's own organization is the ledger
    organization or its direct parent.

    NOTE(review): a missing/falsy ``userid`` is granted access without any
    lookup - confirm that callers authenticate the request beforehand.

    Returns:
        Tuple ``(allowed, error_message)``; the message is ``None`` when
        access is granted.
    """
    granted = (True, None)
    if not userid:
        return granted
    matched_users = await sor.R('users', {'id': userid, 'del_flg': '0'})
    if not matched_users:
        return False, '用户不存在'
    viewer_orgid = matched_users[0].get('orgid')
    if viewer_orgid == accounting_orgid:
        return granted
    ledger_parent = await get_parent_orgid(sor, accounting_orgid)
    if ledger_parent and viewer_orgid == ledger_parent:
        return granted
    return False, '无权查看该机构财务数据'
async def _org_name(sor, orgid):
    """Return ``orgname`` of the non-deleted organization ``orgid``.

    Returns ``None`` for a falsy id (without querying) or when no row matches.
    """
    if not orgid:
        return None
    found = await sor.R('organization', {'id': orgid, 'del_flg': '0'})
    if not found:
        return None
    return found[0]['orgname']
async def _is_business_owner(sor, orgid):
    """Tell whether ``orgid`` is a business owner.

    An organization counts as a business owner when it exists, is not
    soft-deleted and has no parent (``parentid IS NULL``).
    """
    query = "SELECT id FROM organization WHERE id=${id}$ AND parentid IS NULL AND del_flg='0'"
    matches = await sor.sqlExe(query, {'id': orgid})
    return len(matches) > 0
async def _collect_descendant_reseller_ids(sor, root_orgid):
    """Collect the ids of all reseller organizations below ``root_orgid``.

    Walks the organization tree breadth-first, one query per visited node,
    following only non-deleted children whose ``org_type`` equals
    ``RESELLER_ORG``. The root itself is not part of the result and every id
    is reported once.

    Returns:
        List of descendant reseller ids in breadth-first discovery order.
    """
    child_sql = """SELECT id FROM organization
WHERE parentid=${pid}$ AND org_type=${org_type}$ AND del_flg='0'"""
    visited = {root_orgid}
    descendants = []
    frontier = [root_orgid]
    cursor = 0
    # ``frontier`` grows while it is consumed; ``cursor`` marks the next node.
    while cursor < len(frontier):
        parent_id = frontier[cursor]
        cursor += 1
        children = await sor.sqlExe(
            child_sql,
            {'pid': parent_id, 'org_type': RESELLER_ORG},
        )
        for child in children:
            child_id = child['id']
            if child_id in visited:
                continue
            visited.add(child_id)
            descendants.append(child_id)
            frontier.append(child_id)
    return descendants
async def _build_customer_scope_sql(sor, accounting_orgid, include_sub_reseller_customers):
    """Build the SQL condition (and parameters) that selects in-scope customers.

    Direct customers satisfy ``cust.parentid = accounting_orgid``. When
    ``include_sub_reseller_customers`` is true and descendant resellers exist,
    customers whose parent is one of those resellers are included as well,
    with one named placeholder (``reseller_<n>``) per reseller id.

    Returns:
        Tuple ``(condition_sql, params, reseller_ids)``; ``reseller_ids`` is
        empty when only direct customers are in scope.
    """
    direct_only = "cust.parentid = ${accounting_orgid}$"
    params = {'accounting_orgid': accounting_orgid}
    reseller_ids = []
    if include_sub_reseller_customers:
        reseller_ids = await _collect_descendant_reseller_ids(sor, accounting_orgid)
    if not reseller_ids:
        return direct_only, params, []
    placeholders = []
    for position, reseller_id in enumerate(reseller_ids):
        param_name = 'reseller_%d' % position
        params[param_name] = reseller_id
        placeholders.append('${%s}$' % param_name)
    joined = ', '.join(placeholders)
    condition = "(cust.parentid = ${accounting_orgid}$ OR cust.parentid IN (%s))" % joined
    return condition, params, reseller_ids
async def _customer_in_scope(customer_parentid, accounting_orgid, reseller_ids, include_sub):
    """Tell whether a customer belongs to the queried scope.

    A customer is in scope when its parent is the ledger organization itself,
    or - only if ``include_sub`` is set - one of ``reseller_ids``.
    """
    if customer_parentid == accounting_orgid:
        return True
    return bool(include_sub and customer_parentid in reseller_ids)
async def _fetch_salemode_row(sor, sql, params):
    """Run ``sql`` with ``params`` and return its first row, or ``None``."""
    matches = await sor.sqlExe(sql, params)
    if matches:
        return matches[0]
    return None
async def _settle_upstream_meta(sor, accounting_orgid, provider_orgid):
    """Describe whom this ledger organization settles with.

    A business owner settles with the supplier (``provider_orgid``); any other
    organization settles with its direct parent.

    Returns:
        Dict with ``settle_upstream_type`` (``'supplier'`` or
        ``'parent_org'``), ``settle_upstream_orgid``,
        ``settle_upstream_orgname`` and ``immediate_parent_orgid`` (``None``
        for a business owner).
    """
    if await _is_business_owner(sor, accounting_orgid):
        upstream_type = 'supplier'
        upstream_orgid = provider_orgid
        direct_parent = None
    else:
        upstream_type = 'parent_org'
        upstream_orgid = await get_parent_orgid(sor, accounting_orgid)
        direct_parent = upstream_orgid
    upstream_name = await _org_name(sor, upstream_orgid)
    return {
        'settle_upstream_type': upstream_type,
        'settle_upstream_orgid': upstream_orgid,
        'settle_upstream_orgname': upstream_name,
        'immediate_parent_orgid': direct_parent,
    }
async def _protocol_snapshot(sor, accounting_orgid, customerid, providerid, productid, bill_date, quantity):
    """Collect the sale-protocol terms that apply to one bill.

    Three protocol rows are looked up for the bill date:

    * ``own``      - protocol where the ledger organization is the bidder;
    * ``customer`` - protocol offered by the ledger organization to this
      customer, falling back to the ``'*'`` bidder;
    * ``reseller`` - protocol offered by the direct parent to the ledger
      organization (only when a parent exists).

    Returns:
        Dict with the parent id/name, salemode / discount / floor unit price
        of each protocol (``None`` where no row was found) and ``quantity``
        as an ``int`` (falsy quantities count as 1).
    """
    if hasattr(bill_date, 'strftime'):
        curdate = bill_date.strftime('%Y-%m-%d')
    elif bill_date is not None:
        curdate = str(bill_date)[:10]
    else:
        curdate = bill_date
    qty = int(quantity or 1)
    common = {'providerid': providerid, 'productid': productid, 'curdate': curdate}

    def _field(protocol_row, column):
        # Missing protocol rows contribute None for every column.
        return protocol_row.get(column) if protocol_row else None

    own_row = await _fetch_salemode_row(
        sor, _SALEMODE_SQL_OWN, dict(common, bid_orgid=accounting_orgid),
    )
    customer_row = await _fetch_salemode_row(
        sor, _SALEMODE_SQL_OFFER,
        dict(common, offer_orgid=accounting_orgid, bid_orgid=customerid),
    )
    if not customer_row:
        customer_row = await _fetch_salemode_row(
            sor, _SALEMODE_SQL_OFFER,
            dict(common, offer_orgid=accounting_orgid, bid_orgid='*'),
        )
    parent_orgid = await get_parent_orgid(sor, accounting_orgid)
    reseller_row = None
    if parent_orgid:
        reseller_row = await _fetch_salemode_row(
            sor, _SALEMODE_SQL_OFFER,
            dict(common, offer_orgid=parent_orgid, bid_orgid=accounting_orgid),
        )
    parent_orgname = await _org_name(sor, parent_orgid)
    return {
        'parent_orgid': parent_orgid,
        'parent_orgname': parent_orgname,
        'own_salemode': _field(own_row, 'salemode'),
        'own_discount': _field(own_row, 'discount'),
        'own_floor_unit_price': _field(own_row, 'price'),
        'customer_salemode': _field(customer_row, 'salemode'),
        'customer_discount': _field(customer_row, 'discount'),
        'customer_floor_unit_price': _field(customer_row, 'price'),
        'reseller_discount': _field(reseller_row, 'discount'),
        'reseller_salemode': _field(reseller_row, 'salemode'),
        'reseller_floor_unit_price': _field(reseller_row, 'price'),
        'quantity': qty,
    }
def _estimate_finance(catalog_amount, protocol, is_owner):
    """Estimate profit and upstream payable for a bill that is not yet booked.

    The estimate is derived from the protocol snapshot only. The rebate sale
    mode (code ``'1'``) is not estimated.

    Sale-mode codes are stringified before comparison, matching
    ``_salemode_label``, so a driver that returns the code as an integer no
    longer silently disables the estimate.

    * Discount mode (own or customer salemode ``'0'``):
      profit = catalog_amount * (customer_discount - own_discount);
      payable = catalog_amount * own_discount for a business owner, otherwise
      catalog_amount * reseller_discount.
    * Floor-price mode (own or customer salemode ``'2'``):
      profit = (customer_price - own_price) * quantity;
      payable = own_price * quantity for a business owner; for a reseller the
      parent's floor price is used, falling back to the customer price.

    Args:
        catalog_amount: Catalog amount of the bill; falsy counts as 0.
        protocol: Dict produced by ``_protocol_snapshot``.
        is_owner: Whether the ledger organization is the business owner.

    Returns:
        Dict with ``profit_amount`` and ``settle_upstream_amount`` (rounded,
        or ``None`` when the inputs are insufficient) and
        ``amount_source = 'estimated'``.
    """
    catalog_amount = float(catalog_amount or 0)
    qty = protocol.get('quantity') or 1

    def _mode(key):
        raw = protocol.get(key)
        return None if raw is None else str(raw).strip()

    own_mode = _mode('own_salemode')
    cust_mode = _mode('customer_salemode')
    own_disc = protocol.get('own_discount')
    cust_disc = protocol.get('customer_discount')
    reseller_disc = protocol.get('reseller_discount')

    profit = None
    settle_upstream = None
    if '0' in (own_mode, cust_mode):
        if own_disc is not None and cust_disc is not None:
            profit = catalog_amount * (float(cust_disc) - float(own_disc))
        if is_owner:
            if own_disc is not None:
                settle_upstream = catalog_amount * float(own_disc)
        elif reseller_disc is not None:
            settle_upstream = catalog_amount * float(reseller_disc)
    elif '2' in (own_mode, cust_mode):
        own_price = protocol.get('own_floor_unit_price')
        cust_price = protocol.get('customer_floor_unit_price')
        reseller_price = protocol.get('reseller_floor_unit_price')
        if own_price is not None and cust_price is not None:
            profit = (float(cust_price) - float(own_price)) * qty
        if is_owner:
            if own_price is not None:
                settle_upstream = float(own_price) * qty
        elif reseller_price is not None:
            settle_upstream = float(reseller_price) * qty
        elif cust_price is not None:
            # No parent protocol found: fall back to the customer price.
            settle_upstream = float(cust_price) * qty
    return {
        'profit_amount': _round_money(profit),
        'settle_upstream_amount': _round_money(settle_upstream),
        'amount_source': 'estimated',
    }
async def _bill_detail_rows(sor, billid):
    """Load the non-deleted ``bill_detail`` rows of one bill.

    ``sor.R`` is tried first; only when it returns nothing is the equivalent
    single-table ``sqlExe`` query used as a fallback.
    """
    via_r = await sor.R('bill_detail', {'billid': billid, 'del_flg': '0'})
    if via_r:
        return via_r
    fallback_sql = """SELECT accounting_orgid, subjectname, accounting_dir,
participantid, participanttype, amount
FROM bill_detail WHERE billid=${billid}$ AND del_flg='0'"""
    raw = await sor.sqlExe(fallback_sql, {'billid': billid})
    return _sql_rows(raw)
async def _finance_from_bill_detail(sor, billid, accounting_orgid, is_owner, parent_orgid):
    """Derive profit and upstream payable of a booked bill from ``bill_detail``.

    * Profit: credit entries on this organization's ledger whose subject is
      one of ``INCOME_SUBJECTS``.
    * Payable, business owner: credit entries on its own ledger whose subject
      starts with ``SUPPLIER_SETTLE_PREFIX``.
    * Payable, reseller: debit entries on the parent's ledger with subject
      ``PARENT_SETTLE_SUBJECT`` whose participant is this organization.

    Returns:
        Dict with rounded ``profit_amount`` and ``settle_upstream_amount``,
        ``amount_source = 'bill_detail'`` and ``bill_detail_legs`` (every
        detail row in a compact form).
    """
    detail_rows = await _bill_detail_rows(sor, billid)
    profit_sum = 0.0
    upstream_sum = 0.0
    legs = []
    for entry in detail_rows:
        legs.append({
            'accounting_orgid': entry['accounting_orgid'],
            'subjectname': entry['subjectname'],
            'accounting_dir': entry['accounting_dir'],
            'participanttype': entry.get('participanttype'),
            'participantid': entry.get('participantid'),
            'amount': _round_money(entry['amount']),
        })
        amount = float(entry['amount'] or 0)
        ledger_orgid = entry['accounting_orgid']
        subject = entry['subjectname'] or ''
        side = entry['accounting_dir']
        on_own_ledger = ledger_orgid == accounting_orgid

        if on_own_ledger and subject in INCOME_SUBJECTS and side == '贷':
            profit_sum += amount

        if is_owner:
            is_payable = (
                on_own_ledger
                and subject.startswith(SUPPLIER_SETTLE_PREFIX)
                and side == '贷'
            )
        else:
            is_payable = (
                parent_orgid
                and ledger_orgid == parent_orgid
                and subject == PARENT_SETTLE_SUBJECT
                and side == '借'
                and entry.get('participantid') == accounting_orgid
            )
        if is_payable:
            upstream_sum += amount
    return {
        'profit_amount': _round_money(profit_sum),
        'settle_upstream_amount': _round_money(upstream_sum),
        'amount_source': 'bill_detail',
        'bill_detail_legs': legs,
    }
async def _build_report_row(sor, row, accounting_orgid, is_owner):
    """Turn one flat bill row into the nested report item returned to clients.

    Args:
        sor: Database context (``sqlExe`` / ``R``).
        row: Flat bill dict as produced by ``_fetch_bill_row_via_R``.
        accounting_orgid: Ledger organization the report is viewed from.
        is_owner: Whether that organization is the business owner.

    Returns:
        Dict with the sections ``customer``, ``product``, ``provider``,
        ``pricing``, ``viewer_org``, ``protocol``, ``finance`` and
        ``bill_detail_legs``.
    """
    bill_id = _row_get(row, 'bill_id')
    customerid = _row_get(row, 'customerid')
    providerid = _row_get(row, 'providerid')
    productid = _row_get(row, 'productid')
    bill_date = _row_get(row, 'bill_date')
    quantity = _row_get(row, 'quantity') or 1
    catalog_amount = _round_money(_row_get(row, 'catalog_amount'))
    customer_pay = _round_money(_row_get(row, 'customer_pay_amount'))
    customer_parentid = _row_get(row, 'customer_parentid')
    protocol = await _protocol_snapshot(
        sor, accounting_orgid, customerid, providerid, productid, bill_date, quantity,
    )
    parent_orgid = protocol['parent_orgid']
    settle_meta = await _settle_upstream_meta(sor, accounting_orgid, providerid)
    bill_state = _row_get(row, 'bill_state')
    # bill_state '1' is treated as "booked": amounts come from bill_detail.
    # Any other state is estimated from the protocol snapshot.
    if str(bill_state) == '1':
        amounts = await _finance_from_bill_detail(
            sor, bill_id, accounting_orgid, is_owner, parent_orgid,
        )
    else:
        amounts = _estimate_finance(catalog_amount, protocol, is_owner)
        amounts['bill_detail_legs'] = []
    # Names are looked up only when the row does not already carry them.
    product_name = _row_get(row, 'product_name')
    if not product_name and productid:
        prows = await sor.R('product', {'id': productid, 'del_flg': '0'})
        if prows:
            product_name = prows[0].get('name')
    provider_name = _row_get(row, 'provider_name')
    if not provider_name and providerid:
        prov_rows = await sor.sqlExe(
            "SELECT name FROM provider WHERE orgid=${orgid}$ AND del_flg='0' LIMIT 1",
            {'orgid': providerid},
        )
        if prov_rows:
            provider_name = prov_rows[0].get('name')
    # A customer whose parent is not the ledger organization is reported as
    # served by that parent organization.
    serving_reseller_id = None
    serving_reseller_name = None
    is_direct = customer_parentid == accounting_orgid
    if not is_direct and customer_parentid:
        serving_reseller_id = customer_parentid
        serving_reseller_name = await _org_name(sor, customer_parentid)
    order_date = _row_get(row, 'order_date')
    servicename = _row_get(row, 'servicename')
    return {
        'bill_id': bill_id,
        'order_id': _row_get(row, 'order_id'),
        # Dates are emitted as strings truncated to 10 / 19 characters.
        'bill_date': str(bill_date)[:10] if bill_date else None,
        'order_date': str(order_date)[:19] if order_date else None,
        'business_op': _row_get(row, 'business_op'),
        'bill_state': bill_state,
        'accounted': str(bill_state) == '1',
        'customer': {
            'id': customerid,
            'name': _row_get(row, 'customer_name'),
            'parent_orgid': customer_parentid,
            'is_direct_customer': is_direct,
            'serving_reseller': {
                'id': serving_reseller_id,
                'name': serving_reseller_name,
            } if serving_reseller_id else None,
        },
        'product': {
            'id': productid,
            # Falls back to the order's service name when no product name is known.
            'name': product_name or servicename,
            'servicename': servicename,
        },
        'provider': {
            'orgid': providerid,
            'name': provider_name,
        },
        'quantity': quantity,
        'pricing': {
            'catalog_amount': catalog_amount,
            'list_price_unit': _round_money(_row_get(row, 'list_price')),
            'order_discount': _round_money(_row_get(row, 'order_discount'))
            if _row_get(row, 'order_discount') is not None
            else None,
            'order_unit_price': _round_money(_row_get(row, 'order_unit_price')),
            'customer_pay_amount': customer_pay,
        },
        'viewer_org': {
            'id': accounting_orgid,
            'is_business_owner': is_owner,
            # NOTE(review): for a non-owner this repeats the name lookup that
            # _settle_upstream_meta already did (settle_upstream_orgname).
            'immediate_parent': {
                'id': settle_meta.get('immediate_parent_orgid'),
                'name': await _org_name(sor, settle_meta.get('immediate_parent_orgid')),
            } if settle_meta.get('immediate_parent_orgid') else None,
        },
        'protocol': {
            # The "parent_*" keys expose the snapshot's "own_*" values.
            'parent_salemode': _salemode_label(protocol.get('own_salemode')),
            'parent_discount_to_us': _round_money(protocol.get('own_discount')),
            'parent_floor_unit_price': _round_money(protocol.get('own_floor_unit_price')),
            'our_salemode_to_customer': _salemode_label(protocol.get('customer_salemode')),
            'our_discount_to_customer': _round_money(protocol.get('customer_discount')),
            'our_floor_unit_price_to_customer': _round_money(protocol.get('customer_floor_unit_price')),
            'our_discount_as_reseller_to_parent': _round_money(protocol.get('reseller_discount')),
        },
        'finance': {
            'profit_amount': amounts['profit_amount'],
            'settle_upstream_amount': amounts['settle_upstream_amount'],
            'settle_upstream_type': settle_meta['settle_upstream_type'],
            'settle_upstream_target': {
                'id': settle_meta['settle_upstream_orgid'],
                'name': settle_meta['settle_upstream_orgname'],
            },
            'amount_source': amounts['amount_source'],
        },
        'bill_detail_legs': amounts.get('bill_detail_legs', []),
    }
def _bill_core_from_clause():
    """Return the minimal FROM/JOIN clause shared by the count and id queries.

    Only ``bill``, ``bz_order`` and the customer ``organization`` are joined,
    so the clause also works on databases without an ``ordergoodsid`` column.
    """
    joins = (
        "FROM bill b",
        "INNER JOIN bz_order o ON b.orderid = o.id",
        "INNER JOIN organization cust ON b.customerid = cust.id",
    )
    return "\n" + "\n".join(joins) + "\n"
def _bill_ids_sql(where_sql):
    """Build the query returning the ids of all bills matching ``where_sql``.

    Rows are ordered newest first (``bill_date``, then ``create_at``). No
    LIMIT is written into the SQL on purpose: in some environments ``sqlExe``
    returns an empty list for queries containing LIMIT.
    """
    from_clause = _bill_core_from_clause()
    return (
        "\nSELECT b.id AS bill_id\n"
        f"{from_clause}\n"
        f"WHERE {where_sql}\n"
        "ORDER BY b.bill_date DESC, b.create_at DESC\n"
    )
def _sql_quote_bill_id(bill_id):
    """Return ``bill_id`` as a single-quoted SQL literal, or ``None``.

    ``None``, blank values and ids containing anything other than
    alphanumeric characters, ``_`` or ``-`` are rejected with ``None``.
    """
    if bill_id is None:
        return None
    text = str(bill_id).strip()
    if not text:
        return None
    if any(not (char.isalnum() or char in ('_', '-')) for char in text):
        return None
    escaped = text.replace("'", "''")
    return "'" + escaped + "'"
async def _fetch_overview_bill_rows(sor, where_sql, params, max_bills):
    """Load the bill rows used by the overview, capped at ``max_bills``.

    The matching bill ids are fetched with the id-only query first and each
    bill is then assembled through ``sor.R``; per the original author's note
    the multi-column JOIN query returns an empty list via ``sqlExe`` in this
    environment.

    Returns:
        Dict with ``total_ids`` (distinct ids before truncation),
        ``truncated``, ``id_query_len`` (raw row count of the id query) and
        ``rows``.
    """
    id_rows = _sql_rows(await sor.sqlExe(_bill_ids_sql(where_sql), params))
    unique_ids = []
    known = set()
    for id_row in id_rows:
        bid = _normalize_bill_id(_row_get(id_row, 'bill_id', 'id', 'ID'))
        if not bid or bid in known:
            continue
        known.add(bid)
        unique_ids.append(bid)
    total_ids = len(unique_ids)
    truncated = total_ids > max_bills
    selected = unique_ids[:max_bills] if truncated else unique_ids
    rows = []
    for bid in selected:
        assembled = await _fetch_bill_row_via_R(sor, bid)
        if assembled:
            rows.append(assembled)
    return {
        'total_ids': total_ids,
        'truncated': truncated,
        'id_query_len': len(id_rows),
        'rows': rows,
    }
def _new_amount_bucket():
    """Create a fresh, zeroed accumulator for sales / profit / payable totals."""
    return dict(
        sales_total=0.0,
        profit_total=0.0,
        settle_upstream_total=0.0,
        bill_count=0,
    )
def _bucket_add(bucket, sales, profit, settle):
    """Add one bill's amounts to ``bucket`` in place and count the bill.

    Falsy amounts (``None``, 0, empty string) count as zero.
    """
    increments = (
        ('sales_total', sales),
        ('profit_total', profit),
        ('settle_upstream_total', settle),
    )
    for field, value in increments:
        bucket[field] += float(value or 0)
    bucket['bill_count'] += 1
def _bucket_round(bucket):
    """Return a copy of ``bucket`` with the three totals rounded for output."""
    money_fields = ('sales_total', 'profit_total', 'settle_upstream_total')
    rounded = {field: _round_money(bucket[field]) for field in money_fields}
    rounded['bill_count'] = bucket['bill_count']
    return rounded
def _norm_org_id(orgid):
    """Normalize an organization id to a trimmed string.

    ``None`` and ids that are blank after trimming become ``None``.
    """
    if orgid is None:
        return None
    return str(orgid).strip() or None
def _customer_segment(customer_parentid, accounting_orgid, reseller_id_set):
    """Classify a customer by who its parent organization is.

    Returns ``'direct_customers'`` when the (normalized) parent equals the
    ledger organization, ``'from_sub_resellers'`` when the parent is in
    ``reseller_id_set``, and ``None`` when the customer is out of scope.
    """
    parent = _norm_org_id(customer_parentid)
    if parent == _norm_org_id(accounting_orgid):
        return 'direct_customers'
    return 'from_sub_resellers' if parent in reseller_id_set else None
async def _bill_finance_amounts(sor, row, accounting_orgid, is_owner, parent_orgid):
    """Compute sales, profit and upstream payable for a single bill.

    Booked bills (``bill_state`` ``'1'``) are read from ``bill_detail``; all
    others are estimated from the protocol snapshot.

    Returns:
        Tuple ``(sales, profit, settle, amount_source)`` where missing
        profit / payable values are reported as ``0.0``.
    """
    sales = float(_row_get(row, 'customer_pay_amount') or 0)
    bill_id = _row_get(row, 'bill_id')
    state = _row_get(row, 'bill_state')
    qty = int(_row_get(row, 'quantity') or 1)
    if str(state) != '1':
        catalog = float(_row_get(row, 'catalog_amount') or 0)
        snapshot = await _protocol_snapshot(
            sor,
            accounting_orgid,
            _row_get(row, 'customerid'),
            _row_get(row, 'providerid'),
            _row_get(row, 'productid'),
            _row_get(row, 'bill_date'),
            qty,
        )
        figures = _estimate_finance(catalog, snapshot, is_owner)
    else:
        figures = await _finance_from_bill_detail(
            sor, bill_id, accounting_orgid, is_owner, parent_orgid,
        )
    profit = float(figures.get('profit_amount') or 0)
    settle = float(figures.get('settle_upstream_amount') or 0)
    source = figures.get('amount_source', 'unknown')
    return sales, profit, settle, source
async def _provider_name_cache(sor, cache, provider_orgid):
    """Look up a provider's name by its organization id, memoized in ``cache``.

    A falsy id returns ``None`` without touching the cache. Misses are cached
    too (as ``None``), so every provider is queried at most once per cache.
    """
    if not provider_orgid:
        return None
    if provider_orgid not in cache:
        found = await sor.sqlExe(
            "SELECT name FROM provider WHERE orgid=${oid}$ AND del_flg='0' LIMIT 1",
            {'oid': provider_orgid},
        )
        cache[provider_orgid] = found[0].get('name') if found else None
    return cache[provider_orgid]
async def _product_name_cache(sor, cache, productid):
    """Look up a product's name by id, memoized in ``cache``.

    A falsy id returns ``None`` without touching the cache. Misses are cached
    too (as ``None``), so every product is queried at most once per cache.
    """
    if not productid:
        return None
    if productid not in cache:
        found = await sor.R('product', {'id': productid, 'del_flg': '0'})
        cache[productid] = found[0].get('name') if found else None
    return cache[productid]
def _count_sql(where_sql):
    """Build the ``COUNT(*)`` query (alias ``total_count``) for ``where_sql``."""
    from_clause = _bill_core_from_clause()
    return (
        "\nSELECT COUNT(*) AS total_count\n"
        f"{from_clause}\n"
        f"WHERE {where_sql}\n"
    )
def _normalize_bill_id(bill_id):
    """Normalize a bill id to a trimmed string.

    ``None`` and ids that are blank after trimming become ``None``.
    """
    if bill_id is None:
        return None
    return str(bill_id).strip() or None
async def _fetch_bill_row_via_R(sor, bill_id):
    """Assemble one flat bill row from single-table ``sor.R`` lookups.

    Per the original author's note, a multi-table JOIN filtered by ``b.id``
    returns nothing through ``sqlExe`` in this environment while ``sor.R`` on
    ``bill`` works, so the row is stitched together table by table.

    For ``bill``, ``bz_order`` and ``organization`` the lookup first requires
    ``del_flg = '0'`` and is retried without that filter when nothing is
    found. The ``order_goods`` row is taken by ``bill.ordergoodsid`` when
    present, otherwise from one non-deleted row of the same order.

    Returns:
        Flat dict of bill, order, customer and order-goods fields, or
        ``None`` when the id is blank or the bill does not exist.
    """
    async def _rows_by_id(table, row_id):
        # Prefer non-deleted rows; fall back to any row with that id.
        found = await sor.R(table, {'id': row_id, 'del_flg': '0'})
        if not found:
            found = await sor.R(table, {'id': row_id})
        return found

    bid = _normalize_bill_id(bill_id)
    if not bid:
        return None
    bills = await _rows_by_id('bill', bid)
    if not bills:
        return None
    bill = bills[0]

    order = {}
    order_id = bill.get('orderid')
    if order_id:
        orders = await _rows_by_id('bz_order', order_id)
        if orders:
            order = orders[0]

    customer = {}
    customer_id = bill.get('customerid')
    if customer_id:
        customers = await _rows_by_id('organization', customer_id)
        if customers:
            customer = customers[0]

    goods = {}
    goods_id = bill.get('ordergoodsid')
    if goods_id:
        by_id = await sor.R('order_goods', {'id': goods_id, 'del_flg': '0'})
        if by_id:
            goods = by_id[0]
    if not goods and order_id:
        by_order = await sor.sqlExe(
            """SELECT list_price, discount, price FROM order_goods
WHERE orderid=${oid}$ AND del_flg='0' LIMIT 1""",
            {'oid': order_id},
        )
        if by_order:
            goods = by_order[0]

    return {
        'bill_id': bill.get('id'),
        'order_id': order_id,
        'customerid': customer_id,
        'productid': bill.get('productid'),
        'providerid': bill.get('providerid'),
        'bill_date': bill.get('bill_date'),
        'bill_state': bill.get('bill_state'),
        'catalog_amount': bill.get('provider_amt'),
        'customer_pay_amount': bill.get('amount'),
        'quantity': bill.get('quantity'),
        'order_date': order.get('order_date'),
        'business_op': order.get('business_op'),
        'servicename': order.get('servicename'),
        'customer_name': customer.get('orgname'),
        'customer_parentid': customer.get('parentid'),
        'list_price': goods.get('list_price'),
        'order_discount': goods.get('discount'),
        'order_unit_price': goods.get('price'),
    }
async def _fetch_bill_rows_by_ids(sor, bill_ids):
    """Assemble the bill rows for ``bill_ids`` in the given order.

    Ids for which no bill can be assembled are skipped.
    """
    assembled = []
    for bill_id in bill_ids:
        bill_row = await _fetch_bill_row_via_R(sor, bill_id)
        if not bill_row:
            continue
        assembled.append(bill_row)
    return assembled
async def _fetch_bill_rows_page(sor, where_sql, base_params, offset, page_size):
    """Return one page of bill rows together with the total match count.

    All matching bill ids are fetched first and paging is done in Python;
    only the ids on the requested page are then assembled row by row.

    Returns:
        Tuple ``(total, rows)`` where ``total`` counts the distinct matching
        ids and ``rows`` holds the assembled rows of the requested page.
    """
    raw_ids = _sql_rows(await sor.sqlExe(_bill_ids_sql(where_sql), base_params))
    ordered_ids = []
    known = set()
    for id_row in raw_ids:
        bid = _normalize_bill_id(_row_get(id_row, 'bill_id', 'id', 'ID'))
        if not bid or bid in known:
            continue
        known.add(bid)
        ordered_ids.append(bid)
    total = len(ordered_ids)
    window = ordered_ids[offset: offset + page_size]
    if not window:
        return total, []
    return total, await _fetch_bill_rows_by_ids(sor, window)
async def _finance_report_debug(sor, where_sql, params, page_size, offset):
    """Collect diagnostics showing which query step returns no data.

    Returned only when the request sets ``debug=true``. Every step runs in its
    own ``try`` block so that a failing step is reported under its ``*_error``
    key instead of aborting the remaining steps.

    Steps: (1) count query, (2) id query, (3) id query with LIMIT/OFFSET
    appended, (4) slicing the id list in Python and probing the first id of
    the page through ``sor.R``.

    Returns:
        Dict of diagnostic values keyed by step.
    """
    dbg = {}
    try:
        c = _sql_rows(await sor.sqlExe(_count_sql(where_sql), params))
        # ``default`` is keyword-only on _row_get; the former positional ``0``
        # was treated as a lookup key and raised when the column was absent.
        dbg['count_total'] = int(_row_get(c[0], 'total_count', default=0) or 0) if c else None
    except Exception as e:
        dbg['count_error'] = str(e)
    try:
        ids = _sql_rows(await sor.sqlExe(_bill_ids_sql(where_sql), params))
        dbg['ids_query_len'] = len(ids)
    except Exception as e:
        dbg['ids_error'] = str(e)
    try:
        # page_size and offset are formatted with %d, so only integers can be
        # spliced into the SQL text.
        lim_sql = _bill_ids_sql(where_sql).strip() + ' LIMIT %d OFFSET %d' % (page_size, offset)
        lim = _sql_rows(await sor.sqlExe(lim_sql, params))
        dbg['ids_with_limit_len'] = len(lim)
    except Exception as e:
        dbg['ids_with_limit_error'] = str(e)
    try:
        page_ids = []
        id_rows = _sql_rows(await sor.sqlExe(_bill_ids_sql(where_sql), params))
        if id_rows:
            dbg['first_id_row_keys'] = list(id_rows[0].keys())
            dbg['first_id_raw'] = _row_get(id_rows[0], 'bill_id', 'id', 'ID')
        for r in id_rows[offset: offset + page_size]:
            bid = _normalize_bill_id(_row_get(r, 'bill_id', 'id', 'ID'))
            if bid:
                page_ids.append(bid)
        dbg['page_ids_len'] = len(page_ids)
        if page_ids:
            dbg['sample_page_id'] = page_ids[0]
            rb = await sor.R('bill', {'id': page_ids[0], 'del_flg': '0'})
            dbg['sor_R_bill_count'] = len(rb) if rb else 0
            one = await _fetch_bill_row_via_R(sor, page_ids[0])
            dbg['sor_R_row_ok'] = one is not None
    except Exception as e:
        dbg['detail_fetch_error'] = str(e)
    return dbg
async def finance_order_report(ns=None):
    """
    Management view: paged financial list of customer orders.

    Parameters read from ``ns``:
        accounting_orgid (required): ledger organization.
        include_sub_reseller_customers (optional): default false; when true,
            orders of customers of descendant resellers are included.
        userid, start_date, end_date, customerid, productid, order_id,
            bill_state: optional viewer check / filters.
        current_page (default 1), page_size (default 20, max 100).
        debug (optional): when true, query diagnostics are returned under
            ``data.debug``.

    Meaning of the returned ``finance.settle_upstream_*`` values:
        - Business owner: payable to the supplier (credit entries of subjects
          starting with "待结转", own ledger only).
        - Reseller: payable to the direct parent (debit entries of
          "分销商存放资金" on the parent's ledger with this organization as
          participant).

    Returns:
        ``{'status': False, 'msg': ...}`` on validation / permission failure,
        otherwise ``{'status': True, 'msg': 'ok', 'data': {...}}``. The
        ``summary`` totals cover only the items of the returned page.
    """
    ns = ns or {}
    accounting_orgid = ns.get('accounting_orgid')
    if not accounting_orgid:
        return {'status': False, 'msg': '缺少 accounting_orgid当前账本机构'}
    include_sub = _parse_bool(ns.get('include_sub_reseller_customers'), False)
    current_page, page_size, offset = _parse_page(ns)
    db = DBPools()
    async with db.sqlorContext(DBNAME) as sor:
        ok, err = await _check_viewer(sor, accounting_orgid, ns.get('userid'))
        if not ok:
            return {'status': False, 'msg': err}
        scope_sql, scope_params, reseller_ids = await _build_customer_scope_sql(
            sor, accounting_orgid, include_sub,
        )
        is_owner = await _is_business_owner(sor, accounting_orgid)
        # Every filter is appended as a named placeholder; values never enter
        # the SQL text directly.
        conditions = ["b.del_flg = '0'", "o.del_flg = '0'", scope_sql]
        params = dict(scope_params)
        if ns.get('start_date'):
            conditions.append('b.bill_date >= ${start_date}$')
            params['start_date'] = ns['start_date']
        if ns.get('end_date'):
            conditions.append('b.bill_date <= ${end_date}$')
            params['end_date'] = ns['end_date']
        if ns.get('customerid'):
            conditions.append('b.customerid = ${customerid}$')
            params['customerid'] = ns['customerid']
        if ns.get('productid'):
            conditions.append('b.productid = ${productid}$')
            params['productid'] = ns['productid']
        if ns.get('order_id'):
            conditions.append('b.orderid = ${order_id}$')
            params['order_id'] = ns['order_id']
        # NOTE(review): only None disables this filter; an empty string would
        # be applied as bill_state = '' - confirm this is intended.
        if ns.get('bill_state') is not None:
            conditions.append('b.bill_state = ${bill_state}$')
            params['bill_state'] = str(ns['bill_state'])
        where_sql = ' AND '.join(conditions)
        total_count, rows = await _fetch_bill_rows_page(
            sor, where_sql, params, offset, page_size,
        )
        # A page beyond the last one falls back to page 1 (the id query is
        # executed a second time).
        if total_count > 0 and offset >= total_count:
            current_page = 1
            offset = 0
            total_count, rows = await _fetch_bill_rows_page(
                sor, where_sql, params, offset, page_size,
            )
        debug_info = None
        if _parse_bool(ns.get('debug'), False):
            debug_info = await _finance_report_debug(
                sor, where_sql, params, page_size, offset,
            )
            debug_info['page_ids_fetched'] = len(rows)
        items = []
        sum_profit = 0.0
        sum_pay = 0.0
        sum_upstream = 0.0
        row_errors = []
        for row in rows:
            # A failing row is reported in row_errors and skipped so that one
            # bad bill does not fail the whole page.
            try:
                item = await _build_report_row(sor, row, accounting_orgid, is_owner)
            except Exception as exc:
                row_errors.append({
                    'bill_id': _row_get(row, 'bill_id'),
                    'error': str(exc),
                })
                continue
            items.append(item)
            fin = item['finance']
            if fin.get('profit_amount') is not None:
                sum_profit += fin['profit_amount']
            pay = item['pricing'].get('customer_pay_amount')
            if pay is not None:
                sum_pay += pay
            up = fin.get('settle_upstream_amount')
            if up is not None:
                sum_upstream += up
        return {
            'status': True,
            'msg': 'ok',
            'data': {
                'accounting_orgid': accounting_orgid,
                'accounting_orgname': await _org_name(sor, accounting_orgid),
                'is_business_owner': is_owner,
                'customer_scope': {
                    'include_sub_reseller_customers': include_sub,
                    'descendant_reseller_ids': reseller_ids,
                    'descendant_reseller_count': len(reseller_ids),
                },
                'total_count': total_count,
                'current_page': current_page,
                'page_size': page_size,
                'summary': {
                    'customer_pay_total': _round_money(sum_pay),
                    'profit_total': _round_money(sum_profit),
                    'settle_upstream_total': _round_money(sum_upstream),
                },
                'items': items,
                'row_errors': row_errors,
                'list_row_count': len(rows),
                'debug': debug_info,
            },
        }
async def finance_order_report_detail(ns=None):
    """
    Management view: financial detail of a single bill.

    Parameters read from ``ns``: ``accounting_orgid`` and ``bill_id``
    (both required), ``include_sub_reseller_customers`` (same meaning as in
    the list endpoint; used to validate the customer scope) and the optional
    ``userid`` for the viewer check.

    Returns:
        ``{'status': False, 'msg': ...}`` when a parameter is missing, the
        viewer is rejected, the bill does not exist or its customer is out of
        scope; otherwise ``{'status': True, 'msg': 'ok', 'data': detail}``.
    """
    ns = ns or {}
    accounting_orgid = ns.get('accounting_orgid')
    bill_id = ns.get('bill_id')
    if not (accounting_orgid and bill_id):
        return {'status': False, 'msg': '缺少 accounting_orgid 或 bill_id'}
    include_sub = _parse_bool(ns.get('include_sub_reseller_customers'), False)
    pools = DBPools()
    async with pools.sqlorContext(DBNAME) as sor:
        allowed, reason = await _check_viewer(sor, accounting_orgid, ns.get('userid'))
        if not allowed:
            return {'status': False, 'msg': reason}
        scope = await _build_customer_scope_sql(sor, accounting_orgid, include_sub)
        reseller_ids = scope[2]
        is_owner = await _is_business_owner(sor, accounting_orgid)
        matched = await _fetch_bill_rows_by_ids(sor, [bill_id])
        if not matched:
            return {'status': False, 'msg': '账单不存在'}
        bill_row = matched[0]
        in_scope = await _customer_in_scope(
            _row_get(bill_row, 'customer_parentid'),
            accounting_orgid,
            reseller_ids,
            include_sub,
        )
        if not in_scope:
            return {'status': False, 'msg': '该账单不在当前查询客户范围内'}
        detail = await _build_report_row(sor, bill_row, accounting_orgid, is_owner)
        detail['customer_scope'] = {'include_sub_reseller_customers': include_sub}
        return {'status': True, 'msg': 'ok', 'data': detail}
async def finance_billing_overview(ns=None):
    """
    Billing overview page: totals of sales / profit / payable to upstream
    (single endpoint; the overview page is advised to call only this one).

    Parameters read from ``ns``:
        accounting_orgid (required): current ledger organization.
        include_sub_reseller_customers (optional): default true; whether
            amounts from customers of descendant resellers are included.
        start_date / end_date (optional): filter on bill.bill_date.
        bill_state (optional): only count bills in this state; not filtered by
            default (unbooked bills contribute estimated profit / payable).
        only_accounted (optional): when true only bill_state=1 is counted
            (recommended for the overview page); takes precedence over
            ``bill_state``.
        userid (optional): viewer permission check.
        max_bills (optional): default 50000, clamped to 100..200000; when more
            bills match, the list is cut and ``truncated=true`` is returned.
        debug (optional): when true, diagnostics are added under ``debug``.

    Definitions (same as the order detail endpoint):
        sales_total           = sum of customer payments, bill.amount
        profit_total          = discount income + floor-price income on this
                                ledger (bill_detail), or the protocol estimate
        settle_upstream_total = business owner -> supplier pending transfer;
                                reseller -> debit of "分销商存放资金" on the
                                parent's ledger

    Segments:
        direct_customers    direct customers (cust.parentid = accounting_orgid)
        from_sub_resellers  customers of descendant resellers
                            (cust.parentid in descendant reseller ids)

    ``by_provider_product`` breaks the three amounts and their total down by
    provider orgid + product id.
    """
    ns = ns or {}
    accounting_orgid = ns.get('accounting_orgid')
    if not accounting_orgid:
        return {'status': False, 'msg': '缺少 accounting_orgid当前账本机构'}
    include_sub = _parse_bool(ns.get('include_sub_reseller_customers'), True)
    only_accounted = _parse_bool(ns.get('only_accounted'), False)
    try:
        max_bills = int(ns.get('max_bills', 50000) or 50000)
    except (TypeError, ValueError):
        max_bills = 50000
    max_bills = max(100, min(max_bills, 200000))
    db = DBPools()
    async with db.sqlorContext(DBNAME) as sor:
        ok, err = await _check_viewer(sor, accounting_orgid, ns.get('userid'))
        if not ok:
            return {'status': False, 'msg': err}
        scope_sql, scope_params, reseller_ids = await _build_customer_scope_sql(
            sor, accounting_orgid, include_sub,
        )
        is_owner = await _is_business_owner(sor, accounting_orgid)
        parent_orgid = await get_parent_orgid(sor, accounting_orgid)
        conditions = ["b.del_flg = '0'", "o.del_flg = '0'", scope_sql]
        params = dict(scope_params)
        if ns.get('start_date'):
            conditions.append('b.bill_date >= ${start_date}$')
            params['start_date'] = ns['start_date']
        if ns.get('end_date'):
            conditions.append('b.bill_date <= ${end_date}$')
            params['end_date'] = ns['end_date']
        # only_accounted wins over an explicit bill_state filter.
        if only_accounted:
            conditions.append("b.bill_state = '1'")
        elif ns.get('bill_state') is not None:
            conditions.append('b.bill_state = ${bill_state}$')
            params['bill_state'] = str(ns['bill_state'])
        where_sql = ' AND '.join(conditions)
        fetch = await _fetch_overview_bill_rows(sor, where_sql, params, max_bills)
        bill_rows = fetch['rows']
        truncated = fetch['truncated']
        reseller_id_set = {_norm_org_id(r) for r in reseller_ids if _norm_org_id(r)}
        totals = {
            'direct_customers': _new_amount_bucket(),
            'from_sub_resellers': _new_amount_bucket(),
            'grand_total': _new_amount_bucket(),
        }
        # Breakdown keyed by (provider orgid, product id).
        by_pp = {}
        provider_names = {}
        product_names = {}
        skipped = 0
        errors = []
        for row in bill_rows:
            seg = _customer_segment(
                _row_get(row, 'customer_parentid'),
                accounting_orgid,
                reseller_id_set,
            )
            # Rows whose customer is neither direct nor under a descendant
            # reseller are counted as skipped and left out of every total.
            if seg is None:
                skipped += 1
                continue
            try:
                sales, profit, settle, _src = await _bill_finance_amounts(
                    sor, row, accounting_orgid, is_owner, parent_orgid,
                )
            except Exception as exc:
                # A failing bill is reported and excluded from the totals.
                errors.append({
                    'bill_id': _row_get(row, 'bill_id'),
                    'error': str(exc),
                })
                continue
            _bucket_add(totals[seg], sales, profit, settle)
            _bucket_add(totals['grand_total'], sales, profit, settle)
            pid = _row_get(row, 'providerid') or ''
            prid = _row_get(row, 'productid') or ''
            key = (pid, prid)
            if key not in by_pp:
                by_pp[key] = {
                    'provider_orgid': pid,
                    'product_id': prid,
                    'direct_customers': _new_amount_bucket(),
                    'from_sub_resellers': _new_amount_bucket(),
                    'total': _new_amount_bucket(),
                }
            _bucket_add(by_pp[key][seg], sales, profit, settle)
            _bucket_add(by_pp[key]['total'], sales, profit, settle)
        breakdown = []
        for (pid, prid), node in by_pp.items():
            breakdown.append({
                'provider': {
                    'orgid': pid,
                    'name': await _provider_name_cache(sor, provider_names, pid),
                },
                'product': {
                    'id': prid,
                    'name': await _product_name_cache(sor, product_names, prid),
                },
                'direct_customers': _bucket_round(node['direct_customers']),
                'from_sub_resellers': _bucket_round(node['from_sub_resellers']),
                'total': _bucket_round(node['total']),
            })
        # Largest sales first.
        breakdown.sort(key=lambda x: float(x['total']['sales_total'] or 0), reverse=True)
        # No single provider applies to the overview, so for a business owner
        # the settle target has neither an orgid nor a name.
        settle_target = await _settle_upstream_meta(sor, accounting_orgid, None)
        if is_owner:
            settle_label = '应付供应商'
        else:
            settle_label = '应付上级机构'
        data = {
            'accounting_orgid': accounting_orgid,
            'accounting_orgname': await _org_name(sor, accounting_orgid),
            'is_business_owner': is_owner,
            'settle_upstream_label': settle_label,
            'settle_upstream_target': {
                'type': settle_target['settle_upstream_type'],
                'orgid': settle_target['settle_upstream_orgid'],
                'name': settle_target['settle_upstream_orgname'],
            },
            'period': {
                'start_date': ns.get('start_date'),
                'end_date': ns.get('end_date'),
            },
            'customer_scope': {
                'include_sub_reseller_customers': include_sub,
                'descendant_reseller_ids': reseller_ids,
                'descendant_reseller_count': len(reseller_ids),
            },
            'filters': {
                'only_accounted': only_accounted,
                'bill_state': ns.get('bill_state'),
            },
            'bill_count': len(bill_rows),
            'bill_id_count': fetch['total_ids'],
            'id_query_len': fetch['id_query_len'],
            'truncated': truncated,
            'max_bills': max_bills,
            'skipped_out_of_scope': skipped,
            'totals': {
                'direct_customers': _bucket_round(totals['direct_customers']),
                'from_sub_resellers': _bucket_round(totals['from_sub_resellers']),
                'grand_total': _bucket_round(totals['grand_total']),
            },
            'by_provider_product': breakdown,
            'errors': errors,
        }
        # Diagnostics are attached on request, and automatically when the id
        # query found bills but none of them reached the totals.
        if _parse_bool(ns.get('debug'), False) or (
            data['totals']['grand_total']['bill_count'] == 0
            and fetch['id_query_len'] > 0
        ):
            data['debug'] = {
                'id_query_len': fetch['id_query_len'],
                'bill_id_count': fetch['total_ids'],
                'overview_rows_len': len(bill_rows),
                'skipped_out_of_scope': skipped,
                'sample_parentid': _row_get(bill_rows[0], 'customer_parentid') if bill_rows else None,
            }
        return {'status': True, 'msg': 'ok', 'data': data}
# _report = params_kw.get('report') or params_kw.get('api') or 'order_list'
_report = None
if _report in ('overview', 'billing_overview', 'finance_billing_overview'):
ret = await finance_billing_overview(params_kw)
elif _report in ('detail', 'order_detail'):
ret = await finance_order_report_detail(params_kw)
else:
ret = await finance_order_report(params_kw)
return ret
return ret