This commit is contained in:
ping 2025-11-21 16:15:13 +08:00
parent 6045e8ec76
commit 9cbda277fb
2 changed files with 120 additions and 114 deletions

View File

@ -41,125 +41,126 @@ async def baidu_sms_kafka_consumer(ns={}):
msg = ns.get('msg') msg = ns.get('msg')
if msg.error(): # if msg.error():
# 写入日志文件记录错误信息 # # 写入日志文件记录错误信息
with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f: # with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f:
f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 消费者错误: {msg.error()}\n") # f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 消费者错误: {msg.error()}\n")
else: # else:
try: try:
# 解析消息内容为字典(避免变量名冲突) # 解析消息内容为字典(避免变量名冲突)
msg_data_ori = json.loads(msg.value().decode('utf-8')) # msg_data_ori = json.loads(msg.value().decode('utf-8'))
msg_data = msg_data_ori['messages'][0] msg_data_ori = json.loads(msg)
messageid = msg_data.get('id') msg_data = msg_data_ori['messages'][0]
taskid = msg_data.get('taskId') messageid = msg_data.get('id')
taskid = msg_data.get('taskId')
# 读取文件内容进行检查 # 读取文件内容进行检查
with open('baidu_kafka_id.txt', 'r', encoding='utf-8') as f: with open('baidu_kafka_id.txt', 'r', encoding='utf-8') as f:
content = f.read() content = f.read()
if messageid in content: if messageid in content:
print(f"文件中已存在 '{messageid}',跳过写入") print(f"文件中已存在 '{messageid}',跳过写入")
return
else:
# 追加写入目标内容
with open('baidu_kafka_id.txt', 'a', encoding='utf-8') as f:
f.write(messageid + '\n')
print(f"已写入 '{messageid}' 到文件")
with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
f.write(str(msg_data) + '\n')
db = DBPools()
async with db.sqlorContext('kboss') as sor:
# 1. 去重检查messageid和taskid
exist_msg = await sor.R('baidu_kafka_msg', {'messageid': messageid, 'taskid': taskid})
if exist_msg:
print(f"消息id {messageid} 已存在,跳过处理")
return return
else:
# 追加写入目标内容
with open('baidu_kafka_id.txt', 'a', encoding='utf-8') as f:
f.write(messageid + '\n')
print(f"已写入 '{messageid}' 到文件")
with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
f.write(str(msg_data) + '\n')
db = DBPools() # 2. 构建小写key的ns字典完整映射所有字段
async with db.sqlorContext('kboss') as sor: ns_msg = {
# 1. 去重检查messageid和taskid 'id': uuid(),
exist_msg = await sor.R('baidu_kafka_msg', {'messageid': messageid, 'taskid': taskid}) 'messageid': messageid,
if exist_msg: 'taskid': taskid,
print(f"消息id {messageid} 已存在,跳过处理") 'userid': msg_data.get('userId'),
return 'accountid': msg_data.get('accountId'),
'usertype': msg_data.get('userType'),
'receiverid': msg_data.get('receiverId'),
'contentvar': msg_data.get('contentVar'),
'sendchannel': msg_data.get('sendChannel'),
'content': msg_data.get('content'),
'messagetemplateid': msg_data.get('messageTemplateId'),
'isvirtualstore': msg_data.get('isVirtualStore'),
'channelmessageid': msg_data.get('channelMessageId'),
'channelstatus': msg_data.get('channelStatus'),
'majorcategory': msg_data.get('majorCategory'),
'minorcategory': msg_data.get('minorCategory'),
'status': msg_data.get('status'),
'createtime': await time_convert(msg_data.get('createTime')) if msg_data.get('createTime') else None,
'updatetime': await time_convert(msg_data.get('updateTime')) if msg_data.get('updateTime') else None,
'expiretime': await time_convert(msg_data.get('expireTime')) if msg_data.get('expireTime') else None,
'windowtime': await time_convert(msg_data.get('windowTime')) if msg_data.get('windowTime') else None,
'valid': msg_data.get('valid'),
'complete': msg_data.get('complete'),
'disturbhold': msg_data.get('disturbHold'),
'sendcomplete': msg_data.get('sendComplete')
}
# 2. 构建小写key的ns字典完整映射所有字段 # 3. 执行存库
ns_msg = { await sor.C('baidu_kafka_msg', ns_msg)
'id': uuid(), print(f"消息id {messageid} 存储成功")
'messageid': messageid,
'taskid': taskid,
'userid': msg_data.get('userId'),
'accountid': msg_data.get('accountId'),
'usertype': msg_data.get('userType'),
'receiverid': msg_data.get('receiverId'),
'contentvar': msg_data.get('contentVar'),
'sendchannel': msg_data.get('sendChannel'),
'content': msg_data.get('content'),
'messagetemplateid': msg_data.get('messageTemplateId'),
'isvirtualstore': msg_data.get('isVirtualStore'),
'channelmessageid': msg_data.get('channelMessageId'),
'channelstatus': msg_data.get('channelStatus'),
'majorcategory': msg_data.get('majorCategory'),
'minorcategory': msg_data.get('minorCategory'),
'status': msg_data.get('status'),
'createtime': await time_convert(msg_data.get('createTime')) if msg_data.get('createTime') else None,
'updatetime': await time_convert(msg_data.get('updateTime')) if msg_data.get('updateTime') else None,
'expiretime': await time_convert(msg_data.get('expireTime')) if msg_data.get('expireTime') else None,
'windowtime': await time_convert(msg_data.get('windowTime')) if msg_data.get('windowTime') else None,
'valid': msg_data.get('valid'),
'complete': msg_data.get('complete'),
'disturbhold': msg_data.get('disturbHold'),
'sendcomplete': msg_data.get('sendComplete')
}
# 3. 执行存库 # 4. 触发短信发送当sendChannel为MOBILE时
await sor.C('baidu_kafka_msg', ns_msg) send_channel = msg_data.get('sendChannel')
print(f"消息id {messageid} 存储成功") if send_channel == 'MOBILE':
msg_content = msg_data.get('content')
# 4. 触发短信发送当sendChannel为MOBILE时 # 判断验证码类短信 | 通知类短信
send_channel = msg_data.get('sendChannel') if '验证码' in msg_content:
if send_channel == 'MOBILE': sms_stype = '百度kafka普通验证码'
msg_content = msg_data.get('content') else:
sms_stype = '百度kafka普通通知'
# 判断验证码类短信 | 通知类短信 # 查询用户手机号
if '验证码' in msg_content: account_id = msg_data.get('accountId')
sms_stype = '百度kafka普通验证码' user_local_id_li = await sor.R('baidu_users', {'baidu_id': account_id})
if user_local_id_li:
user_local_id = user_local_id_li[0]['user_id']
user_mobile_li = await sor.R('users', {'id': user_local_id, 'del_flg': '0'})
mobile = user_mobile_li[0]['mobile']
# 调用短信发送接口
kyy_send_status = await send_vcode(mobile, sms_stype, {'content': msg_content})
if kyy_send_status.get('status'):
await sor.U('baidu_kafka_msg', {'id': ns_msg['id'], 'kyysendstatus': '1'})
else: else:
sms_stype = '百度kafka普通通知' # 记录错误日志 短信发送失败
# 查询用户手机号
account_id = msg_data.get('accountId')
user_local_id_li = await sor.R('baidu_users', {'baidu_id': account_id})
if user_local_id_li:
user_local_id = user_local_id_li[0]['user_id']
user_mobile_li = await sor.R('users', {'id': user_local_id, 'del_flg': '0'})
mobile = user_mobile_li[0]['mobile']
# 调用短信发送接口
kyy_send_status = await send_vcode(mobile, sms_stype, {'content': msg_content})
if kyy_send_status.get('status'):
await sor.U('baidu_kafka_msg', {'id': ns_msg['id'], 'kyysendstatus': '1'})
else:
# 记录错误日志 短信发送失败
with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f:
f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 短信发送失败至 {mobile},状态: {kyy_send_status}\n")
print(f"短信发送失败至 {mobile},状态: {kyy_send_status}")
else:
# 记录错误日志 用户未找到
with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f: with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f:
f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 未找到百度用户ID {account_id} 对应的本地用户\n") f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 短信发送失败至 {mobile},状态: {kyy_send_status}\n")
print(f"未找到百度用户ID {account_id} 对应的本地用户") print(f"短信发送失败至 {mobile},状态: {kyy_send_status}")
else:
except json.JSONDecodeError: # 记录错误日志 用户未找到
print("错误消息内容非有效JSON") with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f:
return { f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 未找到百度用户ID {account_id} 对应的本地用户\n")
'status': False, print(f"未找到百度用户ID {account_id} 对应的本地用户")
'msg': '错误消息内容非有效JSON'
} except json.JSONDecodeError:
except Exception as e: print("错误消息内容非有效JSON")
print(f"处理异常: {str(e)}") return {
import traceback 'status': False,
with open('baidu_kafka_error.txt', 'w') as f: 'msg': '错误消息内容非有效JSON'
f.write(str(e)+ traceback.format_exc()) }
traceback.print_exc() except Exception as e:
return { print(f"处理异常: {str(e)}")
'status': False, import traceback
'msg': f"处理异常: {str(e)}" with open('baidu_kafka_error.txt', 'w') as f:
} f.write(str(e)+ traceback.format_exc())
traceback.print_exc()
return {
'status': False,
'msg': f"处理异常: {str(e)}"
}
# consumer.close() # 确保消费者关闭 # consumer.close() # 确保消费者关闭
return { return {

View File

@ -100,7 +100,7 @@ async def baidu_sms_kafka_consumer(ns={}):
'security.protocol': 'SASL_SSL', 'security.protocol': 'SASL_SSL',
'ssl.endpoint.identification.algorithm': 'none', 'ssl.endpoint.identification.algorithm': 'none',
# 证书文件路径 # 证书文件路径
'ssl.ca.location': 'baidu_kafka_ca.pem', 'ssl.ca.location': 'D:/Code/kboss/kgadget/src/baidu_kafka_ca.pem',
# SASL 机制 # SASL 机制
'sasl.mechanism': 'SCRAM-SHA-512', 'sasl.mechanism': 'SCRAM-SHA-512',
# SASL 用户名 # SASL 用户名
@ -124,21 +124,23 @@ async def baidu_sms_kafka_consumer(ns={}):
pass pass
else: else:
try: try:
# print('Received message: {}'.format(msg.value().decode('utf-8')))
method = "POST" method = "POST"
url = 'https://www.opencomputing.cn/baiduc/baidu_sms_kafka_consumer.dspy' url = 'https://www.opencomputing.cn/baiduc/baidu_sms_kafka_consumer.dspy'
header = { header = {
"Host": "www.opencomputing.cn", "Host": "www.opencomputing.cn",
"Content-Type": "application/json" # "Content-Type": "application/json"
} }
data = { data = {
'msg': msg 'msg': msg.value().decode('utf-8')
} }
async with aiohttp_client.request( async with aiohttp_client.request(
method=method, method=method,
url=url, url=url,
headers=header, headers=header,
json=data) as res: data=data) as res:
data_ = await res.json() data_ = await res.text()
# print("Response data:", data_)
except json.JSONDecodeError: except json.JSONDecodeError:
return { return {
@ -146,6 +148,9 @@ async def baidu_sms_kafka_consumer(ns={}):
'msg': '错误消息内容非有效JSON' 'msg': '错误消息内容非有效JSON'
} }
except Exception as e: except Exception as e:
import traceback
print(str(e)+ traceback.format_exc())
traceback.print_exc()
return { return {
'status': False, 'status': False,
'msg': f"处理异常: {str(e)}" 'msg': f"处理异常: {str(e)}"