pricing/pricing/pricing.py

1026 lines
30 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import yaml
from ahserver.serverenv import ServerEnv
from ahserver.filestorage import FileStorage
from sqlor.dbpools import DBPools, get_sor_context
from appPublic.log import debug, exception, info, MyLogger
from appPublic.timeUtils import curDateString
from appPublic.dictObject import DictObject
from appPublic.jsonConfig import getConfig
from .write_pattern import write_pattern_xlsx, load_xlsx_pricing
import yaml
def _cache_enabled():
"""Check if cache is enabled for pricing module in config.json"""
try:
config = getConfig()
module_cache = config.module_cache
if module_cache is None:
return True
return getattr(module_cache, 'pricing', True)
except Exception:
return True
"""
采用yaml描述定价策略
在pricing_program的pricing_spec表中定义定价的数据字段
遵循以下规格:
字典结构key是字段名需定义字段的type(类型)label(标题)value_mode(值方式)可选options(可选项)(可选)
必须要有一个price字段其type: float
value_mode 有下列可能的取值
between # 定价表中此字段以如此格式给出:
# "小值 ~ 大值", "~"的前面或后面可以加"="
# 表示含小值,或含大值
in # 定价表中此字段值有如下格式值1 值2 ...
= # 缺省,给定单一值
> #
>=
<
<=
例子vidu定价字段
'''
model:
type: str
label: "模型"
options:
- "viduq3-pro"
- "viduq3-turbo"
resolution:
type: str
label: "分辨率"
options:
- "1024p"
- "720p"
- "540p"
duration:
type: int # 建议改为 int 或 strtimes 不是标准类型
label: "时长"
# 如果时长也有选项,需要补全,例如:
# options:
# - 5
# - 10
off_peak:
type: int # 建议改为 int 或 str因为值是 0 和 1
label: "错峰执行"
options:
- off_peak # 错峰
- normal # 正常时段
price:
type: float
label: 单价
'''
pricing_program_timing表中的pricing_data字段的数据是一个只有一个属性"pricings"的字典
其值为定价条目列表每个定价条目是个字典key值为pricing_spec字段定义的字段
pricings:
- resolution: 480p
duration: 4
audio: false
- resolution: 480p
duration: 8
audio: false
- resolution: 480p
duration: 12
audio: false
- resolution: 480p
duration: 4
audio: true
- resolution: 480p
duration: 8
audio: true
- resolution: 480p
duration: 12
audio: true
- resolution: 720p
duration: 4
audio: false
- resolution: 720p
duration: 8
audio: false
- resolution: 720p
duration: 12
audio: false
- resolution: 720p
duration: 4
audio: true
- resolution: 720p
duration: 8
audio: true
- resolution: 720p
duration: 12
audio: true
"""
# Converters for the field types that need casting; any other type name
# leaves the value untouched.
typefuncs = dict(int=int, float=float)


def typevalue(v, t):
    """Cast ``v`` with the converter registered for type name ``t``.

    Returns ``v`` unchanged when ``t`` has no entry in ``typefuncs``.
    """
    caster = typefuncs.get(t)
    return caster(v) if caster else v
def check_value(field, spec_value, data_value, value_mode=None):
    """Check whether ``data_value`` satisfies the condition ``spec_value``.

    Args:
        field: field definition; ``field.type`` is read, and
            ``field.value_mode`` when ``value_mode`` is not given.
        spec_value: condition taken from a pricing entry. For ``between`` it
            is ``"low high"`` or ``"low OP high"`` with OP one of
            ``~``, ``=~``, ``~=``, ``=~=`` (``=`` marks an inclusive bound);
            for ``in`` it is a whitespace separated list of values.
        data_value: value taken from the usage data.
        value_mode: overrides ``field.value_mode`` when not ``None``.

    Returns:
        The boolean result of the comparison.

    Raises:
        Exception: the ``between`` spec has the wrong number of tokens or an
            unknown operator.
        ValueError: a ``between`` bound or ``data_value`` is not numeric.
    """
    if value_mode is None:
        value_mode = field.value_mode
    if value_mode == 'between':
        arr = spec_value.strip().split()
        if len(arr) < 2 or len(arr) > 3:
            e = f'{spec_value=} error'
            exception(e)
            raise Exception(e)
        # str.split() never yields None, so the former "bound is None"
        # handling (which also raised a bare str) could not run and is gone.
        low = float(arr[0])
        high = float(arr[-1])
        fvalue = float(data_value)
        # Two tokens ("low high") behave like "=~": low inclusive, high exclusive.
        if len(arr) == 2 or arr[1] == '=~':
            return low <= fvalue < high
        if arr[1] == '~':
            return low < fvalue < high
        if arr[1] == '~=':
            return low < fvalue <= high
        if arr[1] == '=~=':
            return low <= fvalue <= high
        e = f'{arr[1]}不认识的期间逻辑,只支持:~ =~ ~= =~='
        exception(e)
        raise Exception(e)
    if value_mode == 'in':
        arr = [typevalue(a, field.type) for a in spec_value.strip().split()]
        return data_value in arr
    mode = value_mode
    if not mode or mode == '=':
        mode = '=='
    ns = {
        "a": data_value,
        "b": typevalue(spec_value, field.type)
    }
    # NOTE(security): ``mode`` comes from the pricing spec and is spliced into
    # an eval'd expression; only trusted pricing definitions must reach here.
    script = f'a {mode} b'
    return eval(script, ns)
