feat(llmage): 备份改用INSERT SELECT+DELETE单SQL语句 + 新增失败记录重试功能

This commit is contained in:
yumoqing 2026-05-24 21:57:50 +08:00
parent 686b05d8fe
commit 7911750127
3 changed files with 79 additions and 18 deletions

View File

@ -240,27 +240,28 @@ async def llm_accoung_failed(luid, reason=None):
async def backup_accounted_llmusage(cutoff_date):
"""Backup accounted records with use_date < cutoff_date to history table."""
"""Backup accounted records with use_date < cutoff_date to history table using single SQL statements."""
env = ServerEnv()
ts = env.timestampstr()
batched = 0
recs = []
async with get_sor_context(env, 'llmage') as sor:
sql = """select * from llmusage
where accounting_status='accounted'
and use_date < ${cutoff_date}$"""
recs = await sor.sqlExe(sql, {'cutoff_date': cutoff_date})
if not recs:
debug(f'backup_accounted_llmusage: no records to backup for use_date < {cutoff_date}')
return 0
debug(f'backup_accounted_llmusage: {cutoff_date} {len(recs)} records to backup')
for r in recs:
async with get_sor_context(env, 'llmage') as sor:
await sor.C('llmusage_history', r.copy())
await sor.D('llmusage', {'id': r.id})
batched += 1
debug(f'backup_accounted_llmusage: backed up {batched} records for use_date < {cutoff_date}')
return batched
# Step 1: INSERT INTO history SELECT from main table
insert_sql = """INSERT INTO llmusage_history
(id, llmid, use_date, use_time, userid, usages, ioinfo, transno, responsed_seconds, finish_seconds, status, taskid, amount, cost, userorgid, ownerid, accounting_status, backup_time)
SELECT id, llmid, use_date, use_time, userid, usages, ioinfo, transno, responsed_seconds, finish_seconds, status, taskid, amount, cost, userorgid, ownerid, accounting_status, ${ts}$
FROM llmusage
WHERE accounting_status='accounted' AND use_date < ${cutoff_date}$"""
result = await sor.execute(insert_sql, {'cutoff_date': cutoff_date, 'ts': ts})
inserted = result if isinstance(result, int) else 0
debug(f'backup_accounted_llmusage: {inserted} records inserted to history')
if inserted > 0:
# Step 2: DELETE from main table
delete_sql = """DELETE FROM llmusage
WHERE accounting_status='accounted' AND use_date < ${cutoff_date}$"""
await sor.execute(delete_sql, {'cutoff_date': cutoff_date})
debug(f'backup_accounted_llmusage: {inserted} records deleted from main table')
return inserted
async def get_failed_accounting_records(filters=None, page=1, page_size=50):

View File

@ -0,0 +1,36 @@
#!/usr/bin/env python3
import json
from datetime import datetime
result = {'success': False, 'message': ''}
try:
dbname = get_module_dbname('llmage')
luid = params_kw.get('id') or params_kw.get('llmusageid')
if not luid:
result['message'] = '缺少llmusageid参数'
else:
async with DBPools().sqlorContext(dbname) as sor:
# 1. 重置 llmusage 记账状态为 created让后台循环重新处理
await sor.U('llmusage', {
'id': luid,
'accounting_status': 'created'
})
# 2. 更新失败记录:标记已处理,增加重试次数
now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
await sor.execute("""
UPDATE llmusage_accounting_failed
SET handled = '1',
retry_count = IFNULL(retry_count, 0) + 1,
handled_time = ${now}$,
handled_note = CONCAT(IFNULL(handled_note, ''), '[', ${now}$, '] 触发重试; ')
WHERE llmusageid = ${luid}$
""", {'luid': luid, 'now': now})
result['success'] = True
result['message'] = '已重置为待记账状态,后台循环将重新处理'
except Exception as e:
result['message'] = str(e)
return json.dumps(result, ensure_ascii=False, default=str)

View File

@ -80,6 +80,30 @@
}]
}
]
},
{
"widgettype": "VBox",
"options": {"spacing": 4},
"subwidgets": [
{"widgettype": "Text", "options": {"text": "", "fontSize": "12px"}},
{
"widgettype": "Button",
"id": "retry_btn",
"options": {
"label": "重试",
"bgcolor": "#4caf50",
"color": "#ffffff",
"width": "80px"
},
"binds": [{
"wid": "self",
"event": "click",
"actiontype": "script",
"target": "self",
"script": "var dv = this.root.getElementById('failed_table'); var row = dv.selected_row || (dv.selected_rows && dv.selected_rows[0]); if(!row || !row.llmusageid) { alert('请先选中一条记录'); return; } var url = bricks.build_url ? bricks.build_url('/llmage/api/retry_accounting.dspy') : '/llmage/api/retry_accounting.dspy'; fetch(url + '?id=' + row.llmusageid).then(function(r){return r.json();}).then(function(d){ if(d.success) { alert(d.message); dv.load({}); } else { alert('失败: ' + d.message); } }).catch(function(e){ alert('请求异常: ' + e); });"
}]
}
]
}
]
},