fix: RBAC crash when cache disabled - return roles directly from get_userroles

When module_cache.rbac=false in config.json, LRUCache.get() always returns
None and LRUCache.set() is a no-op. This caused get_userroles() to store
roles into a disabled cache, then callers read back None, leading to
TypeError in check_roles_path() when iterating over None.

Fix: get_userroles() now returns the roles list directly. Callers use the
return value instead of relying solely on cache reads. Added safety
fallback to deny access if roles is somehow still None.
This commit is contained in:
yumoqing 2026-05-29 22:43:39 +08:00
parent fa9f7f5146
commit 67687883ff

View File

@ -193,9 +193,13 @@ class UserPermissions:
return roles return roles
async with get_sor_context(ServerEnv(), 'rbac') as sor: async with get_sor_context(ServerEnv(), 'rbac') as sor:
await self.get_userroles(sor, userid) roles = await self.get_userroles(sor, userid)
# When cache is enabled, get_userroles stored it and we can read back;
# when cache is disabled, get_userroles returns the list directly.
if roles is not None:
return roles
return self.ur_caches.get(userid) return self.ur_caches.get(userid)
return None return ['any', 'logined']
def invalidate_user_cache(self, userid): def invalidate_user_cache(self, userid):
"""Invalidate cache for a specific user. """Invalidate cache for a specific user.
@ -255,7 +259,9 @@ order by c.orgtypeid, c.name"""
self.rp_cache_loaded_at = now self.rp_cache_loaded_at = now
async def get_userroles(self, sor, userid): async def get_userroles(self, sor, userid):
"""Load user roles from database and cache them.""" """Load user roles from database and cache them.
Returns the roles list directly (needed when cache is disabled).
"""
recs = await sor.sqlExe('''select b.id, b.orgtypeid, b.name recs = await sor.sqlExe('''select b.id, b.orgtypeid, b.name
from users a, role b, userrole c from users a, role b, userrole c
where a.id = c.userid where a.id = c.userid
@ -266,7 +272,9 @@ where a.id = c.userid
roles.append(f'{r.orgtypeid}.{r.name}') roles.append(f'{r.orgtypeid}.{r.name}')
roles.append(f'{r.orgtypeid}.*') roles.append(f'{r.orgtypeid}.*')
roles.append(f'*.{r.name}') roles.append(f'*.{r.name}')
self.ur_caches.set(userid, sorted(list(set(roles)))) roles = sorted(list(set(roles)))
self.ur_caches.set(userid, roles)
return roles
def check_roles_path(self, roles, path): def check_roles_path(self, roles, path):
"""Check if any of the roles has access to the given path. """Check if any of the roles has access to the given path.
@ -320,7 +328,13 @@ where a.id = c.userid
if self.rp_caches is None: if self.rp_caches is None:
await self.load_roleperms(sor) await self.load_roleperms(sor)
if not roles: if not roles:
await self.get_userroles(sor, userid) roles = await self.get_userroles(sor, userid)
roles = self.ur_caches.get(userid) # When cache is enabled, fall back to cache read
if roles is None:
roles = self.ur_caches.get(userid)
# Safety fallback: if roles is still None (shouldn't happen), deny access
if not roles:
return False
return self.check_roles_path(roles, path) return self.check_roles_path(roles, path)