993 lines
29 KiB
Python
993 lines
29 KiB
Python
import json
|
||
import yaml
|
||
from ahserver.serverenv import ServerEnv
|
||
from ahserver.filestorage import FileStorage
|
||
from sqlor.dbpools import DBPools, get_sor_context
|
||
from appPublic.log import debug, exception, info, MyLogger
|
||
from appPublic.timeUtils import curDateString
|
||
from appPublic.dictObject import DictObject
|
||
from appPublic.jsonConfig import getConfig
|
||
from .write_pattern import write_pattern_xlsx, load_xlsx_pricing
|
||
import yaml
|
||
|
||
|
||
def _cache_enabled():
    """Report whether the pricing cache is switched on in the configuration.

    Reads ``module_cache.pricing`` from the object returned by ``getConfig()``.
    The cache counts as enabled when ``module_cache`` is ``None``, when it has
    no ``pricing`` attribute, or when reading the configuration raises.
    """
    enabled = True
    try:
        cache_cfg = getConfig().module_cache
        if cache_cfg is not None:
            enabled = getattr(cache_cfg, 'pricing', True)
    except Exception:
        # Any configuration problem falls back to "cache on".
        enabled = True
    return enabled
|
||
|
||
"""
|
||
采用yaml描述定价策略,
|
||
在pricing_program的pricing_spec表中定义定价的数据字段
|
||
遵循以下规格:
|
||
字典结构,key是字段名,需定义字段的type(类型),label(标题),value_mode(值方式)(可选),options(可选项)(可选)
|
||
必须要有一个price字段,其type: float
|
||
|
||
value_mode 有下列可能的取值
|
||
between # 定价表中此字段以如此格式给出:
|
||
# "小值 ~ 大值", "~"的前面或后面可以加"=",
|
||
# 表示含小值,或含大值
|
||
in # 定价表中此字段值有如下格式:值1 值2 ...
|
||
= # 缺省,给定单一值
|
||
> #
|
||
>=
|
||
<
|
||
<=
|
||
|
||
|
||
例子(vidu定价字段):
|
||
'''
|
||
model:
|
||
type: str
|
||
label: "模型"
|
||
options:
|
||
- "viduq3-pro"
|
||
- "viduq3-turbo"
|
||
|
||
resolution:
|
||
type: str
|
||
label: "分辨率"
|
||
options:
|
||
- "1024p"
|
||
- "720p"
|
||
- "540p"
|
||
|
||
duration:
|
||
type: int # 建议改为 int 或 str,times 不是标准类型
|
||
label: "时长"
|
||
# 如果时长也有选项,需要补全,例如:
|
||
# options:
|
||
# - 5
|
||
# - 10
|
||
|
||
off_peak:
|
||
type: int # 建议改为 int 或 str,因为值是 0 和 1
|
||
label: "错峰执行"
|
||
options:
|
||
- off_peak # 错峰
|
||
- normal # 正常时段
|
||
price:
|
||
type: float
|
||
label: 单价
|
||
'''
|
||
|
||
pricing_program_timing表中的pricing_data字段的数据是一个只有一个属性"pricings"的字典
|
||
其值为定价条目列表,每个定价条目是个字典,key值为pricing_spec字段定义的字段,
|
||
pricings:
|
||
- resolution: 480p
|
||
duration: 4
|
||
audio: false
|
||
- resolution: 480p
|
||
duration: 8
|
||
audio: false
|
||
- resolution: 480p
|
||
duration: 12
|
||
audio: false
|
||
- resolution: 480p
|
||
duration: 4
|
||
audio: true
|
||
- resolution: 480p
|
||
duration: 8
|
||
audio: true
|
||
- resolution: 480p
|
||
duration: 12
|
||
audio: true
|
||
- resolution: 720p
|
||
duration: 4
|
||
audio: false
|
||
- resolution: 720p
|
||
duration: 8
|
||
audio: false
|
||
- resolution: 720p
|
||
duration: 12
|
||
audio: false
|
||
- resolution: 720p
|
||
duration: 4
|
||
audio: true
|
||
- resolution: 720p
|
||
duration: 8
|
||
audio: true
|
||
- resolution: 720p
|
||
duration: 12
|
||
audio: true
|
||
"""
|
||
|
||
# Converters for the field types that need casting; any other type name
# leaves the value untouched.
typefuncs = dict(int=int, float=float)


def typevalue(v, t):
    """Cast ``v`` according to the field type name ``t``.

    Returns ``int(v)`` or ``float(v)`` for the type names ``'int'`` and
    ``'float'``; for every other ``t`` the value is returned unchanged.
    """
    caster = typefuncs.get(t)
    return caster(v) if caster else v
