from appPublic.log import info, debug, exception
from ahserver.auth_api import AuthAPI
from appPublic.jsonConfig import getConfig
from appPublic.registerfunction import getRegisterFunctionByName
from sqlor.dbpools import DBPools


class MyAuthAPI(AuthAPI):
    """AuthAPI subclass that checks path permissions against database tables."""

    async def checkUserPermission(self, request, user, path):
        """Return True when ``user`` is allowed to access ``path``.

        The decision is made in this order:

        1. If ``path`` has no row in the ``permission`` table, access is
           granted (unregistered paths are treated as open).
        2. Otherwise the user's roles are joined against the permission and
           access is granted when at least one returned row carries a
           non-NULL permission ``id``.
        3. Otherwise access is denied.

        Args:
            request: The incoming request; not used by this implementation.
            user: The user id to check; ``None`` is replaced by the literal
                ``'anonymous_user'``.
            path: The path whose permission is being checked.

        Returns:
            bool: True if access is allowed, False otherwise.
        """
        config = getConfig()
        if user is None:
            user = 'anonymous_user'

        # One row per (role permission, user role) pair of the user; the
        # permission columns are NULL for roles that do not grant ``path``.
        # The ${path}$ / ${user}$ placeholders are filled from the dict
        # handed to sqlExe below.
        sql = (
            "select distinct a.*, c.userid from "
            "(select id, path from permission "
            "where path=${path}$ and del_flg='0') a "
            "right join rolepermission b on a.id = b.permid "
            "right join userrole c on b.roleid = c.roleid "
            "where c.userid = ${user}$ "
            "and b.del_flg='0' "
            "and c.del_flg='0'"
        )

        db = DBPools()
        # Fall back to the 'kboss' database when config.authdb is falsy.
        dbname = config.authdb or 'kboss'
        debug(f'database name is {dbname}')

        async with db.sqlorContext(dbname) as sor:
            perms = await sor.R('permission', {'path': path})
            # Deliberately len() rather than truthiness: an unexpected None
            # result should raise instead of silently granting access.
            if len(perms) == 0:
                debug(f'{path=} not found in permission, can access')
                return True

            recs = await sor.sqlExe(sql, {'path': path, 'user': user})
            if any(rec['id'] is not None for rec in recs):
                debug(f'{user=} can access {path=}')
                return True

            debug(f'{user=} has not permission to call {path=}, {recs=}')
            return False

        # Only reachable when the ``async with`` block exits without
        # returning -- presumably because the context manager suppressed an
        # exception (TODO confirm against sqlorContext). Deny access.
        debug(f'error happened {user}, {path}')
        return False