百度kafka 接收消息推送
This commit is contained in:
parent
e9b0e01a53
commit
fcbbb6056f
@ -6,29 +6,30 @@ async def time_convert(resoucetime=None):
|
||||
return beijing_time.strftime("%Y-%m-%d %H:%M:%S")
|
||||
|
||||
async def baidu_sms_kafka_consumer(ns={}):
|
||||
import os
|
||||
consumer = BaiduKafKaConsumer({
|
||||
# 接入点
|
||||
'bootstrap.servers': '120.48.10.223:9095,180.76.96.108:9095,180.76.147.36:9095',
|
||||
# 接入协议
|
||||
'security.protocol': 'SASL_SSL',
|
||||
'ssl.endpoint.identification.algorithm': 'none',
|
||||
# 证书文件路径
|
||||
'ssl.ca.location': 'baidu_kafka_ca.pem',
|
||||
# SASL 机制
|
||||
'sasl.mechanism': 'SCRAM-SHA-512',
|
||||
# SASL 用户名
|
||||
'sasl.username': 'kaiyuanyun',
|
||||
# SASL 用户密码
|
||||
'sasl.password': 'Kyy250609#',
|
||||
# 消费组id
|
||||
'group.id': 'kaiyuanyun_msg_group',
|
||||
'auto.offset.reset': 'latest',
|
||||
'fetch.message.max.bytes': '1024*512',
|
||||
})
|
||||
# consumer = BaiduKafKaConsumer({
|
||||
# # 接入点
|
||||
# 'bootstrap.servers': '120.48.10.223:9095,180.76.96.108:9095,180.76.147.36:9095',
|
||||
# # 接入协议
|
||||
# 'security.protocol': 'SASL_SSL',
|
||||
# 'ssl.endpoint.identification.algorithm': 'none',
|
||||
# # 证书文件路径
|
||||
# 'ssl.ca.location': 'baidu_kafka_ca.pem',
|
||||
# # SASL 机制
|
||||
# 'sasl.mechanism': 'SCRAM-SHA-512',
|
||||
# # SASL 用户名
|
||||
# 'sasl.username': 'kaiyuanyun',
|
||||
# # SASL 用户密码
|
||||
# 'sasl.password': 'Kyy250609#',
|
||||
# # 消费组id
|
||||
# 'group.id': 'kaiyuanyun_msg_group',
|
||||
# 'auto.offset.reset': 'latest',
|
||||
# 'fetch.message.max.bytes': '1024*512',
|
||||
# })
|
||||
|
||||
# 订阅的主题名称
|
||||
consumer.subscribe(['kaiyuanyun_msg_topic'])
|
||||
# # 订阅的主题名称
|
||||
# consumer.subscribe(['kaiyuanyun_msg_topic'])
|
||||
|
||||
import os
|
||||
|
||||
files = ["baidu_kafka_msg.txt", "baidu_kafka_id.txt", "baidu_kafka_error.txt"]
|
||||
for filename in files:
|
||||
@ -38,143 +39,129 @@ async def baidu_sms_kafka_consumer(ns={}):
|
||||
else:
|
||||
pass
|
||||
|
||||
total_count = 0
|
||||
for i in range(10):
|
||||
# if i == 0:
|
||||
# # 写入文件记录轮询开始时间 时间格式: YYYY-MM-DD HH:MM:SS
|
||||
# with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
|
||||
# f.write(f"轮询开始时间:{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
|
||||
|
||||
msg = consumer.poll(0.01) # 单次轮询获取消息
|
||||
|
||||
if msg is None:
|
||||
if i == 10:
|
||||
with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
|
||||
f.write(f"轮询第10次消息为None,{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
|
||||
continue
|
||||
elif msg.error():
|
||||
# 写入日志文件记录错误信息
|
||||
with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f:
|
||||
f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 消费者错误: {msg.error()}\n")
|
||||
else:
|
||||
total_count += 1
|
||||
try:
|
||||
# 解析消息内容为字典(避免变量名冲突)
|
||||
msg_data_ori = json.loads(msg.value().decode('utf-8'))
|
||||
msg_data = msg_data_ori['messages'][0]
|
||||
messageid = msg_data.get('id')
|
||||
taskid = msg_data.get('taskId')
|
||||
|
||||
# 读取文件内容进行检查
|
||||
with open('baidu_kafka_id.txt', 'r', encoding='utf-8') as f:
|
||||
content = f.read()
|
||||
|
||||
if messageid in content:
|
||||
print(f"文件中已存在 '{messageid}',跳过写入")
|
||||
continue
|
||||
else:
|
||||
# 追加写入目标内容
|
||||
with open('baidu_kafka_id.txt', 'a', encoding='utf-8') as f:
|
||||
f.write(messageid + '\n')
|
||||
print(f"已写入 '{messageid}' 到文件")
|
||||
|
||||
with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
|
||||
f.write(str(msg_data) + '\n')
|
||||
|
||||
db = DBPools()
|
||||
async with db.sqlorContext('kboss') as sor:
|
||||
# 1. 去重检查(messageid和taskid)
|
||||
exist_msg = await sor.R('baidu_kafka_msg', {'messageid': messageid, 'taskid': taskid})
|
||||
if exist_msg:
|
||||
print(f"消息id {messageid} 已存在,跳过处理")
|
||||
continue
|
||||
# consumer.close()
|
||||
# return
|
||||
|
||||
# 2. 构建小写key的ns字典(完整映射所有字段)
|
||||
ns_msg = {
|
||||
'id': uuid(),
|
||||
'messageid': messageid,
|
||||
'taskid': taskid,
|
||||
'userid': msg_data.get('userId'),
|
||||
'accountid': msg_data.get('accountId'),
|
||||
'usertype': msg_data.get('userType'),
|
||||
'receiverid': msg_data.get('receiverId'),
|
||||
'contentvar': msg_data.get('contentVar'),
|
||||
'sendchannel': msg_data.get('sendChannel'),
|
||||
'content': msg_data.get('content'),
|
||||
'messagetemplateid': msg_data.get('messageTemplateId'),
|
||||
'isvirtualstore': msg_data.get('isVirtualStore'),
|
||||
'channelmessageid': msg_data.get('channelMessageId'),
|
||||
'channelstatus': msg_data.get('channelStatus'),
|
||||
'majorcategory': msg_data.get('majorCategory'),
|
||||
'minorcategory': msg_data.get('minorCategory'),
|
||||
'status': msg_data.get('status'),
|
||||
'createtime': await time_convert(msg_data.get('createTime')) if msg_data.get('createTime') else None,
|
||||
'updatetime': await time_convert(msg_data.get('updateTime')) if msg_data.get('updateTime') else None,
|
||||
'expiretime': await time_convert(msg_data.get('expireTime')) if msg_data.get('expireTime') else None,
|
||||
'windowtime': await time_convert(msg_data.get('windowTime')) if msg_data.get('windowTime') else None,
|
||||
'valid': msg_data.get('valid'),
|
||||
'complete': msg_data.get('complete'),
|
||||
'disturbhold': msg_data.get('disturbHold'),
|
||||
'sendcomplete': msg_data.get('sendComplete')
|
||||
}
|
||||
|
||||
# 3. 执行存库
|
||||
await sor.C('baidu_kafka_msg', ns_msg)
|
||||
print(f"消息id {messageid} 存储成功")
|
||||
|
||||
# 4. 触发短信发送(当sendChannel为MOBILE时)
|
||||
send_channel = msg_data.get('sendChannel')
|
||||
if send_channel == 'MOBILE':
|
||||
msg_content = msg_data.get('content')
|
||||
|
||||
# 判断验证码类短信 | 通知类短信
|
||||
if '验证码' in msg_content:
|
||||
sms_stype = '百度kafka普通验证码'
|
||||
else:
|
||||
sms_stype = '百度kafka普通通知'
|
||||
|
||||
# 查询用户手机号
|
||||
account_id = msg_data.get('accountId')
|
||||
user_local_id_li = await sor.R('baidu_users', {'baidu_id': account_id})
|
||||
if user_local_id_li:
|
||||
user_local_id = user_local_id_li[0]['user_id']
|
||||
user_mobile_li = await sor.R('users', {'id': user_local_id, 'del_flg': '0'})
|
||||
mobile = user_mobile_li[0]['mobile']
|
||||
|
||||
# 调用短信发送接口
|
||||
kyy_send_status = await send_vcode(mobile, sms_stype, {'content': msg_content})
|
||||
if kyy_send_status.get('status'):
|
||||
await sor.U('baidu_kafka_msg', {'id': ns_msg['id'], 'kyysendstatus': '1'})
|
||||
else:
|
||||
# 记录错误日志 短信发送失败
|
||||
with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f:
|
||||
f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 短信发送失败至 {mobile},状态: {kyy_send_status}\n")
|
||||
print(f"短信发送失败至 {mobile},状态: {kyy_send_status}")
|
||||
else:
|
||||
# 记录错误日志 用户未找到
|
||||
with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f:
|
||||
f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 未找到百度用户ID {account_id} 对应的本地用户\n")
|
||||
print(f"未找到百度用户ID {account_id} 对应的本地用户")
|
||||
|
||||
except json.JSONDecodeError:
|
||||
print("错误:消息内容非有效JSON")
|
||||
return {
|
||||
'status': False,
|
||||
'msg': '错误:消息内容非有效JSON'
|
||||
}
|
||||
except Exception as e:
|
||||
print(f"处理异常: {str(e)}")
|
||||
return {
|
||||
'status': False,
|
||||
'msg': f"处理异常: {str(e)}"
|
||||
}
|
||||
# 记录total_count
|
||||
# with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
|
||||
# f.write(f"本次轮询共处理消息数:{total_count}\n")
|
||||
msg = ns.get('msg')
|
||||
|
||||
consumer.close() # 确保消费者关闭
|
||||
if msg.error():
|
||||
# 写入日志文件记录错误信息
|
||||
with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f:
|
||||
f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 消费者错误: {msg.error()}\n")
|
||||
else:
|
||||
try:
|
||||
# 解析消息内容为字典(避免变量名冲突)
|
||||
msg_data_ori = json.loads(msg.value().decode('utf-8'))
|
||||
msg_data = msg_data_ori['messages'][0]
|
||||
messageid = msg_data.get('id')
|
||||
taskid = msg_data.get('taskId')
|
||||
|
||||
# 读取文件内容进行检查
|
||||
with open('baidu_kafka_id.txt', 'r', encoding='utf-8') as f:
|
||||
content = f.read()
|
||||
|
||||
if messageid in content:
|
||||
print(f"文件中已存在 '{messageid}',跳过写入")
|
||||
return
|
||||
else:
|
||||
# 追加写入目标内容
|
||||
with open('baidu_kafka_id.txt', 'a', encoding='utf-8') as f:
|
||||
f.write(messageid + '\n')
|
||||
print(f"已写入 '{messageid}' 到文件")
|
||||
|
||||
with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
|
||||
f.write(str(msg_data) + '\n')
|
||||
|
||||
db = DBPools()
|
||||
async with db.sqlorContext('kboss') as sor:
|
||||
# 1. 去重检查(messageid和taskid)
|
||||
exist_msg = await sor.R('baidu_kafka_msg', {'messageid': messageid, 'taskid': taskid})
|
||||
if exist_msg:
|
||||
print(f"消息id {messageid} 已存在,跳过处理")
|
||||
return
|
||||
|
||||
# 2. 构建小写key的ns字典(完整映射所有字段)
|
||||
ns_msg = {
|
||||
'id': uuid(),
|
||||
'messageid': messageid,
|
||||
'taskid': taskid,
|
||||
'userid': msg_data.get('userId'),
|
||||
'accountid': msg_data.get('accountId'),
|
||||
'usertype': msg_data.get('userType'),
|
||||
'receiverid': msg_data.get('receiverId'),
|
||||
'contentvar': msg_data.get('contentVar'),
|
||||
'sendchannel': msg_data.get('sendChannel'),
|
||||
'content': msg_data.get('content'),
|
||||
'messagetemplateid': msg_data.get('messageTemplateId'),
|
||||
'isvirtualstore': msg_data.get('isVirtualStore'),
|
||||
'channelmessageid': msg_data.get('channelMessageId'),
|
||||
'channelstatus': msg_data.get('channelStatus'),
|
||||
'majorcategory': msg_data.get('majorCategory'),
|
||||
'minorcategory': msg_data.get('minorCategory'),
|
||||
'status': msg_data.get('status'),
|
||||
'createtime': await time_convert(msg_data.get('createTime')) if msg_data.get('createTime') else None,
|
||||
'updatetime': await time_convert(msg_data.get('updateTime')) if msg_data.get('updateTime') else None,
|
||||
'expiretime': await time_convert(msg_data.get('expireTime')) if msg_data.get('expireTime') else None,
|
||||
'windowtime': await time_convert(msg_data.get('windowTime')) if msg_data.get('windowTime') else None,
|
||||
'valid': msg_data.get('valid'),
|
||||
'complete': msg_data.get('complete'),
|
||||
'disturbhold': msg_data.get('disturbHold'),
|
||||
'sendcomplete': msg_data.get('sendComplete')
|
||||
}
|
||||
|
||||
# 3. 执行存库
|
||||
await sor.C('baidu_kafka_msg', ns_msg)
|
||||
print(f"消息id {messageid} 存储成功")
|
||||
|
||||
# 4. 触发短信发送(当sendChannel为MOBILE时)
|
||||
send_channel = msg_data.get('sendChannel')
|
||||
if send_channel == 'MOBILE':
|
||||
msg_content = msg_data.get('content')
|
||||
|
||||
# 判断验证码类短信 | 通知类短信
|
||||
if '验证码' in msg_content:
|
||||
sms_stype = '百度kafka普通验证码'
|
||||
else:
|
||||
sms_stype = '百度kafka普通通知'
|
||||
|
||||
# 查询用户手机号
|
||||
account_id = msg_data.get('accountId')
|
||||
user_local_id_li = await sor.R('baidu_users', {'baidu_id': account_id})
|
||||
if user_local_id_li:
|
||||
user_local_id = user_local_id_li[0]['user_id']
|
||||
user_mobile_li = await sor.R('users', {'id': user_local_id, 'del_flg': '0'})
|
||||
mobile = user_mobile_li[0]['mobile']
|
||||
|
||||
# 调用短信发送接口
|
||||
kyy_send_status = await send_vcode(mobile, sms_stype, {'content': msg_content})
|
||||
if kyy_send_status.get('status'):
|
||||
await sor.U('baidu_kafka_msg', {'id': ns_msg['id'], 'kyysendstatus': '1'})
|
||||
else:
|
||||
# 记录错误日志 短信发送失败
|
||||
with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f:
|
||||
f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 短信发送失败至 {mobile},状态: {kyy_send_status}\n")
|
||||
print(f"短信发送失败至 {mobile},状态: {kyy_send_status}")
|
||||
else:
|
||||
# 记录错误日志 用户未找到
|
||||
with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f:
|
||||
f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 未找到百度用户ID {account_id} 对应的本地用户\n")
|
||||
print(f"未找到百度用户ID {account_id} 对应的本地用户")
|
||||
|
||||
except json.JSONDecodeError:
|
||||
print("错误:消息内容非有效JSON")
|
||||
return {
|
||||
'status': False,
|
||||
'msg': '错误:消息内容非有效JSON'
|
||||
}
|
||||
except Exception as e:
|
||||
print(f"处理异常: {str(e)}")
|
||||
import traceback
|
||||
with open('baidu_kafka_error.txt', 'w') as f:
|
||||
f.write(str(e)+ traceback.format_exc())
|
||||
traceback.print_exc()
|
||||
return {
|
||||
'status': False,
|
||||
'msg': f"处理异常: {str(e)}"
|
||||
}
|
||||
|
||||
# consumer.close() # 确保消费者关闭
|
||||
return {
|
||||
'status': True,
|
||||
'msg': '获取信息执行结束'
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user