fix: refresh pricing cache for timing id events

This commit is contained in:
yumoqing 2026-05-19 01:12:55 +08:00
parent c85b9527c3
commit b981d07f40
3 changed files with 195 additions and 34 deletions

View File

@ -10,9 +10,11 @@ from ahserver.serverenv import ServerEnv
def _bind_pricing_events(dbpools, dbname):
"""Bind database events to Pricing cache invalidation handlers."""
bindings = [
# pricing_program_timing: 新增(已有 u 的 reload_pp_data但统一用新方法
# pricing_program_timing: SQLor 事件 payload 不保证含 ppid
# U/C 通过 id 反查 ppidD 需 before 预缓存供 after 刷新缓存。
(f'{dbname}:pricing_program_timing:c:after', PricingProgram.on_timing_create),
(f'{dbname}:pricing_program_timing:u:after', PricingProgram.reload_pp_data),
(f'{dbname}:pricing_program_timing:d:before', PricingProgram.precache_timing_delete),
(f'{dbname}:pricing_program_timing:d:after', PricingProgram.on_timing_delete),
# pricing_program: 增删改均刷新该程序的全部缓存
(f'{dbname}:pricing_program:c:after', PricingProgram.reload_pricing_program),

View File

@ -316,21 +316,85 @@ class PricingProgram:
del PricingProgram.pricing_data[ppid]
@staticmethod
async def reload_pp_data(ppt):
def _extract_ppid_from_timing_event(data):
"""从 pricing_program_timing 事件 payload 中直接提取 ppid。"""
if not data:
return None
ppid = data.get('ppid')
if ppid:
return ppid
old_data = data.get('old') or data.get('old_row') or data.get('old_data')
if isinstance(old_data, dict):
return old_data.get('ppid')
return None
@staticmethod
async def _get_ppid_by_timing_id(pptid):
"""通过 pricing_program_timing.id 反查 ppid。删除后记录不存在时返回 None。"""
if not pptid:
return None
env = ServerEnv()
ppid = None
async with get_sor_context(env, 'pricing') as sor:
recs = await sor.R('pricing_program_timing', {'id': ppt['id']})
if len(recs) == 0:
exception(f'{ppt["id"]} not found in pricing_program_timing')
return
ppid = recs[0].ppid
debug(f'--EventHandle {ppt}')
dat = curDateString()
k = f'{ppid}.{dat}'
if PricingProgram.pricing_data.get(k):
del PricingProgram.pricing_data[k]
await PricingProgram.get_ppid_pricing(ppid)
recs = await sor.R('pricing_program_timing', {'id': pptid})
if not recs:
return None
return recs[0].ppid
@staticmethod
async def _resolve_timing_event_ppid(data, allow_lookup=True):
"""解析 pricing_program_timing 事件影响的 ppid。
SQLor C/U/D 事件 payload 是调用 sor.C/U/D 时传入的 ns
不保证包含完整行常见 U/D payload 只有主键 id 和修改字段
因此需要在记录仍存在时按 id 反查 ppid删除事件发生在 delete
之后 d:after 仅有 id则数据库已无法反查只能依赖 d:before
预缓存或 payload old/ppid 兜底
"""
ppid = PricingProgram._extract_ppid_from_timing_event(data)
if ppid:
return ppid
pptid = data.get('id') if data else None
if allow_lookup and pptid:
ppid = await PricingProgram._get_ppid_by_timing_id(pptid)
if ppid:
return ppid
if pptid:
return PricingProgram.pricing_data.get(f'timing:{pptid}:ppid')
return None
@staticmethod
async def precache_timing_delete(data):
"""删除前预缓存 pricing_program_timing.id -> ppid供 d:after 使用。"""
pptid = data.get('id') if data else None
if not pptid:
exception(f'id not found in timing delete before event data: {data}')
return
ppid = await PricingProgram._resolve_timing_event_ppid(data, allow_lookup=True)
if not ppid:
exception(f'ppid not found in timing delete before event data: {data}')
return
PricingProgram.pricing_data[f'timing:{pptid}:ppid'] = ppid
debug(f'--EventHandle timing delete before: id={pptid}, ppid={ppid}')
@staticmethod
async def _reload_timing_pp_data(data, action):
ppid = await PricingProgram._resolve_timing_event_ppid(data, allow_lookup=(action != 'delete'))
if not ppid:
exception(f'ppid not found in timing {action} event data: {data}')
return
debug(f'--EventHandle timing {action}: {data}')
await PricingProgram._invalidate_ppid_cache(ppid)
try:
await PricingProgram.get_ppid_pricing(ppid)
except Exception as e:
debug(f'timing {action}: get_ppid_pricing failed for ppid={ppid}: {e}')
finally:
if action == 'delete' and data and data.get('id'):
PricingProgram.pricing_data.pop(f'timing:{data.get("id")}:ppid', None)
@staticmethod
async def reload_pp_data(ppt):
await PricingProgram._reload_timing_pp_data(ppt, 'update')
@staticmethod
async def reload_pricing_program(data):
@ -349,30 +413,12 @@ class PricingProgram:
@staticmethod
async def on_timing_create(data):
"""处理 pricing_program_timing 新增事件。"""
ppid = data.get('ppid')
if not ppid:
exception(f'ppid not found in timing create event data: {data}')
return
debug(f'--EventHandle timing create: {data}')
await PricingProgram._invalidate_ppid_cache(ppid)
try:
await PricingProgram.get_ppid_pricing(ppid)
except Exception as e:
debug(f'on_timing_create: get_ppid_pricing failed for ppid={ppid}: {e}')
await PricingProgram._reload_timing_pp_data(data, 'create')
@staticmethod
async def on_timing_delete(data):
"""处理 pricing_program_timing 删除事件。"""
ppid = data.get('ppid')
if not ppid:
exception(f'ppid not found in timing delete event data: {data}')
return
debug(f'--EventHandle timing delete: {data}')
await PricingProgram._invalidate_ppid_cache(ppid)
try:
await PricingProgram.get_ppid_pricing(ppid)
except Exception as e:
debug(f'on_timing_delete: get_ppid_pricing failed for ppid={ppid}: {e}')
await PricingProgram._reload_timing_pp_data(data, 'delete')
@staticmethod
async def get_ppid_pricing(ppid):

View File

@ -0,0 +1,113 @@
import asyncio
from unittest.mock import AsyncMock, patch
from pricing.pricing import PricingProgram
from pricing.init import _bind_pricing_events
from sqlor.dbpools import DBPools
class DummyRecord:
def __init__(self, ppid):
self.ppid = ppid
class DummySor:
def __init__(self, records):
self.records = records
async def R(self, tablename, ns):
assert tablename == 'pricing_program_timing'
assert ns == {'id': 'ppt-1'}
return self.records
class DummySorContext:
def __init__(self, records):
self.records = records
async def __aenter__(self):
return DummySor(self.records)
async def __aexit__(self, exc_type, exc, tb):
return False
async def _update_payload_with_only_id_resolves_ppid_by_timing_id():
PricingProgram.pricing_data = {'pp-1': ['2026-05-19'], 'pp-1.2026-05-19': object()}
with patch('pricing.pricing.get_sor_context', return_value=DummySorContext([DummyRecord('pp-1')])), \
patch.object(PricingProgram, 'get_ppid_pricing', new=AsyncMock()) as get_ppid_pricing:
await PricingProgram.reload_pp_data({'id': 'ppt-1', 'pricing_data': 'changed'})
assert 'pp-1.2026-05-19' not in PricingProgram.pricing_data
assert 'pp-1' not in PricingProgram.pricing_data
get_ppid_pricing.assert_awaited_once_with('pp-1')
async def _delete_after_payload_with_only_id_uses_before_precache():
PricingProgram.pricing_data = {'pp-1': ['2026-05-19'], 'pp-1.2026-05-19': object()}
with patch('pricing.pricing.get_sor_context', return_value=DummySorContext([DummyRecord('pp-1')])), \
patch.object(PricingProgram, 'get_ppid_pricing', new=AsyncMock()) as get_ppid_pricing:
await PricingProgram.precache_timing_delete({'id': 'ppt-1'})
assert PricingProgram.pricing_data.get('timing:ppt-1:ppid') == 'pp-1'
await PricingProgram.on_timing_delete({'id': 'ppt-1'})
assert 'pp-1.2026-05-19' not in PricingProgram.pricing_data
assert 'pp-1' not in PricingProgram.pricing_data
assert 'timing:ppt-1:ppid' not in PricingProgram.pricing_data
get_ppid_pricing.assert_awaited_once_with('pp-1')
def test_pricing_bindings_include_delete_before_handler():
dbpools = DBPools()
dbpools._events = {}
_bind_pricing_events(dbpools, 'pricingdb')
assert 'pricingdb:pricing_program_timing:d:before' in dbpools._events
def test_sqlor_delete_dispatches_dbpools_before_event():
# Code-path guard: SQLor.D must dispatch DBPools d:before before executing delete,
# otherwise pricing delete pre-cache handlers bound with dbpools.bind() never run.
import inspect
from sqlor.sor import SQLor
source = inspect.getsource(SQLor.D)
before_dispatch = "await self.dbpools.dispatch(rfname, ns)"
before_register = "ret = await rf.exe(rfname, ns)"
delete_execute = "r = await self.execute(sql, ns)"
assert before_dispatch in source
assert source.index(before_dispatch) < source.index(before_register) < source.index(delete_execute)
def test_payload_boundary_from_sqlor_crud_is_input_namespace_not_full_row():
# SQLor.C/U/D dispatch the ns object passed by callers. Therefore U/D calls with
# {'id': ...} do not automatically include ppid unless caller supplied it.
import inspect
from sqlor.sor import SQLor
update_source = inspect.getsource(SQLor.U)
delete_source = inspect.getsource(SQLor.D)
assert "await self.dbpools.dispatch(rfname, ns)" in update_source
assert "await self.dbpools.dispatch(rfname, ns)" in delete_source
assert "select *" not in delete_source.lower()
def test_update_payload_without_ppid_and_missing_row_does_not_clear_unknown_cache():
async def run():
PricingProgram.pricing_data = {'pp-1': ['2026-05-19'], 'pp-1.2026-05-19': object()}
with patch('pricing.pricing.get_sor_context', return_value=DummySorContext([])), \
patch.object(PricingProgram, 'get_ppid_pricing', new=AsyncMock()) as get_ppid_pricing:
await PricingProgram.reload_pp_data({'id': 'ppt-1'})
assert 'pp-1.2026-05-19' in PricingProgram.pricing_data
get_ppid_pricing.assert_not_awaited()
asyncio.run(run())
def test_update_payload_with_only_id_resolves_ppid_by_timing_id_sync():
asyncio.run(_update_payload_with_only_id_resolves_ppid_by_timing_id())
def test_delete_after_payload_with_only_id_uses_before_precache_sync():
asyncio.run(_delete_after_payload_with_only_id_uses_before_precache())