kboss/b/user/favorite_search.dspy
2025-08-27 14:05:00 +08:00

108 lines
4.2 KiB
Plaintext
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

def _int_param(ns, key, default):
    """Read ``ns[key]`` as an int, using ``default`` when it is missing or blank."""
    value = ns.get(key)
    if value is None or value == '':
        return default
    return int(value)


def _favorite_date_key(create_at):
    """Return the ``YYYY-MM-DD`` day of a favorite's ``create_at`` value.

    Accepts either a date/datetime object or a ``"%Y-%m-%d %H:%M:%S"`` string,
    because the column type handed back by the DB driver is not visible here.
    """
    if isinstance(create_at, datetime.date):  # also matches datetime.datetime
        return create_at.strftime("%Y-%m-%d")
    parsed = datetime.datetime.strptime(create_at, "%Y-%m-%d %H:%M:%S")
    return parsed.strftime("%Y-%m-%d")


def _decorate_product_info(product_info):
    """Flag a product row as favorited, mask its phone number and expand its image URL (in place)."""
    product_info['favorite'] = '1'
    # Never expose the publisher's real phone number in the favorites list.
    if product_info.get('phone_number'):
        product_info['phone_number'] = '**************'
    image_path = product_info.get('img')
    domain_name = product_info.get('domain_name')
    # Only build the absolute URL when both parts exist; a NULL domain_name
    # would otherwise raise and fail the whole listing.
    if image_path and domain_name:
        product_info['img'] = 'https://' + domain_name + '/idfile?path=' + image_path
    return product_info


async def favorite_search(ns=None):
    """
    Query a user's favorites, grouped by the day they were created.

    :param ns: request parameters (dict-like). Keys read here:
        ``userid`` (optional; falls back to ``get_user()``),
        ``publish_type`` (optional; matched against ``user_favorite.favorite_type``),
        ``current_page`` (default 1) and ``page_size`` (default 1000).
    :return: ``{'status': True, 'msg': ..., 'data': {'total_count',
        'current_page', 'page_size', 'favorites'}}`` on success, where
        ``favorites`` is a list of ``{'id', 'browse_date', 'products'}`` groups
        ordered newest day first; ``{'status': False, 'msg': ...}`` when any
        step inside the database context raises.
    """
    # A shared ``{}`` default would leak state between calls, so use None.
    ns = {} if ns is None else ns

    # NOTE(review): a userid supplied in the request takes precedence over the
    # session user, so any caller can list another user's favorites -- confirm
    # this is intended.
    userid = ns.get('userid') or await get_user()
    if not userid:
        # Presumably raises an HTTP 401; the query below is scoped to userid
        # unconditionally so it stays fail-closed even if this returns.
        server_error(401)

    db = DBPools()
    async with db.sqlorContext('kboss') as sor:
        try:
            # Paging: clamp so OFFSET/LIMIT can never go negative.
            current_page = max(_int_param(ns, 'current_page', 1), 1)
            page_size = max(_int_param(ns, 'page_size', 1000), 0)
            offset = (current_page - 1) * page_size

            # Request values are bound through sqlor's ${name}$ placeholders
            # instead of being interpolated into the SQL text.
            sql_params = {'userid': userid}
            where_conditions = ["f.del_flg = '0'", "f.userid = ${userid}$"]
            if ns.get('publish_type'):
                where_conditions.append("f.favorite_type = ${favorite_type}$")
                sql_params['favorite_type'] = ns.get('publish_type')
            where_clause = " AND ".join(where_conditions)

            # Total number of matching favorites (not the number of day groups).
            count_sql = """
                SELECT COUNT(*) AS total_count
                FROM user_favorite f
                WHERE %s
            """ % where_clause
            count_result = await sor.sqlExe(count_sql, dict(sql_params))
            total_count = count_result[0]['total_count'] if count_result else 0

            # Requested page of favorites; LIMIT/OFFSET are ints computed
            # above, so inlining them is safe.
            query_sql = """
                SELECT *
                FROM user_favorite f
                WHERE %s
                ORDER BY f.create_at DESC
                LIMIT %d OFFSET %d
            """ % (where_clause, page_size, offset)
            favorites = await sor.sqlExe(query_sql, dict(sql_params)) or []

            # Attach the listed product (if it still exists) to each favorite.
            # NOTE(review): this is one query per favorite; consider batching.
            product_sql = """
                SELECT * FROM user_publish_product
                WHERE id = ${productid}$ AND del_flg = '0' AND listing_status = 'listing';
            """
            for favorite in favorites:
                product_rows = await sor.sqlExe(
                    product_sql, {'productid': favorite['productid']}
                )
                if product_rows:
                    favorite['product_info'] = _decorate_product_info(product_rows[0])
                else:
                    favorite['product_info'] = None

            # Group favorites by calendar day, e.g. "2025-08-22 15:58:52" -> "2025-08-22".
            date_groups = {}
            for favorite in favorites:
                day = _favorite_date_key(favorite["create_at"])
                date_groups.setdefault(day, []).append(favorite)

            # Newest day first, with a 1-based sequence number as a string id.
            grouped = [
                {
                    "id": str(idx),
                    "browse_date": day,
                    "products": date_groups[day],
                }
                for idx, day in enumerate(sorted(date_groups, reverse=True), start=1)
            ]
            return {
                'status': True,
                'msg': '查询成功',
                'data': {
                    'total_count': total_count,
                    'current_page': current_page,
                    'page_size': page_size,
                    'favorites': grouped
                }
            }
        except Exception as e:
            # Boundary handler: report the failure to the caller rather than
            # raising out of the script.
            return {
                'status': False,
                'msg': '查询失败, %s' % str(e)
            }
ret = await favorite_search(params_kw)
return ret