async def time_convert(resoucetime=None): if not resoucetime: return utc_time = datetime.datetime.strptime(resoucetime, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=datetime.timezone.utc) beijing_time = utc_time.astimezone(datetime.timezone(datetime.timedelta(hours=8))) return beijing_time.strftime("%Y-%m-%d %H:%M:%S") async def baidu_sms_kafka_consumer(ns={}): import os consumer = BaiduKafKaConsumer({ # 接入点 'bootstrap.servers': '120.48.10.223:9095,180.76.96.108:9095,180.76.147.36:9095', # 接入协议 'security.protocol': 'SASL_SSL', 'ssl.endpoint.identification.algorithm': 'none', # 证书文件路径 'ssl.ca.location': 'baidu_kafka_ca.pem', # SASL 机制 'sasl.mechanism': 'SCRAM-SHA-512', # SASL 用户名 'sasl.username': 'kaiyuanyun', # SASL 用户密码 'sasl.password': 'Kyy250609#', # 消费组id 'group.id': 'kaiyuanyun_msg_group', 'auto.offset.reset': 'latest', 'fetch.message.max.bytes': '1024*512', }) # 订阅的主题名称 consumer.subscribe(['kaiyuanyun_msg_topic']) files = ["baidu_kafka_msg.txt", "baidu_kafka_id.txt", "baidu_kafka_error.txt"] for filename in files: if not os.path.exists(filename): with open(filename, 'w', encoding='utf-8') as f: # 'w' 模式会覆盖已有文件,但检查后使用是安全的 pass # 创建空文件 else: pass total_count = 0 for i in range(1): # if i == 0: # # 写入文件记录轮询开始时间 时间格式: YYYY-MM-DD HH:MM:SS # with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f: # f.write(f"轮询开始时间:{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") msg = consumer.poll(1) # 单次轮询获取消息 if msg is None: if i == 10: with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f: f.write(f"轮询第10次消息为None,{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") continue elif msg.error(): # 写入日志文件记录错误信息 with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f: f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 消费者错误: {msg.error()}\n") else: total_count += 1 try: # 解析消息内容为字典(避免变量名冲突) msg_data_ori = json.loads(msg.value().decode('utf-8')) msg_data = msg_data_ori['messages'][0] messageid = msg_data.get('id') taskid = msg_data.get('taskId') # 读取文件内容进行检查 with open('baidu_kafka_id.txt', 'r', encoding='utf-8') as f: content = f.read() if messageid in content: print(f"文件中已存在 '{messageid}',跳过写入") continue else: # 追加写入目标内容 with open('baidu_kafka_id.txt', 'a', encoding='utf-8') as f: f.write(messageid + '\n') print(f"已写入 '{messageid}' 到文件") with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f: f.write(str(msg_data) + '\n') db = DBPools() async with db.sqlorContext('kboss') as sor: # 1. 去重检查(messageid和taskid) exist_msg = await sor.R('baidu_kafka_msg', {'messageid': messageid, 'taskid': taskid}) if exist_msg: print(f"消息id {messageid} 已存在,跳过处理") continue # consumer.close() # return # 2. 构建小写key的ns字典(完整映射所有字段) ns_msg = { 'id': uuid(), 'messageid': messageid, 'taskid': taskid, 'userid': msg_data.get('userId'), 'accountid': msg_data.get('accountId'), 'usertype': msg_data.get('userType'), 'receiverid': msg_data.get('receiverId'), 'contentvar': msg_data.get('contentVar'), 'sendchannel': msg_data.get('sendChannel'), 'content': msg_data.get('content'), 'messagetemplateid': msg_data.get('messageTemplateId'), 'isvirtualstore': msg_data.get('isVirtualStore'), 'channelmessageid': msg_data.get('channelMessageId'), 'channelstatus': msg_data.get('channelStatus'), 'majorcategory': msg_data.get('majorCategory'), 'minorcategory': msg_data.get('minorCategory'), 'status': msg_data.get('status'), 'createtime': await time_convert(msg_data.get('createTime')) if msg_data.get('createTime') else None, 'updatetime': await time_convert(msg_data.get('updateTime')) if msg_data.get('updateTime') else None, 'expiretime': await time_convert(msg_data.get('expireTime')) if msg_data.get('expireTime') else None, 'windowtime': await time_convert(msg_data.get('windowTime')) if msg_data.get('windowTime') else None, 'valid': msg_data.get('valid'), 'complete': msg_data.get('complete'), 'disturbhold': msg_data.get('disturbHold'), 'sendcomplete': msg_data.get('sendComplete') } # 3. 执行存库 await sor.C('baidu_kafka_msg', ns_msg) print(f"消息id {messageid} 存储成功") # 4. 触发短信发送(当sendChannel为MOBILE时) send_channel = msg_data.get('sendChannel') if send_channel == 'MOBILE': msg_content = msg_data.get('content') # 判断验证码类短信 | 通知类短信 if '验证码' in msg_content: sms_stype = '百度kafka普通验证码' else: sms_stype = '百度kafka普通通知' # 查询用户手机号 account_id = msg_data.get('accountId') user_local_id_li = await sor.R('baidu_users', {'baidu_id': account_id}) if user_local_id_li: user_local_id = user_local_id_li[0]['user_id'] user_mobile_li = await sor.R('users', {'id': user_local_id, 'del_flg': '0'}) mobile = user_mobile_li[0]['mobile'] # 调用短信发送接口 kyy_send_status = await send_vcode(mobile, sms_stype, {'content': msg_content}) if kyy_send_status.get('status'): await sor.U('baidu_kafka_msg', {'id': ns_msg['id'], 'kyysendstatus': '1'}) else: # 记录错误日志 短信发送失败 with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f: f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 短信发送失败至 {mobile},状态: {kyy_send_status}\n") print(f"短信发送失败至 {mobile},状态: {kyy_send_status}") else: # 记录错误日志 用户未找到 with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f: f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 未找到百度用户ID {account_id} 对应的本地用户\n") print(f"未找到百度用户ID {account_id} 对应的本地用户") except json.JSONDecodeError: print("错误:消息内容非有效JSON") return { 'status': False, 'msg': '错误:消息内容非有效JSON' } except Exception as e: print(f"处理异常: {str(e)}") return { 'status': False, 'msg': f"处理异常: {str(e)}" } # 记录total_count # with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f: # f.write(f"本次轮询共处理消息数:{total_count}\n") consumer.close() # 确保消费者关闭 return { 'status': True, 'msg': '获取信息执行结束' } ret = await baidu_sms_kafka_consumer(params_kw) return ret