feat(accounting): add balance update in create_accounting_record

记账核心逻辑补全:
1. 写分录明细 (INSERT accounting_records)
2. 记账日志 (status='accounted')
3. 更新账户余额 (UPSERT customer_balance)
4. 透支检查:余额不足时检查 credit_limit,超限拒绝
This commit is contained in:
Hermes Agent 2026-05-28 22:06:20 +08:00
parent 781216e11a
commit e86e2ceea3

View File

@ -31,7 +31,16 @@ async def create_accounting_record(
request_id: str = '', request_id: str = '',
transno: str = '', transno: str = '',
) -> str: ) -> str:
"""Create a new accounting record with idempotency via request_id.""" """Create accounting record: write detail + update balance.
The core job of accounting is:
1. Write account detail (accounting_records) based on journal entry
2. Write accounting log (status='accounted' in the record)
3. Update account balance (customer_balance)
amount > 0 means charge (balance decreases)
amount < 0 means credit/refund (balance increases)
"""
result: dict[str, Any] = {'success': False, 'record_id': None} result: dict[str, Any] = {'success': False, 'record_id': None}
try: try:
@ -61,6 +70,8 @@ async def create_accounting_record(
result['duplicate'] = True result['duplicate'] = True
return json.dumps(result, ensure_ascii=False, default=str) return json.dumps(result, ensure_ascii=False, default=str)
async with DBPools().sqlorContext(dbname) as sor:
# === Step 1 & 2: Write detail + log (accounting record) ===
sql = """ sql = """
INSERT INTO accounting_records INSERT INTO accounting_records
(id, customer_id, llmid, model_name, pricing_id, (id, customer_id, llmid, model_name, pricing_id,
@ -90,11 +101,63 @@ async def create_accounting_record(
'created_at': now, 'created_at': now,
'updated_at': now, 'updated_at': now,
} }
async with DBPools().sqlorContext(dbname) as sor:
await sor.sqlExe(sql, params) await sor.sqlExe(sql, params)
# === Step 3: Update account balance ===
# First read current balance + credit_limit with lock
balance_rows = await sor.sqlExe(
"SELECT balance, credit_limit FROM customer_balance "
"WHERE id = ${customer_id}$",
{'customer_id': customer_id},
)
if isinstance(balance_rows, list) and balance_rows:
cur = balance_rows[0]
cur_balance = float(cur.get('balance', 0))
credit_limit = cur.get('credit_limit')
else:
# No balance record yet, initialize
cur_balance = 0.0
credit_limit = None
# amount > 0 = charge (deduct), amount < 0 = credit (add)
new_balance = cur_balance - amount
# Overdraft check: if balance goes negative, check credit limit
if new_balance < -0.0000001:
if credit_limit is not None and float(credit_limit) > 0:
if abs(new_balance) > float(credit_limit):
result['error'] = (
f'Insufficient balance: balance={cur_balance}, '
f'credit_limit={credit_limit}, charge={amount}'
)
return json.dumps(result, ensure_ascii=False, default=str)
else:
result['error'] = (
f'Insufficient balance: balance={cur_balance}, charge={amount}'
)
return json.dumps(result, ensure_ascii=False, default=str)
# Upsert balance record
balance_sql = """
INSERT INTO customer_balance
(id, balance, currency, last_consumption, cached_at)
VALUES
(${customer_id}$, ${new_balance}$, ${currency}$, NOW(), NOW())
ON DUPLICATE KEY UPDATE
balance = ${new_balance}$,
last_consumption = NOW(),
cached_at = NOW()
"""
await sor.sqlExe(balance_sql, {
'customer_id': customer_id,
'new_balance': new_balance,
'currency': currency,
})
result['success'] = True result['success'] = True
result['record_id'] = record_id result['record_id'] = record_id
result['new_balance'] = new_balance
except Exception as e: except Exception as e:
error(f'create_accounting_record error: {e}') error(f'create_accounting_record error: {e}')