# -*- coding: utf-8 -*- """ 财务结算汇总查询。 按供应商或分销商聚合日结/月结应结金额、平台收入和销售额。 金额以已记账 bill_detail 为准,不使用未记账估算。 """ DBNAME = 'kboss' INCOME_SUBJECTS = ('折扣收入', '底价收入') PARENT_SETTLE_SUBJECT = '分销商存放资金' def _round_money(v): return round(float(v or 0), 8) def _is_reverse_op(op): return str(op or '').upper().endswith('_REVERSE') def _parse_page(ns, default_page=1, default_size=20, max_size=200): try: current_page = int(ns.get('current_page', default_page) or default_page) except (TypeError, ValueError): current_page = default_page try: page_size = int(ns.get('page_size', default_size) or default_size) except (TypeError, ValueError): page_size = default_size current_page = max(1, current_page) page_size = max(1, min(page_size, max_size)) return current_page, page_size, (current_page - 1) * page_size def _period_key(bill_date, period_type): text = str(bill_date or '')[:10] if period_type == 'month': return text[:7] return text def _validate_params(ns): debug(ns) accounting_orgid = ns.get('accounting_orgid') counterparty_type = ns.get('counterparty_type') period_type = ns.get('period_type') or 'day' start_date = ns.get('start_date') or ns.get('period_start') end_date = ns.get('end_date') or ns.get('period_end') if not accounting_orgid: return None, '缺少 accounting_orgid' if counterparty_type not in ('supplier', 'reseller'): return None, 'counterparty_type 仅支持 supplier / reseller' if period_type not in ('day', 'month'): return None, 'period_type 仅支持 day / month' if not start_date or not end_date: return None, '缺少 start_date / end_date' return { 'accounting_orgid': accounting_orgid, 'counterparty_type': counterparty_type, 'period_type': period_type, 'start_date': start_date, 'end_date': end_date, 'counterparty_orgid': ns.get('counterparty_orgid'), }, None async def _fetch_source_rows(sor, args): params = { 'accounting_orgid': args['accounting_orgid'], 'start_date': args['start_date'], 'end_date': args['end_date'], } filters = [ "b.del_flg = '0'", "b.bill_state = '1'", "b.bill_date >= ${start_date}$", "b.bill_date <= ${end_date}$", ] if args['counterparty_type'] == 'supplier': counterparty_expr = 'b.providerid' name_expr = 'COALESCE(pv.name, cp.orgname)' join_sql = """ LEFT JOIN provider pv ON pv.orgid = b.providerid AND pv.del_flg = '0' LEFT JOIN organization cp ON cp.id = b.providerid AND cp.del_flg = '0' """ settlement_expr = """ SELECT SUM(bd.amount) FROM bill_detail bd WHERE bd.billid = b.id AND bd.accounting_orgid = ${accounting_orgid}$ AND bd.del_flg = '0' AND bd.accounting_dir = '贷' AND bd.subjectname LIKE '待结转%' """ if args.get('counterparty_orgid'): filters.append('b.providerid = ${counterparty_orgid}$') params['counterparty_orgid'] = args['counterparty_orgid'] else: counterparty_expr = 'cust.parentid' name_expr = 'cp.orgname' join_sql = "LEFT JOIN organization cp ON cp.id = cust.parentid AND cp.del_flg = '0'" settlement_expr = """ SELECT SUM(bd.amount) FROM bill_detail bd WHERE bd.billid = b.id AND bd.accounting_orgid = ${accounting_orgid}$ AND bd.del_flg = '0' AND bd.accounting_dir = '借' AND bd.subjectname = '分销商存放资金' AND bd.participantid = cust.parentid """ filters.append("cust.parentid IS NOT NULL AND cust.parentid != ''") if args.get('counterparty_orgid'): filters.append('cust.parentid = ${counterparty_orgid}$') params['counterparty_orgid'] = args['counterparty_orgid'] where_sql = ' AND '.join(filters) sql = """ SELECT b.id AS bill_id, b.orderid AS order_id, b.bill_date, b.customerid, b.providerid, b.productid, b.business_op, b.amount AS sales_amount, %s AS counterparty_orgid, %s AS counterparty_name, COALESCE((%s), 0) AS settlement_amount, COALESCE(( SELECT SUM(bd.amount) FROM bill_detail bd WHERE bd.billid = b.id AND bd.accounting_orgid = ${accounting_orgid}$ AND bd.del_flg = '0' AND bd.accounting_dir = '贷' AND bd.subjectname IN ('折扣收入', '底价收入') ), 0) AS platform_income_amount FROM bill b INNER JOIN organization cust ON cust.id = b.customerid AND cust.del_flg = '0' %s WHERE %s ORDER BY b.bill_date DESC, b.create_at DESC """ % (counterparty_expr, name_expr, settlement_expr, join_sql, where_sql) return await sor.sqlExe(sql, params) def _aggregate_rows(rows, period_type): buckets = {} for row in rows: counterparty_orgid = row.get('counterparty_orgid') if not counterparty_orgid: continue period = _period_key(row.get('bill_date'), period_type) key = (period, counterparty_orgid) if key not in buckets: buckets[key] = { 'period': period, 'counterparty_orgid': counterparty_orgid, 'counterparty_name': row.get('counterparty_name'), 'sales_amount': 0.0, 'settlement_amount': 0.0, 'platform_income_amount': 0.0, 'bill_count': 0, } sign = -1 if _is_reverse_op(row.get('business_op')) else 1 bucket = buckets[key] bucket['sales_amount'] += sign * float(row.get('sales_amount') or 0) bucket['settlement_amount'] += sign * float(row.get('settlement_amount') or 0) bucket['platform_income_amount'] += sign * float(row.get('platform_income_amount') or 0) bucket['bill_count'] += 1 items = [] for item in buckets.values(): item['sales_amount'] = _round_money(item['sales_amount']) item['settlement_amount'] = _round_money(item['settlement_amount']) item['platform_income_amount'] = _round_money(item['platform_income_amount']) items.append(item) return sorted(items, key=lambda x: (x['period'], x['settlement_amount']), reverse=True) def _summary(items): return { 'sales_amount': _round_money(sum(i.get('sales_amount') or 0 for i in items)), 'settlement_amount': _round_money(sum(i.get('settlement_amount') or 0 for i in items)), 'platform_income_amount': _round_money(sum(i.get('platform_income_amount') or 0 for i in items)), 'bill_count': sum(i.get('bill_count') or 0 for i in items), } async def finance_settlement_summary(ns={}): args, err = _validate_params(ns) if err: return {'status': False, 'msg': err} current_page, page_size, offset = _parse_page(ns) db = DBPools() async with db.sqlorContext(DBNAME) as sor: try: rows = await _fetch_source_rows(sor, args) items = _aggregate_rows(rows, args['period_type']) total_count = len(items) page_items = items[offset:offset + page_size] return { 'status': True, 'msg': 'ok', 'data': { 'accounting_orgid': args['accounting_orgid'], 'counterparty_type': args['counterparty_type'], 'period_type': args['period_type'], 'start_date': args['start_date'], 'end_date': args['end_date'], 'summary': _summary(items), 'total_count': total_count, 'current_page': current_page, 'page_size': page_size, 'items': page_items, }, } except Exception as e: return {'status': False, 'msg': '查询失败, %s' % str(e)} ret = await finance_settlement_summary(params_kw) return ret