feat: add recover_usages API - recover null usages from ioinfo files
- Add recover_usages() function to bugfix/init.py
  - Reads ioinfo JSON files for llmusage records with null usages
  - Extracts usage from the last output entry
  - Falls back to scanning all outputs in reverse for a usage field
  - Updates llmusage.usages in the database
  - Supports a single record (by id) or batch mode (limit param)
- Add recover_usages.dspy API endpoint
- Register the new path in load_path.py RBAC config
This commit is contained in:
parent
a820c64ddb
commit
a1bc859338
148
bugfix/init.py
148
bugfix/init.py
@ -190,11 +190,159 @@ async def tail_log_file(filename, reset=False):
|
||||
return {'status': 'error', 'error': str(e)}
|
||||
|
||||
|
||||
async def recover_usages(llmusage_id=None, limit=100):
    """Recover the ``usages`` field of llmusage records from their ioinfo files.

    For every selected llmusage row, the file referenced by ``ioinfo`` is
    loaded as JSON, a ``usage`` value is taken from the last entry of its
    ``output`` list (falling back to the closest earlier entry that carries
    one), and that value is written back to ``llmusage.usages`` as a JSON
    string.

    Args:
        llmusage_id: ID of a single record to process. When falsy, rows whose
            ``usages`` is null and whose status is ``'SUCCEEDED'`` are
            selected instead, ordered by ``use_date`` descending.
        limit: Sent to the query as ``rows`` (together with ``page`` = 1) in
            batch mode; defaults to 100.
            NOTE(review): presumably sqlExe treats this as a page size --
            confirm against the sor implementation.

    Returns:
        dict: ``{status, processed, recovered, failed, details}`` when the
        run completes (plus ``message`` when no row matched), or
        ``{'status': 'error', 'error': ...}`` when an exception escapes the
        per-row handling. Skipped rows are included in ``failed``.
    """
    from ahserver.filestorage import FileStorage
    import json as _json
    import os

    env = ServerEnv()
    dbname = env.get_module_dbname('llmage')

    details = []
    recovered = 0
    failed = 0

    def _field(row, key, fallback):
        # Dict-style lookup first; the attribute (if any) is the default.
        return row.get(key, getattr(row, key, fallback))

    def _mark_skipped(entry, reason):
        # Skipped rows are reported in details and counted as failures.
        nonlocal failed
        entry['status'] = 'skipped'
        entry['reason'] = reason
        details.append(entry)
        failed += 1

    def _find_usage(outputs):
        # Try the last output first, then walk backwards over all outputs.
        tail = outputs[-1]
        if isinstance(tail, dict):
            found = tail.get('usage')
            if found:
                return found
        for out in reversed(outputs):
            if isinstance(out, dict) and out.get('usage'):
                return out['usage']
        return None

    try:
        async with get_sor_context(env, dbname) as sor:
            # Select either the requested record or all rows with null usages.
            if llmusage_id:
                sql = """select a.id, a.llmid, a.ioinfo, a.status, a.use_date,
                            b.model
                         from llmusage a
                         left join llm b on a.llmid = b.id
                         where a.id = ${id}$"""
                ns = {'id': llmusage_id}
            else:
                sql = """select a.id, a.llmid, a.ioinfo, a.status, a.use_date,
                            b.model
                         from llmusage a
                         left join llm b on a.llmid = b.id
                         where a.usages is null
                           and a.status = 'SUCCEEDED'
                         order by a.use_date desc"""
                ns = {'page': 1, 'rows': limit}

            recs = await sor.sqlExe(sql, ns)
            # The result may be a dict wrapping 'rows' or the rows themselves.
            rows = recs
            if isinstance(recs, dict):
                rows = recs.get('rows', recs)

            if not rows:
                return {
                    'status': 'ok',
                    'processed': 0,
                    'recovered': 0,
                    'failed': 0,
                    'details': [],
                    'message': '没有找到需要恢复的记录'
                }

            fs = FileStorage()

            for row in rows:
                entry = {
                    'id': _field(row, 'id', ''),
                    'model': _field(row, 'model', ''),
                    'status': 'pending'
                }
                rid = entry['id']

                # Path of the ioinfo file recorded on the row.
                ioinfo = _field(row, 'ioinfo', None)
                if not ioinfo:
                    _mark_skipped(entry, 'ioinfo为空')
                    continue

                try:
                    # Load the ioinfo file.
                    real_path = fs.realPath(ioinfo)
                    if not os.path.isfile(real_path):
                        _mark_skipped(entry, f'文件不存在: {ioinfo}')
                        continue

                    with open(real_path, 'r', encoding='utf-8') as f:
                        io_data = _json.load(f)

                    outputs = io_data.get('output', [])
                    if not outputs:
                        _mark_skipped(entry, 'output为空')
                        continue

                    usage = _find_usage(outputs)
                    if not usage:
                        _mark_skipped(entry, 'output中未找到usage字段')
                        continue

                    # Write the recovered value back to llmusage.usages.
                    usages_str = _json.dumps(usage, ensure_ascii=False)
                    await sor.U('llmusage', {
                        'id': rid,
                        'usages': usages_str
                    })

                    entry['status'] = 'recovered'
                    entry['usage'] = usage
                    details.append(entry)
                    recovered += 1

                except Exception as e:
                    # A failure on one row is recorded and the loop continues.
                    entry['status'] = 'error'
                    entry['reason'] = str(e)
                    details.append(entry)
                    failed += 1
                    exception(f'recover_usages error for {rid}: {e}')

            return {
                'status': 'ok',
                'processed': recovered + failed,
                'recovered': recovered,
                'failed': failed,
                'details': details
            }

    except Exception as e:
        exception(f'recover_usages error: {e}')
        return {'status': 'error', 'error': str(e)}
|
||||
|
||||
|
||||
def load_bugfix():
    """Attach the bugfix helper functions to the ServerEnv instance.

    Returns:
        bool: Always True once the functions have been registered.
    """
    env = ServerEnv()
    # Attribute name on env -> function to expose under that name.
    registry = {
        'execute_select_sql': execute_select_sql,
        'read_log_file': read_log_file,
        'tail_log_file': tail_log_file,
        'recover_usages': recover_usages,
    }
    for attr_name, func in registry.items():
        setattr(env, attr_name, func)
    debug('[bugfix] module loaded')
    return True
|
||||
|
||||
@ -45,6 +45,7 @@ PATHS_DEVELOPER = [
|
||||
f"/{MOD}/api/execute_sql.dspy",
|
||||
f"/{MOD}/api/read_log.dspy",
|
||||
f"/{MOD}/api/tail_log.dspy",
|
||||
f"/{MOD}/api/recover_usages.dspy",
|
||||
]
|
||||
|
||||
|
||||
|
||||
5
wwwroot/api/recover_usages.dspy
Normal file
5
wwwroot/api/recover_usages.dspy
Normal file
@ -0,0 +1,5 @@
|
||||
llmusage_id = params_kw.get('id') or params_kw.get('llmusage_id') or None
|
||||
limit = int(params_kw.get('limit') or 100)
|
||||
|
||||
result = await recover_usages(llmusage_id=llmusage_id, limit=limit)
|
||||
return result
|
||||
Loading…
x
Reference in New Issue
Block a user