From 2d2dad84d0aed513cff8c98731c43b02b12979ce Mon Sep 17 00:00:00 2001 From: yumoqing Date: Tue, 26 May 2026 08:55:49 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20add=20RBAC=20scripts=20=E2=80=94=20list?= =?UTF-8?q?=5Fpath=5Froles.py=20and=20find=5Funauth=5Ffiles.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- scripts/find_unauth_files.py | 184 +++++++++++++++++++++++++++++++++++ scripts/list_path_roles.py | 92 ++++++++++++++++++ 2 files changed, 276 insertions(+) create mode 100644 scripts/find_unauth_files.py create mode 100644 scripts/list_path_roles.py diff --git a/scripts/find_unauth_files.py b/scripts/find_unauth_files.py new file mode 100644 index 0000000..cd98de0 --- /dev/null +++ b/scripts/find_unauth_files.py @@ -0,0 +1,184 @@ +#!/usr/bin/env python +""" +遍历 wwwroot 下所有通过 ln -s 链接进来的目录, +找出没有任何角色拥有权限的文件。 + +用法: + ./py3/bin/python find_unauth_files.py [wwwroot_path] + +示例: + # 默认 sage/wwwroot + ./py3/bin/python find_unauth_files.py + + # 指定路径 + ./py3/bin/python find_unauth_files.py /home/hermesai/repos/sage/wwwroot + +说明: + - 只扫描 wwwroot 下直接子目录中是符号链接的目录(ln -s 指向的) + - 跟随符号链接遍历(followlinks=True),防环 + - 检查每个文件是否在 permission 表中有任意角色关联 + - 输出未授权文件清单 + + 注意:此脚本只读数据库,不做任何修改。 +""" +import os +import sys +import asyncio + +sage_root = os.environ.get('SAGE_ROOT') +if sage_root and sage_root not in sys.path: + sys.path.insert(0, sage_root) + +from sqlor.dbpools import DBPools +from appPublic.jsonConfig import getConfig + + +def find_symlink_dirs(wwwroot): + """找出 wwwroot 下直接子目录中是符号链接的目录。""" + symlinks = [] + for entry in sorted(os.listdir(wwwroot)): + full = os.path.join(wwwroot, entry) + if os.path.islink(full) and os.path.isdir(full): + target = os.readlink(full) + symlinks.append((entry, full, target)) + return symlinks + + +def walk_symlink_dirs(wwwroot, symlinks): + """ + 遍历所有符号链接目录下的文件,返回 (relative_path, absolute_path) 列表。 + relative_path 是相对于 wwwroot 的路径,格式如 /harnessed_agent/index.ui + 防环:跟踪已访问的真实路径。 + """ + real_wwwroot = os.path.realpath(wwwroot) + visited_real = set() + files = [] + + for name, link_path, target in symlinks: + for root, dirs, filenames in os.walk(link_path, followlinks=True): + real_root = os.path.realpath(root) + + # 防环:回到 wwwroot 自身或其他已访问路径 + if real_root in visited_real: + dirs.clear() + continue + visited_real.add(real_root) + + # 排除指向 wwwroot 自身的子目录 + dirs[:] = [d for d in dirs if os.path.realpath(os.path.join(root, d)) != real_wwwroot] + + for fname in sorted(filenames): + abs_path = os.path.join(root, fname) + # 相对于 wwwroot 的路径,带前导 / + rel = '/' + os.path.relpath(abs_path, wwwroot) + files.append((rel, abs_path)) + + return files + + +async def main(): + wwwroot = sys.argv[1] if len(sys.argv) > 1 else None + if not wwwroot: + # 默认: sage_root/wwwroot + if sage_root: + wwwroot = os.path.join(sage_root, 'wwwroot') + else: + # 尝试从当前目录推断 + wwwroot = os.path.join(os.getcwd(), 'wwwroot') + + wwwroot = os.path.abspath(wwwroot) + if not os.path.isdir(wwwroot): + print(f"错误: wwwroot 目录不存在: {wwwroot}") + sys.exit(1) + + print(f"wwwroot: {wwwroot}") + print(f"SAGE_ROOT: {sage_root or '(未设置)'}") + + # 1. 找符号链接目录 + symlinks = find_symlink_dirs(wwwroot) + if not symlinks: + print("\n未发现任何符号链接目录。") + sys.exit(0) + + print(f"\n发现 {len(symlinks)} 个符号链接目录:") + for name, full, target in symlinks: + print(f" {name}/ -> {target}") + + # 2. 遍历所有文件 + all_files = walk_symlink_dirs(wwwroot, symlinks) + print(f"\n共 {len(all_files)} 个文件,开始检查权限...\n") + + # 3. 批量加载所有 permission + rolepermission 记录到内存 + config = getConfig('.') + db = DBPools(config.databases) + + async with db.sqlorContext('sage') as sor: + # 加载所有 permission 记录 + perm_recs = await sor.sqlExe("SELECT id, path FROM permission") + # 构建 path -> perm_id 映射 + path_to_permid = {} + for r in perm_recs: + path_to_permid[r.path] = r.id + + # 加载所有 rolepermission 记录 + rp_recs = await sor.sqlExe("SELECT permid FROM rolepermission") + # 构建 perm_id -> 有权限 的标记 + perms_with_roles = set() + for r in rp_recs: + perms_with_roles.add(r.permid) + + print(f" permission 表共 {len(path_to_permid)} 条记录") + print(f" 有角色关联的 permission 共 {len(perms_with_roles)} 条") + + # 4. 检查每个文件 + unauth_files = [] + authed_files = [] + + for rel_path, abs_path in all_files: + permid = path_to_permid.get(rel_path) + if permid is None: + # 路径完全未注册 + unauth_files.append((rel_path, abs_path, "路径未注册")) + elif permid not in perms_with_roles: + # 路径有 permission 记录但没有任何角色关联 + unauth_files.append((rel_path, abs_path, "有记录但无角色")) + else: + authed_files.append(rel_path) + + # 5. 输出结果 + print(f"\n{'='*80}") + print(f"结果汇总:") + print(f" 已授权文件: {len(authed_files)}") + print(f" 未授权文件: {len(unauth_files)}") + print(f"{'='*80}") + + if not unauth_files: + print("\n所有文件均有权限覆盖,无需处理。") + return + + print(f"\n未授权文件清单 ({len(unauth_files)} 个):\n") + + # 按模块目录分组 + from collections import defaultdict + by_module = defaultdict(list) + for rel_path, abs_path, reason in unauth_files: + # 提取模块名: /harnessed_agent/xxx -> harnessed_agent + parts = rel_path.strip('/').split('/') + module = parts[0] if parts else 'root' + by_module[module].append((rel_path, abs_path, reason)) + + for module in sorted(by_module.keys()): + items = by_module[module] + print(f"\n--- {module}/ ({len(items)} 个文件) ---") + for rel_path, abs_path, reason in items: + print(f" [{reason}] {rel_path}") + + # 也输出纯路径列表(方便管道处理) + print(f"\n{'='*80}") + print("纯路径列表(方便复制):") + for rel_path, abs_path, reason in unauth_files: + print(f" {rel_path}") + + +if __name__ == '__main__': + asyncio.get_event_loop().run_until_complete(main()) diff --git a/scripts/list_path_roles.py b/scripts/list_path_roles.py new file mode 100644 index 0000000..5d4e317 --- /dev/null +++ b/scripts/list_path_roles.py @@ -0,0 +1,92 @@ +#!/usr/bin/env python +""" +列出指定路径拥有权限的所有角色。 + +用法: + ./py3/bin/python list_path_roles.py + +示例: + ./py3/bin/python list_path_roles.py /harnessed_agent/index.ui + ./py3/bin/python list_path_roles.py /bricks/css/bricks.css + +说明: + - 精确匹配路径 + - 如果路径未注册,会提示 + - 显示角色 id、orgtypeid、name +""" +import os +import sys +import asyncio + +sage_root = os.environ.get('SAGE_ROOT') +if sage_root and sage_root not in sys.path: + sys.path.insert(0, sage_root) + +from sqlor.dbpools import DBPools +from appPublic.jsonConfig import getConfig + + +async def main(): + if len(sys.argv) < 2: + print(f"用法: {sys.argv[0]} ") + print(f"示例: {sys.argv[0]} /harnessed_agent/index.ui") + sys.exit(1) + + target_path = sys.argv[1] + if not target_path.startswith('/'): + target_path = '/' + target_path + + config = getConfig('.') + db = DBPools(config.databases) + + async with db.sqlorContext('sage') as sor: + # 1. 查找 permission 记录 + perm_recs = await sor.R('permission', {'path': target_path}) + if not perm_recs: + print(f"[未找到] 路径 '{target_path}' 未在 permission 表中注册。") + + # 尝试 LIKE 模糊匹配 + like_path = target_path.rstrip('/') + '/%' + like_recs = await sor.sqlExe("SELECT path FROM permission WHERE path LIKE %s", (like_path,)) + if like_recs: + print(f"\n模糊匹配到 {len(like_recs)} 个子路径(LIKE '{like_path}'):") + for r in like_recs[:20]: + print(f" {r.path}") + if len(like_recs) > 20: + print(f" ... 共 {len(like_recs)} 条") + sys.exit(0) + + perm = perm_recs[0] + print(f"路径: {perm.path}") + print(f"perm_id: {perm.id}") + + # 2. 查找 rolepermission 关联 + rp_recs = await sor.R('rolepermission', {'permid': perm.id}) + if not rp_recs: + print(" [无角色] 该路径没有任何角色权限记录。") + sys.exit(0) + + print(f"\n共 {len(rp_recs)} 个角色有权限:\n") + print(f"{'角色ID':<30} {'orgtypeid':<20} {'名称':<30}") + print("-" * 80) + + for rp in rp_recs: + roleid = rp.roleid + # 特殊角色直接显示 + if roleid in ('any', 'anonymous', 'logined'): + print(f"{roleid:<30} {'*':<20} {roleid:<30}") + continue + + # 普通角色查 role 表 + role_recs = await sor.R('role', {'id': roleid}) + if role_recs: + r = role_recs[0] + orgtypeid = getattr(r, 'orgtypeid', '*') + name = getattr(r, 'name', roleid) + print(f"{roleid:<30} {orgtypeid:<20} {name:<30}") + else: + print(f"{roleid:<30} {'(未知)':<20} {'(role表中不存在)':<30}") + + +if __name__ == '__main__': + asyncio.get_event_loop().run_until_complete(main())