def data_mapping(ns, name, v):
    """Translate ``v`` through the optional ``<name>_mappings`` table in ``ns``.

    Returns ``v`` itself when ``ns`` has no such table, when the table is
    empty, or when the table has no entry for ``v``.
    """
    key = f'{name}_mappings'
    if key not in ns.keys():
        return v
    table = ns[key]
    return table.get(v, v) if table else v
class PricingProgram:
    """Load, cache and evaluate pricing programs.

    Every member is a static method and is called through the class itself.
    """

    # Process-wide cache shared by all callers. Keys written by this class:
    #   '<ppid>.<date>'       -> pricing record for that program and date
    #   '<ppid>'              -> list of dates currently cached for the program
    #   'timing:<pptid>:ppid' -> ppid remembered ahead of a timing delete
    pricing_data = {}

    # Selects the pricing data of one program that is active on biz_date,
    # newest enabled_date first.
    _ACTIVE_PRICING_SQL = """select a.name, a.ownerid, a.providerid,
        pricing_belong, discount, b.pricing_data
        from pricing_program a, pricing_program_timing b
        where a.id = b.ppid
        and a.id = ${ppid}$
        and b.enabled_date <= ${biz_date}$
        and b.expired_date > ${biz_date}$
        order by b.enabled_date desc"""

    @staticmethod
    def on_hot_reload(data=None):
        """Event handler for hot_reload event. Clears pricing cache."""
        debug(f'[pricing] on_hot_reload called, clearing pricing_data (data={data})')
        PricingProgram.pricing_data.clear()

    @staticmethod
    async def get_pricing_program(ppid):
        """Return the ``pricing_program`` row whose id is ``ppid``.

        Raises:
            Exception: no such row, or the read did not complete.
        """
        env = ServerEnv()
        async with get_sor_context(env, 'pricing') as sor:
            recs = await sor.R('pricing_program', {'id': ppid})
            if recs:
                return recs[0]
            # A plain str cannot be raised; wrap the message in Exception.
            raise Exception(f'pricing_program(id={ppid}) not found')
        # Reachable only if the context manager suppressed an error.
        raise Exception(f'read pricing_program(id={ppid}) failed')

    @staticmethod
    async def load_pricing_data(pptid, webpath_xlsx):
        """Import an xlsx price sheet into ``pricing_program_timing.pricing_data``.

        The sheet rows are keyed by field label; each is re-keyed by field
        name using the owning program's ``pricing_spec`` and the result is
        stored as YAML with the keys ``fields``, ``formula`` and ``pricings``.

        Args:
            pptid: id of the pricing_program_timing row to update.
            webpath_xlsx: web path of the uploaded xlsx file.

        Returns:
            True on success.

        Raises:
            Exception: the timing row or its pricing program is missing, or
                the read did not complete.
        """
        env = ServerEnv()
        fs = FileStorage()
        fp = fs.realPath(webpath_xlsx)
        pricings = load_xlsx_pricing(fp)
        async with get_sor_context(env, 'pricing') as sor:
            ppts = await sor.R('pricing_program_timing', {'id': pptid})
            if ppts:
                ppt = ppts[0]
                pps = await sor.R('pricing_program', {'id': ppt.ppid})
                if not pps:
                    e = f'pricing_program({pptid}) can not find pricing_program'
                    exception(e)
                    raise Exception(e)
                pp = pps[0]
                pp_spec = yaml.safe_load(pp.pricing_spec)
                formula = None
                if pp_spec.get('fields'):
                    fields = pp_spec.get('fields')
                    # Carry the spec-level formula into the stored data.
                    formula = pp_spec.get('formula')
                else:
                    # Spec without a "fields" wrapper: the spec is the field map.
                    fields = pp_spec
                newpricings = []
                for p in pricings:
                    np = {}
                    for k, v in p.items():
                        for fk, fv in fields.items():
                            if k == fv['label']:
                                np.update({fk: v})
                    newpricings.append(np)
                d = {
                    'fields': fields,
                    'formula': formula,
                    'pricings': newpricings
                }
                debug(f'{d=}')
                ppt.pricing_data = yaml.dump(d, allow_unicode=True)
                await sor.U('pricing_program_timing', {
                    'id': ppt.id,
                    'pricing_data': ppt.pricing_data
                })
                return True
            e = f'pricing_program_timing(id={pptid}) not found'
            exception(e)
            raise Exception(e)
        # Reachable only if the context manager suppressed an error.
        e = f'pricing_program_timing(id={pptid}) read failed'
        exception(e)
        raise Exception(e)

    @staticmethod
    async def write_pricing_patten(request, ppid):
        """Write an empty xlsx template for pricing program ``ppid``.

        Returns:
            Whatever ``write_pattern_xlsx`` returns for the program's name
            and field definitions.

        Raises:
            Exception: the pricing program does not exist.
        """
        async with get_sor_context(request._run_ns, 'pricing') as sor:
            recs = await sor.R('pricing_program', {'id': ppid})
            if not recs:
                e = f'id={ppid} pricing_program not found'
                debug(e)
                raise Exception(e)
            r = recs[0]
            x = DictObject(**yaml.safe_load(r.pricing_spec))
            fields = x
            if x.get('fields'):
                fields = x['fields']
            fpath = write_pattern_xlsx(r.name, fields)
            return fpath

    @staticmethod
    async def write_pricing_data(request, pptid):
        """Write the stored pricing data of timing ``pptid`` to an xlsx file.

        Returns:
            Whatever ``write_pattern_xlsx`` returns for the program's name,
            the stored field definitions and the stored pricing rows.

        Raises:
            Exception: the timing row or its pricing program does not exist.
        """
        async with get_sor_context(request._run_ns, 'pricing') as sor:
            recs = await sor.R('pricing_program_timing', {'id': pptid})
            if not recs:
                e = f'id={pptid} pricing_program not found'
                debug(e)
                raise Exception(e)
            r = recs[0]
            recs = await sor.R('pricing_program', {'id': r.ppid})
            if not recs:
                e = f'pricing_program(id={r.ppid}) not found'
                debug(e)
                raise Exception(e)
            x = DictObject(**yaml.safe_load(r.pricing_data))
            fields = x
            if x.get('fields'):
                fields = x['fields']
            data = x.get('pricings')
            fpath = write_pattern_xlsx(recs[0].name, fields, data=data)
            return fpath

    @staticmethod
    def pp_db2app(pp):
        """Parse ``pp.pricing_spec`` from YAML text in place.

        Raises:
            Exception: the text is not valid YAML.
        """
        try:
            pp.pricing_spec = yaml.safe_load(pp.pricing_spec)
        except Exception as err:
            e = f'{pp.pricing_spec}:yaml数据格式错误'
            exception(e)
            raise Exception(e) from err

    @staticmethod
    def pp_app2db(pp):
        """Serialize ``pp.pricing_spec`` to YAML text in place.

        Raises:
            Exception: the value cannot be dumped to YAML.
        """
        try:
            pp.pricing_spec = yaml.dump(pp.pricing_spec, allow_unicode=True)
        except Exception as err:
            e = f'{pp.pricing_spec}:导出到yaml失败'
            exception(e)
            raise Exception(e) from err

    @staticmethod
    def ppt_db2app(ppt):
        """Parse ``ppt.pricing_data`` from YAML text in place.

        Raises:
            Exception: the text is not valid YAML.
        """
        try:
            ppt.pricing_data = yaml.safe_load(ppt.pricing_data)
        except Exception as err:
            e = f'{ppt.pricing_data}:yaml数据格式错误'
            exception(e)
            raise Exception(e) from err

    @staticmethod
    def ppt_app2db(ppt):
        """Serialize ``ppt.pricing_data`` to YAML text in place.

        Raises:
            Exception: the value cannot be dumped to YAML.
        """
        try:
            ppt.pricing_data = yaml.dump(ppt.pricing_data, allow_unicode=True)
        except Exception as err:
            e = f'{ppt.pricing_data}:yaml数据格式错误'
            exception(e)
            raise Exception(e) from err

    @staticmethod
    async def _invalidate_ppid_cache(ppid):
        """Drop every cache entry that belongs to ``ppid``."""
        cache = PricingProgram.pricing_data
        for d in cache.get(ppid, []):
            cache.pop(f'{ppid}.{d}', None)
        cache.pop(ppid, None)

    @staticmethod
    def _remember_cached_date(ppid, dat):
        """Register ``dat`` as cached for ``ppid`` and evict older dates.

        Only the last two registered dates are kept; the cache entries of
        any earlier date are removed.
        """
        cache = PricingProgram.pricing_data
        dates = cache.get(ppid, [])
        # Registering a date twice would later evict the live entry.
        if dat not in dates:
            dates.append(dat)
        for stale in dates[:-2]:
            cache.pop(f'{ppid}.{stale}', None)
        cache[ppid] = dates[-2:]

    @staticmethod
    def _extract_ppid_from_timing_event(data):
        """Read the ppid straight from a pricing_program_timing event payload.

        Looks at ``data['ppid']`` first, then at the ``ppid`` of the first
        truthy value among ``old``, ``old_row`` and ``old_data`` when that
        value is a dict. Returns None when nothing is found.
        """
        if not data:
            return None
        ppid = data.get('ppid')
        if ppid:
            return ppid
        old_data = data.get('old') or data.get('old_row') or data.get('old_data')
        if isinstance(old_data, dict):
            return old_data.get('ppid')
        return None

    @staticmethod
    async def _get_ppid_by_timing_id(pptid):
        """Look up the ppid of timing row ``pptid``.

        Returns None when ``pptid`` is falsy or the row does not exist
        (for instance after it was deleted).
        """
        if not pptid:
            return None
        env = ServerEnv()
        async with get_sor_context(env, 'pricing') as sor:
            recs = await sor.R('pricing_program_timing', {'id': pptid})
            if not recs:
                return None
            return recs[0].ppid

    @staticmethod
    async def _resolve_timing_event_ppid(data, allow_lookup=True):
        """Work out which ppid a pricing_program_timing event affects.

        Per the original author's note, the payload of a create/update/delete
        event is the namespace passed to the database call and need not hold
        the full row; update/delete payloads often carry only the primary key
        and the changed columns. The ppid is therefore resolved in order from:
        the payload itself, a database lookup by id (when ``allow_lookup``),
        and finally the value stored by ``precache_timing_delete``.

        Returns:
            The ppid, or None when it cannot be determined.
        """
        ppid = PricingProgram._extract_ppid_from_timing_event(data)
        if ppid:
            return ppid
        pptid = data.get('id') if data else None
        if allow_lookup and pptid:
            ppid = await PricingProgram._get_ppid_by_timing_id(pptid)
            if ppid:
                return ppid
        if pptid:
            return PricingProgram.pricing_data.get(f'timing:{pptid}:ppid')
        return None

    @staticmethod
    async def precache_timing_delete(data):
        """Remember timing id -> ppid before a delete, for the after-delete handler."""
        pptid = data.get('id') if data else None
        if not pptid:
            exception(f'id not found in timing delete before event data: {data}')
            return
        ppid = await PricingProgram._resolve_timing_event_ppid(data, allow_lookup=True)
        if not ppid:
            exception(f'ppid not found in timing delete before event data: {data}')
            return
        PricingProgram.pricing_data[f'timing:{pptid}:ppid'] = ppid
        debug(f'--EventHandle timing delete before: id={pptid}, ppid={ppid}')

    @staticmethod
    async def _reload_timing_pp_data(data, action):
        """Invalidate and reload the cache of the program touched by a timing event.

        Args:
            data: event payload.
            action: 'create', 'update' or 'delete'; for 'delete' no database
                lookup by id is attempted and the pre-cached mapping is
                removed afterwards.
        """
        ppid = await PricingProgram._resolve_timing_event_ppid(
            data, allow_lookup=(action != 'delete'))
        if not ppid:
            exception(f'ppid not found in timing {action} event data: {data}')
            return
        debug(f'--EventHandle timing {action}: {data}')
        await PricingProgram._invalidate_ppid_cache(ppid)
        try:
            await PricingProgram.get_ppid_pricing(ppid)
        except Exception as e:
            # A program without active pricing is not an error for the handler.
            debug(f'timing {action}: get_ppid_pricing failed for ppid={ppid}: {e}')
        finally:
            if action == 'delete' and data and data.get('id'):
                PricingProgram.pricing_data.pop(f'timing:{data.get("id")}:ppid', None)

    @staticmethod
    async def reload_pp_data(ppt):
        """Handle a pricing_program_timing update event."""
        await PricingProgram._reload_timing_pp_data(ppt, 'update')

    @staticmethod
    async def reload_pricing_program(data):
        """Handle create/update/delete events of pricing_program: refresh its cache."""
        ppid = data.get('id') if data else None
        if not ppid:
            exception(f'ppid (id) not found in pricing_program event data: {data}')
            return
        debug(f'--EventHandle pricing_program c/u/d: {data}')
        await PricingProgram._invalidate_ppid_cache(ppid)
        try:
            await PricingProgram.get_ppid_pricing(ppid)
        except Exception as e:
            debug(f'reload_pricing_program: get_ppid_pricing failed for ppid={ppid}: {e}')

    @staticmethod
    async def on_timing_create(data):
        """Handle a pricing_program_timing create event."""
        await PricingProgram._reload_timing_pp_data(data, 'create')

    @staticmethod
    async def on_timing_delete(data):
        """Handle a pricing_program_timing delete event."""
        await PricingProgram._reload_timing_pp_data(data, 'delete')

    @staticmethod
    async def get_ppid_pricing(ppid):
        """Return the pricing record of ``ppid`` active on the current date.

        The record is served from ``pricing_data`` when caching is enabled
        and an entry for today exists; otherwise it is read from the
        database and, when caching is enabled, stored.

        Raises:
            Exception: no pricing is active for the program today.
        """
        dat = curDateString()
        k = f'{ppid}.{dat}'
        if _cache_enabled():
            d = PricingProgram.pricing_data.get(k)
            if d:
                return d
        env = ServerEnv()
        async with get_sor_context(env, 'pricing') as sor:
            recs = await sor.sqlExe(PricingProgram._ACTIVE_PRICING_SQL, {
                'ppid': ppid,
                'biz_date': dat
            })
            if len(recs) == 0:
                e = Exception(f'{ppid=},{dat=} data not found')
                exception(f'{e}')
                raise e
            d = recs[0]
            if _cache_enabled():
                PricingProgram.pricing_data[k] = d
                PricingProgram._remember_cached_date(ppid, dat)
            return d

    @staticmethod
    async def buffered_charging(ppid, data):
        """Price ``data`` with the (possibly cached) pricing of ``ppid``.

        Returns:
            The matched pricing items, each with ``amount`` and a ``cost``
            equal to ``amount`` times the program discount clamped to [0, 1]
            (1.0 when the discount is None).
        """
        r = await PricingProgram.get_ppid_pricing(ppid)
        prices = PricingProgram.get_pricing_from_ymalstr(data, r.pricing_data)
        discount = max(0.0, min(1.0, r.discount)) if r.discount is not None else 1.0
        for p in prices:
            p.cost = p.amount * discount
        return prices

    @staticmethod
    async def charging(sor, ppid, data):
        """Price ``data`` with the pricing of ``ppid`` active on the business date.

        Unlike ``buffered_charging`` this always queries through ``sor`` and
        uses ``env.get_business_date(sor)`` as the reference date.

        Returns:
            The matched pricing items with ``amount`` and ``cost`` set, or
            None when no pricing is active.

        Raises:
            Exception: ``ppid`` or ``data`` is None.
        """
        if ppid is None:
            e = Exception(f'ppid is None, {data=}')
            exception(f'{e}')
            raise e
        if data is None:
            e = Exception(f'{ppid=} data is None')
            exception(f'{e}')
            raise e
        env = ServerEnv()
        biz_date = await env.get_business_date(sor)
        recs = await sor.sqlExe(PricingProgram._ACTIVE_PRICING_SQL, {
            'ppid': ppid,
            'biz_date': biz_date
        })
        if not recs:
            return None
        r = recs[0]
        r.prices = PricingProgram.get_pricing_from_ymalstr(data, r.pricing_data)
        debug(f'{r.prices=}')
        discount = max(0.0, min(1.0, r.discount)) if r.discount is not None else 1.0
        for p in r.prices:
            p.cost = p.amount * discount
        return r.prices

    @staticmethod
    def get_pricing_from_ymalstr(config_data, yamlstr):
        """Parse a pricing YAML document and compute the charges for ``config_data``.

        Two pricing-item formats are supported:
          1. old format: a ``formula`` string that is eval'd;
          2. new format: scalar ``price_factors`` + scalar ``unit_prices``
             + ``unit``, computed as unit_price * usage / unit value.

        A field definition may carry a ``derived`` expression; it is
        evaluated against the usage data and stored under the field name,
        e.g. ``uncached_prompt_tokens.derived =
        "prompt_tokens - prompt_tokens_details.cached_tokens"``.

        Args:
            config_data: mapping with the usage data.
            yamlstr: YAML text holding ``fields`` and ``pricings``.

        Returns:
            List of matched pricing items, each with ``data`` and ``amount``.

        Raises:
            Exception: input is None, the YAML is invalid or lacks
                ``fields``/``pricings``, a pricing item references an
                undefined field, required usage data is missing, or no
                pricing item matches.
        """
        if config_data is None:
            e = Exception(f'config_data is None, {yamlstr=}')
            exception(f'{e=}')
            raise e
        # Wrapped in DictObject because derived expressions rely on
        # attribute-style access to the usage data.
        config_data = DictObject(**config_data)
        d = None
        try:
            d = yaml.safe_load(yamlstr)
        except Exception as e:
            exception(f'yaml.sage_load({yamlstr}) error: {e}')
            raise
        d = DictObject(**d)
        if not d.fields:
            exception(f'{d} has not "fields"')
            raise Exception('定价定义中没有fields数据')
        if not d.pricings:
            exception(f'{d} has not "pricings"')
            raise Exception('定价定义中没有pricing数据')

        # Derived fields: computed from the raw usage data before matching.
        for field_name, field_def in d.fields.items():
            if not isinstance(field_def, dict):
                continue
            derived_expr = field_def.get('derived')
            if not derived_expr:
                continue
            eval_env = dict(config_data)
            # Re-inject the top-level keys as read through DictObject.
            for k in list(config_data.keys()):
                eval_env[k] = config_data[k]
            try:
                # NOTE(security): the expression comes from the pricing
                # definition; only trusted definitions must reach here.
                result = eval(derived_expr, {}, eval_env)
                config_data[field_name] = result
                debug(f'derived field {field_name} = {derived_expr} = {result}')
            except Exception as e:
                # A derived field that cannot be computed counts as 0.
                debug(f'derived field {field_name} evaluation failed: {derived_expr}, error: {e}')
                config_data[field_name] = 0

        # Unit name -> divisor applied to unit_price * usage.
        # NOTE(review): the repeated '' keys look like characters lost from
        # the source text; kept as found — confirm the intended unit names.
        unit_values = d.get('unit_values', {'百万': 1000000, '': 1, '': 1000, '': 1, '': 1, '毫秒': 0.001, '元/百万tokens': 1000000, '元/total_tokens': 1, '元/times': 1})
        # Keys of a pricing item that describe the price, not a condition.
        skip_keys = {'formula', 'price_factors', 'unit_prices', 'unit', 'min_amount', 'filters', 'pricing_type'}
        ret_items = []
        for i, p in enumerate(d.pricings):
            # Entries flagged for manual review are never used.
            if p.get('_NEEDS_MANUAL_REVIEW'):
                debug(f'跳过需要人工审核的定价项: {i}')
                continue
            # New format needs scalar price_factors and scalar unit_prices;
            # list/dict values are handled through the formula path.
            _raw_pf = p.get('price_factors')
            _raw_up = p.get('unit_prices')
            is_new_format = (_raw_pf is not None and _raw_up is not None
                             and not isinstance(_raw_pf, list)
                             and not isinstance(_raw_up, dict))
            is_old_format = p.get('formula') is not None
            if not is_new_format and not is_old_format:
                debug(f'无公式也无price_factors{p=}')
                continue
            p_ok = True
            ns = DictObject(**config_data)

            # Direct conditions: every non-pricing key must match.
            for k, spec_value in p.items():
                if spec_value is None:
                    continue
                if k in skip_keys:
                    continue
                f = d.fields.get(k)
                if not f:
                    e = f'定价项({i})中的{k}在fields中没有定义'
                    exception(f'{e}')
                    raise Exception(e)
                data_value = config_data.get(k)
                data_value = data_mapping(d, k, data_value)
                if data_value is None:
                    if 'default' in f.keys():
                        data_value = f['default']
                    else:
                        e = f'数据({config_data})没有({k})数据'
                        exception(e)
                        raise Exception(e)
                try:
                    flg = check_value(f, spec_value, data_value)
                    if not flg:
                        p_ok = False
                        break
                except Exception as e:
                    msg = f'{p=},{f}: {spec_value=}, {data_value=}'
                    exception(f'{e}:{msg}')
                    # A condition that could not be evaluated is not a match.
                    p_ok = False
                    break

            # "filters" conditions (new format, AND logic). Items carrying
            # unit_prices are price tiers and are resolved further below.
            if p_ok and is_new_format and 'filters' in p:
                for filter_item in p['filters']:
                    if 'unit_prices' in filter_item:
                        continue
                    item_ok = True
                    item_value_mode = filter_item.get('value_mode')
                    for fk, fv in filter_item.items():
                        if fk in ('unit_prices', 'value_mode'):
                            continue
                        f = d.fields.get(fk)
                        if not f:
                            continue
                        data_value = config_data.get(fk)
                        data_value = data_mapping(d, fk, data_value)
                        if data_value is None:
                            # Key absent from the usage data: treated as a match.
                            continue
                        try:
                            flg = check_value(f, fv, data_value, item_value_mode)
                            if not flg:
                                item_ok = False
                                break
                        except Exception as e:
                            debug(f'filter check error: {e}')
                            item_ok = False
                            break
                    if not item_ok:
                        p_ok = False
                        break
            if not p_ok:
                info(f'{config_data=}, {p=}, mismatched')
                continue

            np = p.copy()
            np.data = config_data
            if is_new_format:
                factor_name = p['price_factors']
                unit_price = p['unit_prices']
                unit_str = p.get('unit', '')
                # Tiered pricing: a matching filter item that carries its own
                # unit_prices overrides the item-level unit price.
                if 'filters' in p:
                    for filter_item in p['filters']:
                        item_value_mode = filter_item.get('value_mode')
                        for fk, fv in filter_item.items():
                            if fk in ('unit_prices', 'value_mode'):
                                continue
                            f = d.fields.get(fk)
                            if not f:
                                continue
                            data_value = config_data.get(fk)
                            data_value = data_mapping(d, fk, data_value)
                            if data_value is None:
                                continue
                            try:
                                flg = check_value(f, fv, data_value, item_value_mode)
                                if flg and 'unit_prices' in filter_item:
                                    unit_price = filter_item['unit_prices']
                            except Exception as e:
                                # Best effort: an unusable tier is ignored,
                                # but no longer silently.
                                debug(f'tier check error: {e}')
                # Usage quantity: 'flat' is a fixed fee.
                if factor_name == 'flat':
                    usage_value = 1
                else:
                    usage_value = config_data.get(factor_name)
                    if usage_value is None:
                        debug(f'新格式config_data中缺少{factor_name}')
                        continue
                    usage_value = float(usage_value)
                unit_val = unit_values.get(unit_str, 1)
                if isinstance(unit_val, str):
                    unit_val = float(unit_val)
                amount = unit_price * usage_value / unit_val
                # Enforce the minimum charge when one is configured.
                min_amount = p.get('min_amount', 0)
                if min_amount and amount < min_amount:
                    amount = min_amount
                np.amount = amount
                ret_items.append(np)
            elif is_old_format:
                formula = p.formula
                debug(f'{formula=}, {ns=}, {p=}, {d.fields=}')
                env_data = DictObject(config_data)
                # NOTE(security): the formula comes from the pricing
                # definition; only trusted definitions must reach here.
                np.amount = eval(formula, env_data)
                ret_items.append(np)
        if len(ret_items) == 0:
            e = f'{config_data=}{yamlstr=}没有找到合适的定价'
            exception(e)
            raise Exception(e)
        return ret_items
