pricing/test_pricing_cache_events.py

114 lines
4.4 KiB
Python

import asyncio
from unittest.mock import AsyncMock, patch
from pricing.pricing import PricingProgram
from pricing.init import _bind_pricing_events
from sqlor.dbpools import DBPools
class DummyRecord:
def __init__(self, ppid):
self.ppid = ppid
class DummySor:
def __init__(self, records):
self.records = records
async def R(self, tablename, ns):
assert tablename == 'pricing_program_timing'
assert ns == {'id': 'ppt-1'}
return self.records
class DummySorContext:
def __init__(self, records):
self.records = records
async def __aenter__(self):
return DummySor(self.records)
async def __aexit__(self, exc_type, exc, tb):
return False
async def _update_payload_with_only_id_resolves_ppid_by_timing_id():
PricingProgram.pricing_data = {'pp-1': ['2026-05-19'], 'pp-1.2026-05-19': object()}
with patch('pricing.pricing.get_sor_context', return_value=DummySorContext([DummyRecord('pp-1')])), \
patch.object(PricingProgram, 'get_ppid_pricing', new=AsyncMock()) as get_ppid_pricing:
await PricingProgram.reload_pp_data({'id': 'ppt-1', 'pricing_data': 'changed'})
assert 'pp-1.2026-05-19' not in PricingProgram.pricing_data
assert 'pp-1' not in PricingProgram.pricing_data
get_ppid_pricing.assert_awaited_once_with('pp-1')
async def _delete_after_payload_with_only_id_uses_before_precache():
PricingProgram.pricing_data = {'pp-1': ['2026-05-19'], 'pp-1.2026-05-19': object()}
with patch('pricing.pricing.get_sor_context', return_value=DummySorContext([DummyRecord('pp-1')])), \
patch.object(PricingProgram, 'get_ppid_pricing', new=AsyncMock()) as get_ppid_pricing:
await PricingProgram.precache_timing_delete({'id': 'ppt-1'})
assert PricingProgram.pricing_data.get('timing:ppt-1:ppid') == 'pp-1'
await PricingProgram.on_timing_delete({'id': 'ppt-1'})
assert 'pp-1.2026-05-19' not in PricingProgram.pricing_data
assert 'pp-1' not in PricingProgram.pricing_data
assert 'timing:ppt-1:ppid' not in PricingProgram.pricing_data
get_ppid_pricing.assert_awaited_once_with('pp-1')
def test_pricing_bindings_include_delete_before_handler():
dbpools = DBPools()
dbpools._events = {}
_bind_pricing_events(dbpools, 'pricingdb')
assert 'pricingdb:pricing_program_timing:d:before' in dbpools._events
def test_sqlor_delete_dispatches_dbpools_before_event():
# Code-path guard: SQLor.D must dispatch DBPools d:before before executing delete,
# otherwise pricing delete pre-cache handlers bound with dbpools.bind() never run.
import inspect
from sqlor.sor import SQLor
source = inspect.getsource(SQLor.D)
before_dispatch = "await self.dbpools.dispatch(rfname, ns)"
before_register = "ret = await rf.exe(rfname, ns)"
delete_execute = "r = await self.execute(sql, ns)"
assert before_dispatch in source
assert source.index(before_dispatch) < source.index(before_register) < source.index(delete_execute)
def test_payload_boundary_from_sqlor_crud_is_input_namespace_not_full_row():
# SQLor.C/U/D dispatch the ns object passed by callers. Therefore U/D calls with
# {'id': ...} do not automatically include ppid unless caller supplied it.
import inspect
from sqlor.sor import SQLor
update_source = inspect.getsource(SQLor.U)
delete_source = inspect.getsource(SQLor.D)
assert "await self.dbpools.dispatch(rfname, ns)" in update_source
assert "await self.dbpools.dispatch(rfname, ns)" in delete_source
assert "select *" not in delete_source.lower()
def test_update_payload_without_ppid_and_missing_row_does_not_clear_unknown_cache():
async def run():
PricingProgram.pricing_data = {'pp-1': ['2026-05-19'], 'pp-1.2026-05-19': object()}
with patch('pricing.pricing.get_sor_context', return_value=DummySorContext([])), \
patch.object(PricingProgram, 'get_ppid_pricing', new=AsyncMock()) as get_ppid_pricing:
await PricingProgram.reload_pp_data({'id': 'ppt-1'})
assert 'pp-1.2026-05-19' in PricingProgram.pricing_data
get_ppid_pricing.assert_not_awaited()
asyncio.run(run())
def test_update_payload_with_only_id_resolves_ppid_by_timing_id_sync():
asyncio.run(_update_payload_with_only_id_resolves_ppid_by_timing_id())
def test_delete_after_payload_with_only_id_uses_before_precache_sync():
asyncio.run(_delete_after_payload_with_only_id_uses_before_precache())