pricing/pricing/pricing.py
2026-05-27 18:17:44 +08:00

659 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import yaml
from ahserver.serverenv import ServerEnv
from ahserver.filestorage import FileStorage
from sqlor.dbpools import DBPools, get_sor_context
from appPublic.log import debug, exception, info, MyLogger
from appPublic.timeUtils import curDateString
from appPublic.dictObject import DictObject
from .write_pattern import write_pattern_xlsx, load_xlsx_pricing
import yaml
"""
采用yaml描述定价策略
在pricing_program的pricing_spec表中定义定价的数据字段
遵循一下规格:
字典结构key是字段名需定义字段的type(类型)label(标题)value_mode(值方式)可选options(可选项)(可选)
必须要有一个price字段其type: float
value_mode 有下列可能的取值
between # 定价表中此字段以如此格式给出:
# "小值 ~ 大值", "~"的前面或后面可以加"="
# 表示含小值,或含大值
in # 定价表中此字段值有如下格式值1 值2 ...
= # 缺省,给定单一值
> #
>=
<
<=
例子vidu定价字段
'''
model:
type: str
label: "模型"
options:
- "viduq3-pro"
- "viduq3-turbo"
resolution:
type: str
label: "分辨率"
options:
- "1024p"
- "720p"
- "540p"
duration:
type: int # 建议改为 int 或 strtimes 不是标准类型
label: "时长"
# 如果时长也有选项,需要补全,例如:
# options:
# - 5
# - 10
off_peak:
type: int # 建议改为 int 或 str因为值是 0 和 1
label: "错峰执行"
options:
- off_peak # 正常时段
- normal # 错峰
price:
type: float
label: 单价
'''
pricing_program_timing表中的pricing_data字段的数据是一个只有一个属性"pricings"的字典
其值为定价条目列表每个定价条目是个字典key值为pricing_spec字段定义的字段
pricings:
- resolution: 480p
duration: 4
audio: false
- resolution: 480p
duration: 8
audio: false
- resolution: 480p
duration: 12
audio: false
- resolution: 480p
duration: 4
audio: true
- resolution: 480p
duration: 8
audio: true
- resolution: 480p
duration: 12
audio: true
- resolution: 720p
duration: 4
audio: false
- resolution: 720p
duration: 8
audio: false
- resolution: 720p
duration: 12
audio: false
- resolution: 720p
duration: 4
audio: true
- resolution: 720p
duration: 8
audio: true
- resolution: 720p
duration: 12
audio: true
"""
typefuncs = {
'int': int,
'float': float
}
def typevalue(v, t):
f = typefuncs.get(t)
if not f:
return v
return f(v)
def check_value(field, spec_value, data_value):
if field.value_mode == 'between':
arr = spec_value.strip().split()
if len(arr) < 2 or len(arr) > 3:
e = f'{spec_value=} error'
exception(e)
raise Exception(e)
if (arr[0] is None or arr[-1] is None) and field.type == 'str':
e = f'字符串类型between方法的两个值任何一个都不能为空'
exception(e)
raise e
else:
if arr[0] is None:
arr[0] = -float('inf')
if arr[-1] is None:
arr[-1] = float('inf')
arr[0] = float(arr[0])
arr[-1] = float(arr[-1])
fvalue = float(data_value)
if len(arr) == 2 or arr[1] == '=~' :
return arr[0] <= fvalue and fvalue < arr[-1]
if arr[1] == '~':
return arr[0] < fvalue and fvalue < arr[-1]
if arr[1] == '~=':
return arr[0] < fvalue and fvalue <= arr[-1]
e = f'{arr[1]}不认识的期间逻辑,只支持:~ =~ ~='
exception(e)
raise Exception(e)
if field.value_mode == 'in':
arr = spec_value.strip().split()
arr = [ typevalue(a, field.type) for a in arr ]
# debug(f'{arr=}, {data_value=}')
return data_value in arr
mode = field.value_mode
if not mode or mode == '=':
mode = '=='
ns = {
"a": data_value,
"b": typevalue(spec_value, field.type)
}
script = f'a {mode} b'
x = eval(script, ns)
return x
def data_mapping(ns, name, v):
if f'{name}_mappings' in ns.keys():
mappings = ns[f'{name}_mappings']
if mappings:
return mappings.get(v, v)
return v
class PricingProgram:
pricing_data = {}
@staticmethod
async def get_pricing_program(ppid):
env = ServerEnv()
async with get_sor_context(env, 'pricing') as sor:
recs = await sor.R('pricing_program', {'id': ppid})
if recs:
return recs[0]
e = f'pricing_program(id={ppid}) not found'
raise e
e = f'read pricing_program(id={ppid}) failed'
raise e
@staticmethod
async def load_pricing_data(pptid, webpath_xlsx):
env = ServerEnv()
fs = FileStorage()
fp = fs.realPath(webpath_xlsx)
pricings = load_xlsx_pricing(fp)
async with get_sor_context(env, 'pricing') as sor:
ppts = await sor.R('pricing_program_timing', {'id': pptid})
if ppts:
ppt = ppts[0]
pps = await sor.R('pricing_program', {'id': ppt.ppid})
if not pps:
e = f'pricing_program({pptid}) can not find pricing_program'
exception(f)
pp = pps[0]
pp_spec = yaml.safe_load(pp.pricing_spec)
formula = None
if pp_spec.get('fields'):
fields = pp_spec.get('fields')
pricing_formula = pp_spec.get('formula')
else:
fields = pp_spec
newpricings = []
for p in pricings:
np = {}
for k,v in p.items():
for fk, fv in fields.items():
if k == fv['label']:
np.update({fk:v})
newpricings.append(np)
d = {
'fields': fields,
'formula': formula,
'pricings': newpricings
}
debug(f'{d=}')
ppt.pricing_data = yaml.dump(d, allow_unicode=True)
await sor.U('pricing_program_timing', {
'id': ppt.id,
'pricing_data': ppt.pricing_data
})
return True
e = f'pricing_program_timing(id={pptid}) not found'
exception(e)
raise Exception(e)
e = f'pricing_program_timing(id={pptid}) read failed'
exception(e)
raise e
@staticmethod
async def write_pricing_patten(request, ppid):
async with get_sor_context(request._run_ns, 'pricing') as sor:
env = request._run_ns
recs = await sor.R('pricing_program', {'id': ppid})
if not recs:
debug(f'id={ppid} pricing_program not found')
r = recs[0]
x = DictObject(** yaml.safe_load(r.pricing_spec))
fields = x
if x.get('fields'):
fields = x['fields']
fpath = write_pattern_xlsx(r.name, fields)
return fpath
@staticmethod
async def write_pricing_data(request, pptid):
async with get_sor_context(request._run_ns, 'pricing') as sor:
env = request._run_ns
recs = await sor.R('pricing_program_timing', {'id': pptid})
if not recs:
debug(f'id={pptid} pricing_program not found')
r = recs[0]
recs = await sor.R('pricing_program', {'id': r.ppid})
x = DictObject(** yaml.safe_load(r.pricing_data))
fields = x
if x.get('fields'):
fields = x['fields']
data = x.get('pricings')
fpath = write_pattern_xlsx(recs[0].name, fields, data=data)
return fpath
@staticmethod
def pp_db2app(pp):
try:
pp.pricing_spec = yaml.safe_load(pp.pricing_spec)
except Exception as e:
e = f'{pp.pricing_spec}:yaml数据格式错误'
exception(e)
raise Exception(e)
@staticmethod
def pp_app2db(pp):
try:
pp.pricing_spec = yaml.dump(pp.pricing_spec, allow_unicode=True)
except Exception as e:
e = f'{pp.pricing_spec}:导出到yaml失败'
exception(e)
raise Exception(e)
@staticmethod
def ppt_db2app(ppt):
try:
ppt.pricing_data = yaml.safe_load(ppt.pricing_data)
except Exception as e:
e = f'{ppt.pricing_data}:yaml数据格式错误'
exception(e)
raise Exception(e)
@staticmethod
def ppt_app2db(ppt):
try:
ppt.pricing_data = yaml.dump(ppt.pricing_data, allow_unicode=True)
except Exception as e:
e = f'{ppt.pricing_data}:yaml数据格式错误'
exception(e)
raise Exception(e)
@staticmethod
async def _invalidate_ppid_cache(ppid):
"""清除指定 ppid 的所有缓存条目。"""
dates = PricingProgram.pricing_data.get(ppid, [])
for d in dates:
k = f'{ppid}.{d}'
if k in PricingProgram.pricing_data:
del PricingProgram.pricing_data[k]
if ppid in PricingProgram.pricing_data:
del PricingProgram.pricing_data[ppid]
@staticmethod
def _extract_ppid_from_timing_event(data):
"""从 pricing_program_timing 事件 payload 中直接提取 ppid。"""
if not data:
return None
ppid = data.get('ppid')
if ppid:
return ppid
old_data = data.get('old') or data.get('old_row') or data.get('old_data')
if isinstance(old_data, dict):
return old_data.get('ppid')
return None
@staticmethod
async def _get_ppid_by_timing_id(pptid):
"""通过 pricing_program_timing.id 反查 ppid。删除后记录不存在时返回 None。"""
if not pptid:
return None
env = ServerEnv()
async with get_sor_context(env, 'pricing') as sor:
recs = await sor.R('pricing_program_timing', {'id': pptid})
if not recs:
return None
return recs[0].ppid
@staticmethod
async def _resolve_timing_event_ppid(data, allow_lookup=True):
"""解析 pricing_program_timing 事件影响的 ppid。
SQLor 的 C/U/D 事件 payload 是调用 sor.C/U/D 时传入的 ns
不保证包含完整行。常见 U/D payload 只有主键 id 和修改字段;
因此需要在记录仍存在时按 id 反查 ppid。删除事件发生在 delete
之后,若 d:after 仅有 id则数据库已无法反查只能依赖 d:before
预缓存或 payload 中 old/ppid 兜底。
"""
ppid = PricingProgram._extract_ppid_from_timing_event(data)
if ppid:
return ppid
pptid = data.get('id') if data else None
if allow_lookup and pptid:
ppid = await PricingProgram._get_ppid_by_timing_id(pptid)
if ppid:
return ppid
if pptid:
return PricingProgram.pricing_data.get(f'timing:{pptid}:ppid')
return None
@staticmethod
async def precache_timing_delete(data):
"""删除前预缓存 pricing_program_timing.id -> ppid供 d:after 使用。"""
pptid = data.get('id') if data else None
if not pptid:
exception(f'id not found in timing delete before event data: {data}')
return
ppid = await PricingProgram._resolve_timing_event_ppid(data, allow_lookup=True)
if not ppid:
exception(f'ppid not found in timing delete before event data: {data}')
return
PricingProgram.pricing_data[f'timing:{pptid}:ppid'] = ppid
debug(f'--EventHandle timing delete before: id={pptid}, ppid={ppid}')
@staticmethod
async def _reload_timing_pp_data(data, action):
ppid = await PricingProgram._resolve_timing_event_ppid(data, allow_lookup=(action != 'delete'))
if not ppid:
exception(f'ppid not found in timing {action} event data: {data}')
return
debug(f'--EventHandle timing {action}: {data}')
await PricingProgram._invalidate_ppid_cache(ppid)
try:
await PricingProgram.get_ppid_pricing(ppid)
except Exception as e:
debug(f'timing {action}: get_ppid_pricing failed for ppid={ppid}: {e}')
finally:
if action == 'delete' and data and data.get('id'):
PricingProgram.pricing_data.pop(f'timing:{data.get("id")}:ppid', None)
@staticmethod
async def reload_pp_data(ppt):
await PricingProgram._reload_timing_pp_data(ppt, 'update')
@staticmethod
async def reload_pricing_program(data):
"""处理 pricing_program 表的增删改事件,刷新该程序的全部缓存。"""
ppid = data.get('id')
if not ppid:
exception(f'ppid (id) not found in pricing_program event data: {data}')
return
debug(f'--EventHandle pricing_program c/u/d: {data}')
await PricingProgram._invalidate_ppid_cache(ppid)
try:
await PricingProgram.get_ppid_pricing(ppid)
except Exception as e:
debug(f'reload_pricing_program: get_ppid_pricing failed for ppid={ppid}: {e}')
@staticmethod
async def on_timing_create(data):
"""处理 pricing_program_timing 新增事件。"""
await PricingProgram._reload_timing_pp_data(data, 'create')
@staticmethod
async def on_timing_delete(data):
"""处理 pricing_program_timing 删除事件。"""
await PricingProgram._reload_timing_pp_data(data, 'delete')
@staticmethod
async def get_ppid_pricing(ppid):
dat = curDateString()
k = f'{ppid}.{dat}'
d = PricingProgram.pricing_data.get(k)
if d:
return d
env = ServerEnv()
async with get_sor_context(env, 'pricing') as sor:
sql = """select a.name, a.ownerid, a.providerid,
pricing_belong, discount, b.pricing_data
from pricing_program a, pricing_program_timing b
where a.id = b.ppid
and a.id = ${ppid}$
and b.enabled_date <= ${biz_date}$
and b.expired_date > ${biz_date}$
order by b.enabled_date desc"""
recs = await sor.sqlExe(sql, {
'ppid': ppid,
'biz_date': dat
})
if len(recs) == 0:
e = Exception(f'{ppid=},{dat=} data not found')
exception(f'{e}')
raise e
d = recs[0]
PricingProgram.pricing_data[k] = d
dates = PricingProgram.pricing_data.get(ppid, [])
dates.append(dat)
dates_cnt = len(dates)
if dates_cnt > 2:
for i in range(dates_cnt - 2):
dk = f'{ppid}.{dates[i]}'
PricingProgram.pricing_data[dk]
dates = dates[-2:]
PricingProgram.pricing_data[ppid] = dates
return d
async def buffered_charging(ppid, data):
r = await PricingProgram.get_ppid_pricing(ppid)
prices = PricingProgram.get_pricing_from_ymalstr(data,
r.pricing_data)
amt = 0.0
for p in prices:
p.cost = p.amount * r.discount
return prices
async def charging(sor, ppid, data):
if ppid is None:
e = Exception(f'ppid is None, {data=}')
exception(f'{e}')
raise e
if data is None:
e = Exception(f'{ppid=} data is None')
exception(f'{e}')
raise e
env = ServerEnv()
biz_date = await env.get_business_date(sor)
sql = """select a.name, a.ownerid, a.providerid,
pricing_belong, discount, b.pricing_data
from pricing_program a, pricing_program_timing b
where a.id = b.ppid
and a.id = ${ppid}$
and b.enabled_date <= ${biz_date}$
and b.expired_date > ${biz_date}$
order by b.enabled_date desc"""
recs = await sor.sqlExe(sql, {
'ppid': ppid,
'biz_date': biz_date
})
if recs:
r = recs[0]
r.prices = PricingProgram.get_pricing_from_ymalstr(data,
r.pricing_data)
debug(f'{r.prices=}')
amt = 0.0
for p in r.prices:
p.cost = p.amount * r.discount
return r.prices
@staticmethod
def get_pricing_from_ymalstr(config_data, yamlstr):
"""
yamlstr是从
"""
if config_data is None:
e = Exception(f'config_data is None, {yamlstr=}')
exception(f'{e=}')
raise e
config_data = config_data.copy()
d = None
try:
d = yaml.safe_load(yamlstr)
except Exception as e:
exception(f'yaml.sage_load({yamlstr}) error: {e}')
raise e
d = DictObject(**d)
if not d.fields:
exception(f'{d} has not "fields"')
raise Exception(f'定价定义中没有fields数据')
if not d.pricings:
exception(f'{d} has not "pricings"')
raise Exception(f'定价定义中没有pricing数据')
ret_items = []
for i, p in enumerate(d.pricings):
if not p.formula:
debug(f'无公式:{p=}')
continue
p_ok = True
times = 1
unit = 1
ns = DictObject(**config_data)
for k,spec_value in p.items():
if spec_value is None:
continue
if k == 'formula':
continue
f = d.fields.get(k)
if not f:
e = f'定价项({i})中的{k}在fields中没有定义'
exception(f'{e}')
raise Exception(e)
data_value = config_data.get(k)
# p[f'old_{k}'] = data_value
# p[f'mapping_{k}'] = data_mapping(d, k, data_value) #需要mapping的数据转换
data_value = data_mapping(d, k, data_value)
if data_value is None:
if 'default' in f.keys():
data_value = f['default']
else:
e = f'数据({config_data})没有({k})数据'
exception(e)
raise Exception(e)
try:
flg = check_value(f, spec_value, data_value)
if not flg:
# 条件不满足
# debug(f'条件不满足:{p=},{spec_value=}, {data_value=}, {k=}')
p_ok = False
break
except Exception as e:
msg = f'{p=},{f}: {spec_value=}, {data_value=}'
exception(f'{e}:{msg}')
break
if p_ok and p.formula:
np = p.copy()
formula = p.formula
if not formula:
e = f'{p} not formula found'
exception(e)
raise Exception(e)
debug(f'{formula=}, {ns=}, {p=}, {d.fields=}')
np.data = config_data
env_data = DictObject(config_data)
np.amount = eval(formula, env_data)
ret_items.append(np)
else:
info(f'{config_data=}, {p=}, {d.model_mappings=}, mismatched')
if len(ret_items) == 0:
e = f'{config_data=}{yamlstr=}没有找到合适的定价'
exception(e)
raise Exception(e)
return ret_items
async def get_pricing_program(ppid):
env = ServerEnv()
async with get_sor_context(env, 'pricing') as sor:
recs = await sor.R('pricing_program', {'id': ppid})
if len(recs) == 0:
exception(f'{ppid} not found in pricing_program_timing')
return None
pp = recs[0]
if ppt.pricing_data is None:
exception(f'{ppid} pricing_data is None in pricing_program_timing')
return None
try:
PricingProgram.pp_db2app(ppt)
except Exception as e:
return None
async def get_pricing_program_timeing(pptid):
env = ServerEnv()
async with get_sor_context(env, 'pricing') as sor:
recs = await sor.R('pricing_program_timing', {'id': pptid})
if len(recs) == 0:
exception(f'{pptid} not found in pricing_program_timing')
return None
ppt = recs[0]
if ppt.pricing_data is None:
exception(f'{pptid} pricing_data is None in pricing_program_timing')
return None
try:
PricingProgram.ppt_db2app(ppt)
except Exception as e:
return None
return ppt
async def test_pricing(pptid, data):
ppt = get_pricing_program_timeing(pptid)
prices = PricingProgram.get_pricing_from_ymalstr(data, ppt.pricing_data)
if prices is None:
return None
amount = 0
for p in prices:
amount += p.amount
return amount
if __name__ == '__main__':
MyLogger('Pricing', levelname='info')
yamlstr = """fields:
formula:
label: 计算公式
type: str
model:
label: 模型
type: str
model_mappings: # 模型映射
"doubao-seed-2-0-pro-260215": "doubao-seed-2-0-pro"
"doubao-seed-2-0-lite-260215": "doubao-seed-2-0-lite"
"doubao-seed-2-0-mini-260215": "doubao-seed-2-0-mini"
"doubao-seed-2-0-code-preview-260215": "doubao-seed-2-0-code"
pricings:
- formula: (3.2 * float(prompt_tokens) + 16 * float(completion_tokens)) / 1000000.0
model: doubao-seed-2-0-pro
- formula: 0.6 * float(prompt_tokens) + 3.6 * float(completion_tokens)) / 1000000.0
model: doubao-seed-2-0-lite
- formula: (0.2 * float(prompt_tokens) + 2 * float(completion_tokens)) / 1000000.0
model: doubao-seed-2-0-mini
- formula: (3.2 * float(prompt_tokens) + 16 * float(completion_tokens)) / 1000000.0
model: doubao-seed-2-0-code
"""
config_data = {'completion_tokens': 1416, 'prompt_tokens': 52, 'total_tokens': 1468, 'prompt_tokens_details': {'cached_tokens': 0}, 'completion_tokens_details': {'reasoning_tokens': 884}, 'model': 'doubao-seed-2-0-pro-260215'}
print(f'{config_data=}, {yamlstr=}')
p = PricingProgram.get_pricing_from_ymalstr(config_data, yamlstr)
print(f'{p=}')