feat: add cross-process cache invalidation via Redis Pub/Sub
- userperm.py: All invalidate_* and on_* handlers changed to async
- Each invalidation now broadcasts via cache_sync.invalidate()
- invalidate_user_cache() -> 'rbac:ur:{userid}'
- invalidate_all_user_caches() -> 'rbac:ur:all'
- invalidate_rp_cache() -> 'rbac:rp'
- init.py: Added start_cache_sync() async function
- Starts Redis Pub/Sub subscription
- Registers callbacks for rbac:rp and rbac:ur:all channels
- set_role_perms.py: CLI script now sends invalidation after execution
- send_rbac_invalidation() starts cache_sync, publishes, then stops
Compatible with existing EventDispatcher (already supports async handlers)
This commit is contained in:
parent
1b21f46336
commit
8cec17c042
22
rbac/init.py
22
rbac/init.py
@ -26,6 +26,7 @@ from rbac.set_role_perms import (
|
||||
set_role_perms
|
||||
)
|
||||
from appPublic.log import debug
|
||||
from ahserver.cache_sync import get_cache_sync
|
||||
|
||||
async def get_owner_orgid(*args, **kw):
|
||||
return '0'
|
||||
@ -64,6 +65,27 @@ def _bind_rbac_events(dbpools, dbname, up):
|
||||
debug(f'RBAC event bound: {event_name}')
|
||||
|
||||
|
||||
async def start_cache_sync():
|
||||
"""Start cache_sync and register RBAC reload callbacks."""
|
||||
env = ServerEnv()
|
||||
cache_sync = get_cache_sync()
|
||||
|
||||
# Get Redis URL from session config
|
||||
try:
|
||||
redis_url = env.conf.website.session_redis.url
|
||||
except AttributeError:
|
||||
redis_url = "redis://127.0.0.1:6379"
|
||||
|
||||
await cache_sync.start(redis_url)
|
||||
debug(f'RBAC cache_sync started with Redis URL: {redis_url}')
|
||||
|
||||
# Register callbacks for cache invalidation messages from other processes
|
||||
up = env.userpermissions
|
||||
cache_sync.register('rbac:rp', up.invalidate_rp_cache)
|
||||
cache_sync.register('rbac:ur:all', up.invalidate_all_user_caches)
|
||||
# Note: rbac:ur:{userid} callbacks are handled by the invalidate_user_cache method itself
|
||||
|
||||
|
||||
def load_rbac():
|
||||
AuthAPI.checkUserPermission = objcheckperm
|
||||
env = ServerEnv()
|
||||
|
||||
@ -114,6 +114,31 @@ async def set_role_perms(dbname, module, orgtype, role, items):
|
||||
for tblname in items:
|
||||
await set_role_perm(dbname, module, orgtype, role, tblname)
|
||||
|
||||
async def send_rbac_invalidation():
|
||||
"""Send cache invalidation message to all processes via Redis Pub/Sub."""
|
||||
try:
|
||||
from ahserver.cache_sync import get_cache_sync
|
||||
cache_sync = get_cache_sync()
|
||||
# Use default Redis URL for CLI scripts
|
||||
try:
|
||||
from ahserver.serverenv import ServerEnv
|
||||
env = ServerEnv()
|
||||
redis_url = env.conf.website.session_redis.url
|
||||
except (AttributeError, Exception):
|
||||
redis_url = "redis://127.0.0.1:6379"
|
||||
|
||||
await cache_sync.start(redis_url)
|
||||
# Invalidate both role-permission and all user caches
|
||||
# (CLI scripts typically change permissions/roles)
|
||||
await cache_sync.invalidate('rbac:rp')
|
||||
await cache_sync.invalidate('rbac:ur:all')
|
||||
debug('RBAC CLI: sent cache invalidation messages')
|
||||
# Give a moment for the message to be published
|
||||
await asyncio.sleep(0.1)
|
||||
await cache_sync.stop()
|
||||
except Exception as e:
|
||||
print(f'Warning: Failed to send cache invalidation: {e}')
|
||||
|
||||
if __name__ == '__main__':
|
||||
async def main():
|
||||
if len(sys.argv) < 6:
|
||||
@ -124,6 +149,8 @@ if __name__ == '__main__':
|
||||
orgtype = sys.argv[3]
|
||||
role = sys.argv[4]
|
||||
await set_role_perms(dbname, module, orgtype, role, sys.argv[5:])
|
||||
# Send invalidation message to all running Sage processes
|
||||
await send_rbac_invalidation()
|
||||
|
||||
def run(coro):
|
||||
p = '.'
|
||||
|
||||
@ -4,6 +4,8 @@ from sqlor.dbpools import get_sor_context
|
||||
from ahserver.serverenv import ServerEnv
|
||||
from appPublic.Singleton import SingletonDecorator
|
||||
from appPublic.log import debug, error
|
||||
from ahserver.cache_sync import get_cache_sync
|
||||
|
||||
|
||||
class LRUCache:
|
||||
"""Async-safe LRU cache with TTL support.
|
||||
@ -81,82 +83,82 @@ class UserPermissions:
|
||||
# Async lock for rp_caches initialization (lazy init)
|
||||
self._rp_lock = None
|
||||
|
||||
def on_user_update(self, data):
|
||||
async def on_user_update(self, data):
|
||||
"""Event handler for users table update.
|
||||
Clears the specific user's permission cache.
|
||||
"""
|
||||
try:
|
||||
userid = getattr(data, 'id', None)
|
||||
if userid:
|
||||
self.invalidate_user_cache(userid)
|
||||
await self.invalidate_user_cache(userid)
|
||||
debug(f'RBAC cache invalidated for user id={userid} (users update)')
|
||||
except Exception as e:
|
||||
error(f'RBAC on_user_update handler error: {e}')
|
||||
|
||||
def on_user_create(self, data):
|
||||
async def on_user_create(self, data):
|
||||
"""Event handler for users table insert.
|
||||
Clears the specific user's permission cache.
|
||||
"""
|
||||
try:
|
||||
userid = getattr(data, 'id', None)
|
||||
if userid:
|
||||
self.invalidate_user_cache(userid)
|
||||
await self.invalidate_user_cache(userid)
|
||||
debug(f'RBAC cache invalidated for user id={userid} (users create)')
|
||||
except Exception as e:
|
||||
error(f'RBAC on_user_create handler error: {e}')
|
||||
|
||||
def on_user_delete(self, data):
|
||||
async def on_user_delete(self, data):
|
||||
"""Event handler for users table delete.
|
||||
Clears the specific user's permission cache.
|
||||
"""
|
||||
try:
|
||||
userid = getattr(data, 'id', None)
|
||||
if userid:
|
||||
self.invalidate_user_cache(userid)
|
||||
await self.invalidate_user_cache(userid)
|
||||
debug(f'RBAC cache invalidated for user id={userid} (users delete)')
|
||||
except Exception as e:
|
||||
error(f'RBAC on_user_delete handler error: {e}')
|
||||
|
||||
def on_rolepermission_change(self, data):
|
||||
async def on_rolepermission_change(self, data):
|
||||
"""Event handler for rolepermission table C/U/D.
|
||||
Clears the role-permission cache.
|
||||
"""
|
||||
try:
|
||||
self.invalidate_rp_cache()
|
||||
await self.invalidate_rp_cache()
|
||||
debug('RBAC role-permission cache invalidated (rolepermission change)')
|
||||
except Exception as e:
|
||||
error(f'RBAC on_rolepermission_change handler error: {e}')
|
||||
|
||||
def on_permission_change(self, data):
|
||||
async def on_permission_change(self, data):
|
||||
"""Event handler for permission table update.
|
||||
Clears the role-permission cache.
|
||||
"""
|
||||
try:
|
||||
self.invalidate_rp_cache()
|
||||
await self.invalidate_rp_cache()
|
||||
debug('RBAC role-permission cache invalidated (permission change)')
|
||||
except Exception as e:
|
||||
error(f'RBAC on_permission_change handler error: {e}')
|
||||
|
||||
def on_role_change(self, data):
|
||||
async def on_role_change(self, data):
|
||||
"""Event handler for role table C/U/D.
|
||||
Clears all user caches and role-permission cache,
|
||||
since role changes may affect any user.
|
||||
"""
|
||||
try:
|
||||
self.invalidate_all_user_caches()
|
||||
self.invalidate_rp_cache()
|
||||
await self.invalidate_all_user_caches()
|
||||
await self.invalidate_rp_cache()
|
||||
debug('RBAC all caches invalidated (role change)')
|
||||
except Exception as e:
|
||||
error(f'RBAC on_role_change handler error: {e}')
|
||||
|
||||
def on_userrole_change(self, data):
|
||||
async def on_userrole_change(self, data):
|
||||
"""Event handler for userrole table C/U/D.
|
||||
Clears the specific user's permission cache based on userid.
|
||||
"""
|
||||
try:
|
||||
userid = getattr(data, 'userid', None)
|
||||
if userid:
|
||||
self.invalidate_user_cache(userid)
|
||||
await self.invalidate_user_cache(userid)
|
||||
debug(f'RBAC cache invalidated for user id={userid} (userrole change)')
|
||||
except Exception as e:
|
||||
error(f'RBAC on_userrole_change handler error: {e}')
|
||||
@ -180,20 +182,37 @@ class UserPermissions:
|
||||
return self.ur_caches.get(userid)
|
||||
return None
|
||||
|
||||
def invalidate_user_cache(self, userid):
|
||||
async def invalidate_user_cache(self, userid):
|
||||
"""Invalidate cache for a specific user.
|
||||
Call this after role changes, user creation, etc.
|
||||
Also broadcasts invalidation to all other processes via Redis Pub/Sub.
|
||||
"""
|
||||
self.ur_caches.invalidate(userid)
|
||||
# Broadcast to other processes
|
||||
cache_sync = get_cache_sync()
|
||||
if cache_sync.is_running:
|
||||
await cache_sync.invalidate(f'rbac:ur:{userid}')
|
||||
|
||||
def invalidate_all_user_caches(self):
|
||||
"""Invalidate all user role caches."""
|
||||
async def invalidate_all_user_caches(self):
|
||||
"""Invalidate all user role caches.
|
||||
Also broadcasts invalidation to all other processes via Redis Pub/Sub.
|
||||
"""
|
||||
self.ur_caches.clear()
|
||||
# Broadcast to other processes
|
||||
cache_sync = get_cache_sync()
|
||||
if cache_sync.is_running:
|
||||
await cache_sync.invalidate('rbac:ur:all')
|
||||
|
||||
def invalidate_rp_cache(self):
|
||||
"""Invalidate role-permission cache (after permission changes)."""
|
||||
async def invalidate_rp_cache(self):
|
||||
"""Invalidate role-permission cache (after permission changes).
|
||||
Also broadcasts invalidation to all other processes via Redis Pub/Sub.
|
||||
"""
|
||||
self.rp_caches = None
|
||||
self.rp_cache_loaded_at = 0
|
||||
# Broadcast to other processes
|
||||
cache_sync = get_cache_sync()
|
||||
if cache_sync.is_running:
|
||||
await cache_sync.invalidate('rbac:rp')
|
||||
|
||||
async def load_roleperms(self, sor):
|
||||
"""Load all role-permission mappings into cache.
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user