Compare commits

...

2 Commits

Author SHA1 Message Date
ping
3afe8b8f7e Merge branch 'main' of https://git.opencomputing.cn/yumoqing/kboss 2025-11-26 15:43:46 +08:00
ping
cc080e149a ujpdate 2025-11-26 15:43:40 +08:00
8 changed files with 563 additions and 0 deletions

View File

@ -0,0 +1,45 @@
async def home_page_product_add(ns={}):
"""
添加产品信息
:param ns: 包含产品信息的字典
"""
ns_dic = {
'id': uuid(), # 固定写法
'menu_id': ns.get('menu_id'),
'name': ns.get('name'),
'description': ns.get('description'),
'label': ns.get('label'),
'product_group': ns.get('product_group'),
'url': ns.get('url'),
'list_url': ns.get('list_url'),
'icon_url': ns.get('icon_url'),
'source': ns.get('source'),
'sort_order': ns.get('sort_order', 0),
'del_flg': '0'
}
# 验证必填字段
if not ns_dic.get('menu_id') or not ns_dic.get('name'):
return {
'status': False,
'msg': 'menu_id and name are required'
}
db = DBPools()
async with db.sqlorContext('kboss') as sor:
try:
await sor.C('home_page_product_info', ns_dic)
return {
'status': True,
'msg': 'create product success',
'data': {'id': ns_dic['id']}
}
except Exception as e:
await sor.rollback()
return {
'status': False,
'msg': 'create product failed, %s' % str(e)
}
ret = await home_page_product_add(params_kw)
return ret

View File

@ -0,0 +1,33 @@
async def home_page_product_delete(ns={}):
"""
软删除产品信息 id值必传 并且把del_flg值修改为1
:param ns:
:return:
"""
if not ns.get('id'):
return {
'status': False,
'msg': 'product id is required'
}
ns_dic = {
'id': ns.get('id'),
'del_flg': '1'
}
db = DBPools()
async with db.sqlorContext('kboss') as sor:
try:
await sor.U('home_page_product_info', ns_dic)
return {
'status': True,
'msg': 'delete product success'
}
except Exception as e:
await sor.rollback()
return {
'status': False,
'msg': 'delete product failed, %s' % str(e)
}
ret = await home_page_product_delete(params_kw)
return ret

View File

@ -0,0 +1,45 @@
async def home_page_product_menu_add(ns={}):
"""
添加首页产品菜单
:param ns: 包含菜单信息的字典
"""
ns_dic = {
'id': uuid(), # 固定写法
'parent_id': ns.get('parent_id'), # NULL表示一级菜单
'menu_level': ns.get('menu_level'), # 1-一级, 2-二级, 3-三级
'title': ns.get('title'),
'sort_order': ns.get('sort_order', 0)
}
# 验证必填字段
if not ns_dic.get('menu_level') or not ns_dic.get('title'):
return {
'status': False,
'msg': 'menu_level and title are required'
}
if ns.get('menu_level') > 1 and not ns.get('parent_id'):
return {
'status': False,
'msg': 'parent_id is required for menu_level > 1'
}
db = DBPools()
async with db.sqlorContext('kboss') as sor:
try:
await sor.C('home_page_product_menu', ns_dic)
return {
'status': True,
'msg': 'create menu success',
'data': {'id': ns_dic['id']}
}
except Exception as e:
await sor.rollback()
return {
'status': False,
'msg': 'create menu failed, %s' % str(e)
}
ret = await home_page_product_menu_add(params_kw)
return ret

View File

@ -0,0 +1,33 @@
async def home_page_product_menu_delete(ns={}):
"""
软删除菜单 id值必传 并且把del_flg值修改为1
:param ns:
:return:
"""
if not ns.get('id'):
return {
'status': False,
'msg': 'menu id is required'
}
ns_dic = {
'id': ns.get('id'),
'del_flg': '1'
}
db = DBPools()
async with db.sqlorContext('kboss') as sor:
try:
await sor.U('home_page_product_menu', ns_dic)
return {
'status': True,
'msg': 'delete menu success'
}
except Exception as e:
await sor.rollback()
return {
'status': False,
'msg': 'delete menu failed, %s' % str(e)
}
ret = await home_page_product_menu_delete(params_kw)
return ret

