157 lines
5.5 KiB
Python
157 lines
5.5 KiB
Python
"""
|
||
CMS RBAC权限初始化 — any (匿名) 角色
|
||
自动扫描 wwwroot 和 bricks 下所有文件,授予 any 角色权限
|
||
|
||
规则:
|
||
- wwwroot/* → /<file>
|
||
- wwwroot/dingdingflow/* → /dingdingflow/<file>
|
||
- bricks/* → /bricks/<file>
|
||
- 排除: .pyc, __pycache__, .git, 指向其他模块的符号链接
|
||
|
||
用法: cd ~/repos/cms && py3/bin/python init_any_permissions.py
|
||
"""
|
||
import os, sys, subprocess, re
|
||
|
||
def find_app_root():
|
||
return os.path.dirname(os.path.abspath(__file__))
|
||
|
||
app_root = find_app_root()
|
||
sage_root = None
|
||
for c in [os.path.expanduser("~/repos/sage"), os.path.expanduser("~/sage")]:
|
||
if os.path.isdir(os.path.join(c, "py3", "bin")):
|
||
sage_root = c
|
||
break
|
||
if not sage_root:
|
||
print("ERROR: 找不到Sage,无法初始化权限")
|
||
sys.exit(1)
|
||
|
||
py = os.path.join(sage_root, "py3", "bin", "python")
|
||
sp = os.path.join(sage_root, "set_role_perm.py")
|
||
if not os.path.exists(sp):
|
||
print("ERROR: 找不到set_role_perm.py")
|
||
sys.exit(1)
|
||
|
||
SKIP_DIRS = {".git", "__pycache__", "node_modules", ".svn"}
|
||
SKIP_EXTS = {".pyc", ".pyo", ".swp", ".swo", ".bak", ".orig", ".log", ".pid", ".lock"}
|
||
|
||
def scan_wwwroot(wwwroot_dir, url_prefix):
|
||
"""扫描wwwroot目录下所有文件,返回URL路径列表"""
|
||
paths = []
|
||
if not os.path.isdir(wwwroot_dir):
|
||
return paths
|
||
for root, dirs, files in os.walk(wwwroot_dir):
|
||
# 排除不需要扫描的目录
|
||
dirs[:] = [d for d in dirs if d not in SKIP_DIRS]
|
||
for f in sorted(files):
|
||
# 跳过不需要的文件
|
||
_, ext = os.path.splitext(f)
|
||
if ext.lower() in SKIP_EXTS:
|
||
continue
|
||
if f.startswith("."):
|
||
continue
|
||
# 计算相对路径
|
||
rel_path = os.path.relpath(os.path.join(root, f), wwwroot_dir)
|
||
# 检查是否是符号链接指向其他模块
|
||
full_path = os.path.join(root, f)
|
||
if os.path.islink(full_path):
|
||
link_target = os.path.realpath(full_path)
|
||
# 如果链接目标不在CMS目录下,跳过
|
||
if not link_target.startswith(app_root):
|
||
print(f" SKIP (外部链接): {rel_path} -> {link_target}")
|
||
continue
|
||
url = url_prefix + "/" + rel_path.replace(os.sep, "/")
|
||
paths.append(url)
|
||
return paths
|
||
|
||
def scan_bricks(bricks_dir):
|
||
"""扫描bricks目录下所有文件,返回URL路径列表"""
|
||
paths = []
|
||
if not os.path.isdir(bricks_dir):
|
||
return paths
|
||
# 检查bricks是否为符号链接
|
||
real_bricks = os.path.realpath(bricks_dir)
|
||
for root, dirs, files in os.walk(bricks_dir):
|
||
dirs[:] = [d for d in dirs if d not in SKIP_DIRS]
|
||
for f in sorted(files):
|
||
_, ext = os.path.splitext(f)
|
||
if ext.lower() in SKIP_EXTS:
|
||
continue
|
||
if f.startswith("."):
|
||
continue
|
||
full_path = os.path.join(root, f)
|
||
# 跳过指向其他模块的符号链接
|
||
if os.path.islink(full_path):
|
||
link_target = os.path.realpath(full_path)
|
||
if not link_target.startswith(real_bricks) and not link_target.startswith(app_root):
|
||
continue
|
||
rel_path = os.path.relpath(full_path, bricks_dir)
|
||
url = "/bricks/" + rel_path.replace(os.sep, "/")
|
||
paths.append(url)
|
||
return paths
|
||
|
||
def set_any_perms(paths):
|
||
"""为路径列表设置any权限"""
|
||
count = 0
|
||
env = os.environ.copy()
|
||
env['SAGE_RBAC_DB'] = 'ocai_cms'
|
||
for p in paths:
|
||
result = subprocess.run(
|
||
[py, sp, "any", p],
|
||
cwd=sage_root,
|
||
capture_output=True,
|
||
text=True,
|
||
env=env
|
||
)
|
||
status = "✓" if result.returncode == 0 else "✗"
|
||
print(f" {status} any {p}")
|
||
count += 1
|
||
return count
|
||
|
||
print("=== CMS RBAC权限初始化 — any (匿名访问) ===")
|
||
print(f"Sage: {sage_root}")
|
||
print()
|
||
|
||
# 1. wwwroot/ 根目录文件 → / (排除dingdingflow子目录)
|
||
wwwroot_root = os.path.join(app_root, "wwwroot")
|
||
root_paths = []
|
||
if os.path.isdir(wwwroot_root):
|
||
for f in sorted(os.listdir(wwwroot_root)):
|
||
fpath = os.path.join(wwwroot_root, f)
|
||
if os.path.isfile(fpath) and not f.startswith("."):
|
||
_, ext = os.path.splitext(f)
|
||
if ext.lower() not in SKIP_EXTS:
|
||
root_paths.append("/" + f)
|
||
# api 子目录
|
||
api_dir = os.path.join(wwwroot_root, "api")
|
||
if os.path.isdir(api_dir):
|
||
for f in sorted(os.listdir(api_dir)):
|
||
fpath = os.path.join(api_dir, f)
|
||
if os.path.isfile(fpath) and not f.startswith("."):
|
||
_, ext = os.path.splitext(f)
|
||
if ext.lower() not in SKIP_EXTS:
|
||
root_paths.append("/api/" + f)
|
||
|
||
print(f"--- wwwroot/ → / ({len(root_paths)} 个文件) ---")
|
||
n1 = set_any_perms(root_paths)
|
||
|
||
# 2. wwwroot/dingdingflow/ → /dingdingflow/
|
||
dd_paths = scan_wwwroot(
|
||
os.path.join(app_root, "wwwroot", "dingdingflow"),
|
||
"/dingdingflow"
|
||
)
|
||
print(f"\n--- wwwroot/dingdingflow/ → /dingdingflow ({len(dd_paths)} 个文件) ---")
|
||
n2 = set_any_perms(dd_paths)
|
||
|
||
# 3. bricks → /bricks
|
||
bricks_dir = os.path.join(app_root, "bricks")
|
||
bricks_paths = scan_bricks(bricks_dir)
|
||
if bricks_paths:
|
||
print(f"\n--- bricks → /bricks ({len(bricks_paths)} 个文件) ---")
|
||
n3 = set_any_perms(bricks_paths)
|
||
else:
|
||
n3 = 0
|
||
print(f"\n--- bricks → /bricks (目录不存在或未构建,跳过) ---")
|
||
|
||
total = n1 + n2 + n3
|
||
print(f"\n=== 完成: 共设置 {total} 个any权限 ===")
|