feat: add RBAC scripts — list_path_roles.py and find_unauth_files.py

This commit is contained in:
yumoqing 2026-05-26 08:55:49 +08:00
parent c5d6ec6c8f
commit 2d2dad84d0
2 changed files with 276 additions and 0 deletions

View File

@ -0,0 +1,184 @@
#!/usr/bin/env python
"""
遍历 wwwroot 下所有通过 ln -s 链接进来的目录
找出没有任何角色拥有权限的文件
用法:
./py3/bin/python find_unauth_files.py [wwwroot_path]
示例:
# 默认 sage/wwwroot
./py3/bin/python find_unauth_files.py
# 指定路径
./py3/bin/python find_unauth_files.py /home/hermesai/repos/sage/wwwroot
说明:
- 只扫描 wwwroot 下直接子目录中是符号链接的目录ln -s 指向的
- 跟随符号链接遍历followlinks=True防环
- 检查每个文件是否在 permission 表中有任意角色关联
- 输出未授权文件清单
注意此脚本只读数据库不做任何修改
"""
import os
import sys
import asyncio
sage_root = os.environ.get('SAGE_ROOT')
if sage_root and sage_root not in sys.path:
sys.path.insert(0, sage_root)
from sqlor.dbpools import DBPools
from appPublic.jsonConfig import getConfig
def find_symlink_dirs(wwwroot):
"""找出 wwwroot 下直接子目录中是符号链接的目录。"""
symlinks = []
for entry in sorted(os.listdir(wwwroot)):
full = os.path.join(wwwroot, entry)
if os.path.islink(full) and os.path.isdir(full):
target = os.readlink(full)
symlinks.append((entry, full, target))
return symlinks
def walk_symlink_dirs(wwwroot, symlinks):
"""
遍历所有符号链接目录下的文件返回 (relative_path, absolute_path) 列表
relative_path 是相对于 wwwroot 的路径格式如 /harnessed_agent/index.ui
防环跟踪已访问的真实路径
"""
real_wwwroot = os.path.realpath(wwwroot)
visited_real = set()
files = []
for name, link_path, target in symlinks:
for root, dirs, filenames in os.walk(link_path, followlinks=True):
real_root = os.path.realpath(root)
# 防环:回到 wwwroot 自身或其他已访问路径
if real_root in visited_real:
dirs.clear()
continue
visited_real.add(real_root)
# 排除指向 wwwroot 自身的子目录
dirs[:] = [d for d in dirs if os.path.realpath(os.path.join(root, d)) != real_wwwroot]
for fname in sorted(filenames):
abs_path = os.path.join(root, fname)
# 相对于 wwwroot 的路径,带前导 /
rel = '/' + os.path.relpath(abs_path, wwwroot)
files.append((rel, abs_path))
return files
async def main():
wwwroot = sys.argv[1] if len(sys.argv) > 1 else None
if not wwwroot:
# 默认: sage_root/wwwroot
if sage_root:
wwwroot = os.path.join(sage_root, 'wwwroot')
else:
# 尝试从当前目录推断
wwwroot = os.path.join(os.getcwd(), 'wwwroot')
wwwroot = os.path.abspath(wwwroot)
if not os.path.isdir(wwwroot):
print(f"错误: wwwroot 目录不存在: {wwwroot}")
sys.exit(1)
print(f"wwwroot: {wwwroot}")
print(f"SAGE_ROOT: {sage_root or '(未设置)'}")
# 1. 找符号链接目录
symlinks = find_symlink_dirs(wwwroot)
if not symlinks:
print("\n未发现任何符号链接目录。")
sys.exit(0)
print(f"\n发现 {len(symlinks)} 个符号链接目录:")
for name, full, target in symlinks:
print(f" {name}/ -> {target}")
# 2. 遍历所有文件
all_files = walk_symlink_dirs(wwwroot, symlinks)
print(f"\n{len(all_files)} 个文件,开始检查权限...\n")
# 3. 批量加载所有 permission + rolepermission 记录到内存
config = getConfig('.')
db = DBPools(config.databases)
async with db.sqlorContext('sage') as sor:
# 加载所有 permission 记录
perm_recs = await sor.sqlExe("SELECT id, path FROM permission")
# 构建 path -> perm_id 映射
path_to_permid = {}
for r in perm_recs:
path_to_permid[r.path] = r.id
# 加载所有 rolepermission 记录
rp_recs = await sor.sqlExe("SELECT permid FROM rolepermission")
# 构建 perm_id -> 有权限 的标记
perms_with_roles = set()
for r in rp_recs:
perms_with_roles.add(r.permid)
print(f" permission 表共 {len(path_to_permid)} 条记录")
print(f" 有角色关联的 permission 共 {len(perms_with_roles)}")
# 4. 检查每个文件
unauth_files = []
authed_files = []
for rel_path, abs_path in all_files:
permid = path_to_permid.get(rel_path)
if permid is None:
# 路径完全未注册
unauth_files.append((rel_path, abs_path, "路径未注册"))
elif permid not in perms_with_roles:
# 路径有 permission 记录但没有任何角色关联
unauth_files.append((rel_path, abs_path, "有记录但无角色"))
else:
authed_files.append(rel_path)
# 5. 输出结果
print(f"\n{'='*80}")
print(f"结果汇总:")
print(f" 已授权文件: {len(authed_files)}")
print(f" 未授权文件: {len(unauth_files)}")
print(f"{'='*80}")
if not unauth_files:
print("\n所有文件均有权限覆盖,无需处理。")
return
print(f"\n未授权文件清单 ({len(unauth_files)} 个):\n")
# 按模块目录分组
from collections import defaultdict
by_module = defaultdict(list)
for rel_path, abs_path, reason in unauth_files:
# 提取模块名: /harnessed_agent/xxx -> harnessed_agent
parts = rel_path.strip('/').split('/')
module = parts[0] if parts else 'root'
by_module[module].append((rel_path, abs_path, reason))
for module in sorted(by_module.keys()):
items = by_module[module]
print(f"\n--- {module}/ ({len(items)} 个文件) ---")
for rel_path, abs_path, reason in items:
print(f" [{reason}] {rel_path}")
# 也输出纯路径列表(方便管道处理)
print(f"\n{'='*80}")
print("纯路径列表(方便复制):")
for rel_path, abs_path, reason in unauth_files:
print(f" {rel_path}")
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())

