integrated_crm_app/app/init_permissions.py
yumoqing 4c5b2a5716 feat: add permission config and initialization for single-owner CRM
- perm_config.py: role definitions and permission matrix (underscore-based role IDs)
- init_permissions.py: permission initialization script
- .gitignore: add build artifacts exclusions
- Remove multi-org type design, single owner org only
2026-04-29 12:57:14 +08:00

153 lines
5.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Permission initialization for integrated CRM application.
Reads perm_config.py and registers permissions to database.
This should be run during deployment or first-time setup.
Usage:
# During deployment
python app/init_permissions.py
# Or called from integrated_crm_app.py init()
"""
import os
import sys
import asyncio
from appPublic.uniqueID import getID
from appPublic.log import debug, info
from sqlor.dbpools import DBPools
async def ensure_permission(sor, path, name='', permtype='page'):
    """Return the id of the permission registered for ``path``, creating it if needed.

    Args:
        sor: Database context object providing the async ``R`` (read) and
            ``C`` (create) calls used below.
        path: Path the permission applies to; also the lookup key.
        name: Display name. When empty, the last ``/``-separated segment of
            ``path`` is used, or ``path`` itself if that segment is empty.
        permtype: Value stored in the ``permtype`` column.

    Returns:
        The ``id`` of the first existing ``permission`` row for ``path``,
        or the newly generated id when a row had to be created.
    """
    existing = await sor.R('permission', {'path': path})
    if not existing:
        new_id = getID()
        # A path ending in '/' has an empty last segment, hence the final
        # fallback to the full path.
        label = name or path.split('/')[-1] or path
        record = {
            'id': new_id,
            'name': label,
            'path': path,
            'permtype': permtype,
        }
        await sor.C('permission', record)
        return new_id
    return existing[0].id
async def ensure_role(sor, roleid, name, desc=''):
    """Return the id of role ``roleid``, inserting the role first if it is missing.

    Args:
        sor: Database context object providing the async ``R`` (read) and
            ``C`` (create) calls used below.
        roleid: Fixed identifier of the role; used as the primary key.
        name: Role name stored on creation.
        desc: Accepted for callers' convenience but not written to the
            database by this function.

    Returns:
        The ``id`` of the existing row, or ``roleid`` after creating it.
    """
    matches = await sor.R('role', {'id': roleid})
    if not matches:
        # New roles are created with the wildcard org type '*'.
        new_role = {
            'id': roleid,
            'orgtypeid': '*',
            'name': name,
        }
        await sor.C('role', new_role)
        return roleid
    return matches[0].id
async def grant_permission(sor, roleid, permid):
    """Link permission ``permid`` to role ``roleid`` unless the link already exists.

    Args:
        sor: Database context object providing the async ``R`` (read) and
            ``C`` (create) calls used below.
        roleid: Id of the role receiving the permission.
        permid: Id of the permission being granted.

    Returns:
        None. A ``rolepermission`` row is inserted only when no row with the
        same ``roleid`` / ``permid`` pair is found.
    """
    already_granted = await sor.R(
        'rolepermission', {'roleid': roleid, 'permid': permid}
    )
    if already_granted:
        return
    grant = {
        'id': getID(),
        'roleid': roleid,
        'permid': permid,
    }
    await sor.C('rolepermission', grant)
async def _grant_to_roles(sor, role_ids, role_names, permid):
    """Grant ``permid`` once to each known role in ``role_names``.

    Duplicate names are collapsed (first occurrence wins, order preserved) so
    each (role, permission) pair costs a single ``grant_permission`` call.
    Names that are not keys of ``role_ids`` are reported via ``debug`` and
    skipped.

    Returns:
        The number of ``grant_permission`` calls made.
    """
    processed = 0
    for role_name in dict.fromkeys(role_names):
        if role_name not in role_ids:
            debug(f'Skipping unknown role {role_name!r} for permission {permid}')
            continue
        await grant_permission(sor, role_ids[role_name], permid)
        processed += 1
    return processed


async def init_permissions_from_config(dbname, config_module=None):
    """Initialize roles, permissions and grants from the permission config.

    The CRM is a single-owner-organization system: all roles belong to the
    same owner organization.

    Steps:
        1. Create the convention roles (any / logined / anonymous).
        2. Create the roles listed in ``ROLES``.
        3. Register every path in ``PERMISSION_MATRIX`` and grant it to the
           roles listed for that path.
        4. Register one CRUD path per table in ``CRUD_TABLES`` and grant it
           to every role that appears under the owning module's matrix paths.

    Args:
        dbname: Database name passed to ``DBPools().sqlorContext``.
        config_module: Optional object exposing ``ROLES``,
            ``PERMISSION_MATRIX`` and ``CRUD_TABLES``. When ``None`` they are
            imported from ``app.perm_config``.
    """
    if config_module is None:
        # Put the directory that contains the ``app`` package on sys.path.
        # abspath() keeps this correct when __file__ is a relative path, and
        # the membership test avoids stacking duplicates on repeated calls.
        app_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        if app_root not in sys.path:
            sys.path.insert(0, app_root)
        from app.perm_config import (
            ROLES, PERMISSION_MATRIX, CRUD_TABLES
        )
    else:
        ROLES = config_module.ROLES
        PERMISSION_MATRIX = config_module.PERMISSION_MATRIX
        CRUD_TABLES = config_module.CRUD_TABLES

    db = DBPools()
    async with db.sqlorContext(dbname) as sor:
        # Step 1: convention roles. They must use these fixed IDs; per the
        # original author's note, rbac/userperm.py checks them by
        # hard-coded value.
        info('Creating convention roles...')
        role_ids = {}
        for fixed_id in ['any', 'logined', 'anonymous']:
            role_ids[fixed_id] = await ensure_role(sor, fixed_id, fixed_id)

        # Step 2: roles defined in the configuration.
        info('Creating defined roles...')
        for role in ROLES:
            role_ids[role['id']] = await ensure_role(
                sor, role['id'], role['name'], role.get('desc', '')
            )

        # Step 3: register the matrix paths and grant them.
        # perm_count counts processed (role, permission) pairs; pairs that
        # were already present in the database are included.
        info('Registering permissions from matrix...')
        perm_count = 0
        for module, paths in PERMISSION_MATRIX.items():
            for path_pattern, role_list in paths.items():
                permid = await ensure_permission(
                    sor, path_pattern,
                    name=f'{module}: {path_pattern}', permtype='module')
                perm_count += await _grant_to_roles(
                    sor, role_ids, role_list, permid)

        # Step 4: register the CRUD paths.
        info('Registering CRUD paths...')
        for module, tables in CRUD_TABLES.items():
            # Roles are granted according to the module's permission matrix.
            # The role set depends only on the module, so collect it once
            # instead of once per table and per matrix path.
            module_prefix = f'/{module}'
            module_roles = []
            if module in PERMISSION_MATRIX:
                for path_pattern, role_list in PERMISSION_MATRIX[module].items():
                    if path_pattern.startswith(module_prefix):
                        module_roles.extend(role_list)

            for table in tables:
                crud_path = f'/{module}/{table}/'
                permid = await ensure_permission(
                    sor, crud_path,
                    name=f'{module}/{table} CRUD', permtype='crud')
                perm_count += await _grant_to_roles(
                    sor, role_ids, module_roles, permid)

        info(f'Permission initialization complete: {perm_count} grants created')
def main():
    """Run permission initialization as a standalone script.

    Loads the application configuration from the directory above this
    file's directory, registers every configured database with ``DBPools``,
    and runs ``init_permissions_from_config`` against the first configured
    database on a fresh event loop.

    Raises:
        RuntimeError: If the configuration defines no databases.
    """
    from appPublic.jsonConfig import getConfig
    from appPublic.dictObject import DictObject

    # abspath() keeps the config directory correct when __file__ is relative.
    config_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    config = getConfig(config_path, {'workdir': config_path})

    # Fail early with a clear message instead of an opaque error further down.
    if not config.databases:
        raise RuntimeError(
            f'No databases configured under {config_path!r}; '
            'cannot initialize permissions'
        )

    # Convert database config to DictObject format expected by DBPools
    db_config = {}
    for dbname, dbconf in config.databases.items():
        db_config[dbname] = DictObject(
            driver=dbconf['driver'],
            kwargs=DictObject(**dbconf['kwargs']),
        )
    DBPools(db_config)

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    # The first configured database receives the permission data.
    dbname = list(config.databases.keys())[0]
    loop.run_until_complete(init_permissions_from_config(dbname))
# Script entry point: run the initialization only when executed directly,
# not when this module is imported.
if __name__ == '__main__':
    main()