cms/scripts/init_superuser.py
yumoqing 569c8e6715 refactor: CMS从Sage子模块重构为独立ahserver Web应用
架构变更:
- CMS作为独立进程运行(端口9090),不再嵌入Sage
- 使用ahserver框架,复用rbac模块做认证授权
- 所有模块共享sage数据库(配置在conf/config.json)

新增文件:
- app/cms.py: 独立Web应用主入口(webapp(init))
- app/global_func.py: 全局函数(get_module_dbname/UiWindow等)
- conf/config.json: 应用配置模板(数据库/路径/处理器/Redis)
- start.sh/stop.sh: 进程管理脚本
- pyproject.toml: 顶层Python包配置

路径重构(去掉/entcms前缀):
- 官网首页: /entcms/index.ui → /index.ui
- 管理后台: /entcms/admin.ui → /admin.ui
- API: /entcms/api/xxx.dspy → /api/xxx.dspy
- CRUD: /entcms/cms_content_list → /cms_content_list
- dingdingflow保持/dingdingflow前缀(映射子目录)

config.json路径映射:
- entcms/wwwroot → / (根路径)
- dingdingflow/wwwroot → /dingdingflow
- bricks/dist → /bricks

构建脚本(build.sh):
- 创建独立venv(py3/)
- 安装核心依赖(apppublic/sqlor/ahserver/bricks/rbac等)
- json2ddl生成CMS业务表DDL
- xls2ui生成CRUD UI
- 生成systemd服务文件

load_path.py更新:
- entcms: 所有路径去掉/entcms前缀
- dingdingflow: 保持/dingdingflow前缀
- 查找set_role_perm.py支持CMS和Sage两种环境

init_superuser.py更新:
- 支持CMS独立环境(自动检测py3/conf)
- 创建superuser角色并分配全部权限
2026-05-27 17:20:36 +08:00

127 lines
4.4 KiB
Python

"""
初始化超级用户
用法:
CMS环境: py3/bin/python scripts/init_superuser.py [username] [password]
Sage环境: cd ~/repos/sage && ./py3/bin/python ~/repos/cms/scripts/init_superuser.py [username] [password]
默认: admin / admin123
"""
import os, sys, asyncio
# 自动检测运行环境
def find_workdir():
"""查找应用根目录"""
script_dir = os.path.dirname(os.path.abspath(__file__))
cms_root = os.path.dirname(script_dir)
# 检查是否是CMS独立环境
if os.path.isdir(os.path.join(cms_root, "py3", "bin")) and \
os.path.isfile(os.path.join(cms_root, "conf", "config.json")):
return cms_root
# 检查Sage环境
for c in [os.path.expanduser("~/repos/sage"), os.path.expanduser("~/sage")]:
if os.path.isdir(os.path.join(c, "py3", "bin")):
return c
return None
workdir = find_workdir()
if not workdir:
print("ERROR: 找不到CMS或Sage应用目录")
sys.exit(1)
# 确保Python路径
sys.path.insert(0, os.path.join(workdir, "py3", "lib"))
os.chdir(workdir)
from sqlor.dbpools import DBPools
from appPublic.jsonConfig import getConfig
from appPublic.uniqueID import getID
from appPublic.password import password_encode
async def main():
username = sys.argv[1] if len(sys.argv) > 1 else "admin"
password = sys.argv[2] if len(sys.argv) > 2 else "admin123"
config = getConfig('.')
db = DBPools(config.databases)
async with db.sqlorContext('sage') as sor:
# 检查用户是否存在
existing = await sor.R('users', {'username': username})
if existing:
print(f"用户 {username} 已存在 (id={existing[0]['id']})")
await sor.U('users', {
'id': existing[0]['id'],
'passwd': password_encode(password)
})
print(f"密码已更新为: {password}")
else:
user_id = getID()
await sor.C('users', {
'id': user_id,
'username': username,
'passwd': password_encode(password),
'orgid': '0',
'orgtypeid': 'owner',
'status': '1',
})
print(f"用户已创建: {username} (id={user_id})")
# 查找或创建superuser角色
roles = await sor.R('role', {'orgtypeid': 'owner', 'name': 'superuser'})
if not roles:
role_id = getID()
await sor.C('role', {
'id': role_id,
'orgtypeid': 'owner',
'name': 'superuser'
})
print(f"角色 owner.superuser 已创建 (id={role_id})")
else:
role_id = roles[0]['id']
print(f"角色 owner.superuser 已存在 (id={role_id})")
# 获取用户ID
users = await sor.R('users', {'username': username})
uid = users[0]['id']
# 分配角色
try:
ur = await sor.R('userrole', {'userid': uid, 'roleid': role_id})
if not ur:
await sor.C('userrole', {
'id': getID(),
'userid': uid,
'roleid': role_id,
})
print(f"已分配角色 owner.superuser 给用户 {username}")
else:
print(f"用户 {username} 已拥有 owner.superuser 角色")
except Exception as e:
print(f"注意: userrole表操作异常: {e}")
print("可能需要手动分配角色")
# 给superuser分配全部权限
try:
all_perms = await sor.R('permission', {})
for perm in all_perms:
existing_rp = await sor.R('rolepermission', {
'roleid': role_id,
'permid': perm['id']
})
if not existing_rp:
await sor.C('rolepermission', {
'id': getID(),
'roleid': role_id,
'permid': perm['id']
})
print(f"已将全部 {len(all_perms)} 条权限分配给 owner.superuser")
except Exception as e:
print(f"注意: 权限分配异常: {e}")
print(f"\n登录信息:")
print(f" 用户名: {username}")
print(f" 密码: {password}")
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())