524 lines
14 KiB
Python
524 lines
14 KiB
Python
import json
|
||
import yaml
|
||
from ahserver.serverenv import ServerEnv
|
||
from ahserver.filestorage import FileStorage
|
||
from sqlor.dbpools import DBPools, get_sor_context
|
||
from appPublic.log import debug, exception, info, MyLogger
|
||
from appPublic.timeUtils import curDateString
|
||
from appPublic.dictObject import DictObject
|
||
from .write_pattern import write_pattern_xlsx, load_xlsx_pricing
|
||
import yaml
|
||
|
||
"""
|
||
采用yaml描述定价策略,
|
||
在pricing_program的pricing_spec表中定义定价的数据字段
|
||
遵循一下规格:
|
||
字典结构,key是字段名,需定义字段的type(类型),label(标题),value_mode(值方式)(可选),options(可选项)(可选)
|
||
必须要有一个price字段,其type: float
|
||
|
||
value_mode 有下列可能的取值
|
||
between # 定价表中此字段以如此格式给出:
|
||
# "小值 ~ 大值", "~"的前面或后面可以加"=",
|
||
# 表示含小值,或含大值
|
||
in # 定价表中此字段值有如下格式:值1 值2 ...
|
||
= # 缺省,给定单一值
|
||
> #
|
||
>=
|
||
<
|
||
<=
|
||
|
||
|
||
例子(vidu定价字段):
|
||
'''
|
||
model:
|
||
type: str
|
||
label: "模型"
|
||
options:
|
||
- "viduq3-pro"
|
||
- "viduq3-turbo"
|
||
|
||
resolution:
|
||
type: str
|
||
label: "分辨率"
|
||
options:
|
||
- "1024p"
|
||
- "720p"
|
||
- "540p"
|
||
|
||
duration:
|
||
type: int # 建议改为 int 或 str,times 不是标准类型
|
||
label: "时长"
|
||
# 如果时长也有选项,需要补全,例如:
|
||
# options:
|
||
# - 5
|
||
# - 10
|
||
|
||
off_peak:
|
||
type: int # 建议改为 int 或 str,因为值是 0 和 1
|
||
label: "错峰执行"
|
||
options:
|
||
- off_peak # 正常时段
|
||
- normal # 错峰
|
||
price:
|
||
type: float
|
||
label: 单价
|
||
'''
|
||
|
||
pricing_program_timing表中的pricing_data字段的数据是一个只有一个属性"pricings"的字典
|
||
其值为定价条目列表,每个定价条目是个字典,key值为pricing_spec字段定义的字段,
|
||
pricings:
|
||
- resolution: 480p
|
||
duration: 4
|
||
audio: false
|
||
- resolution: 480p
|
||
duration: 8
|
||
audio: false
|
||
- resolution: 480p
|
||
duration: 12
|
||
audio: false
|
||
- resolution: 480p
|
||
duration: 4
|
||
audio: true
|
||
- resolution: 480p
|
||
duration: 8
|
||
audio: true
|
||
- resolution: 480p
|
||
duration: 12
|
||
audio: true
|
||
- resolution: 720p
|
||
duration: 4
|
||
audio: false
|
||
- resolution: 720p
|
||
duration: 8
|
||
audio: false
|
||
- resolution: 720p
|
||
duration: 12
|
||
audio: false
|
||
- resolution: 720p
|
||
duration: 4
|
||
audio: true
|
||
- resolution: 720p
|
||
duration: 8
|
||
audio: true
|
||
- resolution: 720p
|
||
duration: 12
|
||
audio: true
|
||
"""
|
||
|
||
typefuncs = {
|
||
'int': int,
|
||
'float': float
|
||
}
|
||
def typevalue(v, t):
|
||
f = typefuncs.get(t)
|
||
if not f:
|
||
return v
|
||
return f(v)
|
||
|
||
def check_value(field, spec_value, data_value):
|
||
if field.value_mode == 'between':
|
||
arr = spec_value.strip().split()
|
||
if len(arr) < 2 or len(arr) > 3:
|
||
e = f'{spec_value=} error'
|
||
exception(e)
|
||
raise Exception(e)
|
||
|
||
if (arr[0] is None or arr[-1] is None) and field.type == 'str':
|
||
e = f'字符串类型between方法的两个值任何一个都不能为空'
|
||
exception(e)
|
||
raise e
|
||
else:
|
||
if arr[0] is None:
|
||
arr[0] = -float('inf')
|
||
if arr[-1] is None:
|
||
arr[-1] = float('inf')
|
||
arr[0] = float(arr[0])
|
||
arr[-1] = float(arr[-1])
|
||
fvalue = float(data_value)
|
||
|
||
if len(arr) == 2 or arr[1] == '=~' :
|
||
return arr[0] <= fvalue and fvalue < arr[-1]
|
||
if arr[1] == '~':
|
||
return arr[0] < fvalue and fvalue < arr[-1]
|
||
if arr[1] == '~=':
|
||
return arr[0] < fvalue and fvalue <= arr[-1]
|
||
e = f'{arr[1]}不认识的期间逻辑,只支持:~ =~ ~='
|
||
exception(e)
|
||
raise Exception(e)
|
||
|
||
if field.value_mode == 'in':
|
||
arr = spec_value.strip().split()
|
||
arr = [ typevalue(a, field.type) for a in arr ]
|
||
# debug(f'{arr=}, {data_value=}')
|
||
return data_value in arr
|
||
|
||
mode = field.value_mode
|
||
if not mode or mode == '=':
|
||
mode = '=='
|
||
ns = {
|
||
"a": data_value,
|
||
"b": typevalue(spec_value, field.type)
|
||
}
|
||
script = f'a {mode} b'
|
||
x = eval(script, ns)
|
||
return x
|
||
|
||
def data_mapping(ns, name, v):
|
||
if f'{name}_mappings' in ns.keys():
|
||
mappings = ns[f'{name}_mappings']
|
||
if mappings:
|
||
return mappings.get(v, v)
|
||
return v
|
||
|
||
class PricingProgram:
|
||
pricing_data = {}
|
||
|
||
@staticmethod
|
||
async def get_pricing_program(ppid):
|
||
env = ServerEnv()
|
||
async with get_sor_context(env, 'pricing') as sor:
|
||
recs = await sor.R('pricing_program', {'id': ppid})
|
||
if recs:
|
||
return recs[0]
|
||
e = f'pricing_program(id={ppid}) not found'
|
||
raise e
|
||
e = f'read pricing_program(id={ppid}) failed'
|
||
raise e
|
||
|
||
@staticmethod
|
||
async def load_pricing_data(pptid, webpath_xlsx):
|
||
env = ServerEnv()
|
||
fs = FileStorage()
|
||
fp = fs.realPath(webpath_xlsx)
|
||
pricings = load_xlsx_pricing(fp)
|
||
async with get_sor_context(env, 'pricing') as sor:
|
||
ppts = await sor.R('pricing_program_timing', {'id': pptid})
|
||
if ppts:
|
||
ppt = ppts[0]
|
||
pps = await sor.R('pricing_program', {'id': ppt.ppid})
|
||
if not pps:
|
||
e = f'pricing_program({pptid}) can not find pricing_program'
|
||
exception(f)
|
||
pp = pps[0]
|
||
pp_spec = yaml.safe_load(pp.pricing_spec)
|
||
formula = None
|
||
if pp_spec.get('fields'):
|
||
fields = pp_spec.get('fields')
|
||
pricing_formula = pp_spec.get('formula')
|
||
else:
|
||
fields = pp_spec
|
||
newpricings = []
|
||
for p in pricings:
|
||
np = {}
|
||
for k,v in p.items():
|
||
for fk, fv in fields.items():
|
||
if k == fv['label']:
|
||
np.update({fk:v})
|
||
newpricings.append(np)
|
||
d = {
|
||
'fields': fields,
|
||
'formula': formula,
|
||
'pricings': newpricings
|
||
}
|
||
debug(f'{d=}')
|
||
ppt.pricing_data = yaml.dump(d, allow_unicode=True)
|
||
await sor.U('pricing_program_timing', {
|
||
'id': ppt.id,
|
||
'pricing_data': ppt.pricing_data
|
||
})
|
||
return True
|
||
e = f'pricing_program_timing(id={pptid}) not found'
|
||
exception(e)
|
||
raise Exception(e)
|
||
e = f'pricing_program_timing(id={pptid}) read failed'
|
||
exception(e)
|
||
raise e
|
||
|
||
@staticmethod
|
||
async def write_pricing_patten(request, ppid):
|
||
async with get_sor_context(request._run_ns, 'pricing') as sor:
|
||
env = request._run_ns
|
||
recs = await sor.R('pricing_program', {'id': ppid})
|
||
if not recs:
|
||
debug(f'id={ppid} pricing_program not found')
|
||
r = recs[0]
|
||
x = DictObject(** yaml.safe_load(r.pricing_spec))
|
||
fields = x
|
||
if x.get('fields'):
|
||
fields = x['fields']
|
||
|
||
fpath = write_pattern_xlsx(r.name, fields)
|
||
return fpath
|
||
|
||
@staticmethod
|
||
def pp_db2app(pp):
|
||
try:
|
||
pp.pricing_spec = yaml.safe_load(pp.pricing_spec)
|
||
except Exception as e:
|
||
e = f'{pp.pricing_spec}:yaml数据格式错误'
|
||
exception(e)
|
||
raise Exception(e)
|
||
|
||
@staticmethod
|
||
def pp_app2db(pp):
|
||
try:
|
||
pp.pricing_spec = yaml.dump(pp.pricing_spec, allow_unicode=True)
|
||
except Exception as e:
|
||
e = f'{pp.pricing_spec}:导出到yaml失败'
|
||
exception(e)
|
||
raise Exception(e)
|
||
|
||
@staticmethod
|
||
def ppt_db2app(ppt):
|
||
try:
|
||
ppt.pricing_data = yaml.safe_load(ppt.pricing_data)
|
||
except Exception as e:
|
||
e = f'{ppt.pricing_data}:yaml数据格式错误'
|
||
exception(e)
|
||
raise Exception(e)
|
||
|
||
@staticmethod
|
||
def ppt_app2db(ppt):
|
||
try:
|
||
ppt.pricing_data = yaml.dump(ppt.pricing_data, allow_unicode=True)
|
||
except Exception as e:
|
||
e = f'{ppt.pricing_data}:yaml数据格式错误'
|
||
exception(e)
|
||
raise Exception(e)
|
||
|
||
@staticmethod
|
||
async def get_ppid_pricing(ppid):
|
||
dat = curDateString()
|
||
k = f'{ppid}.{dat}'
|
||
d = PricingProgram.pricing_data.get(k)
|
||
if d:
|
||
return d
|
||
env = ServerEnv()
|
||
async with get_sor_context(env, 'pricing') as sor:
|
||
sql = """select a.name, a.ownerid, a.providerid,
|
||
pricing_belong, discount, b.pricing_data
|
||
from pricing_program a, pricing_program_timing b
|
||
where a.id = b.ppid
|
||
and a.id = ${ppid}$
|
||
and b.enabled_date <= ${biz_date}$
|
||
and b.expired_date > ${biz_date}$
|
||
order by b.enabled_date desc"""
|
||
recs = await sor.sqlExe(sql, {
|
||
'ppid': ppid,
|
||
'biz_date': dat
|
||
})
|
||
if len(recs) == 0:
|
||
e = Exception(f'{ppid=},{dat=} data not found')
|
||
exception(f'{e}')
|
||
raise
|
||
d = recs[0]
|
||
PricingProgram.pricing_data[k] = d
|
||
dates = PricingProgram.pricing_data.get(ppid, [])
|
||
dates.append(dat)
|
||
dates_cnt = len(dates)
|
||
if dates_cnt > 2:
|
||
for i in range(dates_cnt - 2):
|
||
dk = f'{ppid}.{dates[i]}'
|
||
PricingProgram.pricing_data[dk]
|
||
dates = dates[-2:]
|
||
PricingProgram.pricing_data[ppid] = dates
|
||
return d
|
||
|
||
async def buffered_charging(ppid, data):
|
||
r = await PricingProgram.get_ppid_pricing(ppid)
|
||
prices = PricingProgram.get_pricing_from_ymalstr(data,
|
||
r.pricing_data)
|
||
amt = 0.0
|
||
for p in prices:
|
||
p.cost = p.amount * r.discount
|
||
return prices
|
||
|
||
async def charging(sor, ppid, data):
|
||
if ppid is None:
|
||
e = Exception(f'ppid is None, {data=}')
|
||
exception(f'{e}')
|
||
raise e
|
||
if data is None:
|
||
e = Exception(f'{ppid=} data is None')
|
||
exception(f'{e}')
|
||
raise e
|
||
env = ServerEnv()
|
||
biz_date = await env.get_business_date(sor)
|
||
sql = """select a.name, a.ownerid, a.providerid,
|
||
pricing_belong, discount, b.pricing_data
|
||
from pricing_program a, pricing_program_timing b
|
||
where a.id = b.ppid
|
||
and a.id = ${ppid}$
|
||
and b.enabled_date <= ${biz_date}$
|
||
and b.expired_date > ${biz_date}$
|
||
order by b.enabled_date desc"""
|
||
recs = await sor.sqlExe(sql, {
|
||
'ppid': ppid,
|
||
'biz_date': biz_date
|
||
})
|
||
if recs:
|
||
r = recs[0]
|
||
r.prices = PricingProgram.get_pricing_from_ymalstr(data,
|
||
r.pricing_data)
|
||
debug(f'{r.prices=}')
|
||
amt = 0.0
|
||
for p in r.prices:
|
||
p.cost = p.amount * r.discount
|
||
return r.prices
|
||
|
||
@staticmethod
|
||
def get_pricing_from_ymalstr(config_data, yamlstr):
|
||
"""
|
||
yamlstr是从
|
||
"""
|
||
if config_data is None:
|
||
e = Exception(f'config_data is None, {yamlstr=}')
|
||
exception(f'{e=}')
|
||
raise e
|
||
config_data = config_data.copy()
|
||
d = None
|
||
try:
|
||
d = yaml.safe_load(yamlstr)
|
||
except Exception as e:
|
||
exception(f'yaml.sage_load({yamlstr}) error: {e}')
|
||
raise e
|
||
d = DictObject(**d)
|
||
if not d.fields:
|
||
exception(f'{d} has not "fields"')
|
||
raise Exception(f'定价定义中没有fields数据')
|
||
if not d.pricings:
|
||
exception(f'{d} has not "pricings"')
|
||
raise Exception(f'定价定义中没有pricing数据')
|
||
ret_items = []
|
||
for i, p in enumerate(d.pricings):
|
||
if not p.formula:
|
||
debug(f'无公式:{p=}')
|
||
continue
|
||
p_ok = True
|
||
times = 1
|
||
unit = 1
|
||
ns = DictObject(**config_data)
|
||
for k,spec_value in p.items():
|
||
if spec_value is None:
|
||
continue
|
||
if k == 'formula':
|
||
continue
|
||
f = d.fields.get(k)
|
||
if not f:
|
||
e = f'定价项({i})中的{k}在fields中没有定义'
|
||
exception(f'{e}')
|
||
raise Exception(e)
|
||
data_value = config_data.get(k)
|
||
# p[f'old_{k}'] = data_value
|
||
# p[f'mapping_{k}'] = data_mapping(d, k, data_value) #需要mapping的数据转换
|
||
data_value = data_mapping(d, k, data_value)
|
||
if data_value is None:
|
||
if 'default' in f.keys():
|
||
data_value = f['default']
|
||
else:
|
||
e = f'数据({config_data})没有({k})数据'
|
||
exception(e)
|
||
raise Exception(e)
|
||
try:
|
||
flg = check_value(f, spec_value, data_value)
|
||
if not flg:
|
||
# 条件不满足
|
||
# debug(f'条件不满足:{p=},{spec_value=}, {data_value=}, {k=}')
|
||
p_ok = False
|
||
break
|
||
except Exception as e:
|
||
msg = f'{p=},{f}: {spec_value=}, {data_value=}'
|
||
exception(f'{e}:{msg}')
|
||
break
|
||
if p_ok and p.formula:
|
||
np = p.copy()
|
||
formula = p.formula
|
||
if not formula:
|
||
e = f'{p} not formula found'
|
||
exception(e)
|
||
raise Exception(e)
|
||
debug(f'{formula=}, {ns=}, {p=}, {d.fields=}')
|
||
np.data = config_data
|
||
np.amount = eval(formula, config_data.copy())
|
||
ret_items.append(np)
|
||
else:
|
||
info(f'{config_data=}, {p=}, {d.model_mappings=}, mismatched')
|
||
if len(ret_items) == 0:
|
||
e = f'{config_data=}{yamlstr=}没有找到合适的定价'
|
||
exception(e)
|
||
raise Exception(e)
|
||
return ret_items
|
||
|
||
async def get_pricing_program(ppid):
|
||
env = ServerEnv()
|
||
async with get_sor_context(env, 'pricing') as sor:
|
||
recs = await sor.R('pricing_program', {'id': ppid})
|
||
if len(recs) == 0:
|
||
exception(f'{ppid} not found in pricing_program_timing')
|
||
return None
|
||
pp = recs[0]
|
||
if ppt.pricing_data is None:
|
||
exception(f'{ppid} pricing_data is None in pricing_program_timing')
|
||
return None
|
||
try:
|
||
PricingProgram.pp_db2app(ppt)
|
||
except Exception as e:
|
||
return None
|
||
|
||
async def get_pricing_program_timeing(pptid):
|
||
env = ServerEnv()
|
||
async with get_sor_context(env, 'pricing') as sor:
|
||
recs = await sor.R('pricing_program_timing', {'id': pptid})
|
||
if len(recs) == 0:
|
||
exception(f'{pptid} not found in pricing_program_timing')
|
||
return None
|
||
ppt = recs[0]
|
||
if ppt.pricing_data is None:
|
||
exception(f'{pptid} pricing_data is None in pricing_program_timing')
|
||
return None
|
||
try:
|
||
PricingProgram.ppt_db2app(ppt)
|
||
except Exception as e:
|
||
return None
|
||
return ppt
|
||
|
||
async def test_pricing(pptid, data):
|
||
ppt = get_pricing_program_timeing(pptid)
|
||
prices = PricingProgram.get_pricing_from_ymalstr(data, ppt.pricing_data)
|
||
if prices is None:
|
||
return None
|
||
amount = 0
|
||
for p in prices:
|
||
amount += p.amount
|
||
return amount
|
||
|
||
if __name__ == '__main__':
|
||
MyLogger('Pricing', levelname='info')
|
||
yamlstr = """fields:
|
||
formula:
|
||
label: 计算公式
|
||
type: str
|
||
model:
|
||
label: 模型
|
||
type: str
|
||
model_mappings: # 模型映射
|
||
"doubao-seed-2-0-pro-260215": "doubao-seed-2-0-pro"
|
||
"doubao-seed-2-0-lite-260215": "doubao-seed-2-0-lite"
|
||
"doubao-seed-2-0-mini-260215": "doubao-seed-2-0-mini"
|
||
"doubao-seed-2-0-code-preview-260215": "doubao-seed-2-0-code"
|
||
pricings:
|
||
- formula: (3.2 * float(prompt_tokens) + 16 * float(completion_tokens)) / 1000000.0
|
||
model: doubao-seed-2-0-pro
|
||
- formula: 0.6 * float(prompt_tokens) + 3.6 * float(completion_tokens)) / 1000000.0
|
||
model: doubao-seed-2-0-lite
|
||
- formula: (0.2 * float(prompt_tokens) + 2 * float(completion_tokens)) / 1000000.0
|
||
model: doubao-seed-2-0-mini
|
||
- formula: (3.2 * float(prompt_tokens) + 16 * float(completion_tokens)) / 1000000.0
|
||
model: doubao-seed-2-0-code
|
||
"""
|
||
config_data = {'completion_tokens': 1416, 'prompt_tokens': 52, 'total_tokens': 1468, 'prompt_tokens_details': {'cached_tokens': 0}, 'completion_tokens_details': {'reasoning_tokens': 884}, 'model': 'doubao-seed-2-0-pro-260215'}
|
||
print(f'{config_data=}, {yamlstr=}')
|
||
p = PricingProgram.get_pricing_from_ymalstr(config_data, yamlstr)
|
||
print(f'{p=}')
|
||
|