kboss/b/bill/finance_order_report1.dspy
2026-05-25 01:04:16 +08:00

887 lines
31 KiB
Plaintext
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
管理人员财务订单报表接口。
视角:以 accounting_orgid账本机构为查看主体展示客户订单的定价、利润、本级应付结算。
依赖运行环境DBPools, sor.sqlExe, sor.R
业务口径(已确认):
- product_salemode.providerid = provider.orgid
- 分销商不直接对供应商结算仅本机构parentid 为空)对供应商结算
- 一级分销对本机构结算,二级分销对一级分销结算(即对直接上级结算)
- settle_upstream_amount本级应付本机构→供应商分销→直接上级机构
- profit_amount本级账本「折扣收入」「底价收入」贷方合计
- include_sub_reseller_customers是否包含下级分销商发展的客户
"""
async def get_parent_orgid(sor, orgid):
    """Return the id of the direct parent organization of ``orgid``.

    The query joins ``organization`` to itself and only matches when both the
    child row and the parent row have ``del_flg = '0'``.

    Args:
        sor: Database session exposing an awaitable ``sqlExe(sql, params)``.
        orgid: Id of the child organization.

    Returns:
        The ``id`` of the first matching parent row, or ``None`` when the
        query returns no rows.
    """
    sql = """select a.id from organization a, organization b