def generate_formula_from_factors(price_factors):
    """Build a formula string from a list of price factors.

    Each factor is a dict such as::

        {"factor": "prompt_tokens", "unit_price": 3.2, "unit_label": "元/百万Token"}

    Factors whose ``unit_label`` contains "百万" (per million) are summed and
    divided by 1000000.0; all other factors are added unscaled, so mixing
    units no longer divides the unscaled terms as well.

    Args:
        price_factors: list of factor dicts, or a falsy value.

    Returns:
        The formula string, e.g.
        "(3.2 * float(prompt_tokens) + 16.0 * float(completion_tokens)) / 1000000.0",
        or None when ``price_factors`` is empty or None.
    """
    if not price_factors:
        return None
    million_terms = []
    plain_terms = []
    for f in price_factors:
        factor = f.get('factor', '')
        unit_price = f.get('unit_price', 0)
        # An explicit None label is treated like a missing one.
        unit_label = f.get('unit_label') or ''
        term = f'{unit_price} * float({factor})'
        if '百万' in unit_label:
            million_terms.append(term)
        else:
            plain_terms.append(term)
    pieces = []
    if million_terms:
        pieces.append(f"({' + '.join(million_terms)}) / 1000000.0")
    pieces.extend(plain_terms)
    return ' + '.join(pieces)
