diff --git a/rbac/init.py b/rbac/init.py
index 7fef733..0e22547 100644
--- a/rbac/init.py
+++ b/rbac/init.py
@@ -6,19 +6,23 @@ from .orgs import (
)
from .userperm import UserPermissions
from .user_stats import get_user_stats
+from .rbac_tools import (
+ query_path_roles,
+ scan_unauth_files
+)
from rbac.check_perm import (
- objcheckperm,
+ objcheckperm,
get_org_users,
sor_get_org_users,
- checkUserPassword,
- register_user,
- register_auth_method,
- create_org,
+ checkUserPassword,
+ register_user,
+ register_auth_method,
+ create_org,
create_user
)
from rbac.set_role_perms import (
sor_add_user_roles,
- set_role_perm,
+ set_role_perm,
set_role_perms
)
from appPublic.log import debug
@@ -78,6 +82,8 @@ def load_rbac():
env.get_owner_orgid = get_owner_orgid
env.sor_add_user_roles = sor_add_user_roles
env.get_user_stats = get_user_stats
+ env.query_path_roles = query_path_roles
+ env.scan_unauth_files = scan_unauth_files
# Cache invalidation methods for use after role/permission changes
env.invalidate_user_perm_cache = env.userpermissions.invalidate_user_cache
env.invalidate_all_perm_caches = env.userpermissions.invalidate_all_user_caches
diff --git a/rbac/rbac_tools.py b/rbac/rbac_tools.py
new file mode 100644
index 0000000..e766d18
--- /dev/null
+++ b/rbac/rbac_tools.py
@@ -0,0 +1,228 @@
+"""
+RBAC 工具函数 — 权限查询与文件扫描。
+
+提供两个功能:
+1. 查询指定路径拥有权限的所有角色
+2. 扫描 wwwroot 下符号链接目录中未授权的文件
+
+.dspy 文件应调用此模块的函数(通过 request._run_ns),自身不做 import。
+每个函数返回 (title, message, is_error) 三元组,.dspy 据此返回 UiMessage 或 UiError。
+"""
+import os
+from collections import defaultdict
+
+
+async def query_path_roles(sor, path):
+ """
+ 查询指定路径拥有权限的角色列表。
+
+ Returns:
+ (title, message, is_error)
+ """
+ if not path:
+ return ('错误', '请输入路径', True)
+ if not path.startswith('/'):
+ path = '/' + path
+
+ perm_recs = await sor.sqlExe(
+ "SELECT id, path FROM permission WHERE path=${path}$",
+ {'path': path}
+ )
+ if not perm_recs:
+ like_path = path.rstrip('/') + '/%'
+ like_recs = await sor.sqlExe(
+ "SELECT path FROM permission WHERE path LIKE ${lp}$",
+ {'lp': like_path}
+ )
+ msg = f"路径 '{path}' 未在 permission 表中注册。"
+ if like_recs:
+ sub = '
'.join([f' {r.path}' for r in like_recs[:10]])
+ if len(like_recs) > 10:
+ sub += f'
... 共 {len(like_recs)} 条'
+ msg += f'
模糊匹配到 {len(like_recs)} 个子路径:
{sub}'
+ return ('未找到', msg, True)
+
+ perm = perm_recs[0]
+ permid = perm.id
+
+ rp_recs = await sor.sqlExe(
+ "SELECT roleid FROM rolepermission WHERE permid=${permid}$",
+ {'permid': permid}
+ )
+ if not rp_recs:
+ msg = f"路径: {path} (perm_id: {permid})
无任何角色拥有此路径权限。"
+ return ('查询结果', msg, False)
+
+ role_ids = [r.roleid for r in rp_recs]
+ special = [rid for rid in role_ids if rid in ('any', 'anonymous', 'logined')]
+ normal = [rid for rid in role_ids if rid not in ('any', 'anonymous', 'logined')]
+
+ rows = []
+ for sp in special:
+ rows.append(f"
| {sp} | * | {sp} |
")
+
+ if normal:
+ placeholders = ','.join([f'${i}$' for i in range(len(normal))])
+ ns = {f'_{i}': rid for i, rid in enumerate(normal)}
+ role_recs = await sor.sqlExe(
+ f"SELECT id, orgtypeid, name FROM role WHERE id IN ({placeholders})",
+ ns
+ )
+ for r in role_recs:
+ rows.append(
+ f"| {r.id} | "
+ f"{getattr(r, 'orgtypeid', '*')} | "
+ f"{getattr(r, 'name', r.id)} |
"
+ )
+
+ table = (
+ ""
+ ""
+ "| 角色ID | "
+ "orgtypeid | "
+ "名称 | "
+ "
"
+ + ''.join(rows) +
+ "
"
+ )
+
+ html = (
+ f"路径: {path} (perm_id: {permid})
"
+ f"共 {len(rp_recs)} 个角色有权限:
"
+ f"{table}"
+ )
+ return ('查询结果', html, False)
+
+
+def find_symlink_dirs(root):
+ """找出指定目录下直接子目录中是符号链接的目录。"""
+ symlinks = []
+ for entry in sorted(os.listdir(root)):
+ full = os.path.join(root, entry)
+ if os.path.islink(full) and os.path.isdir(full):
+ target = os.readlink(full)
+ symlinks.append((entry, full, target))
+ return symlinks
+
+
+def walk_symlink_dirs(root, symlinks):
+ """
+ 遍历所有符号链接目录下的文件。
+ Returns: list of (relative_path, absolute_path)
+ """
+ real_root = os.path.realpath(root)
+ visited_real = set()
+ files = []
+ for name, link_path, target in symlinks:
+ for r, dirs, filenames in os.walk(link_path, followlinks=True):
+ real_r = os.path.realpath(r)
+ if real_r in visited_real:
+ dirs.clear()
+ continue
+ visited_real.add(real_r)
+ dirs[:] = [d for d in dirs if os.path.realpath(os.path.join(r, d)) != real_root]
+ for fname in sorted(filenames):
+ abs_path = os.path.join(r, fname)
+ rel = '/' + os.path.relpath(abs_path, root)
+ files.append((rel, abs_path))
+ return files
+
+
+def resolve_wwwroot(wwwroot_param, file_path):
+ """自动定位 wwwroot 目录。"""
+ if wwwroot_param:
+ wwwroot_param = os.path.abspath(wwwroot_param)
+ if os.path.isdir(wwwroot_param):
+ return wwwroot_param, None
+ return None, f"wwwroot 目录不存在: {wwwroot_param}"
+
+ sage_root = os.environ.get('SAGE_ROOT')
+ if sage_root:
+ wr = os.path.join(sage_root, 'wwwroot')
+ if os.path.isdir(wr):
+ return wr, None
+ return None, f"SAGE_ROOT 指向的 wwwroot 不存在: {wr}"
+
+ if '/wwwroot/' in file_path:
+ idx = file_path.index('/wwwroot/')
+ wr = file_path[:idx + len('/wwwroot')]
+ if os.path.isdir(wr):
+ return wr, None
+
+ candidate = file_path
+ for _ in range(5):
+ candidate = os.path.dirname(candidate)
+ if not candidate or candidate == '/':
+ break
+ wr = os.path.join(candidate, 'wwwroot')
+ if os.path.isdir(wr):
+ return wr, None
+
+ return None, '无法自动定位 wwwroot 目录,请传入 wwwroot 参数或设置 SAGE_ROOT 环境变量。'
+
+
+async def scan_unauth_files(sor, wwwroot_param, file_path):
+ """
+ 扫描 wwwroot 下符号链接目录中未授权的文件。
+
+ Returns:
+ (title, message, is_error)
+ """
+ wwwroot, err = resolve_wwwroot(wwwroot_param, file_path)
+ if err:
+ return ('错误', err, True)
+
+ symlinks = find_symlink_dirs(wwwroot)
+ if not symlinks:
+ return ('扫描结果', f"在 {wwwroot} 中未发现任何符号链接目录。", False)
+
+ all_files = walk_symlink_dirs(wwwroot, symlinks)
+ symlink_info = '
'.join([
+ f" {name}/ → {target}" for name, _, target in symlinks
+ ])
+
+ perm_recs = await sor.sqlExe("SELECT id, path FROM permission", {})
+ path_to_permid = {r.path: r.id for r in perm_recs}
+
+ rp_recs = await sor.sqlExe("SELECT DISTINCT permid FROM rolepermission", {})
+ perms_with_roles = {r.permid for r in rp_recs}
+
+ unauth = []
+ authed = 0
+ for rel_path, abs_path in all_files:
+ permid = path_to_permid.get(rel_path)
+ if permid is None:
+ unauth.append((rel_path, '路径未注册'))
+ elif permid not in perms_with_roles:
+ unauth.append((rel_path, '有记录但无角色'))
+ else:
+ authed += 1
+
+ by_module = defaultdict(list)
+ for rel_path, reason in unauth:
+ parts = rel_path.strip('/').split('/')
+ module = parts[0] if parts else 'root'
+ by_module[module].append((rel_path, reason))
+
+ html = (
+ f"wwwroot: {wwwroot}
"
+ f"符号链接目录 ({len(symlinks)}):
{symlink_info}
"
+ f"总文件数: {len(all_files)} | 已授权: {authed} | 未授权: {len(unauth)}
"
+ )
+
+ if not unauth:
+ html += "所有文件均有权限覆盖,无需处理。
"
+ else:
+ html += f"未授权文件 ({len(unauth)} 个):
"
+ for module in sorted(by_module.keys()):
+ items = by_module[module]
+ html += f"{module}/ ({len(items)} 个文件)
"
+ html += ""
+ for rel_path, reason in items[:50]:
+ color = '#EF4444' if reason == '路径未注册' else '#F97316'
+ html += f"- [{reason}] {rel_path}
"
+ if len(items) > 50:
+ html += f"- ... 还有 {len(items) - 50} 个文件
"
+ html += "
"
+
+ return ('扫描结果', html, False)
diff --git a/wwwroot/find_unauth_files.dspy b/wwwroot/find_unauth_files.dspy
index 6cf69b4..4069989 100644
--- a/wwwroot/find_unauth_files.dspy
+++ b/wwwroot/find_unauth_files.dspy
@@ -1,124 +1,8 @@
-# 查找 wwwroot 下符号链接目录中没有任何角色权限的文件
-import os
-
wwwroot = params_kw.get('wwwroot', '').strip()
-# 定位 wwwroot
-if not wwwroot:
- sage_root = os.environ.get('SAGE_ROOT')
- if sage_root:
- wwwroot = os.path.join(sage_root, 'wwwroot')
- else:
- # 从当前文件路径向上找 wwwroot
- # 生产: ~/token/sage/wwwroot/rbac/find_unauth_files.dspy
- # 开发: ~/repos/sage/wwwroot/rbac/find_unauth_files.dspy (通过symlink)
- # 或: ~/repos/rbac/wwwroot/find_unauth_files.dspy (rbac repo自身)
- this_file = os.path.abspath(__file__)
- # 如果路径中包含 /wwwroot/ 段,直接截断
- if '/wwwroot/' in this_file:
- idx = this_file.index('/wwwroot/')
- wwwroot = this_file[:idx + len('/wwwroot')]
- else:
- # 尝试找 wwwroot 子目录
- for level in range(5):
- candidate = this_file
- for _ in range(level):
- candidate = os.path.dirname(candidate)
- if not candidate or candidate == '/':
- break
- wr = os.path.join(candidate, 'wwwroot')
- if os.path.isdir(wr):
- wwwroot = wr
- break
- if not wwwroot:
- return UiError(title='错误', message='无法自动定位 wwwroot 目录,请通过参数传入 wwwroot 路径,或设置 SAGE_ROOT 环境变量。')
-
-wwwroot = os.path.abspath(wwwroot)
-if not os.path.isdir(wwwroot):
- return UiError(title='错误', message=f"wwwroot 目录不存在: {wwwroot}")
-
-def find_symlink_dirs(root):
- symlinks = []
- for entry in sorted(os.listdir(root)):
- full = os.path.join(root, entry)
- if os.path.islink(full) and os.path.isdir(full):
- target = os.readlink(full)
- symlinks.append((entry, full, target))
- return symlinks
-
-def walk_symlink_dirs(root, symlinks):
- real_root = os.path.realpath(root)
- visited_real = set()
- files = []
- for name, link_path, target in symlinks:
- for r, dirs, filenames in os.walk(link_path, followlinks=True):
- real_r = os.path.realpath(r)
- if real_r in visited_real:
- dirs.clear()
- continue
- visited_real.add(real_r)
- dirs[:] = [d for d in dirs if os.path.realpath(os.path.join(r, d)) != real_root]
- for fname in sorted(filenames):
- abs_path = os.path.join(r, fname)
- rel = '/' + os.path.relpath(abs_path, root)
- files.append((rel, abs_path))
- return files
-
-symlinks = find_symlink_dirs(wwwroot)
-if not symlinks:
- return UiMessage(title='扫描结果', message=f"在 {wwwroot} 中未发现任何符号链接目录。")
-
-symlink_info = '
'.join([f" {name}/ → {target}" for name, _, target in symlinks])
-
-all_files = walk_symlink_dirs(wwwroot, symlinks)
-
async with get_sor_context(request._run_ns, 'rbac') as sor:
- perm_recs = await sor.sqlExe("SELECT id, path FROM permission", {})
- path_to_permid = {}
- for r in perm_recs:
- path_to_permid[r.path] = r.id
+ title, message, is_error = await request._run_ns.scan_unauth_files(sor, wwwroot, __file__)
- rp_recs = await sor.sqlExe("SELECT DISTINCT permid FROM rolepermission", {})
- perms_with_roles = set(r.permid for r in rp_recs)
-
-unauth_files = []
-authed_count = 0
-
-for rel_path, abs_path in all_files:
- permid = path_to_permid.get(rel_path)
- if permid is None:
- unauth_files.append((rel_path, "路径未注册"))
- elif permid not in perms_with_roles:
- unauth_files.append((rel_path, "有记录但无角色"))
- else:
- authed_count += 1
-
-# 按模块分组
-from collections import defaultdict
-by_module = defaultdict(list)
-for rel_path, reason in unauth_files:
- parts = rel_path.strip('/').split('/')
- module = parts[0] if parts else 'root'
- by_module[module].append((rel_path, reason))
-
-# 构建 HTML 输出
-html = f"wwwroot: {wwwroot}
"
-html += f"符号链接目录 ({len(symlinks)}):
{symlink_info}
"
-html += f"总文件数: {len(all_files)} | 已授权: {authed_count} | 未授权: {len(unauth_files)}
"
-
-if not unauth_files:
- html += "所有文件均有权限覆盖,无需处理。
"
-else:
- html += f"未授权文件 ({len(unauth_files)} 个):
"
- for module in sorted(by_module.keys()):
- items = by_module[module]
- html += f"{module}/ ({len(items)} 个文件)
"
- html += ""
- for rel_path, reason in items[:50]:
- color = '#EF4444' if reason == '路径未注册' else '#F97316'
- html += f"- [{reason}] {rel_path}
"
- if len(items) > 50:
- html += f"- ... 还有 {len(items) - 50} 个文件
"
- html += "
"
-
-return UiMessage(title='扫描结果', message=html)
+if is_error:
+ return UiError(title=title, message=message)
+return UiMessage(title=title, message=message)
diff --git a/wwwroot/list_path_roles.dspy b/wwwroot/list_path_roles.dspy
index 4515505..d1b2356 100644
--- a/wwwroot/list_path_roles.dspy
+++ b/wwwroot/list_path_roles.dspy
@@ -1,66 +1,8 @@
-# 查询指定路径拥有权限的所有角色
path = params_kw.get('path', '').strip()
-if not path:
- return UiError(title='错误', message='请输入路径')
-if not path.startswith('/'):
- path = '/' + path
-
-from appPublic.dictObject import DictObject
async with get_sor_context(request._run_ns, 'rbac') as sor:
- # 查找 permission 记录
- perm_recs = await sor.sqlExe("SELECT id, path FROM permission WHERE path=${path}$", {'path': path})
- if not perm_recs:
- msg = f"路径 '{path}' 未在 permission 表中注册。"
- # 尝试模糊匹配
- like_path = path.rstrip('/') + '/%'
- like_recs = await sor.sqlExe("SELECT path FROM permission WHERE path LIKE ${lp}$", {'lp': like_path})
- if like_recs:
- sub_paths = '
'.join([f' {r.path}' for r in like_recs[:10]])
- if len(like_recs) > 10:
- sub_paths += f'
... 共 {len(like_recs)} 条'
- msg += f'
模糊匹配到 {len(like_recs)} 个子路径:
{sub_paths}'
- return UiError(title='未找到', message=msg)
+ title, message, is_error = await request._run_ns.query_path_roles(sor, path)
- perm = perm_recs[0]
- permid = perm.id
-
- # 查找 rolepermission 关联
- rp_recs = await sor.sqlExe("SELECT roleid FROM rolepermission WHERE permid=${permid}$", {'permid': permid})
- if not rp_recs:
- return UiMessage(title='查询结果', message=f"路径: {path}
无任何角色拥有此路径权限。")
-
- # 查询角色详情
- role_ids = [r.roleid for r in rp_recs]
- special_roles = [rid for rid in role_ids if rid in ('any', 'anonymous', 'logined')]
- normal_ids = [rid for rid in role_ids if rid not in ('any', 'anonymous', 'logined')]
-
- rows = []
- for sp in special_roles:
- rows.append(f"| {sp} | * | {sp} |
")
-
- if normal_ids:
- placeholders = ','.join([f'${i}$' for i in range(len(normal_ids))])
- ns = {f'_{i}': rid for i, rid in enumerate(normal_ids)}
- role_recs = await sor.sqlExe(f"SELECT id, orgtypeid, name FROM role WHERE id IN ({placeholders})", ns)
- for r in role_recs:
- name = getattr(r, 'name', r.id)
- orgtypeid = getattr(r, 'orgtypeid', '*')
- rows.append(f"| {r.id} | {orgtypeid} | {name} |
")
-
- table_html = (
- ""
- ""
- "| 角色ID | "
- "orgtypeid | "
- "名称 | "
- "
"
- + ''.join(rows) +
- "
"
- )
-
- html = f"路径: {path} (perm_id: {permid})
"
- html += f"共 {len(rp_recs)} 个角色有权限:
"
- html += table_html
-
-return UiMessage(title='查询结果', message=html)
+if is_error:
+ return UiError(title=title, message=message)
+return UiMessage(title=title, message=message)