From 0fd5ca9dc4f77445c2e452370025ccf3de2b9cbf Mon Sep 17 00:00:00 2001 From: yumoqing Date: Mon, 1 Jun 2026 18:10:28 +0800 Subject: [PATCH] refactor: use EventDispatcher for hot_reload, remove invalidate_all_caches coupling --- .../configuredServer.cpython-310.pyc | Bin 6716 -> 6714 bytes ahserver/configuredServer.py | 2 +- ahserver/hotreload.py | 107 ++++-------------- ahserver/webapp.py | 5 +- 4 files changed, 29 insertions(+), 85 deletions(-) diff --git a/ahserver/__pycache__/configuredServer.cpython-310.pyc b/ahserver/__pycache__/configuredServer.cpython-310.pyc index 574d7fa784866a512711be98cb2027c2c0bc1994..7eb9f2b505e5773c172d55b6b630e318fdc7c785 100644 GIT binary patch delta 1697 zcmZuxO>7%Q6!z?Hovj_4{Fx@H1ExQ2+(45w32Bm2^3yZ{q!vLcl?b%i?yi3t$Gd&A zu2Nc2pdqvz_|b5QxKt_}B5?ruR?!1TBu*ShLP*>}>Xi$;Hw(?G!qR^8=FPX?e)DFY ze;9u-p7k@CR04nc>cEvhy1&Tg+1mF*N7*z>R1&MB_?&ce?$cX|)iL*g+l%A)Qo`+X z`?nHqzh_>VxUH|gh?#?K9y9sd+UjKMQ?|gEA%4;3#Y4SMoYkj~9K|pEH9cJQH@xAh z2OA!@Ys26LJ|DKL+=FW#42vy&f*lhN^nv8FNQZ*>Rqq`;MB!lq(ukmlk<8kzYdLk# zZZrcY+LNlZ8GZVkxRlPHQgu)AX1QG3giE+AwKA71kVarbC{5nH9N1x% zON|GSqS7vkZ_~f$)VE^grcbPh`|Ypg*D%~=>Qtk*_RK7cRA#x&MncIrFRpgxPQFR+ zT_$h{T!R0V#)UOnEQ@=Y(}yb*RtcW4@G9mk%Gbo-nVnIpIM4ye{dkp{!5p60e>YBp zvNY%j;~K4>vNPH&i6Tv%SITc#Jj8-D1E2=j)!=m!WKPV0Iyox!CO+u#Z0FM4RSM%u z>3=)K!WL0dz=^WAjmIk2BbA>!zZ*~Vq>b~uI5LqsYovw#dH;`O1R4>S+ zV>j?}!%fV|?1GxqJ$Dhn2Y8QEWAzfBbw5g;qvW>O?kOKqZCj!_^_5K3c&&juM?q=0 zH7CSNp3RNlQjU_~H$x9(8+Crg3*i}h4+zx1M3W+eWZIzFu&5<`N_~V-%0!oAn-Yt} zVK&^M$;ULwR{hY5HflNL3aDaG|H!oCH_GD2Tu=T3Vs8T`vhMTkbT%eke%RFY8-cgU*mog9Iv!5(jZgtHgS>xc-S6NM@qbQM7S#|2c~Zo zFf66vCCZ#3c$uJwK>hoyQM61zcQdNOJ&NuUko_2_8!(5tF?@Ijf|=-GX5MT!byH8Z zwNC=AmQKN!I7ppe@8+OUXoR&54?e*VZnnmHZ?KuY<2**$xVv{x$oTZ`QkhD!H!HH>|sD~Cs0o8z(Wq0jt>e|-Z zbw~phA8iN?Cm3--6;j2AniC-3fc^sr!F^7sXD&!cNSt_YR&7@mmi9Mq-uumO-n@Cc z{&D8#nM^C4PA2g8ODVsu4St%*G53dKPq6tb#cGiy>WS(!KF_&Xcj!i%+=up7Q>hn{6O8kS=_ zYhJ()sw?`Fxbwd`zN5~vNq$$&>xa<+NBAG=NcM3uj}j0of&x!#BbteGKGr+hEo*04 zN=6GrDRSPkdh!UGCWC1jRG7s>zMFJfjt3pCXD(a0)FtQD7h#iT%tl4yrKK&G**W z*m3k>-jt0wAd+?`T(d5dAe8x+#$1X5Lbcm%nBw=0(kz_}ULq(HEKyj&=sB86GONe3 zh$_^5bzhMkren&`o+6av&k9!CV?u4Wf{0(>camFk7l`zL2^VR)a(`iTo>iP)BG)Sj zrXFow;jg7~8CiEd=+tWdCS2m%sqyHdq(Xszm%5jgim{@S&?^6S|M{Ga<{pbm{|I53 zr_)RNTeP3XFD8ekrJzG=T>_8bf0Z%tCI##KtMr-4H8OpIM>aT*K8yV82wQo6C!Oa< zGjA8EzOvk6A2v>>y@2;~zrPfy35_l+F4p}u^Sn|bb)?DLNw%6+5ZZvNfx3H7ftPU* z%!y?p?}-Y#i4P`Q>8mUNnQ^6*)9~zOr)@dmrj&w^C=bu`K;>$r@_XO+Y>B_$KO|o{ zDDYeTcjw;4n5oG|7rJ9Nv2)=X`b4HICk@X%3(x|1k3?d1;-3xtrB}$g!*34OCML*y zihy1kk@N$jVh8tSXI_^@(4o}{JrKRN>soa$gvV%IKtMHt4FU~8^g^d; zQNOpz^$~)Zj(TE;9OsC`Y`962kNNHFpuR<{9 diff --git a/ahserver/configuredServer.py b/ahserver/configuredServer.py index 770e1ca..f8dac20 100644 --- a/ahserver/configuredServer.py +++ b/ahserver/configuredServer.py @@ -22,7 +22,7 @@ from .serverenv import ServerEnv from .filestorage import TmpFileRecord from .loadplugins import load_plugins from .real_ip import real_ip_middleware -from .hotreload import HotReloader, get_i18n_paths, hot_reload_task, hot_reload_handler, invalidate_all_caches +from .hotreload import HotReloader, get_i18n_paths, hot_reload_task, hot_reload_handler startup_coros = [] cleanupctx_coros = [] diff --git a/ahserver/hotreload.py b/ahserver/hotreload.py index e47a9e6..92a4299 100644 --- a/ahserver/hotreload.py +++ b/ahserver/hotreload.py @@ -6,19 +6,20 @@ Watches file mtimes and triggers reload of cached resources: - i18n files (MiniI18N singleton) - Jinja2 template cache (auto_reload already handles this) -Module caches (rbac/pricing/uapi/llmage) can be cleared via: +Module cache invalidation via EventDispatcher: +- ahserver dispatches 'hot_reload' event +- Each module binds its own cache-clear handler in load_XXX() +- No coupling: ahserver doesn't import or know about any module + +Trigger sources: - HTTP endpoint: GET /__hot_reload__ (triggers all workers via signal file) -- Automatic when config.json changes +- config.json mtime change (auto-detected by FileWatcher) +- Signal file mtime change (cross-process, reuse_port multi-worker) Cross-process cache invalidation (reuse_port multi-process): - GET /__hot_reload__ writes to /tmp/.sage_cache_invalidate signal file - All workers detect signal file mtime change within their check interval -- Each worker independently clears its own caches -- No cross-process IPC or Redis needed - -Multi-process safe: each process independently checks mtimes via stat(), -no cross-process coordination needed. When a file changes on disk, -all processes detect it on their next check cycle. +- Each worker dispatches 'hot_reload' event independently Usage in conf/config.json: { @@ -118,7 +119,8 @@ class HotReloader: """Check for file changes and reload if needed. Returns: - dict with keys 'config', 'i18n', 'signal' indicating what was reloaded + dict with keys 'config', 'i18n', 'signal' indicating what was reloaded. + Caller should dispatch 'hot_reload' event if dict is non-empty. """ if not self._should_check(): return {} @@ -140,22 +142,17 @@ class HotReloader: # Check signal file (cross-process cache invalidation) if self._check_signal_file(): - invalidate_all_caches() reloaded['signal'] = True return reloaded def _reload_config(self): - """Clear JsonConfig singleton so next getConfig() call reloads from disk. - Also clear all module caches since config changes may affect them. - """ + """Clear JsonConfig singleton so next getConfig() call reloads from disk.""" try: from appPublic.jsonConfig import JsonConfig # SingletonDecorator stores instance as .instance JsonConfig.instance = None info('[hot_reload] config.json changed, cache cleared') - # Also clear module caches since config may affect module_cache settings - invalidate_all_caches() except Exception as e: warning(f'[hot_reload] failed to reload config: {e}') @@ -195,7 +192,10 @@ async def hot_reload_task(app, reloader): """Background task that periodically checks for file changes. Added to app.on_startup when hot_reload is enabled. + Dispatches 'hot_reload' event via EventDispatcher when changes detected. """ + from .serverenv import ServerEnv + dispatcher = ServerEnv().event_dispatcher info(f'[hot_reload] started, interval={reloader._interval}s') try: while True: @@ -203,92 +203,33 @@ async def hot_reload_task(app, reloader): reloaded = reloader.check_and_reload() if reloaded: info(f'[hot_reload] reloaded: {list(reloaded.keys())}') + await dispatcher.dispatch('hot_reload', reloaded) except asyncio.CancelledError: info('[hot_reload] stopped') raise -def invalidate_all_caches(): - """Clear all module caches (rbac/pricing/uapi/llmage). - - Called automatically when config.json changes, or manually via - GET /__hot_reload__ endpoint. - - Each module cache is cleared independently with try/except to - prevent one module's failure from blocking others. - """ - cleared = [] - - # rbac: UserPermissions is NOT a singleton — the actual instance is - # stored on ServerEnv by load_rbac(). Must get that instance, not - # create a new one (which would have empty caches). - try: - from ahserver.serverenv import ServerEnv - g = ServerEnv() - up = getattr(g, 'userpermissions', None) - if up is not None: - up.ur_caches.clear() - up.invalidate_rp_cache() - cleared.append('rbac') - else: - debug('[hot_reload] rbac: userpermissions not found on ServerEnv') - except Exception as e: - debug(f'[hot_reload] rbac cache clear skipped: {e}') - - # pricing: PricingProgram class-level pricing_data dict - try: - from pricing.pricing import PricingProgram - PricingProgram.pricing_data.clear() - cleared.append('pricing') - except Exception as e: - debug(f'[hot_reload] pricing cache clear skipped: {e}') - - # uapi: UAPIData singleton (@SingletonDecorator) with 3 cache dicts - try: - from uapi.apidata import UAPIData - ud = UAPIData() - ud.apidata.clear() - ud.org_users.clear() - ud.apikeys.clear() - cleared.append('uapi') - except Exception as e: - debug(f'[hot_reload] uapi cache clear skipped: {e}') - - # llmage: module-level _uapi_cache and _uapiio_cache - try: - from llmage.utils import invalidate_uapi_cache - invalidate_uapi_cache() # clears both _uapi_cache and _uapiio_cache - cleared.append('llmage') - except Exception as e: - debug(f'[hot_reload] llmage cache clear skipped: {e}') - - if cleared: - info(f'[hot_reload] cleared caches: {cleared}') - return cleared - - async def hot_reload_handler(request): """HTTP endpoint handler for GET /__hot_reload__. - Triggers cache invalidation across all workers via signal file. - Each worker detects the signal file change within its check interval - and clears its own caches. + Triggers cache invalidation across all workers via signal file, + and immediately dispatches 'hot_reload' for the current worker. Returns JSON with confirmation that signal was sent. """ from aiohttp import web + from .serverenv import ServerEnv - # Write signal file - all workers will detect this + # Write signal file - other workers will detect this with open(SIGNAL_FILE, 'w') as f: f.write(str(time.time())) - # Also clear current worker's cache immediately - cleared = invalidate_all_caches() + # Dispatch immediately for current worker + dispatcher = ServerEnv().event_dispatcher + await dispatcher.dispatch('hot_reload', {'source': 'http_endpoint'}) return web.json_response({ 'status': 'ok', - 'cleared': cleared, - 'message': 'Signal sent to all workers', + 'message': 'Signal sent to all workers, current worker dispatched hot_reload', 'timestamp': time.time() }) - diff --git a/ahserver/webapp.py b/ahserver/webapp.py index b0b2039..3e670b2 100644 --- a/ahserver/webapp.py +++ b/ahserver/webapp.py @@ -9,6 +9,7 @@ from appPublic.log import MyLogger, info, debug, warning import argparse from appPublic.folderUtils import ProgramPath from appPublic.jsonConfig import getConfig +from appPublic.event_dispatcher import EventDispatcher from sqlor.dbpools import DBPools from ahserver.configuredServer import ConfiguredServer from ahserver.serverenv import ServerEnv @@ -33,8 +34,10 @@ def webserver(init_func, workdir, port=None, app=None): else: logger = MyLogger('webapp', levelname='info') DBPools(config.databases) - init_func() + # Create EventDispatcher BEFORE init_func() so all load_XXX() can bind se = ServerEnv() + se.event_dispatcher = EventDispatcher() + init_func() se.workdir = workdir se.port = port server = ConfiguredServer(workdir=workdir, app=app)