feat: add RBAC tools — list_path_roles and find_unauth_files
This commit is contained in:
parent
fd9ef322c7
commit
c53c16d54c
@ -1,7 +1,17 @@
|
||||
[
|
||||
{
|
||||
"name":"users",
|
||||
"label":"用户管理"
|
||||
"label":"用户管理",
|
||||
"url":"{{entire_url('/rbac/users')}}"
|
||||
},
|
||||
{
|
||||
"name":"list_path_roles",
|
||||
"label":"查询路径权限角色",
|
||||
"url":"{{entire_url('/rbac/list_path_roles.ui')}}"
|
||||
},
|
||||
{
|
||||
"name":"find_unauth_files",
|
||||
"label":"扫描未授权文件",
|
||||
"url":"{{entire_url('/rbac/find_unauth_files.dspy')}}"
|
||||
}
|
||||
]
|
||||
|
||||
113
wwwroot/find_unauth_files.dspy
Normal file
113
wwwroot/find_unauth_files.dspy
Normal file
@ -0,0 +1,113 @@
|
||||
# 查找 wwwroot 下符号链接目录中没有任何角色权限的文件
|
||||
import os
|
||||
|
||||
wwwroot = params_kw.get('wwwroot', '').strip()
|
||||
if not wwwroot:
|
||||
wwwroot = None
|
||||
|
||||
from appPublic.dictObject import DictObject
|
||||
|
||||
# 定位 wwwroot
|
||||
if not wwwroot:
|
||||
# 默认: 当前模块 wwwroot 的父目录 wwwroot
|
||||
# rbac/wwwroot -> 找 sage/wwwroot
|
||||
# 如果 sage_root 环境变量存在
|
||||
sage_root = os.environ.get('SAGE_ROOT')
|
||||
if sage_root:
|
||||
wwwroot = os.path.join(sage_root, 'wwwroot')
|
||||
else:
|
||||
# 尝试从当前文件路径推断: rbac/wwwroot/xxx.dspy -> sage/wwwroot
|
||||
this_file = os.path.abspath(__file__)
|
||||
# 通常在 repos/sage/wwwroot/rbac/find_unauth_files.dspy
|
||||
# wwwroot 是上一层
|
||||
wwwroot = os.path.dirname(os.path.dirname(this_file))
|
||||
|
||||
wwwroot = os.path.abspath(wwwroot)
|
||||
if not os.path.isdir(wwwroot):
|
||||
return UiError(title='错误', message=f"wwwroot 目录不存在: {wwwroot}")
|
||||
|
||||
def find_symlink_dirs(root):
|
||||
symlinks = []
|
||||
for entry in sorted(os.listdir(root)):
|
||||
full = os.path.join(root, entry)
|
||||
if os.path.islink(full) and os.path.isdir(full):
|
||||
target = os.readlink(full)
|
||||
symlinks.append((entry, full, target))
|
||||
return symlinks
|
||||
|
||||
def walk_symlink_dirs(root, symlinks):
|
||||
real_root = os.path.realpath(root)
|
||||
visited_real = set()
|
||||
files = []
|
||||
for name, link_path, target in symlinks:
|
||||
for r, dirs, filenames in os.walk(link_path, followlinks=True):
|
||||
real_r = os.path.realpath(r)
|
||||
if real_r in visited_real:
|
||||
dirs.clear()
|
||||
continue
|
||||
visited_real.add(real_r)
|
||||
dirs[:] = [d for d in dirs if os.path.realpath(os.path.join(r, d)) != real_root]
|
||||
for fname in sorted(filenames):
|
||||
abs_path = os.path.join(r, fname)
|
||||
rel = '/' + os.path.relpath(abs_path, root)
|
||||
files.append((rel, abs_path))
|
||||
return files
|
||||
|
||||
symlinks = find_symlink_dirs(wwwroot)
|
||||
if not symlinks:
|
||||
return UiMessage(title='扫描结果', message=f"在 {wwwroot} 中未发现任何符号链接目录。")
|
||||
|
||||
symlink_info = '<br>'.join([f" {name}/ → {target}" for name, _, target in symlinks])
|
||||
|
||||
all_files = walk_symlink_dirs(wwwroot, symlinks)
|
||||
|
||||
async with get_sor_context(request._run_ns, 'rbac') as sor:
|
||||
perm_recs = await sor.sqlExe("SELECT id, path FROM permission", {})
|
||||
path_to_permid = {}
|
||||
for r in perm_recs:
|
||||
path_to_permid[r.path] = r.id
|
||||
|
||||
rp_recs = await sor.sqlExe("SELECT DISTINCT permid FROM rolepermission", {})
|
||||
perms_with_roles = set(r.permid for r in rp_recs)
|
||||
|
||||
unauth_files = []
|
||||
authed_count = 0
|
||||
|
||||
for rel_path, abs_path in all_files:
|
||||
permid = path_to_permid.get(rel_path)
|
||||
if permid is None:
|
||||
unauth_files.append((rel_path, "路径未注册"))
|
||||
elif permid not in perms_with_roles:
|
||||
unauth_files.append((rel_path, "有记录但无角色"))
|
||||
else:
|
||||
authed_count += 1
|
||||
|
||||
# 按模块分组
|
||||
from collections import defaultdict
|
||||
by_module = defaultdict(list)
|
||||
for rel_path, reason in unauth_files:
|
||||
parts = rel_path.strip('/').split('/')
|
||||
module = parts[0] if parts else 'root'
|
||||
by_module[module].append((rel_path, reason))
|
||||
|
||||
# 构建 HTML 输出
|
||||
html = f"<p><b>wwwroot:</b> {wwwroot}</p>"
|
||||
html += f"<p><b>符号链接目录 ({len(symlinks)}):</b><br>{symlink_info}</p>"
|
||||
html += f"<p>总文件数: {len(all_files)} | 已授权: {authed_count} | 未授权: {len(unauth_files)}</p>"
|
||||
|
||||
if not unauth_files:
|
||||
html += "<p style='color:#22C55E;font-weight:bold;'>所有文件均有权限覆盖,无需处理。</p>"
|
||||
else:
|
||||
html += f"<p style='color:#EF4444;font-weight:bold;'>未授权文件 ({len(unauth_files)} 个):</p>"
|
||||
for module in sorted(by_module.keys()):
|
||||
items = by_module[module]
|
||||
html += f"<h4 style='color:#FBBF24;'>{module}/ ({len(items)} 个文件)</h4>"
|
||||
html += "<ul style='font-family:monospace;font-size:12px;'>"
|
||||
for rel_path, reason in items[:50]:
|
||||
color = '#EF4444' if reason == '路径未注册' else '#F97316'
|
||||
html += f"<li style='color:{color};'>[{reason}] {rel_path}</li>"
|
||||
if len(items) > 50:
|
||||
html += f"<li>... 还有 {len(items) - 50} 个文件</li>"
|
||||
html += "</ul>"
|
||||
|
||||
return UiMessage(title='扫描结果', message=html)
|
||||
66
wwwroot/list_path_roles.dspy
Normal file
66
wwwroot/list_path_roles.dspy
Normal file
@ -0,0 +1,66 @@
|
||||
# 查询指定路径拥有权限的所有角色
|
||||
path = params_kw.get('path', '').strip()
|
||||
if not path:
|
||||
return UiError(title='错误', message='请输入路径')
|
||||
if not path.startswith('/'):
|
||||
path = '/' + path
|
||||
|
||||
from appPublic.dictObject import DictObject
|
||||
|
||||
async with get_sor_context(request._run_ns, 'rbac') as sor:
|
||||
# 查找 permission 记录
|
||||
perm_recs = await sor.sqlExe("SELECT id, path FROM permission WHERE path=${path}$", {'path': path})
|
||||
if not perm_recs:
|
||||
msg = f"路径 '{path}' 未在 permission 表中注册。"
|
||||
# 尝试模糊匹配
|
||||
like_path = path.rstrip('/') + '/%'
|
||||
like_recs = await sor.sqlExe("SELECT path FROM permission WHERE path LIKE ${lp}$", {'lp': like_path})
|
||||
if like_recs:
|
||||
sub_paths = '<br>'.join([f' {r.path}' for r in like_recs[:10]])
|
||||
if len(like_recs) > 10:
|
||||
sub_paths += f'<br>... 共 {len(like_recs)} 条'
|
||||
msg += f'<br><br>模糊匹配到 {len(like_recs)} 个子路径:<br>{sub_paths}'
|
||||
return UiError(title='未找到', message=msg)
|
||||
|
||||
perm = perm_recs[0]
|
||||
permid = perm.id
|
||||
|
||||
# 查找 rolepermission 关联
|
||||
rp_recs = await sor.sqlExe("SELECT roleid FROM rolepermission WHERE permid=${permid}$", {'permid': permid})
|
||||
if not rp_recs:
|
||||
return UiMessage(title='查询结果', message=f"路径: {path}<br>无任何角色拥有此路径权限。")
|
||||
|
||||
# 查询角色详情
|
||||
role_ids = [r.roleid for r in rp_recs]
|
||||
special_roles = [rid for rid in role_ids if rid in ('any', 'anonymous', 'logined')]
|
||||
normal_ids = [rid for rid in role_ids if rid not in ('any', 'anonymous', 'logined')]
|
||||
|
||||
rows = []
|
||||
for sp in special_roles:
|
||||
rows.append(f"<tr><td>{sp}</td><td>*</td><td>{sp}</td></tr>")
|
||||
|
||||
if normal_ids:
|
||||
placeholders = ','.join([f'${i}$' for i in range(len(normal_ids))])
|
||||
ns = {f'_{i}': rid for i, rid in enumerate(normal_ids)}
|
||||
role_recs = await sor.sqlExe(f"SELECT id, orgtypeid, name FROM role WHERE id IN ({placeholders})", ns)
|
||||
for r in role_recs:
|
||||
name = getattr(r, 'name', r.id)
|
||||
orgtypeid = getattr(r, 'orgtypeid', '*')
|
||||
rows.append(f"<tr><td>{r.id}</td><td>{orgtypeid}</td><td>{name}</td></tr>")
|
||||
|
||||
table_html = (
|
||||
"<table style='border-collapse:collapse;width:100%;'>"
|
||||
"<tr style='background:#334155;color:#fff;'>"
|
||||
"<th style='padding:6px 12px;text-align:left;border:1px solid #475569;'>角色ID</th>"
|
||||
"<th style='padding:6px 12px;text-align:left;border:1px solid #475569;'>orgtypeid</th>"
|
||||
"<th style='padding:6px 12px;text-align:left;border:1px solid #475569;'>名称</th>"
|
||||
"</tr>"
|
||||
+ ''.join(rows) +
|
||||
"</table>"
|
||||
)
|
||||
|
||||
html = f"<p>路径: <b>{path}</b> (perm_id: {permid})</p>"
|
||||
html += f"<p>共 {len(rp_recs)} 个角色有权限:</p>"
|
||||
html += table_html
|
||||
|
||||
return UiMessage(title='查询结果', message=html)
|
||||
27
wwwroot/list_path_roles.ui
Normal file
27
wwwroot/list_path_roles.ui
Normal file
@ -0,0 +1,27 @@
|
||||
{
|
||||
"widgettype": "ModalForm",
|
||||
"options": {
|
||||
"cwidth": 20,
|
||||
"cheight": 15,
|
||||
"title": "查询路径权限角色",
|
||||
"fields": [
|
||||
{
|
||||
"name": "path",
|
||||
"label": "路径",
|
||||
"uitype": "str",
|
||||
"placeholder": "如 /harnessed_agent/index.ui"
|
||||
}
|
||||
]
|
||||
},
|
||||
"binds": [
|
||||
{
|
||||
"wid": "self",
|
||||
"event": "submit",
|
||||
"actiontype": "urlwidget",
|
||||
"target": "root.page_center",
|
||||
"options": {
|
||||
"url": "{{entire_url('./list_path_roles.dspy')}}"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user