#!/usr/bin/env python3
"""Convert the .xlsx files under the models/ directory of Sage-related modules
into .json table-definition files.

Usage (run inside the Sage virtual environment):
    python ~/repos/sage/xlsx2json_models.py              # convert every module
    python ~/repos/sage/xlsx2json_models.py llmage       # convert only llmage
    python ~/repos/sage/xlsx2json_models.py --dry-run    # preview only
"""

import sys
import os
import json
import argparse

# Make the Sage checkout and its bundled site-packages importable before the
# project imports below are resolved.
sage_root = os.path.expanduser('~/repos/sage')
sys.path.insert(0, sage_root)
sys.path.insert(0, os.path.join(sage_root, 'py3/lib/python3.10/site-packages'))

from appPublic.folderUtils import listFile
from xls2ddl.xlsxData import xlsxFactory, CRUDData

# Modules converted when no module name is given on the command line.
SAGE_MODULES = [
    'appbase',
    'rbac',
    'accounting',
    'llmage',
    'platformbiz',
    'msp',
    'cpcc',
    'unipay',
    'filemgr',
    'dapi',
    'uapi',
    'rag',
    'charge',
    'pricing',
    'discount',
    'hermes-web-cli',
]


def _summary_entry(row):
    """Build one output summary entry, normalising ``primary`` to a list."""
    entry = {'name': row.get('name'), 'title': row.get('title')}
    primary = row.get('primary', '')
    if isinstance(primary, list):
        entry['primary'] = primary
    elif primary:
        # A comma-separated cell becomes a list of stripped names.
        entry['primary'] = [part.strip() for part in str(primary).split(',')]
    else:
        entry['primary'] = []
    if row.get('catelog'):
        entry['catelog'] = row['catelog']
    return entry


def _field_entry(row):
    """Build one output field entry, dropping optional keys whose value is None."""
    entry = {
        'name': row.get('name'),
        'title': row.get('title'),
        'type': row.get('type'),
    }
    for key in ('length', 'nullable', 'default'):
        if row.get(key) is not None:
            entry[key] = row[key]
    return entry


def _indexes_from_validation(rows):
    """Derive index definitions from validation rows whose ``oper`` is ``'idx'``.

    The row's ``value`` is read as ``"<idxtype>:<field>,<field>..."``.  When it
    has no ``:`` separator the entry keeps the default ``idxtype`` of
    ``'index'`` and gets no ``idxfields`` key.
    """
    indexes = []
    for row in rows:
        if row.get('oper') != 'idx':
            continue
        idx = {'name': row.get('name'), 'idxtype': 'index'}
        raw = row.get('value')
        # The key may be present with a None or non-string value; coerce to
        # text so that .split() cannot raise AttributeError.
        parts = ('' if raw is None else str(raw)).split(':')
        if len(parts) >= 2:
            idx['idxtype'] = parts[0]
            idx['idxfields'] = [name.strip() for name in parts[1].split(',')]
        indexes.append(idx)
    return indexes


def xlsx_to_json_dict(xlsx_path):
    """Read an xlsx model file and convert it to a JSON-ready dict.

    Args:
        xlsx_path: Path of the .xlsx file, passed to ``xlsxFactory``.

    Returns:
        A dict holding whichever of ``summary``, ``fields``, ``indexes`` and
        ``codes`` could be produced from the workbook data, or ``None`` when
        ``xlsxFactory`` returns ``None``.
    """
    workbook = xlsxFactory(xlsx_path)
    if workbook is None:
        return None
    data = workbook.get_data()

    # Only the keys handled below are carried over to the output.
    result = {}

    if 'summary' in data:
        result['summary'] = [_summary_entry(row) for row in data['summary']]

    if 'fields' in data:
        result['fields'] = [_field_entry(row) for row in data['fields']]

    # Prefer explicit index data; otherwise derive it from the 'idx'
    # operations in the validation data.
    if 'indexes' in data and data['indexes']:
        result['indexes'] = data['indexes']
    elif 'validation' in data:
        indexes = _indexes_from_validation(data['validation'])
        if indexes:
            result['indexes'] = indexes

    if 'codes' in data and data['codes']:
        result['codes'] = data['codes']

    return result


def convert_module(module_name, dry_run=False):
    """Convert every .xlsx file in one module's models/ directory to .json.

    The module is looked up at ``~/repos/<module_name>/models``.  A table whose
    .json file already exists is left untouched and counted as skipped.

    Args:
        module_name: Name of the module directory under ``~/repos``.
        dry_run: When true, report what would be created without writing.

    Returns:
        A ``(converted, skipped)`` tuple of counts.
    """
    module_dir = os.path.expanduser(f'~/repos/{module_name}')
    models_dir = os.path.join(module_dir, 'models')

    if not os.path.isdir(models_dir):
        print(f" [SKIP] {module_name}: models/ directory not found")
        return 0, 0

    # NOTE(review): listFile presumably yields file paths under models_dir;
    # confirm against appPublic.folderUtils.
    xlsx_files = [path for path in listFile(models_dir) if path.endswith('.xlsx')]
    if not xlsx_files:
        print(f" [SKIP] {module_name}: no .xlsx files")
        return 0, 0

    converted = 0
    skipped = 0

    for xlsx_path in xlsx_files:
        table_name = os.path.splitext(os.path.basename(xlsx_path))[0]
        json_path = os.path.join(models_dir, f'{table_name}.json')

        # Never overwrite a .json file that is already there.
        if os.path.exists(json_path):
            print(f" [EXIST] {table_name}.json already exists")
            skipped += 1
            continue

        # Deliberately broad: one unreadable workbook must not abort the
        # remaining tables of the batch.
        try:
            result = xlsx_to_json_dict(xlsx_path)
            if result is None:
                print(f" [FAIL] {table_name}: could not read xlsx")
                skipped += 1
                continue

            field_count = len(result.get('fields', []))

            if dry_run:
                print(f" [DRY] Would create {table_name}.json ({field_count} fields)")
                converted += 1
                continue

            # Serialise before opening the target: if a value is not JSON
            # serialisable, no truncated .json is left behind that later runs
            # would report as [EXIST] and never regenerate.
            text = json.dumps(result, ensure_ascii=False, indent=4)
            with open(json_path, 'w', encoding='utf-8') as out:
                out.write(text)

            print(f" [OK] {table_name}.json ({field_count} fields)")
            converted += 1

        except Exception as e:
            print(f" [FAIL] {table_name}: {e}")
            skipped += 1

    return converted, skipped


def main():
    """Parse the command line, convert the selected modules and print totals."""
    parser = argparse.ArgumentParser(description='Convert models/*.xlsx to models/*.json')
    parser.add_argument('modules', nargs='*', help='Module names to convert (default: all)')
    parser.add_argument('--dry-run', action='store_true', help='Preview without writing files')
    args = parser.parse_args()

    modules = args.modules or SAGE_MODULES
    dry_run = args.dry_run

    print("=" * 60)
    print(" Sage Models: xlsx -> json 转换")
    print("=" * 60)
    print(f"Modules: {', '.join(modules)}")
    print(f"Dry run: {dry_run}")
    print()

    total_converted = 0
    total_skipped = 0

    for mod in modules:
        print(f"[{mod}]")
        c, s = convert_module(mod, dry_run=dry_run)
        total_converted += c
        total_skipped += s
        print()

    print("=" * 60)
    print(f" 完成: 转换 {total_converted}, 跳过 {total_skipped}")
    print("=" * 60)


if __name__ == '__main__':
    main()