From fcbbb6056faf7f0bda59579af4f0f95f5d24e1f1 Mon Sep 17 00:00:00 2001 From: ping <1017253325@qq.com> Date: Fri, 21 Nov 2025 15:25:42 +0800 Subject: [PATCH] =?UTF-8?q?=E7=99=BE=E5=BA=A6kafka=20=E6=8E=A5=E6=94=B6?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E6=8E=A8=E9=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- b/baiduc/baidu_sms_kafka_consumer.dspy | 303 ++++++++++++------------- 1 file changed, 145 insertions(+), 158 deletions(-) diff --git a/b/baiduc/baidu_sms_kafka_consumer.dspy b/b/baiduc/baidu_sms_kafka_consumer.dspy index d73988d..74271e0 100644 --- a/b/baiduc/baidu_sms_kafka_consumer.dspy +++ b/b/baiduc/baidu_sms_kafka_consumer.dspy @@ -6,29 +6,30 @@ async def time_convert(resoucetime=None): return beijing_time.strftime("%Y-%m-%d %H:%M:%S") async def baidu_sms_kafka_consumer(ns={}): - import os - consumer = BaiduKafKaConsumer({ - # 接入点 - 'bootstrap.servers': '120.48.10.223:9095,180.76.96.108:9095,180.76.147.36:9095', - # 接入协议 - 'security.protocol': 'SASL_SSL', - 'ssl.endpoint.identification.algorithm': 'none', - # 证书文件路径 - 'ssl.ca.location': 'baidu_kafka_ca.pem', - # SASL 机制 - 'sasl.mechanism': 'SCRAM-SHA-512', - # SASL 用户名 - 'sasl.username': 'kaiyuanyun', - # SASL 用户密码 - 'sasl.password': 'Kyy250609#', - # 消费组id - 'group.id': 'kaiyuanyun_msg_group', - 'auto.offset.reset': 'latest', - 'fetch.message.max.bytes': '1024*512', - }) + # consumer = BaiduKafKaConsumer({ + # # 接入点 + # 'bootstrap.servers': '120.48.10.223:9095,180.76.96.108:9095,180.76.147.36:9095', + # # 接入协议 + # 'security.protocol': 'SASL_SSL', + # 'ssl.endpoint.identification.algorithm': 'none', + # # 证书文件路径 + # 'ssl.ca.location': 'baidu_kafka_ca.pem', + # # SASL 机制 + # 'sasl.mechanism': 'SCRAM-SHA-512', + # # SASL 用户名 + # 'sasl.username': 'kaiyuanyun', + # # SASL 用户密码 + # 'sasl.password': 'Kyy250609#', + # # 消费组id + # 'group.id': 'kaiyuanyun_msg_group', + # 'auto.offset.reset': 'latest', + # 'fetch.message.max.bytes': '1024*512', + # }) - # 订阅的主题名称 - consumer.subscribe(['kaiyuanyun_msg_topic']) + # # 订阅的主题名称 + # consumer.subscribe(['kaiyuanyun_msg_topic']) + + import os files = ["baidu_kafka_msg.txt", "baidu_kafka_id.txt", "baidu_kafka_error.txt"] for filename in files: @@ -38,143 +39,129 @@ async def baidu_sms_kafka_consumer(ns={}): else: pass - total_count = 0 - for i in range(10): - # if i == 0: - # # 写入文件记录轮询开始时间 时间格式: YYYY-MM-DD HH:MM:SS - # with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f: - # f.write(f"轮询开始时间:{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") - - msg = consumer.poll(0.01) # 单次轮询获取消息 - - if msg is None: - if i == 10: - with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f: - f.write(f"轮询第10次消息为None,{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") - continue - elif msg.error(): - # 写入日志文件记录错误信息 - with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f: - f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 消费者错误: {msg.error()}\n") - else: - total_count += 1 - try: - # 解析消息内容为字典(避免变量名冲突) - msg_data_ori = json.loads(msg.value().decode('utf-8')) - msg_data = msg_data_ori['messages'][0] - messageid = msg_data.get('id') - taskid = msg_data.get('taskId') - - # 读取文件内容进行检查 - with open('baidu_kafka_id.txt', 'r', encoding='utf-8') as f: - content = f.read() - - if messageid in content: - print(f"文件中已存在 '{messageid}',跳过写入") - continue - else: - # 追加写入目标内容 - with open('baidu_kafka_id.txt', 'a', encoding='utf-8') as f: - f.write(messageid + '\n') - print(f"已写入 '{messageid}' 到文件") - - with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f: - f.write(str(msg_data) + '\n') - - db = DBPools() - async with db.sqlorContext('kboss') as sor: - # 1. 去重检查(messageid和taskid) - exist_msg = await sor.R('baidu_kafka_msg', {'messageid': messageid, 'taskid': taskid}) - if exist_msg: - print(f"消息id {messageid} 已存在,跳过处理") - continue - # consumer.close() - # return - - # 2. 构建小写key的ns字典(完整映射所有字段) - ns_msg = { - 'id': uuid(), - 'messageid': messageid, - 'taskid': taskid, - 'userid': msg_data.get('userId'), - 'accountid': msg_data.get('accountId'), - 'usertype': msg_data.get('userType'), - 'receiverid': msg_data.get('receiverId'), - 'contentvar': msg_data.get('contentVar'), - 'sendchannel': msg_data.get('sendChannel'), - 'content': msg_data.get('content'), - 'messagetemplateid': msg_data.get('messageTemplateId'), - 'isvirtualstore': msg_data.get('isVirtualStore'), - 'channelmessageid': msg_data.get('channelMessageId'), - 'channelstatus': msg_data.get('channelStatus'), - 'majorcategory': msg_data.get('majorCategory'), - 'minorcategory': msg_data.get('minorCategory'), - 'status': msg_data.get('status'), - 'createtime': await time_convert(msg_data.get('createTime')) if msg_data.get('createTime') else None, - 'updatetime': await time_convert(msg_data.get('updateTime')) if msg_data.get('updateTime') else None, - 'expiretime': await time_convert(msg_data.get('expireTime')) if msg_data.get('expireTime') else None, - 'windowtime': await time_convert(msg_data.get('windowTime')) if msg_data.get('windowTime') else None, - 'valid': msg_data.get('valid'), - 'complete': msg_data.get('complete'), - 'disturbhold': msg_data.get('disturbHold'), - 'sendcomplete': msg_data.get('sendComplete') - } - - # 3. 执行存库 - await sor.C('baidu_kafka_msg', ns_msg) - print(f"消息id {messageid} 存储成功") - - # 4. 触发短信发送(当sendChannel为MOBILE时) - send_channel = msg_data.get('sendChannel') - if send_channel == 'MOBILE': - msg_content = msg_data.get('content') - - # 判断验证码类短信 | 通知类短信 - if '验证码' in msg_content: - sms_stype = '百度kafka普通验证码' - else: - sms_stype = '百度kafka普通通知' - - # 查询用户手机号 - account_id = msg_data.get('accountId') - user_local_id_li = await sor.R('baidu_users', {'baidu_id': account_id}) - if user_local_id_li: - user_local_id = user_local_id_li[0]['user_id'] - user_mobile_li = await sor.R('users', {'id': user_local_id, 'del_flg': '0'}) - mobile = user_mobile_li[0]['mobile'] - - # 调用短信发送接口 - kyy_send_status = await send_vcode(mobile, sms_stype, {'content': msg_content}) - if kyy_send_status.get('status'): - await sor.U('baidu_kafka_msg', {'id': ns_msg['id'], 'kyysendstatus': '1'}) - else: - # 记录错误日志 短信发送失败 - with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f: - f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 短信发送失败至 {mobile},状态: {kyy_send_status}\n") - print(f"短信发送失败至 {mobile},状态: {kyy_send_status}") - else: - # 记录错误日志 用户未找到 - with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f: - f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 未找到百度用户ID {account_id} 对应的本地用户\n") - print(f"未找到百度用户ID {account_id} 对应的本地用户") - - except json.JSONDecodeError: - print("错误:消息内容非有效JSON") - return { - 'status': False, - 'msg': '错误:消息内容非有效JSON' - } - except Exception as e: - print(f"处理异常: {str(e)}") - return { - 'status': False, - 'msg': f"处理异常: {str(e)}" - } - # 记录total_count - # with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f: - # f.write(f"本次轮询共处理消息数:{total_count}\n") + msg = ns.get('msg') - consumer.close() # 确保消费者关闭 + if msg.error(): + # 写入日志文件记录错误信息 + with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f: + f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 消费者错误: {msg.error()}\n") + else: + try: + # 解析消息内容为字典(避免变量名冲突) + msg_data_ori = json.loads(msg.value().decode('utf-8')) + msg_data = msg_data_ori['messages'][0] + messageid = msg_data.get('id') + taskid = msg_data.get('taskId') + + # 读取文件内容进行检查 + with open('baidu_kafka_id.txt', 'r', encoding='utf-8') as f: + content = f.read() + + if messageid in content: + print(f"文件中已存在 '{messageid}',跳过写入") + return + else: + # 追加写入目标内容 + with open('baidu_kafka_id.txt', 'a', encoding='utf-8') as f: + f.write(messageid + '\n') + print(f"已写入 '{messageid}' 到文件") + + with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f: + f.write(str(msg_data) + '\n') + + db = DBPools() + async with db.sqlorContext('kboss') as sor: + # 1. 去重检查(messageid和taskid) + exist_msg = await sor.R('baidu_kafka_msg', {'messageid': messageid, 'taskid': taskid}) + if exist_msg: + print(f"消息id {messageid} 已存在,跳过处理") + return + + # 2. 构建小写key的ns字典(完整映射所有字段) + ns_msg = { + 'id': uuid(), + 'messageid': messageid, + 'taskid': taskid, + 'userid': msg_data.get('userId'), + 'accountid': msg_data.get('accountId'), + 'usertype': msg_data.get('userType'), + 'receiverid': msg_data.get('receiverId'), + 'contentvar': msg_data.get('contentVar'), + 'sendchannel': msg_data.get('sendChannel'), + 'content': msg_data.get('content'), + 'messagetemplateid': msg_data.get('messageTemplateId'), + 'isvirtualstore': msg_data.get('isVirtualStore'), + 'channelmessageid': msg_data.get('channelMessageId'), + 'channelstatus': msg_data.get('channelStatus'), + 'majorcategory': msg_data.get('majorCategory'), + 'minorcategory': msg_data.get('minorCategory'), + 'status': msg_data.get('status'), + 'createtime': await time_convert(msg_data.get('createTime')) if msg_data.get('createTime') else None, + 'updatetime': await time_convert(msg_data.get('updateTime')) if msg_data.get('updateTime') else None, + 'expiretime': await time_convert(msg_data.get('expireTime')) if msg_data.get('expireTime') else None, + 'windowtime': await time_convert(msg_data.get('windowTime')) if msg_data.get('windowTime') else None, + 'valid': msg_data.get('valid'), + 'complete': msg_data.get('complete'), + 'disturbhold': msg_data.get('disturbHold'), + 'sendcomplete': msg_data.get('sendComplete') + } + + # 3. 执行存库 + await sor.C('baidu_kafka_msg', ns_msg) + print(f"消息id {messageid} 存储成功") + + # 4. 触发短信发送(当sendChannel为MOBILE时) + send_channel = msg_data.get('sendChannel') + if send_channel == 'MOBILE': + msg_content = msg_data.get('content') + + # 判断验证码类短信 | 通知类短信 + if '验证码' in msg_content: + sms_stype = '百度kafka普通验证码' + else: + sms_stype = '百度kafka普通通知' + + # 查询用户手机号 + account_id = msg_data.get('accountId') + user_local_id_li = await sor.R('baidu_users', {'baidu_id': account_id}) + if user_local_id_li: + user_local_id = user_local_id_li[0]['user_id'] + user_mobile_li = await sor.R('users', {'id': user_local_id, 'del_flg': '0'}) + mobile = user_mobile_li[0]['mobile'] + + # 调用短信发送接口 + kyy_send_status = await send_vcode(mobile, sms_stype, {'content': msg_content}) + if kyy_send_status.get('status'): + await sor.U('baidu_kafka_msg', {'id': ns_msg['id'], 'kyysendstatus': '1'}) + else: + # 记录错误日志 短信发送失败 + with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f: + f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 短信发送失败至 {mobile},状态: {kyy_send_status}\n") + print(f"短信发送失败至 {mobile},状态: {kyy_send_status}") + else: + # 记录错误日志 用户未找到 + with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f: + f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 未找到百度用户ID {account_id} 对应的本地用户\n") + print(f"未找到百度用户ID {account_id} 对应的本地用户") + + except json.JSONDecodeError: + print("错误:消息内容非有效JSON") + return { + 'status': False, + 'msg': '错误:消息内容非有效JSON' + } + except Exception as e: + print(f"处理异常: {str(e)}") + import traceback + with open('baidu_kafka_error.txt', 'w') as f: + f.write(str(e)+ traceback.format_exc()) + traceback.print_exc() + return { + 'status': False, + 'msg': f"处理异常: {str(e)}" + } + + # consumer.close() # 确保消费者关闭 return { 'status': True, 'msg': '获取信息执行结束'