Revert "feat: add cross-process cache invalidation via Redis Pub/Sub"

This reverts commit 8cec17c04295665eb4b750e2070c17fa3b06a939.
This commit is contained in:
yumoqing 2026-05-26 18:31:04 +08:00
parent 8fdb31a850
commit 39f8eb7d94
3 changed files with 20 additions and 88 deletions

View File

@ -26,7 +26,6 @@ from rbac.set_role_perms import (
set_role_perms
)
from appPublic.log import debug
from ahserver.cache_sync import get_cache_sync
async def get_owner_orgid(*args, **kw):
return '0'
@ -65,27 +64,6 @@ def _bind_rbac_events(dbpools, dbname, up):
debug(f'RBAC event bound: {event_name}')
async def start_cache_sync():
"""Start cache_sync and register RBAC reload callbacks."""
env = ServerEnv()
cache_sync = get_cache_sync()
# Get Redis URL from session config
try:
redis_url = env.conf.website.session_redis.url
except AttributeError:
redis_url = "redis://127.0.0.1:6379"
await cache_sync.start(redis_url)
debug(f'RBAC cache_sync started with Redis URL: {redis_url}')
# Register callbacks for cache invalidation messages from other processes
up = env.userpermissions
cache_sync.register('rbac:rp', up.invalidate_rp_cache)
cache_sync.register('rbac:ur:all', up.invalidate_all_user_caches)
# Note: rbac:ur:{userid} callbacks are handled by the invalidate_user_cache method itself
def load_rbac():
AuthAPI.checkUserPermission = objcheckperm
env = ServerEnv()

View File

@ -114,31 +114,6 @@ async def set_role_perms(dbname, module, orgtype, role, items):
for tblname in items:
await set_role_perm(dbname, module, orgtype, role, tblname)
async def send_rbac_invalidation():
"""Send cache invalidation message to all processes via Redis Pub/Sub."""
try:
from ahserver.cache_sync import get_cache_sync
cache_sync = get_cache_sync()
# Use default Redis URL for CLI scripts
try:
from ahserver.serverenv import ServerEnv
env = ServerEnv()
redis_url = env.conf.website.session_redis.url
except (AttributeError, Exception):
redis_url = "redis://127.0.0.1:6379"
await cache_sync.start(redis_url)
# Invalidate both role-permission and all user caches
# (CLI scripts typically change permissions/roles)
await cache_sync.invalidate('rbac:rp')
await cache_sync.invalidate('rbac:ur:all')
debug('RBAC CLI: sent cache invalidation messages')
# Give a moment for the message to be published
await asyncio.sleep(0.1)
await cache_sync.stop()
except Exception as e:
print(f'Warning: Failed to send cache invalidation: {e}')
if __name__ == '__main__':
async def main():
if len(sys.argv) < 6:
@ -149,8 +124,6 @@ if __name__ == '__main__':
orgtype = sys.argv[3]
role = sys.argv[4]
await set_role_perms(dbname, module, orgtype, role, sys.argv[5:])
# Send invalidation message to all running Sage processes
await send_rbac_invalidation()
def run(coro):
p = '.'

View File

