pricing/pricing/pricing.py
2026-03-25 18:31:48 +08:00

372 lines
9.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import yaml
from ahserver.serverenv import ServerEnv
from ahserver.filestorage import FileStorage
from sqlor.dbpools import DBPools, get_sor_context
from appPublic.log import debug, exception
from appPublic.dictObject import DictObject
from .write_pattern import write_pattern_xlsx, load_xlsx_pricing
import yaml
"""
采用yaml描述定价策略
在pricing_program的pricing_spec表中定义定价的数据字段
遵循一下规格:
字典结构key是字段名需定义字段的type(类型)label(标题)value_mode(值方式)可选options(可选项)(可选)
必须要有一个price字段其type: float
value_mode 有下列可能的取值
between # 定价表中此字段以如此格式给出:
# "小值 ~ 大值", "~"的前面或后面可以加"="
# 表示含小值,或含大值
in # 定价表中此字段值有如下格式值1 值2 ...
= # 缺省,给定单一值
> #
>=
<
<=
例子vidu定价字段
'''
model:
type: str
label: "模型"
options:
- "viduq3-pro"
- "viduq3-turbo"
resolution:
type: str
label: "分辨率"
options:
- "1024p"
- "720p"
- "540p"
duration:
type: int # 建议改为 int 或 strtimes 不是标准类型
label: "时长"
# 如果时长也有选项,需要补全,例如:
# options:
# - 5
# - 10
off_peak:
type: int # 建议改为 int 或 str因为值是 0 和 1
label: "错峰执行"
options:
- off_peak # 正常时段
- normal # 错峰
price:
type: float
label: 单价
'''
pricing_program_timing表中的pricing_data字段的数据是一个只有一个属性"pricings"的字典
其值为定价条目列表每个定价条目是个字典key值为pricing_spec字段定义的字段
pricings:
- resolution: 480p
duration: 4
audio: false
- resolution: 480p
duration: 8
audio: false
- resolution: 480p
duration: 12
audio: false
- resolution: 480p
duration: 4
audio: true
- resolution: 480p
duration: 8
audio: true
- resolution: 480p
duration: 12
audio: true
- resolution: 720p
duration: 4
audio: false
- resolution: 720p
duration: 8
audio: false
- resolution: 720p
duration: 12
audio: false
- resolution: 720p
duration: 4
audio: true
- resolution: 720p
duration: 8
audio: true
- resolution: 720p
duration: 12
audio: true
"""
typefuncs = {
'int': int,
'float': float
}
def typevalue(v, t):
f = typefuncs.get(t)
if not f:
return v
return f(v)
def check_value(field, spec_value, data_value):
if field.value_mode == 'between':
arr = spec_value.split(' ')
if len(arr) < 2 or len(arr) > 3:
e = f'{spec_value=} error'
exception(e)
raise Exception(e)
if (arr[0] is None or arr[-1] is None) and field.type == 'str':
e = f'字符串类型between方法的两个值任何一个都不能为空'
exception(e)
raise e
else:
if arr[0] is None:
arr[0] = -float('inf')
if arr[-1] is None:
arr[-1] = float('inf')
if len(arr) == 2 or arr[1] == '=~' :
return arr[0] <= data_value and data_value < arr[-1]
if arr[1] == '~':
return arr[0] < data_value and data_value < arr[-1]
if arr[1] == '~=':
return arr[0] < data_value and data_value <= arr[-1]
e = f'{arr[1]}不认识的期间逻辑,只支持:~ =~ ~='
exception(e)
raise Exception(e)
if field.value_mode == 'in':
arr = spec_value.split(' ')
arr = [ typevalue(a, field.type) for a in arr ]
debug(f'{arr=}, {data_value=}')
return data_value in arr
mode = field.value_mode
if not mode or mode == '=':
mode = '=='
ns = {
"a": data_value,
"b": typevalue(spec_value, field.type)
}
script = f'a {mode} b'
x = eval(script, ns)
return x
class PricingProgram:
@staticmethod
async def get_pricing_program(ppid):
env = ServerEnv()
async with get_sor_context(env, 'pricing') as sor:
recs = await sor.R('pricing_program', {'id': ppid})
if recs:
return recs[0]
e = f'pricing_program(id={ppid}) not found'
raise e
e = f'read pricing_program(id={ppid}) failed'
raise e
@staticmethod
async def load_pricing_data(pptid, webpath_xlsx):
env = ServerEnv()
fs = FileStorage()
fp = fs.realPath(webpath_xlsx)
pricings = load_xlsx_pricing(fp)
async with get_sor_context(env, 'pricing') as sor:
ppts = await sor.R('pricing_program_timing', {'id': pptid})
if ppts:
ppt = ppts[0]
pps = await sor.R('pricing_program', {'id': ppt.ppid})
if not pps:
e = f'pricing_program({pptid}) can not find pricing_program'
exception(f)
pp = pps[0]
pp_spec = yaml.safe_load(pp.pricing_spec)
formula = None
if pp_spec.get('fields'):
fields = pp_spec.get('fields')
pricing_formula = pp_spec.get('formula')
else:
fields = pp_spec
newpricings = []
for p in pricings:
np = {}
for k,v in p.items():
for fk, fv in fields.items():
if k == fv['label']:
np.update({fk:v})
newpricings.append(np)
d = {
'fields': fields,
'formula': formula,
'pricings': newpricings
}
debug(f'{d=}')
ppt.pricing_data = yaml.dump(d, allow_unicode=True)
await sor.U('pricing_program_timing', {
'id': ppt.id,
'pricing_data': ppt.pricing_data
})
return True
e = f'pricing_program_timing(id={pptid}) not found'
exception(e)
raise Exception(e)
e = f'pricing_program_timing(id={pptid}) read failed'
exception(e)
raise e
@staticmethod
async def write_pricing_patten(request, ppid):
async with get_sor_context(request._run_ns, 'pricing') as sor:
env = request._run_ns
recs = await sor.R('pricing_program', {'id': ppid})
if not recs:
debug(f'id={ppid} pricing_program not found')
r = recs[0]
x = DictObject(** yaml.safe_load(r.pricing_spec))
fields = x
if x.get('fields'):
fields = x['fields']
fpath = write_pattern_xlsx(r.name, fields)
return fpath
@staticmethod
def pp_db2app(pp):
try:
pp.pricing_spec = yaml.safe_load(pp.pricing_spec)
except Exception as e:
e = f'{pp.pricing_spec}:yaml数据格式错误'
exception(e)
raise Exception(e)
@staticmethod
def pp_app2db(pp):
try:
pp.pricing_spec = yaml.dump(pp.pricing_spec, allow_unicode=True)
except Exception as e:
e = f'{pp.pricing_spec}:导出到yaml失败'
exception(e)
raise Exception(e)
@staticmethod
def ppt_db2app(ppt):
try:
ppt.pricing_data = yaml.safe_load(ppt.pricing_data)
except Exception as e:
e = f'{ppt.pricing_data}:yaml数据格式错误'
exception(e)
raise Exception(e)
@staticmethod
def ppt_app2db(ppt):
try:
ppt.pricing_data = yaml.dump(ppt.pricing_data, allow_unicode=True)
except Exception as e:
e = f'{ppt.pricing_data}:yaml数据格式错误'
exception(e)
raise Exception(e)
async def charging(sor, ppid, data):
env = ServerEnv()
biz_date = await env.get_business_date(sor)
sql = """select a.name, a.ownerid, a.providerid,
pricing_belong, discount, b.pricing_data
from pricing_program a, pricing_program_timing b
where a.id = b.ppid
and a.id = ${ppid}$
and b.enabled_date <= ${biz_date}$
and b.expired_date > ${biz_date}$
order by b.enabled_date desc"""
recs = await sor.sqlExe(sql, {
'ppid': ppid,
'biz_date': biz_date
})
if recs:
r = recs[0]
r.prices = PricingProgram.get_pricing_from_ymalstr(data,
r.pricing_data)
debug(f'{r.prices=}')
amt = 0.0
for p in r.prices:
p.cost = p.amount * r.discount
return r.prices
@staticmethod
def get_pricing_from_ymalstr(config_data, yamlstr):
"""
yamlstr是从
"""
d = None
try:
d = yaml.safe_load(yamlstr)
except Exception as e:
exception(f'yaml.sage_load({yamlstr}) error: {e}')
raise e
d = DictObject(**d)
if not d.fields:
exception(f'{d} has not "fields"')
raise Exception(f'定价定义中没有fields数据')
if not d.pricings:
exception(f'{d} has not "pricings"')
raise Exception(f'定价定义中没有pricing数据')
formula = d.formula
ret_items = []
for i, p in enumerate(d.pricings):
if not p.formula:
debug(f'无公式:{p=}')
continue
p_ok = True
times = 1
unit = 1
ns = DictObject()
for k, f in d.fields.items():
if f.type == 'factor':
ns[k] = float(config_data.get(k))
for k,spec_value in p.items():
if k == 'formula':
continue
f = d.fields.get(k)
if not f:
e = f'定价项({i})中的{k}在fields中没有定义'
exception(f'{e}')
raise Exception(e)
data_value = config_data.get(k)
if data_value is None:
e = f'数据({config_data})没有({k})数据'
exception(e)
raise Exception(e)
else:
try:
flg = check_value(f, spec_value, data_value)
if not flg:
# 条件不满足
# debug(f'条件不满足:{p=},{spec_value=}, {data_value=}, {k=}')
p_ok = False
break
except Exception as e:
msg = f'{p=},{f}: {spec_value=}, {data_value=}'
exception(f'{e}:{msg}')
break
if p_ok and p.formula:
np = p.copy()
formula = p.formula
if not formula:
e = f'{p} not formula found'
exception(e)
raise Exception(e)
debug(f'{formula=}, {ns=}, {p=}, {d.fields=}')
np.amount = eval(formula, ns)
ret_items.append(np)
if len(ret_items) == 0:
e = f'{config_data=}{yamlstr=}没有找到合适的定价'
exception(e)
raise Exception(e)
return ret_items