"""
Pricing strategies described in YAML.

The pricing data fields are defined in the ``pricing_spec`` column of the
``pricing_program`` table and follow this specification:

A dict keyed by field name. Each field defines ``type``, ``label``,
``value_mode`` (optional) and ``options`` (optional).
There must be one ``price`` field with ``type: float``.

``value_mode`` may take the following values:

    between     # in the pricing table the field is written as
                # "low ~ high"; an "=" may be put before or after the "~"
                # to mean "low inclusive" or "high inclusive"
    in          # the field is written as: value1 value2 ...
    =           # default, a single given value
    >
    >=
    <
    <=

Example (vidu pricing fields):
'''
model:
  type: str
  label: "模型"
  options:
    - "viduq3-pro"
    - "viduq3-turbo"

resolution:
  type: str
  label: "分辨率"
  options:
    - "1024p"
    - "720p"
    - "540p"

duration:
  type: int  # suggested: int or str, "times" is not a standard type
  label: "时长"
  # if the duration also has options they must be listed, e.g.:
  # options:
  #   - 5
  #   - 10

off_peak:
  type: int  # suggested: int or str, because the values are 0 and 1
  label: "错峰执行"
  options:
    - off_peak  # normal hours
    - normal    # off-peak

price:
  type: float
  label: 单价
'''

The ``pricing_data`` column of ``pricing_program_timing`` holds a dict with a
single attribute "pricings" whose value is the list of pricing entries; each
entry is a dict keyed by the fields defined in ``pricing_spec``:

pricings:
  - resolution: 480p
    duration: 4
    audio: false
  - resolution: 480p
    duration: 8
    audio: false
  - resolution: 480p
    duration: 12
    audio: false
  - resolution: 480p
    duration: 4
    audio: true
  - resolution: 480p
    duration: 8
    audio: true
  - resolution: 480p
    duration: 12
    audio: true
  - resolution: 720p
    duration: 4
    audio: false
  - resolution: 720p
    duration: 8
    audio: false
  - resolution: 720p
    duration: 12
    audio: false
  - resolution: 720p
    duration: 4
    audio: true
  - resolution: 720p
    duration: 8
    audio: true
  - resolution: 720p
    duration: 12
    audio: true

NOTE(review): the code in this module actually reads the top-level keys
``fields`` and ``pricings`` (plus optional ``<field>_mappings``) and a
per-entry ``formula``; the description above may predate that layout.
"""
import json
import operator

import yaml

from ahserver.serverenv import ServerEnv
from ahserver.filestorage import FileStorage
from sqlor.dbpools import DBPools, get_sor_context
from appPublic.log import debug, exception, info, MyLogger
from appPublic.timeUtils import curDateString
from appPublic.dictObject import DictObject
from .write_pattern import write_pattern_xlsx, load_xlsx_pricing

# Converters applied to spec values according to the field's declared type.
typefuncs = {
    'int': int,
    'float': float
}

# Comparison operators accepted as ``value_mode`` ('=' is normalised to '==').
_COMPARATORS = {
    '==': operator.eq,
    '!=': operator.ne,
    '>': operator.gt,
    '>=': operator.ge,
    '<': operator.lt,
    '<=': operator.le,
}

# Selects the pricing rows of one program that are active on ``biz_date``,
# newest ``enabled_date`` first.
_ACTIVE_PRICING_SQL = """select a.name, a.ownerid, a.providerid,
    pricing_belong, discount, b.pricing_data
from pricing_program a, pricing_program_timing b
where a.id = b.ppid
    and a.id = ${ppid}$
    and b.enabled_date <= ${biz_date}$
    and b.expired_date > ${biz_date}$
order by b.enabled_date desc"""


def typevalue(v, t):
    """Convert ``v`` with the converter registered for type name ``t``.

    Returns ``v`` unchanged when ``t`` has no entry in ``typefuncs``.
    """
    convert = typefuncs.get(t)
    if not convert:
        return v
    return convert(v)