async def get_pricing_display(ppid):
    """Return the pricing of ``ppid`` in a readable form for clients/front ends.

    Result structure::

        {
            "ppid": "...",
            "name": "...",
            "pricing_type": "per_use",
            "items": [
                {
                    "filters": {"model": "doubao-seed-2-0-pro"},
                    "filter_labels": {"模型": "doubao-seed-2-0-pro"},
                    "price_factors": [
                        {
                            "factor": "prompt_tokens",
                            "label": "输入Token",
                            "unit_price": 0.000006,
                            "unit": "百万",
                            "unit_label": "元/百万Token"
                        }
                    ],
                    "formula": "...",
                    "min_amount": 0.01
                }
            ],
            "display_text": "..."
        }

    Returns:
        The dict above, or None when no pricing can be loaded for ``ppid``.

    Raises:
        Exception: the stored pricing data is empty.
    """
    try:
        r = await PricingProgram.get_ppid_pricing(ppid)
    except Exception as e:
        debug(f'get_pricing_display: get_ppid_pricing failed for ppid={ppid}: {e}')
        return None
    pricing_data_str = r.pricing_data
    d = yaml.safe_load(pricing_data_str)
    if not d:
        raise Exception(f'{ppid} pricing_data 为空')
    # "or" also covers keys that are present but null in the YAML.
    fields = d.get('fields') or {}
    pricings = d.get('pricings') or []
    pricing_type = d.get('pricing_type', 'per_use')

    def label_of(key):
        """Display label of a field, falling back to the key itself."""
        fdef = fields.get(key, {})
        return fdef.get('label', key) if isinstance(fdef, dict) else key

    # Keys of a pricing item that describe the price, not a condition.
    skip_keys = {'formula', 'price_factors', 'unit_prices', 'unit', 'filters', 'min_amount', 'pricing_type', 'name'}
    # Keys of a filter item that control matching/pricing, not conditions.
    control_keys = ('unit_prices', 'value_mode')

    items = []
    for p in pricings:
        # Entries flagged for manual review are not shown.
        if p.get('_NEEDS_MANUAL_REVIEW'):
            continue
        filters = {}
        filter_labels = {}
        # Conditions given directly on the item (fields whose role is
        # "filter", which is also the default role).
        for k, v in p.items():
            if k in skip_keys:
                continue
            fdef = fields.get(k, {})
            role = fdef.get('role', 'filter') if isinstance(fdef, dict) else 'filter'
            if role == 'filter':
                filters[k] = v
                filter_labels[label_of(k)] = v
        # Conditions given through the item's "filters" list.
        raw_filters = p.get('filters')
        if isinstance(raw_filters, list):
            for fi in raw_filters:
                if not isinstance(fi, dict):
                    continue
                for k, v in fi.items():
                    if k in control_keys:
                        # Not a condition; keep it out of the displayed filters.
                        continue
                    filters[k] = v
                    filter_labels[label_of(k)] = v

        # New format needs scalar price_factors and scalar unit_prices;
        # list/dict values are shown through the formula path.
        _raw_pf = p.get('price_factors')
        _raw_up = p.get('unit_prices')
        is_new_format = (_raw_pf is not None and _raw_up is not None
                         and not isinstance(_raw_pf, list)
                         and not isinstance(_raw_up, dict))
        if is_new_format:
            factor_name = _raw_pf
            unit_price = _raw_up
            unit_str = p.get('unit', '')
            # unit_prices is shown as stored, without applying a unit value.
            unit_label = f"元/{unit_str}" if unit_str else ""
            price_factors_display = [{
                'factor': factor_name,
                'label': label_of(factor_name),
                'unit_price': unit_price,
                'unit': unit_str,
                'unit_label': unit_label
            }]
            # Tiers: filter items whose own price differs from the item price.
            tiered_pricing = []
            if isinstance(raw_filters, list):
                for fi in raw_filters:
                    if not isinstance(fi, dict):
                        continue
                    raw_tier_price = fi.get('unit_prices')
                    if raw_tier_price is None or raw_tier_price == unit_price:
                        continue
                    fi_copy = {k: v for k, v in fi.items()
                               if k not in control_keys
                               and not k.endswith('_tokens')}
                    if fi_copy:
                        tiered_pricing.append({
                            'filters': fi_copy,
                            'unit_prices': raw_tier_price
                        })
            item = {'price_factors': price_factors_display}
            if filters:
                item['filters'] = filters
                item['filter_labels'] = filter_labels
            min_amount = p.get('min_amount', 0)
            if min_amount:
                item['min_amount'] = min_amount
            if tiered_pricing:
                item['price_factors'][0]['tiered'] = tiered_pricing
            items.append(item)
        else:
            # Old format: formula based.
            price_factors = p.get('price_factors', None)
            if not price_factors:
                # Fallback: describe the factor/float fields without a price.
                price_factors = []
                for k, fdef in fields.items():
                    if not isinstance(fdef, dict):
                        continue
                    role = fdef.get('role', 'filter')
                    if role == 'factor' or fdef.get('type') == 'float':
                        price_factors.append({
                            'factor': k,
                            'label': fdef.get('label', k),
                            'unit_price': None,
                            'unit_label': ''
                        })
            item = {'price_factors': price_factors}
            if filters:
                item['filters'] = filters
                item['filter_labels'] = filter_labels
            if p.get('formula'):
                item['formula'] = p['formula']
            items.append(item)

    # Readable price list; the opening bracket pairs with the closing one.
    display_lines = [f"【{getattr(r, 'name', '')}】定价:"]
    for item in items:
        filter_text = ''
        if item.get('filter_labels'):
            filter_parts = [f"{k}={v}" for k, v in item['filter_labels'].items()]
            filter_text = f" [{', '.join(filter_parts)}]"
        for pf in item.get('price_factors', []):
            label = pf.get('label', pf.get('factor', ''))
            up = pf.get('unit_price')
            ul = pf.get('unit_label', '')
            if up is not None:
                display_lines.append(f" - {label}: {up} {ul}{filter_text}")
            if pf.get('tiered'):
                for t in pf['tiered']:
                    t_filters = ', '.join(f"{k}={v}" for k, v in t.get('filters', {}).items())
                    t_price = t.get('unit_prices', '')
                    display_lines.append(f" · {t_filters}: {t_price} {ul}")
    return {
        'ppid': ppid,
        'name': getattr(r, 'name', ''),
        'pricing_type': pricing_type,
        'items': items,
        'display_text': '\n'.join(display_lines)
    }