View File

@ -0,0 +1,92 @@
#!/usr/bin/env python
"""
列出指定路径拥有权限的所有角色
用法:
./py3/bin/python list_path_roles.py <path>
示例:
./py3/bin/python list_path_roles.py /harnessed_agent/index.ui
./py3/bin/python list_path_roles.py /bricks/css/bricks.css
说明:
- 精确匹配路径
- 如果路径未注册会提示
- 显示角色 idorgtypeidname
"""
import os
import sys
import asyncio
sage_root = os.environ.get('SAGE_ROOT')
if sage_root and sage_root not in sys.path:
sys.path.insert(0, sage_root)
from sqlor.dbpools import DBPools
from appPublic.jsonConfig import getConfig
async def main():
if len(sys.argv) < 2:
print(f"用法: {sys.argv[0]} <path>")
print(f"示例: {sys.argv[0]} /harnessed_agent/index.ui")
sys.exit(1)
target_path = sys.argv[1]
if not target_path.startswith('/'):
target_path = '/' + target_path
config = getConfig('.')
db = DBPools(config.databases)
async with db.sqlorContext('sage') as sor:
# 1. 查找 permission 记录
perm_recs = await sor.R('permission', {'path': target_path})
if not perm_recs:
print(f"[未找到] 路径 '{target_path}' 未在 permission 表中注册。")
# 尝试 LIKE 模糊匹配
like_path = target_path.rstrip('/') + '/%'
like_recs = await sor.sqlExe("SELECT path FROM permission WHERE path LIKE %s", (like_path,))
if like_recs:
print(f"\n模糊匹配到 {len(like_recs)} 个子路径LIKE '{like_path}'")
for r in like_recs[:20]:
print(f" {r.path}")
if len(like_recs) > 20:
print(f" ... 共 {len(like_recs)}")
sys.exit(0)
perm = perm_recs[0]
print(f"路径: {perm.path}")
print(f"perm_id: {perm.id}")
# 2. 查找 rolepermission 关联
rp_recs = await sor.R('rolepermission', {'permid': perm.id})
if not rp_recs:
print(" [无角色] 该路径没有任何角色权限记录。")
sys.exit(0)
print(f"\n{len(rp_recs)} 个角色有权限:\n")
print(f"{'角色ID':<30} {'orgtypeid':<20} {'名称':<30}")
print("-" * 80)
for rp in rp_recs:
roleid = rp.roleid
# 特殊角色直接显示
if roleid in ('any', 'anonymous', 'logined'):
print(f"{roleid:<30} {'*':<20} {roleid:<30}")
continue
# 普通角色查 role 表
role_recs = await sor.R('role', {'id': roleid})
if role_recs:
r = role_recs[0]
orgtypeid = getattr(r, 'orgtypeid', '*')
name = getattr(r, 'name', roleid)
print(f"{roleid:<30} {orgtypeid:<20} {name:<30}")
else:
print(f"{roleid:<30} {'(未知)':<20} {'(role表中不存在)':<30}")
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())