fix: RBAC permission init - remove /main prefix, include js/css files, use * wildcard

- perm_config.py: all paths no longer use /main prefix
- init_permissions.py:
  1. scan wwwroot including symlinks for .ui/.dspy/.js/.css
  2. register paths without /main prefix
  3. create admin_superuser user (super/Kyy@123456)
  4. use orgtypeid='*' for role wildcard matching
- sync app/ and root copies
This commit is contained in:
yumoqing 2026-05-05 13:44:55 +08:00
parent f932bfb088
commit 1ed4ce0935
4 changed files with 1659 additions and 378 deletions

View File

@ -1,286 +1,426 @@
"""Permission initialization for integrated CRM application.
Reads perm_config.py and registers permissions to database.
This should be run during deployment or first-time setup.
Usage:
# During deployment
python app/init_permissions.py
# Or called from integrated_crm_app.py init()
#!/usr/bin/env python3
"""
Integrated CRM - RBAC 权限初始化脚本
功能
1. 遍历 wwwroot 下所有文件含子目录和 ln -s 链接目录将路径写入 permission
2. 为各机构的角色设置每个 path 的权限通过 orgtypeid='*' 通配所有机构
3. 初始化 admin_superuser 用户用户名 super密码 Kyy@123456
依赖 RBAC 通配机制
- 用户角色会展开为 orgtypeid.nameorgtypeid.**.name 三种 key
- 授权给 orgtypeid='*' 的角色等于授权给所有机构的同名角色
- *.admin_superuser 匹配所有机构的 admin_superuser 角色
使用方法
cd ~/repos/integrated_crm_app
source py3/bin/activate
python app/init_permissions.py
部署流程
1. 停止应用pkill -f integrated_crm_app.py
2. 执行 python app/init_permissions.py
3. 启动应用nohup python app/integrated_crm_app.py --port 8080 &
"""
import asyncio
import os
import sys
import asyncio
import fnmatch
from appPublic.uniqueID import getID
from appPublic.log import debug, info
from appPublic.jsonConfig import getConfig
from sqlor.dbpools import DBPools
import importlib.util
# ============================================================
# 配置
# ============================================================
APP_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, APP_ROOT)
# 数据库配置(与 conf/config.json 一致)
DB_CONFIG = {
"host": "localhost",
"port": 3306,
"user": "hermes",
"password": "hermes123",
"db": "crm_db",
"charset": "utf8mb4",
}
# perm_config.py 路径
PERM_CONFIG_PATH = os.path.join(APP_ROOT, "perm_config.py")
# wwwroot 根目录(含符号链接子目录)
WWWROOT_BASE = os.path.join(APP_ROOT, "wwwroot")
# admin_superuser 初始账号密码
ADMIN_USERNAME = "super"
ADMIN_PASSWORD = "Kyy@123456"
def _scan_wwwroot_paths(wwwroot_dir, module_dir):
"""Scan wwwroot directory to get all actual file paths for a module.
Returns list of relative paths like '/customer_management/customer_list.ui'
# ============================================================
# 加载 perm_config
# ============================================================
def load_perm_config():
spec = importlib.util.spec_from_file_location("perm_config", PERM_CONFIG_PATH)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
return mod
# ============================================================
# 遍历 wwwroot 所有文件(含 ln -s 链接目录)
# ============================================================
def scan_wwwroot(wwwroot_path):
"""
遍历 wwwroot 目录下所有文件递归跟随符号链接
返回 URL 路径列表 ['/login.ui', '/customer_management/customer_list.ui']
关键
1. os.walk 默认不跟随符号链接需要 followlinks=True
2. 排除 self-referencing symlinks main -> wwwroot防止死循环
3. /main 前缀由 config.json paths 映射产生此处返回相对路径即可
脚本会自动注册 /xxx /main/xxx 双路径
"""
full_dir = os.path.join(wwwroot_dir, module_dir)
if not os.path.isdir(full_dir):
return []
paths = []
for root, dirs, files in os.walk(full_dir):
if not os.path.isdir(wwwroot_path):
print(f" [WARN] wwwroot not found: {wwwroot_path}")
return paths
real_wwwroot = os.path.realpath(wwwroot_path)
visited_real = set()
for root, dirs, files in os.walk(wwwroot_path, followlinks=True):
real_root = os.path.realpath(root)
if real_root in visited_real:
dirs.clear() # 防止进入循环目录
continue
visited_real.add(real_root)
# 过滤掉指向 wwwroot 自身的符号链接目录
safe_dirs = []
for d in dirs:
real_d = os.path.realpath(os.path.join(root, d))
if real_d != real_wwwroot and real_d not in visited_real:
safe_dirs.append(d)
dirs[:] = safe_dirs
for f in files:
abs_path = os.path.join(root, f)
rel_path = os.path.relpath(abs_path, wwwroot_dir)
# Convert to URL path (forward slashes, leading /)
url_path = '/' + rel_path.replace(os.sep, '/')
# 注册 .ui, .dspy, .js, .css 文件的权限
if not (f.endswith('.ui') or f.endswith('.dspy') or f.endswith('.js') or f.endswith('.css')):
continue
full = os.path.join(root, f)
rel = os.path.relpath(full, wwwroot_path)
url_path = '/' + rel.replace(os.sep, '/')
paths.append(url_path)
return sorted(paths)
def _expand_wildcard_pattern(pattern, wwwroot_dir, all_paths):
"""Expand a wildcard pattern like '/customer_management/**' to actual paths.
Since rbac.check_roles_path() only does exact match, we need to expand
** patterns into individual file paths.
"""
if '**' not in pattern:
return [pattern]
# Pattern like '/customer_management/**' or '/financial_management/invoice**'
prefix = pattern.replace('**', '').rstrip('/')
matched = []
for p in all_paths:
if p.startswith(prefix):
matched.append(p)
return matched if matched else [pattern]
# ============================================================
# 数据库操作sqlor CRUD
# ============================================================
def _expand_permissions(PERMISSION_MATRIX, wwwroot_dir):
"""Expand PERMISSION_MATRIX with wildcard patterns into actual file paths.
Returns dict: {role_id: set_of_paths}
"""
# Build master list of all actual file paths
all_paths = []
if os.path.isdir(wwwroot_dir):
for item in os.listdir(wwwroot_dir):
full_item = os.path.join(wwwroot_dir, item)
if os.path.isdir(full_item):
all_paths.extend(_scan_wwwroot_paths(wwwroot_dir, item))
info(f'Scanned {len(all_paths)} actual file paths from wwwroot')
# Expand patterns
role_paths = {}
for module, patterns in PERMISSION_MATRIX.items():
for pattern, role_list in patterns.items():
if '**' in pattern:
expanded = _expand_wildcard_pattern(pattern, wwwroot_dir, all_paths)
info(f' Expanded "{pattern}" -> {len(expanded)} paths')
else:
expanded = [pattern]
for path in expanded:
if path not in role_paths:
role_paths[path] = set()
for role in role_list:
role_paths[path].add(role)
return role_paths
async def ensure_permission(sor, path, name='', permtype='page'):
"""Ensure a permission exists in the database."""
async def ensure_permission(sor, path, permtype='page'):
"""确保 permission 记录存在,返回 permission ID。"""
recs = await sor.R('permission', {'path': path})
if recs:
return recs[0].id
from appPublic.uniqueID import getID
permid = getID()
await sor.C('permission', {
'id': permid,
'name': name or path.split('/')[-1] or path,
'path': path,
'permtype': permtype,
'parentid': '',
'description': f'Auto-generated for {path}',
})
return permid
async def ensure_role(sor, roleid, name, desc=''):
"""Ensure a role exists with the given ID.
If a role with the same name or roleid-as-name already exists, reuse its ID.
This handles the case where roles were pre-created with UUID IDs but we define
them with string IDs in perm_config.py.
async def ensure_role(sor, role_id, name, desc='', orgtypeid='*'):
"""
# Try matching by name (supports both Chinese and English names)
for match_name in [name, roleid]:
recs = await sor.R('role', {'name': match_name})
if recs:
return recs[0].id
# Try matching by ID
recs = await sor.R('role', {'id': roleid})
确保角色存在
匹配顺序中文名 -> 英文ID -> 创建新角色
默认 orgtypeid='*' 表示通配所有机构
"""
# 按中文名匹配
recs = await sor.R('role', {'name': name})
if recs:
return recs[0].id
# Create new role
# 按英文ID匹配
recs = await sor.R('role', {'id': role_id})
if recs:
return recs[0].id
# 创建新角色
from appPublic.uniqueID import getID
rid = role_id
await sor.C('role', {
'id': roleid,
'orgtypeid': '*',
'id': rid,
'orgtypeid': orgtypeid,
'name': name,
'role_name': role_id,
'description': desc,
})
return roleid
print(f" [CREATED] role: {role_id} ({name}) orgtypeid={orgtypeid} id={rid}")
return rid
async def grant_permission(sor, roleid, permid):
"""Grant a permission to a role if not already granted."""
recs = await sor.R('rolepermission', {'roleid': roleid, 'permid': permid})
if not recs:
async def grant_perm(sor, role_id, perm_id):
"""授予角色权限(幂等)。"""
existing = await sor.R('rolepermission', {'roleid': role_id, 'permid': perm_id})
if not existing:
await sor.C('rolepermission', {
'id': getID(),
'roleid': roleid,
'permid': permid,
'roleid': role_id,
'permid': perm_id,
})
async def init_permissions_from_config(dbname, config_module=None):
"""Initialize all permissions from perm_config.py.
CRM是单业主机构系统所有角色属于同一业主机构
流程
1. 创建约定角色any/logined/anonymous和定义的角色
2. 扫描 wwwroot 目录展开 ** 通配符为实际文件路径
3. 注册权限并授权
4. 注册 CRUD 路径并授权
def dual_paths(path):
"""返回路径列表。只使用原始路径,不加 /main 前缀。"""
# 如果路径以 /main 开头,去掉 /main 前缀(统一使用无前缀路径)
if path.startswith('/main'):
cleaned = path[5:] # '/main/xxx' -> '/xxx'
if not cleaned:
cleaned = '/'
return [cleaned]
return [path]
# ============================================================
# 构建路径->角色映射
# ============================================================
def build_path_role_map(perm_config, all_files):
"""
if config_module is None:
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from app.perm_config import (
ROLES, PERMISSION_MATRIX, CRUD_TABLES
)
else:
ROLES = config_module.ROLES
PERMISSION_MATRIX = config_module.PERMISSION_MATRIX
CRUD_TABLES = config_module.CRUD_TABLES
# Get wwwroot directory from config
config = getConfig()
workdir = os.path.dirname(os.path.dirname(__file__))
wwwroot_dir = os.path.join(workdir, 'wwwroot')
# Expand wildcards into actual file paths
role_paths = _expand_permissions(PERMISSION_MATRIX, wwwroot_dir)
根据 perm_config.PERMISSION_MATRIX 和扫描到的文件列表
构建 {url_path: [role_ids]} 映射
对于非通配路径精确路径如果该路径在 all_files 加入映射
对于通配路径/module/**展开到该目录下所有匹配文件
"""
path_roles = {} # path -> set of role_ids
for _section, entries in perm_config.PERMISSION_MATRIX.items():
for pattern, roles in entries.items():
if '**' in pattern:
# 通配模式:展开到 wwwroot 下匹配的文件
base = pattern.replace('/**', '').lstrip('/')
prefix = '/' + base if base else ''
for fp in all_files:
if fp.startswith(prefix):
if fp not in path_roles:
path_roles[fp] = set()
path_roles[fp].update(roles)
else:
# 精确路径:如果该文件存在,加入映射
if pattern in all_files:
if pattern not in path_roles:
path_roles[pattern] = set()
path_roles[pattern].update(roles)
return {k: list(v) for k, v in path_roles.items()}
# ============================================================
# 主流程
# ============================================================
async def main():
print("=" * 65)
print("Integrated CRM - RBAC 权限初始化")
print("=" * 65)
# [1] 加载配置
pc = load_perm_config()
print(f"\n[1/6] 加载 perm_config.py")
print(f" 角色定义: {len(pc.ROLES)}")
print(f" 权限配置段: {len(pc.PERMISSION_MATRIX)}")
# [2] 扫描 wwwroot 所有文件
all_files = scan_wwwroot(WWWROOT_BASE)
print(f"\n[2/6] 扫描 wwwroot 文件(含符号链接): {len(all_files)}")
if all_files:
print(f" 示例: {all_files[0]}")
if len(all_files) > 1:
print(f" ...")
print(f" {all_files[-1]}")
# [3] 构建 路径->角色 映射
path_role_map = build_path_role_map(pc, all_files)
print(f"\n[3/6] 路径-角色映射: {len(path_role_map)} 个路径需要授权")
# [4] 连接数据库 + 注册所有角色
from sqlor.dbpools import DBPools
db = DBPools()
async with db.sqlorContext(dbname) as sor:
# Step 1: 创建约定角色固定IDrbac/userperm.py硬编码检查
info('Creating convention roles...')
db.addDatabase('crm_db', {
'driver': 'aiomysql',
'kwargs': DB_CONFIG,
})
print(f"\n[4/6] 连接数据库 {DB_CONFIG['db']}...")
async with db.sqlorContext('crm_db') as sor:
# ---- 4a: 创建/验证所有角色 ----
# 关键:所有角色使用 orgtypeid='*' 创建,配合 RBAC 通配机制
print(f"\n [4a] 注册角色orgtypeid='*' 通配所有机构)...")
role_ids = {}
for fixed_id in ['any', 'logined', 'anonymous']:
role_ids[fixed_id] = await ensure_role(sor, fixed_id, fixed_id)
# Step 2: 创建定义的角色
info('Creating defined roles...')
for role in ROLES:
role_ids[role['id']] = await ensure_role(
sor, role['id'], role['name'], role.get('desc', '')
)
# Step 3: 注册展开后的路径权限并授权
info('Registering expanded permissions...')
for rid, (name, desc) in pc.ROLES.items():
role_ids[rid] = await ensure_role(sor, rid, name, desc, orgtypeid='*')
# ---- 4b: 注册 permission + 授权 ----
print(f"\n [4b] 注册权限并授权...")
perm_count = 0
for path, roles in role_paths.items():
# Register both /xxx and /main/xxx variants
# The auth middleware passes request.path (e.g. /main/customer_management/xxx.ui)
# but perm_config.py defines paths without /main prefix
variants = [path]
if not path.startswith('/main/'):
variants.append('/main' + path)
for variant in variants:
permid = await ensure_permission(sor, variant, permtype='page')
for role_name in roles:
if role_name in role_ids:
await grant_permission(sor, role_ids[role_name], permid)
perm_count += 1
# Step 4: 注册 CRUD 路径
info('Registering CRUD paths...')
for module, tables in CRUD_TABLES.items():
grant_count = 0
error_count = 0
for path, roles in path_role_map.items():
for p in dual_paths(path):
try:
permid = await ensure_permission(sor, p, permtype='page')
perm_count += 1
for rn in roles:
if rn in ('any', 'logined'):
continue
if rn not in role_ids:
print(f" [WARN] 未知角色: {rn} (path: {p})")
error_count += 1
continue
await grant_perm(sor, role_ids[rn], permid)
grant_count += 1
except Exception as e:
print(f" [ERROR] {p}: {e}")
error_count += 1
print(f" 注册权限: {perm_count}, 授权次数: {grant_count}, 错误: {error_count}")
# ---- 4c: 注册 CRUD API 权限 ----
print(f"\n [4c] 注册 CRUD API 权限...")
crud_count = 0
for module, tables in pc.CRUD_TABLES.items():
for table in tables:
crud_path = f'/{module}/{table}/'
permid = await ensure_permission(sor, crud_path,
name=f'{module}/{table} CRUD', permtype='crud')
# Grant to roles that have module-level access
if module in PERMISSION_MATRIX:
for path_pattern, role_list in PERMISSION_MATRIX[module].items():
if path_pattern.startswith(f'/{module}'):
for role_name in role_list:
if role_name in role_ids:
await grant_permission(sor, role_ids[role_name], permid)
perm_count += 1
info(f'Permission initialization complete: {perm_count} grants created')
# Step 5: 同步 admin 和 superuser 角色权限(兼容已有数据库)
# 关键superadmin 用户的角色是 UUID-based (orgtypeid=customer),需要额外同步
info('Syncing admin and superuser role permissions...')
admin_role_id = role_ids.get('admin')
superuser_role_id = role_ids.get('superuser')
admin_super_id = role_ids.get('admin_superuser')
# Get admin_superuser grants (source of truth)
admin_super_perms = []
if admin_super_id:
admin_super_perms = await sor.R('rolepermission', {'roleid': admin_super_id})
for g in admin_super_perms:
if admin_role_id:
await grant_permission(sor, admin_role_id, g.permid)
perm_count += 1
if superuser_role_id:
await grant_permission(sor, superuser_role_id, g.permid)
perm_count += 1
# 额外同步:找到所有 orgtypeid=customer 的 admin/superuser 角色并授权
info('Syncing customer-org admin/superuser roles...')
all_roles = await sor.R('role', {'orgtypeid': 'customer'})
for r in all_roles:
if r.name in ('admin', 'superuser'):
for g in admin_super_perms:
await grant_permission(sor, r.id, g.permid)
perm_count += 1
info(f' Synced {len(admin_super_perms)} grants to role {r.id} (name={r.name})')
info(f'After sync: {perm_count} total grants created')
for action in ('list', 'create', 'update', 'delete'):
path = f"/{module}/api/{table}_{action}.dspy"
try:
permid = await ensure_permission(sor, path, permtype='api')
await grant_perm(sor, role_ids['admin_superuser'], permid)
crud_count += 1
except Exception:
pass
print(f" 注册 CRUD API: {crud_count}")
# ---- 4d: 同步 admin_superuser 权限到各机构角色 ----
print(f"\n [4d] 同步 admin_superuser 权限到各机构已有角色...")
admin_perms = await sor.R('rolepermission', {'roleid': role_ids['admin_superuser']})
sync_count = 0
# 查找所有 orgtypeid='customer' 且名称为 admin/superuser 的角色
customer_roles = await sor.R('role', {'orgtypeid': 'customer'})
for r in customer_roles:
if r.name in ('admin', 'superuser', '系统管理员'):
for g in admin_perms:
await grant_perm(sor, r.id, g.permid)
sync_count += 1
print(f" 同步授权: {sync_count}")
# ---- 5: 创建 admin_superuser 用户 ----
print(f"\n[5/6] 创建 admin_superuser 用户...")
await create_admin_user(sor, ADMIN_USERNAME, ADMIN_PASSWORD, role_ids['admin_superuser'])
# ---- 6: 汇总 ----
print(f"\n[6/6] 初始化汇总")
print("-" * 65)
print(f" 文件扫描: {len(all_files)}")
print(f" 路径注册: {len(path_role_map)} 个原始路径 → {perm_count} 个(含/main变体")
print(f" 角色配置: {len(role_ids)}")
print(f" 权限授权: {grant_count}")
print(f" CRUD API: {crud_count}")
print(f" 机构同步: {sync_count}")
print(f" 管理员账号: {ADMIN_USERNAME} / {ADMIN_PASSWORD}")
print(f" 错误: {error_count}")
print()
print(" 重要:重启应用以刷新 RBAC 权限缓存!")
print(" pkill -f integrated_crm_app.py")
print(" nohup python app/integrated_crm_app.py --port 8080 &")
print()
# 角色权限数量统计
print("角色权限明细:")
print("-" * 65)
for rid, dbid in role_ids.items():
name, desc = pc.ROLES[rid]
count = len(await sor.R('rolepermission', {'roleid': dbid}))
print(f" {rid:25s} | {name:10s} | {count:4d} 条权限 | {desc}")
def main():
"""Run permission initialization."""
import json
from appPublic.dictObject import DictObject
config_path = os.path.dirname(os.path.dirname(__file__))
config = getConfig(config_path, {'workdir': config_path})
# Convert database config to DictObject format expected by DBPools
db_config = {}
for dbname, dbconf in config.databases.items():
db_config[dbname] = DictObject(driver=dbconf['driver'], kwargs=DictObject(**dbconf['kwargs']))
DBPools(db_config)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
dbname = list(config.databases.keys())[0]
loop.run_until_complete(init_permissions_from_config(dbname))
async def create_admin_user(sor, username, password, superuser_role_id):
"""
创建 admin_superuser 用户并分配角色
使用 password_encode() 对密码进行 bcrypt 加密
"""
from ahserver.globalEnv import password_encode
from appPublic.uniqueID import getID
from appPublic.timeUtils import timestampstr
# 检查用户是否已存在
recs = await sor.R('users', {'username': username})
if recs:
# 更新密码
uid = recs[0].id
encoded_pw = password_encode(password)
await sor.U('users', {'id': uid, 'password': encoded_pw})
print(f" 用户 {username} 已存在,已更新密码")
# 检查是否已有角色
existing_roles = await sor.R('userrole', {'userid': uid, 'roleid': superuser_role_id})
if not existing_roles:
await sor.C('userrole', {
'id': getID(),
'userid': uid,
'roleid': superuser_role_id,
})
print(f" 已添加 admin_superuser 角色")
return uid
# 创建新用户
uid = getID()
encoded_pw = password_encode(password)
now = timestampstr()
# 创建用户orgid 与 userid 相同,表示自属机构)
await sor.C('users', {
'id': uid,
'username': username,
'password': encoded_pw,
'orgid': uid,
'nick_name': '系统管理员',
'created_at': now,
'login_fail_count': 0,
})
# 创建对应机构
await sor.C('organization', {
'id': uid,
'orgname': '系统管理',
})
await sor.C('orgtypes', {
'id': getID(),
'orgid': uid,
'orgtypeid': 'platform',
})
# 分配 admin_superuser 角色
await sor.C('userrole', {
'id': getID(),
'userid': uid,
'roleid': superuser_role_id,
})
print(f" [CREATED] user: {username} id={uid} (密码已bcrypt加密)")
return uid
if __name__ == '__main__':
main()
if __name__ == "__main__":
asyncio.run(main())

