remove app/init_permissions.py, keep only root init_permissions.py
This commit is contained in:
parent
b4785d49ab
commit
010ab6e354
@ -1,427 +0,0 @@
|
|||||||
#!/usr/bin/env python3
|
|
||||||
"""
|
|
||||||
Integrated CRM - RBAC 权限初始化脚本
|
|
||||||
|
|
||||||
功能:
|
|
||||||
1. 遍历 wwwroot 下所有文件(含子目录和 ln -s 链接目录),将路径写入 permission 表
|
|
||||||
2. 为各机构的角色设置每个 path 的权限(通过 orgtypeid='*' 通配所有机构)
|
|
||||||
3. 初始化 admin_superuser 用户(用户名 super,密码 Kyy@123456)
|
|
||||||
|
|
||||||
依赖 RBAC 通配机制:
|
|
||||||
- 用户角色会展开为 orgtypeid.name、orgtypeid.*、*.name 三种 key
|
|
||||||
- 授权给 orgtypeid='*' 的角色,等于授权给所有机构的同名角色
|
|
||||||
- *.admin_superuser 匹配所有机构的 admin_superuser 角色
|
|
||||||
|
|
||||||
使用方法:
|
|
||||||
cd ~/repos/integrated_crm_app
|
|
||||||
source py3/bin/activate
|
|
||||||
python app/init_permissions.py
|
|
||||||
|
|
||||||
部署流程:
|
|
||||||
1. 停止应用:pkill -f integrated_crm_app.py
|
|
||||||
2. 执行: python app/init_permissions.py
|
|
||||||
3. 启动应用:nohup python app/integrated_crm_app.py --port 8080 &
|
|
||||||
"""
|
|
||||||
|
|
||||||
import asyncio
|
|
||||||
import os
|
|
||||||
import sys
|
|
||||||
import importlib.util
|
|
||||||
|
|
||||||
# ============================================================
|
|
||||||
# 配置
|
|
||||||
# ============================================================
|
|
||||||
|
|
||||||
_script_dir = os.path.dirname(os.path.abspath(__file__))
|
|
||||||
# 脚本可能在项目根目录或 app/ 子目录,统一指向项目根
|
|
||||||
if os.path.basename(_script_dir) == 'app':
|
|
||||||
APP_ROOT = os.path.dirname(_script_dir)
|
|
||||||
else:
|
|
||||||
APP_ROOT = _script_dir
|
|
||||||
sys.path.insert(0, APP_ROOT)
|
|
||||||
|
|
||||||
# perm_config.py 路径
|
|
||||||
PERM_CONFIG_PATH = os.path.join(APP_ROOT, "perm_config.py")
|
|
||||||
|
|
||||||
# wwwroot 根目录(含符号链接子目录)
|
|
||||||
WWWROOT_BASE = os.path.join(APP_ROOT, "wwwroot")
|
|
||||||
|
|
||||||
# admin_superuser 初始账号密码
|
|
||||||
ADMIN_USERNAME = "super"
|
|
||||||
ADMIN_PASSWORD = "Kyy@123456"
|
|
||||||
|
|
||||||
|
|
||||||
# ============================================================
|
|
||||||
# 加载 perm_config
|
|
||||||
# ============================================================
|
|
||||||
|
|
||||||
def load_perm_config():
|
|
||||||
spec = importlib.util.spec_from_file_location("perm_config", PERM_CONFIG_PATH)
|
|
||||||
mod = importlib.util.module_from_spec(spec)
|
|
||||||
spec.loader.exec_module(mod)
|
|
||||||
return mod
|
|
||||||
|
|
||||||
|
|
||||||
# ============================================================
|
|
||||||
# 遍历 wwwroot 所有文件(含 ln -s 链接目录)
|
|
||||||
# ============================================================
|
|
||||||
|
|
||||||
def scan_wwwroot(wwwroot_path):
|
|
||||||
"""
|
|
||||||
遍历 wwwroot 目录下所有文件(递归,跟随符号链接)。
|
|
||||||
返回 URL 路径列表,如 ['/login.ui', '/customer_management/customer_list.ui']。
|
|
||||||
|
|
||||||
关键:
|
|
||||||
1. os.walk 默认不跟随符号链接,需要 followlinks=True
|
|
||||||
2. 排除 self-referencing symlinks(如 main -> wwwroot)防止死循环
|
|
||||||
3. /main 前缀由 config.json paths 映射产生,此处返回相对路径即可,
|
|
||||||
脚本会自动注册 /xxx 和 /main/xxx 双路径。
|
|
||||||
"""
|
|
||||||
paths = []
|
|
||||||
if not os.path.isdir(wwwroot_path):
|
|
||||||
print(f" [WARN] wwwroot not found: {wwwroot_path}")
|
|
||||||
return paths
|
|
||||||
|
|
||||||
real_wwwroot = os.path.realpath(wwwroot_path)
|
|
||||||
visited_real = set()
|
|
||||||
|
|
||||||
for root, dirs, files in os.walk(wwwroot_path, followlinks=True):
|
|
||||||
real_root = os.path.realpath(root)
|
|
||||||
if real_root in visited_real:
|
|
||||||
dirs.clear() # 防止进入循环目录
|
|
||||||
continue
|
|
||||||
visited_real.add(real_root)
|
|
||||||
|
|
||||||
# 过滤掉指向 wwwroot 自身的符号链接目录
|
|
||||||
safe_dirs = []
|
|
||||||
for d in dirs:
|
|
||||||
real_d = os.path.realpath(os.path.join(root, d))
|
|
||||||
if real_d != real_wwwroot and real_d not in visited_real:
|
|
||||||
safe_dirs.append(d)
|
|
||||||
dirs[:] = safe_dirs
|
|
||||||
|
|
||||||
for f in files:
|
|
||||||
# 注册 .ui, .dspy, .js, .css 文件的权限
|
|
||||||
if not (f.endswith('.ui') or f.endswith('.dspy') or f.endswith('.js') or f.endswith('.css')):
|
|
||||||
continue
|
|
||||||
full = os.path.join(root, f)
|
|
||||||
rel = os.path.relpath(full, wwwroot_path)
|
|
||||||
url_path = '/' + rel.replace(os.sep, '/')
|
|
||||||
paths.append(url_path)
|
|
||||||
|
|
||||||
return sorted(paths)
|
|
||||||
|
|
||||||
|
|
||||||
# ============================================================
|
|
||||||
# 数据库操作(sqlor CRUD)
|
|
||||||
# ============================================================
|
|
||||||
|
|
||||||
async def ensure_permission(sor, path, permtype='page'):
|
|
||||||
"""确保 permission 记录存在,返回 permission ID。"""
|
|
||||||
recs = await sor.R('permission', {'path': path})
|
|
||||||
if recs:
|
|
||||||
return recs[0].id
|
|
||||||
|
|
||||||
from appPublic.uniqueID import getID
|
|
||||||
permid = getID()
|
|
||||||
await sor.C('permission', {
|
|
||||||
'id': permid,
|
|
||||||
'path': path,
|
|
||||||
'permtype': permtype,
|
|
||||||
'parentid': '',
|
|
||||||
'description': f'Auto-generated for {path}',
|
|
||||||
})
|
|
||||||
return permid
|
|
||||||
|
|
||||||
|
|
||||||
async def ensure_role(sor, role_id, name, desc='', orgtypeid='*'):
|
|
||||||
"""
|
|
||||||
确保角色存在。
|
|
||||||
匹配顺序:中文名 -> 英文ID -> 创建新角色。
|
|
||||||
默认 orgtypeid='*' 表示通配所有机构。
|
|
||||||
"""
|
|
||||||
# 按中文名匹配
|
|
||||||
recs = await sor.R('role', {'name': name})
|
|
||||||
if recs:
|
|
||||||
return recs[0].id
|
|
||||||
|
|
||||||
# 按英文ID匹配
|
|
||||||
recs = await sor.R('role', {'id': role_id})
|
|
||||||
if recs:
|
|
||||||
return recs[0].id
|
|
||||||
|
|
||||||
# 创建新角色
|
|
||||||
from appPublic.uniqueID import getID
|
|
||||||
rid = role_id
|
|
||||||
await sor.C('role', {
|
|
||||||
'id': rid,
|
|
||||||
'orgtypeid': orgtypeid,
|
|
||||||
'name': name,
|
|
||||||
'role_name': role_id,
|
|
||||||
'description': desc,
|
|
||||||
})
|
|
||||||
print(f" [CREATED] role: {role_id} ({name}) orgtypeid={orgtypeid} id={rid}")
|
|
||||||
return rid
|
|
||||||
|
|
||||||
|
|
||||||
async def grant_perm(sor, role_id, perm_id):
|
|
||||||
"""授予角色权限(幂等)。"""
|
|
||||||
existing = await sor.R('rolepermission', {'roleid': role_id, 'permid': perm_id})
|
|
||||||
if not existing:
|
|
||||||
from appPublic.uniqueID import getID
|
|
||||||
await sor.C('rolepermission', {
|
|
||||||
'id': getID(),
|
|
||||||
'roleid': role_id,
|
|
||||||
'permid': perm_id,
|
|
||||||
})
|
|
||||||
|
|
||||||
|
|
||||||
def dual_paths(path):
|
|
||||||
"""返回路径列表。只使用原始路径,不加 /main 前缀。"""
|
|
||||||
# 如果路径以 /main 开头,去掉 /main 前缀(统一使用无前缀路径)
|
|
||||||
if path.startswith('/main'):
|
|
||||||
cleaned = path[5:] # '/main/xxx' -> '/xxx'
|
|
||||||
if not cleaned:
|
|
||||||
cleaned = '/'
|
|
||||||
return [cleaned]
|
|
||||||
return [path]
|
|
||||||
|
|
||||||
|
|
||||||
# ============================================================
|
|
||||||
# 构建路径->角色映射
|
|
||||||
# ============================================================
|
|
||||||
|
|
||||||
def build_path_role_map(perm_config, all_files):
|
|
||||||
"""
|
|
||||||
根据 perm_config.PERMISSION_MATRIX 和扫描到的文件列表,
|
|
||||||
构建 {url_path: [role_ids]} 映射。
|
|
||||||
|
|
||||||
对于非通配路径(精确路径),如果该路径在 all_files 中,加入映射。
|
|
||||||
对于通配路径(/module/**),展开到该目录下所有匹配文件。
|
|
||||||
"""
|
|
||||||
path_roles = {} # path -> set of role_ids
|
|
||||||
|
|
||||||
for _section, entries in perm_config.PERMISSION_MATRIX.items():
|
|
||||||
for pattern, roles in entries.items():
|
|
||||||
if '**' in pattern:
|
|
||||||
# 通配模式:展开到 wwwroot 下匹配的文件
|
|
||||||
base = pattern.replace('/**', '').lstrip('/')
|
|
||||||
prefix = '/' + base if base else ''
|
|
||||||
for fp in all_files:
|
|
||||||
if fp.startswith(prefix):
|
|
||||||
if fp not in path_roles:
|
|
||||||
path_roles[fp] = set()
|
|
||||||
path_roles[fp].update(roles)
|
|
||||||
else:
|
|
||||||
# 精确路径:如果该文件存在,加入映射
|
|
||||||
if pattern in all_files:
|
|
||||||
if pattern not in path_roles:
|
|
||||||
path_roles[pattern] = set()
|
|
||||||
path_roles[pattern].update(roles)
|
|
||||||
|
|
||||||
return {k: list(v) for k, v in path_roles.items()}
|
|
||||||
|
|
||||||
|
|
||||||
# ============================================================
|
|
||||||
# 主流程
|
|
||||||
# ============================================================
|
|
||||||
|
|
||||||
async def main():
|
|
||||||
print("=" * 65)
|
|
||||||
print("Integrated CRM - RBAC 权限初始化")
|
|
||||||
print("=" * 65)
|
|
||||||
|
|
||||||
# [1] 加载配置
|
|
||||||
pc = load_perm_config()
|
|
||||||
print(f"\n[1/6] 加载 perm_config.py")
|
|
||||||
print(f" 角色定义: {len(pc.ROLES)}")
|
|
||||||
print(f" 权限配置段: {len(pc.PERMISSION_MATRIX)}")
|
|
||||||
|
|
||||||
# [2] 扫描 wwwroot 所有文件
|
|
||||||
all_files = scan_wwwroot(WWWROOT_BASE)
|
|
||||||
print(f"\n[2/6] 扫描 wwwroot 文件(含符号链接): {len(all_files)} 个")
|
|
||||||
if all_files:
|
|
||||||
print(f" 示例: {all_files[0]}")
|
|
||||||
if len(all_files) > 1:
|
|
||||||
print(f" ...")
|
|
||||||
print(f" {all_files[-1]}")
|
|
||||||
|
|
||||||
# [3] 构建 路径->角色 映射
|
|
||||||
path_role_map = build_path_role_map(pc, all_files)
|
|
||||||
print(f"\n[3/6] 路径-角色映射: {len(path_role_map)} 个路径需要授权")
|
|
||||||
|
|
||||||
# [4] 连接数据库:用 config.json 中的 databases 初始化 DBPools
|
|
||||||
from appPublic.jsonConfig import getConfig
|
|
||||||
from sqlor.dbpools import DBPools
|
|
||||||
config = getConfig('.')
|
|
||||||
DBPools(config.databases)
|
|
||||||
db = DBPools()
|
|
||||||
# 获取数据库名(config.json databases 中定义的第一个/默认数据库名)
|
|
||||||
dbname = list(config.databases.keys())[0]
|
|
||||||
print(f"\n[4/6] 连接数据库 {dbname}...")
|
|
||||||
|
|
||||||
async with db.sqlorContext(dbname) as sor:
|
|
||||||
|
|
||||||
# ---- 4a: 创建/验证所有角色 ----
|
|
||||||
# 关键:所有角色使用 orgtypeid='*' 创建,配合 RBAC 通配机制
|
|
||||||
print(f"\n [4a] 注册角色(orgtypeid='*' 通配所有机构)...")
|
|
||||||
role_ids = {}
|
|
||||||
for rid, (name, desc) in pc.ROLES.items():
|
|
||||||
role_ids[rid] = await ensure_role(sor, rid, name, desc, orgtypeid='*')
|
|
||||||
|
|
||||||
# ---- 4b: 注册 permission + 授权 ----
|
|
||||||
print(f"\n [4b] 注册权限并授权...")
|
|
||||||
perm_count = 0
|
|
||||||
grant_count = 0
|
|
||||||
error_count = 0
|
|
||||||
|
|
||||||
for path, roles in path_role_map.items():
|
|
||||||
for p in dual_paths(path):
|
|
||||||
try:
|
|
||||||
permid = await ensure_permission(sor, p, permtype='page')
|
|
||||||
perm_count += 1
|
|
||||||
|
|
||||||
for rn in roles:
|
|
||||||
if rn in ('any', 'logined'):
|
|
||||||
continue
|
|
||||||
if rn not in role_ids:
|
|
||||||
print(f" [WARN] 未知角色: {rn} (path: {p})")
|
|
||||||
error_count += 1
|
|
||||||
continue
|
|
||||||
await grant_perm(sor, role_ids[rn], permid)
|
|
||||||
grant_count += 1
|
|
||||||
except Exception as e:
|
|
||||||
print(f" [ERROR] {p}: {e}")
|
|
||||||
error_count += 1
|
|
||||||
|
|
||||||
print(f" 注册权限: {perm_count}, 授权次数: {grant_count}, 错误: {error_count}")
|
|
||||||
|
|
||||||
# ---- 4c: 注册 CRUD API 权限 ----
|
|
||||||
print(f"\n [4c] 注册 CRUD API 权限...")
|
|
||||||
crud_count = 0
|
|
||||||
for module, tables in pc.CRUD_TABLES.items():
|
|
||||||
for table in tables:
|
|
||||||
for action in ('list', 'create', 'update', 'delete'):
|
|
||||||
path = f"/{module}/api/{table}_{action}.dspy"
|
|
||||||
try:
|
|
||||||
permid = await ensure_permission(sor, path, permtype='api')
|
|
||||||
await grant_perm(sor, role_ids['admin_superuser'], permid)
|
|
||||||
crud_count += 1
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
print(f" 注册 CRUD API: {crud_count}")
|
|
||||||
|
|
||||||
# ---- 4d: 同步 admin_superuser 权限到各机构角色 ----
|
|
||||||
print(f"\n [4d] 同步 admin_superuser 权限到各机构已有角色...")
|
|
||||||
admin_perms = await sor.R('rolepermission', {'roleid': role_ids['admin_superuser']})
|
|
||||||
sync_count = 0
|
|
||||||
# 查找所有 orgtypeid='customer' 且名称为 admin/superuser 的角色
|
|
||||||
customer_roles = await sor.R('role', {'orgtypeid': 'customer'})
|
|
||||||
for r in customer_roles:
|
|
||||||
if r.name in ('admin', 'superuser', '系统管理员'):
|
|
||||||
for g in admin_perms:
|
|
||||||
await grant_perm(sor, r.id, g.permid)
|
|
||||||
sync_count += 1
|
|
||||||
print(f" 同步授权: {sync_count}")
|
|
||||||
|
|
||||||
# ---- 5: 创建 admin_superuser 用户 ----
|
|
||||||
print(f"\n[5/6] 创建 admin_superuser 用户...")
|
|
||||||
await create_admin_user(sor, ADMIN_USERNAME, ADMIN_PASSWORD, role_ids['admin_superuser'])
|
|
||||||
|
|
||||||
# ---- 6: 汇总 ----
|
|
||||||
print(f"\n[6/6] 初始化汇总")
|
|
||||||
print("-" * 65)
|
|
||||||
print(f" 文件扫描: {len(all_files)} 个")
|
|
||||||
print(f" 路径注册: {len(path_role_map)} 个原始路径 → {perm_count} 个(含/main变体)")
|
|
||||||
print(f" 角色配置: {len(role_ids)} 个")
|
|
||||||
print(f" 权限授权: {grant_count} 次")
|
|
||||||
print(f" CRUD API: {crud_count} 个")
|
|
||||||
print(f" 机构同步: {sync_count} 次")
|
|
||||||
print(f" 管理员账号: {ADMIN_USERNAME} / {ADMIN_PASSWORD}")
|
|
||||||
print(f" 错误: {error_count}")
|
|
||||||
print()
|
|
||||||
print(" 重要:重启应用以刷新 RBAC 权限缓存!")
|
|
||||||
print(" pkill -f integrated_crm_app.py")
|
|
||||||
print(" nohup python app/integrated_crm_app.py --port 8080 &")
|
|
||||||
print()
|
|
||||||
|
|
||||||
# 角色权限数量统计
|
|
||||||
print("角色权限明细:")
|
|
||||||
print("-" * 65)
|
|
||||||
for rid, dbid in role_ids.items():
|
|
||||||
name, desc = pc.ROLES[rid]
|
|
||||||
count = len(await sor.R('rolepermission', {'roleid': dbid}))
|
|
||||||
print(f" {rid:25s} | {name:10s} | {count:4d} 条权限 | {desc}")
|
|
||||||
|
|
||||||
|
|
||||||
async def create_admin_user(sor, username, password, superuser_role_id):
|
|
||||||
"""
|
|
||||||
创建 admin_superuser 用户并分配角色。
|
|
||||||
使用 password_encode() 对密码进行 bcrypt 加密。
|
|
||||||
"""
|
|
||||||
from ahserver.globalEnv import password_encode
|
|
||||||
from appPublic.uniqueID import getID
|
|
||||||
from appPublic.timeUtils import timestampstr
|
|
||||||
|
|
||||||
# 检查用户是否已存在
|
|
||||||
recs = await sor.R('users', {'username': username})
|
|
||||||
if recs:
|
|
||||||
uid = recs[0].id
|
|
||||||
encoded_pw = password_encode(password)
|
|
||||||
# 用原始 SQL 更新密码,避免 sor.U() 错误生成 userid WHERE 条件
|
|
||||||
await sor.sqlExe(
|
|
||||||
"UPDATE users SET password = ${pw}$ WHERE id = ${uid}$",
|
|
||||||
{'pw': encoded_pw, 'uid': uid}
|
|
||||||
)
|
|
||||||
print(f" 用户 {username} 已存在,已更新密码")
|
|
||||||
# 检查是否已有角色
|
|
||||||
existing_roles = await sor.R('userrole', {'userid': uid, 'roleid': superuser_role_id})
|
|
||||||
if not existing_roles:
|
|
||||||
await sor.C('userrole', {
|
|
||||||
'id': getID(),
|
|
||||||
'userid': uid,
|
|
||||||
'roleid': superuser_role_id,
|
|
||||||
})
|
|
||||||
print(f" 已添加 admin_superuser 角色")
|
|
||||||
return uid
|
|
||||||
|
|
||||||
# 创建新用户
|
|
||||||
uid = getID()
|
|
||||||
encoded_pw = password_encode(password)
|
|
||||||
now = timestampstr()
|
|
||||||
|
|
||||||
# 创建用户(orgid 与 userid 相同,表示自属机构)
|
|
||||||
await sor.C('users', {
|
|
||||||
'id': uid,
|
|
||||||
'username': username,
|
|
||||||
'password': encoded_pw,
|
|
||||||
'orgid': uid,
|
|
||||||
'nick_name': '系统管理员',
|
|
||||||
'created_at': now,
|
|
||||||
'login_fail_count': 0,
|
|
||||||
})
|
|
||||||
|
|
||||||
# 创建对应机构
|
|
||||||
await sor.C('organization', {
|
|
||||||
'id': uid,
|
|
||||||
'orgname': '系统管理',
|
|
||||||
})
|
|
||||||
await sor.C('orgtypes', {
|
|
||||||
'id': getID(),
|
|
||||||
'orgid': uid,
|
|
||||||
'orgtypeid': 'platform',
|
|
||||||
})
|
|
||||||
|
|
||||||
# 分配 admin_superuser 角色
|
|
||||||
await sor.C('userrole', {
|
|
||||||
'id': getID(),
|
|
||||||
'userid': uid,
|
|
||||||
'roleid': superuser_role_id,
|
|
||||||
})
|
|
||||||
|
|
||||||
print(f" [CREATED] user: {username} id={uid} (密码已bcrypt加密)")
|
|
||||||
return uid
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
asyncio.run(main())
|
|
||||||
@ -32,12 +32,7 @@ import importlib.util
|
|||||||
# 配置
|
# 配置
|
||||||
# ============================================================
|
# ============================================================
|
||||||
|
|
||||||
_script_dir = os.path.dirname(os.path.abspath(__file__))
|
APP_ROOT = os.path.dirname(os.path.abspath(__file__))
|
||||||
# 脚本可能在项目根目录或 app/ 子目录,统一指向项目根
|
|
||||||
if os.path.basename(_script_dir) == 'app':
|
|
||||||
APP_ROOT = os.path.dirname(_script_dir)
|
|
||||||
else:
|
|
||||||
APP_ROOT = _script_dir
|
|
||||||
sys.path.insert(0, APP_ROOT)
|
sys.path.insert(0, APP_ROOT)
|
||||||
|
|
||||||
# perm_config.py 路径
|
# perm_config.py 路径
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user