fix: implement missing get_all_spec_fields_by_pptid, sor_get_spec_fields, get_pricing_specs_by_pptid

This commit is contained in:
yumoqing 2026-06-12 14:18:52 +08:00
parent 00abc0caa4
commit ac3d7c6d91
3 changed files with 75 additions and 1 deletions

View File

@ -3,4 +3,7 @@ from pricing.pricing import (
test_pricing,
generate_formula_from_factors,
get_pricing_display,
get_pricing_specs_by_pptid,
sor_get_spec_fields,
get_all_spec_fields_by_pptid,
)

View File

@ -4,7 +4,10 @@ from pricing.pricing import (
PricingProgram,
test_pricing,
generate_formula_from_factors,
get_pricing_display
get_pricing_display,
get_pricing_specs_by_pptid,
sor_get_spec_fields,
get_all_spec_fields_by_pptid,
)
from ahserver.serverenv import ServerEnv
@ -37,6 +40,9 @@ def load_pricing():
env.test_pricing = test_pricing
env.generate_formula_from_factors = generate_formula_from_factors
env.get_pricing_display = get_pricing_display
env.get_pricing_specs_by_pptid = get_pricing_specs_by_pptid
env.sor_get_spec_fields = sor_get_spec_fields
env.get_all_spec_fields_by_pptid = get_all_spec_fields_by_pptid
# Bind hot_reload event — only when running in ahserver (event_dispatcher available)
if getattr(env, 'event_dispatcher', None) is not None:
env.event_dispatcher.bind('hot_reload', PricingProgram.on_hot_reload)

View File

@ -982,6 +982,71 @@ async def test_pricing(pptid, data):
amount += p.amount
return amount
async def get_pricing_specs_by_pptid(pptid):
"""Get all pricing_spec records associated with a pricing_program_timing."""
if not pptid:
return []
env = ServerEnv()
async with get_sor_context(env, 'pricing') as sor:
recs = await sor.sqlExe(
"SELECT DISTINCT s.* FROM pricing_spec s "
"INNER JOIN pricing_item pi ON pi.psid = s.id "
"WHERE pi.pptid = ${pptid}$",
{'pptid': pptid}
)
return list(recs)
async def sor_get_spec_fields(sor, item_id):
"""Get spec field names for a pricing_item by looking up its psid."""
if not item_id:
return []
recs = await sor.R('pricing_item', {'id': item_id})
if not recs or not recs[0].psid:
return []
spec_recs = await sor.R('pricing_spec', {'id': recs[0].psid})
if not spec_recs or not spec_recs[0].spec_names:
return []
try:
spec_data = json.loads(spec_recs[0].spec_names)
if isinstance(spec_data, list):
return [f['name'] for f in spec_data if isinstance(f, dict) and 'name' in f]
except (json.JSONDecodeError, TypeError, KeyError):
pass
return []
async def get_all_spec_fields_by_pptid(request, pptid):
"""Template function: returns JSON string of dynamic field definitions
from pricing_spec.spec_names for all specs linked to the given pptid.
Used in index.ui to inject extra fields into the fields array."""
if not pptid:
return ''
env = ServerEnv()
async with get_sor_context(env, 'pricing') as sor:
specs = await sor.sqlExe(
"SELECT DISTINCT s.spec_names FROM pricing_spec s "
"INNER JOIN pricing_item pi ON pi.psid = s.id "
"WHERE pi.pptid = ${pptid}$ AND s.spec_names IS NOT NULL AND s.spec_names != ''",
{'pptid': pptid}
)
all_fields = []
seen = set()
for spec in specs:
try:
fields = json.loads(spec.spec_names)
if isinstance(fields, list):
for f in fields:
if isinstance(f, dict) and 'name' in f and f['name'] not in seen:
seen.add(f['name'])
all_fields.append(f)
except (json.JSONDecodeError, TypeError):
continue
if not all_fields:
return ''
return ',\n'.join(json.dumps(f, ensure_ascii=False) for f in all_fields)
if __name__ == '__main__':
MyLogger('Pricing', levelname='info')
yamlstr = """fields: