diff --git a/baidu_kafka_error.txt b/baidu_kafka_error.txt deleted file mode 100644 index c823653..0000000 --- a/baidu_kafka_error.txt +++ /dev/null @@ -1,2 +0,0 @@ -2025-11-20 17:24:21 - 错误: 未找到百度用户ID 2529cb85e30a4d51a348e71edf5fa947 对应的本地用户 -2025-11-20 17:28:43 - 错误: 未找到百度用户ID 2529cb85e30a4d51a348e71edf5fa947 对应的本地用户 diff --git a/baidu_kafka_id.txt b/baidu_kafka_id.txt deleted file mode 100644 index 65d7838..0000000 --- a/baidu_kafka_id.txt +++ /dev/null @@ -1,2 +0,0 @@ -Message_fabaab3c68a6436cbfe483634b77bcda -Message_fb0d466fd8444a1987ac5bab8d7d0250 diff --git a/baidu_kafka_msg.txt b/baidu_kafka_msg.txt deleted file mode 100644 index f9903a3..0000000 --- a/baidu_kafka_msg.txt +++ /dev/null @@ -1,17 +0,0 @@ -轮询开始时间:2025-11-20 17:18:31 -轮询开始时间:2025-11-20 17:24:18 -{'id': 'Message_fabaab3c68a6436cbfe483634b77bcda', 'taskId': 'Task_a97448bf43af415c861f4655e3f6927a', 'userId': '2529cb85e30a4d51a348e71edf5fa947', 'accountId': '2529cb85e30a4d51a348e71edf5fa947', 'userType': 'account', 'receiverId': '13261188523', 'contentVar': '', 'sendChannel': 'MOBILE', 'content': '验证码为:925124,感谢您使用百度云服务,请填写完整完成验证。', 'messageTemplateId': 'MOBILE_6720338bf9c04712933f50a2630fbba9', 'messageTemplate': {'id': 'MOBILE_6720338bf9c04712933f50a2630fbba9', 'name': 'IAM验证码通知', 'tag': 'iam', 'product': '', 'type': 'CUSTOM', 'sendChannel': 'MOBILE', 'locale': 'zh-CN', 'taskTemplateId': 'ffc54482-3528-41f8-85d8-5ad44c9e53d7', 'channelTemplateId': '', 'parentId': '', 'content': '验证码为:${code},感谢您使用百度云服务,请填写完整完成验证。', 'contentType': 'TEXT', 'sign': '', 'createTime': '2017-12-07T17:44:17Z', 'createBy': 'UUAP:dengxiaochao', 'updateTime': '2025-11-18T18:18:58Z', 'updateBy': 'UUAP:dengxiaochao'}, 'isVirtualStore': True, 'channelMessageId': None, 'channelStatus': None, 'majorCategory': '03', 'minorCategory': '03002', 'status': 'NEW', 'createTime': '2025-11-20T09:22:32Z', 'updateTime': None, 'expireTime': None, 'windowTime': None, 'valid': True, 'sendComplete': False, 'disturbHold': False, 'complete': False} -轮询开始时间:2025-11-20 17:25:45 -轮询开始时间:2025-11-20 17:25:54 -轮询开始时间:2025-11-20 17:26:07 -轮询开始时间:2025-11-20 17:26:17 -轮询开始时间:2025-11-20 17:26:26 -轮询开始时间:2025-11-20 17:26:36 -轮询开始时间:2025-11-20 17:27:38 -轮询开始时间:2025-11-20 17:27:46 -轮询开始时间:2025-11-20 17:27:55 -轮询开始时间:2025-11-20 17:28:04 -轮询开始时间:2025-11-20 17:28:21 -轮询开始时间:2025-11-20 17:28:30 -轮询开始时间:2025-11-20 17:28:38 -{'id': 'Message_fb0d466fd8444a1987ac5bab8d7d0250', 'taskId': 'Task_a7cc98a04bf74ffe97d36a34008b66e3', 'userId': '2529cb85e30a4d51a348e71edf5fa947', 'accountId': '2529cb85e30a4d51a348e71edf5fa947', 'userType': 'account', 'receiverId': '13261188523', 'contentVar': '', 'sendChannel': 'MOBILE', 'content': '验证码为:925124,感谢您使用百度云服务,请填写完整完成验证。', 'messageTemplateId': 'MOBILE_6720338bf9c04712933f50a2630fbba9', 'messageTemplate': {'id': 'MOBILE_6720338bf9c04712933f50a2630fbba9', 'name': 'IAM验证码通知', 'tag': 'iam', 'product': '', 'type': 'CUSTOM', 'sendChannel': 'MOBILE', 'locale': 'zh-CN', 'taskTemplateId': 'ffc54482-3528-41f8-85d8-5ad44c9e53d7', 'channelTemplateId': '', 'parentId': '', 'content': '验证码为:${code},感谢您使用百度云服务,请填写完整完成验证。', 'contentType': 'TEXT', 'sign': '', 'createTime': '2017-12-07T17:44:17Z', 'createBy': 'UUAP:dengxiaochao', 'updateTime': '2025-11-18T18:18:58Z', 'updateBy': 'UUAP:dengxiaochao'}, 'isVirtualStore': True, 'channelMessageId': None, 'channelStatus': None, 'majorCategory': '03', 'minorCategory': '03002', 'status': 'NEW', 'createTime': '2025-11-20T09:28:15Z', 'updateTime': None, 'expireTime': None, 'windowTime': None, 'valid': True, 'sendComplete': False, 'disturbHold': False, 'complete': False} diff --git a/crontab/crontab.txt b/crontab/crontab.txt index 20faac3..d19fa41 100644 --- a/crontab/crontab.txt +++ b/crontab/crontab.txt @@ -34,4 +34,4 @@ */1 * * * * curl https://www.opencomputing.cn/jdcloud/get_partner_order_list.dspy?page=1 > /d/zhc/crontab_prod.log 2>&1 * * * * * curl https://www.opencomputing.cn/baiduc/baidu_new_update_resouce.dspy > /d/zhc/crontab_prod.log 2>&1 #* * * * * curl https://www.opencomputing.cn/baiduc/baidu_sms_kafka_consumer.dspy > /d/zhc/crontab_dev.log 2>&1 -*/5 * * * * /usr/bin/python3 /d/zhc/kboss_prod/kgadget/src/more_task_scheduler.py +#*/5 * * * * /usr/bin/python3 /d/zhc/kboss_prod/kgadget/src/more_task_scheduler.py diff --git a/kgadget/src/baidu_sms_kafka_consumer.py b/kgadget/src/baidu_sms_kafka_consumer.py index 3f28bd4..9e968e0 100644 --- a/kgadget/src/baidu_sms_kafka_consumer.py +++ b/kgadget/src/baidu_sms_kafka_consumer.py @@ -90,6 +90,7 @@ async def time_convert(resoucetime=None): return beijing_time.strftime("%Y-%m-%d %H:%M:%S") async def baidu_sms_kafka_consumer(ns={}): + import os consumer = BaiduKafKaConsumer({ # 接入点 'bootstrap.servers': '120.48.10.223:9095,180.76.96.108:9095,180.76.147.36:9095', @@ -112,15 +113,36 @@ async def baidu_sms_kafka_consumer(ns={}): # 订阅的主题名称 consumer.subscribe(['kaiyuanyun_msg_topic']) - + + files = ["baidu_kafka_msg.txt", "baidu_kafka_id.txt", "baidu_kafka_error.txt"] + for filename in files: + if not os.path.exists(filename): + with open(filename, 'w', encoding='utf-8') as f: # 'w' 模式会覆盖已有文件,但检查后使用是安全的 + pass # 创建空文件 + else: + pass + + # total_count = 0 while True: - msg = consumer.poll(1) # 单次轮询获取消息 + i = 0 + # if i == 0: + # # 写入文件记录轮询开始时间 时间格式: YYYY-MM-DD HH:MM:SS + # with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f: + # f.write(f"轮询开始时间:{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") + + msg = consumer.poll(0.1) # 单次轮询获取消息 if msg is None: + if i == 10: + with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f: + f.write(f"轮询第10次消息为None,{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") continue elif msg.error(): - print(f"消费者错误: {msg.error()}") + # 写入日志文件记录错误信息 + with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f: + f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 消费者错误: {msg.error()}\n") else: + # total_count += 1 try: # 解析消息内容为字典(避免变量名冲突) msg_data_ori = json.loads(msg.value().decode('utf-8')) @@ -128,14 +150,8 @@ async def baidu_sms_kafka_consumer(ns={}): messageid = msg_data.get('id') taskid = msg_data.get('taskId') - filename = 'baidu_kafka_msg.txt' - # 检查文件是否存在,不存在则创建 - if not os.path.exists(filename): - with open(filename, 'w', encoding='utf-8') as f: - print(f"文件不存在,已创建 {filename}") - # 读取文件内容进行检查 - with open(filename, 'r', encoding='utf-8') as f: + with open('baidu_kafka_id.txt', 'r', encoding='utf-8') as f: content = f.read() if messageid in content: @@ -143,9 +159,12 @@ async def baidu_sms_kafka_consumer(ns={}): continue else: # 追加写入目标内容 - with open(filename, 'a', encoding='utf-8') as f: + with open('baidu_kafka_id.txt', 'a', encoding='utf-8') as f: f.write(messageid + '\n') print(f"已写入 '{messageid}' 到文件") + + with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f: + f.write(str(msg_data) + '\n') db = DBPools() async with db.sqlorContext('kboss') as sor: @@ -153,8 +172,9 @@ async def baidu_sms_kafka_consumer(ns={}): exist_msg = await sor.R('baidu_kafka_msg', {'messageid': messageid, 'taskid': taskid}) if exist_msg: print(f"消息id {messageid} 已存在,跳过处理") - consumer.close() - return + continue + # consumer.close() + # return # 2. 构建小写key的ns字典(完整映射所有字段) ns_msg = { @@ -187,7 +207,7 @@ async def baidu_sms_kafka_consumer(ns={}): # 3. 执行存库 await sor.C('baidu_kafka_msg', ns_msg) - print(f"消息id {messageid} 存储成功") + # print(f"消息id {messageid} 存储成功") # 4. 触发短信发送(当sendChannel为MOBILE时) send_channel = msg_data.get('sendChannel') @@ -203,16 +223,26 @@ async def baidu_sms_kafka_consumer(ns={}): # 查询用户手机号 account_id = msg_data.get('accountId') user_local_id_li = await sor.R('baidu_users', {'baidu_id': account_id}) - user_local_id = user_local_id_li[0]['user_id'] - user_mobile_li = await sor.R('users', {'id': user_local_id, 'del_flg': '0'}) - mobile = user_mobile_li[0]['mobile'] - - # 调用短信发送接口 - kyy_send_status = await send_vcode(mobile, sms_stype, {'content': msg_content}) - if kyy_send_status.get('status'): - await sor.U('baidu_kafka_msg', {'id': ns_msg['id'], 'kyysendstatus': '1'}) - print(f"已触发短信发送至 {mobile}") + if user_local_id_li: + user_local_id = user_local_id_li[0]['user_id'] + user_mobile_li = await sor.R('users', {'id': user_local_id, 'del_flg': '0'}) + mobile = user_mobile_li[0]['mobile'] + # 调用短信发送接口 + kyy_send_status = await send_vcode(mobile, sms_stype, {'content': msg_content}) + if kyy_send_status.get('status'): + await sor.U('baidu_kafka_msg', {'id': ns_msg['id'], 'kyysendstatus': '1'}) + else: + # 记录错误日志 短信发送失败 + with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f: + f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 短信发送失败至 {mobile},状态: {kyy_send_status}\n") + print(f"短信发送失败至 {mobile},状态: {kyy_send_status}") + else: + # 记录错误日志 用户未找到 + with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f: + f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 未找到百度用户ID {account_id} 对应的本地用户\n") + print(f"未找到百度用户ID {account_id} 对应的本地用户") + except json.JSONDecodeError: print("错误:消息内容非有效JSON") return { @@ -225,7 +255,10 @@ async def baidu_sms_kafka_consumer(ns={}): 'status': False, 'msg': f"处理异常: {str(e)}" } - + # 记录total_count + # with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f: + # f.write(f"本次轮询共处理消息数:{total_count}\n") + consumer.close() # 确保消费者关闭 return { 'status': True,