import sys import asyncio from sqlor.dbpools import DBPools from appPublic.jsonConfig import getConfig from appPublic.uniqueID import getID async def delete_anonymous_perm(sor, permid): await sor.D('rolepermission', { 'roleid': 'anonymous', 'permid': permid }) async def add_roleperm(sor, roleid, permid): ns = { 'roleid': roleid, 'permid': permid } recs = await sor.R('rolepermission', ns.copy()) if recs: print(f'{roleid}, {permid} 已经存在') return ns['id'] = getID() await sor.C('rolepermission', ns) print(f'{roleid}, {permid} perm add') return async def main(): config = getConfig('.') db = DBPools(config.databases) if len(sys.argv) < 3: print(f'{sys.argv[0]} role path') sys.exit(1) role = sys.argv[1] path = sys.argv[2] async with db.sqlorContext('sage') as sor: perms = None if '%' in path: perms = await sor.sqlExe("select * from permission where path like ${path}$", {'path': path}) else: perms = await sor.R('permission', {'path': path}) if len(perms) < 1: perms = [ DictObject(**{ 'id': getID(), 'path': path }) ] await sor.C('permission', perms[0].copy()) if role in ['anonymous', 'logined']: for p in perms: await add_roleperm(sor, role, p.id) return if role in ['anonymous', 'any', 'logined']: await add_roleperm(sor, role, p.id) else: orgtypeid, name = role.split('.') ns = { 'orgtypeid': orgtypeid, 'name': name } roles = await sor.R('role', ns.copy()) if not roles: ns['id'] = getID() await sor.C('role', ns.copy()) else: ns['id'] = roles[0].id for p in perms: await add_roleperm(sor, ns['id'], p.id) await delete_anonymous_perm(sor, p.id) if __name__ == '__main__': asyncio.get_event_loop().run_until_complete(main())