feat(llmage): 日期变更触发备份(use_date<昨天) + llmusage添加(accounting_status,use_date)组合索引
This commit is contained in:
parent
8755ab5d6d
commit
9fa8f9fa62
@ -1,7 +1,7 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
import json
|
import json
|
||||||
import time
|
import time
|
||||||
from datetime import datetime
|
from datetime import datetime, timedelta
|
||||||
from appPublic.log import exception, debug
|
from appPublic.log import exception, debug
|
||||||
from appPublic.uniqueID import getID
|
from appPublic.uniqueID import getID
|
||||||
from appPublic.dictObject import DictObject
|
from appPublic.dictObject import DictObject
|
||||||
@ -239,20 +239,18 @@ async def llm_accoung_failed(luid, reason=None):
|
|||||||
await sor.C('llmusage_accounting_failed', failed_rec)
|
await sor.C('llmusage_accounting_failed', failed_rec)
|
||||||
|
|
||||||
|
|
||||||
async def backup_accounted_llmusage():
|
async def backup_accounted_llmusage(cutoff_date):
|
||||||
"""Backup accounted records with use_date before today to history table."""
|
"""Backup accounted records with use_date < cutoff_date to history table."""
|
||||||
env = ServerEnv()
|
env = ServerEnv()
|
||||||
today = datetime.now().strftime('%Y-%m-%d')
|
|
||||||
ts = env.timestampstr()
|
ts = env.timestampstr()
|
||||||
batched = 0
|
batched = 0
|
||||||
async with get_sor_context(env, 'llmage') as sor:
|
async with get_sor_context(env, 'llmage') as sor:
|
||||||
# Select records with use_date < today (i.e. yesterday and earlier)
|
|
||||||
sql = """select * from llmusage
|
sql = """select * from llmusage
|
||||||
where accounting_status='accounted'
|
where accounting_status='accounted'
|
||||||
and use_date < ${today}$"""
|
and use_date < ${cutoff_date}$"""
|
||||||
recs = await sor.sqlExe(sql, {'today': today})
|
recs = await sor.sqlExe(sql, {'cutoff_date': cutoff_date})
|
||||||
if not recs:
|
if not recs:
|
||||||
debug(f'backup_accounted_llmusage: no records to backup for use_date < {today}')
|
debug(f'backup_accounted_llmusage: no records to backup for use_date < {cutoff_date}')
|
||||||
return 0
|
return 0
|
||||||
debug(f'backup_accounted_llmusage: {len(recs)} records to backup')
|
debug(f'backup_accounted_llmusage: {len(recs)} records to backup')
|
||||||
for r in recs:
|
for r in recs:
|
||||||
@ -280,7 +278,7 @@ where accounting_status='accounted'
|
|||||||
# Delete from main table
|
# Delete from main table
|
||||||
await sor.D('llmusage', {'id': r.id})
|
await sor.D('llmusage', {'id': r.id})
|
||||||
batched += 1
|
batched += 1
|
||||||
debug(f'backup_accounted_llmusage: backed up {batched} records')
|
debug(f'backup_accounted_llmusage: backed up {batched} records for use_date < {cutoff_date}')
|
||||||
return batched
|
return batched
|
||||||
|
|
||||||
|
|
||||||
@ -335,14 +333,13 @@ order by failed_time desc limit {page_size} offset {offset}"""
|
|||||||
async def backend_accounting():
|
async def backend_accounting():
|
||||||
env = ServerEnv()
|
env = ServerEnv()
|
||||||
debug(f'backend accounting started ...')
|
debug(f'backend accounting started ...')
|
||||||
backup_counter = 0
|
last_backup_date = None
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
lus = await get_accounting_llmusages()
|
lus = await get_accounting_llmusages()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
exception(f'{e}')
|
exception(f'{e}')
|
||||||
lus = []
|
lus = []
|
||||||
# debug(f'{len(lus)=} need to accounting........')
|
|
||||||
for lu in lus:
|
for lu in lus:
|
||||||
try:
|
try:
|
||||||
tpac = await get_user_tpac(lu.userid)
|
tpac = await get_user_tpac(lu.userid)
|
||||||
@ -356,12 +353,14 @@ async def backend_accounting():
|
|||||||
exception(f'{e}, {lu.id=}')
|
exception(f'{e}, {lu.id=}')
|
||||||
await llm_accoung_failed(lu.id, reason=str(e))
|
await llm_accoung_failed(lu.id, reason=str(e))
|
||||||
|
|
||||||
# Run backup every 30 iterations (~5 minutes)
|
# Check if date changed, trigger backup once per day
|
||||||
backup_counter += 1
|
today = datetime.now().strftime('%Y-%m-%d')
|
||||||
if backup_counter >= 30:
|
if today != last_backup_date:
|
||||||
backup_counter = 0
|
yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
|
||||||
|
last_backup_date = today
|
||||||
try:
|
try:
|
||||||
await backup_accounted_llmusage()
|
debug(f'date changed to {today}, triggering backup for use_date < {yesterday}')
|
||||||
|
await backup_accounted_llmusage(yesterday)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
exception(f'backup_accounted_llmusage failed: {e}')
|
exception(f'backup_accounted_llmusage failed: {e}')
|
||||||
|
|
||||||
|
|||||||
@ -123,6 +123,14 @@
|
|||||||
"idxfields": [
|
"idxfields": [
|
||||||
"userid"
|
"userid"
|
||||||
]
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "idx_llmusage_accounting",
|
||||||
|
"idxtype": "index",
|
||||||
|
"idxfields": [
|
||||||
|
"accounting_status",
|
||||||
|
"use_date"
|
||||||
|
]
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
@ -69,6 +69,9 @@ CREATE INDEX idx_laf_llmid ON llmusage_accounting_failed(llmid);
|
|||||||
CREATE INDEX idx_laf_handled ON llmusage_accounting_failed(handled);
|
CREATE INDEX idx_laf_handled ON llmusage_accounting_failed(handled);
|
||||||
CREATE INDEX idx_laf_failed_time ON llmusage_accounting_failed(failed_time);
|
CREATE INDEX idx_laf_failed_time ON llmusage_accounting_failed(failed_time);
|
||||||
|
|
||||||
|
-- 3. 为 llmusage 表添加组合索引(优化备份查询: accounting_status + use_date)
|
||||||
|
CREATE INDEX idx_llmusage_accounting ON llmusage(accounting_status, use_date);
|
||||||
|
|
||||||
-- ============================================================
|
-- ============================================================
|
||||||
-- 验证步骤(执行后运行):
|
-- 验证步骤(执行后运行):
|
||||||
-- 1. 确认表创建成功:
|
-- 1. 确认表创建成功:
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user