feat: cross-process cache invalidation via signal file for reuse_port multi-process

This commit is contained in:
yumoqing 2026-06-01 17:14:32 +08:00
parent cd578de80d
commit 42eff6cda0

View File

@ -7,9 +7,15 @@ Watches file mtimes and triggers reload of cached resources:
- Jinja2 template cache (auto_reload already handles this) - Jinja2 template cache (auto_reload already handles this)
Module caches (rbac/pricing/uapi/llmage) can be cleared via: Module caches (rbac/pricing/uapi/llmage) can be cleared via:
- HTTP endpoint: GET /__hot_reload__ - HTTP endpoint: GET /__hot_reload__ (triggers all workers via signal file)
- Automatic when config.json changes - Automatic when config.json changes
Cross-process cache invalidation (reuse_port multi-process):
- GET /__hot_reload__ writes to /tmp/.sage_cache_invalidate signal file
- All workers detect signal file mtime change within their check interval
- Each worker independently clears its own caches
- No cross-process IPC or Redis needed
Multi-process safe: each process independently checks mtimes via stat(), Multi-process safe: each process independently checks mtimes via stat(),
no cross-process coordination needed. When a file changes on disk, no cross-process coordination needed. When a file changes on disk,
all processes detect it on their next check cycle. all processes detect it on their next check cycle.
@ -64,6 +70,9 @@ class FileWatcher:
return changed return changed
SIGNAL_FILE = '/tmp/.sage_cache_invalidate'
class HotReloader: class HotReloader:
"""Hot-reload cached resources when source files change. """Hot-reload cached resources when source files change.
@ -78,6 +87,11 @@ class HotReloader:
self._i18n_paths = i18n_paths or [] self._i18n_paths = i18n_paths or []
self._last_check = 0 self._last_check = 0
self._interval = 2 # seconds between checks self._interval = 2 # seconds between checks
# Record current signal file mtime to avoid false trigger on startup
try:
self._last_signal_mtime = os.path.getmtime(SIGNAL_FILE)
except OSError:
self._last_signal_mtime = 0
def set_interval(self, interval): def set_interval(self, interval):
self._interval = interval self._interval = interval
@ -89,11 +103,22 @@ class HotReloader:
self._last_check = now self._last_check = now
return True return True
def _check_signal_file(self):
"""Check if cache invalidation signal file was updated."""
try:
mtime = os.path.getmtime(SIGNAL_FILE)
if mtime > self._last_signal_mtime:
self._last_signal_mtime = mtime
return True
except OSError:
pass
return False
def check_and_reload(self): def check_and_reload(self):
"""Check for file changes and reload if needed. """Check for file changes and reload if needed.
Returns: Returns:
dict with keys 'config', 'i18n' indicating what was reloaded dict with keys 'config', 'i18n', 'signal' indicating what was reloaded
""" """
if not self._should_check(): if not self._should_check():
return {} return {}
@ -113,6 +138,11 @@ class HotReloader:
self._reload_i18n() self._reload_i18n()
reloaded['i18n'] = True reloaded['i18n'] = True
# Check signal file (cross-process cache invalidation)
if self._check_signal_file():
invalidate_all_caches()
reloaded['signal'] = True
return reloaded return reloaded
def _reload_config(self): def _reload_config(self):
@ -233,16 +263,25 @@ def invalidate_all_caches():
async def hot_reload_handler(request): async def hot_reload_handler(request):
"""HTTP endpoint handler for GET /__hot_reload__. """HTTP endpoint handler for GET /__hot_reload__.
Manually triggers cache invalidation. Useful for development Triggers cache invalidation across all workers via signal file.
when database changes aren't detected automatically. Each worker detects the signal file change within its check interval
and clears its own caches.
Returns JSON with list of cleared caches. Returns JSON with confirmation that signal was sent.
""" """
from aiohttp import web from aiohttp import web
# Write signal file - all workers will detect this
with open(SIGNAL_FILE, 'w') as f:
f.write(str(time.time()))
# Also clear current worker's cache immediately
cleared = invalidate_all_caches() cleared = invalidate_all_caches()
return web.json_response({ return web.json_response({
'status': 'ok', 'status': 'ok',
'cleared': cleared, 'cleared': cleared,
'message': 'Signal sent to all workers',
'timestamp': time.time() 'timestamp': time.time()
}) })