View File

@ -1,183 +1,449 @@
"""CRM Permission Configuration
#!/usr/bin/env python3
"""
Integrated CRM Application - RBAC Permission Configuration
单一业主机构内的角色和权限配置
部署时由管理员通过RBAC UI或此配置初始化
Convention roles:
- any: 任何人未登录
- logined: 已登录用户
- anonymous: 匿名用户
Defines roles, permission matrix, and CRUD table mappings for the
four-department CRM system (Sales, Marketing, Operations, Finance).
"""
# ============================================================
# 单一业主机构
# ROLE DEFINITIONS
# ============================================================
# CRM为单业主机构系统不跨机构。
# 所有角色属于同一业主机构内部的不同部门。
# Format: role_id -> (display_name, description)
# Role IDs are used in perm_config; ensure_role() matches by name or ID.
ROLES = {
# --- 销售部门 Sales ---
"sales_director": ("销售总监", "销售部门最高负责人,全模块读写+审批"),
"sales_manager": ("销售经理", "销售团队管理,本部门数据读写+审批"),
"sales_rep": ("销售代表", "一线销售,本人数据读写"),
"sales_support": ("销售支持", "销售辅助岗位,全局只读"),
# --- 市场部门 Marketing ---
"marketing_director": ("市场总监", "市场部门最高负责人,全模块读写+审批"),
"marketing_manager": ("市场经理", "市场团队管理,本部门数据读写+审批"),
"marketing_specialist": ("市场专员", "一线市场人员,本人数据读写"),
"campaign_operator": ("活动运营", "市场活动运营,活动关联数据读写"),
# --- 运维部门 Operations ---
"ops_director": ("运维总监", "运维部门最高负责人,全模块读写+审批"),
"ops_manager": ("运维经理", "运维团队管理,本部门合同读写+审批"),
"ops_engineer": ("运维工程师", "运维技术人员,工单/合同相关读写"),
"customer_service": ("客服专员", "客户服务,客户管理读写+合同只读"),
# --- 财务部门 Finance ---
"finance_director": ("财务总监", "财务部门最高负责人,全模块读写+审批"),
"finance_manager": ("财务经理", "财务团队管理,合同/财务读写+审批"),
"accountant": ("会计", "财务核算,财务模块读写"),
"cashier": ("出纳", "收付款执行,收付款读写"),
# --- 系统级 System ---
"admin_superuser": ("系统管理员", "超级管理员,全平台所有权限"),
}
# ============================================================
# 角色定义
# PERMISSION_MATRIX
# ============================================================
ROLES = [
# 销售部
{'id': 'sales_manager', 'name': '销售经理', 'desc': '团队管理、客户分配、审批'},
{'id': 'sales_rep', 'name': '销售代表', 'desc': '客户跟进、商机推进、合同起草'},
# 财务部
{'id': 'finance_admin', 'name': '财务管理员', 'desc': '所有财务操作、报表'},
{'id': 'finance_clerk', 'name': '财务出纳', 'desc': '收款登记、付款处理'},
# 管理员(兼容已有数据库中的角色名)
{'id': 'admin', 'name': '管理员', 'desc': '全部权限'},
{'id': 'superuser', 'name': '超级用户', 'desc': '全部权限'},
{'id': 'admin_superuser', 'name': '超级用户', 'desc': '全部权限,初始化用'},
]
# Maps URL path patterns -> list of roles that can access.
# 路径不加 /main 前缀,统一使用相对于 wwwroot 的路径。
#
# RBAC 通配机制:
# - 角色展开为 orgtypeid.name、orgtypeid.*、*.name 三种 key
# - *.role_name 匹配所有机构的同名角色
#
# 权限级别通过路径分组体现:
# /api/*_create.dspy, /api/*_update.dspy, /api/*_delete.dspy -> 写操作
# /api/*_list.dspy, *.ui -> 读操作
# /api/*_list.dspy, *.ui -> read operations
#
# The init script expands '**' to concrete file paths, then registers
# both canonical (/module/path.ui) and /main-prefixed (/main/module/path.ui) variants.
# ============================================================
# 权限矩阵
# 格式: {模块名: {路径模式: [有权限的角色列表]}}
# ============================================================
PERMISSION_MATRIX = {
# ----------------------------------------------------------
# 静态资源bricks 框架)—— 任何人可访问
# ----------------------------------------------------------
'bricks': {
'/bricks/**': ['any'],
# ========================================================
# Public / System Resources
# ========================================================
"bricks_static": {
"/bricks/**": ["any"], # All logged-in users
},
# ----------------------------------------------------------
# 公共路径(登录等)—— 任何人可访问
# ----------------------------------------------------------
'public': {
'/main/rbac/user/login.ui': ['any'],
'/main/rbac/user/login.dspy': ['any'],
'/main/rbac/user/up_login.dspy': ['any'],
'/main/rbac/user/logout.dspy': ['any'],
'/main/rbac/user/register.dspy': ['any'],
'/main/rbac/user/register.ui': ['any'],
'/main/login.ui': ['any'],
'/main/login.dspy': ['any'],
"rbac_public": {
"/main/login.ui": ["any"],
"/main/login.dspy": ["any"],
},
# ----------------------------------------------------------
# 根路径 —— 已登录用户可访问(/main/ 指向 base.ui
# ----------------------------------------------------------
'root': {
'/main/': ['logined'],
'/main/index.html': ['logined'],
'/main/base.ui': ['logined'],
'/base.ui': ['logined'],
'/login.ui': ['any'],
'/login.dspy': ['any'],
},
# ----------------------------------------------------------
# 客户管理模块 - 销售+财务+管理员
# ----------------------------------------------------------
'customer_management': {
'/customer_management/**': [
'sales_manager', 'sales_rep',
'finance_admin',
'admin_superuser',
# ========================================================
# 客户管理 Customer Management
# ========================================================
# 模块文件: customer_list.ui, customer_edit.ui, customer_pool.ui,
# handover_list.ui, base.ui
# api/customers_list.dspy, api/customers_create.dspy,
# api/customers_update.dspy, api/customers_delete.dspy,
# api/customer_pool_list.dspy, api/handover_list.dspy
"customer_management_read": {
"/customer_management/customer_list.ui": [
"sales_director", "sales_manager", "sales_rep", "sales_support",
"marketing_director", "marketing_manager", "marketing_specialist", "campaign_operator",
"ops_director", "ops_manager", "ops_engineer", "customer_service",
"finance_director", "finance_manager", "accountant",
"admin_superuser",
],
'/customer_management/customer_handover**': [
'sales_manager', 'sales_rep',
'admin_superuser',
"/customer_management/customer_edit.ui": [
"sales_director", "sales_manager", "sales_rep",
"customer_service",
"admin_superuser",
],
'/customer_management/customer_pool**': [
'sales_manager', 'sales_rep',
'admin_superuser',
"/customer_management/customer_pool.ui": [
"sales_director", "sales_manager", "sales_rep",
"admin_superuser",
],
"/customer_management/handover_list.ui": [
"sales_director", "sales_manager",
"admin_superuser",
],
"/customer_management/base.ui": [
"sales_director", "sales_manager", "sales_rep", "sales_support",
"marketing_director", "marketing_manager",
"ops_director", "ops_manager",
"finance_director", "finance_manager",
"customer_service",
"admin_superuser",
],
},
"customer_management_api_read": {
"/customer_management/api/customers_list.dspy": [
"sales_director", "sales_manager", "sales_rep", "sales_support",
"marketing_director", "marketing_manager", "marketing_specialist", "campaign_operator",
"ops_director", "ops_manager", "ops_engineer", "customer_service",
"finance_director", "finance_manager", "accountant",
"admin_superuser",
],
"/customer_management/api/customer_pool_list.dspy": [
"sales_director", "sales_manager", "sales_rep",
"admin_superuser",
],
"/customer_management/api/handover_list.dspy": [
"sales_director", "sales_manager",
"admin_superuser",
],
},
"customer_management_api_write": {
"/customer_management/api/customers_create.dspy": [
"sales_director", "sales_manager", "sales_rep",
"customer_service",
"admin_superuser",
],
"/customer_management/api/customers_update.dspy": [
"sales_director", "sales_manager", "sales_rep",
"customer_service",
"admin_superuser",
],
"/customer_management/api/customers_delete.dspy": [
"sales_director", "sales_manager",
"admin_superuser",
],
},
# ----------------------------------------------------------
# 商机管理模块 - 销售+管理员
# ----------------------------------------------------------
'opportunity_management': {
'/opportunity_management/**': [
'sales_manager', 'sales_rep',
'admin_superuser',
# ========================================================
# 商机管理 Opportunity Management
# ========================================================
# 模块文件: opportunity_management.ui, opportunity_edit.ui, base.ui
# api/opportunities_list.dspy, api/opportunities_create.dspy,
# api/opportunities_update.dspy, api/opportunities_delete.dspy,
# api/sales_stages_list.dspy
"opportunity_management_read": {
"/opportunity_management/opportunity_management.ui": [
"sales_director", "sales_manager", "sales_rep",
"marketing_director", "marketing_manager", "marketing_specialist",
"admin_superuser",
],
"/opportunity_management/opportunity_edit.ui": [
"sales_director", "sales_manager", "sales_rep",
"marketing_director", "marketing_manager", "marketing_specialist",
"admin_superuser",
],
"/opportunity_management/base.ui": [
"sales_director", "sales_manager", "sales_rep",
"marketing_director", "marketing_manager",
"admin_superuser",
],
},
"opportunity_management_api_read": {
"/opportunity_management/api/opportunities_list.dspy": [
"sales_director", "sales_manager", "sales_rep",
"marketing_director", "marketing_manager", "marketing_specialist",
"admin_superuser",
],
"/opportunity_management/api/sales_stages_list.dspy": [
"sales_director", "sales_manager", "sales_rep",
"marketing_director", "marketing_manager", "marketing_specialist",
"admin_superuser",
],
},
"opportunity_management_api_write": {
"/opportunity_management/api/opportunities_create.dspy": [
"sales_director", "sales_manager", "sales_rep",
"marketing_director", "marketing_manager", "marketing_specialist",
"admin_superuser",
],
"/opportunity_management/api/opportunities_update.dspy": [
"sales_director", "sales_manager", "sales_rep",
"marketing_director", "marketing_manager", "marketing_specialist",
"admin_superuser",
],
"/opportunity_management/api/opportunities_delete.dspy": [
"sales_director", "sales_manager",
"marketing_director", "marketing_manager",
"admin_superuser",
],
},
# ----------------------------------------------------------
# 合同管理模块 - 销售+财务+管理员
# ----------------------------------------------------------
'contract_management': {
'/contract_management/**': [
'sales_manager', 'sales_rep',
'finance_admin',
'admin_superuser',
# ========================================================
# 合同管理 Contract Management
# ========================================================
# 模块文件: contract_list.ui, contract_edit.ui, contract_detail.ui,
# ai_config.ui
# api/contract_list.dspy, api/contracts_create.dspy,
# api/contracts_update.dspy, api/contracts_delete.dspy,
# api/check_contract.dspy
"contract_management_read": {
"/contract_management/contract_list.ui": [
"sales_director", "sales_manager", "sales_rep", "sales_support",
"marketing_director", "marketing_manager",
"ops_director", "ops_manager", "ops_engineer",
"finance_director", "finance_manager", "accountant", "cashier",
"admin_superuser",
],
'/contract_management/contract_ai_config**': [
'sales_manager',
'admin_superuser',
"/contract_management/contract_edit.ui": [
"sales_director", "sales_manager",
"ops_director", "ops_manager",
"finance_director", "finance_manager",
"admin_superuser",
],
"/contract_management/contract_detail.ui": [
"sales_director", "sales_manager", "sales_rep", "sales_support",
"ops_director", "ops_manager", "ops_engineer",
"finance_director", "finance_manager", "accountant", "cashier",
"admin_superuser",
],
"/contract_management/ai_config.ui": [
"sales_director", "sales_manager",
"ops_director",
"finance_director",
"admin_superuser",
],
"/contract_management/base.ui": [
"sales_director", "sales_manager",
"ops_director", "ops_manager",
"finance_director", "finance_manager",
"admin_superuser",
],
},
"contract_management_api_read": {
"/contract_management/api/contract_list.dspy": [
"sales_director", "sales_manager", "sales_rep", "sales_support",
"ops_director", "ops_manager", "ops_engineer",
"finance_director", "finance_manager", "accountant", "cashier",
"admin_superuser",
],
"/contract_management/api/check_contract.dspy": [
"sales_director", "sales_manager",
"ops_director", "ops_manager",
"finance_director", "finance_manager",
"admin_superuser",
],
},
"contract_management_api_write": {
"/contract_management/api/contracts_create.dspy": [
"sales_director", "sales_manager",
"ops_director", "ops_manager",
"finance_director", "finance_manager",
"admin_superuser",
],
"/contract_management/api/contracts_update.dspy": [
"sales_director", "sales_manager",
"ops_director", "ops_manager",
"finance_director", "finance_manager",
"admin_superuser",
],
"/contract_management/api/contracts_delete.dspy": [
"sales_director", "sales_manager",
"ops_director",
"finance_director",
"admin_superuser",
],
},
# ----------------------------------------------------------
# 财务管理模块 - 财务+管理员
# ----------------------------------------------------------
'financial_management': {
'/financial_management/**': [
'finance_admin', 'finance_clerk',
'admin_superuser',
# ========================================================
# 财务管理 Financial Management
# ========================================================
# 模块文件: index.ui, receivables.ui, receivable_edit.ui,
# payments.ui, receipts.ui, financial_vouchers.ui
# api/receivables.dspy, api/receivables_list.dspy,
# api/receivables_create.dspy, api/receivables_update.dspy,
# api/receivables_delete.dspy
"financial_management_read": {
"/financial_management/index.ui": [
"sales_director", "sales_manager",
"ops_director", "ops_manager",
"finance_director", "finance_manager", "accountant", "cashier",
"admin_superuser",
],
'/financial_management/receivables**': [
'sales_manager',
'finance_admin', 'finance_clerk',
'admin_superuser',
"/financial_management/receivables.ui": [
"sales_director", "sales_manager",
"finance_director", "finance_manager", "accountant", "cashier",
"admin_superuser",
],
"/financial_management/receivable_edit.ui": [
"finance_director", "finance_manager", "accountant",
"admin_superuser",
],
"/financial_management/payments.ui": [
"finance_director", "finance_manager", "accountant", "cashier",
"admin_superuser",
],
"/financial_management/receipts.ui": [
"finance_director", "finance_manager", "accountant", "cashier",
"admin_superuser",
],
"/financial_management/financial_vouchers.ui": [
"finance_director", "finance_manager", "accountant",
"admin_superuser",
],
},
"financial_management_api_read": {
"/financial_management/api/receivables.dspy": [
"sales_director", "sales_manager",
"finance_director", "finance_manager", "accountant", "cashier",
"admin_superuser",
],
"/financial_management/api/receivables_list.dspy": [
"sales_director", "sales_manager",
"finance_director", "finance_manager", "accountant", "cashier",
"admin_superuser",
],
"/financial_management/api/debug_receivables.dspy": [
"finance_director", "finance_manager", "accountant",
"admin_superuser",
],
"/financial_management/api/test_env.dspy": [
"admin_superuser",
],
},
"financial_management_api_write": {
"/financial_management/api/receivables_create.dspy": [
"finance_director", "finance_manager", "accountant",
"admin_superuser",
],
"/financial_management/api/receivables_update.dspy": [
"finance_director", "finance_manager", "accountant", "cashier",
"admin_superuser",
],
"/financial_management/api/receivables_delete.dspy": [
"finance_director", "finance_manager",
"admin_superuser",
],
},
# ----------------------------------------------------------
# 工作流审批模块 - 全部内部角色
# ----------------------------------------------------------
'workflow_approval': {
'/workflow_approval/**': [
'sales_manager', 'sales_rep',
'finance_admin', 'finance_clerk',
'admin_superuser',
# ========================================================
# 审批管理 Workflow Approval (placeholder)
# ========================================================
"workflow_approval": {
"/workflow_approval/**": [
"sales_director", "sales_manager",
"marketing_director", "marketing_manager",
"ops_director", "ops_manager",
"finance_director", "finance_manager", "accountant",
"admin_superuser",
],
},
# ----------------------------------------------------------
# 统一仪表盘 - 全部内部角色
# ----------------------------------------------------------
'unified_dashboard': {
'/unified_dashboard/**': [
'sales_manager', 'sales_rep',
'finance_admin', 'finance_clerk',
'admin_superuser',
# ========================================================
# 统一仪表盘 Unified Dashboard (placeholder)
# ========================================================
"unified_dashboard": {
"/unified_dashboard/**": [
"sales_director", "sales_manager", "sales_rep",
"marketing_director", "marketing_manager",
"ops_director", "ops_manager",
"finance_director", "finance_manager", "accountant",
"admin_superuser",
],
},
# ========================================================
# RBAC Admin (system admin only)
# ========================================================
"rbac_admin": {
"/rbac/**": ["admin_superuser"],
},
# ========================================================
# AppBase (system admin + department directors)
# ========================================================
"appbase": {
"/appbase/**": [
"admin_superuser",
"sales_director", "marketing_director",
"ops_director", "finance_director",
],
},
# ========================================================
# Main app pages (login redirect, base layout)
# ========================================================
"main_app": {
"/main/base.ui": ["logined"], # All logged-in users
"/main/index.ui": ["logined"],
},
}
# ============================================================
# CRUD 路径
# 每个模块的表自动生成 CRUD 路径: /{modulename}/{tablename}/
# CRUD TABLE PERMISSIONS
# ============================================================
# Maps module -> list of database tables. The init script registers
# CRUD API endpoints for each table.
CRUD_TABLES = {
'customer_management': [
'customers', 'customer_pool',
'customer_handover', 'customer_handover_items',
"customer_management": [
"customers",
"customer_pool",
"customer_handover",
"customer_handover_items",
],
'opportunity_management': [
'opportunities', 'opportunity_stage_history',
'opportunity_predictions', 'sales_stages',
"opportunity_management": [
"opportunities",
"sales_stages",
"opportunity_stage_history",
],
'contract_management': [
'contract', 'contract_versions', 'contract_attachment',
'contract_milestones', 'contract_ai_config',
'orders', 'order_payments',
"contract_management": [
"contracts",
"contract_milestones",
"contract_versions",
"contract_attachments",
"orders",
"order_payments",
],
'financial_management': [
'receivables', 'receipts', 'receipt_allocations',
'payments', 'financial_vouchers',
],
'workflow_approval': [
'approval_workflow', 'approval_instance',
'approval_step', 'approval_task',
],
'unified_dashboard': [
'dashboard_config', 'report_template', 'user_dashboard',
"financial_management": [
"receivables",
"payments",
"receipts",
"receipt_allocations",
"financial_vouchers",
],
}
# ============================================================
# MODULE DIRECTORY MAP
# ============================================================
# Maps module name -> wwwroot subdirectory path for wildcard expansion.
# Used by init_permissions.py to locate files on disk.
MODULE_WWWROOT = {
"customer_management": "customer_management",
"opportunity_management": "opportunity_management",
"contract_management": "contract_management",
"financial_management": "financial_management",
"workflow_approval": "workflow_approval",
"unified_dashboard": "unified_dashboard",
"rbac": "rbac",
"appbase": "appbase",
"bricks": "bricks",
}

426
init_permissions.py Normal file
View File

@ -0,0 +1,426 @@
#!/usr/bin/env python3
"""
Integrated CRM - RBAC 权限初始化脚本
功能
1. 遍历 wwwroot 下所有文件含子目录和 ln -s 链接目录将路径写入 permission
2. 为各机构的角色设置每个 path 的权限通过 orgtypeid='*' 通配所有机构
3. 初始化 admin_superuser 用户用户名 super密码 Kyy@123456
依赖 RBAC 通配机制
- 用户角色会展开为 orgtypeid.nameorgtypeid.**.name 三种 key
- 授权给 orgtypeid='*' 的角色等于授权给所有机构的同名角色
- *.admin_superuser 匹配所有机构的 admin_superuser 角色
使用方法
cd ~/repos/integrated_crm_app
source py3/bin/activate
python app/init_permissions.py
部署流程
1. 停止应用pkill -f integrated_crm_app.py
2. 执行 python app/init_permissions.py
3. 启动应用nohup python app/integrated_crm_app.py --port 8080 &
"""
import asyncio
import os
import sys
import importlib.util
# ============================================================
# 配置
# ============================================================
APP_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, APP_ROOT)
# 数据库配置(与 conf/config.json 一致)
DB_CONFIG = {
"host": "localhost",
"port": 3306,
"user": "hermes",
"password": "hermes123",
"db": "crm_db",
"charset": "utf8mb4",
}
# perm_config.py 路径
PERM_CONFIG_PATH = os.path.join(APP_ROOT, "perm_config.py")
# wwwroot 根目录(含符号链接子目录)
WWWROOT_BASE = os.path.join(APP_ROOT, "wwwroot")
# admin_superuser 初始账号密码
ADMIN_USERNAME = "super"
ADMIN_PASSWORD = "Kyy@123456"
# ============================================================
# 加载 perm_config
# ============================================================
def load_perm_config():
spec = importlib.util.spec_from_file_location("perm_config", PERM_CONFIG_PATH)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
return mod
# ============================================================
# 遍历 wwwroot 所有文件(含 ln -s 链接目录)
# ============================================================
def scan_wwwroot(wwwroot_path):
"""
遍历 wwwroot 目录下所有文件递归跟随符号链接
返回 URL 路径列表 ['/login.ui', '/customer_management/customer_list.ui']
关键
1. os.walk 默认不跟随符号链接需要 followlinks=True
2. 排除 self-referencing symlinks main -> wwwroot防止死循环
3. /main 前缀由 config.json paths 映射产生此处返回相对路径即可
脚本会自动注册 /xxx /main/xxx 双路径
"""
paths = []
if not os.path.isdir(wwwroot_path):
print(f" [WARN] wwwroot not found: {wwwroot_path}")
return paths
real_wwwroot = os.path.realpath(wwwroot_path)
visited_real = set()
for root, dirs, files in os.walk(wwwroot_path, followlinks=True):
real_root = os.path.realpath(root)
if real_root in visited_real:
dirs.clear() # 防止进入循环目录
continue
visited_real.add(real_root)
# 过滤掉指向 wwwroot 自身的符号链接目录
safe_dirs = []
for d in dirs:
real_d = os.path.realpath(os.path.join(root, d))
if real_d != real_wwwroot and real_d not in visited_real:
safe_dirs.append(d)
dirs[:] = safe_dirs
for f in files:
# 注册 .ui, .dspy, .js, .css 文件的权限
if not (f.endswith('.ui') or f.endswith('.dspy') or f.endswith('.js') or f.endswith('.css')):
continue
full = os.path.join(root, f)
rel = os.path.relpath(full, wwwroot_path)
url_path = '/' + rel.replace(os.sep, '/')
paths.append(url_path)
return sorted(paths)
# ============================================================
# 数据库操作sqlor CRUD
# ============================================================
async def ensure_permission(sor, path, permtype='page'):
"""确保 permission 记录存在,返回 permission ID。"""
recs = await sor.R('permission', {'path': path})
if recs:
return recs[0].id
from appPublic.uniqueID import getID
permid = getID()
await sor.C('permission', {
'id': permid,
'path': path,
'permtype': permtype,
'parentid': '',
'description': f'Auto-generated for {path}',
})
return permid
async def ensure_role(sor, role_id, name, desc='', orgtypeid='*'):
"""
确保角色存在
匹配顺序中文名 -> 英文ID -> 创建新角色
默认 orgtypeid='*' 表示通配所有机构
"""
# 按中文名匹配
recs = await sor.R('role', {'name': name})
if recs:
return recs[0].id
# 按英文ID匹配
recs = await sor.R('role', {'id': role_id})
if recs:
return recs[0].id
# 创建新角色
from appPublic.uniqueID import getID
rid = role_id
await sor.C('role', {
'id': rid,
'orgtypeid': orgtypeid,
'name': name,
'role_name': role_id,
'description': desc,
})
print(f" [CREATED] role: {role_id} ({name}) orgtypeid={orgtypeid} id={rid}")
return rid
async def grant_perm(sor, role_id, perm_id):
"""授予角色权限(幂等)。"""
existing = await sor.R('rolepermission', {'roleid': role_id, 'permid': perm_id})
if not existing:
await sor.C('rolepermission', {
'roleid': role_id,
'permid': perm_id,
})
def dual_paths(path):
"""返回路径列表。只使用原始路径,不加 /main 前缀。"""
# 如果路径以 /main 开头,去掉 /main 前缀(统一使用无前缀路径)
if path.startswith('/main'):
cleaned = path[5:] # '/main/xxx' -> '/xxx'
if not cleaned:
cleaned = '/'
return [cleaned]
return [path]
# ============================================================
# 构建路径->角色映射
# ============================================================
def build_path_role_map(perm_config, all_files):
"""
根据 perm_config.PERMISSION_MATRIX 和扫描到的文件列表
构建 {url_path: [role_ids]} 映射
对于非通配路径精确路径如果该路径在 all_files 加入映射
对于通配路径/module/**展开到该目录下所有匹配文件
"""
path_roles = {} # path -> set of role_ids
for _section, entries in perm_config.PERMISSION_MATRIX.items():
for pattern, roles in entries.items():
if '**' in pattern:
# 通配模式:展开到 wwwroot 下匹配的文件
base = pattern.replace('/**', '').lstrip('/')
prefix = '/' + base if base else ''
for fp in all_files:
if fp.startswith(prefix):
if fp not in path_roles:
path_roles[fp] = set()
path_roles[fp].update(roles)
else:
# 精确路径:如果该文件存在,加入映射
if pattern in all_files:
if pattern not in path_roles:
path_roles[pattern] = set()
path_roles[pattern].update(roles)
return {k: list(v) for k, v in path_roles.items()}
# ============================================================
# 主流程
# ============================================================
async def main():
print("=" * 65)
print("Integrated CRM - RBAC 权限初始化")
print("=" * 65)
# [1] 加载配置
pc = load_perm_config()
print(f"\n[1/6] 加载 perm_config.py")
print(f" 角色定义: {len(pc.ROLES)}")
print(f" 权限配置段: {len(pc.PERMISSION_MATRIX)}")
# [2] 扫描 wwwroot 所有文件
all_files = scan_wwwroot(WWWROOT_BASE)
print(f"\n[2/6] 扫描 wwwroot 文件(含符号链接): {len(all_files)}")
if all_files:
print(f" 示例: {all_files[0]}")
if len(all_files) > 1:
print(f" ...")
print(f" {all_files[-1]}")
# [3] 构建 路径->角色 映射
path_role_map = build_path_role_map(pc, all_files)
print(f"\n[3/6] 路径-角色映射: {len(path_role_map)} 个路径需要授权")
# [4] 连接数据库 + 注册所有角色
from sqlor.dbpools import DBPools
db = DBPools()
db.addDatabase('crm_db', {
'driver': 'aiomysql',
'kwargs': DB_CONFIG,
})
print(f"\n[4/6] 连接数据库 {DB_CONFIG['db']}...")
async with db.sqlorContext('crm_db') as sor:
# ---- 4a: 创建/验证所有角色 ----
# 关键:所有角色使用 orgtypeid='*' 创建,配合 RBAC 通配机制
print(f"\n [4a] 注册角色orgtypeid='*' 通配所有机构)...")
role_ids = {}
for rid, (name, desc) in pc.ROLES.items():
role_ids[rid] = await ensure_role(sor, rid, name, desc, orgtypeid='*')
# ---- 4b: 注册 permission + 授权 ----
print(f"\n [4b] 注册权限并授权...")
perm_count = 0
grant_count = 0
error_count = 0
for path, roles in path_role_map.items():
for p in dual_paths(path):
try:
permid = await ensure_permission(sor, p, permtype='page')
perm_count += 1
for rn in roles:
if rn in ('any', 'logined'):
continue
if rn not in role_ids:
print(f" [WARN] 未知角色: {rn} (path: {p})")
error_count += 1
continue
await grant_perm(sor, role_ids[rn], permid)
grant_count += 1
except Exception as e:
print(f" [ERROR] {p}: {e}")
error_count += 1
print(f" 注册权限: {perm_count}, 授权次数: {grant_count}, 错误: {error_count}")
# ---- 4c: 注册 CRUD API 权限 ----
print(f"\n [4c] 注册 CRUD API 权限...")
crud_count = 0
for module, tables in pc.CRUD_TABLES.items():
for table in tables:
for action in ('list', 'create', 'update', 'delete'):
path = f"/{module}/api/{table}_{action}.dspy"
try:
permid = await ensure_permission(sor, path, permtype='api')
await grant_perm(sor, role_ids['admin_superuser'], permid)
crud_count += 1
except Exception:
pass
print(f" 注册 CRUD API: {crud_count}")
# ---- 4d: 同步 admin_superuser 权限到各机构角色 ----
print(f"\n [4d] 同步 admin_superuser 权限到各机构已有角色...")
admin_perms = await sor.R('rolepermission', {'roleid': role_ids['admin_superuser']})
sync_count = 0
# 查找所有 orgtypeid='customer' 且名称为 admin/superuser 的角色
customer_roles = await sor.R('role', {'orgtypeid': 'customer'})
for r in customer_roles:
if r.name in ('admin', 'superuser', '系统管理员'):
for g in admin_perms:
await grant_perm(sor, r.id, g.permid)
sync_count += 1
print(f" 同步授权: {sync_count}")
# ---- 5: 创建 admin_superuser 用户 ----
print(f"\n[5/6] 创建 admin_superuser 用户...")
await create_admin_user(sor, ADMIN_USERNAME, ADMIN_PASSWORD, role_ids['admin_superuser'])
# ---- 6: 汇总 ----
print(f"\n[6/6] 初始化汇总")
print("-" * 65)
print(f" 文件扫描: {len(all_files)}")
print(f" 路径注册: {len(path_role_map)} 个原始路径 → {perm_count} 个(含/main变体")
print(f" 角色配置: {len(role_ids)}")
print(f" 权限授权: {grant_count}")
print(f" CRUD API: {crud_count}")
print(f" 机构同步: {sync_count}")
print(f" 管理员账号: {ADMIN_USERNAME} / {ADMIN_PASSWORD}")
print(f" 错误: {error_count}")
print()
print(" 重要:重启应用以刷新 RBAC 权限缓存!")
print(" pkill -f integrated_crm_app.py")
print(" nohup python app/integrated_crm_app.py --port 8080 &")
print()
# 角色权限数量统计
print("角色权限明细:")
print("-" * 65)
for rid, dbid in role_ids.items():
name, desc = pc.ROLES[rid]
count = len(await sor.R('rolepermission', {'roleid': dbid}))
print(f" {rid:25s} | {name:10s} | {count:4d} 条权限 | {desc}")
async def create_admin_user(sor, username, password, superuser_role_id):
"""
创建 admin_superuser 用户并分配角色
使用 password_encode() 对密码进行 bcrypt 加密
"""
from ahserver.globalEnv import password_encode
from appPublic.uniqueID import getID
from appPublic.timeUtils import timestampstr
# 检查用户是否已存在
recs = await sor.R('users', {'username': username})
if recs:
# 更新密码
uid = recs[0].id
encoded_pw = password_encode(password)
await sor.U('users', {'id': uid, 'password': encoded_pw})
print(f" 用户 {username} 已存在,已更新密码")
# 检查是否已有角色
existing_roles = await sor.R('userrole', {'userid': uid, 'roleid': superuser_role_id})
if not existing_roles:
await sor.C('userrole', {
'id': getID(),
'userid': uid,
'roleid': superuser_role_id,
})
print(f" 已添加 admin_superuser 角色")
return uid
# 创建新用户
uid = getID()
encoded_pw = password_encode(password)
now = timestampstr()
# 创建用户orgid 与 userid 相同,表示自属机构)
await sor.C('users', {
'id': uid,
'username': username,
'password': encoded_pw,
'orgid': uid,
'nick_name': '系统管理员',
'created_at': now,
'login_fail_count': 0,
})
# 创建对应机构
await sor.C('organization', {
'id': uid,
'orgname': '系统管理',
})
await sor.C('orgtypes', {
'id': getID(),
'orgid': uid,
'orgtypeid': 'platform',
})
# 分配 admin_superuser 角色
await sor.C('userrole', {
'id': getID(),
'userid': uid,
'roleid': superuser_role_id,
})
print(f" [CREATED] user: {username} id={uid} (密码已bcrypt加密)")
return uid
if __name__ == "__main__":
asyncio.run(main())

449
perm_config.py Normal file
View File

@ -0,0 +1,449 @@
#!/usr/bin/env python3
"""
Integrated CRM Application - RBAC Permission Configuration
Defines roles, permission matrix, and CRUD table mappings for the
four-department CRM system (Sales, Marketing, Operations, Finance).
"""
# ============================================================
# ROLE DEFINITIONS
# ============================================================
# Format: role_id -> (display_name, description)
# Role IDs are used in perm_config; ensure_role() matches by name or ID.
ROLES = {
# --- 销售部门 Sales ---
"sales_director": ("销售总监", "销售部门最高负责人,全模块读写+审批"),
"sales_manager": ("销售经理", "销售团队管理,本部门数据读写+审批"),
"sales_rep": ("销售代表", "一线销售,本人数据读写"),
"sales_support": ("销售支持", "销售辅助岗位,全局只读"),
# --- 市场部门 Marketing ---
"marketing_director": ("市场总监", "市场部门最高负责人,全模块读写+审批"),
"marketing_manager": ("市场经理", "市场团队管理,本部门数据读写+审批"),
"marketing_specialist": ("市场专员", "一线市场人员,本人数据读写"),
"campaign_operator": ("活动运营", "市场活动运营,活动关联数据读写"),
# --- 运维部门 Operations ---
"ops_director": ("运维总监", "运维部门最高负责人,全模块读写+审批"),
"ops_manager": ("运维经理", "运维团队管理,本部门合同读写+审批"),
"ops_engineer": ("运维工程师", "运维技术人员,工单/合同相关读写"),
"customer_service": ("客服专员", "客户服务,客户管理读写+合同只读"),
# --- 财务部门 Finance ---
"finance_director": ("财务总监", "财务部门最高负责人,全模块读写+审批"),
"finance_manager": ("财务经理", "财务团队管理,合同/财务读写+审批"),
"accountant": ("会计", "财务核算,财务模块读写"),
"cashier": ("出纳", "收付款执行,收付款读写"),
# --- 系统级 System ---
"admin_superuser": ("系统管理员", "超级管理员,全平台所有权限"),
}
# ============================================================
# PERMISSION_MATRIX
# ============================================================
# Maps URL path patterns -> list of roles that can access.
# 路径不加 /main 前缀,统一使用相对于 wwwroot 的路径。
#
# RBAC 通配机制:
# - 角色展开为 orgtypeid.name、orgtypeid.*、*.name 三种 key
# - *.role_name 匹配所有机构的同名角色
#
# 权限级别通过路径分组体现:
# /api/*_create.dspy, /api/*_update.dspy, /api/*_delete.dspy -> 写操作
# /api/*_list.dspy, *.ui -> 读操作
# /api/*_list.dspy, *.ui -> read operations
#
# The init script expands '**' to concrete file paths, then registers
# both canonical (/module/path.ui) and /main-prefixed (/main/module/path.ui) variants.
PERMISSION_MATRIX = {
# ========================================================
# Public / System Resources
# ========================================================
"bricks_static": {
"/bricks/**": ["any"], # All logged-in users
},
"rbac_public": {
"/main/login.ui": ["any"],
"/main/login.dspy": ["any"],
},
# ========================================================
# 客户管理 Customer Management
# ========================================================
# 模块文件: customer_list.ui, customer_edit.ui, customer_pool.ui,
# handover_list.ui, base.ui
# api/customers_list.dspy, api/customers_create.dspy,
# api/customers_update.dspy, api/customers_delete.dspy,
# api/customer_pool_list.dspy, api/handover_list.dspy
"customer_management_read": {
"/customer_management/customer_list.ui": [
"sales_director", "sales_manager", "sales_rep", "sales_support",
"marketing_director", "marketing_manager", "marketing_specialist", "campaign_operator",
"ops_director", "ops_manager", "ops_engineer", "customer_service",
"finance_director", "finance_manager", "accountant",
"admin_superuser",
],
"/customer_management/customer_edit.ui": [
"sales_director", "sales_manager", "sales_rep",
"customer_service",
"admin_superuser",
],
"/customer_management/customer_pool.ui": [
"sales_director", "sales_manager", "sales_rep",
"admin_superuser",
],
"/customer_management/handover_list.ui": [
"sales_director", "sales_manager",
"admin_superuser",
],
"/customer_management/base.ui": [
"sales_director", "sales_manager", "sales_rep", "sales_support",
"marketing_director", "marketing_manager",
"ops_director", "ops_manager",
"finance_director", "finance_manager",
"customer_service",
"admin_superuser",
],
},
"customer_management_api_read": {
"/customer_management/api/customers_list.dspy": [
"sales_director", "sales_manager", "sales_rep", "sales_support",
"marketing_director", "marketing_manager", "marketing_specialist", "campaign_operator",
"ops_director", "ops_manager", "ops_engineer", "customer_service",
"finance_director", "finance_manager", "accountant",
"admin_superuser",
],
"/customer_management/api/customer_pool_list.dspy": [
"sales_director", "sales_manager", "sales_rep",
"admin_superuser",
],
"/customer_management/api/handover_list.dspy": [
"sales_director", "sales_manager",
"admin_superuser",
],
},
"customer_management_api_write": {
"/customer_management/api/customers_create.dspy": [
"sales_director", "sales_manager", "sales_rep",
"customer_service",
"admin_superuser",
],
"/customer_management/api/customers_update.dspy": [
"sales_director", "sales_manager", "sales_rep",
"customer_service",
"admin_superuser",
],
"/customer_management/api/customers_delete.dspy": [
"sales_director", "sales_manager",
"admin_superuser",
],
},
# ========================================================
# 商机管理 Opportunity Management
# ========================================================
# 模块文件: opportunity_management.ui, opportunity_edit.ui, base.ui
# api/opportunities_list.dspy, api/opportunities_create.dspy,
# api/opportunities_update.dspy, api/opportunities_delete.dspy,
# api/sales_stages_list.dspy
"opportunity_management_read": {
"/opportunity_management/opportunity_management.ui": [
"sales_director", "sales_manager", "sales_rep",
"marketing_director", "marketing_manager", "marketing_specialist",
"admin_superuser",
],
"/opportunity_management/opportunity_edit.ui": [
"sales_director", "sales_manager", "sales_rep",
"marketing_director", "marketing_manager", "marketing_specialist",
"admin_superuser",
],
"/opportunity_management/base.ui": [
"sales_director", "sales_manager", "sales_rep",
"marketing_director", "marketing_manager",
"admin_superuser",
],
},
"opportunity_management_api_read": {
"/opportunity_management/api/opportunities_list.dspy": [
"sales_director", "sales_manager", "sales_rep",
"marketing_director", "marketing_manager", "marketing_specialist",
"admin_superuser",
],
"/opportunity_management/api/sales_stages_list.dspy": [
"sales_director", "sales_manager", "sales_rep",
"marketing_director", "marketing_manager", "marketing_specialist",
"admin_superuser",
],
},
"opportunity_management_api_write": {
"/opportunity_management/api/opportunities_create.dspy": [
"sales_director", "sales_manager", "sales_rep",
"marketing_director", "marketing_manager", "marketing_specialist",
"admin_superuser",
],
"/opportunity_management/api/opportunities_update.dspy": [
"sales_director", "sales_manager", "sales_rep",
"marketing_director", "marketing_manager", "marketing_specialist",
"admin_superuser",
],
"/opportunity_management/api/opportunities_delete.dspy": [
"sales_director", "sales_manager",
"marketing_director", "marketing_manager",
"admin_superuser",
],
},
# ========================================================
# 合同管理 Contract Management
# ========================================================
# 模块文件: contract_list.ui, contract_edit.ui, contract_detail.ui,
# ai_config.ui
# api/contract_list.dspy, api/contracts_create.dspy,
# api/contracts_update.dspy, api/contracts_delete.dspy,
# api/check_contract.dspy
"contract_management_read": {
"/contract_management/contract_list.ui": [
"sales_director", "sales_manager", "sales_rep", "sales_support",
"marketing_director", "marketing_manager",
"ops_director", "ops_manager", "ops_engineer",
"finance_director", "finance_manager", "accountant", "cashier",
"admin_superuser",
],
"/contract_management/contract_edit.ui": [
"sales_director", "sales_manager",
"ops_director", "ops_manager",
"finance_director", "finance_manager",
"admin_superuser",
],
"/contract_management/contract_detail.ui": [
"sales_director", "sales_manager", "sales_rep", "sales_support",
"ops_director", "ops_manager", "ops_engineer",
"finance_director", "finance_manager", "accountant", "cashier",
"admin_superuser",
],
"/contract_management/ai_config.ui": [
"sales_director", "sales_manager",
"ops_director",
"finance_director",
"admin_superuser",
],
"/contract_management/base.ui": [
"sales_director", "sales_manager",
"ops_director", "ops_manager",
"finance_director", "finance_manager",
"admin_superuser",
],
},
"contract_management_api_read": {
"/contract_management/api/contract_list.dspy": [
"sales_director", "sales_manager", "sales_rep", "sales_support",
"ops_director", "ops_manager", "ops_engineer",
"finance_director", "finance_manager", "accountant", "cashier",
"admin_superuser",
],
"/contract_management/api/check_contract.dspy": [
"sales_director", "sales_manager",
"ops_director", "ops_manager",
"finance_director", "finance_manager",
"admin_superuser",
],
},
"contract_management_api_write": {
"/contract_management/api/contracts_create.dspy": [
"sales_director", "sales_manager",
"ops_director", "ops_manager",
"finance_director", "finance_manager",
"admin_superuser",
],
"/contract_management/api/contracts_update.dspy": [
"sales_director", "sales_manager",
"ops_director", "ops_manager",
"finance_director", "finance_manager",
"admin_superuser",
],
"/contract_management/api/contracts_delete.dspy": [
"sales_director", "sales_manager",
"ops_director",
"finance_director",
"admin_superuser",
],
},
# ========================================================
# 财务管理 Financial Management
# ========================================================
# 模块文件: index.ui, receivables.ui, receivable_edit.ui,
# payments.ui, receipts.ui, financial_vouchers.ui
# api/receivables.dspy, api/receivables_list.dspy,
# api/receivables_create.dspy, api/receivables_update.dspy,
# api/receivables_delete.dspy
"financial_management_read": {
"/financial_management/index.ui": [
"sales_director", "sales_manager",
"ops_director", "ops_manager",
"finance_director", "finance_manager", "accountant", "cashier",
"admin_superuser",
],
"/financial_management/receivables.ui": [
"sales_director", "sales_manager",
"finance_director", "finance_manager", "accountant", "cashier",
"admin_superuser",
],
"/financial_management/receivable_edit.ui": [
"finance_director", "finance_manager", "accountant",
"admin_superuser",
],
"/financial_management/payments.ui": [
"finance_director", "finance_manager", "accountant", "cashier",
"admin_superuser",
],
"/financial_management/receipts.ui": [
"finance_director", "finance_manager", "accountant", "cashier",
"admin_superuser",
],
"/financial_management/financial_vouchers.ui": [
"finance_director", "finance_manager", "accountant",
"admin_superuser",
],
},
"financial_management_api_read": {
"/financial_management/api/receivables.dspy": [
"sales_director", "sales_manager",
"finance_director", "finance_manager", "accountant", "cashier",
"admin_superuser",
],
"/financial_management/api/receivables_list.dspy": [
"sales_director", "sales_manager",
"finance_director", "finance_manager", "accountant", "cashier",
"admin_superuser",
],
"/financial_management/api/debug_receivables.dspy": [
"finance_director", "finance_manager", "accountant",
"admin_superuser",
],
"/financial_management/api/test_env.dspy": [
"admin_superuser",
],
},
"financial_management_api_write": {
"/financial_management/api/receivables_create.dspy": [
"finance_director", "finance_manager", "accountant",
"admin_superuser",
],
"/financial_management/api/receivables_update.dspy": [
"finance_director", "finance_manager", "accountant", "cashier",
"admin_superuser",
],
"/financial_management/api/receivables_delete.dspy": [
"finance_director", "finance_manager",
"admin_superuser",
],
},
# ========================================================
# 审批管理 Workflow Approval (placeholder)
# ========================================================
"workflow_approval": {
"/workflow_approval/**": [
"sales_director", "sales_manager",
"marketing_director", "marketing_manager",
"ops_director", "ops_manager",
"finance_director", "finance_manager", "accountant",
"admin_superuser",
],
},
# ========================================================
# 统一仪表盘 Unified Dashboard (placeholder)
# ========================================================
"unified_dashboard": {
"/unified_dashboard/**": [
"sales_director", "sales_manager", "sales_rep",
"marketing_director", "marketing_manager",
"ops_director", "ops_manager",
"finance_director", "finance_manager", "accountant",
"admin_superuser",
],
},
# ========================================================
# RBAC Admin (system admin only)
# ========================================================
"rbac_admin": {
"/rbac/**": ["admin_superuser"],
},
# ========================================================
# AppBase (system admin + department directors)
# ========================================================
"appbase": {
"/appbase/**": [
"admin_superuser",
"sales_director", "marketing_director",
"ops_director", "finance_director",
],
},
# ========================================================
# Main app pages (login redirect, base layout)
# ========================================================
"main_app": {
"/main/base.ui": ["logined"], # All logged-in users
"/main/index.ui": ["logined"],
},
}
# ============================================================
# CRUD TABLE PERMISSIONS
# ============================================================
# Maps module -> list of database tables. The init script registers
# CRUD API endpoints for each table.
CRUD_TABLES = {
"customer_management": [
"customers",
"customer_pool",
"customer_handover",
"customer_handover_items",
],
"opportunity_management": [
"opportunities",
"sales_stages",
"opportunity_stage_history",
],
"contract_management": [
"contracts",
"contract_milestones",
"contract_versions",
"contract_attachments",
"orders",
"order_payments",
],
"financial_management": [
"receivables",
"payments",
"receipts",
"receipt_allocations",
"financial_vouchers",
],
}
# ============================================================
# MODULE DIRECTORY MAP
# ============================================================
# Maps module name -> wwwroot subdirectory path for wildcard expansion.
# Used by init_permissions.py to locate files on disk.
MODULE_WWWROOT = {
"customer_management": "customer_management",
"opportunity_management": "opportunity_management",
"contract_management": "contract_management",
"financial_management": "financial_management",
"workflow_approval": "workflow_approval",
"unified_dashboard": "unified_dashboard",
"rbac": "rbac",
"appbase": "appbase",
"bricks": "bricks",
}