integrated_crm_app/app/init_permissions.py
2026-05-03 14:26:27 +08:00

287 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Permission initialization for integrated CRM application.

Reads perm_config.py and registers its permissions in the database.
This should be run during deployment or first-time setup.

Usage:
    # During deployment
    python app/init_permissions.py

    # Or called from integrated_crm_app.py init()
"""
import os
import sys
import asyncio
import fnmatch
from appPublic.uniqueID import getID
from appPublic.log import debug, info
from appPublic.jsonConfig import getConfig
from sqlor.dbpools import DBPools
def _scan_wwwroot_paths(wwwroot_dir, module_dir):
"""Scan wwwroot directory to get all actual file paths for a module.
Returns list of relative paths like '/customer_management/customer_list.ui'
"""
full_dir = os.path.join(wwwroot_dir, module_dir)
if not os.path.isdir(full_dir):
return []
paths = []
for root, dirs, files in os.walk(full_dir):
for f in files:
abs_path = os.path.join(root, f)
rel_path = os.path.relpath(abs_path, wwwroot_dir)
# Convert to URL path (forward slashes, leading /)
url_path = '/' + rel_path.replace(os.sep, '/')
paths.append(url_path)
return sorted(paths)
def _expand_wildcard_pattern(pattern, wwwroot_dir, all_paths):
"""Expand a wildcard pattern like '/customer_management/**' to actual paths.
Since rbac.check_roles_path() only does exact match, we need to expand
** patterns into individual file paths.
"""
if '**' not in pattern:
return [pattern]
# Pattern like '/customer_management/**' or '/financial_management/invoice**'
prefix = pattern.replace('**', '').rstrip('/')
matched = []
for p in all_paths:
if p.startswith(prefix):
matched.append(p)
return matched if matched else [pattern]
def _expand_permissions(PERMISSION_MATRIX, wwwroot_dir):
    """Resolve PERMISSION_MATRIX into a per-path set of roles.

    Every sub-directory of ``wwwroot_dir`` is scanned for files, and each
    pattern containing ``**`` is expanded against that file list. Patterns
    without ``**`` are kept verbatim.

    Args:
        PERMISSION_MATRIX: Mapping ``{module: {pattern: role_list}}``.
        wwwroot_dir: Directory whose sub-directories are scanned.

    Returns:
        dict mapping each resulting path to the set of roles listed for it,
        i.e. ``{path: set_of_roles}``.
    """
    # Master list of all actual file paths found under wwwroot.
    all_paths = []
    if os.path.isdir(wwwroot_dir):
        for entry in os.listdir(wwwroot_dir):
            if os.path.isdir(os.path.join(wwwroot_dir, entry)):
                all_paths += _scan_wwwroot_paths(wwwroot_dir, entry)
    info(f'Scanned {len(all_paths)} actual file paths from wwwroot')

    role_paths = {}
    for patterns in PERMISSION_MATRIX.values():
        for pattern, role_list in patterns.items():
            if '**' not in pattern:
                expanded = [pattern]
            else:
                expanded = _expand_wildcard_pattern(pattern, wwwroot_dir, all_paths)
                info(f' Expanded "{pattern}" -> {len(expanded)} paths')
            for path in expanded:
                # setdefault keeps an (empty) entry even when role_list is empty.
                role_paths.setdefault(path, set()).update(role_list)
    return role_paths
async def ensure_permission(sor, path, name='', permtype='page'):
    """Return the id of the permission row for ``path``, creating it if absent.

    Args:
        sor: Database context exposing async ``R`` (read) and ``C`` (create).
        path: Permission path used as the lookup key.
        name: Display name; when empty, the last ``/``-separated segment of
            ``path`` is used, falling back to ``path`` itself.
        permtype: Value stored in the ``permtype`` column.

    Returns:
        The id of the existing row, or the freshly generated id.
    """
    existing = await sor.R('permission', {'path': path})
    if existing:
        return existing[0].id

    new_id = getID()
    record = {
        'id': new_id,
        'name': name or path.split('/')[-1] or path,
        'path': path,
        'permtype': permtype,
    }
    await sor.C('permission', record)
    return new_id
async def ensure_role(sor, roleid, name, desc=''):
    """Return the id of a matching role, creating the role when none exists.

    Lookups are tried in this order and the first hit wins:
    role whose ``name`` equals ``name``, role whose ``name`` equals
    ``roleid``, role whose ``id`` equals ``roleid``. This lets roles that
    were pre-created under a different (e.g. UUID) id be reused even though
    perm_config.py refers to them by string id.

    Args:
        sor: Database context exposing async ``R`` (read) and ``C`` (create).
        roleid: Id used when a new role has to be created.
        name: Role name (Chinese or English).
        desc: Accepted for interface compatibility; not stored by this function.

    Returns:
        The id of the existing role, or ``roleid`` after creating a new one.
    """
    lookups = (
        {'name': name},
        {'name': roleid},
        {'id': roleid},
    )
    for criteria in lookups:
        found = await sor.R('role', criteria)
        if found:
            return found[0].id

    # Nothing matched: create the role with the configured id.
    new_role = {
        'id': roleid,
        'orgtypeid': '*',
        'name': name,
    }
    await sor.C('role', new_role)
    return roleid
async def grant_permission(sor, roleid, permid):
    """Link a permission to a role unless that link already exists.

    Args:
        sor: Database context exposing async ``R`` (read) and ``C`` (create).
        roleid: Id of the role receiving the permission.
        permid: Id of the permission being granted.
    """
    already_granted = await sor.R('rolepermission', {'roleid': roleid, 'permid': permid})
    if already_granted:
        return
    grant = {
        'id': getID(),
        'roleid': roleid,
        'permid': permid,
    }
    await sor.C('rolepermission', grant)
async def init_permissions_from_config(dbname, config_module=None):
    """Initialize all permissions from perm_config.py.

    The CRM is a single-owner-organization system: all roles belong to the
    same owner organization.

    Flow:
    1. Create the convention roles (any/logined/anonymous) and the defined roles.
    2. Scan the wwwroot directory and expand ** wildcards into actual file paths.
    3. Register the permissions and grant them.
    4. Register the CRUD paths and grant them.
    5. Copy the grants of the 'admin_superuser' role to the admin/superuser roles.

    Args:
        dbname: Database name passed to ``DBPools().sqlorContext``.
        config_module: Optional object providing ``ROLES``,
            ``PERMISSION_MATRIX`` and ``CRUD_TABLES``. When ``None`` these are
            imported from ``app.perm_config`` after the parent directory of
            this file's directory is put at the front of ``sys.path``.
    """
    if config_module is None:
        sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
        from app.perm_config import (
            ROLES, PERMISSION_MATRIX, CRUD_TABLES
        )
    else:
        ROLES = config_module.ROLES
        PERMISSION_MATRIX = config_module.PERMISSION_MATRIX
        CRUD_TABLES = config_module.CRUD_TABLES
    # wwwroot is located relative to this file (<parent of this dir>/wwwroot).
    # NOTE(review): `config` is fetched but never used below - confirm whether
    # the wwwroot location was meant to come from the configuration.
    config = getConfig()
    workdir = os.path.dirname(os.path.dirname(__file__))
    wwwroot_dir = os.path.join(workdir, 'wwwroot')
    # Expand wildcards into actual file paths
    role_paths = _expand_permissions(PERMISSION_MATRIX, wwwroot_dir)
    db = DBPools()
    async with db.sqlorContext(dbname) as sor:
        # Step 1: create the convention roles with fixed IDs (per the original
        # note, rbac/userperm.py checks these IDs hard-coded).
        info('Creating convention roles...')
        # Maps the configured role id to the id actually stored in the database.
        role_ids = {}
        for fixed_id in ['any', 'logined', 'anonymous']:
            role_ids[fixed_id] = await ensure_role(sor, fixed_id, fixed_id)
        # Step 2: create the roles defined in the configuration
        info('Creating defined roles...')
        for role in ROLES:
            role_ids[role['id']] = await ensure_role(
                sor, role['id'], role['name'], role.get('desc', '')
            )
        # Step 3: register the expanded path permissions and grant them
        info('Registering expanded permissions...')
        # NOTE(review): perm_count is incremented on every grant_permission
        # call, including calls where the grant already existed, so it counts
        # grant attempts rather than rows created.
        perm_count = 0
        for path, roles in role_paths.items():
            # Register both /xxx and /main/xxx variants
            # The auth middleware passes request.path (e.g. /main/customer_management/xxx.ui)
            # but perm_config.py defines paths without /main prefix
            variants = [path]
            if not path.startswith('/main/'):
                variants.append('/main' + path)
            for variant in variants:
                permid = await ensure_permission(sor, variant, permtype='page')
                for role_name in roles:
                    # Roles not created in steps 1-2 are skipped silently.
                    if role_name in role_ids:
                        await grant_permission(sor, role_ids[role_name], permid)
                        perm_count += 1
        # Step 4: register CRUD paths
        info('Registering CRUD paths...')
        for module, tables in CRUD_TABLES.items():
            for table in tables:
                crud_path = f'/{module}/{table}/'
                permid = await ensure_permission(sor, crud_path,
                    name=f'{module}/{table} CRUD', permtype='crud')
                # Grant to roles that have module-level access
                if module in PERMISSION_MATRIX:
                    for path_pattern, role_list in PERMISSION_MATRIX[module].items():
                        if path_pattern.startswith(f'/{module}'):
                            for role_name in role_list:
                                if role_name in role_ids:
                                    await grant_permission(sor, role_ids[role_name], permid)
                                    perm_count += 1
        info(f'Permission initialization complete: {perm_count} grants created')
        # Step 5: sync admin and superuser role permissions (compatibility
        # with existing databases).
        # Key point: the superadmin user's roles are UUID-based
        # (orgtypeid=customer), so an extra sync is needed.
        info('Syncing admin and superuser role permissions...')
        admin_role_id = role_ids.get('admin')
        superuser_role_id = role_ids.get('superuser')
        admin_super_id = role_ids.get('admin_superuser')
        # Get admin_superuser grants (source of truth)
        admin_super_perms = []
        if admin_super_id:
            admin_super_perms = await sor.R('rolepermission', {'roleid': admin_super_id})
            for g in admin_super_perms:
                if admin_role_id:
                    await grant_permission(sor, admin_role_id, g.permid)
                    perm_count += 1
                if superuser_role_id:
                    await grant_permission(sor, superuser_role_id, g.permid)
                    perm_count += 1
        # Extra sync: find every admin/superuser role with orgtypeid=customer
        # and grant it the same permissions.
        info('Syncing customer-org admin/superuser roles...')
        all_roles = await sor.R('role', {'orgtypeid': 'customer'})
        for r in all_roles:
            if r.name in ('admin', 'superuser'):
                for g in admin_super_perms:
                    await grant_permission(sor, r.id, g.permid)
                    perm_count += 1
                info(f' Synced {len(admin_super_perms)} grants to role {r.id} (name={r.name})')
        info(f'After sync: {perm_count} total grants created')
def main():
    """Run permission initialization as a standalone script.

    Loads the configuration from the parent directory of this file's
    directory, registers every configured database with ``DBPools`` and runs
    ``init_permissions_from_config`` against the first configured database.

    Raises:
        IndexError: If the configuration defines no databases.
    """
    from appPublic.dictObject import DictObject

    config_path = os.path.dirname(os.path.dirname(__file__))
    config = getConfig(config_path, {'workdir': config_path})

    # Convert database config to the DictObject format expected by DBPools
    db_config = {
        name: DictObject(driver=conf['driver'], kwargs=DictObject(**conf['kwargs']))
        for name, conf in config.databases.items()
    }
    if not db_config:
        # Fail with a clear message before any pool or event loop is created.
        raise IndexError('no databases configured; cannot initialize permissions')
    DBPools(db_config)

    # The first configured database receives the permission data.
    dbname = next(iter(db_config))

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(init_permissions_from_config(dbname))
# Script entry point: run the initialization only when this file is executed
# directly, not when it is imported.
if __name__ == '__main__':
    main()