# -*- coding: utf-8 -*- """财务结算单生成前预览。""" DBNAME = 'kboss' def _round_money(v): return round(float(v or 0), 8) def _is_reverse_op(op): return str(op or '').upper().endswith('_REVERSE') def _sale_mode_from_subject(subjectname): text = str(subjectname or '') if '代付费' in text or '返佣' in text: return '1' if '底价' in text or '低价' in text: return '2' return '0' def _parse_page(ns, default_page=1, default_size=50, max_size=500): try: current_page = int(ns.get('current_page', default_page) or default_page) except (TypeError, ValueError): current_page = default_page try: page_size = int(ns.get('page_size', default_size) or default_size) except (TypeError, ValueError): page_size = default_size current_page = max(1, current_page) page_size = max(1, min(page_size, max_size)) return current_page, page_size, (current_page - 1) * page_size def _validate_params(ns): required = ('accounting_orgid', 'counterparty_type', 'counterparty_orgid') for key in required: if not ns.get(key): return None, '缺少 %s' % key counterparty_type = ns.get('counterparty_type') if counterparty_type not in ('supplier', 'reseller'): return None, 'counterparty_type 仅支持 supplier / reseller' period_type = ns.get('period_type') or 'day' if period_type not in ('day', 'month'): return None, 'period_type 仅支持 day / month' period_start = ns.get('period_start') or ns.get('start_date') period_end = ns.get('period_end') or ns.get('end_date') if not period_start or not period_end: return None, '缺少 period_start / period_end' return { 'accounting_orgid': ns.get('accounting_orgid'), 'counterparty_type': counterparty_type, 'counterparty_orgid': ns.get('counterparty_orgid'), 'period_type': period_type, 'period_start': period_start, 'period_end': period_end, }, None async def _ensure_schema(sor): await sor.sqlExe(""" CREATE TABLE IF NOT EXISTS finance_settlement ( id varchar(32) NOT NULL, settlement_no varchar(64), accounting_orgid varchar(32), counterparty_type varchar(16), counterparty_orgid varchar(32), counterparty_name varchar(255), period_type varchar(16), period_start date, period_end date, sales_amount double(18,8) DEFAULT 0, settlement_amount double(18,8) DEFAULT 0, platform_income_amount double(18,8) DEFAULT 0, bill_count int DEFAULT 0, status varchar(16) DEFAULT 'draft', approval_id varchar(64), approval_status varchar(16), failure_reason varchar(1000), settled_at datetime, created_by varchar(32), del_flg varchar(1) DEFAULT '0', create_at datetime DEFAULT CURRENT_TIMESTAMP, update_at datetime DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (id), UNIQUE KEY finance_settlement_un (accounting_orgid, counterparty_type, counterparty_orgid, period_start, period_end, del_flg) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; """, {}) await sor.sqlExe(""" CREATE TABLE IF NOT EXISTS finance_settlement_detail ( id varchar(32) NOT NULL, settlement_id varchar(32), bill_id varchar(32), order_id varchar(32), bill_date date, customerid varchar(32), providerid varchar(32), productid varchar(32), business_op varchar(64), sale_mode varchar(1), sales_amount double(18,8) DEFAULT 0, settlement_amount double(18,8) DEFAULT 0, platform_income_amount double(18,8) DEFAULT 0, amount_source varchar(32) DEFAULT 'bill_detail', del_flg varchar(1) DEFAULT '0', create_at datetime DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (id), KEY finance_settlement_detail_idx1 (settlement_id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; """, {}) async def _existing_settlement(sor, args): rows = await sor.sqlExe(""" SELECT id, settlement_no, status FROM finance_settlement WHERE accounting_orgid=${accounting_orgid}$ AND counterparty_type=${counterparty_type}$ AND counterparty_orgid=${counterparty_orgid}$ AND period_start=${period_start}$ AND period_end=${period_end}$ AND del_flg='0' LIMIT 1 """, args) return rows[0] if rows else None async def _counterparty_name(sor, args): if args['counterparty_type'] == 'supplier': rows = await sor.sqlExe(""" SELECT COALESCE(p.name, o.orgname) AS name FROM organization o LEFT JOIN provider p ON p.orgid=o.id AND p.del_flg='0' WHERE o.id=${counterparty_orgid}$ AND o.del_flg='0' LIMIT 1 """, args) else: rows = await sor.sqlExe(""" SELECT orgname AS name FROM organization WHERE id=${counterparty_orgid}$ AND del_flg='0' LIMIT 1 """, args) return rows[0].get('name') if rows else None async def _fetch_source_rows(sor, args): params = dict(args) filters = [ "b.del_flg='0'", "b.bill_state='1'", "b.bill_date >= ${period_start}$", "b.bill_date <= ${period_end}$", ] if args['counterparty_type'] == 'supplier': filters.append('b.providerid=${counterparty_orgid}$') settlement_join = """ LEFT JOIN bill_detail sd ON sd.billid=b.id AND sd.accounting_orgid=${accounting_orgid}$ AND sd.del_flg='0' AND sd.accounting_dir='贷' AND sd.subjectname LIKE '待结转%' """ settlement_expr = 'COALESCE(sd.amount, 0)' sale_mode_expr = 'sd.subjectname' else: filters.append('cust.parentid=${counterparty_orgid}$') settlement_join = """ LEFT JOIN bill_detail sd ON sd.billid=b.id AND sd.accounting_orgid=${accounting_orgid}$ AND sd.del_flg='0' AND sd.accounting_dir='借' AND sd.subjectname='分销商存放资金' AND sd.participantid=cust.parentid """ settlement_expr = 'COALESCE(sd.amount, 0)' sale_mode_expr = 'sd.subjectname' where_sql = ' AND '.join(filters) sql = """ SELECT b.id AS bill_id, b.orderid AS order_id, b.bill_date, b.customerid, b.providerid, b.productid, b.business_op, b.amount AS sales_amount, %s AS settlement_amount, %s AS settlement_subject, COALESCE(( SELECT SUM(bd.amount) FROM bill_detail bd WHERE bd.billid=b.id AND bd.accounting_orgid=${accounting_orgid}$ AND bd.del_flg='0' AND bd.accounting_dir='贷' AND bd.subjectname IN ('折扣收入', '底价收入') ), 0) AS platform_income_amount FROM bill b INNER JOIN organization cust ON cust.id=b.customerid AND cust.del_flg='0' %s WHERE %s ORDER BY b.bill_date DESC, b.create_at DESC """ % (settlement_expr, sale_mode_expr, settlement_join, where_sql) rows = await sor.sqlExe(sql, params) items = [] seen_platform_bill = set() for row in rows: sign = -1 if _is_reverse_op(row.get('business_op')) else 1 bill_id = row.get('bill_id') platform_income = float(row.get('platform_income_amount') or 0) if bill_id in seen_platform_bill: platform_income = 0 seen_platform_bill.add(bill_id) items.append({ 'bill_id': bill_id, 'order_id': row.get('order_id'), 'bill_date': str(row.get('bill_date'))[:10] if row.get('bill_date') else None, 'customerid': row.get('customerid'), 'providerid': row.get('providerid'), 'productid': row.get('productid'), 'business_op': row.get('business_op'), 'sale_mode': _sale_mode_from_subject(row.get('settlement_subject')), 'sales_amount': _round_money(sign * float(row.get('sales_amount') or 0)), 'settlement_amount': _round_money(sign * float(row.get('settlement_amount') or 0)), 'platform_income_amount': _round_money(sign * platform_income), 'amount_source': 'bill_detail', }) return items def _summary(items): return { 'sales_amount': _round_money(sum(i.get('sales_amount') or 0 for i in items)), 'settlement_amount': _round_money(sum(i.get('settlement_amount') or 0 for i in items)), 'platform_income_amount': _round_money(sum(i.get('platform_income_amount') or 0 for i in items)), 'bill_count': len({i.get('bill_id') for i in items if i.get('bill_id')}), } async def finance_settlement_preview(ns={}): args, err = _validate_params(ns) if err: return {'status': False, 'msg': err} current_page, page_size, offset = _parse_page(ns) db = DBPools() async with db.sqlorContext(DBNAME) as sor: try: await _ensure_schema(sor) existing = await _existing_settlement(sor, args) items = await _fetch_source_rows(sor, args) total_count = len(items) return { 'status': True, 'msg': 'ok', 'data': { 'accounting_orgid': args['accounting_orgid'], 'counterparty_type': args['counterparty_type'], 'counterparty_orgid': args['counterparty_orgid'], 'counterparty_name': await _counterparty_name(sor, args), 'period_type': args['period_type'], 'period_start': args['period_start'], 'period_end': args['period_end'], 'existing_settlement': existing, 'can_create': existing is None and total_count > 0, 'summary': _summary(items), 'total_count': total_count, 'current_page': current_page, 'page_size': page_size, 'items': items[offset:offset + page_size], }, } except Exception as e: return {'status': False, 'msg': '预览失败, %s' % str(e)} ret = await finance_settlement_preview(params_kw) return ret