From b981d07f40a3c9f52c044c2ee581125c7c419e67 Mon Sep 17 00:00:00 2001 From: yumoqing Date: Tue, 19 May 2026 01:12:55 +0800 Subject: [PATCH] fix: refresh pricing cache for timing id events --- pricing/init.py | 4 +- pricing/pricing.py | 112 ++++++++++++++++++++++++---------- test_pricing_cache_events.py | 113 +++++++++++++++++++++++++++++++++++ 3 files changed, 195 insertions(+), 34 deletions(-) create mode 100644 test_pricing_cache_events.py diff --git a/pricing/init.py b/pricing/init.py index 150990d..4961111 100644 --- a/pricing/init.py +++ b/pricing/init.py @@ -10,9 +10,11 @@ from ahserver.serverenv import ServerEnv def _bind_pricing_events(dbpools, dbname): """Bind database events to Pricing cache invalidation handlers.""" bindings = [ - # pricing_program_timing: 新增(已有 u 的 reload_pp_data,但统一用新方法) + # pricing_program_timing: SQLor 事件 payload 不保证含 ppid, + # U/C 通过 id 反查 ppid,D 需 before 预缓存供 after 刷新缓存。 (f'{dbname}:pricing_program_timing:c:after', PricingProgram.on_timing_create), (f'{dbname}:pricing_program_timing:u:after', PricingProgram.reload_pp_data), + (f'{dbname}:pricing_program_timing:d:before', PricingProgram.precache_timing_delete), (f'{dbname}:pricing_program_timing:d:after', PricingProgram.on_timing_delete), # pricing_program: 增删改均刷新该程序的全部缓存 (f'{dbname}:pricing_program:c:after', PricingProgram.reload_pricing_program), diff --git a/pricing/pricing.py b/pricing/pricing.py index a9f993a..eedb7d7 100644 --- a/pricing/pricing.py +++ b/pricing/pricing.py @@ -316,21 +316,85 @@ class PricingProgram: del PricingProgram.pricing_data[ppid] @staticmethod - async def reload_pp_data(ppt): + def _extract_ppid_from_timing_event(data): + """从 pricing_program_timing 事件 payload 中直接提取 ppid。""" + if not data: + return None + ppid = data.get('ppid') + if ppid: + return ppid + old_data = data.get('old') or data.get('old_row') or data.get('old_data') + if isinstance(old_data, dict): + return old_data.get('ppid') + return None + + @staticmethod + async def _get_ppid_by_timing_id(pptid): + """通过 pricing_program_timing.id 反查 ppid。删除后记录不存在时返回 None。""" + if not pptid: + return None env = ServerEnv() - ppid = None async with get_sor_context(env, 'pricing') as sor: - recs = await sor.R('pricing_program_timing', {'id': ppt['id']}) - if len(recs) == 0: - exception(f'{ppt["id"]} not found in pricing_program_timing') - return - ppid = recs[0].ppid - debug(f'--EventHandle {ppt}') - dat = curDateString() - k = f'{ppid}.{dat}' - if PricingProgram.pricing_data.get(k): - del PricingProgram.pricing_data[k] - await PricingProgram.get_ppid_pricing(ppid) + recs = await sor.R('pricing_program_timing', {'id': pptid}) + if not recs: + return None + return recs[0].ppid + + @staticmethod + async def _resolve_timing_event_ppid(data, allow_lookup=True): + """解析 pricing_program_timing 事件影响的 ppid。 + + SQLor 的 C/U/D 事件 payload 是调用 sor.C/U/D 时传入的 ns, + 不保证包含完整行。常见 U/D payload 只有主键 id 和修改字段; + 因此需要在记录仍存在时按 id 反查 ppid。删除事件发生在 delete + 之后,若 d:after 仅有 id,则数据库已无法反查,只能依赖 d:before + 预缓存或 payload 中 old/ppid 兜底。 + """ + ppid = PricingProgram._extract_ppid_from_timing_event(data) + if ppid: + return ppid + pptid = data.get('id') if data else None + if allow_lookup and pptid: + ppid = await PricingProgram._get_ppid_by_timing_id(pptid) + if ppid: + return ppid + if pptid: + return PricingProgram.pricing_data.get(f'timing:{pptid}:ppid') + return None + + @staticmethod + async def precache_timing_delete(data): + """删除前预缓存 pricing_program_timing.id -> ppid,供 d:after 使用。""" + pptid = data.get('id') if data else None + if not pptid: + exception(f'id not found in timing delete before event data: {data}') + return + ppid = await PricingProgram._resolve_timing_event_ppid(data, allow_lookup=True) + if not ppid: + exception(f'ppid not found in timing delete before event data: {data}') + return + PricingProgram.pricing_data[f'timing:{pptid}:ppid'] = ppid + debug(f'--EventHandle timing delete before: id={pptid}, ppid={ppid}') + + @staticmethod + async def _reload_timing_pp_data(data, action): + ppid = await PricingProgram._resolve_timing_event_ppid(data, allow_lookup=(action != 'delete')) + if not ppid: + exception(f'ppid not found in timing {action} event data: {data}') + return + debug(f'--EventHandle timing {action}: {data}') + await PricingProgram._invalidate_ppid_cache(ppid) + try: + await PricingProgram.get_ppid_pricing(ppid) + except Exception as e: + debug(f'timing {action}: get_ppid_pricing failed for ppid={ppid}: {e}') + finally: + if action == 'delete' and data and data.get('id'): + PricingProgram.pricing_data.pop(f'timing:{data.get("id")}:ppid', None) + + @staticmethod + async def reload_pp_data(ppt): + await PricingProgram._reload_timing_pp_data(ppt, 'update') @staticmethod async def reload_pricing_program(data): @@ -349,30 +413,12 @@ class PricingProgram: @staticmethod async def on_timing_create(data): """处理 pricing_program_timing 新增事件。""" - ppid = data.get('ppid') - if not ppid: - exception(f'ppid not found in timing create event data: {data}') - return - debug(f'--EventHandle timing create: {data}') - await PricingProgram._invalidate_ppid_cache(ppid) - try: - await PricingProgram.get_ppid_pricing(ppid) - except Exception as e: - debug(f'on_timing_create: get_ppid_pricing failed for ppid={ppid}: {e}') + await PricingProgram._reload_timing_pp_data(data, 'create') @staticmethod async def on_timing_delete(data): """处理 pricing_program_timing 删除事件。""" - ppid = data.get('ppid') - if not ppid: - exception(f'ppid not found in timing delete event data: {data}') - return - debug(f'--EventHandle timing delete: {data}') - await PricingProgram._invalidate_ppid_cache(ppid) - try: - await PricingProgram.get_ppid_pricing(ppid) - except Exception as e: - debug(f'on_timing_delete: get_ppid_pricing failed for ppid={ppid}: {e}') + await PricingProgram._reload_timing_pp_data(data, 'delete') @staticmethod async def get_ppid_pricing(ppid): diff --git a/test_pricing_cache_events.py b/test_pricing_cache_events.py new file mode 100644 index 0000000..d7137de --- /dev/null +++ b/test_pricing_cache_events.py @@ -0,0 +1,113 @@ +import asyncio +from unittest.mock import AsyncMock, patch + +from pricing.pricing import PricingProgram +from pricing.init import _bind_pricing_events +from sqlor.dbpools import DBPools + + +class DummyRecord: + def __init__(self, ppid): + self.ppid = ppid + + +class DummySor: + def __init__(self, records): + self.records = records + + async def R(self, tablename, ns): + assert tablename == 'pricing_program_timing' + assert ns == {'id': 'ppt-1'} + return self.records + + +class DummySorContext: + def __init__(self, records): + self.records = records + + async def __aenter__(self): + return DummySor(self.records) + + async def __aexit__(self, exc_type, exc, tb): + return False + + +async def _update_payload_with_only_id_resolves_ppid_by_timing_id(): + PricingProgram.pricing_data = {'pp-1': ['2026-05-19'], 'pp-1.2026-05-19': object()} + + with patch('pricing.pricing.get_sor_context', return_value=DummySorContext([DummyRecord('pp-1')])), \ + patch.object(PricingProgram, 'get_ppid_pricing', new=AsyncMock()) as get_ppid_pricing: + await PricingProgram.reload_pp_data({'id': 'ppt-1', 'pricing_data': 'changed'}) + + assert 'pp-1.2026-05-19' not in PricingProgram.pricing_data + assert 'pp-1' not in PricingProgram.pricing_data + get_ppid_pricing.assert_awaited_once_with('pp-1') + + +async def _delete_after_payload_with_only_id_uses_before_precache(): + PricingProgram.pricing_data = {'pp-1': ['2026-05-19'], 'pp-1.2026-05-19': object()} + + with patch('pricing.pricing.get_sor_context', return_value=DummySorContext([DummyRecord('pp-1')])), \ + patch.object(PricingProgram, 'get_ppid_pricing', new=AsyncMock()) as get_ppid_pricing: + await PricingProgram.precache_timing_delete({'id': 'ppt-1'}) + assert PricingProgram.pricing_data.get('timing:ppt-1:ppid') == 'pp-1' + await PricingProgram.on_timing_delete({'id': 'ppt-1'}) + + assert 'pp-1.2026-05-19' not in PricingProgram.pricing_data + assert 'pp-1' not in PricingProgram.pricing_data + assert 'timing:ppt-1:ppid' not in PricingProgram.pricing_data + get_ppid_pricing.assert_awaited_once_with('pp-1') + + +def test_pricing_bindings_include_delete_before_handler(): + dbpools = DBPools() + dbpools._events = {} + _bind_pricing_events(dbpools, 'pricingdb') + assert 'pricingdb:pricing_program_timing:d:before' in dbpools._events + + +def test_sqlor_delete_dispatches_dbpools_before_event(): + # Code-path guard: SQLor.D must dispatch DBPools d:before before executing delete, + # otherwise pricing delete pre-cache handlers bound with dbpools.bind() never run. + import inspect + from sqlor.sor import SQLor + + source = inspect.getsource(SQLor.D) + before_dispatch = "await self.dbpools.dispatch(rfname, ns)" + before_register = "ret = await rf.exe(rfname, ns)" + delete_execute = "r = await self.execute(sql, ns)" + assert before_dispatch in source + assert source.index(before_dispatch) < source.index(before_register) < source.index(delete_execute) + + +def test_payload_boundary_from_sqlor_crud_is_input_namespace_not_full_row(): + # SQLor.C/U/D dispatch the ns object passed by callers. Therefore U/D calls with + # {'id': ...} do not automatically include ppid unless caller supplied it. + import inspect + from sqlor.sor import SQLor + + update_source = inspect.getsource(SQLor.U) + delete_source = inspect.getsource(SQLor.D) + assert "await self.dbpools.dispatch(rfname, ns)" in update_source + assert "await self.dbpools.dispatch(rfname, ns)" in delete_source + assert "select *" not in delete_source.lower() + + +def test_update_payload_without_ppid_and_missing_row_does_not_clear_unknown_cache(): + async def run(): + PricingProgram.pricing_data = {'pp-1': ['2026-05-19'], 'pp-1.2026-05-19': object()} + with patch('pricing.pricing.get_sor_context', return_value=DummySorContext([])), \ + patch.object(PricingProgram, 'get_ppid_pricing', new=AsyncMock()) as get_ppid_pricing: + await PricingProgram.reload_pp_data({'id': 'ppt-1'}) + assert 'pp-1.2026-05-19' in PricingProgram.pricing_data + get_ppid_pricing.assert_not_awaited() + + asyncio.run(run()) + + +def test_update_payload_with_only_id_resolves_ppid_by_timing_id_sync(): + asyncio.run(_update_payload_with_only_id_resolves_ppid_by_timing_id()) + + +def test_delete_after_payload_with_only_id_uses_before_precache_sync(): + asyncio.run(_delete_after_payload_with_only_id_uses_before_precache())