View File

@ -0,0 +1,229 @@
async def home_page_product_menu_search(ns={}):
"""
查找菜单支持按层级和父级ID筛选
规则:
1. 无参数时,返回所有数据并有嵌套层级
2. 有参数时menu_level和title必传返回当前层级以及向下的嵌套层级数据
:param ns:
:return:
"""
db = DBPools()
async with db.sqlorContext('kboss') as sor:
try:
# 无参数时,返回所有数据
if not ns:
conditions = {"del_flg": "0"}
result = await sor.R('home_page_product_menu', conditions)
menu_tree = await build_menu_tree(result)
return {
'status': True,
'msg': 'search all menu success',
'data': menu_tree
}
# 有参数时,验证必填参数
# if not ns.get('menu_level') or not ns.get('title'):
# return {
# 'status': False,
# 'msg': 'menu_level and title are required when parameters are provided'
# }
# 获取指定层级及向下层级的数据
menu_level = int(ns.get('menu_level'))
title = ns.get('title')
# 构建查询条件,获取当前层级及向下层级的数据
conditions = {"del_flg": "0"}
# 根据层级获取相应的数据
if menu_level == 1:
# 获取所有层级的数据,用于构建完整的树形结构
all_conditions = {"del_flg": "0"}
if title:
# 如果指定了标题,获取所有层级中标题匹配的数据
all_result = await sor.R('home_page_product_menu', all_conditions)
filtered_result = [item for item in all_result if title in item.get('title', '')]
# 获取匹配项的所有子级数据
result = []
parent_ids = set()
# 收集一级菜单的ID
for item in filtered_result:
if item['menu_level'] == 1:
parent_ids.add(item['id'])
result.append(item)
# 获取所有子级数据
if parent_ids:
all_data = await sor.R('home_page_product_menu', {"del_flg": "0"})
for item in all_data:
if item['parent_id'] in parent_ids:
result.append(item)
menu_tree = await build_menu_tree(result)
else:
# 获取所有数据
result = await sor.R('home_page_product_menu', all_conditions)
menu_tree = await build_menu_tree(result)
elif menu_level == 2:
# 获取二级菜单及其子级(三级)
all_conditions = {"del_flg": "0"}
all_result = await sor.R('home_page_product_menu', all_conditions)
# 筛选匹配的二级菜单
level2_items = [item for item in all_result if item['menu_level'] == 2 and title in item.get('title', '')]
if level2_items:
parent_ids = set(item['id'] for item in level2_items)
# 获取对应的一级菜单
result = []
for item in all_result:
if item['menu_level'] == 1 and any(level2['parent_id'] == item['id'] for level2 in level2_items):
result.append(item)
# 添加匹配的二级菜单
result.extend(level2_items)
# 获取三级子菜单
for item in all_result:
if item['menu_level'] == 3 and item['parent_id'] in parent_ids:
result.append(item)
menu_tree = await build_menu_tree(result)
else:
menu_tree = []
elif menu_level == 3:
# 获取三级菜单
all_conditions = {"del_flg": "0"}
all_result = await sor.R('home_page_product_menu', all_conditions)
# 筛选匹配的三级菜单
level3_items = [item for item in all_result if item['menu_level'] == 3 and title in item.get('title', '')]
if level3_items:
result = []
# 获取对应的二级和一级菜单
level2_ids = set()
level1_ids = set()
for item in level3_items:
level2_ids.add(item['parent_id'])
for item in all_result:
if item['menu_level'] == 2 and item['id'] in level2_ids:
result.append(item)
level1_ids.add(item['parent_id'])
for item in all_result:
if item['menu_level'] == 1 and item['id'] in level1_ids:
result.append(item)
# 添加匹配的三级菜单
result.extend(level3_items)
menu_tree = await build_menu_tree(result)
else:
menu_tree = []
else:
return {
'status': False,
'msg': 'Invalid menu_level, must be 1, 2, or 3'
}
return {
'status': True,
'msg': 'search menu success',
'data': menu_tree
}
except Exception as e:
return {
'status': False,
'msg': 'search menu failed, %s' % str(e)
}
async def get_home_page_product(ns={}):
db = DBPools()
async with db.sqlorContext('kboss') as sor:
find_sql = """select * from home_page_product_info where menu_id = '%s' and del_flg = '0' order by sort_order asc;""" % ns.get("menu_id")
result = await sor.sqlExe(find_sql, {})
return result
# 构建菜单树形结构
async def build_menu_tree(menu_list, target_level=None, target_title=None):
"""
将扁平的菜单列表转换为树形结构
:param menu_list: 菜单列表
:param target_level: 目标层级(可选)
:param target_title: 目标标题(可选)
:return: 树形结构的菜单
"""
# 通过sort_order对菜单进行排序
menu_list.sort(key=lambda x: x['sort_order'], reverse=True)
menu_dict = {}
result = []
# 先创建所有菜单的字典
for menu in menu_list:
# 创建菜单节点的基本结构
menu_node = {
'id': menu['id'],
'title': menu['title'],
'menu_level': menu['menu_level'],
'parent_id': menu['parent_id'],
'children': [] # 动态子菜单数组
}
menu_dict[menu['id']] = menu_node
# 构建父子关系
for menu in menu_list:
menu_node = menu_dict[menu['id']]
parent_id = menu['parent_id']
# 如果有父级且父级存在
if parent_id and parent_id in menu_dict:
parent_node = menu_dict[parent_id]
parent_node['children'].append(menu_node)
else:
# 没有父级的菜单(根节点)
result.append(menu_node)
# 递归构建层级结构
async def build_hierarchy(node, level=1):
"""递归构建层级结构"""
if level == 1:
return {
'id': node['id'],
'firTitle': node['title'],
'secMenu': [await build_hierarchy(child, 2) for child in node['children']]
}
elif level == 2:
return {
'id': node['id'],
'secTitle': node['title'],
'thrMenu': [await build_hierarchy(child, 3) for child in node['children']]
}
else:
# 三级及以上
return {
'id': node['id'],
'thrTitle': node['title'],
'value': await get_home_page_product({'menu_id': node['id']}) # 三级菜单没有子菜单value为空列表
}
# 构建最终的树形结构
final_result = []
for root_node in result:
final_result.append(await build_hierarchy(root_node))
return final_result
ret = await home_page_product_menu_search(params_kw)
return ret