|
||
|
||
def check_value(field, spec_value, data_value):
    """Test one usage value against one condition of a pricing entry.

    Args:
        field: field definition; ``field.value_mode`` selects the comparison
            and ``field.type`` the cast applied to the specification value.
        spec_value: the condition as written in the pricing entry.
        data_value: the value taken from the usage data.

    Returns:
        The result of the comparison.

    Raises:
        Exception: if a ``between`` specification does not have 2 or 3
            whitespace-separated tokens, or uses an unknown range operator.
        ValueError: if a ``between`` bound or the data value is not numeric.

    Modes:
        ``between``: ``"low OP high"`` with OP one of ``=~`` (low inclusive),
            ``~`` (both exclusive), ``~=`` (high inclusive); ``"low high"``
            behaves like ``=~``. Bounds and data are compared as floats.
        ``in``: whitespace-separated list of allowed values.
        anything else: used as a comparison operator between the data value
            and the specification value; empty or ``=`` means equality.
    """
    mode = field.value_mode

    if mode == 'between':
        parts = spec_value.strip().split()
        if len(parts) < 2 or len(parts) > 3:
            e = f'{spec_value=} error'
            exception(e)
            raise Exception(e)

        # str.split() never yields None, so the previous "missing bound"
        # handling (which also raised a plain str) was unreachable and has
        # been removed; both bounds are always present here.
        low = float(parts[0])
        high = float(parts[-1])
        fvalue = float(data_value)

        if len(parts) == 2:
            return low <= fvalue < high
        op = parts[1]
        if op == '=~':
            return low <= fvalue < high
        if op == '~':
            return low < fvalue < high
        if op == '~=':
            return low < fvalue <= high
        e = f'{op}不认识的期间逻辑,只支持:~ =~ ~='
        exception(e)
        raise Exception(e)

    if mode == 'in':
        allowed = [typevalue(a, field.type) for a in spec_value.strip().split()]
        return data_value in allowed

    if not mode or mode == '=':
        mode = '=='
    ns = {
        "a": data_value,
        "b": typevalue(spec_value, field.type)
    }
    # NOTE(review): ``mode`` comes from the pricing specification and is
    # interpolated into an eval() expression; the specification must be
    # trusted input.
    return eval(f'a {mode} b', ns)
|
||
|
||
def data_mapping(ns, name, v):
    """Translate ``v`` through the optional ``<name>_mappings`` table in ``ns``.

    When ``ns`` holds a non-empty mapping under ``f'{name}_mappings'``, the
    mapped value is returned, falling back to ``v`` for unmapped inputs.
    Without such a table ``v`` is returned unchanged.
    """
    table_key = f'{name}_mappings'
    if table_key not in ns.keys():
        return v
    table = ns[table_key]
    return table.get(v, v) if table else v
|
||
|
||
class PricingProgram:
|
||
pricing_data = {}
|
||
|
||
@staticmethod
def on_hot_reload(data=None):
    """Handle the ``hot_reload`` event by emptying the pricing cache.

    ``data`` is the event payload; it is only written to the debug log.
    """
    from appPublic.log import debug as log_debug

    log_debug(f'[pricing] on_hot_reload called, clearing pricing_data (data={data})')
    PricingProgram.pricing_data.clear()
|
||
|
||
@staticmethod
async def get_pricing_program(ppid):
    """Fetch the ``pricing_program`` record whose id is ``ppid``.

    Returns:
        The first matching record.

    Raises:
        Exception: if no record has that id, or if the database context ends
            without producing a result.
    """
    env = ServerEnv()
    async with get_sor_context(env, 'pricing') as sor:
        recs = await sor.R('pricing_program', {'id': ppid})
        if recs:
            return recs[0]
        # Previously a bare str was raised here, which is a TypeError.
        raise Exception(f'pricing_program(id={ppid}) not found')
    # Only reachable if the context manager swallowed an error.
    raise Exception(f'read pricing_program(id={ppid}) failed')
|
||
|
||
@staticmethod
async def load_pricing_data(pptid, webpath_xlsx):
    """Import pricing rows from an xlsx file into a pricing_program_timing row.

    The spreadsheet columns are matched to field names through each field's
    ``label`` in the owning program's ``pricing_spec``. The resulting
    ``fields`` / ``formula`` / ``pricings`` document is stored as YAML in
    ``pricing_data`` of the timing record.

    Args:
        pptid: id of the ``pricing_program_timing`` record to update.
        webpath_xlsx: web path of the uploaded xlsx, resolved via FileStorage.

    Returns:
        True on success.

    Raises:
        Exception: if the timing record or its pricing program is missing,
            or the database context ends without a result.
    """
    env = ServerEnv()
    fs = FileStorage()
    fp = fs.realPath(webpath_xlsx)
    pricings = load_xlsx_pricing(fp)
    async with get_sor_context(env, 'pricing') as sor:
        ppts = await sor.R('pricing_program_timing', {'id': pptid})
        if not ppts:
            e = f'pricing_program_timing(id={pptid}) not found'
            exception(e)
            raise Exception(e)

        ppt = ppts[0]
        pps = await sor.R('pricing_program', {'id': ppt.ppid})
        if not pps:
            # Previously this logged an undefined name and then fell through
            # to pps[0]; fail explicitly instead.
            e = f'pricing_program({pptid}) can not find pricing_program'
            exception(e)
            raise Exception(e)
        pp = pps[0]
        pp_spec = yaml.safe_load(pp.pricing_spec)

        # A spec is either {'fields': ..., 'formula': ...} or the bare
        # fields mapping itself.
        formula = None
        if pp_spec.get('fields'):
            fields = pp_spec.get('fields')
            # The spec's formula used to be read into an unused variable,
            # so the stored formula was always None.
            formula = pp_spec.get('formula')
        else:
            fields = pp_spec

        # Re-key every spreadsheet row from column label to field name.
        label_to_names = {}
        for fk, fv in fields.items():
            label_to_names.setdefault(fv['label'], []).append(fk)
        newpricings = []
        for p in pricings:
            row = {}
            for k, v in p.items():
                for fk in label_to_names.get(k, ()):
                    row[fk] = v
            newpricings.append(row)

        d = {
            'fields': fields,
            'formula': formula,
            'pricings': newpricings
        }
        debug(f'{d=}')
        ppt.pricing_data = yaml.dump(d, allow_unicode=True)
        await sor.U('pricing_program_timing', {
            'id': ppt.id,
            'pricing_data': ppt.pricing_data
        })
        return True
    # Only reachable if the context manager swallowed an error.
    e = f'pricing_program_timing(id={pptid}) read failed'
    exception(e)
    raise Exception(e)
