kboss/b/bill/finance_settlement_preview.dspy
2026-06-18 17:48:01 +08:00

282 lines
11 KiB
Plaintext

# -*- coding: utf-8 -*-
"""财务结算单生成前预览。"""
DBNAME = 'kboss'
def _round_money(v):
return round(float(v or 0), 8)
def _is_reverse_op(op):
return str(op or '').upper().endswith('_REVERSE')
def _sale_mode_from_subject(subjectname):
text = str(subjectname or '')
if '代付费' in text or '返佣' in text:
return '1'
if '底价' in text or '低价' in text:
return '2'
return '0'
def _parse_page(ns, default_page=1, default_size=50, max_size=500):
try:
current_page = int(ns.get('current_page', default_page) or default_page)
except (TypeError, ValueError):
current_page = default_page
try:
page_size = int(ns.get('page_size', default_size) or default_size)
except (TypeError, ValueError):
page_size = default_size
current_page = max(1, current_page)
page_size = max(1, min(page_size, max_size))
return current_page, page_size, (current_page - 1) * page_size
def _validate_params(ns):
required = ('accounting_orgid', 'counterparty_type', 'counterparty_orgid')
for key in required:
if not ns.get(key):
return None, '缺少 %s' % key
counterparty_type = ns.get('counterparty_type')
if counterparty_type not in ('supplier', 'reseller'):
return None, 'counterparty_type 仅支持 supplier / reseller'
period_type = ns.get('period_type') or 'day'
if period_type not in ('day', 'month'):
return None, 'period_type 仅支持 day / month'
period_start = ns.get('period_start') or ns.get('start_date')
period_end = ns.get('period_end') or ns.get('end_date')
if not period_start or not period_end:
return None, '缺少 period_start / period_end'
return {
'accounting_orgid': ns.get('accounting_orgid'),
'counterparty_type': counterparty_type,
'counterparty_orgid': ns.get('counterparty_orgid'),
'period_type': period_type,
'period_start': period_start,
'period_end': period_end,
}, None
async def _ensure_schema(sor):
await sor.sqlExe("""
CREATE TABLE IF NOT EXISTS finance_settlement (
id varchar(32) NOT NULL,
settlement_no varchar(64),
accounting_orgid varchar(32),
counterparty_type varchar(16),
counterparty_orgid varchar(32),
counterparty_name varchar(255),
period_type varchar(16),
period_start date,
period_end date,
sales_amount double(18,8) DEFAULT 0,
settlement_amount double(18,8) DEFAULT 0,
platform_income_amount double(18,8) DEFAULT 0,
bill_count int DEFAULT 0,
status varchar(16) DEFAULT 'draft',
approval_id varchar(64),
approval_status varchar(16),
failure_reason varchar(1000),
settled_at datetime,
created_by varchar(32),
del_flg varchar(1) DEFAULT '0',
create_at datetime DEFAULT CURRENT_TIMESTAMP,
update_at datetime DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id),
UNIQUE KEY finance_settlement_un (accounting_orgid, counterparty_type, counterparty_orgid, period_start, period_end, del_flg)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
""", {})
await sor.sqlExe("""
CREATE TABLE IF NOT EXISTS finance_settlement_detail (
id varchar(32) NOT NULL,
settlement_id varchar(32),
bill_id varchar(32),
order_id varchar(32),
bill_date date,
customerid varchar(32),
providerid varchar(32),
productid varchar(32),
business_op varchar(64),
sale_mode varchar(1),
sales_amount double(18,8) DEFAULT 0,
settlement_amount double(18,8) DEFAULT 0,
platform_income_amount double(18,8) DEFAULT 0,
amount_source varchar(32) DEFAULT 'bill_detail',
del_flg varchar(1) DEFAULT '0',
create_at datetime DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id),
KEY finance_settlement_detail_idx1 (settlement_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
""", {})
async def _existing_settlement(sor, args):
rows = await sor.sqlExe("""
SELECT id, settlement_no, status
FROM finance_settlement
WHERE accounting_orgid=${accounting_orgid}$
AND counterparty_type=${counterparty_type}$
AND counterparty_orgid=${counterparty_orgid}$
AND period_start=${period_start}$
AND period_end=${period_end}$
AND del_flg='0'
LIMIT 1
""", args)
return rows[0] if rows else None
async def _counterparty_name(sor, args):
if args['counterparty_type'] == 'supplier':
rows = await sor.sqlExe("""
SELECT COALESCE(p.name, o.orgname) AS name
FROM organization o
LEFT JOIN provider p ON p.orgid=o.id AND p.del_flg='0'
WHERE o.id=${counterparty_orgid}$ AND o.del_flg='0'
LIMIT 1
""", args)
else:
rows = await sor.sqlExe("""
SELECT orgname AS name FROM organization
WHERE id=${counterparty_orgid}$ AND del_flg='0'
LIMIT 1
""", args)
return rows[0].get('name') if rows else None
async def _fetch_source_rows(sor, args):
params = dict(args)
filters = [
"b.del_flg='0'",
"b.bill_state='1'",
"b.bill_date >= ${period_start}$",
"b.bill_date <= ${period_end}$",
]
if args['counterparty_type'] == 'supplier':
filters.append('b.providerid=${counterparty_orgid}$')
settlement_join = """
LEFT JOIN bill_detail sd ON sd.billid=b.id
AND sd.accounting_orgid=${accounting_orgid}$
AND sd.del_flg='0'
AND sd.accounting_dir='贷'
AND sd.subjectname LIKE '待结转%'
"""
settlement_expr = 'COALESCE(sd.amount, 0)'
sale_mode_expr = 'sd.subjectname'
else:
filters.append('cust.parentid=${counterparty_orgid}$')
settlement_join = """
LEFT JOIN bill_detail sd ON sd.billid=b.id
AND sd.accounting_orgid=${accounting_orgid}$
AND sd.del_flg='0'
AND sd.accounting_dir='借'
AND sd.subjectname='分销商存放资金'
AND sd.participantid=cust.parentid
"""
settlement_expr = 'COALESCE(sd.amount, 0)'
sale_mode_expr = 'sd.subjectname'
where_sql = ' AND '.join(filters)
sql = """
SELECT b.id AS bill_id,
b.orderid AS order_id,
b.bill_date,
b.customerid,
b.providerid,
b.productid,
b.business_op,
b.amount AS sales_amount,
%s AS settlement_amount,
%s AS settlement_subject,
COALESCE((
SELECT SUM(bd.amount)
FROM bill_detail bd
WHERE bd.billid=b.id
AND bd.accounting_orgid=${accounting_orgid}$
AND bd.del_flg='0'
AND bd.accounting_dir='贷'
AND bd.subjectname IN ('折扣收入', '底价收入')
), 0) AS platform_income_amount
FROM bill b
INNER JOIN organization cust ON cust.id=b.customerid AND cust.del_flg='0'
%s
WHERE %s
ORDER BY b.bill_date DESC, b.create_at DESC
""" % (settlement_expr, sale_mode_expr, settlement_join, where_sql)
rows = await sor.sqlExe(sql, params)
items = []
seen_platform_bill = set()
for row in rows:
sign = -1 if _is_reverse_op(row.get('business_op')) else 1
bill_id = row.get('bill_id')
platform_income = float(row.get('platform_income_amount') or 0)
if bill_id in seen_platform_bill:
platform_income = 0
seen_platform_bill.add(bill_id)
items.append({
'bill_id': bill_id,
'order_id': row.get('order_id'),
'bill_date': str(row.get('bill_date'))[:10] if row.get('bill_date') else None,
'customerid': row.get('customerid'),
'providerid': row.get('providerid'),
'productid': row.get('productid'),
'business_op': row.get('business_op'),
'sale_mode': _sale_mode_from_subject(row.get('settlement_subject')),
'sales_amount': _round_money(sign * float(row.get('sales_amount') or 0)),
'settlement_amount': _round_money(sign * float(row.get('settlement_amount') or 0)),
'platform_income_amount': _round_money(sign * platform_income),
'amount_source': 'bill_detail',
})
return items
def _summary(items):
return {
'sales_amount': _round_money(sum(i.get('sales_amount') or 0 for i in items)),
'settlement_amount': _round_money(sum(i.get('settlement_amount') or 0 for i in items)),
'platform_income_amount': _round_money(sum(i.get('platform_income_amount') or 0 for i in items)),
'bill_count': len({i.get('bill_id') for i in items if i.get('bill_id')}),
}
async def finance_settlement_preview(ns={}):
args, err = _validate_params(ns)
if err:
return {'status': False, 'msg': err}
current_page, page_size, offset = _parse_page(ns)
db = DBPools()
async with db.sqlorContext(DBNAME) as sor:
try:
await _ensure_schema(sor)
existing = await _existing_settlement(sor, args)
items = await _fetch_source_rows(sor, args)
total_count = len(items)
return {
'status': True,
'msg': 'ok',
'data': {
'accounting_orgid': args['accounting_orgid'],
'counterparty_type': args['counterparty_type'],
'counterparty_orgid': args['counterparty_orgid'],
'counterparty_name': await _counterparty_name(sor, args),
'period_type': args['period_type'],
'period_start': args['period_start'],
'period_end': args['period_end'],
'existing_settlement': existing,
'can_create': existing is None and total_count > 0,
'summary': _summary(items),
'total_count': total_count,
'current_page': current_page,
'page_size': page_size,
'items': items[offset:offset + page_size],
},
}
except Exception as e:
return {'status': False, 'msg': '预览失败, %s' % str(e)}
ret = await finance_settlement_preview(params_kw)
return ret