View File

@ -0,0 +1,40 @@
async def home_page_product_menu_update(ns={}):
"""
更新菜单信息
:param ns:
:return:
"""
if not ns.get('id'):
return {
'status': False,
'msg': 'menu id is required'
}
# 构建更新字段,只更新传入的字段
ns_dic = {'id': ns.get('id')}
if ns.get('title'):
ns_dic['title'] = ns.get('title')
if ns.get('parent_id') is not None:
ns_dic['parent_id'] = ns.get('parent_id')
if ns.get('menu_level'):
ns_dic['menu_level'] = ns.get('menu_level')
if ns.get('sort_order') is not None:
ns_dic['sort_order'] = ns.get('sort_order')
db = DBPools()
async with db.sqlorContext('kboss') as sor:
try:
await sor.U('home_page_product_menu', ns_dic)
return {
'status': True,
'msg': 'update menu success'
}
except Exception as e:
await sor.rollback()
return {
'status': False,
'msg': 'update menu failed, %s' % str(e)
}
ret = await home_page_product_menu_update(params_kw)
return ret

View File

@ -0,0 +1,86 @@
async def home_page_product_search(ns={}):
"""
查找产品信息支持按菜单ID、产品名称等筛选
:param ns:
:return:
"""
# 分页参数
current_page = int(ns.get('current_page', 1))
page_size = int(ns.get('page_size', 10))
offset = (current_page - 1) * page_size
# 构建查询条件
conditions = ["hp.del_flg = '0'"]
if ns.get('menu_id'):
conditions.append(f"hp.menu_id = '{ns.get('menu_id')}'")
if ns.get('name'):
conditions.append(f"hp.name LIKE '%{ns.get('name')}%'")
if ns.get('product_group'):
conditions.append(f"hp.product_group = '{ns.get('product_group')}'")
if ns.get('source'):
conditions.append(f"hp.source = '{ns.get('source')}'")
where_clause = " AND ".join(conditions)
db = DBPools()
async with db.sqlorContext('kboss') as sor:
try:
# 查询总数
count_sql = f"""
SELECT COUNT(*) AS total_count
FROM home_page_product_info hp
WHERE {where_clause}
"""
total_count = (await sor.sqlExe(count_sql, {}))[0]['total_count']
# 查询产品列表
find_sql = f"""
SELECT hp.*, hm.title as menu_title, hm.menu_level
FROM home_page_product_info hp
LEFT JOIN home_page_product_menu hm ON hp.menu_id = hm.id
WHERE {where_clause}
ORDER BY hp.sort_order ASC, hp.create_at DESC
LIMIT {page_size} OFFSET {offset}
"""
result = await sor.sqlExe(find_sql, {})
# 格式化返回数据匹配home_page_menu.json的数据格式
product_list = []
for product in result:
product_data = {
'id': product['id'],
'name': product['name'],
'description': product['description'],
'label': product['label'],
'product_group': product['product_group'], # 注意字段名映射
'url': product['url'],
'list_url': product['list_url'], # 注意字段名映射
'iconUrl': product['icon_url'], # 注意字段名映射
'icon_url': product['source'],
'menu_id': product['menu_id'],
'menu_title': product['menu_title'],
'menu_level': product['menu_level']
}
product_list.append(product_data)
return {
'status': True,
'msg': 'search product success',
'data': {
'product_list': product_list
},
'pagination': {
'total': total_count,
'page_size': page_size,
'current_page': current_page,
}
}
except Exception as e:
return {
'status': False,
'msg': 'search product failed, %s' % str(e)
}
ret = await home_page_product_search(params_kw)
return ret

