diff --git a/app/init_permissions.py b/app/init_permissions.py deleted file mode 100644 index e976c38..0000000 --- a/app/init_permissions.py +++ /dev/null @@ -1,427 +0,0 @@ -#!/usr/bin/env python3 -""" -Integrated CRM - RBAC 权限初始化脚本 - -功能: - 1. 遍历 wwwroot 下所有文件(含子目录和 ln -s 链接目录),将路径写入 permission 表 - 2. 为各机构的角色设置每个 path 的权限(通过 orgtypeid='*' 通配所有机构) - 3. 初始化 admin_superuser 用户(用户名 super,密码 Kyy@123456) - -依赖 RBAC 通配机制: - - 用户角色会展开为 orgtypeid.name、orgtypeid.*、*.name 三种 key - - 授权给 orgtypeid='*' 的角色,等于授权给所有机构的同名角色 - - *.admin_superuser 匹配所有机构的 admin_superuser 角色 - -使用方法: - cd ~/repos/integrated_crm_app - source py3/bin/activate - python app/init_permissions.py - -部署流程: - 1. 停止应用:pkill -f integrated_crm_app.py - 2. 执行: python app/init_permissions.py - 3. 启动应用:nohup python app/integrated_crm_app.py --port 8080 & -""" - -import asyncio -import os -import sys -import importlib.util - -# ============================================================ -# 配置 -# ============================================================ - -_script_dir = os.path.dirname(os.path.abspath(__file__)) -# 脚本可能在项目根目录或 app/ 子目录,统一指向项目根 -if os.path.basename(_script_dir) == 'app': - APP_ROOT = os.path.dirname(_script_dir) -else: - APP_ROOT = _script_dir -sys.path.insert(0, APP_ROOT) - -# perm_config.py 路径 -PERM_CONFIG_PATH = os.path.join(APP_ROOT, "perm_config.py") - -# wwwroot 根目录(含符号链接子目录) -WWWROOT_BASE = os.path.join(APP_ROOT, "wwwroot") - -# admin_superuser 初始账号密码 -ADMIN_USERNAME = "super" -ADMIN_PASSWORD = "Kyy@123456" - - -# ============================================================ -# 加载 perm_config -# ============================================================ - -def load_perm_config(): - spec = importlib.util.spec_from_file_location("perm_config", PERM_CONFIG_PATH) - mod = importlib.util.module_from_spec(spec) - spec.loader.exec_module(mod) - return mod - - -# ============================================================ -# 遍历 wwwroot 所有文件(含 ln -s 链接目录) -# ============================================================ - -def scan_wwwroot(wwwroot_path): - """ - 遍历 wwwroot 目录下所有文件(递归,跟随符号链接)。 - 返回 URL 路径列表,如 ['/login.ui', '/customer_management/customer_list.ui']。 - - 关键: - 1. os.walk 默认不跟随符号链接,需要 followlinks=True - 2. 排除 self-referencing symlinks(如 main -> wwwroot)防止死循环 - 3. /main 前缀由 config.json paths 映射产生,此处返回相对路径即可, - 脚本会自动注册 /xxx 和 /main/xxx 双路径。 - """ - paths = [] - if not os.path.isdir(wwwroot_path): - print(f" [WARN] wwwroot not found: {wwwroot_path}") - return paths - - real_wwwroot = os.path.realpath(wwwroot_path) - visited_real = set() - - for root, dirs, files in os.walk(wwwroot_path, followlinks=True): - real_root = os.path.realpath(root) - if real_root in visited_real: - dirs.clear() # 防止进入循环目录 - continue - visited_real.add(real_root) - - # 过滤掉指向 wwwroot 自身的符号链接目录 - safe_dirs = [] - for d in dirs: - real_d = os.path.realpath(os.path.join(root, d)) - if real_d != real_wwwroot and real_d not in visited_real: - safe_dirs.append(d) - dirs[:] = safe_dirs - - for f in files: - # 注册 .ui, .dspy, .js, .css 文件的权限 - if not (f.endswith('.ui') or f.endswith('.dspy') or f.endswith('.js') or f.endswith('.css')): - continue - full = os.path.join(root, f) - rel = os.path.relpath(full, wwwroot_path) - url_path = '/' + rel.replace(os.sep, '/') - paths.append(url_path) - - return sorted(paths) - - -# ============================================================ -# 数据库操作(sqlor CRUD) -# ============================================================ - -async def ensure_permission(sor, path, permtype='page'): - """确保 permission 记录存在,返回 permission ID。""" - recs = await sor.R('permission', {'path': path}) - if recs: - return recs[0].id - - from appPublic.uniqueID import getID - permid = getID() - await sor.C('permission', { - 'id': permid, - 'path': path, - 'permtype': permtype, - 'parentid': '', - 'description': f'Auto-generated for {path}', - }) - return permid - - -async def ensure_role(sor, role_id, name, desc='', orgtypeid='*'): - """ - 确保角色存在。 - 匹配顺序:中文名 -> 英文ID -> 创建新角色。 - 默认 orgtypeid='*' 表示通配所有机构。 - """ - # 按中文名匹配 - recs = await sor.R('role', {'name': name}) - if recs: - return recs[0].id - - # 按英文ID匹配 - recs = await sor.R('role', {'id': role_id}) - if recs: - return recs[0].id - - # 创建新角色 - from appPublic.uniqueID import getID - rid = role_id - await sor.C('role', { - 'id': rid, - 'orgtypeid': orgtypeid, - 'name': name, - 'role_name': role_id, - 'description': desc, - }) - print(f" [CREATED] role: {role_id} ({name}) orgtypeid={orgtypeid} id={rid}") - return rid - - -async def grant_perm(sor, role_id, perm_id): - """授予角色权限(幂等)。""" - existing = await sor.R('rolepermission', {'roleid': role_id, 'permid': perm_id}) - if not existing: - from appPublic.uniqueID import getID - await sor.C('rolepermission', { - 'id': getID(), - 'roleid': role_id, - 'permid': perm_id, - }) - - -def dual_paths(path): - """返回路径列表。只使用原始路径,不加 /main 前缀。""" - # 如果路径以 /main 开头,去掉 /main 前缀(统一使用无前缀路径) - if path.startswith('/main'): - cleaned = path[5:] # '/main/xxx' -> '/xxx' - if not cleaned: - cleaned = '/' - return [cleaned] - return [path] - - -# ============================================================ -# 构建路径->角色映射 -# ============================================================ - -def build_path_role_map(perm_config, all_files): - """ - 根据 perm_config.PERMISSION_MATRIX 和扫描到的文件列表, - 构建 {url_path: [role_ids]} 映射。 - - 对于非通配路径(精确路径),如果该路径在 all_files 中,加入映射。 - 对于通配路径(/module/**),展开到该目录下所有匹配文件。 - """ - path_roles = {} # path -> set of role_ids - - for _section, entries in perm_config.PERMISSION_MATRIX.items(): - for pattern, roles in entries.items(): - if '**' in pattern: - # 通配模式:展开到 wwwroot 下匹配的文件 - base = pattern.replace('/**', '').lstrip('/') - prefix = '/' + base if base else '' - for fp in all_files: - if fp.startswith(prefix): - if fp not in path_roles: - path_roles[fp] = set() - path_roles[fp].update(roles) - else: - # 精确路径:如果该文件存在,加入映射 - if pattern in all_files: - if pattern not in path_roles: - path_roles[pattern] = set() - path_roles[pattern].update(roles) - - return {k: list(v) for k, v in path_roles.items()} - - -# ============================================================ -# 主流程 -# ============================================================ - -async def main(): - print("=" * 65) - print("Integrated CRM - RBAC 权限初始化") - print("=" * 65) - - # [1] 加载配置 - pc = load_perm_config() - print(f"\n[1/6] 加载 perm_config.py") - print(f" 角色定义: {len(pc.ROLES)}") - print(f" 权限配置段: {len(pc.PERMISSION_MATRIX)}") - - # [2] 扫描 wwwroot 所有文件 - all_files = scan_wwwroot(WWWROOT_BASE) - print(f"\n[2/6] 扫描 wwwroot 文件(含符号链接): {len(all_files)} 个") - if all_files: - print(f" 示例: {all_files[0]}") - if len(all_files) > 1: - print(f" ...") - print(f" {all_files[-1]}") - - # [3] 构建 路径->角色 映射 - path_role_map = build_path_role_map(pc, all_files) - print(f"\n[3/6] 路径-角色映射: {len(path_role_map)} 个路径需要授权") - - # [4] 连接数据库:用 config.json 中的 databases 初始化 DBPools - from appPublic.jsonConfig import getConfig - from sqlor.dbpools import DBPools - config = getConfig('.') - DBPools(config.databases) - db = DBPools() - # 获取数据库名(config.json databases 中定义的第一个/默认数据库名) - dbname = list(config.databases.keys())[0] - print(f"\n[4/6] 连接数据库 {dbname}...") - - async with db.sqlorContext(dbname) as sor: - - # ---- 4a: 创建/验证所有角色 ---- - # 关键:所有角色使用 orgtypeid='*' 创建,配合 RBAC 通配机制 - print(f"\n [4a] 注册角色(orgtypeid='*' 通配所有机构)...") - role_ids = {} - for rid, (name, desc) in pc.ROLES.items(): - role_ids[rid] = await ensure_role(sor, rid, name, desc, orgtypeid='*') - - # ---- 4b: 注册 permission + 授权 ---- - print(f"\n [4b] 注册权限并授权...") - perm_count = 0 - grant_count = 0 - error_count = 0 - - for path, roles in path_role_map.items(): - for p in dual_paths(path): - try: - permid = await ensure_permission(sor, p, permtype='page') - perm_count += 1 - - for rn in roles: - if rn in ('any', 'logined'): - continue - if rn not in role_ids: - print(f" [WARN] 未知角色: {rn} (path: {p})") - error_count += 1 - continue - await grant_perm(sor, role_ids[rn], permid) - grant_count += 1 - except Exception as e: - print(f" [ERROR] {p}: {e}") - error_count += 1 - - print(f" 注册权限: {perm_count}, 授权次数: {grant_count}, 错误: {error_count}") - - # ---- 4c: 注册 CRUD API 权限 ---- - print(f"\n [4c] 注册 CRUD API 权限...") - crud_count = 0 - for module, tables in pc.CRUD_TABLES.items(): - for table in tables: - for action in ('list', 'create', 'update', 'delete'): - path = f"/{module}/api/{table}_{action}.dspy" - try: - permid = await ensure_permission(sor, path, permtype='api') - await grant_perm(sor, role_ids['admin_superuser'], permid) - crud_count += 1 - except Exception: - pass - print(f" 注册 CRUD API: {crud_count}") - - # ---- 4d: 同步 admin_superuser 权限到各机构角色 ---- - print(f"\n [4d] 同步 admin_superuser 权限到各机构已有角色...") - admin_perms = await sor.R('rolepermission', {'roleid': role_ids['admin_superuser']}) - sync_count = 0 - # 查找所有 orgtypeid='customer' 且名称为 admin/superuser 的角色 - customer_roles = await sor.R('role', {'orgtypeid': 'customer'}) - for r in customer_roles: - if r.name in ('admin', 'superuser', '系统管理员'): - for g in admin_perms: - await grant_perm(sor, r.id, g.permid) - sync_count += 1 - print(f" 同步授权: {sync_count}") - - # ---- 5: 创建 admin_superuser 用户 ---- - print(f"\n[5/6] 创建 admin_superuser 用户...") - await create_admin_user(sor, ADMIN_USERNAME, ADMIN_PASSWORD, role_ids['admin_superuser']) - - # ---- 6: 汇总 ---- - print(f"\n[6/6] 初始化汇总") - print("-" * 65) - print(f" 文件扫描: {len(all_files)} 个") - print(f" 路径注册: {len(path_role_map)} 个原始路径 → {perm_count} 个(含/main变体)") - print(f" 角色配置: {len(role_ids)} 个") - print(f" 权限授权: {grant_count} 次") - print(f" CRUD API: {crud_count} 个") - print(f" 机构同步: {sync_count} 次") - print(f" 管理员账号: {ADMIN_USERNAME} / {ADMIN_PASSWORD}") - print(f" 错误: {error_count}") - print() - print(" 重要:重启应用以刷新 RBAC 权限缓存!") - print(" pkill -f integrated_crm_app.py") - print(" nohup python app/integrated_crm_app.py --port 8080 &") - print() - - # 角色权限数量统计 - print("角色权限明细:") - print("-" * 65) - for rid, dbid in role_ids.items(): - name, desc = pc.ROLES[rid] - count = len(await sor.R('rolepermission', {'roleid': dbid})) - print(f" {rid:25s} | {name:10s} | {count:4d} 条权限 | {desc}") - - -async def create_admin_user(sor, username, password, superuser_role_id): - """ - 创建 admin_superuser 用户并分配角色。 - 使用 password_encode() 对密码进行 bcrypt 加密。 - """ - from ahserver.globalEnv import password_encode - from appPublic.uniqueID import getID - from appPublic.timeUtils import timestampstr - - # 检查用户是否已存在 - recs = await sor.R('users', {'username': username}) - if recs: - uid = recs[0].id - encoded_pw = password_encode(password) - # 用原始 SQL 更新密码,避免 sor.U() 错误生成 userid WHERE 条件 - await sor.sqlExe( - "UPDATE users SET password = ${pw}$ WHERE id = ${uid}$", - {'pw': encoded_pw, 'uid': uid} - ) - print(f" 用户 {username} 已存在,已更新密码") - # 检查是否已有角色 - existing_roles = await sor.R('userrole', {'userid': uid, 'roleid': superuser_role_id}) - if not existing_roles: - await sor.C('userrole', { - 'id': getID(), - 'userid': uid, - 'roleid': superuser_role_id, - }) - print(f" 已添加 admin_superuser 角色") - return uid - - # 创建新用户 - uid = getID() - encoded_pw = password_encode(password) - now = timestampstr() - - # 创建用户(orgid 与 userid 相同,表示自属机构) - await sor.C('users', { - 'id': uid, - 'username': username, - 'password': encoded_pw, - 'orgid': uid, - 'nick_name': '系统管理员', - 'created_at': now, - 'login_fail_count': 0, - }) - - # 创建对应机构 - await sor.C('organization', { - 'id': uid, - 'orgname': '系统管理', - }) - await sor.C('orgtypes', { - 'id': getID(), - 'orgid': uid, - 'orgtypeid': 'platform', - }) - - # 分配 admin_superuser 角色 - await sor.C('userrole', { - 'id': getID(), - 'userid': uid, - 'roleid': superuser_role_id, - }) - - print(f" [CREATED] user: {username} id={uid} (密码已bcrypt加密)") - return uid - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/init_permissions.py b/init_permissions.py index e976c38..1f3b798 100644 --- a/init_permissions.py +++ b/init_permissions.py @@ -32,12 +32,7 @@ import importlib.util # 配置 # ============================================================ -_script_dir = os.path.dirname(os.path.abspath(__file__)) -# 脚本可能在项目根目录或 app/ 子目录,统一指向项目根 -if os.path.basename(_script_dir) == 'app': - APP_ROOT = os.path.dirname(_script_dir) -else: - APP_ROOT = _script_dir +APP_ROOT = os.path.dirname(os.path.abspath(__file__)) sys.path.insert(0, APP_ROOT) # perm_config.py 路径