async def get_pricing_program_timeing(pptid):
    """Load timing row ``pptid`` with its ``pricing_data`` parsed from YAML.

    Returns:
        The row, or None when it does not exist, has no pricing data, or the
        pricing data cannot be parsed.
    """
    async with get_sor_context(ServerEnv(), 'pricing') as sor:
        rows = await sor.R('pricing_program_timing', {'id': pptid})
        if len(rows) == 0:
            exception(f'{pptid} not found in pricing_program_timing')
            return None
        ppt = rows[0]
        if ppt.pricing_data is None:
            exception(f'{pptid} pricing_data is None in pricing_program_timing')
            return None
        try:
            PricingProgram.ppt_db2app(ppt)
        except Exception:
            # ppt_db2app already logged the parse failure.
            return None
        return ppt
async def test_pricing(pptid, data):
    """Price ``data`` against timing row ``pptid`` and return the total amount.

    Returns:
        The sum of the matched items' amounts, or None when the timing row
        cannot be loaded or no price list is produced.
    """
    ppt = await get_pricing_program_timeing(pptid)
    if ppt is None:
        # The loader returns None for a missing or unparsable row.
        return None
    # pricing_data was parsed into a dict by the loader; the pricing routine
    # expects YAML text, so dump it back.
    if isinstance(ppt.pricing_data, dict):
        yamlstr = yaml.dump(ppt.pricing_data, allow_unicode=True)
    else:
        yamlstr = ppt.pricing_data
    prices = PricingProgram.get_pricing_from_ymalstr(data, yamlstr)
    if prices is None:
        return None
    return sum(p.amount for p in prices)