@ -4,8 +4,6 @@ from sqlor.dbpools import get_sor_context
from ahserver.serverenv import ServerEnv
from appPublic.Singleton import SingletonDecorator
from appPublic.log import debug, error
from ahserver.cache_sync import get_cache_sync
class LRUCache:
"""Async-safe LRU cache with TTL support.
@ -83,82 +81,82 @@ class UserPermissions:
# Async lock for rp_caches initialization (lazy init)
self._rp_lock = None
async def on_user_update(self, data):
def on_user_update(self, data):
"""Event handler for users table update.
Clears the specific user's permission cache.
"""
try:
userid = getattr(data, 'id', None)
if userid:
await self.invalidate_user_cache(userid)
self.invalidate_user_cache(userid)
debug(f'RBAC cache invalidated for user id={userid} (users update)')
except Exception as e:
error(f'RBAC on_user_update handler error: {e}')
async def on_user_create(self, data):
def on_user_create(self, data):
"""Event handler for users table insert.
Clears the specific user's permission cache.
"""
try:
userid = getattr(data, 'id', None)
if userid:
await self.invalidate_user_cache(userid)
self.invalidate_user_cache(userid)
debug(f'RBAC cache invalidated for user id={userid} (users create)')
except Exception as e:
error(f'RBAC on_user_create handler error: {e}')
async def on_user_delete(self, data):
def on_user_delete(self, data):
"""Event handler for users table delete.
Clears the specific user's permission cache.
"""
try:
userid = getattr(data, 'id', None)
if userid:
await self.invalidate_user_cache(userid)
self.invalidate_user_cache(userid)
debug(f'RBAC cache invalidated for user id={userid} (users delete)')
except Exception as e:
error(f'RBAC on_user_delete handler error: {e}')
async def on_rolepermission_change(self, data):
def on_rolepermission_change(self, data):
"""Event handler for rolepermission table C/U/D.
Clears the role-permission cache.
"""
try:
await self.invalidate_rp_cache()
self.invalidate_rp_cache()
debug('RBAC role-permission cache invalidated (rolepermission change)')
except Exception as e:
error(f'RBAC on_rolepermission_change handler error: {e}')
async def on_permission_change(self, data):
def on_permission_change(self, data):
"""Event handler for permission table update.
Clears the role-permission cache.
"""
try:
await self.invalidate_rp_cache()
self.invalidate_rp_cache()
debug('RBAC role-permission cache invalidated (permission change)')
except Exception as e:
error(f'RBAC on_permission_change handler error: {e}')
async def on_role_change(self, data):
def on_role_change(self, data):
"""Event handler for role table C/U/D.
Clears all user caches and role-permission cache,
since role changes may affect any user.
"""
try:
await self.invalidate_all_user_caches()
await self.invalidate_rp_cache()
self.invalidate_all_user_caches()
self.invalidate_rp_cache()
debug('RBAC all caches invalidated (role change)')
except Exception as e:
error(f'RBAC on_role_change handler error: {e}')
async def on_userrole_change(self, data):
def on_userrole_change(self, data):
"""Event handler for userrole table C/U/D.
Clears the specific user's permission cache based on userid.
"""
try:
userid = getattr(data, 'userid', None)
if userid:
await self.invalidate_user_cache(userid)
self.invalidate_user_cache(userid)
debug(f'RBAC cache invalidated for user id={userid} (userrole change)')
except Exception as e:
error(f'RBAC on_userrole_change handler error: {e}')
@ -182,37 +180,20 @@ class UserPermissions:
return self.ur_caches.get(userid)
return None
async def invalidate_user_cache(self, userid):
def invalidate_user_cache(self, userid):
"""Invalidate cache for a specific user.
Call this after role changes, user creation, etc.
Also broadcasts invalidation to all other processes via Redis Pub/Sub.
"""
self.ur_caches.invalidate(userid)
# Broadcast to other processes
cache_sync = get_cache_sync()
if cache_sync.is_running:
await cache_sync.invalidate(f'rbac:ur:{userid}')
async def invalidate_all_user_caches(self):
"""Invalidate all user role caches.
Also broadcasts invalidation to all other processes via Redis Pub/Sub.
"""
def invalidate_all_user_caches(self):
"""Invalidate all user role caches."""
self.ur_caches.clear()
# Broadcast to other processes
cache_sync = get_cache_sync()
if cache_sync.is_running:
await cache_sync.invalidate('rbac:ur:all')
async def invalidate_rp_cache(self):
"""Invalidate role-permission cache (after permission changes).
Also broadcasts invalidation to all other processes via Redis Pub/Sub.
"""
def invalidate_rp_cache(self):
"""Invalidate role-permission cache (after permission changes)."""
self.rp_caches = None
self.rp_cache_loaded_at = 0
# Broadcast to other processes
cache_sync = get_cache_sync()
if cache_sync.is_running:
await cache_sync.invalidate('rbac:rp')
async def load_roleperms(self, sor):
"""Load all role-permission mappings into cache.