where b.parentid = a.id
and a.del_flg = '0'
and b.del_flg = '0'
and b.id = ${orgid}$"""
    parents = await sor.sqlExe(sql, {'orgid': orgid})
    if len(parents) > 0:
        return parents[0]['id']
    return None
# Name of the database pool used by every query in this file.
DBNAME = 'kboss'

# organization.org_type codes.
RESELLER_ORG = '1'
OWNER_OGR = '0'  # NOTE(review): name looks like a typo of OWNER_ORG; kept for callers.
CORP_CUSTOMER = '2'
PERSONAL = '3'
PROVIDER = '4'
PUBLISHER = '5'
UNDERWRITER = '6'

# Display names of the parties taking part in an accounting entry.
PARTY_OWNER = '本机构'
PARTY_CUSTOMER = '客户'
PARTY_RESELLER = '分销商'
PARTY_PROVIDER = '供应商'
PARTY_PUBLISHER = '算力券发行方'
PARTY_UNDERWRITER = '算力券承销方'

# Accounting directions as stored in bill_detail.accounting_dir.
DEBT = '借'  # NOTE(review): presumably "debit"; name kept for callers.
CREDIT = '贷'

# Display names of business actions.
ACTNAME_BUY = '付费'
ACTNAME_RECHARGE = '充值'
ACTNAME_RECHARGE_ALIPAY = '支付宝充值'
ACTNAME_SETTLE = '结算'

# Sale-mode display names (see SALEMODE_LABEL for the code mapping).
SALEMODE_DISCOUNT = '折扣'
SALEMODE_REBATE = '代付费'
SALEMODE_FLOORPRICE = '底价'

# Action codes and their reversals.
ACTION_RECHARGE_ALIPAY = 'RECHARGE_ALIPAY'
ACTION_RECHARGE_ALIPAY_REVERSE = 'RECHARGE_ALIPAY_REVERSE'
ACTION_RECHARGE = 'RECHARGE'
ACTION_RECHARGE_REVERSE = 'RECHARGE_REVERSE'
ACTION_BUY = 'BUY'
ACTION_REVERSE_BUY = 'BUY_REVERSE'
ACTION_RENEW = 'RENEW'
ACTION_RENEW_REVERSE = 'RENEW_REVERSE'
ACTION_SETTLE = 'SETTLE'
ACTION_SETTLE_REVERSE = 'SETTLE_REVERSE'

# org_type codes that count as customers (corporate and personal).
CUSTOMER_ORG_TYPES = (CORP_CUSTOMER, PERSONAL)

# saleprotocol.salemode code -> display label.
SALEMODE_LABEL = {
    '0': SALEMODE_DISCOUNT,
    '1': SALEMODE_REBATE,
    '2': SALEMODE_FLOORPRICE,
}

# bill_detail subject names used when reading booked amounts.
INCOME_SUBJECTS = ('折扣收入', '底价收入')
PARENT_SETTLE_SUBJECT = '分销商存放资金'
SUPPLIER_SETTLE_PREFIX = '待结转'
# Sale-mode lookup keyed only by the buyer: matches non-deleted protocol rows
# whose buyer (a.bid_orgid) is ${bid_orgid}$, for provider ${providerid}$,
# valid on ${curdate}$ (start_date <= curdate < end_date). Both the exact
# product row and the '*' wildcard row qualify; ORDER BY b.productid DESC with
# LIMIT 1 keeps a single row -- presumably so the exact product wins over '*'
# (depends on the database collation; TODO confirm).
_SALEMODE_SQL_OWN = """
SELECT a.salemode, a.settle_mode, b.discount, b.price
FROM saleprotocol a, product_salemode b
WHERE a.id = b.protocolid
AND a.bid_orgid = ${bid_orgid}$
AND (b.productid = ${productid}$ OR b.productid = '*')
AND b.providerid = ${providerid}$
AND a.start_date <= ${curdate}$
AND a.end_date > ${curdate}$
AND a.del_flg = '0'
AND b.del_flg = '0'
ORDER BY b.productid DESC
LIMIT 1
"""
# Same lookup as _SALEMODE_SQL_OWN, additionally restricted to protocols
# offered by ${offer_orgid}$ (a.offer_orgid).
_SALEMODE_SQL_OFFER = """
SELECT a.salemode, a.settle_mode, b.discount, b.price
FROM saleprotocol a, product_salemode b
WHERE a.id = b.protocolid
AND a.offer_orgid = ${offer_orgid}$
AND a.bid_orgid = ${bid_orgid}$
AND (b.productid = ${productid}$ OR b.productid = '*')
AND b.providerid = ${providerid}$
AND a.start_date <= ${curdate}$
AND a.end_date > ${curdate}$
AND a.del_flg = '0'
AND b.del_flg = '0'
ORDER BY b.productid DESC
LIMIT 1
"""
def _round_money(v):
    """Convert ``v`` to ``float`` rounded to two decimals; ``None`` passes through."""
    return None if v is None else round(float(v), 2)
def _salemode_label(code):
    """Map a sale-mode code to its display label.

    The code is stringified before the lookup in ``SALEMODE_LABEL``; an
    unknown code is returned as its string form and ``None`` stays ``None``.
    """
    if code is not None:
        key = str(code)
        return SALEMODE_LABEL.get(key, key)
    return None
def _parse_bool(v, default=False):
    """Interpret a loosely typed request value as a boolean.

    Args:
        v: Value to interpret. ``bool`` is returned unchanged, ``None`` yields
            ``default`` and anything else is stringified, stripped and
            lower-cased before being compared with '1', 'true', 'yes', 'on'.
        default: Result used when ``v`` is ``None``.
    """
    if isinstance(v, bool):
        return v
    if v is None:
        return default
    text = str(v).strip().lower()
    return text in ('1', 'true', 'yes', 'on')
def _parse_page(ns, default_page=1, default_size=20, max_size=100):
    """Parse paging parameters from the request dict.

    A missing, falsy (e.g. ``0`` or ``''``) or unparseable value falls back to
    its default, so ``page_size=0`` never turns into a ``LIMIT 0`` style empty
    page. Non-finite floats are treated as unparseable as well.

    Args:
        ns: Mapping that may contain ``page_size`` and ``current_page``.
        default_page: Page used when ``current_page`` is absent or invalid.
        default_size: Size used when ``page_size`` is absent or invalid.
        max_size: Upper bound applied to the page size.

    Returns:
        ``(current_page, page_size, offset)`` with ``current_page >= 1``,
        ``1 <= page_size <= max_size`` and
        ``offset = (current_page - 1) * page_size``.
    """
    def _as_int(key, fallback):
        try:
            return int(ns.get(key, fallback) or fallback)
        except (TypeError, ValueError, OverflowError):
            # OverflowError: int(float('inf')) cannot be represented.
            return fallback

    page_size = max(1, min(_as_int('page_size', default_size), max_size))
    current_page = max(1, _as_int('current_page', default_page))
    offset = (current_page - 1) * page_size
    return current_page, page_size, offset
def _sql_rows(result):
    """Normalize a ``sqlExe`` result into a list of rows.

    A list is returned as-is (same object), a single dict is wrapped in a
    list, ``None`` becomes ``[]`` and any other iterable is materialized with
    ``list()``.
    """
    if isinstance(result, list):
        return result
    if isinstance(result, dict):
        return [result]
    return [] if result is None else list(result)
def _row_get(row, *keys, default=None):
    """Read a field from a result row, tolerating differences in key case.

    Keys are tried in the given order; for each one an exact match is
    attempted first and then a case-insensitive match against the row's keys.

    Args:
        row: Mapping returned by the database driver; a falsy row yields
            ``default``.
        *keys: Candidate field names, in priority order.
        default: Keyword-only fallback. A value passed positionally after the
            field names is treated as another candidate key, not as the
            default.

    Returns:
        The first matching value, or ``default`` when nothing matches.
    """
    if not row:
        return default
    for key in keys:
        if key in row:
            return row[key]
        if not isinstance(key, str):
            # Case folding only makes sense for string keys; calling .lower()
            # on e.g. an int used to raise AttributeError.
            continue
        wanted = key.lower()
        for name, value in row.items():
            if isinstance(name, str) and name.lower() == wanted:
                return value
    return default
async def _check_viewer(sor, accounting_orgid, userid=None):
    """Check whether ``userid`` may view the finance data of ``accounting_orgid``.

    Access is granted when the user's ``orgid`` equals the ledger
    organization or its direct parent organization.

    Args:
        sor: Database session exposing awaitable ``R`` and ``sqlExe``.
        accounting_orgid: Ledger organization being viewed.
        userid: Id of the viewing user.

    Returns:
        ``(True, None)`` when access is granted, otherwise ``(False, msg)``.
    """
    # NOTE(review): an empty/missing userid skips the permission check
    # entirely. Confirm that every caller supplies a trusted userid, otherwise
    # any organization's finance data is readable.
    if not userid:
        return True, None
    matches = await sor.R('users', {'id': userid, 'del_flg': '0'})
    if not matches:
        return False, '用户不存在'
    viewer_orgid = matches[0].get('orgid')
    if viewer_orgid != accounting_orgid:
        parent_orgid = await get_parent_orgid(sor, accounting_orgid)
        if not parent_orgid or viewer_orgid != parent_orgid:
            return False, '无权查看该机构财务数据'
    return True, None
async def _org_name(sor, orgid):
    """Return ``orgname`` of the non-deleted organization ``orgid``.

    Returns ``None`` without querying when ``orgid`` is falsy, and ``None``
    when no row is found.
    """
    if orgid:
        found = await sor.R('organization', {'id': orgid, 'del_flg': '0'})
        if found:
            return found[0]['orgname']
    return None
async def _is_business_owner(sor, orgid):
    """Tell whether ``orgid`` is a non-deleted organization without a parent.

    Returns ``True`` when the lookup (``parentid IS NULL`` and
    ``del_flg='0'``) yields at least one row.
    """
    query = "SELECT id FROM organization WHERE id=${id}$ AND parentid IS NULL AND del_flg='0'"
    matches = await sor.sqlExe(query, {'id': orgid})
    return len(matches) > 0
async def _collect_descendant_reseller_ids(sor, root_orgid):
    """Collect the ids of every reseller organization below ``root_orgid``.

    Walks the organization tree breadth-first, issuing one query per visited
    organization for its non-deleted children with ``org_type = RESELLER_ORG``.

    Returns:
        Reseller ids in discovery order, without duplicates and without
        ``root_orgid`` itself.
    """
    # `visited` doubles as the BFS queue: index 0 is the root and everything
    # after it is a discovered reseller, in discovery order.
    visited = [root_orgid]
    seen = {root_orgid}
    cursor = 0
    while cursor < len(visited):
        pid = visited[cursor]
        cursor += 1
        children = await sor.sqlExe(
            """SELECT id FROM organization
