feat: add create_superuser utility

This commit is contained in:
yumoqing 2026-05-28 11:43:36 +08:00
parent 9bcd8bf93a
commit a0a1c5ff3a

277
app/create_superuser.py Normal file
View File

@ -0,0 +1,277 @@
"""
create_superuser.py - Create an owner organization superuser
This script creates:
1. A superuser role with full permissions on all CRM modules
2. The initial superuser account assigned to this role
3. Admin role with permissions to manage users, roles, and permissions
"""
import sys
import os
import asyncio
from sqlor.dbpools import DBPools
from appPublic.uniqueID import getID
from appPublic.jsonConfig import getConfig
from ahserver.globalEnv import password_encode
def get_config(app_dir=None):
if app_dir is None:
app_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
return getConfig(app_dir, {'workdir': app_dir})
def ensure_dbpools(app_dir=None):
"""Initialize database connection if not already done."""
try:
from sqlor.dbpools import get_pools
pools = get_pools()
if pools:
return pools
except:
pass
config = get_config(app_dir)
return DBPools(config.databases)
async def ensure_permission(sor, path, name='', permtype='page'):
"""Create permission if it doesn't exist, return its id."""
recs = await sor.R('permission', {'path': path})
if recs:
return recs[0].id
pid = getID()
await sor.C('permission', {
'id': pid,
'name': name or path,
'path': path,
'permtype': permtype,
'parentid': '',
})
return pid
async def ensure_role(sor, name, orgtypeid='customer'):
"""Create role if it doesn't exist, return its id."""
recs = await sor.R('role', {'name': name, 'orgtypeid': orgtypeid})
if recs:
return recs[0].id
rid = getID()
await sor.C('role', {
'id': rid,
'name': name,
'orgtypeid': orgtypeid,
'description': name,
})
return rid
async def assign_role_to_user(sor, userid, roleid):
"""Assign a role to a user, skip if already assigned."""
recs = await sor.R('userrole', {'userid': userid, 'roleid': roleid})
if recs:
return False
await sor.C('userrole', {
'id': getID(),
'userid': userid,
'roleid': roleid,
})
return True
async def assign_perm_to_role(sor, roleid, permid):
"""Assign a permission to a role, skip if already assigned."""
recs = await sor.R('rolepermission', {'roleid': roleid, 'permid': permid})
if recs:
return False
await sor.C('rolepermission', {
'id': getID(),
'roleid': roleid,
'permid': permid,
})
return True
async def create_or_get_user(sor, username, password, nickname='', orgid='0'):
"""Create a user or return existing one."""
recs = await sor.R('users', {'username': username})
if recs:
return recs[0]
uid = getID()
await sor.C('users', {
'id': uid,
'username': username,
'password': password_encode(password),
'nick_name': nickname or username,
'orgid': orgid,
'created_at': None,
'login_fail_count': 0,
'last_login_fail': None,
'last_login': None,
})
recs = await sor.R('users', {'id': uid})
return recs[0] if recs else None
# ============================================================
# Permission definitions for CRM modules
# ============================================================
# CRM module CRUD permissions (all tables)
CRM_TABLES = [
('customer_management', ['customers', 'customer_handover', 'public_sea_pool']),
('opportunity_management', ['opportunities', 'opportunity_products', 'opportunity_contacts']),
('contract_management', ['contracts', 'contract_items', 'contract_attachments']),
('financial_management', ['receivables', 'payments', 'financial_orders']),
('workflow_approval', ['approval_workflows', 'approval_instances', 'approval_steps']),
('unified_dashboard', ['dashboards', 'reports']),
]
# RBAC management permissions
RBAC_MANAGE_TABLES = [
'users', 'role', 'permission', 'userrole', 'rolepermission',
'organization', 'orgtypes', 'userdepartment', 'userapp',
]
def collect_all_permissions():
"""Collect all permission paths that the superuser needs."""
perms = []
# Login and home page
perms.append(('/main/login.ui', 'Login Page', 'page'))
perms.append(('/main/index.ui', 'Home Page', 'page'))
perms.append(('/main/menu.ui', 'Main Menu', 'page'))
# User self-service (register, profile)
perms.append(('/user/register.ui', 'Register', 'page'))
perms.append(('/user/up_login.dspy', 'Login', 'action'))
perms.append(('/user/logout.dspy', 'Logout', 'action'))
# All CRM module pages and CRUD operations
for module, tables in CRM_TABLES:
# Module entry
perms.append((f'/{module}', f'{module} Module', 'module'))
perms.append((f'/{module}/index.ui', f'{module} Index', 'page'))
for table in tables:
perms.append((f'/{module}/{table}', f'{table} Management', 'page'))
perms.append((f'/{module}/{table}/index.ui', f'{table} List', 'page'))
perms.append((f'/{module}/{table}/get_{table}.dspy', f'{table} Read', 'crud'))
perms.append((f'/{module}/{table}/add_{table}.dspy', f'{table} Create', 'crud'))
perms.append((f'/{module}/{table}/update_{table}.dspy', f'{table} Update', 'crud'))
perms.append((f'/{module}/{table}/delete_{table}.dspy', f'{table} Delete', 'crud'))
# RBAC management permissions
perms.append(('/rbac', 'RBAC Management', 'module'))
for table in RBAC_MANAGE_TABLES:
perms.append((f'/rbac/{table}', f'{table} Management', 'page'))
perms.append((f'/rbac/{table}/index.ui', f'{table} List', 'page'))
perms.append((f'/rbac/{table}/get_{table}.dspy', f'{table} Read', 'crud'))
perms.append((f'/rbac/{table}/add_{table}.dspy', f'{table} Create', 'crud'))
perms.append((f'/rbac/{table}/update_{table}.dspy', f'{table} Update', 'crud'))
perms.append((f'/rbac/{table}/delete_{table}.dspy', f'{table} Delete', 'crud'))
# RBAC user management scripts
perms.append(('/rbac/users/get_users.dspy', 'Get Users', 'crud'))
perms.append(('/rbac/users/add_users.dspy', 'Create Users', 'crud'))
perms.append(('/rbac/users/update_users.dspy', 'Update Users', 'crud'))
perms.append(('/rbac/users/delete_users.dspy', 'Delete Users', 'crud'))
perms.append(('/rbac/role/get_role.dspy', 'Get Roles', 'crud'))
perms.append(('/rbac/role/add_role.dspy', 'Create Roles', 'crud'))
perms.append(('/rbac/role/update_role.dspy', 'Update Roles', 'crud'))
perms.append(('/rbac/role/delete_role.dspy', 'Delete Roles', 'crud'))
perms.append(('/rbac/permission/get_permission.dspy', 'Get Permissions', 'crud'))
perms.append(('/rbac/permission/add_permission.dspy', 'Create Permissions', 'crud'))
perms.append(('/rbac/permission/update_permission.dspy', 'Update Permissions', 'crud'))
perms.append(('/rbac/permission/delete_permission.dspy', 'Delete Permissions', 'crud'))
perms.append(('/rbac/userrole/get_userrole.dspy', 'Get User Roles', 'crud'))
perms.append(('/rbac/userrole/add_userrole.dspy', 'Assign User Roles', 'crud'))
perms.append(('/rbac/userrole/delete_userrole.dspy', 'Remove User Roles', 'crud'))
perms.append(('/rbac/rolepermission/get_rolepermission.dspy', 'Get Role Permissions', 'crud'))
perms.append(('/rbac/rolepermission/add_rolepermission.dspy', 'Assign Role Permissions', 'crud'))
perms.append(('/rbac/rolepermission/delete_rolepermission.dspy', 'Remove Role Permissions', 'crud'))
return perms
async def main():
import argparse
parser = argparse.ArgumentParser(description='Create CRM superuser with full permissions')
parser.add_argument('--username', default='superadmin', help='Superuser username (default: superadmin)')
parser.add_argument('--password', default=None, help='Superuser password (default: will prompt)')
parser.add_argument('--nickname', default='系统管理员', help='Superuser display name')
parser.add_argument('--dbname', default=None, help='Database name in config (default: first database)')
args = parser.parse_args()
if args.password is None:
import getpass
args.password = getpass.getpass(f'Enter password for {args.username}: ')
config = get_config()
dbname = args.dbname or list(config.databases.keys())[0]
db = ensure_dbpools()
async with db.sqlorContext(dbname) as sor:
print(f"Using database: {dbname}")
# 1. Collect all permissions
all_perms = collect_all_permissions()
print(f"\n[1/4] Creating {len(all_perms)} permissions...")
perm_map = {} # path -> permid
created = 0
for path, name, permtype in all_perms:
permid = await ensure_permission(sor, path, name, permtype)
perm_map[path] = permid
created += 1
print(f" Created/verified {created} permissions")
# 2. Create superuser role
print(f"\n[2/4] Creating superuser role...")
superuser_role_id = await ensure_role(sor, 'superuser', orgtypeid='customer')
print(f" Role 'superuser' id: {superuser_role_id}")
# 3. Assign all permissions to superuser role
print(f"\n[3/4] Assigning all permissions to superuser role...")
perm_count = 0
for path, permid in perm_map.items():
assigned = await assign_perm_to_role(sor, superuser_role_id, permid)
if assigned:
perm_count += 1
print(f" Assigned {perm_count} permissions to superuser role")
# 4. Create admin role (subset: user/role/permission management)
print(f"\n[4/4] Creating admin role and superuser account...")
admin_role_id = await ensure_role(sor, 'admin', orgtypeid='customer')
# Admin gets RBAC management + CRM read permissions
admin_perm_count = 0
for (path, permid), (p_path, p_name, p_type) in zip(perm_map.items(), all_perms):
# Admin gets all RBAC permissions
if '/rbac/' in path:
assigned = await assign_perm_to_role(sor, admin_role_id, permid)
if assigned:
admin_perm_count += 1
# Admin gets CRM read-only (page permissions)
elif p_type == 'page' and any(f'/{m}' in path for m, _ in CRM_TABLES):
assigned = await assign_perm_to_role(sor, admin_role_id, permid)
if assigned:
admin_perm_count += 1
# Admin gets login and home
elif path in ('/main/login.ui', '/main/index.ui', '/main/menu.ui', '/user/register.ui'):
assigned = await assign_perm_to_role(sor, admin_role_id, permid)
if assigned:
admin_perm_count += 1
print(f" Assigned {admin_perm_count} permissions to admin role")
# Create the superuser
user = await create_or_get_user(sor, args.username, args.password, args.nickname)
if user:
assigned_su = await assign_role_to_user(sor, user.id, superuser_role_id)
assigned_admin = await assign_role_to_user(sor, user.id, admin_role_id)
print(f"\n{'='*50}")
print(f" Superuser created: {args.username}")
print(f" User ID: {user.id}")
print(f" Roles: superuser, admin")
print(f"{'='*50}")
else:
print(f"ERROR: Failed to create user {args.username}")
sys.exit(1)
print("\nDone. The superuser can now:")
print(" - Access all CRM modules and features")
print(" - Manage users (create admins, regular users)")
print(" - Manage roles and assign them to users")
print(" - Manage permissions and role-permission mappings")
if __name__ == '__main__':
asyncio.run(main())