From 39f8eb7d94b68bf9d7f6b7c5bf4fb7c26dcea714 Mon Sep 17 00:00:00 2001 From: yumoqing Date: Tue, 26 May 2026 18:31:04 +0800 Subject: [PATCH] Revert "feat: add cross-process cache invalidation via Redis Pub/Sub" This reverts commit 8cec17c04295665eb4b750e2070c17fa3b06a939. --- rbac/init.py | 22 ---------------- rbac/set_role_perms.py | 27 ------------------- rbac/userperm.py | 59 ++++++++++++++---------------------------- 3 files changed, 20 insertions(+), 88 deletions(-) diff --git a/rbac/init.py b/rbac/init.py index d37ec09..0e22547 100644 --- a/rbac/init.py +++ b/rbac/init.py @@ -26,7 +26,6 @@ from rbac.set_role_perms import ( set_role_perms ) from appPublic.log import debug -from ahserver.cache_sync import get_cache_sync async def get_owner_orgid(*args, **kw): return '0' @@ -65,27 +64,6 @@ def _bind_rbac_events(dbpools, dbname, up): debug(f'RBAC event bound: {event_name}') -async def start_cache_sync(): - """Start cache_sync and register RBAC reload callbacks.""" - env = ServerEnv() - cache_sync = get_cache_sync() - - # Get Redis URL from session config - try: - redis_url = env.conf.website.session_redis.url - except AttributeError: - redis_url = "redis://127.0.0.1:6379" - - await cache_sync.start(redis_url) - debug(f'RBAC cache_sync started with Redis URL: {redis_url}') - - # Register callbacks for cache invalidation messages from other processes - up = env.userpermissions - cache_sync.register('rbac:rp', up.invalidate_rp_cache) - cache_sync.register('rbac:ur:all', up.invalidate_all_user_caches) - # Note: rbac:ur:{userid} callbacks are handled by the invalidate_user_cache method itself - - def load_rbac(): AuthAPI.checkUserPermission = objcheckperm env = ServerEnv() diff --git a/rbac/set_role_perms.py b/rbac/set_role_perms.py index 35db6d6..40c7beb 100644 --- a/rbac/set_role_perms.py +++ b/rbac/set_role_perms.py @@ -114,31 +114,6 @@ async def set_role_perms(dbname, module, orgtype, role, items): for tblname in items: await set_role_perm(dbname, module, orgtype, role, tblname) -async def send_rbac_invalidation(): - """Send cache invalidation message to all processes via Redis Pub/Sub.""" - try: - from ahserver.cache_sync import get_cache_sync - cache_sync = get_cache_sync() - # Use default Redis URL for CLI scripts - try: - from ahserver.serverenv import ServerEnv - env = ServerEnv() - redis_url = env.conf.website.session_redis.url - except (AttributeError, Exception): - redis_url = "redis://127.0.0.1:6379" - - await cache_sync.start(redis_url) - # Invalidate both role-permission and all user caches - # (CLI scripts typically change permissions/roles) - await cache_sync.invalidate('rbac:rp') - await cache_sync.invalidate('rbac:ur:all') - debug('RBAC CLI: sent cache invalidation messages') - # Give a moment for the message to be published - await asyncio.sleep(0.1) - await cache_sync.stop() - except Exception as e: - print(f'Warning: Failed to send cache invalidation: {e}') - if __name__ == '__main__': async def main(): if len(sys.argv) < 6: @@ -149,8 +124,6 @@ if __name__ == '__main__': orgtype = sys.argv[3] role = sys.argv[4] await set_role_perms(dbname, module, orgtype, role, sys.argv[5:]) - # Send invalidation message to all running Sage processes - await send_rbac_invalidation() def run(coro): p = '.' diff --git a/rbac/userperm.py b/rbac/userperm.py index 285aea4..5fc3c00 100644 --- a/rbac/userperm.py +++ b/rbac/userperm.py @@ -4,8 +4,6 @@ from sqlor.dbpools import get_sor_context from ahserver.serverenv import ServerEnv from appPublic.Singleton import SingletonDecorator from appPublic.log import debug, error -from ahserver.cache_sync import get_cache_sync - class LRUCache: """Async-safe LRU cache with TTL support. @@ -83,82 +81,82 @@ class UserPermissions: # Async lock for rp_caches initialization (lazy init) self._rp_lock = None - async def on_user_update(self, data): + def on_user_update(self, data): """Event handler for users table update. Clears the specific user's permission cache. """ try: userid = getattr(data, 'id', None) if userid: - await self.invalidate_user_cache(userid) + self.invalidate_user_cache(userid) debug(f'RBAC cache invalidated for user id={userid} (users update)') except Exception as e: error(f'RBAC on_user_update handler error: {e}') - async def on_user_create(self, data): + def on_user_create(self, data): """Event handler for users table insert. Clears the specific user's permission cache. """ try: userid = getattr(data, 'id', None) if userid: - await self.invalidate_user_cache(userid) + self.invalidate_user_cache(userid) debug(f'RBAC cache invalidated for user id={userid} (users create)') except Exception as e: error(f'RBAC on_user_create handler error: {e}') - async def on_user_delete(self, data): + def on_user_delete(self, data): """Event handler for users table delete. Clears the specific user's permission cache. """ try: userid = getattr(data, 'id', None) if userid: - await self.invalidate_user_cache(userid) + self.invalidate_user_cache(userid) debug(f'RBAC cache invalidated for user id={userid} (users delete)') except Exception as e: error(f'RBAC on_user_delete handler error: {e}') - async def on_rolepermission_change(self, data): + def on_rolepermission_change(self, data): """Event handler for rolepermission table C/U/D. Clears the role-permission cache. """ try: - await self.invalidate_rp_cache() + self.invalidate_rp_cache() debug('RBAC role-permission cache invalidated (rolepermission change)') except Exception as e: error(f'RBAC on_rolepermission_change handler error: {e}') - async def on_permission_change(self, data): + def on_permission_change(self, data): """Event handler for permission table update. Clears the role-permission cache. """ try: - await self.invalidate_rp_cache() + self.invalidate_rp_cache() debug('RBAC role-permission cache invalidated (permission change)') except Exception as e: error(f'RBAC on_permission_change handler error: {e}') - async def on_role_change(self, data): + def on_role_change(self, data): """Event handler for role table C/U/D. Clears all user caches and role-permission cache, since role changes may affect any user. """ try: - await self.invalidate_all_user_caches() - await self.invalidate_rp_cache() + self.invalidate_all_user_caches() + self.invalidate_rp_cache() debug('RBAC all caches invalidated (role change)') except Exception as e: error(f'RBAC on_role_change handler error: {e}') - async def on_userrole_change(self, data): + def on_userrole_change(self, data): """Event handler for userrole table C/U/D. Clears the specific user's permission cache based on userid. """ try: userid = getattr(data, 'userid', None) if userid: - await self.invalidate_user_cache(userid) + self.invalidate_user_cache(userid) debug(f'RBAC cache invalidated for user id={userid} (userrole change)') except Exception as e: error(f'RBAC on_userrole_change handler error: {e}') @@ -182,37 +180,20 @@ class UserPermissions: return self.ur_caches.get(userid) return None - async def invalidate_user_cache(self, userid): + def invalidate_user_cache(self, userid): """Invalidate cache for a specific user. Call this after role changes, user creation, etc. - Also broadcasts invalidation to all other processes via Redis Pub/Sub. """ self.ur_caches.invalidate(userid) - # Broadcast to other processes - cache_sync = get_cache_sync() - if cache_sync.is_running: - await cache_sync.invalidate(f'rbac:ur:{userid}') - async def invalidate_all_user_caches(self): - """Invalidate all user role caches. - Also broadcasts invalidation to all other processes via Redis Pub/Sub. - """ + def invalidate_all_user_caches(self): + """Invalidate all user role caches.""" self.ur_caches.clear() - # Broadcast to other processes - cache_sync = get_cache_sync() - if cache_sync.is_running: - await cache_sync.invalidate('rbac:ur:all') - async def invalidate_rp_cache(self): - """Invalidate role-permission cache (after permission changes). - Also broadcasts invalidation to all other processes via Redis Pub/Sub. - """ + def invalidate_rp_cache(self): + """Invalidate role-permission cache (after permission changes).""" self.rp_caches = None self.rp_cache_loaded_at = 0 - # Broadcast to other processes - cache_sync = get_cache_sync() - if cache_sync.is_running: - await cache_sync.invalidate('rbac:rp') async def load_roleperms(self, sor): """Load all role-permission mappings into cache.