WHERE parentid=${pid}$ AND org_type=${org_type}$ AND del_flg='0'""",
            {'pid': pid, 'org_type': RESELLER_ORG},
        )
        for child in children:
            cid = child['id']
            if cid in seen:
                continue
            seen.add(cid)
            visited.append(cid)
    return visited[1:]
async def _build_customer_scope_sql(sor, accounting_orgid, include_sub_reseller_customers):
    """Build the WHERE fragment that limits bills to the customers in scope.

    Direct customers satisfy ``cust.parentid = accounting_orgid``. When
    ``include_sub_reseller_customers`` is truthy and descendant resellers
    exist, customers whose parent is one of those resellers are included too.

    Returns:
        ``(condition_sql, params, reseller_ids)``; ``reseller_ids`` is empty
        whenever only direct customers are in scope.
    """
    params = {'accounting_orgid': accounting_orgid}
    reseller_ids = []
    if include_sub_reseller_customers:
        reseller_ids = await _collect_descendant_reseller_ids(sor, accounting_orgid)
    if not reseller_ids:
        return "cust.parentid = ${accounting_orgid}$", params, []

    # One named placeholder per reseller id: reseller_0, reseller_1, ...
    placeholders = []
    for idx, rid in enumerate(reseller_ids):
        name = 'reseller_%d' % idx
        params[name] = rid
        placeholders.append('${%s}$' % name)
    cond = "(cust.parentid = ${accounting_orgid}$ OR cust.parentid IN (%s))" % ', '.join(placeholders)
    return cond, params, reseller_ids
async def _customer_in_scope(customer_parentid, accounting_orgid, reseller_ids, include_sub):
    """Tell whether a customer belongs to the queried customer scope.

    A customer is in scope when its parent is the ledger organization, or,
    if ``include_sub`` is truthy, when its parent is in ``reseller_ids``.
    """
    if customer_parentid != accounting_orgid:
        return bool(include_sub and customer_parentid in reseller_ids)
    return True
async def _fetch_salemode_row(sor, sql, params):
    """Run ``sql`` with ``params`` and return its first row, or ``None`` if empty."""
    matches = await sor.sqlExe(sql, params)
    if matches:
        return matches[0]
    return None
async def _settle_upstream_meta(sor, accounting_orgid, provider_orgid):
    """Describe whom this level settles with.

    The business owner settles with the supplier (``provider_orgid``); any
    other organization settles with its direct parent.

    Returns:
        Dict with ``settle_upstream_type`` ('supplier' or 'parent_org'),
        ``settle_upstream_orgid``, ``settle_upstream_orgname`` and
        ``immediate_parent_orgid`` (``None`` for the business owner).
    """
    if await _is_business_owner(sor, accounting_orgid):
        kind, target, parent = 'supplier', provider_orgid, None
    else:
        parent = await get_parent_orgid(sor, accounting_orgid)
        kind, target = 'parent_org', parent
    return {
        'settle_upstream_type': kind,
        'settle_upstream_orgid': target,
        'settle_upstream_orgname': await _org_name(sor, target),
        'immediate_parent_orgid': parent,
    }
async def _protocol_snapshot(sor, accounting_orgid, customerid, providerid, productid, bill_date, quantity):
    """Look up the sale-protocol terms that applied to one bill.

    Three protocol rows are looked up for the bill's provider/product on the
    bill date:

    * ``own``: any protocol in which the ledger organization is the buyer;
    * ``customer``: the protocol offered by the ledger organization to the
      customer, falling back to its ``'*'`` (any buyer) protocol;
    * ``reseller``: the protocol offered by the direct parent to the ledger
      organization (only when a parent exists).

    Returns:
        Dict with the parent organization id/name, the salemode/discount/price
        of each protocol (``None`` where no row matched) and ``quantity``
        coerced to ``int`` (1 when falsy).
    """
    if hasattr(bill_date, 'strftime'):
        curdate = bill_date.strftime('%Y-%m-%d')
    elif bill_date is not None:
        # Keep only the leading YYYY-MM-DD part of a string-like date.
        curdate = str(bill_date)[:10]
    else:
        curdate = None
    qty = int(quantity or 1)
    base = {'providerid': providerid, 'productid': productid, 'curdate': curdate}

    def _pick(record, field):
        return record.get(field) if record else None

    own = await _fetch_salemode_row(
        sor, _SALEMODE_SQL_OWN, dict(base, bid_orgid=accounting_orgid),
    )

    # Customer-specific protocol first, then the wildcard buyer '*'.
    cust = None
    for bidder in (customerid, '*'):
        cust = await _fetch_salemode_row(
            sor, _SALEMODE_SQL_OFFER,
            dict(base, offer_orgid=accounting_orgid, bid_orgid=bidder),
        )
        if cust:
            break

    parent_orgid = await get_parent_orgid(sor, accounting_orgid)
    reseller = None
    if parent_orgid:
        reseller = await _fetch_salemode_row(
            sor, _SALEMODE_SQL_OFFER,
            dict(base, offer_orgid=parent_orgid, bid_orgid=accounting_orgid),
        )

    return {
        'parent_orgid': parent_orgid,
        'parent_orgname': await _org_name(sor, parent_orgid),
        'own_salemode': _pick(own, 'salemode'),
        'own_discount': _pick(own, 'discount'),
        'own_floor_unit_price': _pick(own, 'price'),
        'customer_salemode': _pick(cust, 'salemode'),
        'customer_discount': _pick(cust, 'discount'),
        'customer_floor_unit_price': _pick(cust, 'price'),
        'reseller_discount': _pick(reseller, 'discount'),
        'reseller_salemode': _pick(reseller, 'salemode'),
        'reseller_floor_unit_price': _pick(reseller, 'price'),
        'quantity': qty,
    }
def _estimate_finance(catalog_amount, protocol, is_owner):
    """Estimate profit and upstream payable for a bill that is not booked yet.

    Discount mode (own or customer salemode is '0'):
        profit = catalog_amount * (customer_discount - own_discount);
        upstream = catalog_amount * own_discount for the business owner,
        otherwise catalog_amount * reseller_discount.
    Floor-price mode (own or customer salemode is '2'):
        profit = (customer_price - own_price) * quantity;
        upstream = own_price * quantity for the business owner, otherwise
        reseller_price * quantity, falling back to customer_price * quantity.
    Any figure whose inputs are missing stays ``None``; other sale modes
    produce no estimate.

    Args:
        catalog_amount: List amount of the bill; falsy values count as 0.
        protocol: Snapshot dict as produced by ``_protocol_snapshot``.
        is_owner: Whether the ledger organization is the business owner.

    Returns:
        Dict with ``profit_amount``, ``settle_upstream_amount`` (both rounded
        to two decimals or ``None``) and ``amount_source='estimated'``.
    """
    catalog_amount = float(catalog_amount or 0)
    qty = protocol.get('quantity') or 1
    modes = (protocol.get('own_salemode'), protocol.get('customer_salemode'))
    profit = None
    settle_upstream = None

    if '0' in modes:
        own_disc = protocol.get('own_discount')
        cust_disc = protocol.get('customer_discount')
        if own_disc is not None and cust_disc is not None:
            profit = catalog_amount * (float(cust_disc) - float(own_disc))
        upstream_disc = own_disc if is_owner else protocol.get('reseller_discount')
        if upstream_disc is not None:
            settle_upstream = catalog_amount * float(upstream_disc)
    elif '2' in modes:
        own_price = protocol.get('own_floor_unit_price')
        cust_price = protocol.get('customer_floor_unit_price')
        reseller_price = protocol.get('reseller_floor_unit_price')
        if own_price is not None and cust_price is not None:
            profit = (float(cust_price) - float(own_price)) * qty
        if is_owner:
            upstream_price = own_price
        elif reseller_price is not None:
            upstream_price = reseller_price
        else:
            upstream_price = cust_price
        if upstream_price is not None:
            settle_upstream = float(upstream_price) * qty

    return {
        'profit_amount': _round_money(profit),
        'settle_upstream_amount': _round_money(settle_upstream),
        'amount_source': 'estimated',
    }
async def _finance_from_bill_detail(sor, billid, accounting_orgid, is_owner, parent_orgid):
    """Read booked profit and upstream payable of a bill from ``bill_detail``.

    Profit is the sum of credit ('贷') entries on the ledger organization's
    own book whose subject is one of ``INCOME_SUBJECTS``.
    Upstream payable is:

    * business owner: credit entries on its own book whose subject starts
      with ``SUPPLIER_SETTLE_PREFIX``;
    * otherwise: debit ('借') entries on the parent's book with subject
      ``PARENT_SETTLE_SUBJECT`` and ``participantid`` equal to the ledger
      organization.

    Returns:
        Dict with ``profit_amount``, ``settle_upstream_amount``,
        ``amount_source='bill_detail'`` and ``bill_detail_legs`` (every
        non-deleted entry of the bill, amounts rounded).
    """
    sql = """
