refactor: import_codes.py改用getConfig()+DBPools+sqlor模式

This commit is contained in:
Hermes Agent 2026-06-17 16:04:03 +08:00
parent 67e61da2fd
commit 54c9e229a0

View File

@ -3,13 +3,14 @@
导入 init/data.json 中的 appcodes / appcodes_kv 编码字典到 sage 数据库
用法:
python import_codes.py <path/to/init/data.json> [more_paths ...]
python import_codes.py --host 127.0.0.1 ../product_management/init/data.json
cd ~/repos/sage
py3/bin/python import_codes.py ../supplychain/init/data.json
py3/bin/python import_codes.py ../product_management/init/data.json ../supplychain/init/data.json
"""
import sys
import json
import os
import re
import asyncio
import argparse
# 修正 venv site-packages 路径venv 可能是旧版 Python 创建的)
@ -21,49 +22,18 @@ if os.path.isdir(_venv_lib):
if os.path.isdir(_sp) and _sp not in sys.path:
sys.path.insert(0, _sp)
# appPublic editable install 可能指向 Docker 路径,修正为本地路径
_appbase_dir = os.path.join(os.path.dirname(_script_dir), 'apppublic')
if os.path.isdir(_appbase_dir) and _appbase_dir not in sys.path:
sys.path.insert(0, _appbase_dir)
# 添加本地依赖路径editable install 可能指向 Docker 路径)
_repos_dir = os.path.dirname(_script_dir)
for _dep in ['apppublic', 'sqlor']:
_dep_dir = os.path.join(_repos_dir, _dep)
if os.path.isdir(_dep_dir) and _dep_dir not in sys.path:
sys.path.insert(0, _dep_dir)
from appPublic.jsonConfig import getConfig
from sqlor.dbpools import DBPools
def load_config(host_override=None, port_override=None):
"""读取 conf/config.json 并返回数据库连接参数(已解密密码)"""
conf_path = os.path.join(_script_dir, 'conf', 'config.json')
with open(conf_path, 'r', encoding='utf-8') as f:
raw = f.read()
# 提取 password_key
m = re.search(r'"password_key"\s*:\s*"([^"]+)"', raw)
if not m:
raise ValueError("config.json 中未找到 password_key")
password_key = m.group(1)
# 提取 databases 段并解析为 JSON
m = re.search(r'"databases"\s*:\s*(\{.*?\n\t\})', raw, re.DOTALL)
if not m:
raise ValueError("config.json 中未找到 databases 段")
db_section = json.loads(m.group(1))
db_cfg = db_section['sage']['kwargs']
from appPublic.rc4 import unpassword
params = {
'host': db_cfg['host'],
'user': db_cfg['user'],
'password': unpassword(db_cfg['password'], key=password_key),
'db': db_cfg['db'],
'charset': db_cfg.get('charset', 'utf8mb4'),
}
if 'port' in db_cfg:
params['port'] = int(db_cfg['port'])
if host_override:
params['host'] = host_override
if port_override:
params['port'] = int(port_override)
return params
def import_data_json(cursor, filepath):
async def import_data_json(sor, filepath):
"""将单个 data.json 中的 appcodes 和 appcodes_kv 导入数据库"""
with open(filepath, 'r', encoding='utf-8') as f:
data = json.load(f)
@ -71,56 +41,58 @@ def import_data_json(cursor, filepath):
# 导入 appcodes
appcodes = data.get('appcodes', [])
for item in appcodes:
cursor.execute("""
await sor.sqlExe("""
INSERT INTO appcodes (id, name, hierarchy_flg)
VALUES (%s, %s, %s)
VALUES (${id}$, ${name}$, ${hierarchy_flg}$)
ON DUPLICATE KEY UPDATE name=VALUES(name), hierarchy_flg=VALUES(hierarchy_flg)
""", (item['id'], item['name'], item.get('hierarchy_flg', '0')))
""", {
'id': item['id'],
'name': item['name'],
'hierarchy_flg': item.get('hierarchy_flg', '0')
})
print(f" appcodes: {len(appcodes)}")
# 导入 appcodes_kv
appcodes_kv = data.get('appcodes_kv', [])
for item in appcodes_kv:
cursor.execute("""
await sor.sqlExe("""
INSERT INTO appcodes_kv (id, parentid, k, v)
VALUES (%s, %s, %s, %s)
VALUES (${id}$, ${parentid}$, ${k}$, ${v}$)
ON DUPLICATE KEY UPDATE id=VALUES(id), v=VALUES(v)
""", (item['id'], item['parentid'], item['k'], item['v']))
""", {
'id': item['id'],
'parentid': item['parentid'],
'k': item['k'],
'v': item['v']
})
print(f" appcodes_kv: {len(appcodes_kv)}")
return len(appcodes), len(appcodes_kv)
def main():
async def main():
parser = argparse.ArgumentParser(description='导入编码字典到 sage 数据库')
parser.add_argument('files', nargs='+', help='init/data.json 文件路径')
parser.add_argument('--host', help='覆盖数据库主机地址')
parser.add_argument('--port', help='覆盖数据库端口')
args = parser.parse_args()
import pymysql
# 使用 getConfig() 获取配置DBPools 初始化数据库池
config = getConfig('.')
db = DBPools(config.databases)
print(f"连接数据库: sage")
db_params = load_config(args.host, args.port)
print(f"连接数据库: {db_params['user']}@{db_params['host']}/{db_params['db']}")
conn = pymysql.connect(**db_params)
try:
cursor = conn.cursor()
total_codes = 0
total_kv = 0
async with db.sqlorContext('sage') as sor:
for filepath in args.files:
print(f"导入: {filepath}")
c, kv = import_data_json(cursor, filepath)
c, kv = await import_data_json(sor, filepath)
total_codes += c
total_kv += kv
conn.commit()
print(f"\n完成! 共导入 appcodes: {total_codes} 条, appcodes_kv: {total_kv}")
except Exception as e:
conn.rollback()
print(f"错误,已回滚: {e}", file=sys.stderr)
sys.exit(1)
finally:
conn.close()
if __name__ == '__main__':
main()
asyncio.run(main())