This commit is contained in:
ping 2025-11-20 18:21:04 +08:00
parent f87215b1df
commit ed9a33663e
4 changed files with 9327 additions and 25 deletions

View File

@ -1,2 +1,6 @@
Message_fabaab3c68a6436cbfe483634b77bcda Message_fabaab3c68a6436cbfe483634b77bcda
Message_fb0d466fd8444a1987ac5bab8d7d0250 Message_fb0d466fd8444a1987ac5bab8d7d0250
Message_7bc5a59afd434737b21a49893041804d
Message_df99998d476d47b59960d078e715a85a
Message_5454656e48ad410183c7ead615000d9a
Message_35cdeb143c014279bc89d88e5d0fcba1

File diff suppressed because it is too large Load Diff

View File

@ -34,4 +34,4 @@
*/1 * * * * curl https://www.opencomputing.cn/jdcloud/get_partner_order_list.dspy?page=1 > /d/zhc/crontab_prod.log 2>&1 */1 * * * * curl https://www.opencomputing.cn/jdcloud/get_partner_order_list.dspy?page=1 > /d/zhc/crontab_prod.log 2>&1
* * * * * curl https://www.opencomputing.cn/baiduc/baidu_new_update_resouce.dspy > /d/zhc/crontab_prod.log 2>&1 * * * * * curl https://www.opencomputing.cn/baiduc/baidu_new_update_resouce.dspy > /d/zhc/crontab_prod.log 2>&1
#* * * * * curl https://www.opencomputing.cn/baiduc/baidu_sms_kafka_consumer.dspy > /d/zhc/crontab_dev.log 2>&1 #* * * * * curl https://www.opencomputing.cn/baiduc/baidu_sms_kafka_consumer.dspy > /d/zhc/crontab_dev.log 2>&1
*/5 * * * * /usr/bin/python3 /d/zhc/kboss_prod/kgadget/src/more_task_scheduler.py #*/5 * * * * /usr/bin/python3 /d/zhc/kboss_prod/kgadget/src/more_task_scheduler.py

View File

