kboss/b/product/provider_product_group.dspy
2025-07-16 14:27:17 +08:00

48 lines
1.9 KiB
Plaintext

async def provider_product_group(ns=None):
    """Return a provider's non-deleted products, grouped by product group.

    Selects rows of ``product`` whose ``providerpid`` matches
    ``<prefix>_%`` and whose ``del_flg`` is 0, copies the ``url`` /
    ``listUrl`` / ``iconUrl`` entries of each row's ``spec_note`` JSON onto
    the row itself, drops ``spec_note``, groups the rows by ``productgroup``
    and orders the groups by a fixed priority table.

    Args:
        ns: Request parameters. ``ns['prefix']`` is the ``providerpid``
            prefix to match and must be a string.

    Returns:
        On success ``{'status': True, 'msg': 'get product success',
        'data': [{'title': <group>, 'products': [<row>, ...]}, ...]}``.
        On any failure ``{'status': False, 'msg': <error text>}``.
    """
    ns = ns or {}
    prefix = ns.get('prefix')
    if not isinstance(prefix, str):
        # Previously this surfaced as an opaque "NoneType + str" TypeError
        # message; the response shape (status False + msg) is unchanged.
        return {
            'status': False,
            'msg': "missing or invalid 'prefix' parameter"
        }

    # Spec entries that are lifted onto the product row under the same key.
    url_keys = ('url', 'listUrl', 'iconUrl')
    # Display order of the known groups; anything else sorts after them.
    priority = {'计算': 0, '网络': 1, '存储': 2, '数据库': 3, '大数据平台': 4}

    db = DBPools()
    async with db.sqlorContext('kboss') as sor:
        try:
            # The prefix comes from the request, so it is bound as a named
            # parameter instead of being formatted into the SQL text.
            # NOTE(review): relies on sqlor's ``${name}$`` placeholder
            # syntax -- confirm against the sqlor version in use.
            # NOTE: ``_`` is itself a single-character LIKE wildcard, so the
            # pattern matches prefix + any one character + anything, exactly
            # as the original string-built pattern did.
            product_get_sql = (
                "select name, description, label, spec_note, productgroup "
                "from product "
                "where providerpid like ${pattern}$ and del_flg = 0;"
            )
            products = await sor.sqlExe(product_get_sql,
                                        {'pattern': prefix + '_%'})

            # Group the rows by their 'productgroup' column, keeping the
            # order in which groups first appear in the query result.
            grouped_data = {}
            for item in products:
                raw_spec = item.pop('spec_note', None)
                # A NULL/empty spec_note means "no spec entries" rather than
                # an error that fails the whole response. The replace() turns
                # a bare ``None`` token in the stored text into an empty JSON
                # string so that json.loads accepts it.
                specs = []
                if raw_spec:
                    specs = json.loads(raw_spec.replace('None', '""')) or []
                for spec in specs:
                    config_name = spec.get('configName')
                    if config_name in url_keys:
                        item[config_name] = spec.get('value')
                grouped_data.setdefault(
                    item.get('productgroup'), []).append(item)

            # Build the final structure: one entry per group.
            result = [
                {'title': group, 'products': members}
                for group, members in grouped_data.items()
            ]
            # list.sort is stable, so groups with equal priority keep their
            # first-seen order without an explicit index tiebreak. Unknown
            # groups get a rank past every known one so they always come
            # last (the old fallback of 4 tied with '大数据平台').
            result.sort(
                key=lambda entry: priority.get(entry['title'], len(priority))
            )
            return {
                'status': True,
                'msg': 'get product success',
                'data': result
            }
        except Exception as e:
            # Boundary handler: any failure is reported to the caller as a
            # status-False payload instead of propagating.
            print(e)
            return {
                'status': False,
                'msg': str(e)
            }
ret = await provider_product_group(params_kw)
return ret