"""Permission initialization for integrated CRM application. Reads perm_config.py and registers permissions to database. This should be run during deployment or first-time setup. Usage: # During deployment python app/init_permissions.py # Or called from integrated_crm_app.py init() """ import os import sys import asyncio from appPublic.uniqueID import getID from appPublic.log import debug, info from sqlor.dbpools import DBPools async def ensure_permission(sor, path, name='', permtype='page'): """Ensure a permission exists in the database.""" recs = await sor.R('permission', {'path': path}) if recs: return recs[0].id permid = getID() await sor.C('permission', { 'id': permid, 'name': name or path.split('/')[-1] or path, 'path': path, 'permtype': permtype, }) return permid async def ensure_role(sor, roleid, name, desc=''): """Ensure a role exists with the given ID.""" recs = await sor.R('role', {'id': roleid}) if recs: return recs[0].id await sor.C('role', { 'id': roleid, 'orgtypeid': '*', 'name': name, }) return roleid async def grant_permission(sor, roleid, permid): """Grant a permission to a role if not already granted.""" recs = await sor.R('rolepermission', {'roleid': roleid, 'permid': permid}) if not recs: await sor.C('rolepermission', { 'id': getID(), 'roleid': roleid, 'permid': permid, }) async def init_permissions_from_config(dbname, config_module=None): """Initialize all permissions from perm_config.py. CRM是单业主机构系统,所有角色属于同一业主机构。 流程: 1. 创建约定角色(any/logined/anonymous)和定义的角色 2. 注册 PERMISSION_MATRIX 中的路径权限 3. 注册 CRUD 路径并授权 """ if config_module is None: sys.path.insert(0, os.path.dirname(os.path.dirname(__file__))) from app.perm_config import ( ROLES, PERMISSION_MATRIX, CRUD_TABLES ) else: ROLES = config_module.ROLES PERMISSION_MATRIX = config_module.PERMISSION_MATRIX CRUD_TABLES = config_module.CRUD_TABLES db = DBPools() async with db.sqlorContext(dbname) as sor: # Step 1: 创建约定角色(固定ID,rbac/userperm.py硬编码检查) info('Creating convention roles...') role_ids = {} # 约定角色必须用固定ID for fixed_id in ['any', 'logined', 'anonymous']: role_ids[fixed_id] = await ensure_role(sor, fixed_id, fixed_id) # Step 2: 创建定义的角色 info('Creating defined roles...') for role in ROLES: role_ids[role['id']] = await ensure_role( sor, role['id'], role['name'], role.get('desc', '') ) # Step 3: 注册权限矩阵中的路径并授权 info('Registering permissions from matrix...') perm_count = 0 for module, paths in PERMISSION_MATRIX.items(): for path_pattern, role_list in paths.items(): permid = await ensure_permission(sor, path_pattern, name=f'{module}: {path_pattern}', permtype='module') for role_name in role_list: if role_name in role_ids: await grant_permission(sor, role_ids[role_name], permid) perm_count += 1 # Step 4: 注册 CRUD 路径 info('Registering CRUD paths...') for module, tables in CRUD_TABLES.items(): for table in tables: crud_path = f'/{module}/{table}/' permid = await ensure_permission(sor, crud_path, name=f'{module}/{table} CRUD', permtype='crud') # 根据模块权限矩阵授予权限 if module in PERMISSION_MATRIX: for path_pattern, role_list in PERMISSION_MATRIX[module].items(): if path_pattern.startswith(f'/{module}'): for role_name in role_list: if role_name in role_ids: await grant_permission(sor, role_ids[role_name], permid) perm_count += 1 info(f'Permission initialization complete: {perm_count} grants created') def main(): """Run permission initialization.""" import json from appPublic.jsonConfig import getConfig from appPublic.dictObject import DictObject config_path = os.path.dirname(os.path.dirname(__file__)) config = getConfig(config_path, {'workdir': config_path}) # Convert database config to DictObject format expected by DBPools db_config = {} for dbname, dbconf in config.databases.items(): db_config[dbname] = DictObject(driver=dbconf['driver'], kwargs=DictObject(**dbconf['kwargs'])) DBPools(db_config) loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) dbname = list(config.databases.keys())[0] loop.run_until_complete(init_permissions_from_config(dbname)) if __name__ == '__main__': main()