This commit is contained in:
yumoqing 2026-03-18 18:30:21 +08:00
parent 1c962dbd9b
commit acad0f3d4d
6 changed files with 234 additions and 175 deletions

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -1,186 +1,245 @@
import json import json
import yaml
from ahserver.serverenv import ServerEnv from ahserver.serverenv import ServerEnv
from sqlor.dbpools import DBPools, get_sor_context from sqlor.dbpools import DBPools, get_sor_context
from appPublic.log import debug, exception from appPublic.log import debug, exception
from appPublic.dictObject import DictObject
import yaml
"""
采用yaml描述定价策略
一下是一个例子
pricing_unit: 1000
- id: input_token_pricing_1
field: prompt_tokens
between: 0 ~= 1000
- id: input_token_pricing_2
- cnt_field: prompt_tokens
between: 1000 ~= 100000
- id: input_token_pricing_2
- cnt_field: prompt_tokens
between: 100000 ~=
- id: output_pricing
- cnt_field: completion_tokens
视频定价
fields:
resolution:
type: str
label: 分辨率
value_mode:
- between
- in
- =
- >
- >=
- <
- <=
duration:
type: int
label: 时长
audio:
type: boolean
label: 音频
pricings:
- resolution: 480p
duration: 4
audio: false
- resolution: 480p
duration: 8
audio: false
- resolution: 480p
duration: 12
audio: false
- resolution: 480p
duration: 4
audio: true
- resolution: 480p
duration: 8
audio: true
- resolution: 480p
duration: 12
audio: true
- resolution: 720p
duration: 4
audio: false
- resolution: 720p
duration: 8
audio: false
- resolution: 720p
duration: 12
audio: false
- resolution: 720p
duration: 4
audio: true
- resolution: 720p
duration: 8
audio: true
- resolution: 720p
duration: 12
audio: true
"""
"""
typefuncs = {
'int': int,
'float': float
}
def typevalue(v, t):
if not v:
return None
f = typefuncs.get(t)
if not f:
return v
return f(v)
def check_value(field, spec_value, data_value):
if field.value_mode == 'between':
arr = spec_value.split(' ')
if len(arr) < 2 or len(arr) > 3:
e = f'{spec_value=} error'
exception(e)
raise Exception(e)
if (arr[0] is None or arr[-1] is None) and field.type == 'str':
e = f'字符串类型between方法的两个值任何一个都不能为空')
exception(e)
raise e
else:
if arr[0] is None:
arr[0] = -float('inf')
if arr[-1] is None:
arr[-1] = float('inf')
if len(arr) == 2 or arr[1] == '=~' :
return arr[0] <= data_value and data_value < arr[-1]
if arr[1] == '~':
return arr[0] < data_value and data_value < arr[-1]
if arr[1] == '~=':
return arr[0] < data_value and data_value <= arr[-1]
e = f'{arr[1]}不认识的期间逻辑,只支持:~ =~ ~='
exception(e)
raise Exception(e)
if field.value_mode == 'in':
arr = spec_value.split(' ')
arr = [ typevalue(a, field.type) for a in arr ]
return data_value in arr
mode = field.value_mode
if not mode:
mode = '='
script = f'{data_value} {mode} {typevalue(spec_value, field.type)}'
x = eval(scrpt)
return x
class PricingProgram: class PricingProgram:
def __init__(self, ppid, sor): def parse_pricing_spec(self, yamlstr):
self.ppid = ppid d = yaml.safe_load(yamlstr)
self.sor = sor assert isinstance(d, list)
return d
async def add_pricing_program(self, sor,
name, ownerid,
description, pricing_spec, id=None):
env = ServerEnv()
if not id:
id = env.uuid()
yamlstr = yaml.dump(pricing_spec)
await sor.C('pricing_program', {
'id': id,
'name': name,
'ownerid': ownerid,
'description': description,
'pricing_spec': ymalstr
})
async def init(self): async def pp_db2app(self, pp):
await self.get_program() try:
await self.get_pricing_type() pp.pricing_spec = yaml.safe_load(pp.pricing_spec)
except Exception as e:
e = f'{pp.pricing_spec}:yaml数据格式错误'
exception(e)
raise Exception(e)
async def get_program(self): async def pp_app2db(self, pp):
recs = await self.sor.R('pricing_program', {'id': self.ppid}) try:
if len(recs): pp.pricing_spec = yaml.dump(pp.pricing_spec)
self.__dict__.update(recs[0]) except Exception as e:
e = f'{pp.pricing_spec}:导出到yaml失败'
exception(e)
raise Exception(e)
async def ppt_db2app(self, ppt):
try:
ppt.pricing_data = yaml.safe_load(ppt.pricing_data)
except Exception as e:
e = f'{ppt.pricing_data}:yaml数据格式错误'
exception(e)
raise Exception(e)
async def get_pricing_type(self): async def ppt_app2db(self, ppt):
self.pricing_type = await self.sor.R('pricing_type', try:
{'id': self.ptid}) ppt.pricing_data = yaml.dump(ppt.pricing_data)
except Exception as e:
e = f'{ppt.pricing_data}:yaml数据格式错误'
exception(e)
raise Exception(e)
async def get_items(self, biz_date=None): def pricing(self, config_data, yamlstr):
if biz_date is None: """
env = ServerEnv() yamlstr是从
biz_date = await env.get_business_date(self.sor) d = None
ppts = await self.get_program_timing(biz_date) try:
if len(ppts) < 1: d = yaml.safe_load(yamlstr)
return None except Exception as e:
ppt = ppts[0] exception(f'yaml.sage_load({yamlstr}) error: {e}')
recs = await self.sor.R('pricing_item', {'pptid': ppt.id}) raise e
return recs if not d.fields:
exception(f'{d} has not "fields"')
async def get_program_timing(self, biz_date): raise Exception(f'定价定义中没有fields数据')
sql = """select * from pricing_program_timing if not d.pricing:
where enabled_date >= ${biz_date}$ exception(f'{d} has not "pricing"')
and expired_date < ${biz_date}$ raise Exception(f'定价定义中没有pricing数据')
and ppid = ${ppid}$ ret_items = []
""" for i, p in enumerate(d.pricings):
return await self.sor.sqlExe(sql, {'ppid':self.ppid, p_ok = True
'biz_date': biz_date}) times = 1
unit = 1
async def get_specs(self): for k,spec_value in p.items():
recs = await self.sor.R('pricing_spec', {'ptid': self.ptid}) f = d.fields.get(k)
return recs if not f:
e = f'定价项({i})中的{k}在fields中没有定义'
async def get_spec_by_id(self, psid): exception(f'{e}')
recs = await self.sor.R('pricing_spec', {'id': psid}) raise Exception(e)
if len(recs) > 0: data_value = config_data.get(k)
return recs[0] if not value:
return None e = f'数据({config_data})没有({k})数据'
exception(e)
async def get_specs(sor, psid): raise Exeption(e)
sql = """select a.* from pricing_spec a where a.id=${psid}$""" if f.type == 'unit':
recs = await sor.sqlExe(sql, {'psid': psid}) unit = spec_value
if len(recs)>0: elif f.type == 'times':
return recs[0] times = data_value
return None else:
f = check_value(f, spec_value, data_value)
async def sor_get_program_items(sor, ppid, biz_date): if not f:
sql = """select b.* from pricing_program_timing a, pricing_item b # 条件不满足
where b.pptid = a.id p_ok = False
and a.ppid = ${ppid}$
and a.enabled_date <= ${biz_date}$
and a.expired_date > ${biz_date}$
"""
recs = await sor.sqlExe(sql, {'ppid': ppid, 'biz_date': biz_date})
return recs
async def get_pricing_specs_by_pptid(pptid):
env = ServerEnv()
dbname = env.get_module_dbname('pricing')
db = DBPools()
async with db.sqlorContext(dbname) as sor:
sql = """select d.*
from pricing_program_timing a, pricing_program b, pricing_type c, pricing_spec d
where a.ppid = b.id
and b.ptid = c.id
and d.ptid = c.id
and a.id = ${pptid}$"""
recs = await sor.sqlExe(sql, {'pptid': pptid})
return recs
return []
async def get_remote_pricing(sor, charge, data):
env = ServerEnv()
get_callerid = env.get_callerid
userid = await get_callerid(self.ownerid)
uapi = UAPI()
ret = await uapi.call(charge.upappid, charge.apiname,
userid, params=data)
d = json.loads(ret.decode('utf-8'))
return d
async def pricing_program_charging(sor, pricing_program_id, data):
env = ServerEnv()
if not data.get('biz_date'):
biz_date = await env.get_business_date(sor)
data['biz_date'] = biz_date
debug(f'{pricing_program_id=}, {data=}')
pp_items = await sor_get_program_items(sor, pricing_program_id, data['biz_date'])
charges = []
debug(f'{pp_items=}, {data["biz_date"]=}')
for item in pp_items:
charge = item.copy()
spec = await get_specs(sor, charge.psid)
if spec.pricing_spec_mode == 'spec_amount':
if item.spec_value:
d = json.loads(item.spec_value)
matchs = True
for k,v in d.items():
if v != item['k']:
matchs = False
break break
if not matchs: if p_ok:
continue if not p.price:
e = f'{p} 没有价格属性')
cnt = data.get(spec.count_name, 1) exception(e)
if charge.pricing_unit is None or charge.pricing_unit < 1: raise Exception(e)
charge.pricing_unit = 1 np = p.copy()
cost_amount = charge.cost_amount or 0.00 np.amount = p.price * float(times) / float(unit)
charge.amount = cnt * charge.pricing_amount / charge.pricing_unit ret_items.append(np)
charge.cost = cnt * cost_amount / charge.pricing_unit if len(ret_items) == 0:
charges.append(charge) e = f'{config_data=}{yamlstr=}没有找到合适的定价‘
elif spec.pricing_spec_mode == 'remote_pricing': exception(e)
charge.amount = await get_remote_pricing(sor, charge, params=d) raise Exception(e)
charges.append(charge) return ret_items
elif spec.pricing_spec_mode == 'spec_subtype':
sub_charges = await pricing_program_charging(sor,
charge.subppid, d)
charges += sub_charges
return charges
async def sor_get_spec_fields(sor, piid):
sql = "select a.spec_names from pricing_spec a, pricing_item b where a.id = b.psid and b.id = ${piid}$"
recs = await sor.sqlExe(sql, {'piid': piid})
if len(recs) < 1:
return []
b = recs[0].spec_names
if b is None:
return []
return json.loads(b)
async def get_spec_names_fields(request, piid):
async with get_sor_context(request._run_ns, 'pricing') as sor:
fields = await sor_get_spec_fields(sor, piid)
if len(fields) < 1:
return ''
x = ''
for f in fields:
x += ','+ json.dumps(f)
return x
return ''
async def get_spec_names_fields_names(request, piid):
async with get_sor_context(request._run_ns, 'pricing') as sor:
fs = await sor_get_spec_fields(sor, piid)
return [f['name'] for f in fs]
async def get_all_spec_fields_by_pptid(request, pptid):
env = request._run_ns
async with get_sor_context(env, 'pricing') as sor:
sql = """select a.*
from pricing_spec a,
pricing_type b,
pricing_program c,
pricing_program_timing d
where
b.id=a.ptid
and b.id = c.ptid
and c.id = d.ppid
and d.id = ${pptid}$"""
recs = await sor.sqlExe(sql, {'pptid': pptid})
ret = ''
for r in recs:
if r.spec_names:
x = {
"name":r.id,
"uitype":"group",
"nouse": True
}
x['fields'] = json.loads(r.spec_names)
ret += ','+ json.dumps(x, ensure_ascii=False, indent=4)
return ret