# 查找 wwwroot 下符号链接目录中没有任何角色权限的文件 import os wwwroot = params_kw.get('wwwroot', '').strip() # 定位 wwwroot if not wwwroot: sage_root = os.environ.get('SAGE_ROOT') if sage_root: wwwroot = os.path.join(sage_root, 'wwwroot') else: # 从当前文件路径向上找 wwwroot # 生产: ~/token/sage/wwwroot/rbac/find_unauth_files.dspy # 开发: ~/repos/sage/wwwroot/rbac/find_unauth_files.dspy (通过symlink) # 或: ~/repos/rbac/wwwroot/find_unauth_files.dspy (rbac repo自身) this_file = os.path.abspath(__file__) # 如果路径中包含 /wwwroot/ 段,直接截断 if '/wwwroot/' in this_file: idx = this_file.index('/wwwroot/') wwwroot = this_file[:idx + len('/wwwroot')] else: # 尝试找 wwwroot 子目录 for level in range(5): candidate = this_file for _ in range(level): candidate = os.path.dirname(candidate) if not candidate or candidate == '/': break wr = os.path.join(candidate, 'wwwroot') if os.path.isdir(wr): wwwroot = wr break if not wwwroot: return UiError(title='错误', message='无法自动定位 wwwroot 目录,请通过参数传入 wwwroot 路径,或设置 SAGE_ROOT 环境变量。') wwwroot = os.path.abspath(wwwroot) if not os.path.isdir(wwwroot): return UiError(title='错误', message=f"wwwroot 目录不存在: {wwwroot}") def find_symlink_dirs(root): symlinks = [] for entry in sorted(os.listdir(root)): full = os.path.join(root, entry) if os.path.islink(full) and os.path.isdir(full): target = os.readlink(full) symlinks.append((entry, full, target)) return symlinks def walk_symlink_dirs(root, symlinks): real_root = os.path.realpath(root) visited_real = set() files = [] for name, link_path, target in symlinks: for r, dirs, filenames in os.walk(link_path, followlinks=True): real_r = os.path.realpath(r) if real_r in visited_real: dirs.clear() continue visited_real.add(real_r) dirs[:] = [d for d in dirs if os.path.realpath(os.path.join(r, d)) != real_root] for fname in sorted(filenames): abs_path = os.path.join(r, fname) rel = '/' + os.path.relpath(abs_path, root) files.append((rel, abs_path)) return files symlinks = find_symlink_dirs(wwwroot) if not symlinks: return UiMessage(title='扫描结果', message=f"在 {wwwroot} 中未发现任何符号链接目录。") symlink_info = '
'.join([f" {name}/ → {target}" for name, _, target in symlinks]) all_files = walk_symlink_dirs(wwwroot, symlinks) async with get_sor_context(request._run_ns, 'rbac') as sor: perm_recs = await sor.sqlExe("SELECT id, path FROM permission", {}) path_to_permid = {} for r in perm_recs: path_to_permid[r.path] = r.id rp_recs = await sor.sqlExe("SELECT DISTINCT permid FROM rolepermission", {}) perms_with_roles = set(r.permid for r in rp_recs) unauth_files = [] authed_count = 0 for rel_path, abs_path in all_files: permid = path_to_permid.get(rel_path) if permid is None: unauth_files.append((rel_path, "路径未注册")) elif permid not in perms_with_roles: unauth_files.append((rel_path, "有记录但无角色")) else: authed_count += 1 # 按模块分组 from collections import defaultdict by_module = defaultdict(list) for rel_path, reason in unauth_files: parts = rel_path.strip('/').split('/') module = parts[0] if parts else 'root' by_module[module].append((rel_path, reason)) # 构建 HTML 输出 html = f"

wwwroot: {wwwroot}

" html += f"

符号链接目录 ({len(symlinks)}):
{symlink_info}

" html += f"

总文件数: {len(all_files)} | 已授权: {authed_count} | 未授权: {len(unauth_files)}

" if not unauth_files: html += "

所有文件均有权限覆盖,无需处理。

" else: html += f"

未授权文件 ({len(unauth_files)} 个):

" for module in sorted(by_module.keys()): items = by_module[module] html += f"

{module}/ ({len(items)} 个文件)

" html += "" return UiMessage(title='扫描结果', message=html)