SELECT accounting_orgid, subjectname, accounting_dir, participantid, participanttype, amount
FROM bill_detail WHERE billid = ${billid}$ AND del_flg = '0'
"""
    entries = await sor.sqlExe(sql, {'billid': billid})
    profit = 0.0
    settle_upstream = 0.0
    legs = []
    for entry in entries:
        book = entry['accounting_orgid']
        raw_subject = entry['subjectname']
        direction = entry['accounting_dir']
        participant = entry.get('participantid')
        raw_amount = entry['amount']
        legs.append({
            'accounting_orgid': book,
            'subjectname': raw_subject,
            'accounting_dir': direction,
            'participanttype': entry.get('participanttype'),
            'participantid': participant,
            'amount': _round_money(raw_amount),
        })

        amt = float(raw_amount or 0)
        subj = raw_subject or ''
        on_own_book = book == accounting_orgid

        if on_own_book and subj in INCOME_SUBJECTS and direction == '贷':
            profit += amt

        if is_owner:
            owed = (
                on_own_book
                and subj.startswith(SUPPLIER_SETTLE_PREFIX)
                and direction == '贷'
            )
        else:
            owed = (
                parent_orgid
                and book == parent_orgid
                and subj == PARENT_SETTLE_SUBJECT
                and direction == '借'
                and participant == accounting_orgid
            )
        if owed:
            settle_upstream += amt

    return {
        'profit_amount': _round_money(profit),
        'settle_upstream_amount': _round_money(settle_upstream),
        'amount_source': 'bill_detail',
        'bill_detail_legs': legs,
    }
async def _build_report_row(sor, row, accounting_orgid, is_owner):
    """Assemble the nested report item for one bill row.

    Args:
        sor: Database session exposing awaitable ``R`` and ``sqlExe``.
        row: Flat bill row, e.g. as built by ``_fetch_bill_row_via_R``.
        accounting_orgid: Ledger organization the report is viewed from.
        is_owner: Whether the ledger organization is the business owner.

    Returns:
        Dict with bill identity fields plus the ``customer``, ``product``,
        ``provider``, ``pricing``, ``viewer_org``, ``protocol`` and ``finance``
        sections and the raw ``bill_detail_legs``.
    """
    bill_id = _row_get(row, 'bill_id')
    customerid = _row_get(row, 'customerid')
    providerid = _row_get(row, 'providerid')
    productid = _row_get(row, 'productid')
    bill_date = _row_get(row, 'bill_date')
    quantity = _row_get(row, 'quantity') or 1
    catalog_amount = _round_money(_row_get(row, 'catalog_amount'))
    customer_pay = _round_money(_row_get(row, 'customer_pay_amount'))
    customer_parentid = _row_get(row, 'customer_parentid')
    protocol = await _protocol_snapshot(
        sor, accounting_orgid, customerid, providerid, productid, bill_date, quantity,
    )
    parent_orgid = protocol['parent_orgid']
    # NOTE(review): _settle_upstream_meta queries the owner flag again even
    # though is_owner is already known here.
    settle_meta = await _settle_upstream_meta(sor, accounting_orgid, providerid)
    bill_state = _row_get(row, 'bill_state')
    # bill_state '1' is treated as "booked": amounts come from bill_detail.
    # Any other state is estimated from the protocol terms.
    if str(bill_state) == '1':
        amounts = await _finance_from_bill_detail(
            sor, bill_id, accounting_orgid, is_owner, parent_orgid,
        )
    else:
        amounts = _estimate_finance(catalog_amount, protocol, is_owner)
        amounts['bill_detail_legs'] = []
    # Product/provider names are looked up only when the row lacks them.
    product_name = _row_get(row, 'product_name')
    if not product_name and productid:
        prows = await sor.R('product', {'id': productid, 'del_flg': '0'})
        if prows:
            product_name = prows[0].get('name')
    provider_name = _row_get(row, 'provider_name')
    if not provider_name and providerid:
        prov_rows = await sor.sqlExe(
            "SELECT name FROM provider WHERE orgid=${orgid}$ AND del_flg='0' LIMIT 1",
            {'orgid': providerid},
        )
        if prov_rows:
            provider_name = prov_rows[0].get('name')
    # A customer whose parent is not the ledger organization is reported
    # with that parent as its serving reseller.
    serving_reseller_id = None
    serving_reseller_name = None
    is_direct = customer_parentid == accounting_orgid
    if not is_direct and customer_parentid:
        serving_reseller_id = customer_parentid
        serving_reseller_name = await _org_name(sor, customer_parentid)
    order_date = _row_get(row, 'order_date')
    servicename = _row_get(row, 'servicename')
    return {
        'bill_id': bill_id,
        'order_id': _row_get(row, 'order_id'),
        # Dates are stringified and truncated to YYYY-MM-DD / 19 characters.
        'bill_date': str(bill_date)[:10] if bill_date else None,
        'order_date': str(order_date)[:19] if order_date else None,
        'business_op': _row_get(row, 'business_op'),
        'bill_state': bill_state,
        'accounted': str(bill_state) == '1',
        'customer': {
            'id': customerid,
            'name': _row_get(row, 'customer_name'),
            'parent_orgid': customer_parentid,
            'is_direct_customer': is_direct,
            'serving_reseller': {
                'id': serving_reseller_id,
                'name': serving_reseller_name,
            } if serving_reseller_id else None,
        },
        'product': {
            'id': productid,
            # Falls back to the order's service name when no product name is known.
            'name': product_name or servicename,
            'servicename': servicename,
        },
        'provider': {
            'orgid': providerid,
            'name': provider_name,
        },
        'quantity': quantity,
        'pricing': {
            'catalog_amount': catalog_amount,
            'list_price_unit': _round_money(_row_get(row, 'list_price')),
            'order_discount': _round_money(_row_get(row, 'order_discount'))
            if _row_get(row, 'order_discount') is not None
            else None,
            'order_unit_price': _round_money(_row_get(row, 'order_unit_price')),
            'customer_pay_amount': customer_pay,
        },
        'viewer_org': {
            'id': accounting_orgid,
            'is_business_owner': is_owner,
            'immediate_parent': {
                'id': settle_meta.get('immediate_parent_orgid'),
                'name': await _org_name(sor, settle_meta.get('immediate_parent_orgid')),
            } if settle_meta.get('immediate_parent_orgid') else None,
        },
        # NOTE(review): the 'parent_*' keys are filled from the snapshot's
        # 'own_*' values (protocols in which this organization is the buyer).
        'protocol': {
            'parent_salemode': _salemode_label(protocol.get('own_salemode')),
            'parent_discount_to_us': _round_money(protocol.get('own_discount')),
            'parent_floor_unit_price': _round_money(protocol.get('own_floor_unit_price')),
            'our_salemode_to_customer': _salemode_label(protocol.get('customer_salemode')),
            'our_discount_to_customer': _round_money(protocol.get('customer_discount')),
            'our_floor_unit_price_to_customer': _round_money(protocol.get('customer_floor_unit_price')),
            'our_discount_as_reseller_to_parent': _round_money(protocol.get('reseller_discount')),
        },
        'finance': {
            'profit_amount': amounts['profit_amount'],
            'settle_upstream_amount': amounts['settle_upstream_amount'],
            'settle_upstream_type': settle_meta['settle_upstream_type'],
            'settle_upstream_target': {
                'id': settle_meta['settle_upstream_orgid'],
                'name': settle_meta['settle_upstream_orgname'],
            },
            'amount_source': amounts['amount_source'],
        },
        'bill_detail_legs': amounts.get('bill_detail_legs', []),
    }
def _bill_core_from_clause():
    """Return the minimal FROM/JOIN clause shared by the count and id queries.

    Only ``bill``, ``bz_order`` and the customer ``organization`` are joined,
    so the clause also works on databases without an ``ordergoodsid`` column.
    """
    from_clause = """
