From f8c8a4ce4d75250173c6c2a68ac1b87e8b129246 Mon Sep 17 00:00:00 2001 From: yumoqing Date: Tue, 26 May 2026 09:32:38 +0800 Subject: [PATCH] refactor: move RBAC tools logic to rbac/rbac_tools.py, dspy files call via request._run_ns --- rbac/init.py | 18 ++- rbac/rbac_tools.py | 228 +++++++++++++++++++++++++++++++++ wwwroot/find_unauth_files.dspy | 124 +----------------- wwwroot/list_path_roles.dspy | 66 +--------- 4 files changed, 248 insertions(+), 188 deletions(-) create mode 100644 rbac/rbac_tools.py diff --git a/rbac/init.py b/rbac/init.py index 7fef733..0e22547 100644 --- a/rbac/init.py +++ b/rbac/init.py @@ -6,19 +6,23 @@ from .orgs import ( ) from .userperm import UserPermissions from .user_stats import get_user_stats +from .rbac_tools import ( + query_path_roles, + scan_unauth_files +) from rbac.check_perm import ( - objcheckperm, + objcheckperm, get_org_users, sor_get_org_users, - checkUserPassword, - register_user, - register_auth_method, - create_org, + checkUserPassword, + register_user, + register_auth_method, + create_org, create_user ) from rbac.set_role_perms import ( sor_add_user_roles, - set_role_perm, + set_role_perm, set_role_perms ) from appPublic.log import debug @@ -78,6 +82,8 @@ def load_rbac(): env.get_owner_orgid = get_owner_orgid env.sor_add_user_roles = sor_add_user_roles env.get_user_stats = get_user_stats + env.query_path_roles = query_path_roles + env.scan_unauth_files = scan_unauth_files # Cache invalidation methods for use after role/permission changes env.invalidate_user_perm_cache = env.userpermissions.invalidate_user_cache env.invalidate_all_perm_caches = env.userpermissions.invalidate_all_user_caches diff --git a/rbac/rbac_tools.py b/rbac/rbac_tools.py new file mode 100644 index 0000000..e766d18 --- /dev/null +++ b/rbac/rbac_tools.py @@ -0,0 +1,228 @@ +""" +RBAC 工具函数 — 权限查询与文件扫描。 + +提供两个功能: +1. 查询指定路径拥有权限的所有角色 +2. 扫描 wwwroot 下符号链接目录中未授权的文件 + +.dspy 文件应调用此模块的函数(通过 request._run_ns),自身不做 import。 +每个函数返回 (title, message, is_error) 三元组,.dspy 据此返回 UiMessage 或 UiError。 +""" +import os +from collections import defaultdict + + +async def query_path_roles(sor, path): + """ + 查询指定路径拥有权限的角色列表。 + + Returns: + (title, message, is_error) + """ + if not path: + return ('错误', '请输入路径', True) + if not path.startswith('/'): + path = '/' + path + + perm_recs = await sor.sqlExe( + "SELECT id, path FROM permission WHERE path=${path}$", + {'path': path} + ) + if not perm_recs: + like_path = path.rstrip('/') + '/%' + like_recs = await sor.sqlExe( + "SELECT path FROM permission WHERE path LIKE ${lp}$", + {'lp': like_path} + ) + msg = f"路径 '{path}' 未在 permission 表中注册。" + if like_recs: + sub = '
'.join([f' {r.path}' for r in like_recs[:10]]) + if len(like_recs) > 10: + sub += f'
... 共 {len(like_recs)} 条' + msg += f'

模糊匹配到 {len(like_recs)} 个子路径:
{sub}' + return ('未找到', msg, True) + + perm = perm_recs[0] + permid = perm.id + + rp_recs = await sor.sqlExe( + "SELECT roleid FROM rolepermission WHERE permid=${permid}$", + {'permid': permid} + ) + if not rp_recs: + msg = f"路径: {path} (perm_id: {permid})

