feat: add llmcatelog ID migration script (random IDs to meaningful abbreviations)

This commit is contained in:
yumoqing 2026-05-30 12:07:19 +08:00
parent d84cc1d859
commit 3ba1c50eb6

View File

@ -0,0 +1,227 @@
#!/usr/bin/env python3
"""
llmcatelog ID 迁移脚本
llmcatelog.id llm_api_map.llmcatelogid 从旧ID迁移为有意义的缩写ID
执行顺序
1. 先更新 llm_api_map.llmcatelogid外键表
2. 再更新 llmcatelog.id主表
3. 验证迁移结果
用法
# 预览模式(不执行,只显示将要做的变更)
python migrate_llmcatelog_ids.py --dry-run
# 正式执行
python migrate_llmcatelog_ids.py
# 指定数据库名(默认 llmage
python migrate_llmcatelog_ids.py --dbname my_llmage
"""
import asyncio
import argparse
import sys
import os
# 旧ID -> 新ID 映射表
ID_MAP = {
'text2text': 't2t',
'text2image': 't2i',
'-i2ET0YkhfVQdHONfk9pX': 't2v',
'RdsO6pXgXcUTvUj819-7X': 'i2v',
'fHrfsOnAFCz53DAILMO7G': 'r2v',
'text2speech': 'tts',
'audio2text': 'asr',
'image2text': 'vision',
'9_P5y-qiQzQASacTVk2Lq': 'ai_search',
'czKvk-clQTRLS2KVddSWo': 'digital_human',
'HaRXiNCaAACurZsmEqpsU': 'music_gen',
'Rqj-QBj1v4560l-FPCrIU': 'text_cls',
's6-nhQtEvDKxG_qDPWwT7': '3d_gen',
'sRmpG8draTM-tsbO5nMJO': 'video_tool',
't7sUuj8BCnsD762PwMUKM': 'translate',
}
async def migrate(dry_run=False, dbname='llmage'):
from sqlor.dbpools import DBPools
from appPublic.log import debug
db = DBPools()
async with db.sqlorContext(dbname) as sor:
print(f"{'='*60}")
print(f"llmcatelog ID 迁移脚本")
print(f"数据库: {dbname}")
print(f"模式: {'预览(DRY-RUN)' if dry_run else '正式执行'}")
print(f"{'='*60}\n")
# ===== 阶段0: 检查当前数据 =====
print("[阶段0] 检查当前 llmcatelog 数据...")
current = await sor.sqlExe("SELECT id, name FROM llmcatelog ORDER BY name", {})
if not current:
print(" llmcatelog 表为空,无需迁移。")
return
print(f" 当前共 {len(current)} 条记录:\n")
print(f" {'旧ID':<30} {'name':<15} {'新ID':<15} {'状态'}")
print(f" {'-'*30} {'-'*15} {'-'*15} {'-'*10}")
valid_records = []
unmapped = []
for row in current:
old_id = row['id']
name = row['name']
new_id = ID_MAP.get(old_id)
if new_id:
# 检查是否已经迁移过old_id == new_id 的情况不会发生,
# 但如果 id 已经是新值则跳过)
if old_id == new_id:
status = '已迁移'
else:
status = '待迁移'
valid_records.append((old_id, new_id, name))
print(f" {old_id:<30} {name:<15} {new_id:<15} {status}")
else:
status = '无映射!'
unmapped.append((old_id, name))
print(f" {old_id:<30} {name:<15} {'---':<15} {status}")
if unmapped:
print(f"\n ⚠ 警告: {len(unmapped)} 条记录无映射关系,将跳过:")
for uid, uname in unmapped:
print(f" - {uid} ({uname})")
if not valid_records:
print("\n 没有需要迁移的记录。")
return
print(f"\n{len(valid_records)} 条记录需要迁移。\n")
# ===== 阶段1: 检查 llm_api_map 关联 =====
print("[阶段1] 检查 llm_api_map 关联...")
for old_id, new_id, name in valid_records:
maps = await sor.sqlExe(
"SELECT COUNT(*) as cnt FROM llm_api_map WHERE llmcatelogid = ${old_id}$",
{'old_id': old_id}
)
cnt = maps[0]['cnt'] if maps else 0
print(f" {name}({old_id}): {cnt} 条映射")
# ===== 阶段2: 检查新ID是否已被占用 =====
print(f"\n[阶段2] 检查新ID是否已被占用...")
conflict = False
for old_id, new_id, name in valid_records:
check = await sor.sqlExe(
"SELECT id, name FROM llmcatelog WHERE id = ${new_id}$",
{'new_id': new_id}
)
if check:
# 如果新ID已存在且就是当前记录已经迁移过跳过
if check[0]['id'] == old_id:
print(f" {new_id}: 已是当前记录,跳过")
else:
print(f" ✗ 冲突! 新ID '{new_id}' 已被 {check[0]['name']} 使用")
conflict = True
else:
print(f"{new_id}: 可用")
if conflict:
print("\n ✗ 存在ID冲突终止迁移")
return
if dry_run:
print(f"\n{'='*60}")
print("预览模式结束。以上是将会执行的变更。")
print("去掉 --dry-run 参数以正式执行。")
print(f"{'='*60}")
return
# ===== 阶段3: 执行迁移 =====
print(f"\n[阶段3] 开始执行迁移...")
# 3a: 先更新 llm_api_map外键表
print(f"\n --- 3a: 更新 llm_api_map.llmcatelogid ---")
for old_id, new_id, name in valid_records:
try:
await sor.sqlExe(
"UPDATE llm_api_map SET llmcatelogid = ${new_id}$ WHERE llmcatelogid = ${old_id}$",
{'new_id': new_id, 'old_id': old_id}
)
maps = await sor.sqlExe(
"SELECT COUNT(*) as cnt FROM llm_api_map WHERE llmcatelogid = ${new_id}$",
{'new_id': new_id}
)
cnt = maps[0]['cnt'] if maps else 0
print(f"{name}: {old_id} -> {new_id} (关联 {cnt} 条)")
except Exception as e:
print(f"{name}: 更新 llm_api_map 失败: {e}")
print(f" 回滚中...")
raise
# 3b: 再更新 llmcatelog主表
print(f"\n --- 3b: 更新 llmcatelog.id ---")
for old_id, new_id, name in valid_records:
try:
await sor.sqlExe(
"UPDATE llmcatelog SET id = ${new_id}$ WHERE id = ${old_id}$",
{'new_id': new_id, 'old_id': old_id}
)
print(f"{name}: {old_id} -> {new_id}")
except Exception as e:
print(f"{name}: 更新 llmcatelog 失败: {e}")
raise
# ===== 阶段4: 验证 =====
print(f"\n[阶段4] 验证迁移结果...")
# 验证 llmcatelog
catelogs = await sor.sqlExe("SELECT id, name FROM llmcatelog ORDER BY id", {})
print(f"\n llmcatelog ({len(catelogs)} 条):")
for row in catelogs:
print(f" {row['id']:<20} {row['name']}")
# 验证关联完整性
orphans = await sor.sqlExe("""
SELECT m.llmcatelogid, COUNT(*) as cnt
FROM llm_api_map m
LEFT JOIN llmcatelog c ON m.llmcatelogid = c.id
WHERE c.id IS NULL
GROUP BY m.llmcatelogid
""", {})
if orphans:
print(f"\n ✗ 发现孤立关联:")
for o in orphans:
print(f" llmcatelogid={o['llmcatelogid']}: {o['cnt']} 条无对应主记录")
else:
print(f"\n ✓ 所有 llm_api_map 关联完整,无孤立记录")
# 验证映射表
map_stats = await sor.sqlExe("""
SELECT m.llmcatelogid, c.name, COUNT(*) as cnt
FROM llm_api_map m
JOIN llmcatelog c ON m.llmcatelogid = c.id
GROUP BY m.llmcatelogid, c.name
ORDER BY m.llmcatelogid
""", {})
if map_stats:
print(f"\n llm_api_map 关联统计:")
for row in map_stats:
print(f" {row['llmcatelogid']:<20} {row['name']:<15} {row['cnt']} 条映射")
print(f"\n{'='*60}")
print("迁移完成!")
print(f"{'='*60}")
def main():
parser = argparse.ArgumentParser(description='llmcatelog ID 迁移脚本')
parser.add_argument('--dry-run', action='store_true', help='预览模式,不执行实际变更')
parser.add_argument('--dbname', default='llmage', help='数据库名 (默认: llmage)')
args = parser.parse_args()
asyncio.run(migrate(dry_run=args.dry_run, dbname=args.dbname))
if __name__ == '__main__':
main()