|
||
|
||
@staticmethod
async def write_pricing_patten(request, ppid):
    """Write the empty xlsx pricing template for a pricing program.

    Args:
        request: request object whose ``_run_ns`` is used as the database
            context environment.
        ppid: id of the ``pricing_program`` record.

    Returns:
        Whatever ``write_pattern_xlsx`` returns for the program name and
        its field definitions.

    Raises:
        Exception: if no pricing program has that id (previously this was
            only logged and then failed with IndexError).
    """
    async with get_sor_context(request._run_ns, 'pricing') as sor:
        recs = await sor.R('pricing_program', {'id': ppid})
        if not recs:
            e = f'id={ppid} pricing_program not found'
            exception(e)
            raise Exception(e)
        r = recs[0]
        x = DictObject(** yaml.safe_load(r.pricing_spec))
        # The spec is either {'fields': ...} or the bare fields mapping.
        fields = x['fields'] if x.get('fields') else x
        return write_pattern_xlsx(r.name, fields)
|
||
|
||
@staticmethod
async def write_pricing_data(request, pptid):
    """Write an xlsx file holding the pricing rows of one timing record.

    Args:
        request: request object whose ``_run_ns`` is used as the database
            context environment.
        pptid: id of the ``pricing_program_timing`` record.

    Returns:
        Whatever ``write_pattern_xlsx`` returns for the owning program's
        name, the field definitions and the stored pricing rows.

    Raises:
        Exception: if the timing record or its pricing program is missing
            (previously these cases failed with IndexError).
    """
    async with get_sor_context(request._run_ns, 'pricing') as sor:
        recs = await sor.R('pricing_program_timing', {'id': pptid})
        if not recs:
            e = f'id={pptid} pricing_program_timing not found'
            exception(e)
            raise Exception(e)
        r = recs[0]
        programs = await sor.R('pricing_program', {'id': r.ppid})
        if not programs:
            e = f'id={r.ppid} pricing_program not found'
            exception(e)
            raise Exception(e)
        x = DictObject(** yaml.safe_load(r.pricing_data))
        # pricing_data is either {'fields': ..., 'pricings': ...} or the
        # bare fields mapping.
        fields = x['fields'] if x.get('fields') else x
        data = x.get('pricings')
        return write_pattern_xlsx(programs[0].name, fields, data=data)
|
||
|
||
@staticmethod
def pp_db2app(pp):
    """Replace ``pp.pricing_spec`` (YAML text) with its parsed value, in place.

    Raises:
        Exception: if the text cannot be parsed; the failure is logged first.
    """
    try:
        pp.pricing_spec = yaml.safe_load(pp.pricing_spec)
    except Exception:
        msg = f'{pp.pricing_spec}:yaml数据格式错误'
        exception(msg)
        raise Exception(msg)
|
||
|
||
@staticmethod
def pp_app2db(pp):
    """Replace ``pp.pricing_spec`` with its YAML text form, in place.

    Raises:
        Exception: if the value cannot be dumped; the failure is logged first.
    """
    try:
        pp.pricing_spec = yaml.dump(pp.pricing_spec, allow_unicode=True)
    except Exception:
        msg = f'{pp.pricing_spec}:导出到yaml失败'
        exception(msg)
        raise Exception(msg)
|
||
|
||
@staticmethod
def ppt_db2app(ppt):
    """Replace ``ppt.pricing_data`` (YAML text) with its parsed value, in place.

    Raises:
        Exception: if the text cannot be parsed; the failure is logged first.
    """
    try:
        ppt.pricing_data = yaml.safe_load(ppt.pricing_data)
    except Exception:
        msg = f'{ppt.pricing_data}:yaml数据格式错误'
        exception(msg)
        raise Exception(msg)
|
||
|
||
@staticmethod
def ppt_app2db(ppt):
    """Replace ``ppt.pricing_data`` with its YAML text form, in place.

    Raises:
        Exception: if the value cannot be dumped; the failure is logged
            first and the original error is chained as the cause.
    """
    try:
        ppt.pricing_data = yaml.dump(ppt.pricing_data, allow_unicode=True)
    except Exception as err:
        # This is a dump failure, not a parse failure; the message now
        # matches the one used by pp_app2db.
        msg = f'{ppt.pricing_data}:导出到yaml失败'
        exception(msg)
        raise Exception(msg) from err
|
||
|
||
@staticmethod
async def _invalidate_ppid_cache(ppid):
    """Drop every cache entry that belongs to ``ppid``.

    The entry stored under ``ppid`` itself is the list of cached dates;
    each ``f'{ppid}.{date}'`` entry is removed, then the list.
    """
    cache = PricingProgram.pricing_data
    for cached_date in cache.get(ppid, []):
        cache.pop(f'{ppid}.{cached_date}', None)
    cache.pop(ppid, None)
|
||
|
||
@staticmethod
def _extract_ppid_from_timing_event(data):
    """Read the ppid straight out of a pricing_program_timing event payload.

    Looks at ``data['ppid']`` first, then at the first truthy value among
    ``old``, ``old_row`` and ``old_data`` (only if that value is a dict).
    Returns ``None`` when nothing usable is present.
    """
    if not data:
        return None
    direct = data.get('ppid')
    if direct:
        return direct
    previous = None
    for key in ('old', 'old_row', 'old_data'):
        previous = data.get(key)
        if previous:
            break
    return previous.get('ppid') if isinstance(previous, dict) else None