无任何角色拥有此路径权限。" + return ('查询结果', msg, False) + + role_ids = [r.roleid for r in rp_recs] + special = [rid for rid in role_ids if rid in ('any', 'anonymous', 'logined')] + normal = [rid for rid in role_ids if rid not in ('any', 'anonymous', 'logined')] + + rows = [] + for sp in special: + rows.append(f"{sp}*{sp}") + + if normal: + placeholders = ','.join([f'${i}$' for i in range(len(normal))]) + ns = {f'_{i}': rid for i, rid in enumerate(normal)} + role_recs = await sor.sqlExe( + f"SELECT id, orgtypeid, name FROM role WHERE id IN ({placeholders})", + ns + ) + for r in role_recs: + rows.append( + f"{r.id}" + f"{getattr(r, 'orgtypeid', '*')}" + f"{getattr(r, 'name', r.id)}" + ) + + table = ( + "" + "" + "" + "" + "" + "" + + ''.join(rows) + + "
角色IDorgtypeid名称
" + ) + + html = ( + f"

路径: {path} (perm_id: {permid})

" + f"

共 {len(rp_recs)} 个角色有权限:

" + f"{table}" + ) + return ('查询结果', html, False) + + +def find_symlink_dirs(root): + """找出指定目录下直接子目录中是符号链接的目录。""" + symlinks = [] + for entry in sorted(os.listdir(root)): + full = os.path.join(root, entry) + if os.path.islink(full) and os.path.isdir(full): + target = os.readlink(full) + symlinks.append((entry, full, target)) + return symlinks + + +def walk_symlink_dirs(root, symlinks): + """ + 遍历所有符号链接目录下的文件。 + Returns: list of (relative_path, absolute_path) + """ + real_root = os.path.realpath(root) + visited_real = set() + files = [] + for name, link_path, target in symlinks: + for r, dirs, filenames in os.walk(link_path, followlinks=True): + real_r = os.path.realpath(r) + if real_r in visited_real: + dirs.clear() + continue + visited_real.add(real_r) + dirs[:] = [d for d in dirs if os.path.realpath(os.path.join(r, d)) != real_root] + for fname in sorted(filenames): + abs_path = os.path.join(r, fname) + rel = '/' + os.path.relpath(abs_path, root) + files.append((rel, abs_path)) + return files + + +def resolve_wwwroot(wwwroot_param, file_path): + """自动定位 wwwroot 目录。""" + if wwwroot_param: + wwwroot_param = os.path.abspath(wwwroot_param) + if os.path.isdir(wwwroot_param): + return wwwroot_param, None + return None, f"wwwroot 目录不存在: {wwwroot_param}" + + sage_root = os.environ.get('SAGE_ROOT') + if sage_root: + wr = os.path.join(sage_root, 'wwwroot') + if os.path.isdir(wr): + return wr, None + return None, f"SAGE_ROOT 指向的 wwwroot 不存在: {wr}" + + if '/wwwroot/' in file_path: + idx = file_path.index('/wwwroot/') + wr = file_path[:idx + len('/wwwroot')] + if os.path.isdir(wr): + return wr, None + + candidate = file_path + for _ in range(5): + candidate = os.path.dirname(candidate) + if not candidate or candidate == '/': + break + wr = os.path.join(candidate, 'wwwroot') + if os.path.isdir(wr): + return wr, None + + return None, '无法自动定位 wwwroot 目录,请传入 wwwroot 参数或设置 SAGE_ROOT 环境变量。' + + +async def scan_unauth_files(sor, wwwroot_param, file_path): + """ + 扫描 wwwroot 下符号链接目录中未授权的文件。 + + Returns: + (title, message, is_error) + """ + wwwroot, err = resolve_wwwroot(wwwroot_param, file_path) + if err: + return ('错误', err, True) + + symlinks = find_symlink_dirs(wwwroot) + if not symlinks: + return ('扫描结果', f"在 {wwwroot} 中未发现任何符号链接目录。", False) + + all_files = walk_symlink_dirs(wwwroot, symlinks) + symlink_info = '
'.join([ + f" {name}/ → {target}" for name, _, target in symlinks + ]) + + perm_recs = await sor.sqlExe("SELECT id, path FROM permission", {}) + path_to_permid = {r.path: r.id for r in perm_recs} + + rp_recs = await sor.sqlExe("SELECT DISTINCT permid FROM rolepermission", {}) + perms_with_roles = {r.permid for r in rp_recs} + + unauth = [] + authed = 0 + for rel_path, abs_path in all_files: + permid = path_to_permid.get(rel_path) + if permid is None: + unauth.append((rel_path, '路径未注册')) + elif permid not in perms_with_roles: + unauth.append((rel_path, '有记录但无角色')) + else: + authed += 1 + + by_module = defaultdict(list) + for rel_path, reason in unauth: + parts = rel_path.strip('/').split('/') + module = parts[0] if parts else 'root' + by_module[module].append((rel_path, reason)) + + html = ( + f"

wwwroot: {wwwroot}

" + f"

符号链接目录 ({len(symlinks)}):
{symlink_info}

" + f"

总文件数: {len(all_files)} | 已授权: {authed} | 未授权: {len(unauth)}

" + ) + + if not unauth: + html += "

所有文件均有权限覆盖,无需处理。

" + else: + html += f"

未授权文件 ({len(unauth)} 个):

" + for module in sorted(by_module.keys()): + items = by_module[module] + html += f"

{module}/ ({len(items)} 个文件)

" + html += "" + + return ('扫描结果', html, False) diff --git a/wwwroot/find_unauth_files.dspy b/wwwroot/find_unauth_files.dspy index 6cf69b4..4069989 100644 --- a/wwwroot/find_unauth_files.dspy +++ b/wwwroot/find_unauth_files.dspy @@ -1,124 +1,8 @@ -# 查找 wwwroot 下符号链接目录中没有任何角色权限的文件 -import os - wwwroot = params_kw.get('wwwroot', '').strip() -# 定位 wwwroot -if not wwwroot: - sage_root = os.environ.get('SAGE_ROOT') - if sage_root: - wwwroot = os.path.join(sage_root, 'wwwroot') - else: - # 从当前文件路径向上找 wwwroot - # 生产: ~/token/sage/wwwroot/rbac/find_unauth_files.dspy - # 开发: ~/repos/sage/wwwroot/rbac/find_unauth_files.dspy (通过symlink) - # 或: ~/repos/rbac/wwwroot/find_unauth_files.dspy (rbac repo自身) - this_file = os.path.abspath(__file__) - # 如果路径中包含 /wwwroot/ 段,直接截断 - if '/wwwroot/' in this_file: - idx = this_file.index('/wwwroot/') - wwwroot = this_file[:idx + len('/wwwroot')] - else: - # 尝试找 wwwroot 子目录 - for level in range(5): - candidate = this_file - for _ in range(level): - candidate = os.path.dirname(candidate) - if not candidate or candidate == '/': - break - wr = os.path.join(candidate, 'wwwroot') - if os.path.isdir(wr): - wwwroot = wr - break - if not wwwroot: - return UiError(title='错误', message='无法自动定位 wwwroot 目录,请通过参数传入 wwwroot 路径,或设置 SAGE_ROOT 环境变量。') - -wwwroot = os.path.abspath(wwwroot) -if not os.path.isdir(wwwroot): - return UiError(title='错误', message=f"wwwroot 目录不存在: {wwwroot}") - -def find_symlink_dirs(root): - symlinks = [] - for entry in sorted(os.listdir(root)): - full = os.path.join(root, entry) - if os.path.islink(full) and os.path.isdir(full): - target = os.readlink(full) - symlinks.append((entry, full, target)) - return symlinks - -def walk_symlink_dirs(root, symlinks): - real_root = os.path.realpath(root) - visited_real = set() - files = [] - for name, link_path, target in symlinks: - for r, dirs, filenames in os.walk(link_path, followlinks=True): - real_r = os.path.realpath(r) - if real_r in visited_real: - dirs.clear() - continue - visited_real.add(real_r) - dirs[:] = [d for d in dirs if os.path.realpath(os.path.join(r, d)) != real_root] - for fname in sorted(filenames): - abs_path = os.path.join(r, fname) - rel = '/' + os.path.relpath(abs_path, root) - files.append((rel, abs_path)) - return files - -symlinks = find_symlink_dirs(wwwroot) -if not symlinks: - return UiMessage(title='扫描结果', message=f"在 {wwwroot} 中未发现任何符号链接目录。") - -symlink_info = '
'.join([f" {name}/ → {target}" for name, _, target in symlinks]) - -all_files = walk_symlink_dirs(wwwroot, symlinks) - async with get_sor_context(request._run_ns, 'rbac') as sor: - perm_recs = await sor.sqlExe("SELECT id, path FROM permission", {}) - path_to_permid = {} - for r in perm_recs: - path_to_permid[r.path] = r.id + title, message, is_error = await request._run_ns.scan_unauth_files(sor, wwwroot, __file__) - rp_recs = await sor.sqlExe("SELECT DISTINCT permid FROM rolepermission", {}) - perms_with_roles = set(r.permid for r in rp_recs) - -unauth_files = [] -authed_count = 0 - -for rel_path, abs_path in all_files: - permid = path_to_permid.get(rel_path) - if permid is None: - unauth_files.append((rel_path, "路径未注册")) - elif permid not in perms_with_roles: - unauth_files.append((rel_path, "有记录但无角色")) - else: - authed_count += 1 - -# 按模块分组 -from collections import defaultdict -by_module = defaultdict(list) -for rel_path, reason in unauth_files: - parts = rel_path.strip('/').split('/') - module = parts[0] if parts else 'root' - by_module[module].append((rel_path, reason)) - -# 构建 HTML 输出 -html = f"

wwwroot: {wwwroot}

" -html += f"

符号链接目录 ({len(symlinks)}):
{symlink_info}

" -html += f"

总文件数: {len(all_files)} | 已授权: {authed_count} | 未授权: {len(unauth_files)}

" - -if not unauth_files: - html += "

所有文件均有权限覆盖,无需处理。

" -else: - html += f"

未授权文件 ({len(unauth_files)} 个):

" - for module in sorted(by_module.keys()): - items = by_module[module] - html += f"

{module}/ ({len(items)} 个文件)

" - html += "" - -return UiMessage(title='扫描结果', message=html) +if is_error: + return UiError(title=title, message=message) +return UiMessage(title=title, message=message) diff --git a/wwwroot/list_path_roles.dspy b/wwwroot/list_path_roles.dspy index 4515505..d1b2356 100644 --- a/wwwroot/list_path_roles.dspy +++ b/wwwroot/list_path_roles.dspy @@ -1,66 +1,8 @@ -# 查询指定路径拥有权限的所有角色 path = params_kw.get('path', '').strip() -if not path: - return UiError(title='错误', message='请输入路径') -if not path.startswith('/'): - path = '/' + path - -from appPublic.dictObject import DictObject async with get_sor_context(request._run_ns, 'rbac') as sor: - # 查找 permission 记录 - perm_recs = await sor.sqlExe("SELECT id, path FROM permission WHERE path=${path}$", {'path': path}) - if not perm_recs: - msg = f"路径 '{path}' 未在 permission 表中注册。" - # 尝试模糊匹配 - like_path = path.rstrip('/') + '/%' - like_recs = await sor.sqlExe("SELECT path FROM permission WHERE path LIKE ${lp}$", {'lp': like_path}) - if like_recs: - sub_paths = '
'.join([f' {r.path}' for r in like_recs[:10]]) - if len(like_recs) > 10: - sub_paths += f'
... 共 {len(like_recs)} 条' - msg += f'

模糊匹配到 {len(like_recs)} 个子路径:
{sub_paths}' - return UiError(title='未找到', message=msg) + title, message, is_error = await request._run_ns.query_path_roles(sor, path) - perm = perm_recs[0] - permid = perm.id - - # 查找 rolepermission 关联 - rp_recs = await sor.sqlExe("SELECT roleid FROM rolepermission WHERE permid=${permid}$", {'permid': permid}) - if not rp_recs: - return UiMessage(title='查询结果', message=f"路径: {path}
无任何角色拥有此路径权限。") - - # 查询角色详情 - role_ids = [r.roleid for r in rp_recs] - special_roles = [rid for rid in role_ids if rid in ('any', 'anonymous', 'logined')] - normal_ids = [rid for rid in role_ids if rid not in ('any', 'anonymous', 'logined')] - - rows = [] - for sp in special_roles: - rows.append(f"{sp}*{sp}") - - if normal_ids: - placeholders = ','.join([f'${i}$' for i in range(len(normal_ids))]) - ns = {f'_{i}': rid for i, rid in enumerate(normal_ids)} - role_recs = await sor.sqlExe(f"SELECT id, orgtypeid, name FROM role WHERE id IN ({placeholders})", ns) - for r in role_recs: - name = getattr(r, 'name', r.id) - orgtypeid = getattr(r, 'orgtypeid', '*') - rows.append(f"{r.id}{orgtypeid}{name}") - - table_html = ( - "" - "" - "" - "" - "" - "" - + ''.join(rows) + - "
角色IDorgtypeid名称
" - ) - - html = f"

路径: {path} (perm_id: {permid})

" - html += f"

共 {len(rp_recs)} 个角色有权限:

" - html += table_html - -return UiMessage(title='查询结果', message=html) +if is_error: + return UiError(title=title, message=message) +return UiMessage(title=title, message=message)