def check_value(field, spec_value, data_value):
    """Tell whether ``data_value`` satisfies ``spec_value`` for ``field``.

    ``field.value_mode`` selects the comparison:

    * ``between`` - ``spec_value`` is ``"low high"`` or ``"low OP high"``.
      Two tokens or ``OP == '=~'`` test ``low <= v < high``; ``'~'`` tests
      ``low < v < high``; ``'~='`` tests ``low < v <= high``.
    * ``in`` - ``spec_value`` is a whitespace separated list of values, each
      converted with ``typevalue(..., field.type)``.
    * anything else - a comparison operator (empty or ``'='`` mean ``==``)
      applied as ``data_value OP typevalue(spec_value, field.type)``.

    Raises:
        Exception: malformed ``between`` spec or unknown ``between`` operator.
        ValueError: a ``between`` bound or ``data_value`` is not numeric.
    """
    if field.value_mode == 'between':
        # str() so that a spec parsed by YAML as a number does not crash here.
        arr = str(spec_value).strip().split()
        if len(arr) < 2 or len(arr) > 3:
            e = f'{spec_value=} error'
            exception(e)
            raise Exception(e)
        low = float(arr[0])
        high = float(arr[-1])
        fvalue = float(data_value)
        if len(arr) == 2 or arr[1] == '=~':
            return low <= fvalue and fvalue < high
        if arr[1] == '~':
            return low < fvalue and fvalue < high
        if arr[1] == '~=':
            return low < fvalue and fvalue <= high
        e = f'{arr[1]}不认识的期间逻辑,只支持:~ =~ ~='
        exception(e)
        raise Exception(e)

    if field.value_mode == 'in':
        candidates = [
            typevalue(a, field.type)
            for a in str(spec_value).strip().split()
        ]
        return data_value in candidates

    mode = field.value_mode
    if not mode or mode == '=':
        mode = '=='
    expected = typevalue(spec_value, field.type)
    compare = _COMPARATORS.get(mode)
    if compare is not None:
        return compare(data_value, expected)
    # NOTE(review): any other value_mode is still evaluated as a Python
    # operator via eval(); value_mode comes from the pricing spec, so the
    # spec must be trusted input.
    ns = {
        "a": data_value,
        "b": expected
    }
    return eval(f'a {mode} b', ns)


def data_mapping(ns, name, v):
    """Translate ``v`` through the ``<name>_mappings`` table of ``ns``.

    Returns ``v`` unchanged when there is no such table, the table is empty,
    or it has no entry for ``v``.
    """
    key = f'{name}_mappings'
    if key in ns.keys():
        mappings = ns[key]
        if mappings:
            return mappings.get(v, v)
    return v


