- perm_config.py: role definitions and permission matrix (underscore-based role IDs)
- init_permissions.py: permission initialization script
- .gitignore: add build-artifact exclusions
- Remove the multi-org-type design; single owner org only
153 lines
5.2 KiB
Python
153 lines
5.2 KiB
Python
"""Permission initialization for the integrated CRM application.

Reads perm_config.py and registers its permissions in the database.
This should be run during deployment or first-time setup.

Usage:
    # During deployment
    python app/init_permissions.py

    # Or call it from integrated_crm_app.py init()
"""
|
||
import os
|
||
import sys
|
||
import asyncio
|
||
from appPublic.uniqueID import getID
|
||
from appPublic.log import debug, info
|
||
from sqlor.dbpools import DBPools
|
||
|
||
|
||
async def ensure_permission(sor, path, name='', permtype='page'):
    """Return the id of the permission row for ``path``, inserting one if absent.

    Args:
        sor: Database context; ``sor.R`` is awaited to look rows up and
            ``sor.C`` to insert one, both against the ``permission`` table.
        path: Permission path; the only lookup key.
        name: Display name for a new row. When empty, the last
            ``/``-separated segment of ``path`` is used, and ``path`` itself
            if that segment is empty (for example with a trailing slash).
        permtype: Value stored under ``permtype`` on a new row.

    Returns:
        The ``id`` of the first existing match, otherwise the freshly
        generated id. An existing row is returned as-is; its name and
        permtype are not updated.
    """
    existing = await sor.R('permission', {'path': path})
    if existing:
        return existing[0].id

    # Derive a readable name only when the caller did not supply one.
    display_name = name
    if not display_name:
        display_name = path.split('/')[-1] or path

    new_id = getID()
    record = {
        'id': new_id,
        'name': display_name,
        'path': path,
        'permtype': permtype,
    }
    await sor.C('permission', record)
    return new_id
|
||
|
||
|
||
async def ensure_role(sor, roleid, name, desc=''):
    """Return the id of the role ``roleid``, inserting the role row if absent.

    Args:
        sor: Database context; ``sor.R`` / ``sor.C`` are awaited against the
            ``role`` table.
        roleid: Fixed id to look up and, if needed, to insert.
        name: Name stored on a newly inserted row.
        desc: Accepted for call compatibility; this function does not write
            it to the row.

    Returns:
        The ``id`` of the first existing match, or ``roleid`` after insert.
    """
    found = await sor.R('role', {'id': roleid})
    if not found:
        # NOTE(review): '*' as orgtypeid presumably means "not tied to an
        # org type" under the single-owner design -- confirm against rbac.
        new_role = {
            'id': roleid,
            'orgtypeid': '*',
            'name': name,
        }
        await sor.C('role', new_role)
        return roleid
    return found[0].id
|
||
|
||
|
||
async def grant_permission(sor, roleid, permid):
    """Link ``permid`` to ``roleid`` unless that link is already stored.

    Looks up ``rolepermission`` by the (roleid, permid) pair and inserts a
    row with a generated id only when nothing matches. Returns ``None``
    in both cases.
    """
    already_granted = await sor.R(
        'rolepermission', {'roleid': roleid, 'permid': permid}
    )
    if already_granted:
        return

    link = {
        'id': getID(),
        'roleid': roleid,
        'permid': permid,
    }
    await sor.C('rolepermission', link)
|
||
|
||
|
||
async def _grant_to_known_roles(sor, role_ids, role_names, permid, unknown_roles):
    """Grant ``permid`` to each role in ``role_names`` that is registered.

    Names missing from ``role_ids`` are skipped and added to the
    ``unknown_roles`` set so the caller can report them. Returns the number
    of ``grant_permission`` calls issued (existing grants are counted too).
    """
    issued = 0
    for role_name in role_names:
        if role_name not in role_ids:
            unknown_roles.add(role_name)
            continue
        await grant_permission(sor, role_ids[role_name], permid)
        issued += 1
    return issued


