refactor: move RBAC tools logic to rbac/rbac_tools.py, dspy files call via request._run_ns
This commit is contained in:
parent
0b456486db
commit
f8c8a4ce4d
18
rbac/init.py
18
rbac/init.py
@ -6,19 +6,23 @@ from .orgs import (
|
|||||||
)
|
)
|
||||||
from .userperm import UserPermissions
|
from .userperm import UserPermissions
|
||||||
from .user_stats import get_user_stats
|
from .user_stats import get_user_stats
|
||||||
|
from .rbac_tools import (
|
||||||
|
query_path_roles,
|
||||||
|
scan_unauth_files
|
||||||
|
)
|
||||||
from rbac.check_perm import (
|
from rbac.check_perm import (
|
||||||
objcheckperm,
|
objcheckperm,
|
||||||
get_org_users,
|
get_org_users,
|
||||||
sor_get_org_users,
|
sor_get_org_users,
|
||||||
checkUserPassword,
|
checkUserPassword,
|
||||||
register_user,
|
register_user,
|
||||||
register_auth_method,
|
register_auth_method,
|
||||||
create_org,
|
create_org,
|
||||||
create_user
|
create_user
|
||||||
)
|
)
|
||||||
from rbac.set_role_perms import (
|
from rbac.set_role_perms import (
|
||||||
sor_add_user_roles,
|
sor_add_user_roles,
|
||||||
set_role_perm,
|
set_role_perm,
|
||||||
set_role_perms
|
set_role_perms
|
||||||
)
|
)
|
||||||
from appPublic.log import debug
|
from appPublic.log import debug
|
||||||
@ -78,6 +82,8 @@ def load_rbac():
|
|||||||
env.get_owner_orgid = get_owner_orgid
|
env.get_owner_orgid = get_owner_orgid
|
||||||
env.sor_add_user_roles = sor_add_user_roles
|
env.sor_add_user_roles = sor_add_user_roles
|
||||||
env.get_user_stats = get_user_stats
|
env.get_user_stats = get_user_stats
|
||||||
|
env.query_path_roles = query_path_roles
|
||||||
|
env.scan_unauth_files = scan_unauth_files
|
||||||
# Cache invalidation methods for use after role/permission changes
|
# Cache invalidation methods for use after role/permission changes
|
||||||
env.invalidate_user_perm_cache = env.userpermissions.invalidate_user_cache
|
env.invalidate_user_perm_cache = env.userpermissions.invalidate_user_cache
|
||||||
env.invalidate_all_perm_caches = env.userpermissions.invalidate_all_user_caches
|
env.invalidate_all_perm_caches = env.userpermissions.invalidate_all_user_caches
|
||||||
|
|||||||
228
rbac/rbac_tools.py
Normal file
228
rbac/rbac_tools.py
Normal file
@ -0,0 +1,228 @@
|
|||||||
|
"""
|
||||||
|
RBAC 工具函数 — 权限查询与文件扫描。
|
||||||
|
|
||||||
|
提供两个功能:
|
||||||
|
1. 查询指定路径拥有权限的所有角色
|
||||||
|
2. 扫描 wwwroot 下符号链接目录中未授权的文件
|
||||||
|
|
||||||
|
.dspy 文件应调用此模块的函数(通过 request._run_ns),自身不做 import。
|
||||||
|
每个函数返回 (title, message, is_error) 三元组,.dspy 据此返回 UiMessage 或 UiError。
|
||||||
|
"""
|
||||||
|
import os
|
||||||
|
from collections import defaultdict
|
||||||
|
|
||||||
|
|
||||||
|
async def query_path_roles(sor, path):
|
||||||
|
"""
|
||||||
|
查询指定路径拥有权限的角色列表。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
(title, message, is_error)
|
||||||
|
"""
|
||||||
|
if not path:
|
||||||
|
return ('错误', '请输入路径', True)
|
||||||
|
if not path.startswith('/'):
|
||||||
|
path = '/' + path
|
||||||
|
|
||||||
|
perm_recs = await sor.sqlExe(
|
||||||
|
"SELECT id, path FROM permission WHERE path=${path}$",
|
||||||
|
{'path': path}
|
||||||
|
)
|
||||||
|
if not perm_recs:
|
||||||
|
like_path = path.rstrip('/') + '/%'
|
||||||
|
like_recs = await sor.sqlExe(
|
||||||
|
"SELECT path FROM permission WHERE path LIKE ${lp}$",
|
||||||
|
{'lp': like_path}
|
||||||
|
)
|
||||||
|
msg = f"路径 '{path}' 未在 permission 表中注册。"
|
||||||
|
if like_recs:
|
||||||
|
sub = '<br>'.join([f' {r.path}' for r in like_recs[:10]])
|
||||||
|
if len(like_recs) > 10:
|
||||||
|
sub += f'<br>... 共 {len(like_recs)} 条'
|
||||||
|
msg += f'<br><br>模糊匹配到 {len(like_recs)} 个子路径:<br>{sub}'
|
||||||
|
return ('未找到', msg, True)
|
||||||
|
|
||||||
|
perm = perm_recs[0]
|
||||||
|
permid = perm.id
|
||||||
|
|
||||||
|
rp_recs = await sor.sqlExe(
|
||||||
|
"SELECT roleid FROM rolepermission WHERE permid=${permid}$",
|
||||||
|
{'permid': permid}
|
||||||
|
)
|
||||||
|
if not rp_recs:
|
||||||
|
msg = f"路径: <b>{path}</b> (perm_id: {permid})<br><br>无任何角色拥有此路径权限。"
|
||||||
|
return ('查询结果', msg, False)
|
||||||
|
|
||||||
|
role_ids = [r.roleid for r in rp_recs]
|
||||||
|
special = [rid for rid in role_ids if rid in ('any', 'anonymous', 'logined')]
|
||||||
|
normal = [rid for rid in role_ids if rid not in ('any', 'anonymous', 'logined')]
|
||||||
|
|
||||||
|
rows = []
|
||||||
|
for sp in special:
|
||||||
|
rows.append(f"<tr><td>{sp}</td><td>*</td><td>{sp}</td></tr>")
|
||||||
|
|
||||||
|
if normal:
|
||||||
|
placeholders = ','.join([f'${i}$' for i in range(len(normal))])
|
||||||
|
ns = {f'_{i}': rid for i, rid in enumerate(normal)}
|
||||||
|
role_recs = await sor.sqlExe(
|
||||||
|
f"SELECT id, orgtypeid, name FROM role WHERE id IN ({placeholders})",
|
||||||
|
ns
|
||||||
|
)
|
||||||
|
for r in role_recs:
|
||||||
|
rows.append(
|
||||||
|
f"<tr><td>{r.id}</td>"
|
||||||
|
f"<td>{getattr(r, 'orgtypeid', '*')}</td>"
|
||||||
|
f"<td>{getattr(r, 'name', r.id)}</td></tr>"
|
||||||
|
)
|
||||||
|
|
||||||
|
table = (
|
||||||
|
"<table style='border-collapse:collapse;width:100%;'>"
|
||||||
|
"<tr style='background:#334155;color:#fff;'>"
|
||||||
|
"<th style='padding:6px 12px;text-align:left;border:1px solid #475569;'>角色ID</th>"
|
||||||
|
"<th style='padding:6px 12px;text-align:left;border:1px solid #475569;'>orgtypeid</th>"
|
||||||
|
"<th style='padding:6px 12px;text-align:left;border:1px solid #475569;'>名称</th>"
|
||||||
|
"</tr>"
|
||||||
|
+ ''.join(rows) +
|
||||||
|
"</table>"
|
||||||
|
)
|
||||||
|
|
||||||
|
html = (
|
||||||
|
f"<p>路径: <b>{path}</b> (perm_id: {permid})</p>"
|
||||||
|
f"<p>共 {len(rp_recs)} 个角色有权限:</p>"
|
||||||
|
f"{table}"
|
||||||
|
)
|
||||||
|
return ('查询结果', html, False)
|
||||||
|
|
||||||
|
|
||||||
|
def find_symlink_dirs(root):
|
||||||
|
"""找出指定目录下直接子目录中是符号链接的目录。"""
|
||||||
|
symlinks = []
|
||||||
|
for entry in sorted(os.listdir(root)):
|
||||||
|
full = os.path.join(root, entry)
|
||||||
|
if os.path.islink(full) and os.path.isdir(full):
|
||||||
|
target = os.readlink(full)
|
||||||
|
symlinks.append((entry, full, target))
|
||||||
|
return symlinks
|
||||||
|
|
||||||
|
|
||||||
|
def walk_symlink_dirs(root, symlinks):
|
||||||
|
"""
|
||||||
|
遍历所有符号链接目录下的文件。
|
||||||
|
Returns: list of (relative_path, absolute_path)
|
||||||
|
"""
|
||||||
|
real_root = os.path.realpath(root)
|
||||||
|
visited_real = set()
|
||||||
|
files = []
|
||||||
|
for name, link_path, target in symlinks:
|
||||||
|
for r, dirs, filenames in os.walk(link_path, followlinks=True):
|
||||||
|
real_r = os.path.realpath(r)
|
||||||
|
if real_r in visited_real:
|
||||||
|
dirs.clear()
|
||||||
|
continue
|
||||||
|
visited_real.add(real_r)
|
||||||
|
dirs[:] = [d for d in dirs if os.path.realpath(os.path.join(r, d)) != real_root]
|
||||||
|
for fname in sorted(filenames):
|
||||||
|
abs_path = os.path.join(r, fname)
|
||||||
|
rel = '/' + os.path.relpath(abs_path, root)
|
||||||
|
files.append((rel, abs_path))
|
||||||
|
return files
|
||||||
|
|
||||||
|
|
||||||
|
def resolve_wwwroot(wwwroot_param, file_path):
|
||||||
|
"""自动定位 wwwroot 目录。"""
|
||||||
|
if wwwroot_param:
|
||||||
|
wwwroot_param = os.path.abspath(wwwroot_param)
|
||||||
|
if os.path.isdir(wwwroot_param):
|
||||||
|
return wwwroot_param, None
|
||||||
|
return None, f"wwwroot 目录不存在: {wwwroot_param}"
|
||||||
|
|
||||||
|
sage_root = os.environ.get('SAGE_ROOT')
|
||||||
|
if sage_root:
|
||||||
|
wr = os.path.join(sage_root, 'wwwroot')
|
||||||
|
if os.path.isdir(wr):
|
||||||
|
return wr, None
|
||||||
|
return None, f"SAGE_ROOT 指向的 wwwroot 不存在: {wr}"
|
||||||
|
|
||||||
|
if '/wwwroot/' in file_path:
|
||||||
|
idx = file_path.index('/wwwroot/')
|
||||||
|
wr = file_path[:idx + len('/wwwroot')]
|
||||||
|
if os.path.isdir(wr):
|
||||||
|
return wr, None
|
||||||
|
|
||||||
|
candidate = file_path
|
||||||
|
for _ in range(5):
|
||||||
|
candidate = os.path.dirname(candidate)
|
||||||
|
if not candidate or candidate == '/':
|
||||||
|
break
|
||||||
|
wr = os.path.join(candidate, 'wwwroot')
|
||||||
|
if os.path.isdir(wr):
|
||||||
|
return wr, None
|
||||||
|
|
||||||
|
return None, '无法自动定位 wwwroot 目录,请传入 wwwroot 参数或设置 SAGE_ROOT 环境变量。'
|
||||||
|
|
||||||
|
|
||||||
|
async def scan_unauth_files(sor, wwwroot_param, file_path):
|
||||||
|
"""
|
||||||
|
扫描 wwwroot 下符号链接目录中未授权的文件。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
(title, message, is_error)
|
||||||
|
"""
|
||||||
|
wwwroot, err = resolve_wwwroot(wwwroot_param, file_path)
|
||||||
|
if err:
|
||||||
|
return ('错误', err, True)
|
||||||
|
|
||||||
|
symlinks = find_symlink_dirs(wwwroot)
|
||||||
|
if not symlinks:
|
||||||
|
return ('扫描结果', f"在 {wwwroot} 中未发现任何符号链接目录。", False)
|
||||||
|
|
||||||
|
all_files = walk_symlink_dirs(wwwroot, symlinks)
|
||||||
|
symlink_info = '<br>'.join([
|
||||||
|
f" {name}/ → {target}" for name, _, target in symlinks
|
||||||
|
])
|
||||||
|
|
||||||
|
perm_recs = await sor.sqlExe("SELECT id, path FROM permission", {})
|
||||||
|
path_to_permid = {r.path: r.id for r in perm_recs}
|
||||||
|
|
||||||
|
rp_recs = await sor.sqlExe("SELECT DISTINCT permid FROM rolepermission", {})
|
||||||
|
perms_with_roles = {r.permid for r in rp_recs}
|
||||||
|
|
||||||
|
unauth = []
|
||||||
|
authed = 0
|
||||||
|
for rel_path, abs_path in all_files:
|
||||||
|
permid = path_to_permid.get(rel_path)
|
||||||
|
if permid is None:
|
||||||
|
unauth.append((rel_path, '路径未注册'))
|
||||||
|
elif permid not in perms_with_roles:
|
||||||
|
unauth.append((rel_path, '有记录但无角色'))
|
||||||
|
else:
|
||||||
|
authed += 1
|
||||||
|
|
||||||
|
by_module = defaultdict(list)
|
||||||
|
for rel_path, reason in unauth:
|
||||||
|
parts = rel_path.strip('/').split('/')
|
||||||
|
module = parts[0] if parts else 'root'
|
||||||
|
by_module[module].append((rel_path, reason))
|
||||||
|
|
||||||
|
html = (
|
||||||
|
f"<p><b>wwwroot:</b> {wwwroot}</p>"
|
||||||
|
f"<p><b>符号链接目录 ({len(symlinks)}):</b><br>{symlink_info}</p>"
|
||||||
|
f"<p>总文件数: {len(all_files)} | 已授权: {authed} | 未授权: {len(unauth)}</p>"
|
||||||
|
)
|
||||||
|
|
||||||
|
if not unauth:
|
||||||
|
html += "<p style='color:#22C55E;font-weight:bold;'>所有文件均有权限覆盖,无需处理。</p>"
|
||||||
|
else:
|
||||||
|
html += f"<p style='color:#EF4444;font-weight:bold;'>未授权文件 ({len(unauth)} 个):</p>"
|
||||||
|
for module in sorted(by_module.keys()):
|
||||||
|
items = by_module[module]
|
||||||
|
html += f"<h4 style='color:#FBBF24;'>{module}/ ({len(items)} 个文件)</h4>"
|
||||||
|
html += "<ul style='font-family:monospace;font-size:12px;'>"
|
||||||
|
for rel_path, reason in items[:50]:
|
||||||
|
color = '#EF4444' if reason == '路径未注册' else '#F97316'
|
||||||
|
html += f"<li style='color:{color};'>[{reason}] {rel_path}</li>"
|
||||||
|
if len(items) > 50:
|
||||||
|
html += f"<li>... 还有 {len(items) - 50} 个文件</li>"
|
||||||
|
html += "</ul>"
|
||||||
|
|
||||||
|
return ('扫描结果', html, False)
|
||||||
@ -1,124 +1,8 @@
|
|||||||
# 查找 wwwroot 下符号链接目录中没有任何角色权限的文件
|
|
||||||
import os
|
|
||||||
|
|
||||||
wwwroot = params_kw.get('wwwroot', '').strip()
|
wwwroot = params_kw.get('wwwroot', '').strip()
|
||||||
|
|
||||||
# 定位 wwwroot
|
|
||||||
if not wwwroot:
|
|
||||||
sage_root = os.environ.get('SAGE_ROOT')
|
|
||||||
if sage_root:
|
|
||||||
wwwroot = os.path.join(sage_root, 'wwwroot')
|
|
||||||
else:
|
|
||||||
# 从当前文件路径向上找 wwwroot
|
|
||||||
# 生产: ~/token/sage/wwwroot/rbac/find_unauth_files.dspy
|
|
||||||
# 开发: ~/repos/sage/wwwroot/rbac/find_unauth_files.dspy (通过symlink)
|
|
||||||
# 或: ~/repos/rbac/wwwroot/find_unauth_files.dspy (rbac repo自身)
|
|
||||||
this_file = os.path.abspath(__file__)
|
|
||||||
# 如果路径中包含 /wwwroot/ 段,直接截断
|
|
||||||
if '/wwwroot/' in this_file:
|
|
||||||
idx = this_file.index('/wwwroot/')
|
|
||||||
wwwroot = this_file[:idx + len('/wwwroot')]
|
|
||||||
else:
|
|
||||||
# 尝试找 wwwroot 子目录
|
|
||||||
for level in range(5):
|
|
||||||
candidate = this_file
|
|
||||||
for _ in range(level):
|
|
||||||
candidate = os.path.dirname(candidate)
|
|
||||||
if not candidate or candidate == '/':
|
|
||||||
break
|
|
||||||
wr = os.path.join(candidate, 'wwwroot')
|
|
||||||
if os.path.isdir(wr):
|
|
||||||
wwwroot = wr
|
|
||||||
break
|
|
||||||
if not wwwroot:
|
|
||||||
return UiError(title='错误', message='无法自动定位 wwwroot 目录,请通过参数传入 wwwroot 路径,或设置 SAGE_ROOT 环境变量。')
|
|
||||||
|
|
||||||
wwwroot = os.path.abspath(wwwroot)
|
|
||||||
if not os.path.isdir(wwwroot):
|
|
||||||
return UiError(title='错误', message=f"wwwroot 目录不存在: {wwwroot}")
|
|
||||||
|
|
||||||
def find_symlink_dirs(root):
|
|
||||||
symlinks = []
|
|
||||||
for entry in sorted(os.listdir(root)):
|
|
||||||
full = os.path.join(root, entry)
|
|
||||||
if os.path.islink(full) and os.path.isdir(full):
|
|
||||||
target = os.readlink(full)
|
|
||||||
symlinks.append((entry, full, target))
|
|
||||||
return symlinks
|
|
||||||
|
|
||||||
def walk_symlink_dirs(root, symlinks):
|
|
||||||
real_root = os.path.realpath(root)
|
|
||||||
visited_real = set()
|
|
||||||
files = []
|
|
||||||
for name, link_path, target in symlinks:
|
|
||||||
for r, dirs, filenames in os.walk(link_path, followlinks=True):
|
|
||||||
real_r = os.path.realpath(r)
|
|
||||||
if real_r in visited_real:
|
|
||||||
dirs.clear()
|
|
||||||
continue
|
|
||||||
visited_real.add(real_r)
|
|
||||||
dirs[:] = [d for d in dirs if os.path.realpath(os.path.join(r, d)) != real_root]
|
|
||||||
for fname in sorted(filenames):
|
|
||||||
abs_path = os.path.join(r, fname)
|
|
||||||
rel = '/' + os.path.relpath(abs_path, root)
|
|
||||||
files.append((rel, abs_path))
|
|
||||||
return files
|
|
||||||
|
|
||||||
symlinks = find_symlink_dirs(wwwroot)
|
|
||||||
if not symlinks:
|
|
||||||
return UiMessage(title='扫描结果', message=f"在 {wwwroot} 中未发现任何符号链接目录。")
|
|
||||||
|
|
||||||
symlink_info = '<br>'.join([f" {name}/ → {target}" for name, _, target in symlinks])
|
|
||||||
|
|
||||||
all_files = walk_symlink_dirs(wwwroot, symlinks)
|
|
||||||
|
|
||||||
async with get_sor_context(request._run_ns, 'rbac') as sor:
|
async with get_sor_context(request._run_ns, 'rbac') as sor:
|
||||||
perm_recs = await sor.sqlExe("SELECT id, path FROM permission", {})
|
title, message, is_error = await request._run_ns.scan_unauth_files(sor, wwwroot, __file__)
|
||||||
path_to_permid = {}
|
|
||||||
for r in perm_recs:
|
|
||||||
path_to_permid[r.path] = r.id
|
|
||||||
|
|
||||||
rp_recs = await sor.sqlExe("SELECT DISTINCT permid FROM rolepermission", {})
|
if is_error:
|
||||||
perms_with_roles = set(r.permid for r in rp_recs)
|
return UiError(title=title, message=message)
|
||||||
|
return UiMessage(title=title, message=message)
|
||||||
unauth_files = []
|
|
||||||
authed_count = 0
|
|
||||||
|
|
||||||
for rel_path, abs_path in all_files:
|
|
||||||
permid = path_to_permid.get(rel_path)
|
|
||||||
if permid is None:
|
|
||||||
unauth_files.append((rel_path, "路径未注册"))
|
|
||||||
elif permid not in perms_with_roles:
|
|
||||||
unauth_files.append((rel_path, "有记录但无角色"))
|
|
||||||
else:
|
|
||||||
authed_count += 1
|
|
||||||
|
|
||||||
# 按模块分组
|
|
||||||
from collections import defaultdict
|
|
||||||
by_module = defaultdict(list)
|
|
||||||
for rel_path, reason in unauth_files:
|
|
||||||
parts = rel_path.strip('/').split('/')
|
|
||||||
module = parts[0] if parts else 'root'
|
|
||||||
by_module[module].append((rel_path, reason))
|
|
||||||
|
|
||||||
# 构建 HTML 输出
|
|
||||||
html = f"<p><b>wwwroot:</b> {wwwroot}</p>"
|
|
||||||
html += f"<p><b>符号链接目录 ({len(symlinks)}):</b><br>{symlink_info}</p>"
|
|
||||||
html += f"<p>总文件数: {len(all_files)} | 已授权: {authed_count} | 未授权: {len(unauth_files)}</p>"
|
|
||||||
|
|
||||||
if not unauth_files:
|
|
||||||
html += "<p style='color:#22C55E;font-weight:bold;'>所有文件均有权限覆盖,无需处理。</p>"
|
|
||||||
else:
|
|
||||||
html += f"<p style='color:#EF4444;font-weight:bold;'>未授权文件 ({len(unauth_files)} 个):</p>"
|
|
||||||
for module in sorted(by_module.keys()):
|
|
||||||
items = by_module[module]
|
|
||||||
html += f"<h4 style='color:#FBBF24;'>{module}/ ({len(items)} 个文件)</h4>"
|
|
||||||
html += "<ul style='font-family:monospace;font-size:12px;'>"
|
|
||||||
for rel_path, reason in items[:50]:
|
|
||||||
color = '#EF4444' if reason == '路径未注册' else '#F97316'
|
|
||||||
html += f"<li style='color:{color};'>[{reason}] {rel_path}</li>"
|
|
||||||
if len(items) > 50:
|
|
||||||
html += f"<li>... 还有 {len(items) - 50} 个文件</li>"
|
|
||||||
html += "</ul>"
|
|
||||||
|
|
||||||
return UiMessage(title='扫描结果', message=html)
|
|
||||||
|
|||||||
@ -1,66 +1,8 @@
|
|||||||
# 查询指定路径拥有权限的所有角色
|
|
||||||
path = params_kw.get('path', '').strip()
|
path = params_kw.get('path', '').strip()
|
||||||
if not path:
|
|
||||||
return UiError(title='错误', message='请输入路径')
|
|
||||||
if not path.startswith('/'):
|
|
||||||
path = '/' + path
|
|
||||||
|
|
||||||
from appPublic.dictObject import DictObject
|
|
||||||
|
|
||||||
async with get_sor_context(request._run_ns, 'rbac') as sor:
|
async with get_sor_context(request._run_ns, 'rbac') as sor:
|
||||||
# 查找 permission 记录
|
title, message, is_error = await request._run_ns.query_path_roles(sor, path)
|
||||||
perm_recs = await sor.sqlExe("SELECT id, path FROM permission WHERE path=${path}$", {'path': path})
|
|
||||||
if not perm_recs:
|
|
||||||
msg = f"路径 '{path}' 未在 permission 表中注册。"
|
|
||||||
# 尝试模糊匹配
|
|
||||||
like_path = path.rstrip('/') + '/%'
|
|
||||||
like_recs = await sor.sqlExe("SELECT path FROM permission WHERE path LIKE ${lp}$", {'lp': like_path})
|
|
||||||
if like_recs:
|
|
||||||
sub_paths = '<br>'.join([f' {r.path}' for r in like_recs[:10]])
|
|
||||||
if len(like_recs) > 10:
|
|
||||||
sub_paths += f'<br>... 共 {len(like_recs)} 条'
|
|
||||||
msg += f'<br><br>模糊匹配到 {len(like_recs)} 个子路径:<br>{sub_paths}'
|
|
||||||
return UiError(title='未找到', message=msg)
|
|
||||||
|
|
||||||
perm = perm_recs[0]
|
if is_error:
|
||||||
permid = perm.id
|
return UiError(title=title, message=message)
|
||||||
|
return UiMessage(title=title, message=message)
|
||||||
# 查找 rolepermission 关联
|
|
||||||
rp_recs = await sor.sqlExe("SELECT roleid FROM rolepermission WHERE permid=${permid}$", {'permid': permid})
|
|
||||||
if not rp_recs:
|
|
||||||
return UiMessage(title='查询结果', message=f"路径: {path}<br>无任何角色拥有此路径权限。")
|
|
||||||
|
|
||||||
# 查询角色详情
|
|
||||||
role_ids = [r.roleid for r in rp_recs]
|
|
||||||
special_roles = [rid for rid in role_ids if rid in ('any', 'anonymous', 'logined')]
|
|
||||||
normal_ids = [rid for rid in role_ids if rid not in ('any', 'anonymous', 'logined')]
|
|
||||||
|
|
||||||
rows = []
|
|
||||||
for sp in special_roles:
|
|
||||||
rows.append(f"<tr><td>{sp}</td><td>*</td><td>{sp}</td></tr>")
|
|
||||||
|
|
||||||
if normal_ids:
|
|
||||||
placeholders = ','.join([f'${i}$' for i in range(len(normal_ids))])
|
|
||||||
ns = {f'_{i}': rid for i, rid in enumerate(normal_ids)}
|
|
||||||
role_recs = await sor.sqlExe(f"SELECT id, orgtypeid, name FROM role WHERE id IN ({placeholders})", ns)
|
|
||||||
for r in role_recs:
|
|
||||||
name = getattr(r, 'name', r.id)
|
|
||||||
orgtypeid = getattr(r, 'orgtypeid', '*')
|
|
||||||
rows.append(f"<tr><td>{r.id}</td><td>{orgtypeid}</td><td>{name}</td></tr>")
|
|
||||||
|
|
||||||
table_html = (
|
|
||||||
"<table style='border-collapse:collapse;width:100%;'>"
|
|
||||||
"<tr style='background:#334155;color:#fff;'>"
|
|
||||||
"<th style='padding:6px 12px;text-align:left;border:1px solid #475569;'>角色ID</th>"
|
|
||||||
"<th style='padding:6px 12px;text-align:left;border:1px solid #475569;'>orgtypeid</th>"
|
|
||||||
"<th style='padding:6px 12px;text-align:left;border:1px solid #475569;'>名称</th>"
|
|
||||||
"</tr>"
|
|
||||||
+ ''.join(rows) +
|
|
||||||
"</table>"
|
|
||||||
)
|
|
||||||
|
|
||||||
html = f"<p>路径: <b>{path}</b> (perm_id: {permid})</p>"
|
|
||||||
html += f"<p>共 {len(rp_recs)} 个角色有权限:</p>"
|
|
||||||
html += table_html
|
|
||||||
|
|
||||||
return UiMessage(title='查询结果', message=html)
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user