class PricingProgram:
    """Static helpers to load, store and evaluate pricing programs."""

    # Class-level cache. '<ppid>.<date>' keys hold the pricing record fetched
    # for that day; '<ppid>' keys hold the list of dates cached for it.
    pricing_data = {}

    @staticmethod
    async def get_pricing_program(ppid):
        """Return the ``pricing_program`` record whose id is ``ppid``.

        Raises:
            Exception: the record does not exist or could not be read.
        """
        env = ServerEnv()
        async with get_sor_context(env, 'pricing') as sor:
            recs = await sor.R('pricing_program', {'id': ppid})
            if recs:
                return recs[0]
            raise Exception(f'pricing_program(id={ppid}) not found')
        # Only reached if the context manager suppressed an error.
        raise Exception(f'read pricing_program(id={ppid}) failed')

    @staticmethod
    async def load_pricing_data(pptid, webpath_xlsx):
        """Import an xlsx pricing sheet into ``pricing_program_timing``.

        Each xlsx row is keyed by column label; labels are mapped back to the
        field names of the program's ``pricing_spec`` and the result is
        stored as YAML in the ``pricing_data`` column of record ``pptid``.

        Returns:
            True on success.

        Raises:
            Exception: the timing record or its pricing program is missing,
                or the records could not be read.
        """
        env = ServerEnv()
        fs = FileStorage()
        fp = fs.realPath(webpath_xlsx)
        pricings = load_xlsx_pricing(fp)
        async with get_sor_context(env, 'pricing') as sor:
            ppts = await sor.R('pricing_program_timing', {'id': pptid})
            if not ppts:
                e = f'pricing_program_timing(id={pptid}) not found'
                exception(e)
                raise Exception(e)
            ppt = ppts[0]
            pps = await sor.R('pricing_program', {'id': ppt.ppid})
            if not pps:
                e = f'pricing_program({pptid}) can not find pricing_program'
                exception(e)
                raise Exception(e)
            pp_spec = yaml.safe_load(pps[0].pricing_spec)
            formula = None
            fields = pp_spec.get('fields')
            if fields:
                formula = pp_spec.get('formula')
            else:
                # Older layout: the spec itself is the field dictionary.
                fields = pp_spec
            newpricings = []
            for row in pricings:
                entry = {}
                for label, v in row.items():
                    for fk, fv in fields.items():
                        if label == fv['label']:
                            entry[fk] = v
                newpricings.append(entry)
            d = {
                'fields': fields,
                'formula': formula,
                'pricings': newpricings
            }
            debug(f'{d=}')
            ppt.pricing_data = yaml.dump(d, allow_unicode=True)
            await sor.U('pricing_program_timing', {
                'id': ppt.id,
                'pricing_data': ppt.pricing_data
            })
            return True
        # Only reached if the context manager suppressed an error.
        e = f'pricing_program_timing(id={pptid}) read failed'
        exception(e)
        raise Exception(e)

    @staticmethod
    async def write_pricing_patten(request, ppid):
        """Write an empty xlsx template for pricing program ``ppid``.

        Returns:
            Whatever ``write_pattern_xlsx`` returns (assigned to ``fpath``).

        Raises:
            Exception: the pricing program does not exist.
        """
        async with get_sor_context(request._run_ns, 'pricing') as sor:
            recs = await sor.R('pricing_program', {'id': ppid})
            if not recs:
                e = f'id={ppid} pricing_program not found'
                exception(e)
                raise Exception(e)
            r = recs[0]
            x = DictObject(** yaml.safe_load(r.pricing_spec))
            fields = x['fields'] if x.get('fields') else x
            return write_pattern_xlsx(r.name, fields)

    @staticmethod
    async def write_pricing_data(request, pptid):
        """Write the pricing entries of timing record ``pptid`` to xlsx.

        Returns:
            Whatever ``write_pattern_xlsx`` returns (assigned to ``fpath``).

        Raises:
            Exception: the timing record or its pricing program is missing.
        """
        async with get_sor_context(request._run_ns, 'pricing') as sor:
            recs = await sor.R('pricing_program_timing', {'id': pptid})
            if not recs:
                e = f'id={pptid} pricing_program_timing not found'
                exception(e)
                raise Exception(e)
            r = recs[0]
            pps = await sor.R('pricing_program', {'id': r.ppid})
            if not pps:
                e = f'id={r.ppid} pricing_program not found'
                exception(e)
                raise Exception(e)
            x = DictObject(** yaml.safe_load(r.pricing_data))
            fields = x['fields'] if x.get('fields') else x
            data = x.get('pricings')
            return write_pattern_xlsx(pps[0].name, fields, data=data)

    @staticmethod
    def pp_db2app(pp):
        """Parse ``pp.pricing_spec`` from YAML text in place.

        Raises:
            Exception: the text is not valid YAML.
        """
        try:
            pp.pricing_spec = yaml.safe_load(pp.pricing_spec)
        except Exception as err:
            e = f'{pp.pricing_spec}:yaml数据格式错误'
            exception(e)
            raise Exception(e) from err

    @staticmethod
    def pp_app2db(pp):
        """Serialise ``pp.pricing_spec`` to YAML text in place.

        Raises:
            Exception: the value cannot be dumped to YAML.
        """
        try:
            pp.pricing_spec = yaml.dump(pp.pricing_spec, allow_unicode=True)
        except Exception as err:
            e = f'{pp.pricing_spec}:导出到yaml失败'
            exception(e)
            raise Exception(e) from err

    @staticmethod
    def ppt_db2app(ppt):
        """Parse ``ppt.pricing_data`` from YAML text in place.

        Raises:
            Exception: the text is not valid YAML.
        """
        try:
            ppt.pricing_data = yaml.safe_load(ppt.pricing_data)
        except Exception as err:
            e = f'{ppt.pricing_data}:yaml数据格式错误'
            exception(e)
            raise Exception(e) from err

    @staticmethod
    def ppt_app2db(ppt):
        """Serialise ``ppt.pricing_data`` to YAML text in place.

        Raises:
            Exception: the value cannot be dumped to YAML.
        """
        try:
            ppt.pricing_data = yaml.dump(ppt.pricing_data, allow_unicode=True)
        except Exception as err:
            e = f'{ppt.pricing_data}:yaml数据格式错误'
            exception(e)
            raise Exception(e) from err

    @staticmethod
    async def reload_pp_data(ppt):
        """Drop today's cached pricing for the program of ``ppt`` and reload.

        Logs and returns ``None`` when no record matches ``ppt['id']``.
        """
        env = ServerEnv()
        ppid = None
        async with get_sor_context(env, 'pricing') as sor:
            # NOTE(review): this reads ``ppid`` from a pricing_program row,
            # while everywhere else in this module ``ppid`` is a column of
            # pricing_program_timing - confirm which table is intended.
            recs = await sor.R('pricing_program', {'id': ppt['id']})
            if not recs:
                exception(f'{ppt["id"]} not found in pricing_program')
                return
            ppid = recs[0].ppid
        debug(f'--EventHandle {ppt}')
        dat = curDateString()
        PricingProgram.pricing_data.pop(f'{ppid}.{dat}', None)
        await PricingProgram.get_ppid_pricing(ppid)

    @staticmethod
    async def get_ppid_pricing(ppid):
        """Return today's active pricing record of ``ppid``, using the cache.

        At most the two most recently cached dates are kept per program;
        older entries are evicted.

        Raises:
            Exception: no pricing is active for ``ppid`` today.
        """
        cache = PricingProgram.pricing_data
        dat = curDateString()
        k = f'{ppid}.{dat}'
        d = cache.get(k)
        if d:
            return d
        env = ServerEnv()
        async with get_sor_context(env, 'pricing') as sor:
            recs = await sor.sqlExe(_ACTIVE_PRICING_SQL, {
                'ppid': ppid,
                'biz_date': dat
            })
            if not recs:
                e = Exception(f'{ppid=},{dat=} data not found')
                exception(f'{e}')
                raise e
            d = recs[0]
            cache[k] = d
            dates = cache.get(ppid, [])
            # A reload re-enters here for a date that is already tracked;
            # without this guard the entry just stored could be evicted.
            if dat not in dates:
                dates.append(dat)
            for stale in dates[:-2]:
                cache.pop(f'{ppid}.{stale}', None)
            cache[ppid] = dates[-2:]
            return d

    @staticmethod
    async def buffered_charging(ppid, data):
        """Price ``data`` with the cached pricing of program ``ppid``.

        Returns:
            The matching pricing items, each with ``amount`` and
            ``cost = amount * discount``.
        """
        r = await PricingProgram.get_ppid_pricing(ppid)
        prices = PricingProgram.get_pricing_from_ymalstr(data, r.pricing_data)
        for p in prices:
            p.cost = p.amount * r.discount
        return prices

    @staticmethod
    async def charging(sor, ppid, data):
        """Price ``data`` with the pricing of ``ppid`` active on the
        business date, read through the caller's ``sor``.

        Returns:
            The matching pricing items, each with ``amount`` and
            ``cost = amount * discount``; ``None`` when no pricing is active.

        Raises:
            Exception: ``ppid`` or ``data`` is ``None``, or no pricing item
                matches ``data``.
        """
        if ppid is None:
            e = Exception(f'ppid is None, {data=}')
            exception(f'{e}')
            raise e
        if data is None:
            e = Exception(f'{ppid=} data is None')
            exception(f'{e}')
            raise e
        env = ServerEnv()
        biz_date = await env.get_business_date(sor)
        recs = await sor.sqlExe(_ACTIVE_PRICING_SQL, {
            'ppid': ppid,
            'biz_date': biz_date
        })
        if not recs:
            return None
        r = recs[0]
        r.prices = PricingProgram.get_pricing_from_ymalstr(data, r.pricing_data)
        debug(f'{r.prices=}')
        for p in r.prices:
            p.cost = p.amount * r.discount
        return r.prices

    @staticmethod
    def get_pricing_from_ymalstr(config_data, yamlstr):
        """Evaluate every pricing entry that matches ``config_data``.

        Args:
            config_data: dict of usage data; its keys are matched against the
                pricing fields and are the variables visible to formulas.
            yamlstr: the pricing definition as YAML text, or the same
                definition already parsed into a mapping. It needs ``fields``
                and ``pricings`` and may hold ``<field>_mappings`` tables.

        Returns:
            A list with a copy of each matching pricing entry, extended with
            ``data`` (the config data) and ``amount`` (the formula result).

        Raises:
            Exception: ``config_data`` is ``None``, the definition lacks
                ``fields``/``pricings``, an entry uses an undefined field,
                required data is missing, or nothing matches.
        """
        if config_data is None:
            e = Exception(f'config_data is None, {yamlstr=}')
            exception(f'{e=}')
            raise e
        config_data = config_data.copy()
        if isinstance(yamlstr, (str, bytes)):
            try:
                d = yaml.safe_load(yamlstr)
            except Exception as e:
                exception(f'yaml.sage_load({yamlstr}) error: {e}')
                raise
        else:
            # Already parsed, e.g. after PricingProgram.ppt_db2app().
            d = yamlstr
        d = DictObject(**(d or {}))
        if not d.fields:
            exception(f'{d} has not "fields"')
            raise Exception(f'定价定义中没有fields数据')
        if not d.pricings:
            exception(f'{d} has not "pricings"')
            raise Exception(f'定价定义中没有pricing数据')

        ret_items = []
        for i, p in enumerate(d.pricings):
            if not p.formula:
                debug(f'无公式:{p=}')
                continue
            p_ok = True
            ns = DictObject(**config_data)
            for k, spec_value in p.items():
                if spec_value is None or k == 'formula':
                    continue
                f = d.fields.get(k)
                if not f:
                    e = f'定价项({i})中的{k}在fields中没有定义'
                    exception(f'{e}')
                    raise Exception(e)
                # Apply the optional <field>_mappings translation first.
                data_value = data_mapping(d, k, config_data.get(k))
                if data_value is None:
                    if 'default' in f.keys():
                        data_value = f['default']
                    else:
                        e = f'数据({config_data})没有({k})数据'
                        exception(e)
                        raise Exception(e)
                try:
                    if not check_value(f, spec_value, data_value):
                        p_ok = False
                        break
                except Exception as e:
                    msg = f'{p=},{f}: {spec_value=}, {data_value=}'
                    exception(f'{e}:{msg}')
                    # A condition that cannot be evaluated is not a match.
                    p_ok = False
                    break
            if not p_ok:
                info(f'{config_data=}, {p=}, {d.model_mappings=}, mismatched')
                continue
            item = p.copy()
            formula = p.formula
            debug(f'{formula=}, {ns=}, {p=}, {d.fields=}')
            item.data = config_data
            # NOTE(review): the formula is evaluated with eval(); pricing
            # definitions must therefore come from a trusted source.
            item.amount = eval(formula, config_data.copy())
            ret_items.append(item)

        if not ret_items:
            e = f'{config_data=}{yamlstr=}没有找到合适的定价'
            exception(e)
            raise Exception(e)
        return ret_items