FROM bill b
INNER JOIN bz_order o ON b.orderid = o.id
INNER JOIN organization cust ON b.customerid = cust.id
"""
    return from_clause
def _bill_ids_sql(where_sql):
    """Build the query that lists matching bill ids, newest first.

    No LIMIT is put into the SQL on purpose: in some environments ``sqlExe``
    returns an empty list for queries containing LIMIT, so paging is done by
    the caller.
    """
    core = _bill_core_from_clause()
    return f"""
SELECT b.id AS bill_id
{core}
WHERE {where_sql}
ORDER BY b.bill_date DESC, b.create_at DESC
"""
def _count_sql(where_sql):
    """Build the ``COUNT(*) AS total_count`` query for the given WHERE clause."""
    core = _bill_core_from_clause()
    return f"""
SELECT COUNT(*) AS total_count
{core}
WHERE {where_sql}
"""
def _normalize_bill_id(bill_id):
    """Return ``bill_id`` as a stripped string, or ``None`` if missing or blank."""
    if bill_id is None:
        return None
    return str(bill_id).strip() or None
async def _fetch_bill_row_via_R(sor, bill_id):
    """Assemble one flat bill row from per-table ``sor.R`` lookups.

    Mirrors the approach of mu_ban / process_user_billing. Confirmed for this
    environment: a multi-table JOIN with ``WHERE b.id`` through ``sqlExe``
    returns nothing, whereas ``sor.R('bill')`` works.

    The bill, its order and its customer organization are looked up with
    ``del_flg='0'`` first and by id alone when that finds nothing. Order goods
    are looked up by ``ordergoodsid`` and, failing that, by order id.

    Returns:
        Flat dict of bill, order, customer and order-goods fields, or ``None``
        when the id is blank or the bill cannot be found.
    """
    async def _first_by_id(table, key):
        # Prefer the non-deleted record; fall back to an id-only lookup.
        found = await sor.R(table, {'id': key, 'del_flg': '0'})
        if not found:
            found = await sor.R(table, {'id': key})
        return found[0] if found else None

    bid = _normalize_bill_id(bill_id)
    if not bid:
        return None
    bill = await _first_by_id('bill', bid)
    if bill is None:
        return None

    oid = bill.get('orderid')
    order = (await _first_by_id('bz_order', oid)) if oid else None
    if order is None:
        order = {}

    cid = bill.get('customerid')
    customer = (await _first_by_id('organization', cid)) if cid else None
    if customer is None:
        customer = {}

    goods = {}
    ogid = bill.get('ordergoodsid')
    if ogid:
        by_id = await sor.R('order_goods', {'id': ogid, 'del_flg': '0'})
        if by_id:
            goods = by_id[0]
    if not goods and oid:
        by_order = await sor.sqlExe(
            """SELECT list_price, discount, price FROM order_goods