async def init_permissions_from_config(dbname, config_module=None):
    """Initialize all roles and permissions described by perm_config.py.

    The CRM is a single-owner-organization system: every role belongs to
    the same owner organization.

    Flow:
        1. Create the convention roles (any/logined/anonymous) and the
           roles defined in ``ROLES``.
        2. Register the path permissions listed in ``PERMISSION_MATRIX``
           and grant them to the listed roles.
        3. Register one CRUD path per table in ``CRUD_TABLES`` and grant it
           to the roles of the owning module.

    Args:
        dbname: Database name passed to ``DBPools().sqlorContext``.
        config_module: Optional object exposing ``ROLES``,
            ``PERMISSION_MATRIX`` and ``CRUD_TABLES``. When ``None``,
            ``app.perm_config`` is imported after the project root is put
            at the front of ``sys.path``.

    Role names that appear in the matrix but were never registered are
    skipped; they are reported once through ``debug`` at the end.
    """
    if config_module is None:
        # abspath: __file__ can be relative (e.g. a script run from its own
        # directory on Python < 3.9); dirname() twice would then yield ''
        # and the ``app`` package would not be importable.
        project_root = os.path.dirname(
            os.path.dirname(os.path.abspath(__file__))
        )
        sys.path.insert(0, project_root)
        from app.perm_config import (
            ROLES, PERMISSION_MATRIX, CRUD_TABLES
        )
    else:
        ROLES = config_module.ROLES
        PERMISSION_MATRIX = config_module.PERMISSION_MATRIX
        CRUD_TABLES = config_module.CRUD_TABLES

    db = DBPools()
    perm_count = 0
    unknown_roles = set()

    async with db.sqlorContext(dbname) as sor:
        # Step 1: convention roles. Their ids are fixed because
        # rbac/userperm.py checks for them by hard-coded id.
        info('Creating convention roles...')
        role_ids = {}
        for fixed_id in ['any', 'logined', 'anonymous']:
            role_ids[fixed_id] = await ensure_role(sor, fixed_id, fixed_id)

        # Step 2: roles declared in the config.
        info('Creating defined roles...')
        for role in ROLES:
            role_ids[role['id']] = await ensure_role(
                sor, role['id'], role['name'], role.get('desc', '')
            )

        # Step 3: register every path of the permission matrix and grant it.
        info('Registering permissions from matrix...')
        for module, paths in PERMISSION_MATRIX.items():
            for path_pattern, role_list in paths.items():
                permid = await ensure_permission(
                    sor, path_pattern,
                    name=f'{module}: {path_pattern}', permtype='module'
                )
                perm_count += await _grant_to_known_roles(
                    sor, role_ids, role_list, permid, unknown_roles
                )

        # Step 4: register the CRUD paths.
        info('Registering CRUD paths...')
        for module, tables in CRUD_TABLES.items():
            # The role lists that apply to a module do not depend on the
            # table, so collect them once per module.
            # NOTE(review): a plain prefix test also matches patterns such
            # as '/<module>xyz'; confirm that is intended.
            module_role_lists = []
            if module in PERMISSION_MATRIX:
                module_role_lists = [
                    role_list
                    for path_pattern, role_list
                    in PERMISSION_MATRIX[module].items()
                    if path_pattern.startswith(f'/{module}')
                ]

            for table in tables:
                crud_path = f'/{module}/{table}/'
                permid = await ensure_permission(
                    sor, crud_path,
                    name=f'{module}/{table} CRUD', permtype='crud'
                )
                # Grant according to the module's permission matrix. A role
                # listed under several patterns is granted (and counted)
                # once per pattern; grant_permission itself is idempotent.
                for role_list in module_role_lists:
                    perm_count += await _grant_to_known_roles(
                        sor, role_ids, role_list, permid, unknown_roles
                    )

    if unknown_roles:
        debug(
            'Roles referenced in PERMISSION_MATRIX but not defined '
            f'(grants skipped): {sorted(unknown_roles, key=str)}'
        )
    # perm_count is the number of grant calls, including already-present grants.
    info(f'Permission initialization complete: {perm_count} grants created')
|
||
|
||
|
||
def main():
    """Run permission initialization as a standalone script.

    Loads the application config from the project root (the parent of this
    file's directory), registers every configured database with
    ``DBPools``, and runs ``init_permissions_from_config`` against the
    first configured database on a dedicated event loop.
    """
    from appPublic.jsonConfig import getConfig
    from appPublic.dictObject import DictObject

    # abspath: a relative __file__ would otherwise collapse to '' after two
    # dirname() calls and point the config loader at the wrong directory.
    config_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    config = getConfig(config_path, {'workdir': config_path})

    # Convert database config to the DictObject format expected by DBPools.
    db_config = {}
    for dbname, dbconf in config.databases.items():
        db_config[dbname] = DictObject(
            driver=dbconf['driver'],
            kwargs=DictObject(**dbconf['kwargs']),
        )

    DBPools(db_config)

    dbname = list(config.databases.keys())[0]

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(init_permissions_from_config(dbname))
    finally:
        # Release the loop's resources even when initialization fails.
        asyncio.set_event_loop(None)
        loop.close()
|
||
|
||
|
||
# Run the initializer only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|