DBPools() was called without config.databases, causing NoneType error. Load config from sage_root and pass config.databases like the other migration script does.
245 lines
8.9 KiB
Python
245 lines
8.9 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
llmcatelog ID 迁移脚本
|
||
将 llmcatelog.id 和 llm_api_map.llmcatelogid 从旧ID迁移为有意义的缩写ID。
|
||
|
||
执行顺序:
|
||
1. 先更新 llm_api_map.llmcatelogid(外键表)
|
||
2. 再更新 llmcatelog.id(主表)
|
||
3. 验证迁移结果
|
||
|
||
用法:
|
||
# 预览模式(不执行,只显示将要做的变更)
|
||
python migrate_llmcatelog_ids.py --dry-run
|
||
|
||
# 正式执行
|
||
python migrate_llmcatelog_ids.py
|
||
|
||
# 指定数据库名(默认 llmage)
|
||
python migrate_llmcatelog_ids.py --dbname my_llmage
|
||
"""
|
||
|
||
import asyncio
|
||
import argparse
|
||
import sys
|
||
import os
|
||
|
||
# 确保 Sage 虚拟环境的包可用
|
||
sage_root = os.path.expanduser('~/repos/sage')
|
||
sys.path.insert(0, sage_root)
|
||
sys.path.insert(0, os.path.join(sage_root, 'py3/lib/python3.10/site-packages'))
|
||
|
||
from appPublic.jsonConfig import getConfig
|
||
|
||
# 旧ID -> 新ID 映射表
|
||
ID_MAP = {
|
||
'text2text': 't2t',
|
||
'text2image': 't2i',
|
||
'-i2ET0YkhfVQdHONfk9pX': 't2v',
|
||
'RdsO6pXgXcUTvUj819-7X': 'i2v',
|
||
'fHrfsOnAFCz53DAILMO7G': 'r2v',
|
||
'text2speech': 'tts',
|
||
'audio2text': 'asr',
|
||
'image2text': 'vision',
|
||
'9_P5y-qiQzQASacTVk2Lq': 'ai_search',
|
||
'czKvk-clQTRLS2KVddSWo': 'digital_human',
|
||
'HaRXiNCaAACurZsmEqpsU': 'music_gen',
|
||
'Rqj-QBj1v4560l-FPCrIU': 'text_cls',
|
||
's6-nhQtEvDKxG_qDPWwT7': '3d_gen',
|
||
'sRmpG8draTM-tsbO5nMJO': 'video_tool',
|
||
't7sUuj8BCnsD762PwMUKM': 'translate',
|
||
}
|
||
|
||
|
||
async def migrate(dry_run=False, dbname='llmage'):
|
||
from sqlor.dbpools import DBPools
|
||
from appPublic.log import debug
|
||
|
||
config = getConfig(sage_root)
|
||
db = DBPools(config.databases)
|
||
|
||
# 如果传入的 dbname 不在配置中,尝试使用第一个数据库
|
||
if dbname not in config.databases:
|
||
available = list(config.databases.keys())
|
||
print(f"Warning: '{dbname}' not in config.databases, available: {available}")
|
||
if available:
|
||
dbname = available[0]
|
||
print(f"Using '{dbname}' instead")
|
||
|
||
async with db.sqlorContext(dbname) as sor:
|
||
print(f"{'='*60}")
|
||
print(f"llmcatelog ID 迁移脚本")
|
||
print(f"数据库: {dbname}")
|
||
print(f"模式: {'预览(DRY-RUN)' if dry_run else '正式执行'}")
|
||
print(f"{'='*60}\n")
|
||
|
||
# ===== 阶段0: 检查当前数据 =====
|
||
print("[阶段0] 检查当前 llmcatelog 数据...")
|
||
current = await sor.sqlExe("SELECT id, name FROM llmcatelog ORDER BY name", {})
|
||
if not current:
|
||
print(" llmcatelog 表为空,无需迁移。")
|
||
return
|
||
|
||
print(f" 当前共 {len(current)} 条记录:\n")
|
||
print(f" {'旧ID':<30} {'name':<15} {'新ID':<15} {'状态'}")
|
||
print(f" {'-'*30} {'-'*15} {'-'*15} {'-'*10}")
|
||
|
||
valid_records = []
|
||
unmapped = []
|
||
for row in current:
|
||
old_id = row['id']
|
||
name = row['name']
|
||
new_id = ID_MAP.get(old_id)
|
||
if new_id:
|
||
# 检查是否已经迁移过(old_id == new_id 的情况不会发生,
|
||
# 但如果 id 已经是新值则跳过)
|
||
if old_id == new_id:
|
||
status = '已迁移'
|
||
else:
|
||
status = '待迁移'
|
||
valid_records.append((old_id, new_id, name))
|
||
print(f" {old_id:<30} {name:<15} {new_id:<15} {status}")
|
||
else:
|
||
status = '无映射!'
|
||
unmapped.append((old_id, name))
|
||
print(f" {old_id:<30} {name:<15} {'---':<15} {status}")
|
||
|
||
if unmapped:
|
||
print(f"\n ⚠ 警告: {len(unmapped)} 条记录无映射关系,将跳过:")
|
||
for uid, uname in unmapped:
|
||
print(f" - {uid} ({uname})")
|
||
|
||
if not valid_records:
|
||
print("\n 没有需要迁移的记录。")
|
||
return
|
||
|
||
print(f"\n 共 {len(valid_records)} 条记录需要迁移。\n")
|
||
|
||
# ===== 阶段1: 检查 llm_api_map 关联 =====
|
||
print("[阶段1] 检查 llm_api_map 关联...")
|
||
for old_id, new_id, name in valid_records:
|
||
maps = await sor.sqlExe(
|
||
"SELECT COUNT(*) as cnt FROM llm_api_map WHERE llmcatelogid = ${old_id}$",
|
||
{'old_id': old_id}
|
||
)
|
||
cnt = maps[0]['cnt'] if maps else 0
|
||
print(f" {name}({old_id}): {cnt} 条映射")
|
||
|
||
# ===== 阶段2: 检查新ID是否已被占用 =====
|
||
print(f"\n[阶段2] 检查新ID是否已被占用...")
|
||
conflict = False
|
||
for old_id, new_id, name in valid_records:
|
||
check = await sor.sqlExe(
|
||
"SELECT id, name FROM llmcatelog WHERE id = ${new_id}$",
|
||
{'new_id': new_id}
|
||
)
|
||
if check:
|
||
# 如果新ID已存在且就是当前记录(已经迁移过),跳过
|
||
if check[0]['id'] == old_id:
|
||
print(f" {new_id}: 已是当前记录,跳过")
|
||
else:
|
||
print(f" ✗ 冲突! 新ID '{new_id}' 已被 {check[0]['name']} 使用")
|
||
conflict = True
|
||
else:
|
||
print(f" ✓ {new_id}: 可用")
|
||
|
||
if conflict:
|
||
print("\n ✗ 存在ID冲突,终止迁移!")
|
||
return
|
||
|
||
if dry_run:
|
||
print(f"\n{'='*60}")
|
||
print("预览模式结束。以上是将会执行的变更。")
|
||
print("去掉 --dry-run 参数以正式执行。")
|
||
print(f"{'='*60}")
|
||
return
|
||
|
||
# ===== 阶段3: 执行迁移 =====
|
||
print(f"\n[阶段3] 开始执行迁移...")
|
||
|
||
# 3a: 先更新 llm_api_map(外键表)
|
||
print(f"\n --- 3a: 更新 llm_api_map.llmcatelogid ---")
|
||
for old_id, new_id, name in valid_records:
|
||
try:
|
||
await sor.sqlExe(
|
||
"UPDATE llm_api_map SET llmcatelogid = ${new_id}$ WHERE llmcatelogid = ${old_id}$",
|
||
{'new_id': new_id, 'old_id': old_id}
|
||
)
|
||
maps = await sor.sqlExe(
|
||
"SELECT COUNT(*) as cnt FROM llm_api_map WHERE llmcatelogid = ${new_id}$",
|
||
{'new_id': new_id}
|
||
)
|
||
cnt = maps[0]['cnt'] if maps else 0
|
||
print(f" ✓ {name}: {old_id} -> {new_id} (关联 {cnt} 条)")
|
||
except Exception as e:
|
||
print(f" ✗ {name}: 更新 llm_api_map 失败: {e}")
|
||
print(f" 回滚中...")
|
||
raise
|
||
|
||
# 3b: 再更新 llmcatelog(主表)
|
||
print(f"\n --- 3b: 更新 llmcatelog.id ---")
|
||
for old_id, new_id, name in valid_records:
|
||
try:
|
||
await sor.sqlExe(
|
||
"UPDATE llmcatelog SET id = ${new_id}$ WHERE id = ${old_id}$",
|
||
{'new_id': new_id, 'old_id': old_id}
|
||
)
|
||
print(f" ✓ {name}: {old_id} -> {new_id}")
|
||
except Exception as e:
|
||
print(f" ✗ {name}: 更新 llmcatelog 失败: {e}")
|
||
raise
|
||
|
||
# ===== 阶段4: 验证 =====
|
||
print(f"\n[阶段4] 验证迁移结果...")
|
||
|
||
# 验证 llmcatelog
|
||
catelogs = await sor.sqlExe("SELECT id, name FROM llmcatelog ORDER BY id", {})
|
||
print(f"\n llmcatelog ({len(catelogs)} 条):")
|
||
for row in catelogs:
|
||
print(f" {row['id']:<20} {row['name']}")
|
||
|
||
# 验证关联完整性
|
||
orphans = await sor.sqlExe("""
|
||
SELECT m.llmcatelogid, COUNT(*) as cnt
|
||
FROM llm_api_map m
|
||
LEFT JOIN llmcatelog c ON m.llmcatelogid = c.id
|
||
WHERE c.id IS NULL
|
||
GROUP BY m.llmcatelogid
|
||
""", {})
|
||
if orphans:
|
||
print(f"\n ✗ 发现孤立关联:")
|
||
for o in orphans:
|
||
print(f" llmcatelogid={o['llmcatelogid']}: {o['cnt']} 条无对应主记录")
|
||
else:
|
||
print(f"\n ✓ 所有 llm_api_map 关联完整,无孤立记录")
|
||
|
||
# 验证映射表
|
||
map_stats = await sor.sqlExe("""
|
||
SELECT m.llmcatelogid, c.name, COUNT(*) as cnt
|
||
FROM llm_api_map m
|
||
JOIN llmcatelog c ON m.llmcatelogid = c.id
|
||
GROUP BY m.llmcatelogid, c.name
|
||
ORDER BY m.llmcatelogid
|
||
""", {})
|
||
if map_stats:
|
||
print(f"\n llm_api_map 关联统计:")
|
||
for row in map_stats:
|
||
print(f" {row['llmcatelogid']:<20} {row['name']:<15} {row['cnt']} 条映射")
|
||
|
||
print(f"\n{'='*60}")
|
||
print("迁移完成!")
|
||
print(f"{'='*60}")
|
||
|
||
|
||
def main():
|
||
parser = argparse.ArgumentParser(description='llmcatelog ID 迁移脚本')
|
||
parser.add_argument('--dry-run', action='store_true', help='预览模式,不执行实际变更')
|
||
parser.add_argument('--dbname', default='llmage', help='数据库名 (默认: llmage)')
|
||
args = parser.parse_args()
|
||
|
||
asyncio.run(migrate(dry_run=args.dry_run, dbname=args.dbname))
|
||
|
||
|
||
if __name__ == '__main__':
|
||
main()
|