""" create_superuser.py - Create an owner organization superuser This script creates: 1. A superuser role with full permissions on all CRM modules 2. The initial superuser account assigned to this role 3. Admin role with permissions to manage users, roles, and permissions """ import sys import os import asyncio from sqlor.dbpools import DBPools from appPublic.uniqueID import getID from appPublic.jsonConfig import getConfig from ahserver.globalEnv import password_encode def get_config(app_dir=None): if app_dir is None: app_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) return getConfig(app_dir, {'workdir': app_dir}) def ensure_dbpools(app_dir=None): """Initialize database connection if not already done.""" try: from sqlor.dbpools import get_pools pools = get_pools() if pools: return pools except: pass config = get_config(app_dir) return DBPools(config.databases) async def ensure_permission(sor, path, name='', permtype='page'): """Create permission if it doesn't exist, return its id.""" recs = await sor.R('permission', {'path': path}) if recs: return recs[0].id pid = getID() await sor.C('permission', { 'id': pid, 'name': name or path, 'path': path, 'permtype': permtype, 'parentid': '', }) return pid async def ensure_role(sor, name, orgtypeid='customer'): """Create role if it doesn't exist, return its id.""" recs = await sor.R('role', {'name': name, 'orgtypeid': orgtypeid}) if recs: return recs[0].id rid = getID() await sor.C('role', { 'id': rid, 'name': name, 'orgtypeid': orgtypeid, 'description': name, }) return rid async def assign_role_to_user(sor, userid, roleid): """Assign a role to a user, skip if already assigned.""" recs = await sor.R('userrole', {'userid': userid, 'roleid': roleid}) if recs: return False await sor.C('userrole', { 'id': getID(), 'userid': userid, 'roleid': roleid, }) return True async def assign_perm_to_role(sor, roleid, permid): """Assign a permission to a role, skip if already assigned.""" recs = await sor.R('rolepermission', {'roleid': roleid, 'permid': permid}) if recs: return False await sor.C('rolepermission', { 'id': getID(), 'roleid': roleid, 'permid': permid, }) return True async def create_or_get_user(sor, username, password, nickname='', orgid='0'): """Create a user or return existing one.""" recs = await sor.R('users', {'username': username}) if recs: return recs[0] uid = getID() await sor.C('users', { 'id': uid, 'username': username, 'password': password_encode(password), 'nick_name': nickname or username, 'orgid': orgid, 'created_at': None, 'login_fail_count': 0, 'last_login_fail': None, 'last_login': None, }) recs = await sor.R('users', {'id': uid}) return recs[0] if recs else None # ============================================================ # Permission definitions for CRM modules # ============================================================ # CRM module CRUD permissions (all tables) CRM_TABLES = [ ('customer_management', ['customers', 'customer_handover', 'public_sea_pool']), ('opportunity_management', ['opportunities', 'opportunity_products', 'opportunity_contacts']), ('contract_management', ['contracts', 'contract_items', 'contract_attachments']), ('financial_management', ['receivables', 'payments', 'financial_orders']), ('workflow_approval', ['approval_workflows', 'approval_instances', 'approval_steps']), ('unified_dashboard', ['dashboards', 'reports']), ] # RBAC management permissions RBAC_MANAGE_TABLES = [ 'users', 'role', 'permission', 'userrole', 'rolepermission', 'organization', 'orgtypes', 'userdepartment', 'userapp', ] def collect_all_permissions(): """Collect all permission paths that the superuser needs.""" perms = [] # Login and home page perms.append(('/main/login.ui', 'Login Page', 'page')) perms.append(('/main/index.ui', 'Home Page', 'page')) perms.append(('/main/menu.ui', 'Main Menu', 'page')) # User self-service (register, profile) perms.append(('/user/register.ui', 'Register', 'page')) perms.append(('/user/up_login.dspy', 'Login', 'action')) perms.append(('/user/logout.dspy', 'Logout', 'action')) # All CRM module pages and CRUD operations for module, tables in CRM_TABLES: # Module entry perms.append((f'/{module}', f'{module} Module', 'module')) perms.append((f'/{module}/index.ui', f'{module} Index', 'page')) for table in tables: perms.append((f'/{module}/{table}', f'{table} Management', 'page')) perms.append((f'/{module}/{table}/index.ui', f'{table} List', 'page')) perms.append((f'/{module}/{table}/get_{table}.dspy', f'{table} Read', 'crud')) perms.append((f'/{module}/{table}/add_{table}.dspy', f'{table} Create', 'crud')) perms.append((f'/{module}/{table}/update_{table}.dspy', f'{table} Update', 'crud')) perms.append((f'/{module}/{table}/delete_{table}.dspy', f'{table} Delete', 'crud')) # RBAC management permissions perms.append(('/rbac', 'RBAC Management', 'module')) for table in RBAC_MANAGE_TABLES: perms.append((f'/rbac/{table}', f'{table} Management', 'page')) perms.append((f'/rbac/{table}/index.ui', f'{table} List', 'page')) perms.append((f'/rbac/{table}/get_{table}.dspy', f'{table} Read', 'crud')) perms.append((f'/rbac/{table}/add_{table}.dspy', f'{table} Create', 'crud')) perms.append((f'/rbac/{table}/update_{table}.dspy', f'{table} Update', 'crud')) perms.append((f'/rbac/{table}/delete_{table}.dspy', f'{table} Delete', 'crud')) # RBAC user management scripts perms.append(('/rbac/users/get_users.dspy', 'Get Users', 'crud')) perms.append(('/rbac/users/add_users.dspy', 'Create Users', 'crud')) perms.append(('/rbac/users/update_users.dspy', 'Update Users', 'crud')) perms.append(('/rbac/users/delete_users.dspy', 'Delete Users', 'crud')) perms.append(('/rbac/role/get_role.dspy', 'Get Roles', 'crud')) perms.append(('/rbac/role/add_role.dspy', 'Create Roles', 'crud')) perms.append(('/rbac/role/update_role.dspy', 'Update Roles', 'crud')) perms.append(('/rbac/role/delete_role.dspy', 'Delete Roles', 'crud')) perms.append(('/rbac/permission/get_permission.dspy', 'Get Permissions', 'crud')) perms.append(('/rbac/permission/add_permission.dspy', 'Create Permissions', 'crud')) perms.append(('/rbac/permission/update_permission.dspy', 'Update Permissions', 'crud')) perms.append(('/rbac/permission/delete_permission.dspy', 'Delete Permissions', 'crud')) perms.append(('/rbac/userrole/get_userrole.dspy', 'Get User Roles', 'crud')) perms.append(('/rbac/userrole/add_userrole.dspy', 'Assign User Roles', 'crud')) perms.append(('/rbac/userrole/delete_userrole.dspy', 'Remove User Roles', 'crud')) perms.append(('/rbac/rolepermission/get_rolepermission.dspy', 'Get Role Permissions', 'crud')) perms.append(('/rbac/rolepermission/add_rolepermission.dspy', 'Assign Role Permissions', 'crud')) perms.append(('/rbac/rolepermission/delete_rolepermission.dspy', 'Remove Role Permissions', 'crud')) return perms async def main(): import argparse parser = argparse.ArgumentParser(description='Create CRM superuser with full permissions') parser.add_argument('--username', default='superadmin', help='Superuser username (default: superadmin)') parser.add_argument('--password', default=None, help='Superuser password (default: will prompt)') parser.add_argument('--nickname', default='系统管理员', help='Superuser display name') parser.add_argument('--dbname', default=None, help='Database name in config (default: first database)') args = parser.parse_args() if args.password is None: import getpass args.password = getpass.getpass(f'Enter password for {args.username}: ') config = get_config() dbname = args.dbname or list(config.databases.keys())[0] db = ensure_dbpools() async with db.sqlorContext(dbname) as sor: print(f"Using database: {dbname}") # 1. Collect all permissions all_perms = collect_all_permissions() print(f"\n[1/4] Creating {len(all_perms)} permissions...") perm_map = {} # path -> permid created = 0 for path, name, permtype in all_perms: permid = await ensure_permission(sor, path, name, permtype) perm_map[path] = permid created += 1 print(f" Created/verified {created} permissions") # 2. Create superuser role print(f"\n[2/4] Creating superuser role...") superuser_role_id = await ensure_role(sor, 'superuser', orgtypeid='customer') print(f" Role 'superuser' id: {superuser_role_id}") # 3. Assign all permissions to superuser role print(f"\n[3/4] Assigning all permissions to superuser role...") perm_count = 0 for path, permid in perm_map.items(): assigned = await assign_perm_to_role(sor, superuser_role_id, permid) if assigned: perm_count += 1 print(f" Assigned {perm_count} permissions to superuser role") # 4. Create admin role (subset: user/role/permission management) print(f"\n[4/4] Creating admin role and superuser account...") admin_role_id = await ensure_role(sor, 'admin', orgtypeid='customer') # Admin gets RBAC management + CRM read permissions admin_perm_count = 0 for (path, permid), (p_path, p_name, p_type) in zip(perm_map.items(), all_perms): # Admin gets all RBAC permissions if '/rbac/' in path: assigned = await assign_perm_to_role(sor, admin_role_id, permid) if assigned: admin_perm_count += 1 # Admin gets CRM read-only (page permissions) elif p_type == 'page' and any(f'/{m}' in path for m, _ in CRM_TABLES): assigned = await assign_perm_to_role(sor, admin_role_id, permid) if assigned: admin_perm_count += 1 # Admin gets login and home elif path in ('/main/login.ui', '/main/index.ui', '/main/menu.ui', '/user/register.ui'): assigned = await assign_perm_to_role(sor, admin_role_id, permid) if assigned: admin_perm_count += 1 print(f" Assigned {admin_perm_count} permissions to admin role") # Create the superuser user = await create_or_get_user(sor, args.username, args.password, args.nickname) if user: assigned_su = await assign_role_to_user(sor, user.id, superuser_role_id) assigned_admin = await assign_role_to_user(sor, user.id, admin_role_id) print(f"\n{'='*50}") print(f" Superuser created: {args.username}") print(f" User ID: {user.id}") print(f" Roles: superuser, admin") print(f"{'='*50}") else: print(f"ERROR: Failed to create user {args.username}") sys.exit(1) print("\nDone. The superuser can now:") print(" - Access all CRM modules and features") print(" - Manage users (create admins, regular users)") print(" - Manage roles and assign them to users") print(" - Manage permissions and role-permission mappings") if __name__ == '__main__': asyncio.run(main())