|
||
|
||
@staticmethod
async def _get_ppid_by_timing_id(pptid):
    """Look up the ppid of a pricing_program_timing record by its id.

    Returns ``None`` for a falsy ``pptid`` or when no such record exists
    (for example after it has been deleted).
    """
    if not pptid:
        return None
    env = ServerEnv()
    async with get_sor_context(env, 'pricing') as sor:
        rows = await sor.R('pricing_program_timing', {'id': pptid})
        return rows[0].ppid if rows else None
|
||
|
||
@staticmethod
async def _resolve_timing_event_ppid(data, allow_lookup=True):
    """Work out which ppid a pricing_program_timing event affects.

    Per the original author's note, the C/U/D event payload is the ``ns``
    passed to ``sor.C/U/D`` and need not hold the full row; update and
    delete payloads often carry only the primary key and changed fields.

    Resolution order:
        1. the ppid found in the payload itself;
        2. when ``allow_lookup`` is true, a database lookup by ``data['id']``;
        3. the ``timing:<id>:ppid`` entry cached before a delete.

    Returns the ppid, or ``None`` when it cannot be determined.
    """
    found = PricingProgram._extract_ppid_from_timing_event(data)
    if found:
        return found
    timing_id = data.get('id') if data else None
    if not timing_id:
        return None
    if allow_lookup:
        found = await PricingProgram._get_ppid_by_timing_id(timing_id)
        if found:
            return found
    return PricingProgram.pricing_data.get(f'timing:{timing_id}:ppid')
|
||
|
||
@staticmethod
async def precache_timing_delete(data):
    """Remember timing id -> ppid before a delete, for the after-delete handler.

    Stores the resolved ppid under ``timing:<id>:ppid`` in the cache.
    Logs and returns without caching when the payload has no id or the
    ppid cannot be resolved.
    """
    timing_id = data.get('id') if data else None
    if not timing_id:
        exception(f'id not found in timing delete before event data: {data}')
        return
    program_id = await PricingProgram._resolve_timing_event_ppid(data, allow_lookup=True)
    if program_id:
        PricingProgram.pricing_data[f'timing:{timing_id}:ppid'] = program_id
        debug(f'--EventHandle timing delete before: id={timing_id}, ppid={program_id}')
    else:
        exception(f'ppid not found in timing delete before event data: {data}')
|
||
|
||
@staticmethod
async def _reload_timing_pp_data(data, action):
    """Refresh the cached pricing of the program touched by a timing event.

    Args:
        data: event payload.
        action: event name used in log messages; ``'delete'`` disables the
            database lookup and removes the pre-cached id -> ppid entry
            afterwards.
    """
    is_delete = action == 'delete'
    ppid = await PricingProgram._resolve_timing_event_ppid(data, allow_lookup=not is_delete)
    if not ppid:
        exception(f'ppid not found in timing {action} event data: {data}')
        return
    debug(f'--EventHandle timing {action}: {data}')
    await PricingProgram._invalidate_ppid_cache(ppid)
    try:
        await PricingProgram.get_ppid_pricing(ppid)
    except Exception as err:
        # A failed reload is only logged; the cache stays invalidated.
        debug(f'timing {action}: get_ppid_pricing failed for ppid={ppid}: {err}')
    finally:
        if is_delete and data and data.get('id'):
            PricingProgram.pricing_data.pop(f'timing:{data.get("id")}:ppid', None)
|
||
|
||
@staticmethod
async def reload_pp_data(ppt):
    """Refresh cached pricing after a pricing_program_timing update event."""
    action = 'update'
    await PricingProgram._reload_timing_pp_data(ppt, action)
|
||
|
||
@staticmethod
async def reload_pricing_program(data):
    """Refresh all cached pricing of a program after a pricing_program event.

    ``data`` is the create/update/delete event payload; its ``id`` is the
    ppid. A missing or empty payload is logged and ignored, matching the
    timing event handlers (previously a ``None`` payload raised
    AttributeError). A failed reload is only logged.
    """
    ppid = data.get('id') if data else None
    if not ppid:
        exception(f'ppid (id) not found in pricing_program event data: {data}')
        return
    debug(f'--EventHandle pricing_program c/u/d: {data}')
    await PricingProgram._invalidate_ppid_cache(ppid)
    try:
        await PricingProgram.get_ppid_pricing(ppid)
    except Exception as e:
        debug(f'reload_pricing_program: get_ppid_pricing failed for ppid={ppid}: {e}')
|
||
|
||
@staticmethod
async def on_timing_create(data):
    """Handle a pricing_program_timing create event."""
    action = 'create'
    await PricingProgram._reload_timing_pp_data(data, action)
|
||
|
||
@staticmethod
async def on_timing_delete(data):
    """Handle a pricing_program_timing delete event."""
    action = 'delete'
    await PricingProgram._reload_timing_pp_data(data, action)