@ -90,6 +90,7 @@ async def time_convert(resoucetime=None):
return beijing_time.strftime("%Y-%m-%d %H:%M:%S") return beijing_time.strftime("%Y-%m-%d %H:%M:%S")
async def baidu_sms_kafka_consumer(ns={}): async def baidu_sms_kafka_consumer(ns={}):
import os
consumer = BaiduKafKaConsumer({ consumer = BaiduKafKaConsumer({
# 接入点 # 接入点
'bootstrap.servers': '120.48.10.223:9095,180.76.96.108:9095,180.76.147.36:9095', 'bootstrap.servers': '120.48.10.223:9095,180.76.96.108:9095,180.76.147.36:9095',
@ -113,14 +114,35 @@ async def baidu_sms_kafka_consumer(ns={}):
# 订阅的主题名称 # 订阅的主题名称
consumer.subscribe(['kaiyuanyun_msg_topic']) consumer.subscribe(['kaiyuanyun_msg_topic'])
files = ["baidu_kafka_msg.txt", "baidu_kafka_id.txt", "baidu_kafka_error.txt"]
for filename in files:
if not os.path.exists(filename):
with open(filename, 'w', encoding='utf-8') as f: # 'w' 模式会覆盖已有文件,但检查后使用是安全的
pass # 创建空文件
else:
pass
# total_count = 0
while True: while True:
msg = consumer.poll(1) # 单次轮询获取消息 i = 0
# if i == 0:
# # 写入文件记录轮询开始时间 时间格式: YYYY-MM-DD HH:MM:SS
# with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
# f.write(f"轮询开始时间:{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
msg = consumer.poll(0.1) # 单次轮询获取消息
if msg is None: if msg is None:
if i == 10:
with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
f.write(f"轮询第10次消息为None{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
continue continue
elif msg.error(): elif msg.error():
print(f"消费者错误: {msg.error()}") # 写入日志文件记录错误信息
with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f:
f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 消费者错误: {msg.error()}\n")
else: else:
# total_count += 1
try: try:
# 解析消息内容为字典(避免变量名冲突) # 解析消息内容为字典(避免变量名冲突)
msg_data_ori = json.loads(msg.value().decode('utf-8')) msg_data_ori = json.loads(msg.value().decode('utf-8'))
@ -128,14 +150,8 @@ async def baidu_sms_kafka_consumer(ns={}):
messageid = msg_data.get('id') messageid = msg_data.get('id')
taskid = msg_data.get('taskId') taskid = msg_data.get('taskId')
filename = 'baidu_kafka_msg.txt'
# 检查文件是否存在,不存在则创建
if not os.path.exists(filename):
with open(filename, 'w', encoding='utf-8') as f:
print(f"文件不存在,已创建 {filename}")
# 读取文件内容进行检查 # 读取文件内容进行检查
with open(filename, 'r', encoding='utf-8') as f: with open('baidu_kafka_id.txt', 'r', encoding='utf-8') as f:
content = f.read() content = f.read()
if messageid in content: if messageid in content:
@ -143,18 +159,22 @@ async def baidu_sms_kafka_consumer(ns={}):
continue continue
else: else:
# 追加写入目标内容 # 追加写入目标内容
with open(filename, 'a', encoding='utf-8') as f: with open('baidu_kafka_id.txt', 'a', encoding='utf-8') as f:
f.write(messageid + '\n') f.write(messageid + '\n')
print(f"已写入 '{messageid}' 到文件") print(f"已写入 '{messageid}' 到文件")
with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
f.write(str(msg_data) + '\n')
db = DBPools() db = DBPools()
async with db.sqlorContext('kboss') as sor: async with db.sqlorContext('kboss') as sor:
# 1. 去重检查messageid和taskid # 1. 去重检查messageid和taskid
exist_msg = await sor.R('baidu_kafka_msg', {'messageid': messageid, 'taskid': taskid}) exist_msg = await sor.R('baidu_kafka_msg', {'messageid': messageid, 'taskid': taskid})
if exist_msg: if exist_msg:
print(f"消息id {messageid} 已存在,跳过处理") print(f"消息id {messageid} 已存在,跳过处理")
consumer.close() continue
return # consumer.close()
# return
# 2. 构建小写key的ns字典完整映射所有字段 # 2. 构建小写key的ns字典完整映射所有字段
ns_msg = { ns_msg = {
@ -187,7 +207,7 @@ async def baidu_sms_kafka_consumer(ns={}):
# 3. 执行存库 # 3. 执行存库
await sor.C('baidu_kafka_msg', ns_msg) await sor.C('baidu_kafka_msg', ns_msg)
print(f"消息id {messageid} 存储成功") # print(f"消息id {messageid} 存储成功")
# 4. 触发短信发送当sendChannel为MOBILE时 # 4. 触发短信发送当sendChannel为MOBILE时
send_channel = msg_data.get('sendChannel') send_channel = msg_data.get('sendChannel')
@ -203,15 +223,25 @@ async def baidu_sms_kafka_consumer(ns={}):
# 查询用户手机号 # 查询用户手机号
account_id = msg_data.get('accountId') account_id = msg_data.get('accountId')
user_local_id_li = await sor.R('baidu_users', {'baidu_id': account_id}) user_local_id_li = await sor.R('baidu_users', {'baidu_id': account_id})
user_local_id = user_local_id_li[0]['user_id'] if user_local_id_li:
user_mobile_li = await sor.R('users', {'id': user_local_id, 'del_flg': '0'}) user_local_id = user_local_id_li[0]['user_id']
mobile = user_mobile_li[0]['mobile'] user_mobile_li = await sor.R('users', {'id': user_local_id, 'del_flg': '0'})
mobile = user_mobile_li[0]['mobile']
# 调用短信发送接口 # 调用短信发送接口
kyy_send_status = await send_vcode(mobile, sms_stype, {'content': msg_content}) kyy_send_status = await send_vcode(mobile, sms_stype, {'content': msg_content})
if kyy_send_status.get('status'): if kyy_send_status.get('status'):
await sor.U('baidu_kafka_msg', {'id': ns_msg['id'], 'kyysendstatus': '1'}) await sor.U('baidu_kafka_msg', {'id': ns_msg['id'], 'kyysendstatus': '1'})
print(f"已触发短信发送至 {mobile}") else:
# 记录错误日志 短信发送失败
with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f:
f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 短信发送失败至 {mobile},状态: {kyy_send_status}\n")
print(f"短信发送失败至 {mobile},状态: {kyy_send_status}")
else:
# 记录错误日志 用户未找到
with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f:
f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 未找到百度用户ID {account_id} 对应的本地用户\n")
print(f"未找到百度用户ID {account_id} 对应的本地用户")
except json.JSONDecodeError: except json.JSONDecodeError:
print("错误消息内容非有效JSON") print("错误消息内容非有效JSON")
@ -225,6 +255,9 @@ async def baidu_sms_kafka_consumer(ns={}):
'status': False, 'status': False,
'msg': f"处理异常: {str(e)}" 'msg': f"处理异常: {str(e)}"
} }
# 记录total_count
# with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
# f.write(f"本次轮询共处理消息数:{total_count}\n")
consumer.close() # 确保消费者关闭 consumer.close() # 确保消费者关闭
return { return {