llmage/scripts/migrate_llm_api_map_db.py
yumoqing 17f6855b5a feat: add deployment scripts for llm_api_map migration
- scripts/migrate_llm_api_map_db.py: Direct DB migration (create table + migrate data)
  Supports --dry-run and --drop-old flags
- scripts/deploy_llmage.sh: One-click deployment script
  Orchestrates: git pull -> build -> migrate -> perms -> restart
2026-05-21 10:59:15 +08:00

265 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
llm_api_map 数据库迁移脚本
直接操作数据库,完成以下任务:
1. 创建 llm_api_map 表(如不存在)
2. 从 llm 表迁移 apiname/query_apiname/query_period/ppid 到 llm_api_map
3. 关联 llm_catalog_rel 获取 llmcatelogid
4. 可选:删除 llm 表中的旧字段(需用户确认)
运行位置Sage 虚拟环境
用法python scripts/migrate_llm_api_map_db.py [--dry-run] [--drop-old]
"""
import sys
import os
import asyncio
import argparse
# 确保 Sage 虚拟环境的包可用
sage_root = os.path.expanduser('~/repos/sage')
sys.path.insert(0, sage_root)
sys.path.insert(0, os.path.join(sage_root, 'py3/lib/python3.10/site-packages'))
from appPublic.jsonConfig import getConfig
from appPublic.uniqueID import getID
from sqlor.dbpools import DBPools
def print_sql(sql, label=""):
if label:
print(f" [{label}] {sql}")
else:
print(f" {sql}")
async def migrate(dry_run=False, drop_old=False):
"""Execute migration."""
config = getConfig(sage_root)
db = DBPools(config.databases)
dbname = list(config.databases.keys())[0]
print(f"Using database: {dbname}")
print(f"Dry run: {dry_run}")
print(f"Drop old columns: {drop_old}")
print()
async with db.sqlorContext(dbname) as sor:
# =============================================
# Step 1: Check if llm_api_map already has data
# =============================================
existing = await sor.sqlExe("SELECT COUNT(*) as cnt FROM llm_api_map", {})
existing_count = existing[0]['cnt'] if existing else 0
if existing_count > 0:
print(f"[WARNING] llm_api_map already has {existing_count} records.")
resp = input("Continue anyway? (y/N): ")
if resp.lower() != 'y':
print("Aborted.")
return False
# =============================================
# Step 2: Create llm_api_map table
# =============================================
print("[Step 1] Creating llm_api_map table...")
create_sql = """
CREATE TABLE IF NOT EXISTS llm_api_map (
id VARCHAR(32) NOT NULL PRIMARY KEY,
llmid VARCHAR(32) NOT NULL,
llmcatelogid VARCHAR(32) NOT NULL,
apiname VARCHAR(100) NOT NULL,
query_apiname VARCHAR(100),
query_period INT,
ppid VARCHAR(32)
)
"""
if dry_run:
print_sql(create_sql.strip(), "SQL")
else:
try:
await sor.sqlExe(create_sql, {})
print(" Table created (or already exists)")
except Exception as e:
if 'already exists' in str(e).lower() or 'Duplicate' in str(e):
print(" Table already exists, continuing")
else:
print(f" ERROR: {e}")
return False
# Create indexes
indexes = [
("CREATE INDEX idx_api_map_llm ON llm_api_map (llmid)", "index llmid"),
("CREATE INDEX idx_api_map_catelog ON llm_api_map (llmcatelogid)", "index catelog"),
("CREATE INDEX idx_api_map_apiname ON llm_api_map (apiname)", "index apiname"),
("CREATE UNIQUE INDEX uk_llmid_apiname ON llm_api_map (llmid, apiname)", "unique (llmid, apiname)"),
]
for sql, label in indexes:
if dry_run:
print_sql(sql, label)
else:
try:
await sor.sqlExe(sql, {})
print(f" Index created: {label}")
except Exception as e:
if 'already exists' in str(e).lower() or 'Duplicate' in str(e):
print(f" Index already exists: {label}")
else:
print(f" WARNING creating {label}: {e}")
# =============================================
# Step 3: Migrate data from llm -> llm_api_map
# =============================================
print("\n[Step 2] Migrating data...")
# Get all llm records that have apiname
llms = await sor.sqlExe("""
SELECT id, name, apiname, query_apiname, query_period, ppid
FROM llm
WHERE apiname IS NOT NULL AND apiname != ''
""", {})
print(f" Found {len(llms or [])} llm records with apiname")
if not llms:
print(" No data to migrate. Done.")
return True
# Build catalog_rel lookup
rels = await sor.sqlExe("SELECT llmid, llmcatelogid FROM llm_catalog_rel", {})
catelog_map = {}
for r in (rels or []):
catelog_map.setdefault(r['llmid'], []).append(r['llmcatelogid'])
migrated = 0
skipped = 0
errors = []
for llm in llms:
llmid = llm['id']
catelog_ids = catelog_map.get(llmid)
if not catelog_ids:
print(f" [SKIP] llm '{llm.get('name', llmid)}' has no catalog_rel entry")
skipped += 1
continue
for catelogid in catelog_ids:
if dry_run:
print(f" [DRY] Would insert: llm={llm.get('name', llmid)}, catelog={catelogid}, api={llm['apiname']}")
migrated += 1
continue
# Check if already exists
exists = await sor.sqlExe(
"SELECT id FROM llm_api_map WHERE llmid=${llmid}$ AND apiname=${apiname}$",
{'llmid': llmid, 'apiname': llm['apiname']}
)
if exists:
print(f" [SKIP] Already exists: {llm.get('name', llmid)} / {llm['apiname']}")
skipped += 1
continue
# Insert
data = {
'id': getID(),
'llmid': llmid,
'llmcatelogid': catelogid,
'apiname': llm['apiname'],
}
if llm.get('query_apiname'):
data['query_apiname'] = llm['query_apiname']
if llm.get('query_period') is not None:
data['query_period'] = int(llm['query_period']) if llm['query_period'] != '' else None
if llm.get('ppid'):
data['ppid'] = llm['ppid']
try:
await sor.C('llm_api_map', data)
migrated += 1
except Exception as e:
errors.append(f"{llm.get('name', llmid)}: {e}")
print(f"\n Migrated: {migrated}, Skipped: {skipped}")
if errors:
print(f" Errors: {len(errors)}")
for e in errors[:5]:
print(f" - {e}")
if dry_run:
print("\n (Dry run, no changes made)")
return True
# =============================================
# Step 4: Verify migration
# =============================================
print("\n[Step 3] Verifying migration...")
llm_api_count = await sor.sqlExe("SELECT COUNT(*) as cnt FROM llm_api_map", {})
api_count = llm_api_count[0]['cnt'] if llm_api_count else 0
print(f" llm_api_map total records: {api_count}")
# Check for llm records without corresponding llm_api_map
orphan_check = await sor.sqlExe("""
SELECT l.id, l.name
FROM llm l
WHERE l.apiname IS NOT NULL AND l.apiname != ''
AND NOT EXISTS (
SELECT 1 FROM llm_api_map m WHERE m.llmid = l.id
)
""", {})
if orphan_check:
print(f" [WARNING] {len(orphan_check)} llm records have no llm_api_map entry:")
for o in orphan_check[:5]:
print(f" - {o.get('name', o['id'])}")
if len(orphan_check) > 5:
print(f" ... and {len(orphan_check) - 5} more")
else:
print(" All llm records with apiname have corresponding llm_api_map entries")
# =============================================
# Step 5: Optional - drop old columns from llm
# =============================================
if drop_old:
print("\n[Step 4] Dropping old columns from llm table...")
old_columns = ['apiname', 'query_apiname', 'query_period', 'ppid']
for col in old_columns:
try:
await sor.sqlExe(f"ALTER TABLE llm DROP COLUMN {col}", {})
print(f" Dropped column: {col}")
except Exception as e:
if 'column' in str(e).lower() and ('not exist' in str(e).lower() or 'doesn\'t exist' in str(e).lower()):
print(f" Column already dropped: {col}")
else:
print(f" WARNING dropping {col}: {e}")
print(" Old columns removed from llm table")
print(" NOTE: Update code to use llm_api_map instead of llm columns")
print("\n[Done] Migration complete")
return True
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Migrate llm data to llm_api_map table')
parser.add_argument('--dry-run', action='store_true', help='Preview changes without executing')
parser.add_argument('--drop-old', action='store_true', help='Drop old columns from llm table after migration')
args = parser.parse_args()
if not args.dry_run:
print("=" * 60)
print(" llm_api_map Database Migration")
print("=" * 60)
resp = input("\nThis will modify the database. Continue? (y/N): ")
if resp.lower() != 'y':
print("Aborted.")
sys.exit(0)
print()
success = asyncio.get_event_loop().run_until_complete(
migrate(dry_run=args.dry_run, drop_old=args.drop_old)
)
sys.exit(0 if success else 1)