|
||
|
||
@staticmethod
async def get_ppid_pricing(ppid):
    """Return the pricing record of ``ppid`` that is in force today.

    The record joins ``pricing_program`` with the ``pricing_program_timing``
    row whose enabled/expired dates bracket the current date string; the
    most recently enabled row wins. When caching is enabled the record is
    kept under ``f'{ppid}.{date}'`` and only the two most recent dates per
    program are retained.

    Raises:
        Exception: if no timing row is in force for today.
    """
    dat = curDateString()
    k = f'{ppid}.{dat}'
    if _cache_enabled():
        d = PricingProgram.pricing_data.get(k)
        if d:
            return d
    env = ServerEnv()
    async with get_sor_context(env, 'pricing') as sor:
        sql = """select a.name, a.ownerid, a.providerid,
pricing_belong, discount, b.pricing_data
from pricing_program a, pricing_program_timing b
where a.id = b.ppid
and a.id = ${ppid}$
and b.enabled_date <= ${biz_date}$
and b.expired_date > ${biz_date}$
order by b.enabled_date desc"""
        recs = await sor.sqlExe(sql, {
            'ppid': ppid,
            'biz_date': dat
        })
        if len(recs) == 0:
            e = Exception(f'{ppid=},{dat=} data not found')
            exception(f'{e}')
            raise e
        d = recs[0]
        if _cache_enabled():
            cache = PricingProgram.pricing_data
            cache[k] = d
            # cache[ppid] tracks which dates are cached for this program.
            dates = cache.get(ppid, [])
            if dat not in dates:
                dates.append(dat)
            # Evict everything but the two newest dates. The old code only
            # subscripted the entry, which evicted nothing and could raise
            # KeyError.
            for stale in dates[:-2]:
                cache.pop(f'{ppid}.{stale}', None)
            cache[ppid] = dates[-2:]
        return d
|
||
|
||
async def buffered_charging(ppid, data):
    """Price ``data`` with the cached pricing record of ``ppid``.

    Each matched pricing item gets ``cost = amount * discount`` of the
    record; the list of items is returned.
    """
    record = await PricingProgram.get_ppid_pricing(ppid)
    matched = PricingProgram.get_pricing_from_ymalstr(data, record.pricing_data)
    for item in matched:
        item.cost = item.amount * record.discount
    return matched
|
||
|
||
async def charging(sor, ppid, data):
    """Price ``data`` with the pricing of ``ppid`` in force on the business date.

    Reads the record through the given ``sor`` instead of the cache. Each
    matched pricing item gets ``cost = amount * discount``.

    Returns:
        The list of matched pricing items, or ``None`` when no timing row
        is in force.

    Raises:
        Exception: if ``ppid`` or ``data`` is ``None``.
    """
    for missing, text in ((ppid is None, f'ppid is None, {data=}'),
                          (data is None, f'{ppid=} data is None')):
        if missing:
            e = Exception(text)
            exception(f'{e}')
            raise e
    env = ServerEnv()
    biz_date = await env.get_business_date(sor)
    sql = """select a.name, a.ownerid, a.providerid,
pricing_belong, discount, b.pricing_data
from pricing_program a, pricing_program_timing b
where a.id = b.ppid
and a.id = ${ppid}$
and b.enabled_date <= ${biz_date}$
and b.expired_date > ${biz_date}$
order by b.enabled_date desc"""
    params = {
        'ppid': ppid,
        'biz_date': biz_date
    }
    recs = await sor.sqlExe(sql, params)
    if not recs:
        return None
    r = recs[0]
    r.prices = PricingProgram.get_pricing_from_ymalstr(data,
                                                       r.pricing_data)
    debug(f'{r.prices=}')
    for item in r.prices:
        item.cost = item.amount * r.discount
    return r.prices