async def get_pricing_program(ppid):
    """Return ``pricing_program`` record ``ppid`` with ``pricing_spec`` parsed.

    Returns ``None`` (after logging) when the record is missing, has no
    ``pricing_spec``, or the spec is not valid YAML.
    """
    env = ServerEnv()
    async with get_sor_context(env, 'pricing') as sor:
        recs = await sor.R('pricing_program', {'id': ppid})
        if not recs:
            exception(f'{ppid} not found in pricing_program')
            return None
        pp = recs[0]
        if pp.pricing_spec is None:
            exception(f'{ppid} pricing_spec is None in pricing_program')
            return None
        try:
            PricingProgram.pp_db2app(pp)
        except Exception:
            # pp_db2app has already logged the failure.
            return None
        return pp


async def get_pricing_program_timeing(pptid):
    """Return ``pricing_program_timing`` record ``pptid`` with
    ``pricing_data`` parsed.

    Returns ``None`` (after logging) when the record is missing, has no
    ``pricing_data``, or the data is not valid YAML.
    """
    env = ServerEnv()
    async with get_sor_context(env, 'pricing') as sor:
        recs = await sor.R('pricing_program_timing', {'id': pptid})
        if not recs:
            exception(f'{pptid} not found in pricing_program_timing')
            return None
        ppt = recs[0]
        if ppt.pricing_data is None:
            exception(f'{pptid} pricing_data is None in pricing_program_timing')
            return None
        try:
            PricingProgram.ppt_db2app(ppt)
        except Exception:
            # ppt_db2app has already logged the failure.
            return None
        return ppt


