yumoqing 9eccd08ffd feat: pipeline-app独立产线后端服务
3个业务模块:
- pipeline_core: 产线定义(pipelines/steps/versions)
- pipeline_ops: 运营(定价/供应量/使用记录)
- pipeline_dist: 分销(分销商/独立定价/API密钥)

- ahserver独立部署(端口9090)
- 独立数据库pipeline
- 80个文件, 符合module/db-table/crud三规范
2026-06-11 14:49:02 +08:00

90 lines
2.4 KiB
Python

#!/usr/bin/env python3
"""
pipeline_core 模块 RBAC 权限管理脚本
使用方法:
cd ~/repos/sage
./py3/bin/python ~/test/pipeline-app/pipeline_core/scripts/load_path.py
"""
import subprocess
import os
import sys
def find_sage_root():
candidates = [
os.path.expanduser("~/repos/sage"),
os.path.expanduser("~/sage"),
]
for c in candidates:
if os.path.isdir(os.path.join(c, "py3")) and os.path.isdir(os.path.join(c, "wwwroot")):
return c
return None
SAGE_ROOT = find_sage_root()
if not SAGE_ROOT:
print("ERROR: Cannot find Sage root directory")
sys.exit(1)
PYTHON = os.path.join(SAGE_ROOT, "py3", "bin", "python")
SET_PERM_SCRIPT = os.path.join(SAGE_ROOT, "set_role_perm.py")
MOD = "pipeline_core"
# ============================================================
# 权限路径定义
# ============================================================
# operator — 产线管理人员
PATHS_OPERATOR = [
f"/{MOD}",
f"/{MOD}/index.ui",
f"/{MOD}/pipelines/",
f"/{MOD}/pipeline_steps/",
f"/{MOD}/pipeline_versions/",
f"/{MOD}/api/pipelines_create.dspy",
f"/{MOD}/api/pipelines_update.dspy",
f"/{MOD}/api/pipelines_delete.dspy",
f"/{MOD}/api/pipeline_steps_create.dspy",
f"/{MOD}/api/pipeline_steps_update.dspy",
f"/{MOD}/api/pipeline_steps_delete.dspy",
f"/{MOD}/api/pipeline_versions_create.dspy",
f"/{MOD}/api/pipeline_versions_update.dspy",
f"/{MOD}/api/pipeline_versions_delete.dspy",
f"/{MOD}/api/pipeline_publish.dspy",
]
# developer — 开发者(包含所有 operator 路径)
PATHS_DEVELOPER = PATHS_OPERATOR[:]
def run_set_perm(role, path):
cmd = [PYTHON, SET_PERM_SCRIPT, role, path]
result = subprocess.run(cmd, capture_output=True, text=True)
return result.returncode == 0
def register_role_paths(role, paths):
count = 0
for p in paths:
if run_set_perm(role, p):
count += 1
print(f" {role}: {count}/{len(paths)} paths registered")
return count
def main():
print(f"Sage root: {SAGE_ROOT}")
print("Registering pipeline_core module RBAC permissions...")
total = 0
total += register_role_paths("operator", PATHS_OPERATOR)
total += register_role_paths("developer", PATHS_DEVELOPER)
print(f"\nDone. Total {total} permission entries registered.")
print("NOTE: Restart Sage after permission changes to reload RBAC cache.")
if __name__ == "__main__":
main()