|
||
|
||
@staticmethod
def get_pricing_from_ymalstr(config_data, yamlstr):
    """Match usage data against a pricing YAML document and compute amounts.

    Two pricing entry formats are supported:
        1. old format: a ``formula`` string evaluated with ``eval``;
        2. new format: scalar ``price_factors`` + scalar ``unit_prices`` +
           ``unit``; amount = unit_price * usage / unit value, raised to
           ``min_amount`` when that is set.

    A field definition may carry a ``derived`` expression; it is evaluated
    against the usage data and stored under the field name before
    matching, e.g. ``prompt_tokens - prompt_tokens_details.cached_tokens``.

    Args:
        config_data: mapping with the usage data.
        yamlstr: YAML text with ``fields`` and ``pricings``, optionally
            ``unit_values`` and ``<field>_mappings`` tables.

    Returns:
        A non-empty list of copies of the matching pricing entries, each
        with ``data`` (the usage data) and ``amount`` set.

    Raises:
        Exception: if ``config_data`` is None, the YAML cannot be parsed,
            ``fields``/``pricings`` are missing, an entry uses an undefined
            field, the usage data lacks a field without default, or no
            entry matches.

    NOTE(review): ``derived`` expressions, ``formula`` strings and value
    modes are passed to eval(); the pricing YAML must be trusted input.
    """
    if config_data is None:
        e = Exception(f'config_data is None, {yamlstr=}')
        exception(f'{e=}')
        raise e
    # Wrapped so derived expressions can use attribute access on nested
    # data (per the original author's note on DictObject).
    config_data = DictObject(**config_data)
    try:
        d = yaml.safe_load(yamlstr)
    except Exception as e:
        exception(f'yaml.sage_load({yamlstr}) error: {e}')
        raise
    d = DictObject(**d)
    if not d.fields:
        exception(f'{d} has not "fields"')
        raise Exception(f'定价定义中没有fields数据')
    if not d.pricings:
        exception(f'{d} has not "pricings"')
        raise Exception(f'定价定义中没有pricing数据')

    # Compute derived fields from the raw usage data.
    for field_name, field_def in d.fields.items():
        if not isinstance(field_def, dict):
            continue
        derived_expr = field_def.get('derived')
        if not derived_expr:
            continue

        eval_env = dict(config_data)
        # Re-read every top-level key through config_data's own item
        # access so nested values are the ones DictObject hands out.
        for k in list(config_data.keys()):
            eval_env[k] = config_data[k]

        try:
            result = eval(derived_expr, {}, eval_env)
            config_data[field_name] = result
            debug(f'derived field {field_name} = {derived_expr} = {result}')
        except Exception as e:
            # A derived field that cannot be computed counts as 0.
            debug(f'derived field {field_name} evaluation failed: {derived_expr}, error: {e}')
            config_data[field_name] = 0

    # Divisor for each unit name.
    unit_values = d.get('unit_values', {'百万': 1000000, '秒': 1, '千': 1000, '次': 1, '张': 1, '毫秒': 0.001, '元/百万tokens': 1000000, '元/total_tokens': 1, '元/times': 1})

    def _filter_item_result(filter_item):
        """Evaluate one ``filters`` entry; return (all_passed, true_count).

        Conditions on unknown fields or on data that is absent are
        skipped. A failing or erroring condition rejects the entry.
        """
        hits = 0
        for fk, fv in filter_item.items():
            if fk == 'unit_prices':
                continue
            fdef = d.fields.get(fk)
            if not fdef:
                continue
            value = data_mapping(d, fk, config_data.get(fk))
            if value is None:
                continue
            try:
                if not check_value(fdef, fv, value):
                    return False, hits
            except Exception as err:
                debug(f'filter check error: {err}')
                return False, hits
            hits += 1
        return True, hits

    # Keys of a pricing entry that describe the price, not a condition.
    skip_keys = {'formula', 'price_factors', 'unit_prices', 'unit', 'min_amount', 'filters', 'pricing_type'}

    ret_items = []
    for i, p in enumerate(d.pricings):
        # Entries flagged for manual review never match.
        if p.get('_NEEDS_MANUAL_REVIEW'):
            debug(f'跳过需要人工审核的定价项: {i}')
            continue

        # New format needs scalar price_factors and scalar unit_prices;
        # list/dict shaped values fall back to the formula path.
        _raw_pf = p.get('price_factors')
        _raw_up = p.get('unit_prices')
        is_new_format = (_raw_pf is not None and _raw_up is not None
                         and not isinstance(_raw_pf, list) and not isinstance(_raw_up, dict))
        is_old_format = p.get('formula') is not None

        if not is_new_format and not is_old_format:
            debug(f'无公式也无price_factors:{p=}')
            continue

        p_ok = True
        ns = DictObject(**config_data)

        # Every remaining key of the entry is a condition on the usage data.
        for k, spec_value in p.items():
            if spec_value is None:
                continue
            if k in skip_keys:
                continue
            f = d.fields.get(k)
            if not f:
                e = f'定价项({i})中的{k}在fields中没有定义'
                exception(f'{e}')
                raise Exception(e)
            data_value = config_data.get(k)
            data_value = data_mapping(d, k, data_value)
            if data_value is None:
                if 'default' in f.keys():
                    data_value = f['default']
                else:
                    e = f'数据({config_data})没有({k})数据'
                    exception(e)
                    raise Exception(e)
            try:
                flg = check_value(f, spec_value, data_value)
            except Exception as e:
                msg = f'{p=},{f}: {spec_value=}, {data_value=}'
                exception(f'{e}:{msg}')
                # A condition that cannot be evaluated must not count as
                # a match (previously p_ok stayed True here).
                flg = False
            if not flg:
                p_ok = False
                break

        # New format: ``filters`` lists alternative tiers; one fully
        # matching tier is enough. The tier price is taken only from a
        # tier whose conditions all passed (the last such tier wins), not
        # from a tier where a single condition happened to match.
        has_tier_price = False
        tier_unit_price = None
        if p_ok and is_new_format and 'filters' in p:
            filter_matched = False
            for filter_item in p['filters']:
                item_ok, hits = _filter_item_result(filter_item)
                if not item_ok:
                    continue
                filter_matched = True
                if hits and 'unit_prices' in filter_item:
                    has_tier_price = True
                    tier_unit_price = filter_item['unit_prices']
            if not filter_matched:
                p_ok = False

        if not p_ok:
            info(f'{config_data=}, {p=}, mismatched')
            continue

        np = p.copy()
        np.data = config_data

        if is_new_format:
            factor_name = p['price_factors']
            unit_price = tier_unit_price if has_tier_price else p['unit_prices']
            unit_str = p.get('unit', '次')

            # Usage quantity: 'flat' is a fixed fee, otherwise the named
            # usage field.
            if factor_name == 'flat':
                usage_value = 1
            else:
                usage_value = config_data.get(factor_name)
                if usage_value is None:
                    debug(f'新格式:config_data中缺少{factor_name}')
                    continue
                usage_value = float(usage_value)

            unit_val = unit_values.get(unit_str, 1)
            if isinstance(unit_val, str):
                unit_val = float(unit_val)

            amount = unit_price * usage_value / unit_val

            min_amount = p.get('min_amount', 0)
            if min_amount and amount < min_amount:
                amount = min_amount

            np.amount = amount
            ret_items.append(np)

        elif is_old_format:
            formula = p.formula
            debug(f'{formula=}, {ns=}, {p=}, {d.fields=}')
            env_data = DictObject(config_data)
            np.amount = eval(formula, env_data)
            ret_items.append(np)

    if len(ret_items) == 0:
        e = f'{config_data=}{yamlstr=}没有找到合适的定价'
        exception(e)
        raise Exception(e)
    return ret_items
