refactor: use EventDispatcher for hot_reload, remove invalidate_all_caches coupling

This commit is contained in:
yumoqing 2026-06-01 18:10:28 +08:00
parent 4e19bb8d06
commit 0fd5ca9dc4
4 changed files with 29 additions and 85 deletions

View File

@ -22,7 +22,7 @@ from .serverenv import ServerEnv
from .filestorage import TmpFileRecord from .filestorage import TmpFileRecord
from .loadplugins import load_plugins from .loadplugins import load_plugins
from .real_ip import real_ip_middleware from .real_ip import real_ip_middleware
from .hotreload import HotReloader, get_i18n_paths, hot_reload_task, hot_reload_handler, invalidate_all_caches from .hotreload import HotReloader, get_i18n_paths, hot_reload_task, hot_reload_handler
startup_coros = [] startup_coros = []
cleanupctx_coros = [] cleanupctx_coros = []

View File

@ -6,19 +6,20 @@ Watches file mtimes and triggers reload of cached resources:
- i18n files (MiniI18N singleton) - i18n files (MiniI18N singleton)
- Jinja2 template cache (auto_reload already handles this) - Jinja2 template cache (auto_reload already handles this)
Module caches (rbac/pricing/uapi/llmage) can be cleared via: Module cache invalidation via EventDispatcher:
- ahserver dispatches 'hot_reload' event
- Each module binds its own cache-clear handler in load_XXX()
- No coupling: ahserver doesn't import or know about any module
Trigger sources:
- HTTP endpoint: GET /__hot_reload__ (triggers all workers via signal file) - HTTP endpoint: GET /__hot_reload__ (triggers all workers via signal file)
- Automatic when config.json changes - config.json mtime change (auto-detected by FileWatcher)
- Signal file mtime change (cross-process, reuse_port multi-worker)
Cross-process cache invalidation (reuse_port multi-process): Cross-process cache invalidation (reuse_port multi-process):
- GET /__hot_reload__ writes to /tmp/.sage_cache_invalidate signal file - GET /__hot_reload__ writes to /tmp/.sage_cache_invalidate signal file
- All workers detect signal file mtime change within their check interval - All workers detect signal file mtime change within their check interval
- Each worker independently clears its own caches - Each worker dispatches 'hot_reload' event independently
- No cross-process IPC or Redis needed
Multi-process safe: each process independently checks mtimes via stat(),
no cross-process coordination needed. When a file changes on disk,
all processes detect it on their next check cycle.
Usage in conf/config.json: Usage in conf/config.json:
{ {
@ -118,7 +119,8 @@ class HotReloader:
"""Check for file changes and reload if needed. """Check for file changes and reload if needed.
Returns: Returns:
dict with keys 'config', 'i18n', 'signal' indicating what was reloaded dict with keys 'config', 'i18n', 'signal' indicating what was reloaded.
Caller should dispatch 'hot_reload' event if dict is non-empty.
""" """
if not self._should_check(): if not self._should_check():
return {} return {}
@ -140,22 +142,17 @@ class HotReloader:
# Check signal file (cross-process cache invalidation) # Check signal file (cross-process cache invalidation)
if self._check_signal_file(): if self._check_signal_file():
invalidate_all_caches()
reloaded['signal'] = True reloaded['signal'] = True
return reloaded return reloaded
def _reload_config(self): def _reload_config(self):
"""Clear JsonConfig singleton so next getConfig() call reloads from disk. """Clear JsonConfig singleton so next getConfig() call reloads from disk."""
Also clear all module caches since config changes may affect them.
"""
try: try:
from appPublic.jsonConfig import JsonConfig from appPublic.jsonConfig import JsonConfig
# SingletonDecorator stores instance as .instance # SingletonDecorator stores instance as .instance
JsonConfig.instance = None JsonConfig.instance = None
info('[hot_reload] config.json changed, cache cleared') info('[hot_reload] config.json changed, cache cleared')
# Also clear module caches since config may affect module_cache settings
invalidate_all_caches()
except Exception as e: except Exception as e:
warning(f'[hot_reload] failed to reload config: {e}') warning(f'[hot_reload] failed to reload config: {e}')
@ -195,7 +192,10 @@ async def hot_reload_task(app, reloader):
"""Background task that periodically checks for file changes. """Background task that periodically checks for file changes.
Added to app.on_startup when hot_reload is enabled. Added to app.on_startup when hot_reload is enabled.
Dispatches 'hot_reload' event via EventDispatcher when changes detected.
""" """
from .serverenv import ServerEnv
dispatcher = ServerEnv().event_dispatcher
info(f'[hot_reload] started, interval={reloader._interval}s') info(f'[hot_reload] started, interval={reloader._interval}s')
try: try:
while True: while True:
@ -203,92 +203,33 @@ async def hot_reload_task(app, reloader):
reloaded = reloader.check_and_reload() reloaded = reloader.check_and_reload()
if reloaded: if reloaded:
info(f'[hot_reload] reloaded: {list(reloaded.keys())}') info(f'[hot_reload] reloaded: {list(reloaded.keys())}')
await dispatcher.dispatch('hot_reload', reloaded)
except asyncio.CancelledError: except asyncio.CancelledError:
info('[hot_reload] stopped') info('[hot_reload] stopped')
raise raise
def invalidate_all_caches():
"""Clear all module caches (rbac/pricing/uapi/llmage).
Called automatically when config.json changes, or manually via
GET /__hot_reload__ endpoint.
Each module cache is cleared independently with try/except to
prevent one module's failure from blocking others.
"""
cleared = []
# rbac: UserPermissions is NOT a singleton — the actual instance is
# stored on ServerEnv by load_rbac(). Must get that instance, not
# create a new one (which would have empty caches).
try:
from ahserver.serverenv import ServerEnv
g = ServerEnv()
up = getattr(g, 'userpermissions', None)
if up is not None:
up.ur_caches.clear()
up.invalidate_rp_cache()
cleared.append('rbac')
else:
debug('[hot_reload] rbac: userpermissions not found on ServerEnv')
except Exception as e:
debug(f'[hot_reload] rbac cache clear skipped: {e}')
# pricing: PricingProgram class-level pricing_data dict
try:
from pricing.pricing import PricingProgram
PricingProgram.pricing_data.clear()
cleared.append('pricing')
except Exception as e:
debug(f'[hot_reload] pricing cache clear skipped: {e}')
# uapi: UAPIData singleton (@SingletonDecorator) with 3 cache dicts
try:
from uapi.apidata import UAPIData
ud = UAPIData()
ud.apidata.clear()
ud.org_users.clear()
ud.apikeys.clear()
cleared.append('uapi')
except Exception as e:
debug(f'[hot_reload] uapi cache clear skipped: {e}')
# llmage: module-level _uapi_cache and _uapiio_cache
try:
from llmage.utils import invalidate_uapi_cache
invalidate_uapi_cache() # clears both _uapi_cache and _uapiio_cache
cleared.append('llmage')
except Exception as e:
debug(f'[hot_reload] llmage cache clear skipped: {e}')
if cleared:
info(f'[hot_reload] cleared caches: {cleared}')
return cleared
async def hot_reload_handler(request): async def hot_reload_handler(request):
"""HTTP endpoint handler for GET /__hot_reload__. """HTTP endpoint handler for GET /__hot_reload__.
Triggers cache invalidation across all workers via signal file. Triggers cache invalidation across all workers via signal file,
Each worker detects the signal file change within its check interval and immediately dispatches 'hot_reload' for the current worker.
and clears its own caches.
Returns JSON with confirmation that signal was sent. Returns JSON with confirmation that signal was sent.
""" """
from aiohttp import web from aiohttp import web
from .serverenv import ServerEnv
# Write signal file - all workers will detect this # Write signal file - other workers will detect this
with open(SIGNAL_FILE, 'w') as f: with open(SIGNAL_FILE, 'w') as f:
f.write(str(time.time())) f.write(str(time.time()))
# Also clear current worker's cache immediately # Dispatch immediately for current worker
cleared = invalidate_all_caches() dispatcher = ServerEnv().event_dispatcher
await dispatcher.dispatch('hot_reload', {'source': 'http_endpoint'})
return web.json_response({ return web.json_response({
'status': 'ok', 'status': 'ok',
'cleared': cleared, 'message': 'Signal sent to all workers, current worker dispatched hot_reload',
'message': 'Signal sent to all workers',
'timestamp': time.time() 'timestamp': time.time()
}) })

View File

@ -9,6 +9,7 @@ from appPublic.log import MyLogger, info, debug, warning
import argparse import argparse
from appPublic.folderUtils import ProgramPath from appPublic.folderUtils import ProgramPath
from appPublic.jsonConfig import getConfig from appPublic.jsonConfig import getConfig
from appPublic.event_dispatcher import EventDispatcher
from sqlor.dbpools import DBPools from sqlor.dbpools import DBPools
from ahserver.configuredServer import ConfiguredServer from ahserver.configuredServer import ConfiguredServer
from ahserver.serverenv import ServerEnv from ahserver.serverenv import ServerEnv
@ -33,8 +34,10 @@ def webserver(init_func, workdir, port=None, app=None):
else: else:
logger = MyLogger('webapp', levelname='info') logger = MyLogger('webapp', levelname='info')
DBPools(config.databases) DBPools(config.databases)
init_func() # Create EventDispatcher BEFORE init_func() so all load_XXX() can bind
se = ServerEnv() se = ServerEnv()
se.event_dispatcher = EventDispatcher()
init_func()
se.workdir = workdir se.workdir = workdir
se.port = port se.port = port
server = ConfiguredServer(workdir=workdir, app=app) server = ConfiguredServer(workdir=workdir, app=app)