integrated_crm_app/app/init_permissions.py
yumoqing 1fc2be73c0 fix: init_permissions.py - use config.json databases for DBPools, add id for rolepermission insert
1. Replace manual DB_CONFIG with getConfig('.').databases for DBPools init
2. Add id=getID() when inserting into rolepermission table (id has no default value)
3. Sync app/ and root copies
2026-05-05 14:08:49 +08:00

420 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Integrated CRM - RBAC 权限初始化脚本
功能:
1. 遍历 wwwroot 下所有文件(含子目录和 ln -s 链接目录),将路径写入 permission 表
2. 为各机构的角色设置每个 path 的权限(通过 orgtypeid='*' 通配所有机构)
3. 初始化 admin_superuser 用户(用户名 super密码 Kyy@123456
依赖 RBAC 通配机制:
- 用户角色会展开为 orgtypeid.name、orgtypeid.*、*.name 三种 key
- 授权给 orgtypeid='*' 的角色,等于授权给所有机构的同名角色
- *.admin_superuser 匹配所有机构的 admin_superuser 角色
使用方法:
cd ~/repos/integrated_crm_app
source py3/bin/activate
python app/init_permissions.py
部署流程:
1. 停止应用pkill -f integrated_crm_app.py
2. 执行: python app/init_permissions.py
3. 启动应用nohup python app/integrated_crm_app.py --port 8080 &
"""
import asyncio
import os
import sys
import importlib.util
# ============================================================
# 配置
# ============================================================
APP_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, APP_ROOT)
# perm_config.py 路径
PERM_CONFIG_PATH = os.path.join(APP_ROOT, "perm_config.py")
# wwwroot 根目录(含符号链接子目录)
WWWROOT_BASE = os.path.join(APP_ROOT, "wwwroot")
# admin_superuser 初始账号密码
ADMIN_USERNAME = "super"
ADMIN_PASSWORD = "Kyy@123456"
# ============================================================
# 加载 perm_config
# ============================================================
def load_perm_config():
spec = importlib.util.spec_from_file_location("perm_config", PERM_CONFIG_PATH)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
return mod
# ============================================================
# 遍历 wwwroot 所有文件(含 ln -s 链接目录)
# ============================================================
def scan_wwwroot(wwwroot_path):
"""
遍历 wwwroot 目录下所有文件(递归,跟随符号链接)。
返回 URL 路径列表,如 ['/login.ui', '/customer_management/customer_list.ui']。
关键:
1. os.walk 默认不跟随符号链接,需要 followlinks=True
2. 排除 self-referencing symlinks如 main -> wwwroot防止死循环
3. /main 前缀由 config.json paths 映射产生,此处返回相对路径即可,
脚本会自动注册 /xxx 和 /main/xxx 双路径。
"""
paths = []
if not os.path.isdir(wwwroot_path):
print(f" [WARN] wwwroot not found: {wwwroot_path}")
return paths
real_wwwroot = os.path.realpath(wwwroot_path)
visited_real = set()
for root, dirs, files in os.walk(wwwroot_path, followlinks=True):
real_root = os.path.realpath(root)
if real_root in visited_real:
dirs.clear() # 防止进入循环目录
continue
visited_real.add(real_root)
# 过滤掉指向 wwwroot 自身的符号链接目录
safe_dirs = []
for d in dirs:
real_d = os.path.realpath(os.path.join(root, d))
if real_d != real_wwwroot and real_d not in visited_real:
safe_dirs.append(d)
dirs[:] = safe_dirs
for f in files:
# 注册 .ui, .dspy, .js, .css 文件的权限
if not (f.endswith('.ui') or f.endswith('.dspy') or f.endswith('.js') or f.endswith('.css')):
continue
full = os.path.join(root, f)
rel = os.path.relpath(full, wwwroot_path)
url_path = '/' + rel.replace(os.sep, '/')
paths.append(url_path)
return sorted(paths)
# ============================================================
# 数据库操作sqlor CRUD
# ============================================================
async def ensure_permission(sor, path, permtype='page'):
"""确保 permission 记录存在,返回 permission ID。"""
recs = await sor.R('permission', {'path': path})
if recs:
return recs[0].id
from appPublic.uniqueID import getID
permid = getID()
await sor.C('permission', {
'id': permid,
'path': path,
'permtype': permtype,
'parentid': '',
'description': f'Auto-generated for {path}',
})
return permid
async def ensure_role(sor, role_id, name, desc='', orgtypeid='*'):
"""
确保角色存在。
匹配顺序:中文名 -> 英文ID -> 创建新角色。
默认 orgtypeid='*' 表示通配所有机构。
"""
# 按中文名匹配
recs = await sor.R('role', {'name': name})
if recs:
return recs[0].id
# 按英文ID匹配
recs = await sor.R('role', {'id': role_id})
if recs:
return recs[0].id
# 创建新角色
from appPublic.uniqueID import getID
rid = role_id
await sor.C('role', {
'id': rid,
'orgtypeid': orgtypeid,
'name': name,
'role_name': role_id,
'description': desc,
})
print(f" [CREATED] role: {role_id} ({name}) orgtypeid={orgtypeid} id={rid}")
return rid
async def grant_perm(sor, role_id, perm_id):
"""授予角色权限(幂等)。"""
existing = await sor.R('rolepermission', {'roleid': role_id, 'permid': perm_id})
if not existing:
from appPublic.uniqueID import getID
await sor.C('rolepermission', {
'id': getID(),
'roleid': role_id,
'permid': perm_id,
})
def dual_paths(path):
"""返回路径列表。只使用原始路径,不加 /main 前缀。"""
# 如果路径以 /main 开头,去掉 /main 前缀(统一使用无前缀路径)
if path.startswith('/main'):
cleaned = path[5:] # '/main/xxx' -> '/xxx'
if not cleaned:
cleaned = '/'
return [cleaned]
return [path]
# ============================================================
# 构建路径->角色映射
# ============================================================
def build_path_role_map(perm_config, all_files):
"""
根据 perm_config.PERMISSION_MATRIX 和扫描到的文件列表,
构建 {url_path: [role_ids]} 映射。
对于非通配路径(精确路径),如果该路径在 all_files 中,加入映射。
对于通配路径(/module/**),展开到该目录下所有匹配文件。
"""
path_roles = {} # path -> set of role_ids
for _section, entries in perm_config.PERMISSION_MATRIX.items():
for pattern, roles in entries.items():
if '**' in pattern:
# 通配模式:展开到 wwwroot 下匹配的文件
base = pattern.replace('/**', '').lstrip('/')
prefix = '/' + base if base else ''
for fp in all_files:
if fp.startswith(prefix):
if fp not in path_roles:
path_roles[fp] = set()
path_roles[fp].update(roles)
else:
# 精确路径:如果该文件存在,加入映射
if pattern in all_files:
if pattern not in path_roles:
path_roles[pattern] = set()
path_roles[pattern].update(roles)
return {k: list(v) for k, v in path_roles.items()}
# ============================================================
# 主流程
# ============================================================
async def main():
print("=" * 65)
print("Integrated CRM - RBAC 权限初始化")
print("=" * 65)
# [1] 加载配置
pc = load_perm_config()
print(f"\n[1/6] 加载 perm_config.py")
print(f" 角色定义: {len(pc.ROLES)}")
print(f" 权限配置段: {len(pc.PERMISSION_MATRIX)}")
# [2] 扫描 wwwroot 所有文件
all_files = scan_wwwroot(WWWROOT_BASE)
print(f"\n[2/6] 扫描 wwwroot 文件(含符号链接): {len(all_files)}")
if all_files:
print(f" 示例: {all_files[0]}")
if len(all_files) > 1:
print(f" ...")
print(f" {all_files[-1]}")
# [3] 构建 路径->角色 映射
path_role_map = build_path_role_map(pc, all_files)
print(f"\n[3/6] 路径-角色映射: {len(path_role_map)} 个路径需要授权")
# [4] 连接数据库:用 config.json 中的 databases 初始化 DBPools
from appPublic.jsonConfig import getConfig
from sqlor.dbpools import DBPools
config = getConfig('.')
DBPools(config.databases)
db = DBPools()
# 获取数据库名config.json databases 中定义的第一个/默认数据库名)
dbname = list(config.databases.keys())[0]
print(f"\n[4/6] 连接数据库 {dbname}...")
async with db.sqlorContext(dbname) as sor:
# ---- 4a: 创建/验证所有角色 ----
# 关键:所有角色使用 orgtypeid='*' 创建,配合 RBAC 通配机制
print(f"\n [4a] 注册角色orgtypeid='*' 通配所有机构)...")
role_ids = {}
for rid, (name, desc) in pc.ROLES.items():
role_ids[rid] = await ensure_role(sor, rid, name, desc, orgtypeid='*')
# ---- 4b: 注册 permission + 授权 ----
print(f"\n [4b] 注册权限并授权...")
perm_count = 0
grant_count = 0
error_count = 0
for path, roles in path_role_map.items():
for p in dual_paths(path):
try:
permid = await ensure_permission(sor, p, permtype='page')
perm_count += 1
for rn in roles:
if rn in ('any', 'logined'):
continue
if rn not in role_ids:
print(f" [WARN] 未知角色: {rn} (path: {p})")
error_count += 1
continue
await grant_perm(sor, role_ids[rn], permid)
grant_count += 1
except Exception as e:
print(f" [ERROR] {p}: {e}")
error_count += 1
print(f" 注册权限: {perm_count}, 授权次数: {grant_count}, 错误: {error_count}")
# ---- 4c: 注册 CRUD API 权限 ----
print(f"\n [4c] 注册 CRUD API 权限...")
crud_count = 0
for module, tables in pc.CRUD_TABLES.items():
for table in tables:
for action in ('list', 'create', 'update', 'delete'):
path = f"/{module}/api/{table}_{action}.dspy"
try:
permid = await ensure_permission(sor, path, permtype='api')
await grant_perm(sor, role_ids['admin_superuser'], permid)
crud_count += 1
except Exception:
pass
print(f" 注册 CRUD API: {crud_count}")
# ---- 4d: 同步 admin_superuser 权限到各机构角色 ----
print(f"\n [4d] 同步 admin_superuser 权限到各机构已有角色...")
admin_perms = await sor.R('rolepermission', {'roleid': role_ids['admin_superuser']})
sync_count = 0
# 查找所有 orgtypeid='customer' 且名称为 admin/superuser 的角色
customer_roles = await sor.R('role', {'orgtypeid': 'customer'})
for r in customer_roles:
if r.name in ('admin', 'superuser', '系统管理员'):
for g in admin_perms:
await grant_perm(sor, r.id, g.permid)
sync_count += 1
print(f" 同步授权: {sync_count}")
# ---- 5: 创建 admin_superuser 用户 ----
print(f"\n[5/6] 创建 admin_superuser 用户...")
await create_admin_user(sor, ADMIN_USERNAME, ADMIN_PASSWORD, role_ids['admin_superuser'])
# ---- 6: 汇总 ----
print(f"\n[6/6] 初始化汇总")
print("-" * 65)
print(f" 文件扫描: {len(all_files)}")
print(f" 路径注册: {len(path_role_map)} 个原始路径 → {perm_count} 个(含/main变体")
print(f" 角色配置: {len(role_ids)}")
print(f" 权限授权: {grant_count}")
print(f" CRUD API: {crud_count}")
print(f" 机构同步: {sync_count}")
print(f" 管理员账号: {ADMIN_USERNAME} / {ADMIN_PASSWORD}")
print(f" 错误: {error_count}")
print()
print(" 重要:重启应用以刷新 RBAC 权限缓存!")
print(" pkill -f integrated_crm_app.py")
print(" nohup python app/integrated_crm_app.py --port 8080 &")
print()
# 角色权限数量统计
print("角色权限明细:")
print("-" * 65)
for rid, dbid in role_ids.items():
name, desc = pc.ROLES[rid]
count = len(await sor.R('rolepermission', {'roleid': dbid}))
print(f" {rid:25s} | {name:10s} | {count:4d} 条权限 | {desc}")
async def create_admin_user(sor, username, password, superuser_role_id):
"""
创建 admin_superuser 用户并分配角色。
使用 password_encode() 对密码进行 bcrypt 加密。
"""
from ahserver.globalEnv import password_encode
from appPublic.uniqueID import getID
from appPublic.timeUtils import timestampstr
# 检查用户是否已存在
recs = await sor.R('users', {'username': username})
if recs:
# 更新密码
uid = recs[0].id
encoded_pw = password_encode(password)
await sor.U('users', {'id': uid, 'password': encoded_pw})
print(f" 用户 {username} 已存在,已更新密码")
# 检查是否已有角色
existing_roles = await sor.R('userrole', {'userid': uid, 'roleid': superuser_role_id})
if not existing_roles:
await sor.C('userrole', {
'id': getID(),
'userid': uid,
'roleid': superuser_role_id,
})
print(f" 已添加 admin_superuser 角色")
return uid
# 创建新用户
uid = getID()
encoded_pw = password_encode(password)
now = timestampstr()
# 创建用户orgid 与 userid 相同,表示自属机构)
await sor.C('users', {
'id': uid,
'username': username,
'password': encoded_pw,
'orgid': uid,
'nick_name': '系统管理员',
'created_at': now,
'login_fail_count': 0,
})
# 创建对应机构
await sor.C('organization', {
'id': uid,
'orgname': '系统管理',
})
await sor.C('orgtypes', {
'id': getID(),
'orgid': uid,
'orgtypeid': 'platform',
})
# 分配 admin_superuser 角色
await sor.C('userrole', {
'id': getID(),
'userid': uid,
'roleid': superuser_role_id,
})
print(f" [CREATED] user: {username} id={uid} (密码已bcrypt加密)")
return uid
if __name__ == "__main__":
asyncio.run(main())