|
||
|
||
def generate_formula_from_factors(price_factors):
    """Build a formula string from a list of price factor definitions.

    Each factor is a mapping with ``factor`` (usage field name),
    ``unit_price`` and ``unit_label``. A factor whose ``unit_label``
    contains "百万" is priced per million units.

    Example input:
        [
            {"factor": "prompt_tokens", "unit_price": 3.2, "unit_label": "元/百万Token"},
            {"factor": "completion_tokens", "unit_price": 16.0, "unit_label": "元/百万Token"}
        ]
    gives
        "(3.2 * float(prompt_tokens) + 16.0 * float(completion_tokens)) / 1000000.0"

    When per-million factors are mixed with other factors, only the
    per-million terms are divided by 1000000.0 (previously the whole sum
    was divided, which under-charged the other factors).

    Returns:
        The formula string, or ``None`` for an empty or missing list.
    """
    if not price_factors:
        return None
    terms = []
    per_million = []
    for f in price_factors:
        factor = f.get('factor', '')
        unit_price = f.get('unit_price', 0)
        # A missing or None label means "not per million".
        unit_label = f.get('unit_label') or ''
        terms.append(f'{unit_price} * float({factor})')
        per_million.append('百万' in unit_label)

    if all(per_million):
        return f"({' + '.join(terms)}) / 1000000.0"
    if not any(per_million):
        return ' + '.join(terms)
    return ' + '.join(
        f'{term} / 1000000.0' if scaled else term
        for term, scaled in zip(terms, per_million)
    )
|
||
|
||
|
||
async def get_pricing_display(ppid):
    """Return the pricing of a program in a readable form for display.

    Result structure:
        {
            "ppid": "...",
            "name": "...",
            "pricing_type": "per_use",
            "items": [
                {
                    "filters": {"model": "doubao-seed-2-0-pro"},
                    "filter_labels": {"模型": "doubao-seed-2-0-pro"},
                    "price_factors": [
                        {
                            "factor": "prompt_tokens",
                            "label": "输入Token",
                            "unit_price": 6.0,
                            "unit": "百万",
                            "unit_label": "元/百万",
                            "tiered": [...]      # only when tiers differ
                        }
                    ],
                    "formula": "...",            # old-format entries only
                    "min_amount": 0.01           # new-format entries only
                }
            ],
            "display_text": "..."
        }

    Raises:
        Exception: if the program has no pricing in force or its
            ``pricing_data`` is empty.
    """
    r = await PricingProgram.get_ppid_pricing(ppid)
    pricing_data_str = r.pricing_data
    d = yaml.safe_load(pricing_data_str)
    if not d:
        raise Exception(f'{ppid} pricing_data 为空')

    fields = d.get('fields') or {}
    pricings = d.get('pricings') or []
    pricing_type = d.get('pricing_type', 'per_use')

    # Keys of a pricing entry that are not filter conditions.
    skip_keys = {'formula', 'price_factors', 'unit_prices', 'unit', 'filters', 'min_amount', 'pricing_type', 'name'}

    items = []
    for p in pricings:
        # Entries flagged for manual review are not shown.
        if p.get('_NEEDS_MANUAL_REVIEW'):
            continue

        # Collect the filter conditions (fields whose role is 'filter',
        # which is also the default role).
        filters = {}
        filter_labels = {}
        for k, v in p.items():
            if k in skip_keys:
                continue
            fdef = fields.get(k, {})
            role = fdef.get('role', 'filter') if isinstance(fdef, dict) else 'filter'
            if role == 'filter':
                filters[k] = v
                label = fdef.get('label', k) if isinstance(fdef, dict) else k
                filter_labels[label] = v

        # New format needs scalar price_factors and scalar unit_prices;
        # list/dict shaped values are shown through the old-format path.
        _raw_pf = p.get('price_factors')
        _raw_up = p.get('unit_prices')
        is_new_format = (_raw_pf is not None and _raw_up is not None
                         and not isinstance(_raw_pf, list) and not isinstance(_raw_up, dict))

        if is_new_format:
            factor_name = _raw_pf
            unit_price = _raw_up
            unit_str = p.get('unit', '次')

            fdef = fields.get(factor_name, {})
            factor_label = fdef.get('label', factor_name) if isinstance(fdef, dict) else factor_name

            # unit_prices is shown as stored, without applying the unit value.
            display_price = unit_price
            unit_label = f"元/{unit_str}" if unit_str else "元"

            price_factors_display = [{
                'factor': factor_name,
                'label': factor_label,
                'unit_price': display_price,
                'unit': unit_str,
                'unit_label': unit_label
            }]

            # Tiers: keep only non-token filter keys and only tiers whose
            # price differs from the base price. Initialised per entry so
            # an entry without ``filters`` neither fails nor inherits the
            # previous entry's tiers.
            tiered_pricing = []
            if 'filters' in p:
                for fi in p['filters']:
                    fi_copy = fi.copy()
                    raw_tier_price = fi.get('unit_prices', unit_price)
                    fi_copy.pop('unit_prices', None)
                    fi_copy.pop('value_mode', None)
                    meaningful_filters = {k: v for k, v in fi_copy.items()
                                          if not k.endswith('_tokens') and k != 'value_mode'}
                    if meaningful_filters and raw_tier_price != unit_price:
                        tiered_pricing.append({
                            'filters': meaningful_filters,
                            'unit_prices': raw_tier_price
                        })

            item = {'price_factors': price_factors_display}
            min_amount = p.get('min_amount', 0)
            if min_amount:
                item['min_amount'] = min_amount
            if filters:
                item['filters'] = filters
                item['filter_labels'] = filter_labels
            if tiered_pricing:
                item['price_factors'][0]['tiered'] = tiered_pricing
            items.append(item)
        else:
            # Old format: formula based entry.
            price_factors = p.get('price_factors', None)
            if not price_factors:
                # Fallback: describe the factor fields without prices.
                price_factors = []
                for k, fdef in fields.items():
                    if not isinstance(fdef, dict):
                        continue
                    role = fdef.get('role', 'filter')
                    if role == 'factor' or fdef.get('type') == 'float':
                        label = fdef.get('label', k)
                        price_factors.append({
                            'factor': k,
                            'label': label,
                            'unit_price': None,
                            'unit_label': ''
                        })

            item = {'price_factors': price_factors}
            if filters:
                item['filters'] = filters
                item['filter_labels'] = filter_labels
            if p.get('formula'):
                item['formula'] = p['formula']
            items.append(item)

    # Plain-text price list.
    display_lines = [f"【{getattr(r, 'name', '')}】定价:"]
    for item in items:
        for pf in item.get('price_factors', []):
            label = pf.get('label', pf.get('factor', ''))
            up = pf.get('unit_price')
            ul = pf.get('unit_label', '')
            if up is not None:
                display_lines.append(f" - {label}: {up} {ul}")
            if pf.get('tiered'):
                for t in pf['tiered']:
                    t_filters = ', '.join(f"{k}={v}" for k, v in t.get('filters', {}).items())
                    t_price = t.get('unit_prices', '')
                    display_lines.append(f" · {t_filters}: {t_price} {ul}")

    return {
        'ppid': ppid,
        'name': getattr(r, 'name', ''),
        'pricing_type': pricing_type,
        'items': items,
        'display_text': '\n'.join(display_lines)
    }
