127 lines
4.3 KiB
Python
127 lines
4.3 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
导入 init/data.json 中的 appcodes / appcodes_kv 编码字典到 sage 数据库。
|
||
|
||
用法:
|
||
python import_codes.py <path/to/init/data.json> [more_paths ...]
|
||
python import_codes.py --host 127.0.0.1 ../product_management/init/data.json
|
||
"""
|
||
import sys
|
||
import json
|
||
import os
|
||
import re
|
||
import argparse
|
||
|
||
# 修正 venv site-packages 路径(venv 可能是旧版 Python 创建的)
|
||
_script_dir = os.path.dirname(os.path.abspath(__file__))
|
||
_venv_lib = os.path.join(_script_dir, 'py3', 'lib')
|
||
if os.path.isdir(_venv_lib):
|
||
for _d in os.listdir(_venv_lib):
|
||
_sp = os.path.join(_venv_lib, _d, 'site-packages')
|
||
if os.path.isdir(_sp) and _sp not in sys.path:
|
||
sys.path.insert(0, _sp)
|
||
|
||
# appPublic editable install 可能指向 Docker 路径,修正为本地路径
|
||
_appbase_dir = os.path.join(os.path.dirname(_script_dir), 'apppublic')
|
||
if os.path.isdir(_appbase_dir) and _appbase_dir not in sys.path:
|
||
sys.path.insert(0, _appbase_dir)
|
||
|
||
|
||
def load_config(host_override=None, port_override=None):
|
||
"""读取 conf/config.json 并返回数据库连接参数(已解密密码)"""
|
||
conf_path = os.path.join(_script_dir, 'conf', 'config.json')
|
||
with open(conf_path, 'r', encoding='utf-8') as f:
|
||
raw = f.read()
|
||
|
||
# 提取 password_key
|
||
m = re.search(r'"password_key"\s*:\s*"([^"]+)"', raw)
|
||
if not m:
|
||
raise ValueError("config.json 中未找到 password_key")
|
||
password_key = m.group(1)
|
||
|
||
# 提取 databases 段并解析为 JSON
|
||
m = re.search(r'"databases"\s*:\s*(\{.*?\n\t\})', raw, re.DOTALL)
|
||
if not m:
|
||
raise ValueError("config.json 中未找到 databases 段")
|
||
db_section = json.loads(m.group(1))
|
||
db_cfg = db_section['sage']['kwargs']
|
||
|
||
from appPublic.rc4 import unpassword
|
||
params = {
|
||
'host': db_cfg['host'],
|
||
'user': db_cfg['user'],
|
||
'password': unpassword(db_cfg['password'], key=password_key),
|
||
'db': db_cfg['db'],
|
||
'charset': db_cfg.get('charset', 'utf8mb4'),
|
||
}
|
||
if 'port' in db_cfg:
|
||
params['port'] = int(db_cfg['port'])
|
||
if host_override:
|
||
params['host'] = host_override
|
||
if port_override:
|
||
params['port'] = int(port_override)
|
||
return params
|
||
|
||
|
||
def import_data_json(cursor, filepath):
|
||
"""将单个 data.json 中的 appcodes 和 appcodes_kv 导入数据库"""
|
||
with open(filepath, 'r', encoding='utf-8') as f:
|
||
data = json.load(f)
|
||
|
||
# 导入 appcodes
|
||
appcodes = data.get('appcodes', [])
|
||
for item in appcodes:
|
||
cursor.execute("""
|
||
INSERT INTO appcodes (id, name, hierarchy_flg)
|
||
VALUES (%s, %s, %s)
|
||
ON DUPLICATE KEY UPDATE name=VALUES(name), hierarchy_flg=VALUES(hierarchy_flg)
|
||
""", (item['id'], item['name'], item.get('hierarchy_flg', '0')))
|
||
print(f" appcodes: {len(appcodes)} 条")
|
||
|
||
# 导入 appcodes_kv
|
||
appcodes_kv = data.get('appcodes_kv', [])
|
||
for item in appcodes_kv:
|
||
cursor.execute("""
|
||
INSERT INTO appcodes_kv (id, parentid, k, v)
|
||
VALUES (%s, %s, %s, %s)
|
||
ON DUPLICATE KEY UPDATE v=VALUES(v)
|
||
""", (item['id'], item.get('parentid', ''), item['k'], item['v']))
|
||
print(f" appcodes_kv: {len(appcodes_kv)} 条")
|
||
|
||
return len(appcodes), len(appcodes_kv)
|
||
|
||
|
||
def main():
|
||
parser = argparse.ArgumentParser(description='导入编码字典到 sage 数据库')
|
||
parser.add_argument('files', nargs='+', help='init/data.json 文件路径')
|
||
parser.add_argument('--host', help='覆盖数据库主机地址')
|
||
parser.add_argument('--port', help='覆盖数据库端口')
|
||
args = parser.parse_args()
|
||
|
||
import pymysql
|
||
|
||
db_params = load_config(args.host, args.port)
|
||
print(f"连接数据库: {db_params['user']}@{db_params['host']}/{db_params['db']}")
|
||
conn = pymysql.connect(**db_params)
|
||
try:
|
||
cursor = conn.cursor()
|
||
total_codes = 0
|
||
total_kv = 0
|
||
for filepath in args.files:
|
||
print(f"导入: {filepath}")
|
||
c, kv = import_data_json(cursor, filepath)
|
||
total_codes += c
|
||
total_kv += kv
|
||
conn.commit()
|
||
print(f"\n完成! 共导入 appcodes: {total_codes} 条, appcodes_kv: {total_kv} 条")
|
||
except Exception as e:
|
||
conn.rollback()
|
||
print(f"错误,已回滚: {e}", file=sys.stderr)
|
||
sys.exit(1)
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
if __name__ == '__main__':
|
||
main()
|