From 88b67eb5958f19237d40796362218450b4a6125a Mon Sep 17 00:00:00 2001 From: ping <1017253325@qq.com> Date: Thu, 20 Nov 2025 14:47:37 +0800 Subject: [PATCH] update --- b/baiduc/baidu_sms_kafka_consumer.dspy | 76 ++++++++++++++++++-------- 1 file changed, 54 insertions(+), 22 deletions(-) diff --git a/b/baiduc/baidu_sms_kafka_consumer.dspy b/b/baiduc/baidu_sms_kafka_consumer.dspy index 4181447..a03b978 100644 --- a/b/baiduc/baidu_sms_kafka_consumer.dspy +++ b/b/baiduc/baidu_sms_kafka_consumer.dspy @@ -6,6 +6,7 @@ async def time_convert(resoucetime=None): return beijing_time.strftime("%Y-%m-%d %H:%M:%S") async def baidu_sms_kafka_consumer(ns={}): + import os consumer = BaiduKafKaConsumer({ # 接入点 'bootstrap.servers': '120.48.10.223:9095,180.76.96.108:9095,180.76.147.36:9095', @@ -28,15 +29,35 @@ async def baidu_sms_kafka_consumer(ns={}): # 订阅的主题名称 consumer.subscribe(['kaiyuanyun_msg_topic']) - + + files = ["baidu_kafka_msg.txt", "baidu_kafka_id.txt", "baidu_kafka_error.txt"] + for filename in files: + if not os.path.exists(filename): + with open(filename, 'w', encoding='utf-8') as f: # 'w' 模式会覆盖已有文件,但检查后使用是安全的 + pass # 创建空文件 + else: + pass + + total_count = 0 for i in range(30): + if i == 0: + # 写入文件记录轮询开始时间 时间格式: YYYY-MM-DD HH:MM:SS + with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f: + f.write(f"轮询开始时间:{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") + msg = consumer.poll(0.1) # 单次轮询获取消息 if msg is None: + if i == 10: + with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f: + f.write(f"轮询第10次消息为None,{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") continue elif msg.error(): - print(f"消费者错误: {msg.error()}") + # 写入日志文件记录错误信息 + with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f: + f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 消费者错误: {msg.error()}\n") else: + total_count += 1 try: # 解析消息内容为字典(避免变量名冲突) msg_data_ori = json.loads(msg.value().decode('utf-8')) @@ -44,14 +65,8 @@ async def baidu_sms_kafka_consumer(ns={}): messageid = msg_data.get('id') taskid = msg_data.get('taskId') - filename = 'baidu_kafka_msg.txt' - # 检查文件是否存在,不存在则创建 - if not os.path.exists(filename): - with open(filename, 'w', encoding='utf-8') as f: - print(f"文件不存在,已创建 {filename}") - # 读取文件内容进行检查 - with open(filename, 'r', encoding='utf-8') as f: + with open('baidu_kafka_id.txt', 'r', encoding='utf-8') as f: content = f.read() if messageid in content: @@ -59,9 +74,12 @@ async def baidu_sms_kafka_consumer(ns={}): continue else: # 追加写入目标内容 - with open(filename, 'a', encoding='utf-8') as f: + with open('baidu_kafka_id.txt', 'a', encoding='utf-8') as f: f.write(messageid + '\n') print(f"已写入 '{messageid}' 到文件") + + with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f: + f.write(str(msg_data) + '\n') db = DBPools() async with db.sqlorContext('kboss') as sor: @@ -69,8 +87,9 @@ async def baidu_sms_kafka_consumer(ns={}): exist_msg = await sor.R('baidu_kafka_msg', {'messageid': messageid, 'taskid': taskid}) if exist_msg: print(f"消息id {messageid} 已存在,跳过处理") - consumer.close() - return + continue + # consumer.close() + # return # 2. 构建小写key的ns字典(完整映射所有字段) ns_msg = { @@ -119,16 +138,26 @@ async def baidu_sms_kafka_consumer(ns={}): # 查询用户手机号 account_id = msg_data.get('accountId') user_local_id_li = await sor.R('baidu_users', {'baidu_id': account_id}) - user_local_id = user_local_id_li[0]['user_id'] - user_mobile_li = await sor.R('users', {'id': user_local_id, 'del_flg': '0'}) - mobile = user_mobile_li[0]['mobile'] - - # 调用短信发送接口 - kyy_send_status = await send_vcode(mobile, sms_stype, {'content': msg_content}) - if kyy_send_status.get('status'): - await sor.U('baidu_kafka_msg', {'id': ns_msg['id'], 'kyysendstatus': '1'}) - print(f"已触发短信发送至 {mobile}") + if user_local_id_li: + user_local_id = user_local_id_li[0]['user_id'] + user_mobile_li = await sor.R('users', {'id': user_local_id, 'del_flg': '0'}) + mobile = user_mobile_li[0]['mobile'] + # 调用短信发送接口 + kyy_send_status = await send_vcode(mobile, sms_stype, {'content': msg_content}) + if kyy_send_status.get('status'): + await sor.U('baidu_kafka_msg', {'id': ns_msg['id'], 'kyysendstatus': '1'}) + else: + # 记录错误日志 短信发送失败 + with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f: + f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 短信发送失败至 {mobile},状态: {kyy_send_status}\n") + print(f"短信发送失败至 {mobile},状态: {kyy_send_status}") + else: + # 记录错误日志 用户未找到 + with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f: + f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 未找到百度用户ID {account_id} 对应的本地用户\n") + print(f"未找到百度用户ID {account_id} 对应的本地用户") + except json.JSONDecodeError: print("错误:消息内容非有效JSON") return { @@ -141,7 +170,10 @@ async def baidu_sms_kafka_consumer(ns={}): 'status': False, 'msg': f"处理异常: {str(e)}" } - + # 记录total_count + with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f: + f.write(f"本次轮询共处理消息数:{total_count}\n") + consumer.close() # 确保消费者关闭 return { 'status': True,