From a1bc8593384b597ea2e94f5fc0f2f3ac213efca5 Mon Sep 17 00:00:00 2001 From: Hermes Agent Date: Mon, 15 Jun 2026 16:52:11 +0800 Subject: [PATCH] feat: add recover_usages API - recover null usages from ioinfo files - Add recover_usages() function to bugfix/init.py - Reads ioinfo JSON files for llmusage records with null usages - Extracts usage from last output entry - Falls back to scanning all outputs in reverse for usage field - Updates llmusage.usages in database - Supports single record (by id) or batch mode (limit param) - Add recover_usages.dspy API endpoint - Register new path in load_path.py RBAC config --- bugfix/init.py | 148 ++++++++++++++++++++++++++++++++ scripts/load_path.py | 1 + wwwroot/api/recover_usages.dspy | 5 ++ 3 files changed, 154 insertions(+) create mode 100644 wwwroot/api/recover_usages.dspy diff --git a/bugfix/init.py b/bugfix/init.py index a44431d..8cd67c3 100644 --- a/bugfix/init.py +++ b/bugfix/init.py @@ -190,11 +190,159 @@ async def tail_log_file(filename, reset=False): return {'status': 'error', 'error': str(e)} +async def recover_usages(llmusage_id=None, limit=100): + """从ioinfo文件中恢复usages字段 + + 读取llmusage记录中ioinfo指向的文件,从最后一条output中提取usage, + 回写到llmusage.usages字段。 + + Args: + llmusage_id: 指定单条记录ID,为None时批量处理所有usages为null的记录 + limit: 批量模式下最多处理条数,默认100 + + Returns: + dict: {status, processed, recovered, failed, details} + """ + from ahserver.filestorage import FileStorage + import json as _json + import os + + env = ServerEnv() + dbname = env.get_module_dbname('llmage') + + details = [] + recovered = 0 + failed = 0 + + try: + async with get_sor_context(env, dbname) as sor: + # 查询usages为null的记录 + if llmusage_id: + sql = """select a.id, a.llmid, a.ioinfo, a.status, a.use_date, + b.model + from llmusage a + left join llm b on a.llmid = b.id + where a.id = ${id}$""" + ns = {'id': llmusage_id} + else: + sql = """select a.id, a.llmid, a.ioinfo, a.status, a.use_date, + b.model + from llmusage a + left join llm b on a.llmid = b.id + where a.usages is null + and a.status = 'SUCCEEDED' + order by a.use_date desc""" + ns = {'page': 1, 'rows': limit} + + recs = await sor.sqlExe(sql, ns) + rows = recs.get('rows', recs) if isinstance(recs, dict) else recs + + if not rows: + return { + 'status': 'ok', + 'processed': 0, + 'recovered': 0, + 'failed': 0, + 'details': [], + 'message': '没有找到需要恢复的记录' + } + + fs = FileStorage() + + for r in rows: + item = { + 'id': r.get('id', r.id if hasattr(r, 'id') else ''), + 'model': r.get('model', r.model if hasattr(r, 'model') else ''), + 'status': 'pending' + } + rid = item['id'] + + # 获取ioinfo路径 + ioinfo = r.get('ioinfo', r.ioinfo if hasattr(r, 'ioinfo') else None) + if not ioinfo: + item['status'] = 'skipped' + item['reason'] = 'ioinfo为空' + details.append(item) + failed += 1 + continue + + try: + # 读取ioinfo文件 + real_path = fs.realPath(ioinfo) + if not os.path.isfile(real_path): + item['status'] = 'skipped' + item['reason'] = f'文件不存在: {ioinfo}' + details.append(item) + failed += 1 + continue + + with open(real_path, 'r', encoding='utf-8') as f: + io_data = _json.load(f) + + outputs = io_data.get('output', []) + if not outputs: + item['status'] = 'skipped' + item['reason'] = 'output为空' + details.append(item) + failed += 1 + continue + + # 从最后一条output中获取usage + last_output = outputs[-1] + usage = last_output.get('usage') if isinstance(last_output, dict) else None + + if not usage: + # 尝试从所有output中倒序找第一个有usage的 + for out in reversed(outputs): + if isinstance(out, dict) and out.get('usage'): + usage = out['usage'] + break + + if not usage: + item['status'] = 'skipped' + item['reason'] = 'output中未找到usage字段' + details.append(item) + failed += 1 + continue + + # 回写到llmusage.usages + usages_str = _json.dumps(usage, ensure_ascii=False) + await sor.U('llmusage', { + 'id': rid, + 'usages': usages_str + }) + + item['status'] = 'recovered' + item['usage'] = usage + details.append(item) + recovered += 1 + + except Exception as e: + item['status'] = 'error' + item['reason'] = str(e) + details.append(item) + failed += 1 + exception(f'recover_usages error for {rid}: {e}') + + return { + 'status': 'ok', + 'processed': recovered + failed, + 'recovered': recovered, + 'failed': failed, + 'details': details + } + + except Exception as e: + exception(f'recover_usages error: {e}') + return {'status': 'error', 'error': str(e)} + + def load_bugfix(): """注册函数到 ServerEnv""" env = ServerEnv() env.execute_select_sql = execute_select_sql env.read_log_file = read_log_file env.tail_log_file = tail_log_file + env.recover_usages = recover_usages debug(f'[bugfix] module loaded') return True diff --git a/scripts/load_path.py b/scripts/load_path.py index 0251f8b..d149be7 100644 --- a/scripts/load_path.py +++ b/scripts/load_path.py @@ -45,6 +45,7 @@ PATHS_DEVELOPER = [ f"/{MOD}/api/execute_sql.dspy", f"/{MOD}/api/read_log.dspy", f"/{MOD}/api/tail_log.dspy", + f"/{MOD}/api/recover_usages.dspy", ] diff --git a/wwwroot/api/recover_usages.dspy b/wwwroot/api/recover_usages.dspy new file mode 100644 index 0000000..557b2a3 --- /dev/null +++ b/wwwroot/api/recover_usages.dspy @@ -0,0 +1,5 @@ +llmusage_id = params_kw.get('id') or params_kw.get('llmusage_id') or None +limit = int(params_kw.get('limit') or 100) + +result = await recover_usages(llmusage_id=llmusage_id, limit=limit) +return result