From 8cec17c04295665eb4b750e2070c17fa3b06a939 Mon Sep 17 00:00:00 2001 From: yumoqing Date: Tue, 26 May 2026 13:52:10 +0800 Subject: [PATCH] feat: add cross-process cache invalidation via Redis Pub/Sub - userperm.py: All invalidate_* and on_* handlers changed to async - Each invalidation now broadcasts via cache_sync.invalidate() - invalidate_user_cache() -> 'rbac:ur:{userid}' - invalidate_all_user_caches() -> 'rbac:ur:all' - invalidate_rp_cache() -> 'rbac:rp' - init.py: Added start_cache_sync() async function - Starts Redis Pub/Sub subscription - Registers callbacks for rbac:rp and rbac:ur:all channels - set_role_perms.py: CLI script now sends invalidation after execution - send_rbac_invalidation() starts cache_sync, publishes, then stops Compatible with existing EventDispatcher (already supports async handlers) --- rbac/init.py | 22 ++++++++++++++++ rbac/set_role_perms.py | 27 +++++++++++++++++++ rbac/userperm.py | 59 ++++++++++++++++++++++++++++-------------- 3 files changed, 88 insertions(+), 20 deletions(-) diff --git a/rbac/init.py b/rbac/init.py index 0e22547..d37ec09 100644 --- a/rbac/init.py +++ b/rbac/init.py @@ -26,6 +26,7 @@ from rbac.set_role_perms import ( set_role_perms ) from appPublic.log import debug +from ahserver.cache_sync import get_cache_sync async def get_owner_orgid(*args, **kw): return '0' @@ -64,6 +65,27 @@ def _bind_rbac_events(dbpools, dbname, up): debug(f'RBAC event bound: {event_name}') +async def start_cache_sync(): + """Start cache_sync and register RBAC reload callbacks.""" + env = ServerEnv() + cache_sync = get_cache_sync() + + # Get Redis URL from session config + try: + redis_url = env.conf.website.session_redis.url + except AttributeError: + redis_url = "redis://127.0.0.1:6379" + + await cache_sync.start(redis_url) + debug(f'RBAC cache_sync started with Redis URL: {redis_url}') + + # Register callbacks for cache invalidation messages from other processes + up = env.userpermissions + cache_sync.register('rbac:rp', up.invalidate_rp_cache) + cache_sync.register('rbac:ur:all', up.invalidate_all_user_caches) + # Note: rbac:ur:{userid} callbacks are handled by the invalidate_user_cache method itself + + def load_rbac(): AuthAPI.checkUserPermission = objcheckperm env = ServerEnv() diff --git a/rbac/set_role_perms.py b/rbac/set_role_perms.py index 40c7beb..35db6d6 100644 --- a/rbac/set_role_perms.py +++ b/rbac/set_role_perms.py @@ -114,6 +114,31 @@ async def set_role_perms(dbname, module, orgtype, role, items): for tblname in items: await set_role_perm(dbname, module, orgtype, role, tblname) +async def send_rbac_invalidation(): + """Send cache invalidation message to all processes via Redis Pub/Sub.""" + try: + from ahserver.cache_sync import get_cache_sync + cache_sync = get_cache_sync() + # Use default Redis URL for CLI scripts + try: + from ahserver.serverenv import ServerEnv + env = ServerEnv() + redis_url = env.conf.website.session_redis.url + except (AttributeError, Exception): + redis_url = "redis://127.0.0.1:6379" + + await cache_sync.start(redis_url) + # Invalidate both role-permission and all user caches + # (CLI scripts typically change permissions/roles) + await cache_sync.invalidate('rbac:rp') + await cache_sync.invalidate('rbac:ur:all') + debug('RBAC CLI: sent cache invalidation messages') + # Give a moment for the message to be published + await asyncio.sleep(0.1) + await cache_sync.stop() + except Exception as e: + print(f'Warning: Failed to send cache invalidation: {e}') + if __name__ == '__main__': async def main(): if len(sys.argv) < 6: @@ -124,6 +149,8 @@ if __name__ == '__main__': orgtype = sys.argv[3] role = sys.argv[4] await set_role_perms(dbname, module, orgtype, role, sys.argv[5:]) + # Send invalidation message to all running Sage processes + await send_rbac_invalidation() def run(coro): p = '.' diff --git a/rbac/userperm.py b/rbac/userperm.py index 5fc3c00..285aea4 100644 --- a/rbac/userperm.py +++ b/rbac/userperm.py @@ -4,6 +4,8 @@ from sqlor.dbpools import get_sor_context from ahserver.serverenv import ServerEnv from appPublic.Singleton import SingletonDecorator from appPublic.log import debug, error +from ahserver.cache_sync import get_cache_sync + class LRUCache: """Async-safe LRU cache with TTL support. @@ -81,82 +83,82 @@ class UserPermissions: # Async lock for rp_caches initialization (lazy init) self._rp_lock = None - def on_user_update(self, data): + async def on_user_update(self, data): """Event handler for users table update. Clears the specific user's permission cache. """ try: userid = getattr(data, 'id', None) if userid: - self.invalidate_user_cache(userid) + await self.invalidate_user_cache(userid) debug(f'RBAC cache invalidated for user id={userid} (users update)') except Exception as e: error(f'RBAC on_user_update handler error: {e}') - def on_user_create(self, data): + async def on_user_create(self, data): """Event handler for users table insert. Clears the specific user's permission cache. """ try: userid = getattr(data, 'id', None) if userid: - self.invalidate_user_cache(userid) + await self.invalidate_user_cache(userid) debug(f'RBAC cache invalidated for user id={userid} (users create)') except Exception as e: error(f'RBAC on_user_create handler error: {e}') - def on_user_delete(self, data): + async def on_user_delete(self, data): """Event handler for users table delete. Clears the specific user's permission cache. """ try: userid = getattr(data, 'id', None) if userid: - self.invalidate_user_cache(userid) + await self.invalidate_user_cache(userid) debug(f'RBAC cache invalidated for user id={userid} (users delete)') except Exception as e: error(f'RBAC on_user_delete handler error: {e}') - def on_rolepermission_change(self, data): + async def on_rolepermission_change(self, data): """Event handler for rolepermission table C/U/D. Clears the role-permission cache. """ try: - self.invalidate_rp_cache() + await self.invalidate_rp_cache() debug('RBAC role-permission cache invalidated (rolepermission change)') except Exception as e: error(f'RBAC on_rolepermission_change handler error: {e}') - def on_permission_change(self, data): + async def on_permission_change(self, data): """Event handler for permission table update. Clears the role-permission cache. """ try: - self.invalidate_rp_cache() + await self.invalidate_rp_cache() debug('RBAC role-permission cache invalidated (permission change)') except Exception as e: error(f'RBAC on_permission_change handler error: {e}') - def on_role_change(self, data): + async def on_role_change(self, data): """Event handler for role table C/U/D. Clears all user caches and role-permission cache, since role changes may affect any user. """ try: - self.invalidate_all_user_caches() - self.invalidate_rp_cache() + await self.invalidate_all_user_caches() + await self.invalidate_rp_cache() debug('RBAC all caches invalidated (role change)') except Exception as e: error(f'RBAC on_role_change handler error: {e}') - def on_userrole_change(self, data): + async def on_userrole_change(self, data): """Event handler for userrole table C/U/D. Clears the specific user's permission cache based on userid. """ try: userid = getattr(data, 'userid', None) if userid: - self.invalidate_user_cache(userid) + await self.invalidate_user_cache(userid) debug(f'RBAC cache invalidated for user id={userid} (userrole change)') except Exception as e: error(f'RBAC on_userrole_change handler error: {e}') @@ -180,20 +182,37 @@ class UserPermissions: return self.ur_caches.get(userid) return None - def invalidate_user_cache(self, userid): + async def invalidate_user_cache(self, userid): """Invalidate cache for a specific user. Call this after role changes, user creation, etc. + Also broadcasts invalidation to all other processes via Redis Pub/Sub. """ self.ur_caches.invalidate(userid) + # Broadcast to other processes + cache_sync = get_cache_sync() + if cache_sync.is_running: + await cache_sync.invalidate(f'rbac:ur:{userid}') - def invalidate_all_user_caches(self): - """Invalidate all user role caches.""" + async def invalidate_all_user_caches(self): + """Invalidate all user role caches. + Also broadcasts invalidation to all other processes via Redis Pub/Sub. + """ self.ur_caches.clear() + # Broadcast to other processes + cache_sync = get_cache_sync() + if cache_sync.is_running: + await cache_sync.invalidate('rbac:ur:all') - def invalidate_rp_cache(self): - """Invalidate role-permission cache (after permission changes).""" + async def invalidate_rp_cache(self): + """Invalidate role-permission cache (after permission changes). + Also broadcasts invalidation to all other processes via Redis Pub/Sub. + """ self.rp_caches = None self.rp_cache_loaded_at = 0 + # Broadcast to other processes + cache_sync = get_cache_sync() + if cache_sync.is_running: + await cache_sync.invalidate('rbac:rp') async def load_roleperms(self, sor): """Load all role-permission mappings into cache.