WHERE orderid=${oid}$ AND del_flg='0' LIMIT 1""",
            {'oid': oid},
        )
        if by_order:
            goods = by_order[0]

    return {
        'bill_id': bill.get('id'),
        'order_id': oid,
        'customerid': cid,
        'productid': bill.get('productid'),
        'providerid': bill.get('providerid'),
        'bill_date': bill.get('bill_date'),
        'bill_state': bill.get('bill_state'),
        'catalog_amount': bill.get('provider_amt'),
        'customer_pay_amount': bill.get('amount'),
        'quantity': bill.get('quantity'),
        'order_date': order.get('order_date'),
        'business_op': order.get('business_op'),
        'servicename': order.get('servicename'),
        'customer_name': customer.get('orgname'),
        'customer_parentid': customer.get('parentid'),
        'list_price': goods.get('list_price'),
        'order_discount': goods.get('discount'),
        'order_unit_price': goods.get('price'),
    }
async def _fetch_bill_rows_by_ids(sor, bill_ids):
    """Fetch the flat rows for ``bill_ids`` one by one, skipping ids not found.

    The input order is preserved.
    """
    fetched = [await _fetch_bill_row_via_R(sor, bid) for bid in bill_ids]
    return [row for row in fetched if row]
async def _fetch_bill_rows_page(sor, where_sql, base_params, offset, page_size):
    """Return ``(total, rows)`` for one page of matching bills.

    All matching bill ids are queried first (their number is expected to be
    manageable) and paged in Python. The page's rows are then fetched id by
    id rather than through ``IN (${bid_n}$)`` placeholders.

    Returns:
        ``total`` is the number of distinct matching ids; ``rows`` holds the
        flat rows of the requested page (empty when the page is out of range).
    """
    id_rows = _sql_rows(await sor.sqlExe(_bill_ids_sql(where_sql), base_params))
    normalized = (
        _normalize_bill_id(_row_get(r, 'bill_id', 'id', 'ID')) for r in id_rows
    )
    # dict.fromkeys drops duplicates while keeping first-seen order.
    all_ids = list(dict.fromkeys(bid for bid in normalized if bid))
    total = len(all_ids)
    page_ids = all_ids[offset: offset + page_size]
    if not page_ids:
        return total, []
    return total, await _fetch_bill_rows_by_ids(sor, page_ids)
async def _finance_report_debug(sor, where_sql, params, page_size, offset):
    """Collect diagnostics for ``debug=true`` requests.

    Each step runs in its own ``try`` so a failure is recorded under an
    ``*_error`` key and the remaining steps still run. This makes it possible
    to see on the server which SQL step returns no data.

    Steps:
        1. COUNT(*) query -> ``count_total``.
        2. Id query without LIMIT -> ``ids_query_len``.
        3. Id query with ``LIMIT/OFFSET`` appended -> ``ids_with_limit_len``.
        4. Python-side paging of the id query plus a ``sor.R`` lookup of the
           first page id -> ``page_ids_len``, ``sample_page_id``,
           ``sor_R_bill_count``, ``sor_R_row_ok``.

    Returns:
        Dict with the diagnostics gathered above.
    """
    dbg = {}
    try:
        c = _sql_rows(await sor.sqlExe(_count_sql(where_sql), params))
        # `default` must be passed by keyword: a positional 0 would be taken
        # as another candidate column name and break the lookup.
        dbg['count_total'] = int(_row_get(c[0], 'total_count', default=0) or 0) if c else None
    except Exception as e:  # diagnostics only: record the failure and go on
        dbg['count_error'] = str(e)
    try:
        ids = _sql_rows(await sor.sqlExe(_bill_ids_sql(where_sql), params))
        dbg['ids_query_len'] = len(ids)
    except Exception as e:
        dbg['ids_error'] = str(e)
    try:
        lim_sql = _bill_ids_sql(where_sql).strip() + ' LIMIT %d OFFSET %d' % (page_size, offset)
        lim = _sql_rows(await sor.sqlExe(lim_sql, params))
        dbg['ids_with_limit_len'] = len(lim)
    except Exception as e:
        dbg['ids_with_limit_error'] = str(e)
    try:
        page_ids = []
        id_rows = _sql_rows(await sor.sqlExe(_bill_ids_sql(where_sql), params))
        if id_rows:
            dbg['first_id_row_keys'] = list(id_rows[0].keys())
            dbg['first_id_raw'] = _row_get(id_rows[0], 'bill_id', 'id', 'ID')
        for r in id_rows[offset: offset + page_size]:
            bid = _normalize_bill_id(_row_get(r, 'bill_id', 'id', 'ID'))
            if bid:
                page_ids.append(bid)
        dbg['page_ids_len'] = len(page_ids)
        if page_ids:
            dbg['sample_page_id'] = page_ids[0]
            rb = await sor.R('bill', {'id': page_ids[0], 'del_flg': '0'})
            dbg['sor_R_bill_count'] = len(rb) if rb else 0
            one = await _fetch_bill_row_via_R(sor, page_ids[0])
            dbg['sor_R_row_ok'] = one is not None
    except Exception as e:
        dbg['detail_fetch_error'] = str(e)
    return dbg
async def finance_order_report(ns=None):
    """Paginated finance list of customer orders for management users.

    Args:
        ns: Request parameters:
            accounting_orgid (required): ledger organization.
            include_sub_reseller_customers (optional, default false): when
                true, orders of customers of descendant resellers are
                included as well.
            userid, start_date, end_date, customerid, productid, order_id,
                bill_state: optional filters / viewer identity.
            current_page (default 1), page_size (default 20, max 100).
            debug: when true, diagnostics are returned under ``data.debug``.

    Returns:
        ``{'status', 'msg', 'data'}``. ``finance.settle_upstream_*`` of each
        item means:
        - business owner: payable to the supplier (credit entries of subjects
          starting with '待结转', own ledger only);
        - reseller: payable to the direct parent (debit entries of subject
          '分销商存放资金' on the parent's ledger with participant = this level).
        On a validation or permission failure ``status`` is False and only
        ``msg`` is set.
    """
    ns = ns or {}
    accounting_orgid = ns.get('accounting_orgid')
    if not accounting_orgid:
        return {'status': False, 'msg': '缺少 accounting_orgid当前账本机构'}
    include_sub = _parse_bool(ns.get('include_sub_reseller_customers'), False)
    current_page, page_size, offset = _parse_page(ns)
    # DBPools is not defined in this file -- presumably injected by the host
    # runtime (see the module docstring's dependency list).
    db = DBPools()
    async with db.sqlorContext(DBNAME) as sor:
        ok, err = await _check_viewer(sor, accounting_orgid, ns.get('userid'))
        if not ok:
            return {'status': False, 'msg': err}
        scope_sql, scope_params, reseller_ids = await _build_customer_scope_sql(
            sor, accounting_orgid, include_sub,
        )
        is_owner = await _is_business_owner(sor, accounting_orgid)
        # Build the WHERE clause from fixed conditions plus optional filters;
        # all values travel as named parameters.
        conditions = ["b.del_flg = '0'", "o.del_flg = '0'", scope_sql]
        params = dict(scope_params)
        if ns.get('start_date'):
            conditions.append('b.bill_date >= ${start_date}$')
            params['start_date'] = ns['start_date']
        if ns.get('end_date'):
            conditions.append('b.bill_date <= ${end_date}$')
            params['end_date'] = ns['end_date']
        if ns.get('customerid'):
            conditions.append('b.customerid = ${customerid}$')
            params['customerid'] = ns['customerid']
        if ns.get('productid'):
            conditions.append('b.productid = ${productid}$')
            params['productid'] = ns['productid']
        if ns.get('order_id'):
            conditions.append('b.orderid = ${order_id}$')
            params['order_id'] = ns['order_id']
        # `is not None` (not truthiness) so that a state of 0 still filters.
        # NOTE(review): an empty string also filters here -- confirm callers
        # never send '' to mean "all states".
        if ns.get('bill_state') is not None:
            conditions.append('b.bill_state = ${bill_state}$')
            params['bill_state'] = str(ns['bill_state'])
        where_sql = ' AND '.join(conditions)
        total_count, rows = await _fetch_bill_rows_page(
            sor, where_sql, params, offset, page_size,
        )
        # A page beyond the last one falls back to page 1 (fetched again).
        if total_count > 0 and offset >= total_count:
            current_page = 1
            offset = 0
            total_count, rows = await _fetch_bill_rows_page(
                sor, where_sql, params, offset, page_size,
            )
        debug_info = None
        if _parse_bool(ns.get('debug'), False):
            debug_info = await _finance_report_debug(
                sor, where_sql, params, page_size, offset,
            )
            debug_info['page_ids_fetched'] = len(rows)
        items = []
        # The sums below cover only the rows of the current page.
        sum_profit = 0.0
        sum_pay = 0.0
        sum_upstream = 0.0
        row_errors = []
        for row in rows:
            # A failing row is reported in row_errors instead of failing the
            # whole page.
            try:
                item = await _build_report_row(sor, row, accounting_orgid, is_owner)
            except Exception as exc:
                row_errors.append({
                    'bill_id': _row_get(row, 'bill_id'),
                    'error': str(exc),
                })
                continue
            items.append(item)
            fin = item['finance']
            if fin.get('profit_amount') is not None:
                sum_profit += fin['profit_amount']
            pay = item['pricing'].get('customer_pay_amount')
            if pay is not None:
                sum_pay += pay
            up = fin.get('settle_upstream_amount')
            if up is not None:
                sum_upstream += up
        return {
            'status': True,
            'msg': 'ok',
            'data': {
                'accounting_orgid': accounting_orgid,
                'accounting_orgname': await _org_name(sor, accounting_orgid),
                'is_business_owner': is_owner,
                'customer_scope': {
                    'include_sub_reseller_customers': include_sub,
                    'descendant_reseller_ids': reseller_ids,
                    'descendant_reseller_count': len(reseller_ids),
                },
                'total_count': total_count,
                'current_page': current_page,
                'page_size': page_size,
                'summary': {
                    'customer_pay_total': _round_money(sum_pay),
                    'profit_total': _round_money(sum_profit),
                    'settle_upstream_total': _round_money(sum_upstream),
                },
                'items': items,
                'row_errors': row_errors,
                'list_row_count': len(rows),
                'debug': debug_info,
            },
        }
async def finance_order_report_detail(ns=None):
    """Finance detail of a single bill for management users.

    Args:
        ns: Request parameters: ``accounting_orgid`` and ``bill_id``
            (both required), ``include_sub_reseller_customers`` (same meaning
            as in the list API; used to validate the customer scope) and an
            optional ``userid`` for the permission check.

    Returns:
        ``{'status': True, 'msg': 'ok', 'data': detail}`` on success, where
        ``detail`` is the report item plus ``customer_scope``; otherwise
        ``{'status': False, 'msg': reason}``.
    """
    def _fail(message):
        return {'status': False, 'msg': message}

    ns = ns or {}
    accounting_orgid = ns.get('accounting_orgid')
    bill_id = ns.get('bill_id')
    if not (accounting_orgid and bill_id):
        return _fail('缺少 accounting_orgid 或 bill_id')
    include_sub = _parse_bool(ns.get('include_sub_reseller_customers'), False)

    pools = DBPools()
    async with pools.sqlorContext(DBNAME) as sor:
        allowed, reason = await _check_viewer(sor, accounting_orgid, ns.get('userid'))
        if not allowed:
            return _fail(reason)
        scope = await _build_customer_scope_sql(sor, accounting_orgid, include_sub)
        reseller_ids = scope[2]
        is_owner = await _is_business_owner(sor, accounting_orgid)

        found = await _fetch_bill_rows_by_ids(sor, [bill_id])
        if not found:
            return _fail('账单不存在')
        bill_row = found[0]

        # The bill must belong to a customer inside the requested scope.
        in_scope = await _customer_in_scope(
            _row_get(bill_row, 'customer_parentid'),
            accounting_orgid,
            reseller_ids,
            include_sub,
        )
        if not in_scope:
            return _fail('该账单不在当前查询客户范围内')

        detail = await _build_report_row(sor, bill_row, accounting_orgid, is_owner)
        detail['customer_scope'] = {'include_sub_reseller_customers': include_sub}
        return {'status': True, 'msg': 'ok', 'data': detail}
# Script entry point: run the paginated report with the request parameters.
# `params_kw` is not defined in this file -- presumably injected by the .dspy
# host runtime, which must also permit top-level `await` / `return`
# (TODO confirm against the framework).
ret = await finance_order_report(params_kw)
return ret