sage/scripts/find_unauth_files.py

185 lines
6.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
"""
遍历 wwwroot 下所有通过 ln -s 链接进来的目录,
找出没有任何角色拥有权限的文件。
用法:
./py3/bin/python find_unauth_files.py [wwwroot_path]
示例:
# 默认 sage/wwwroot
./py3/bin/python find_unauth_files.py
# 指定路径
./py3/bin/python find_unauth_files.py /home/hermesai/repos/sage/wwwroot
说明:
- 只扫描 wwwroot 下直接子目录中是符号链接的目录(ln -s 指向的)
- 跟随符号链接遍历(followlinks=True),防环
- 检查每个文件是否在 permission 表中有任意角色关联
- 输出未授权文件清单
注意:此脚本只读数据库,不做任何修改。
"""
import os
import sys
import asyncio
# Read the optional SAGE_ROOT environment variable; when it is set to a
# non-empty value, prepend it to sys.path unless it is already listed there.
sage_root = os.getenv('SAGE_ROOT')
if sage_root:
    if sage_root not in sys.path:
        sys.path[:0] = [sage_root]
from sqlor.dbpools import DBPools
from appPublic.jsonConfig import getConfig
def find_symlink_dirs(wwwroot):
    """Return the symlinked directories located directly inside ``wwwroot``.

    Each element is an ``(entry_name, link_path, link_target)`` tuple:
    ``link_path`` is ``wwwroot`` joined with the entry name and
    ``link_target`` is the raw ``os.readlink`` value (not resolved).
    Results are ordered by entry name. Regular directories, symlinks to
    files and broken symlinks are left out.
    """
    # Pair every entry name with its path once so both checks reuse it.
    candidates = (
        (name, os.path.join(wwwroot, name))
        for name in sorted(os.listdir(wwwroot))
    )
    return [
        (name, path, os.readlink(path))
        for name, path in candidates
        # isdir() follows the link, so only links resolving to a directory pass.
        if os.path.islink(path) and os.path.isdir(path)
    ]
def walk_symlink_dirs(wwwroot, symlinks):
    """Collect every file reachable through the given symlinked directories.

    Args:
        wwwroot: Web root directory; relative paths are computed against it.
        symlinks: Iterable of ``(name, link_path, target)`` tuples in the
            shape returned by ``find_symlink_dirs``. Only ``link_path`` is
            used here.

    Returns:
        A list of ``(relative_path, absolute_path)`` tuples.
        ``relative_path`` is relative to ``wwwroot`` and carries a leading
        ``/`` (e.g. ``/harnessed_agent/index.ui``). Within each link the
        order is deterministic: a directory's files (sorted by name) come
        first, followed by its sub-directories in sorted order.

    Symlinks are followed. To avoid infinite loops, any directory whose real
    path has already been walked is skipped, and sub-directories resolving to
    ``wwwroot`` itself are never entered.
    """
    real_wwwroot = os.path.realpath(wwwroot)
    # NOTE(review): this set is shared across all links, so a second link
    # resolving to an already-walked directory contributes no files under
    # its own name -- confirm that this de-duplication is intended.
    visited_real = set()
    files = []
    for _name, link_path, _target in symlinks:
        for root, dirs, filenames in os.walk(link_path, followlinks=True):
            real_root = os.path.realpath(root)
            if real_root in visited_real:
                # Already walked (symlink loop or alias): do not descend.
                dirs.clear()
                continue
            visited_real.add(real_root)
            # Prune sub-directories that resolve back to wwwroot and sort the
            # remainder in place so os.walk descends in a stable order
            # instead of the filesystem's arbitrary listing order.
            dirs[:] = sorted(
                d for d in dirs
                if os.path.realpath(os.path.join(root, d)) != real_wwwroot
            )
            for fname in sorted(filenames):
                abs_path = os.path.join(root, fname)
                # Path relative to wwwroot, with a leading '/'.
                rel = '/' + os.path.relpath(abs_path, wwwroot)
                files.append((rel, abs_path))
    return files
