diff --git a/import_codes.py b/import_codes.py new file mode 100644 index 0000000..e561d3f --- /dev/null +++ b/import_codes.py @@ -0,0 +1,126 @@ +#!/usr/bin/env python3 +""" +导入 init/data.json 中的 appcodes / appcodes_kv 编码字典到 sage 数据库。 + +用法: + python import_codes.py [more_paths ...] + python import_codes.py --host 127.0.0.1 ../product_management/init/data.json +""" +import sys +import json +import os +import re +import argparse + +# 修正 venv site-packages 路径(venv 可能是旧版 Python 创建的) +_script_dir = os.path.dirname(os.path.abspath(__file__)) +_venv_lib = os.path.join(_script_dir, 'py3', 'lib') +if os.path.isdir(_venv_lib): + for _d in os.listdir(_venv_lib): + _sp = os.path.join(_venv_lib, _d, 'site-packages') + if os.path.isdir(_sp) and _sp not in sys.path: + sys.path.insert(0, _sp) + +# appPublic editable install 可能指向 Docker 路径,修正为本地路径 +_appbase_dir = os.path.join(os.path.dirname(_script_dir), 'apppublic') +if os.path.isdir(_appbase_dir) and _appbase_dir not in sys.path: + sys.path.insert(0, _appbase_dir) + + +def load_config(host_override=None, port_override=None): + """读取 conf/config.json 并返回数据库连接参数(已解密密码)""" + conf_path = os.path.join(_script_dir, 'conf', 'config.json') + with open(conf_path, 'r', encoding='utf-8') as f: + raw = f.read() + + # 提取 password_key + m = re.search(r'"password_key"\s*:\s*"([^"]+)"', raw) + if not m: + raise ValueError("config.json 中未找到 password_key") + password_key = m.group(1) + + # 提取 databases 段并解析为 JSON + m = re.search(r'"databases"\s*:\s*(\{.*?\n\t\})', raw, re.DOTALL) + if not m: + raise ValueError("config.json 中未找到 databases 段") + db_section = json.loads(m.group(1)) + db_cfg = db_section['sage']['kwargs'] + + from appPublic.rc4 import unpassword + params = { + 'host': db_cfg['host'], + 'user': db_cfg['user'], + 'password': unpassword(db_cfg['password'], key=password_key), + 'db': db_cfg['db'], + 'charset': db_cfg.get('charset', 'utf8mb4'), + } + if 'port' in db_cfg: + params['port'] = int(db_cfg['port']) + if host_override: + params['host'] = host_override + if port_override: + params['port'] = int(port_override) + return params + + +def import_data_json(cursor, filepath): + """将单个 data.json 中的 appcodes 和 appcodes_kv 导入数据库""" + with open(filepath, 'r', encoding='utf-8') as f: + data = json.load(f) + + # 导入 appcodes + appcodes = data.get('appcodes', []) + for item in appcodes: + cursor.execute(""" + INSERT INTO appcodes (id, name, hierarchy_flg) + VALUES (%s, %s, %s) + ON DUPLICATE KEY UPDATE name=VALUES(name), hierarchy_flg=VALUES(hierarchy_flg) + """, (item['id'], item['name'], item.get('hierarchy_flg', '0'))) + print(f" appcodes: {len(appcodes)} 条") + + # 导入 appcodes_kv + appcodes_kv = data.get('appcodes_kv', []) + for item in appcodes_kv: + cursor.execute(""" + INSERT INTO appcodes_kv (id, parentid, k, v) + VALUES (%s, %s, %s, %s) + ON DUPLICATE KEY UPDATE v=VALUES(v) + """, (item['id'], item.get('parentid', ''), item['k'], item['v'])) + print(f" appcodes_kv: {len(appcodes_kv)} 条") + + return len(appcodes), len(appcodes_kv) + + +def main(): + parser = argparse.ArgumentParser(description='导入编码字典到 sage 数据库') + parser.add_argument('files', nargs='+', help='init/data.json 文件路径') + parser.add_argument('--host', help='覆盖数据库主机地址') + parser.add_argument('--port', help='覆盖数据库端口') + args = parser.parse_args() + + import pymysql + + db_params = load_config(args.host, args.port) + print(f"连接数据库: {db_params['user']}@{db_params['host']}/{db_params['db']}") + conn = pymysql.connect(**db_params) + try: + cursor = conn.cursor() + total_codes = 0 + total_kv = 0 + for filepath in args.files: + print(f"导入: {filepath}") + c, kv = import_data_json(cursor, filepath) + total_codes += c + total_kv += kv + conn.commit() + print(f"\n完成! 共导入 appcodes: {total_codes} 条, appcodes_kv: {total_kv} 条") + except Exception as e: + conn.rollback() + print(f"错误,已回滚: {e}", file=sys.stderr) + sys.exit(1) + finally: + conn.close() + + +if __name__ == '__main__': + main()