async def test_pricing(pptid, data):
    """Return the summed ``amount`` of all pricing items matching ``data``.

    Returns ``None`` when timing record ``pptid`` cannot be loaded.
    """
    ppt = await get_pricing_program_timeing(pptid)
    if ppt is None:
        return None
    prices = PricingProgram.get_pricing_from_ymalstr(data, ppt.pricing_data)
    if prices is None:
        return None
    return sum(p.amount for p in prices)


if __name__ == '__main__':
    MyLogger('Pricing', levelname='info')
    yamlstr = """fields:
  formula:
    label: 计算公式
    type: str
  model:
    label: 模型
    type: str
model_mappings: # 模型映射
  "doubao-seed-2-0-pro-260215": "doubao-seed-2-0-pro"
  "doubao-seed-2-0-lite-260215": "doubao-seed-2-0-lite"
  "doubao-seed-2-0-mini-260215": "doubao-seed-2-0-mini"
  "doubao-seed-2-0-code-preview-260215": "doubao-seed-2-0-code"
pricings:
- formula: (3.2 * float(prompt_tokens) + 16 * float(completion_tokens)) / 1000000.0
  model: doubao-seed-2-0-pro
- formula: (0.6 * float(prompt_tokens) + 3.6 * float(completion_tokens)) / 1000000.0
  model: doubao-seed-2-0-lite
- formula: (0.2 * float(prompt_tokens) + 2 * float(completion_tokens)) / 1000000.0
  model: doubao-seed-2-0-mini
- formula: (3.2 * float(prompt_tokens) + 16 * float(completion_tokens)) / 1000000.0
  model: doubao-seed-2-0-code
"""
    config_data = {
        'completion_tokens': 1416,
        'prompt_tokens': 52,
        'total_tokens': 1468,
        'prompt_tokens_details': {'cached_tokens': 0},
        'completion_tokens_details': {'reasoning_tokens': 884},
        'model': 'doubao-seed-2-0-pro-260215'
    }
    print(f'{config_data=}, {yamlstr=}')
    p = PricingProgram.get_pricing_from_ymalstr(config_data, yamlstr)
    print(f'{p=}')