async def main():
    """Report files under symlinked wwwroot directories that no role can access.

    The web root is taken from ``sys.argv[1]`` when given; otherwise it is
    ``<SAGE_ROOT>/wwwroot`` or, without ``SAGE_ROOT``, ``<cwd>/wwwroot``.
    Exits with status 1 when that directory does not exist and with status 0
    when it contains no symlinked directories.

    Every file found is looked up in the ``permission`` table by its
    wwwroot-relative path. A file is reported as unauthorized when its path
    has no ``permission`` row, or when that row's id does not occur in
    ``rolepermission``. Only SELECT statements are issued against the
    database.
    """
    # Resolve the web root: explicit CLI argument first, then the defaults.
    cli_root = sys.argv[1] if len(sys.argv) > 1 else None
    if cli_root:
        wwwroot = cli_root
    else:
        wwwroot = os.path.join(sage_root or os.getcwd(), 'wwwroot')
    wwwroot = os.path.abspath(wwwroot)

    if not os.path.isdir(wwwroot):
        print(f"错误: wwwroot 目录不存在: {wwwroot}")
        sys.exit(1)

    print(f"wwwroot: {wwwroot}")
    print(f"SAGE_ROOT: {sage_root or '(未设置)'}")

    # Step 1: locate the symlinked directories directly under wwwroot.
    symlinks = find_symlink_dirs(wwwroot)
    if not symlinks:
        print("\n未发现任何符号链接目录。")
        sys.exit(0)

    print(f"\n发现 {len(symlinks)} 个符号链接目录:")
    for link_name, _link_path, link_target in symlinks:
        print(f" {link_name}/ -> {link_target}")

    # Step 2: enumerate every file reachable through those links.
    all_files = walk_symlink_dirs(wwwroot, symlinks)
    print(f"\n{len(all_files)} 个文件,开始检查权限...\n")

    # Step 3: pull both tables into memory so each file is checked without
    # a per-file query.
    config = getConfig('.')
    db = DBPools(config.databases)
    async with db.sqlorContext('sage') as sor:
        perm_recs = await sor.sqlExe("SELECT id, path FROM permission", {})
        # path -> permission id (a later row wins if a path repeats)
        path_to_permid = {rec.path: rec.id for rec in perm_recs}

        rp_recs = await sor.sqlExe("SELECT permid FROM rolepermission", {})
        # ids of permissions that are attached to at least one role
        perms_with_roles = {rec.permid for rec in rp_recs}

        print(f" permission 表共 {len(path_to_permid)} 条记录")
        print(f" 有角色关联的 permission 共 {len(perms_with_roles)}")

    # Step 4: classify each file.
    authed_files = []
    unauth_files = []
    for rel_path, abs_path in all_files:
        permid = path_to_permid.get(rel_path)
        if permid is not None and permid in perms_with_roles:
            authed_files.append(rel_path)
            continue
        # Either the path is not registered at all, or it is registered but
        # no role references it.
        reason = "路径未注册" if permid is None else "有记录但无角色"
        unauth_files.append((rel_path, abs_path, reason))

    # Step 5: print the report.
    rule = '=' * 80
    print(f"\n{rule}")
    print("结果汇总:")
    print(f" 已授权文件: {len(authed_files)}")
    print(f" 未授权文件: {len(unauth_files)}")
    print(rule)

    if not unauth_files:
        print("\n所有文件均有权限覆盖,无需处理。")
        return

    print(f"\n未授权文件清单 ({len(unauth_files)} 个):\n")

    # Group by module, i.e. the first path component:
    # /harnessed_agent/xxx -> harnessed_agent
    by_module = {}
    for entry in unauth_files:
        segments = entry[0].strip('/').split('/')
        module = segments[0] if segments else 'root'
        by_module.setdefault(module, []).append(entry)

    for module in sorted(by_module):
        items = by_module[module]
        print(f"\n--- {module}/ ({len(items)} 个文件) ---")
        for rel_path, _abs_path, reason in items:
            print(f" [{reason}] {rel_path}")

    # Plain path list as well, convenient for copying or piping.
    print(f"\n{rule}")
    print("纯路径列表(方便复制):")
    for rel_path, _abs_path, _reason in unauth_files:
        print(f" {rel_path}")
if __name__ == '__main__':
    # asyncio.run() creates a fresh event loop, runs main() to completion and
    # closes the loop afterwards. Fetching a loop with
    # asyncio.get_event_loop() when none is running is deprecated and fails
    # on the newest Python releases.
    asyncio.run(main())