if __name__ == '__main__':
    # Manual smoke test: price one usage record against an inline definition.
    # The YAML nesting is restored and the "lite" formula has its opening
    # parenthesis back so the document parses and every formula evaluates.
    MyLogger('Pricing', levelname='info')
    yamlstr = """fields:
  formula:
    label: 计算公式
    type: str
  model:
    label: 模型
    type: str
model_mappings: # 模型映射
  "doubao-seed-2-0-pro-260215": "doubao-seed-2-0-pro"
  "doubao-seed-2-0-lite-260215": "doubao-seed-2-0-lite"
  "doubao-seed-2-0-mini-260215": "doubao-seed-2-0-mini"
  "doubao-seed-2-0-code-preview-260215": "doubao-seed-2-0-code"
pricings:
- formula: (3.2 * float(prompt_tokens) + 16 * float(completion_tokens)) / 1000000.0
  model: doubao-seed-2-0-pro
- formula: (0.6 * float(prompt_tokens) + 3.6 * float(completion_tokens)) / 1000000.0
  model: doubao-seed-2-0-lite
- formula: (0.2 * float(prompt_tokens) + 2 * float(completion_tokens)) / 1000000.0
  model: doubao-seed-2-0-mini
- formula: (3.2 * float(prompt_tokens) + 16 * float(completion_tokens)) / 1000000.0
  model: doubao-seed-2-0-code
"""
    config_data = {'completion_tokens': 1416, 'prompt_tokens': 52, 'total_tokens': 1468, 'prompt_tokens_details': {'cached_tokens': 0}, 'completion_tokens_details': {'reasoning_tokens': 884}, 'model': 'doubao-seed-2-0-pro-260215'}
    print(f'{config_data=}, {yamlstr=}')
    p = PricingProgram.get_pricing_from_ymalstr(config_data, yamlstr)
    print(f'{p=}')