main #41
@ -1,2 +0,0 @@
|
|||||||
2025-11-20 17:24:21 - 错误: 未找到百度用户ID 2529cb85e30a4d51a348e71edf5fa947 对应的本地用户
|
|
||||||
2025-11-20 17:28:43 - 错误: 未找到百度用户ID 2529cb85e30a4d51a348e71edf5fa947 对应的本地用户
|
|
||||||
@ -1,2 +0,0 @@
|
|||||||
Message_fabaab3c68a6436cbfe483634b77bcda
|
|
||||||
Message_fb0d466fd8444a1987ac5bab8d7d0250
|
|
||||||
@ -1,17 +0,0 @@
|
|||||||
轮询开始时间:2025-11-20 17:18:31
|
|
||||||
轮询开始时间:2025-11-20 17:24:18
|
|
||||||
{'id': 'Message_fabaab3c68a6436cbfe483634b77bcda', 'taskId': 'Task_a97448bf43af415c861f4655e3f6927a', 'userId': '2529cb85e30a4d51a348e71edf5fa947', 'accountId': '2529cb85e30a4d51a348e71edf5fa947', 'userType': 'account', 'receiverId': '13261188523', 'contentVar': '', 'sendChannel': 'MOBILE', 'content': '验证码为:925124,感谢您使用百度云服务,请填写完整完成验证。', 'messageTemplateId': 'MOBILE_6720338bf9c04712933f50a2630fbba9', 'messageTemplate': {'id': 'MOBILE_6720338bf9c04712933f50a2630fbba9', 'name': 'IAM验证码通知', 'tag': 'iam', 'product': '', 'type': 'CUSTOM', 'sendChannel': 'MOBILE', 'locale': 'zh-CN', 'taskTemplateId': 'ffc54482-3528-41f8-85d8-5ad44c9e53d7', 'channelTemplateId': '', 'parentId': '', 'content': '验证码为:${code},感谢您使用百度云服务,请填写完整完成验证。', 'contentType': 'TEXT', 'sign': '', 'createTime': '2017-12-07T17:44:17Z', 'createBy': 'UUAP:dengxiaochao', 'updateTime': '2025-11-18T18:18:58Z', 'updateBy': 'UUAP:dengxiaochao'}, 'isVirtualStore': True, 'channelMessageId': None, 'channelStatus': None, 'majorCategory': '03', 'minorCategory': '03002', 'status': 'NEW', 'createTime': '2025-11-20T09:22:32Z', 'updateTime': None, 'expireTime': None, 'windowTime': None, 'valid': True, 'sendComplete': False, 'disturbHold': False, 'complete': False}
|
|
||||||
轮询开始时间:2025-11-20 17:25:45
|
|
||||||
轮询开始时间:2025-11-20 17:25:54
|
|
||||||
轮询开始时间:2025-11-20 17:26:07
|
|
||||||
轮询开始时间:2025-11-20 17:26:17
|
|
||||||
轮询开始时间:2025-11-20 17:26:26
|
|
||||||
轮询开始时间:2025-11-20 17:26:36
|
|
||||||
轮询开始时间:2025-11-20 17:27:38
|
|
||||||
轮询开始时间:2025-11-20 17:27:46
|
|
||||||
轮询开始时间:2025-11-20 17:27:55
|
|
||||||
轮询开始时间:2025-11-20 17:28:04
|
|
||||||
轮询开始时间:2025-11-20 17:28:21
|
|
||||||
轮询开始时间:2025-11-20 17:28:30
|
|
||||||
轮询开始时间:2025-11-20 17:28:38
|
|
||||||
{'id': 'Message_fb0d466fd8444a1987ac5bab8d7d0250', 'taskId': 'Task_a7cc98a04bf74ffe97d36a34008b66e3', 'userId': '2529cb85e30a4d51a348e71edf5fa947', 'accountId': '2529cb85e30a4d51a348e71edf5fa947', 'userType': 'account', 'receiverId': '13261188523', 'contentVar': '', 'sendChannel': 'MOBILE', 'content': '验证码为:925124,感谢您使用百度云服务,请填写完整完成验证。', 'messageTemplateId': 'MOBILE_6720338bf9c04712933f50a2630fbba9', 'messageTemplate': {'id': 'MOBILE_6720338bf9c04712933f50a2630fbba9', 'name': 'IAM验证码通知', 'tag': 'iam', 'product': '', 'type': 'CUSTOM', 'sendChannel': 'MOBILE', 'locale': 'zh-CN', 'taskTemplateId': 'ffc54482-3528-41f8-85d8-5ad44c9e53d7', 'channelTemplateId': '', 'parentId': '', 'content': '验证码为:${code},感谢您使用百度云服务,请填写完整完成验证。', 'contentType': 'TEXT', 'sign': '', 'createTime': '2017-12-07T17:44:17Z', 'createBy': 'UUAP:dengxiaochao', 'updateTime': '2025-11-18T18:18:58Z', 'updateBy': 'UUAP:dengxiaochao'}, 'isVirtualStore': True, 'channelMessageId': None, 'channelStatus': None, 'majorCategory': '03', 'minorCategory': '03002', 'status': 'NEW', 'createTime': '2025-11-20T09:28:15Z', 'updateTime': None, 'expireTime': None, 'windowTime': None, 'valid': True, 'sendComplete': False, 'disturbHold': False, 'complete': False}
|
|
||||||
@ -34,4 +34,4 @@
|
|||||||
*/1 * * * * curl https://www.opencomputing.cn/jdcloud/get_partner_order_list.dspy?page=1 > /d/zhc/crontab_prod.log 2>&1
|
*/1 * * * * curl https://www.opencomputing.cn/jdcloud/get_partner_order_list.dspy?page=1 > /d/zhc/crontab_prod.log 2>&1
|
||||||
* * * * * curl https://www.opencomputing.cn/baiduc/baidu_new_update_resouce.dspy > /d/zhc/crontab_prod.log 2>&1
|
* * * * * curl https://www.opencomputing.cn/baiduc/baidu_new_update_resouce.dspy > /d/zhc/crontab_prod.log 2>&1
|
||||||
#* * * * * curl https://www.opencomputing.cn/baiduc/baidu_sms_kafka_consumer.dspy > /d/zhc/crontab_dev.log 2>&1
|
#* * * * * curl https://www.opencomputing.cn/baiduc/baidu_sms_kafka_consumer.dspy > /d/zhc/crontab_dev.log 2>&1
|
||||||
*/5 * * * * /usr/bin/python3 /d/zhc/kboss_prod/kgadget/src/more_task_scheduler.py
|
#*/5 * * * * /usr/bin/python3 /d/zhc/kboss_prod/kgadget/src/more_task_scheduler.py
|
||||||
|
|||||||
@ -90,6 +90,7 @@ async def time_convert(resoucetime=None):
|
|||||||
return beijing_time.strftime("%Y-%m-%d %H:%M:%S")
|
return beijing_time.strftime("%Y-%m-%d %H:%M:%S")
|
||||||
|
|
||||||
async def baidu_sms_kafka_consumer(ns={}):
|
async def baidu_sms_kafka_consumer(ns={}):
|
||||||
|
import os
|
||||||
consumer = BaiduKafKaConsumer({
|
consumer = BaiduKafKaConsumer({
|
||||||
# 接入点
|
# 接入点
|
||||||
'bootstrap.servers': '120.48.10.223:9095,180.76.96.108:9095,180.76.147.36:9095',
|
'bootstrap.servers': '120.48.10.223:9095,180.76.96.108:9095,180.76.147.36:9095',
|
||||||
@ -113,14 +114,35 @@ async def baidu_sms_kafka_consumer(ns={}):
|
|||||||
# 订阅的主题名称
|
# 订阅的主题名称
|
||||||
consumer.subscribe(['kaiyuanyun_msg_topic'])
|
consumer.subscribe(['kaiyuanyun_msg_topic'])
|
||||||
|
|
||||||
|
files = ["baidu_kafka_msg.txt", "baidu_kafka_id.txt", "baidu_kafka_error.txt"]
|
||||||
|
for filename in files:
|
||||||
|
if not os.path.exists(filename):
|
||||||
|
with open(filename, 'w', encoding='utf-8') as f: # 'w' 模式会覆盖已有文件,但检查后使用是安全的
|
||||||
|
pass # 创建空文件
|
||||||
|
else:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# total_count = 0
|
||||||
while True:
|
while True:
|
||||||
msg = consumer.poll(1) # 单次轮询获取消息
|
i = 0
|
||||||
|
# if i == 0:
|
||||||
|
# # 写入文件记录轮询开始时间 时间格式: YYYY-MM-DD HH:MM:SS
|
||||||
|
# with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
|
||||||
|
# f.write(f"轮询开始时间:{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
|
||||||
|
|
||||||
|
msg = consumer.poll(0.1) # 单次轮询获取消息
|
||||||
|
|
||||||
if msg is None:
|
if msg is None:
|
||||||
|
if i == 10:
|
||||||
|
with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
|
||||||
|
f.write(f"轮询第10次消息为None,{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
|
||||||
continue
|
continue
|
||||||
elif msg.error():
|
elif msg.error():
|
||||||
print(f"消费者错误: {msg.error()}")
|
# 写入日志文件记录错误信息
|
||||||
|
with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f:
|
||||||
|
f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 消费者错误: {msg.error()}\n")
|
||||||
else:
|
else:
|
||||||
|
# total_count += 1
|
||||||
try:
|
try:
|
||||||
# 解析消息内容为字典(避免变量名冲突)
|
# 解析消息内容为字典(避免变量名冲突)
|
||||||
msg_data_ori = json.loads(msg.value().decode('utf-8'))
|
msg_data_ori = json.loads(msg.value().decode('utf-8'))
|
||||||
@ -128,14 +150,8 @@ async def baidu_sms_kafka_consumer(ns={}):
|
|||||||
messageid = msg_data.get('id')
|
messageid = msg_data.get('id')
|
||||||
taskid = msg_data.get('taskId')
|
taskid = msg_data.get('taskId')
|
||||||
|
|
||||||
filename = 'baidu_kafka_msg.txt'
|
|
||||||
# 检查文件是否存在,不存在则创建
|
|
||||||
if not os.path.exists(filename):
|
|
||||||
with open(filename, 'w', encoding='utf-8') as f:
|
|
||||||
print(f"文件不存在,已创建 {filename}")
|
|
||||||
|
|
||||||
# 读取文件内容进行检查
|
# 读取文件内容进行检查
|
||||||
with open(filename, 'r', encoding='utf-8') as f:
|
with open('baidu_kafka_id.txt', 'r', encoding='utf-8') as f:
|
||||||
content = f.read()
|
content = f.read()
|
||||||
|
|
||||||
if messageid in content:
|
if messageid in content:
|
||||||
@ -143,18 +159,22 @@ async def baidu_sms_kafka_consumer(ns={}):
|
|||||||
continue
|
continue
|
||||||
else:
|
else:
|
||||||
# 追加写入目标内容
|
# 追加写入目标内容
|
||||||
with open(filename, 'a', encoding='utf-8') as f:
|
with open('baidu_kafka_id.txt', 'a', encoding='utf-8') as f:
|
||||||
f.write(messageid + '\n')
|
f.write(messageid + '\n')
|
||||||
print(f"已写入 '{messageid}' 到文件")
|
print(f"已写入 '{messageid}' 到文件")
|
||||||
|
|
||||||
|
with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
|
||||||
|
f.write(str(msg_data) + '\n')
|
||||||
|
|
||||||
db = DBPools()
|
db = DBPools()
|
||||||
async with db.sqlorContext('kboss') as sor:
|
async with db.sqlorContext('kboss') as sor:
|
||||||
# 1. 去重检查(messageid和taskid)
|
# 1. 去重检查(messageid和taskid)
|
||||||
exist_msg = await sor.R('baidu_kafka_msg', {'messageid': messageid, 'taskid': taskid})
|
exist_msg = await sor.R('baidu_kafka_msg', {'messageid': messageid, 'taskid': taskid})
|
||||||
if exist_msg:
|
if exist_msg:
|
||||||
print(f"消息id {messageid} 已存在,跳过处理")
|
print(f"消息id {messageid} 已存在,跳过处理")
|
||||||
consumer.close()
|
continue
|
||||||
return
|
# consumer.close()
|
||||||
|
# return
|
||||||
|
|
||||||
# 2. 构建小写key的ns字典(完整映射所有字段)
|
# 2. 构建小写key的ns字典(完整映射所有字段)
|
||||||
ns_msg = {
|
ns_msg = {
|
||||||
@ -187,7 +207,7 @@ async def baidu_sms_kafka_consumer(ns={}):
|
|||||||
|
|
||||||
# 3. 执行存库
|
# 3. 执行存库
|
||||||
await sor.C('baidu_kafka_msg', ns_msg)
|
await sor.C('baidu_kafka_msg', ns_msg)
|
||||||
print(f"消息id {messageid} 存储成功")
|
# print(f"消息id {messageid} 存储成功")
|
||||||
|
|
||||||
# 4. 触发短信发送(当sendChannel为MOBILE时)
|
# 4. 触发短信发送(当sendChannel为MOBILE时)
|
||||||
send_channel = msg_data.get('sendChannel')
|
send_channel = msg_data.get('sendChannel')
|
||||||
@ -203,15 +223,25 @@ async def baidu_sms_kafka_consumer(ns={}):
|
|||||||
# 查询用户手机号
|
# 查询用户手机号
|
||||||
account_id = msg_data.get('accountId')
|
account_id = msg_data.get('accountId')
|
||||||
user_local_id_li = await sor.R('baidu_users', {'baidu_id': account_id})
|
user_local_id_li = await sor.R('baidu_users', {'baidu_id': account_id})
|
||||||
user_local_id = user_local_id_li[0]['user_id']
|
if user_local_id_li:
|
||||||
user_mobile_li = await sor.R('users', {'id': user_local_id, 'del_flg': '0'})
|
user_local_id = user_local_id_li[0]['user_id']
|
||||||
mobile = user_mobile_li[0]['mobile']
|
user_mobile_li = await sor.R('users', {'id': user_local_id, 'del_flg': '0'})
|
||||||
|
mobile = user_mobile_li[0]['mobile']
|
||||||
|
|
||||||
# 调用短信发送接口
|
# 调用短信发送接口
|
||||||
kyy_send_status = await send_vcode(mobile, sms_stype, {'content': msg_content})
|
kyy_send_status = await send_vcode(mobile, sms_stype, {'content': msg_content})
|
||||||
if kyy_send_status.get('status'):
|
if kyy_send_status.get('status'):
|
||||||
await sor.U('baidu_kafka_msg', {'id': ns_msg['id'], 'kyysendstatus': '1'})
|
await sor.U('baidu_kafka_msg', {'id': ns_msg['id'], 'kyysendstatus': '1'})
|
||||||
print(f"已触发短信发送至 {mobile}")
|
else:
|
||||||
|
# 记录错误日志 短信发送失败
|
||||||
|
with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f:
|
||||||
|
f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 短信发送失败至 {mobile},状态: {kyy_send_status}\n")
|
||||||
|
print(f"短信发送失败至 {mobile},状态: {kyy_send_status}")
|
||||||
|
else:
|
||||||
|
# 记录错误日志 用户未找到
|
||||||
|
with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f:
|
||||||
|
f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 未找到百度用户ID {account_id} 对应的本地用户\n")
|
||||||
|
print(f"未找到百度用户ID {account_id} 对应的本地用户")
|
||||||
|
|
||||||
except json.JSONDecodeError:
|
except json.JSONDecodeError:
|
||||||
print("错误:消息内容非有效JSON")
|
print("错误:消息内容非有效JSON")
|
||||||
@ -225,6 +255,9 @@ async def baidu_sms_kafka_consumer(ns={}):
|
|||||||
'status': False,
|
'status': False,
|
||||||
'msg': f"处理异常: {str(e)}"
|
'msg': f"处理异常: {str(e)}"
|
||||||
}
|
}
|
||||||
|
# 记录total_count
|
||||||
|
# with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
|
||||||
|
# f.write(f"本次轮询共处理消息数:{total_count}\n")
|
||||||
|
|
||||||
consumer.close() # 确保消费者关闭
|
consumer.close() # 确保消费者关闭
|
||||||
return {
|
return {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user