diff --git a/import_codes.py b/import_codes.py index 214758f..f38de8d 100644 --- a/import_codes.py +++ b/import_codes.py @@ -3,13 +3,14 @@ 导入 init/data.json 中的 appcodes / appcodes_kv 编码字典到 sage 数据库。 用法: - python import_codes.py [more_paths ...] - python import_codes.py --host 127.0.0.1 ../product_management/init/data.json + cd ~/repos/sage + py3/bin/python import_codes.py ../supplychain/init/data.json + py3/bin/python import_codes.py ../product_management/init/data.json ../supplychain/init/data.json """ import sys import json import os -import re +import asyncio import argparse # 修正 venv site-packages 路径(venv 可能是旧版 Python 创建的) @@ -21,49 +22,18 @@ if os.path.isdir(_venv_lib): if os.path.isdir(_sp) and _sp not in sys.path: sys.path.insert(0, _sp) -# appPublic editable install 可能指向 Docker 路径,修正为本地路径 -_appbase_dir = os.path.join(os.path.dirname(_script_dir), 'apppublic') -if os.path.isdir(_appbase_dir) and _appbase_dir not in sys.path: - sys.path.insert(0, _appbase_dir) +# 添加本地依赖路径(editable install 可能指向 Docker 路径) +_repos_dir = os.path.dirname(_script_dir) +for _dep in ['apppublic', 'sqlor']: + _dep_dir = os.path.join(_repos_dir, _dep) + if os.path.isdir(_dep_dir) and _dep_dir not in sys.path: + sys.path.insert(0, _dep_dir) + +from appPublic.jsonConfig import getConfig +from sqlor.dbpools import DBPools -def load_config(host_override=None, port_override=None): - """读取 conf/config.json 并返回数据库连接参数(已解密密码)""" - conf_path = os.path.join(_script_dir, 'conf', 'config.json') - with open(conf_path, 'r', encoding='utf-8') as f: - raw = f.read() - - # 提取 password_key - m = re.search(r'"password_key"\s*:\s*"([^"]+)"', raw) - if not m: - raise ValueError("config.json 中未找到 password_key") - password_key = m.group(1) - - # 提取 databases 段并解析为 JSON - m = re.search(r'"databases"\s*:\s*(\{.*?\n\t\})', raw, re.DOTALL) - if not m: - raise ValueError("config.json 中未找到 databases 段") - db_section = json.loads(m.group(1)) - db_cfg = db_section['sage']['kwargs'] - - from appPublic.rc4 import unpassword - params = { - 'host': db_cfg['host'], - 'user': db_cfg['user'], - 'password': unpassword(db_cfg['password'], key=password_key), - 'db': db_cfg['db'], - 'charset': db_cfg.get('charset', 'utf8mb4'), - } - if 'port' in db_cfg: - params['port'] = int(db_cfg['port']) - if host_override: - params['host'] = host_override - if port_override: - params['port'] = int(port_override) - return params - - -def import_data_json(cursor, filepath): +async def import_data_json(sor, filepath): """将单个 data.json 中的 appcodes 和 appcodes_kv 导入数据库""" with open(filepath, 'r', encoding='utf-8') as f: data = json.load(f) @@ -71,56 +41,58 @@ def import_data_json(cursor, filepath): # 导入 appcodes appcodes = data.get('appcodes', []) for item in appcodes: - cursor.execute(""" + await sor.sqlExe(""" INSERT INTO appcodes (id, name, hierarchy_flg) - VALUES (%s, %s, %s) + VALUES (${id}$, ${name}$, ${hierarchy_flg}$) ON DUPLICATE KEY UPDATE name=VALUES(name), hierarchy_flg=VALUES(hierarchy_flg) - """, (item['id'], item['name'], item.get('hierarchy_flg', '0'))) + """, { + 'id': item['id'], + 'name': item['name'], + 'hierarchy_flg': item.get('hierarchy_flg', '0') + }) print(f" appcodes: {len(appcodes)} 条") # 导入 appcodes_kv appcodes_kv = data.get('appcodes_kv', []) for item in appcodes_kv: - cursor.execute(""" + await sor.sqlExe(""" INSERT INTO appcodes_kv (id, parentid, k, v) - VALUES (%s, %s, %s, %s) + VALUES (${id}$, ${parentid}$, ${k}$, ${v}$) ON DUPLICATE KEY UPDATE id=VALUES(id), v=VALUES(v) - """, (item['id'], item['parentid'], item['k'], item['v'])) + """, { + 'id': item['id'], + 'parentid': item['parentid'], + 'k': item['k'], + 'v': item['v'] + }) print(f" appcodes_kv: {len(appcodes_kv)} 条") return len(appcodes), len(appcodes_kv) -def main(): +async def main(): parser = argparse.ArgumentParser(description='导入编码字典到 sage 数据库') parser.add_argument('files', nargs='+', help='init/data.json 文件路径') - parser.add_argument('--host', help='覆盖数据库主机地址') - parser.add_argument('--port', help='覆盖数据库端口') args = parser.parse_args() - import pymysql + # 使用 getConfig() 获取配置,DBPools 初始化数据库池 + config = getConfig('.') + db = DBPools(config.databases) - db_params = load_config(args.host, args.port) - print(f"连接数据库: {db_params['user']}@{db_params['host']}/{db_params['db']}") - conn = pymysql.connect(**db_params) - try: - cursor = conn.cursor() - total_codes = 0 - total_kv = 0 + print(f"连接数据库: sage") + + total_codes = 0 + total_kv = 0 + + async with db.sqlorContext('sage') as sor: for filepath in args.files: print(f"导入: {filepath}") - c, kv = import_data_json(cursor, filepath) + c, kv = await import_data_json(sor, filepath) total_codes += c total_kv += kv - conn.commit() - print(f"\n完成! 共导入 appcodes: {total_codes} 条, appcodes_kv: {total_kv} 条") - except Exception as e: - conn.rollback() - print(f"错误,已回滚: {e}", file=sys.stderr) - sys.exit(1) - finally: - conn.close() + + print(f"\n完成! 共导入 appcodes: {total_codes} 条, appcodes_kv: {total_kv} 条") if __name__ == '__main__': - main() + asyncio.run(main())