From 9cbda277fb6e27e596b3f9a3507dcbbc9e1a9a33 Mon Sep 17 00:00:00 2001 From: ping <1017253325@qq.com> Date: Fri, 21 Nov 2025 16:15:13 +0800 Subject: [PATCH] update --- b/baiduc/baidu_sms_kafka_consumer.dspy | 219 ++++++++++++------------ kgadget/src/baidu_sms_kafka_consumer.py | 15 +- 2 files changed, 120 insertions(+), 114 deletions(-) diff --git a/b/baiduc/baidu_sms_kafka_consumer.dspy b/b/baiduc/baidu_sms_kafka_consumer.dspy index 74271e0..dbefa7c 100644 --- a/b/baiduc/baidu_sms_kafka_consumer.dspy +++ b/b/baiduc/baidu_sms_kafka_consumer.dspy @@ -41,125 +41,126 @@ async def baidu_sms_kafka_consumer(ns={}): msg = ns.get('msg') - if msg.error(): - # 写入日志文件记录错误信息 - with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f: - f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 消费者错误: {msg.error()}\n") - else: - try: - # 解析消息内容为字典(避免变量名冲突) - msg_data_ori = json.loads(msg.value().decode('utf-8')) - msg_data = msg_data_ori['messages'][0] - messageid = msg_data.get('id') - taskid = msg_data.get('taskId') + # if msg.error(): + # # 写入日志文件记录错误信息 + # with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f: + # f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 消费者错误: {msg.error()}\n") + # else: + try: + # 解析消息内容为字典(避免变量名冲突) + # msg_data_ori = json.loads(msg.value().decode('utf-8')) + msg_data_ori = json.loads(msg) + msg_data = msg_data_ori['messages'][0] + messageid = msg_data.get('id') + taskid = msg_data.get('taskId') - # 读取文件内容进行检查 - with open('baidu_kafka_id.txt', 'r', encoding='utf-8') as f: - content = f.read() + # 读取文件内容进行检查 + with open('baidu_kafka_id.txt', 'r', encoding='utf-8') as f: + content = f.read() - if messageid in content: - print(f"文件中已存在 '{messageid}',跳过写入") + if messageid in content: + print(f"文件中已存在 '{messageid}',跳过写入") + return + else: + # 追加写入目标内容 + with open('baidu_kafka_id.txt', 'a', encoding='utf-8') as f: + f.write(messageid + '\n') + print(f"已写入 '{messageid}' 到文件") + + with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f: + f.write(str(msg_data) + '\n') + + db = DBPools() + async with db.sqlorContext('kboss') as sor: + # 1. 去重检查(messageid和taskid) + exist_msg = await sor.R('baidu_kafka_msg', {'messageid': messageid, 'taskid': taskid}) + if exist_msg: + print(f"消息id {messageid} 已存在,跳过处理") return - else: - # 追加写入目标内容 - with open('baidu_kafka_id.txt', 'a', encoding='utf-8') as f: - f.write(messageid + '\n') - print(f"已写入 '{messageid}' 到文件") - - with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f: - f.write(str(msg_data) + '\n') - db = DBPools() - async with db.sqlorContext('kboss') as sor: - # 1. 去重检查(messageid和taskid) - exist_msg = await sor.R('baidu_kafka_msg', {'messageid': messageid, 'taskid': taskid}) - if exist_msg: - print(f"消息id {messageid} 已存在,跳过处理") - return + # 2. 构建小写key的ns字典(完整映射所有字段) + ns_msg = { + 'id': uuid(), + 'messageid': messageid, + 'taskid': taskid, + 'userid': msg_data.get('userId'), + 'accountid': msg_data.get('accountId'), + 'usertype': msg_data.get('userType'), + 'receiverid': msg_data.get('receiverId'), + 'contentvar': msg_data.get('contentVar'), + 'sendchannel': msg_data.get('sendChannel'), + 'content': msg_data.get('content'), + 'messagetemplateid': msg_data.get('messageTemplateId'), + 'isvirtualstore': msg_data.get('isVirtualStore'), + 'channelmessageid': msg_data.get('channelMessageId'), + 'channelstatus': msg_data.get('channelStatus'), + 'majorcategory': msg_data.get('majorCategory'), + 'minorcategory': msg_data.get('minorCategory'), + 'status': msg_data.get('status'), + 'createtime': await time_convert(msg_data.get('createTime')) if msg_data.get('createTime') else None, + 'updatetime': await time_convert(msg_data.get('updateTime')) if msg_data.get('updateTime') else None, + 'expiretime': await time_convert(msg_data.get('expireTime')) if msg_data.get('expireTime') else None, + 'windowtime': await time_convert(msg_data.get('windowTime')) if msg_data.get('windowTime') else None, + 'valid': msg_data.get('valid'), + 'complete': msg_data.get('complete'), + 'disturbhold': msg_data.get('disturbHold'), + 'sendcomplete': msg_data.get('sendComplete') + } - # 2. 构建小写key的ns字典(完整映射所有字段) - ns_msg = { - 'id': uuid(), - 'messageid': messageid, - 'taskid': taskid, - 'userid': msg_data.get('userId'), - 'accountid': msg_data.get('accountId'), - 'usertype': msg_data.get('userType'), - 'receiverid': msg_data.get('receiverId'), - 'contentvar': msg_data.get('contentVar'), - 'sendchannel': msg_data.get('sendChannel'), - 'content': msg_data.get('content'), - 'messagetemplateid': msg_data.get('messageTemplateId'), - 'isvirtualstore': msg_data.get('isVirtualStore'), - 'channelmessageid': msg_data.get('channelMessageId'), - 'channelstatus': msg_data.get('channelStatus'), - 'majorcategory': msg_data.get('majorCategory'), - 'minorcategory': msg_data.get('minorCategory'), - 'status': msg_data.get('status'), - 'createtime': await time_convert(msg_data.get('createTime')) if msg_data.get('createTime') else None, - 'updatetime': await time_convert(msg_data.get('updateTime')) if msg_data.get('updateTime') else None, - 'expiretime': await time_convert(msg_data.get('expireTime')) if msg_data.get('expireTime') else None, - 'windowtime': await time_convert(msg_data.get('windowTime')) if msg_data.get('windowTime') else None, - 'valid': msg_data.get('valid'), - 'complete': msg_data.get('complete'), - 'disturbhold': msg_data.get('disturbHold'), - 'sendcomplete': msg_data.get('sendComplete') - } + # 3. 执行存库 + await sor.C('baidu_kafka_msg', ns_msg) + print(f"消息id {messageid} 存储成功") - # 3. 执行存库 - await sor.C('baidu_kafka_msg', ns_msg) - print(f"消息id {messageid} 存储成功") + # 4. 触发短信发送(当sendChannel为MOBILE时) + send_channel = msg_data.get('sendChannel') + if send_channel == 'MOBILE': + msg_content = msg_data.get('content') - # 4. 触发短信发送(当sendChannel为MOBILE时) - send_channel = msg_data.get('sendChannel') - if send_channel == 'MOBILE': - msg_content = msg_data.get('content') + # 判断验证码类短信 | 通知类短信 + if '验证码' in msg_content: + sms_stype = '百度kafka普通验证码' + else: + sms_stype = '百度kafka普通通知' - # 判断验证码类短信 | 通知类短信 - if '验证码' in msg_content: - sms_stype = '百度kafka普通验证码' + # 查询用户手机号 + account_id = msg_data.get('accountId') + user_local_id_li = await sor.R('baidu_users', {'baidu_id': account_id}) + if user_local_id_li: + user_local_id = user_local_id_li[0]['user_id'] + user_mobile_li = await sor.R('users', {'id': user_local_id, 'del_flg': '0'}) + mobile = user_mobile_li[0]['mobile'] + + # 调用短信发送接口 + kyy_send_status = await send_vcode(mobile, sms_stype, {'content': msg_content}) + if kyy_send_status.get('status'): + await sor.U('baidu_kafka_msg', {'id': ns_msg['id'], 'kyysendstatus': '1'}) else: - sms_stype = '百度kafka普通通知' - - # 查询用户手机号 - account_id = msg_data.get('accountId') - user_local_id_li = await sor.R('baidu_users', {'baidu_id': account_id}) - if user_local_id_li: - user_local_id = user_local_id_li[0]['user_id'] - user_mobile_li = await sor.R('users', {'id': user_local_id, 'del_flg': '0'}) - mobile = user_mobile_li[0]['mobile'] - - # 调用短信发送接口 - kyy_send_status = await send_vcode(mobile, sms_stype, {'content': msg_content}) - if kyy_send_status.get('status'): - await sor.U('baidu_kafka_msg', {'id': ns_msg['id'], 'kyysendstatus': '1'}) - else: - # 记录错误日志 短信发送失败 - with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f: - f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 短信发送失败至 {mobile},状态: {kyy_send_status}\n") - print(f"短信发送失败至 {mobile},状态: {kyy_send_status}") - else: - # 记录错误日志 用户未找到 + # 记录错误日志 短信发送失败 with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f: - f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 未找到百度用户ID {account_id} 对应的本地用户\n") - print(f"未找到百度用户ID {account_id} 对应的本地用户") - - except json.JSONDecodeError: - print("错误:消息内容非有效JSON") - return { - 'status': False, - 'msg': '错误:消息内容非有效JSON' - } - except Exception as e: - print(f"处理异常: {str(e)}") - import traceback - with open('baidu_kafka_error.txt', 'w') as f: - f.write(str(e)+ traceback.format_exc()) - traceback.print_exc() - return { - 'status': False, - 'msg': f"处理异常: {str(e)}" - } + f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 短信发送失败至 {mobile},状态: {kyy_send_status}\n") + print(f"短信发送失败至 {mobile},状态: {kyy_send_status}") + else: + # 记录错误日志 用户未找到 + with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f: + f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 未找到百度用户ID {account_id} 对应的本地用户\n") + print(f"未找到百度用户ID {account_id} 对应的本地用户") + + except json.JSONDecodeError: + print("错误:消息内容非有效JSON") + return { + 'status': False, + 'msg': '错误:消息内容非有效JSON' + } + except Exception as e: + print(f"处理异常: {str(e)}") + import traceback + with open('baidu_kafka_error.txt', 'w') as f: + f.write(str(e)+ traceback.format_exc()) + traceback.print_exc() + return { + 'status': False, + 'msg': f"处理异常: {str(e)}" + } # consumer.close() # 确保消费者关闭 return { diff --git a/kgadget/src/baidu_sms_kafka_consumer.py b/kgadget/src/baidu_sms_kafka_consumer.py index 399c421..03c6d48 100644 --- a/kgadget/src/baidu_sms_kafka_consumer.py +++ b/kgadget/src/baidu_sms_kafka_consumer.py @@ -100,7 +100,7 @@ async def baidu_sms_kafka_consumer(ns={}): 'security.protocol': 'SASL_SSL', 'ssl.endpoint.identification.algorithm': 'none', # 证书文件路径 - 'ssl.ca.location': 'baidu_kafka_ca.pem', + 'ssl.ca.location': 'D:/Code/kboss/kgadget/src/baidu_kafka_ca.pem', # SASL 机制 'sasl.mechanism': 'SCRAM-SHA-512', # SASL 用户名 @@ -124,21 +124,23 @@ async def baidu_sms_kafka_consumer(ns={}): pass else: try: + # print('Received message: {}'.format(msg.value().decode('utf-8'))) method = "POST" url = 'https://www.opencomputing.cn/baiduc/baidu_sms_kafka_consumer.dspy' header = { "Host": "www.opencomputing.cn", - "Content-Type": "application/json" + # "Content-Type": "application/json" } data = { - 'msg': msg + 'msg': msg.value().decode('utf-8') } async with aiohttp_client.request( method=method, url=url, headers=header, - json=data) as res: - data_ = await res.json() + data=data) as res: + data_ = await res.text() + # print("Response data:", data_) except json.JSONDecodeError: return { @@ -146,6 +148,9 @@ async def baidu_sms_kafka_consumer(ns={}): 'msg': '错误:消息内容非有效JSON' } except Exception as e: + import traceback + print(str(e)+ traceback.format_exc()) + traceback.print_exc() return { 'status': False, 'msg': f"处理异常: {str(e)}"