View File

@ -0,0 +1,52 @@
async def home_page_product_update(ns={}):
"""
更新产品信息
:param ns:
:return:
"""
if not ns.get('id'):
return {
'status': False,
'msg': 'product id is required'
}
# 构建更新字段,只更新传入的字段
ns_dic = {'id': ns.get('id')}
if ns.get('name'):
ns_dic['name'] = ns.get('name')
if ns.get('description') is not None:
ns_dic['description'] = ns.get('description')
if ns.get('label') is not None:
ns_dic['label'] = ns.get('label')
if ns.get('product_group') is not None:
ns_dic['product_group'] = ns.get('product_group')
if ns.get('url') is not None:
ns_dic['url'] = ns.get('url')
if ns.get('list_url') is not None:
ns_dic['list_url'] = ns.get('list_url')
if ns.get('icon_url') is not None:
ns_dic['icon_url'] = ns.get('icon_url')
if ns.get('source') is not None:
ns_dic['source'] = ns.get('source')
if ns.get('sort_order') is not None:
ns_dic['sort_order'] = ns.get('sort_order')
if ns.get('menu_id'):
ns_dic['menu_id'] = ns.get('menu_id')
db = DBPools()
async with db.sqlorContext('kboss') as sor:
try:
await sor.U('home_page_product_info', ns_dic)
return {
'status': True,
'msg': 'update product success'
}
except Exception as e:
await sor.rollback()
return {
'status': False,
'msg': 'update product failed, %s' % str(e)
}
ret = await home_page_product_update(params_kw)
return ret