|
||
|
||
|
||
async def get_pricing_program_timeing(pptid):
    """Load a pricing_program_timing record with ``pricing_data`` parsed.

    Returns the record after ``PricingProgram.ppt_db2app`` has converted
    its ``pricing_data``; returns ``None`` when the record does not exist,
    has no ``pricing_data``, or the conversion fails.
    """
    env = ServerEnv()
    async with get_sor_context(env, 'pricing') as sor:
        rows = await sor.R('pricing_program_timing', {'id': pptid})
        if len(rows) == 0:
            exception(f'{pptid} not found in pricing_program_timing')
            return None
        timing = rows[0]
        if timing.pricing_data is None:
            exception(f'{pptid} pricing_data is None in pricing_program_timing')
            return None
        try:
            PricingProgram.ppt_db2app(timing)
        except Exception:
            return None
        else:
            return timing
|
||
|
||
async def test_pricing(pptid, data):
    """Price ``data`` with one timing record and return the total amount.

    Args:
        pptid: id of the ``pricing_program_timing`` record to try out.
        data: usage data mapping.

    Returns:
        The sum of the amounts of all matching pricing entries, or
        ``None`` when the timing record cannot be loaded.
    """
    # The loader is a coroutine; without ``await`` the result had no
    # ``pricing_data`` attribute.
    ppt = await get_pricing_program_timeing(pptid)
    if ppt is None:
        return None
    # The loader hands back parsed pricing_data, while the pricing routine
    # expects YAML text, so serialise it again when needed.
    pricing_yaml = ppt.pricing_data
    if not isinstance(pricing_yaml, str):
        pricing_yaml = yaml.dump(pricing_yaml, allow_unicode=True)
    prices = PricingProgram.get_pricing_from_ymalstr(data, pricing_yaml)
    if prices is None:
        return None
    return sum(p.amount for p in prices)
|
||
|
||
if __name__ == '__main__':
    # Manual smoke test: price one usage record against an inline pricing
    # document that uses old-format formulas and a model name mapping.
    MyLogger('Pricing', levelname='info')
    # The doubao-seed-2-0-lite formula was missing its opening parenthesis.
    yamlstr = """fields:
  formula:
    label: 计算公式
    type: str
  model:
    label: 模型
    type: str
model_mappings: # 模型映射
  "doubao-seed-2-0-pro-260215": "doubao-seed-2-0-pro"
  "doubao-seed-2-0-lite-260215": "doubao-seed-2-0-lite"
  "doubao-seed-2-0-mini-260215": "doubao-seed-2-0-mini"
  "doubao-seed-2-0-code-preview-260215": "doubao-seed-2-0-code"
pricings:
- formula: (3.2 * float(prompt_tokens) + 16 * float(completion_tokens)) / 1000000.0
  model: doubao-seed-2-0-pro
- formula: (0.6 * float(prompt_tokens) + 3.6 * float(completion_tokens)) / 1000000.0
  model: doubao-seed-2-0-lite
- formula: (0.2 * float(prompt_tokens) + 2 * float(completion_tokens)) / 1000000.0
  model: doubao-seed-2-0-mini
- formula: (3.2 * float(prompt_tokens) + 16 * float(completion_tokens)) / 1000000.0
  model: doubao-seed-2-0-code
"""
    config_data = {'completion_tokens': 1416, 'prompt_tokens': 52, 'total_tokens': 1468, 'prompt_tokens_details': {'cached_tokens': 0}, 'completion_tokens_details': {'reasoning_tokens': 884}, 'model': 'doubao-seed-2-0-pro-260215'}
    print(f'{config_data=}, {yamlstr=}')
    p = PricingProgram.get_pricing_from_ymalstr(config_data, yamlstr)
    print(f'{p=}')
|
||
|