From 21e9e9b08bf6046ce5cff21f32653bef21caae7f Mon Sep 17 00:00:00 2001 From: ping <1017253325@qq.com> Date: Fri, 21 Nov 2025 09:26:50 +0800 Subject: [PATCH 01/10] update --- kgadget/src/baidu_sms_kafka_consumer.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/kgadget/src/baidu_sms_kafka_consumer.py b/kgadget/src/baidu_sms_kafka_consumer.py index 90d2153..fbc1202 100644 --- a/kgadget/src/baidu_sms_kafka_consumer.py +++ b/kgadget/src/baidu_sms_kafka_consumer.py @@ -110,7 +110,7 @@ async def baidu_sms_kafka_consumer(ns={}): 'auto.offset.reset': 'latest', 'fetch.message.max.bytes': '1024*512', }) - + print('开始创建文件夹') # 订阅的主题名称 consumer.subscribe(['kaiyuanyun_msg_topic']) @@ -125,10 +125,10 @@ async def baidu_sms_kafka_consumer(ns={}): # total_count = 0 while True: i = 0 - if i == 0: - # 写入文件记录轮询开始时间 时间格式: YYYY-MM-DD HH:MM:SS - with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f: - f.write(f"轮询开始时间:{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") + # if i == 0: + # # 写入文件记录轮询开始时间 时间格式: YYYY-MM-DD HH:MM:SS + # with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f: + # f.write(f"轮询开始时间:{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") msg = consumer.poll(0.1) # 单次轮询获取消息 @@ -251,6 +251,10 @@ async def baidu_sms_kafka_consumer(ns={}): } except Exception as e: print(f"处理异常: {str(e)}") + import traceback + with open('baidu_kafka_error.txt', 'w') as f: + f.write(str(e)+ traceback.format_exc()) + traceback.print_exc() return { 'status': False, 'msg': f"处理异常: {str(e)}" @@ -272,5 +276,6 @@ if __name__ == '__main__': two_levels_up = '/'.join(path_parts[:-2]) config = getConfig(two_levels_up) DBPools(config.databases) + print(config.databases) loop = asyncio.get_event_loop() print(loop.run_until_complete(baidu_sms_kafka_consumer())) From e9b0e01a5337c92a94369f50ba0fd40517d6cd73 Mon Sep 17 00:00:00 2001 From: ping <1017253325@qq.com> Date: Fri, 21 Nov 2025 15:12:47 +0800 Subject: [PATCH 02/10] =?UTF-8?q?baidu=5Fsms=5Fkafka=5Fconsumer=20?= =?UTF-8?q?=E6=9B=B4=E6=96=B0=E6=8E=A8=E9=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- kgadget/src/baidu_sms_kafka_consumer.py | 285 +++++++----------------- 1 file changed, 86 insertions(+), 199 deletions(-) diff --git a/kgadget/src/baidu_sms_kafka_consumer.py b/kgadget/src/baidu_sms_kafka_consumer.py index fbc1202..399c421 100644 --- a/kgadget/src/baidu_sms_kafka_consumer.py +++ b/kgadget/src/baidu_sms_kafka_consumer.py @@ -4,8 +4,8 @@ import os import json import asyncio -import datetime -from sqlor.dbpools import DBPools +# import datetime +# from sqlor.dbpools import DBPools # from appPublic.jsonConfig import getConfig from appPublic.uniqueID import getID as uuid from confluent_kafka import Consumer as BaiduKafKaConsumer @@ -18,76 +18,78 @@ from appPublic.Singleton import SingletonDecorator from appPublic.folderUtils import ProgramPath from appPublic.argsConvert import ArgsConvert -from baiDuSmsClient import send_baidu_vcode as send_vcode +# from baiDuSmsClient import send_baidu_vcode as send_vcode + +from aiohttp import client as aiohttp_client -def key2ansi(dict): - # print dict - return dict - a = {} - for k, v in dict.items(): - k = k.encode('utf-8') - # if type(v) == type(u" "): - # v = v.encode('utf-8') - a[k] = v +# def key2ansi(dict): +# # print dict +# return dict +# a = {} +# for k, v in dict.items(): +# k = k.encode('utf-8') +# # if type(v) == type(u" "): +# # v = v.encode('utf-8') +# a[k] = v - return a +# return a -class JsonObject(DictObject): - """ - JsonObject class load json from a json file - """ +# class JsonObject(DictObject): +# """ +# JsonObject class load json from a json file +# """ - def __init__(self, jsonholder, keytype='ansi', NS=None): - jhtype = type(jsonholder) - if jhtype == type("") or jhtype == type(u''): - f = open(jsonholder, 'r') - else: - f = jsonholder - try: - a = json.load(f) - except Exception as e: - print("exception:", self.__jsonholder__, e) - raise e - finally: - if type(jsonholder) == type(""): - f.close() +# def __init__(self, jsonholder, keytype='ansi', NS=None): +# jhtype = type(jsonholder) +# if jhtype == type("") or jhtype == type(u''): +# f = open(jsonholder, 'r') +# else: +# f = jsonholder +# try: +# a = json.load(f) +# except Exception as e: +# print("exception:", self.__jsonholder__, e) +# raise e +# finally: +# if type(jsonholder) == type(""): +# f.close() - if NS is not None: - ac = ArgsConvert('$[', ']$') - a = ac.convert(a, NS) - a['__jsonholder__'] = jsonholder - a['NS'] = NS - DictObject.__init__(self, **a) +# if NS is not None: +# ac = ArgsConvert('$[', ']$') +# a = ac.convert(a, NS) +# a['__jsonholder__'] = jsonholder +# a['NS'] = NS +# DictObject.__init__(self, **a) -@SingletonDecorator -class JsonConfig(JsonObject): - pass +# @SingletonDecorator +# class JsonConfig(JsonObject): +# pass -def getConfig(path=None, NS=None): - pp = ProgramPath() - if path == None: - path = os.getcwd() - cfname = os.path.abspath(os.path.join(path, "conf", "config.prod.json")) - # print __name__,cfname - ns = { - 'home': str(Path.home()), - 'workdir': path, - 'ProgramPath': pp - } - if NS is not None: - ns.update(NS) - a = JsonConfig(cfname, NS=ns) - return a +# def getConfig(path=None, NS=None): +# pp = ProgramPath() +# if path == None: +# path = os.getcwd() +# cfname = os.path.abspath(os.path.join(path, "conf", "config.prod.json")) +# # print __name__,cfname +# ns = { +# 'home': str(Path.home()), +# 'workdir': path, +# 'ProgramPath': pp +# } +# if NS is not None: +# ns.update(NS) +# a = JsonConfig(cfname, NS=ns) +# return a -async def time_convert(resoucetime=None): - if not resoucetime: - return - utc_time = datetime.datetime.strptime(resoucetime, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=datetime.timezone.utc) - beijing_time = utc_time.astimezone(datetime.timezone(datetime.timedelta(hours=8))) - return beijing_time.strftime("%Y-%m-%d %H:%M:%S") +# async def time_convert(resoucetime=None): +# if not resoucetime: +# return +# utc_time = datetime.datetime.strptime(resoucetime, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=datetime.timezone.utc) +# beijing_time = utc_time.astimezone(datetime.timezone(datetime.timedelta(hours=8))) +# return beijing_time.strftime("%Y-%m-%d %H:%M:%S") async def baidu_sms_kafka_consumer(ns={}): import os @@ -110,158 +112,44 @@ async def baidu_sms_kafka_consumer(ns={}): 'auto.offset.reset': 'latest', 'fetch.message.max.bytes': '1024*512', }) - print('开始创建文件夹') + # 订阅的主题名称 consumer.subscribe(['kaiyuanyun_msg_topic']) - - files = ["baidu_kafka_msg.txt", "baidu_kafka_id.txt", "baidu_kafka_error.txt"] - for filename in files: - if not os.path.exists(filename): - with open(filename, 'w', encoding='utf-8') as f: # 'w' 模式会覆盖已有文件,但检查后使用是安全的 - pass # 创建空文件 - else: - pass - - # total_count = 0 while True: - i = 0 - # if i == 0: - # # 写入文件记录轮询开始时间 时间格式: YYYY-MM-DD HH:MM:SS - # with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f: - # f.write(f"轮询开始时间:{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") - - msg = consumer.poll(0.1) # 单次轮询获取消息 + msg = consumer.poll(2) # 单次轮询获取消息 if msg is None: - if i == 10: - with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f: - f.write(f"轮询第10次消息为None,{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") continue elif msg.error(): - # 写入日志文件记录错误信息 - with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f: - f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 消费者错误: {msg.error()}\n") + pass else: - # total_count += 1 try: - # 解析消息内容为字典(避免变量名冲突) - msg_data_ori = json.loads(msg.value().decode('utf-8')) - msg_data = msg_data_ori['messages'][0] - messageid = msg_data.get('id') - taskid = msg_data.get('taskId') - - # 读取文件内容进行检查 - with open('baidu_kafka_id.txt', 'r', encoding='utf-8') as f: - content = f.read() - - if messageid in content: - print(f"文件中已存在 '{messageid}',跳过写入") - continue - else: - # 追加写入目标内容 - with open('baidu_kafka_id.txt', 'a', encoding='utf-8') as f: - f.write(messageid + '\n') - print(f"已写入 '{messageid}' 到文件") - - with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f: - f.write(str(msg_data) + '\n') - - db = DBPools() - async with db.sqlorContext('kboss') as sor: - # 1. 去重检查(messageid和taskid) - exist_msg = await sor.R('baidu_kafka_msg', {'messageid': messageid, 'taskid': taskid}) - if exist_msg: - print(f"消息id {messageid} 已存在,跳过处理") - continue - # consumer.close() - # return - - # 2. 构建小写key的ns字典(完整映射所有字段) - ns_msg = { - 'id': uuid(), - 'messageid': messageid, - 'taskid': taskid, - 'userid': msg_data.get('userId'), - 'accountid': msg_data.get('accountId'), - 'usertype': msg_data.get('userType'), - 'receiverid': msg_data.get('receiverId'), - 'contentvar': msg_data.get('contentVar'), - 'sendchannel': msg_data.get('sendChannel'), - 'content': msg_data.get('content'), - 'messagetemplateid': msg_data.get('messageTemplateId'), - 'isvirtualstore': msg_data.get('isVirtualStore'), - 'channelmessageid': msg_data.get('channelMessageId'), - 'channelstatus': msg_data.get('channelStatus'), - 'majorcategory': msg_data.get('majorCategory'), - 'minorcategory': msg_data.get('minorCategory'), - 'status': msg_data.get('status'), - 'createtime': await time_convert(msg_data.get('createTime')) if msg_data.get('createTime') else None, - 'updatetime': await time_convert(msg_data.get('updateTime')) if msg_data.get('updateTime') else None, - 'expiretime': await time_convert(msg_data.get('expireTime')) if msg_data.get('expireTime') else None, - 'windowtime': await time_convert(msg_data.get('windowTime')) if msg_data.get('windowTime') else None, - 'valid': msg_data.get('valid'), - 'complete': msg_data.get('complete'), - 'disturbhold': msg_data.get('disturbHold'), - 'sendcomplete': msg_data.get('sendComplete') - } - - # 3. 执行存库 - await sor.C('baidu_kafka_msg', ns_msg) - # print(f"消息id {messageid} 存储成功") - - # 4. 触发短信发送(当sendChannel为MOBILE时) - send_channel = msg_data.get('sendChannel') - if send_channel == 'MOBILE': - msg_content = msg_data.get('content') - - # 判断验证码类短信 | 通知类短信 - if '验证码' in msg_content: - sms_stype = '百度kafka普通验证码' - else: - sms_stype = '百度kafka普通通知' - - # 查询用户手机号 - account_id = msg_data.get('accountId') - user_local_id_li = await sor.R('baidu_users', {'baidu_id': account_id}) - if user_local_id_li: - user_local_id = user_local_id_li[0]['user_id'] - user_mobile_li = await sor.R('users', {'id': user_local_id, 'del_flg': '0'}) - mobile = user_mobile_li[0]['mobile'] - - # 调用短信发送接口 - kyy_send_status = await send_vcode(mobile, sms_stype, {'content': msg_content}) - if kyy_send_status.get('status'): - await sor.U('baidu_kafka_msg', {'id': ns_msg['id'], 'kyysendstatus': '1'}) - else: - # 记录错误日志 短信发送失败 - with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f: - f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 短信发送失败至 {mobile},状态: {kyy_send_status}\n") - print(f"短信发送失败至 {mobile},状态: {kyy_send_status}") - else: - # 记录错误日志 用户未找到 - with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f: - f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 未找到百度用户ID {account_id} 对应的本地用户\n") - print(f"未找到百度用户ID {account_id} 对应的本地用户") + method = "POST" + url = 'https://www.opencomputing.cn/baiduc/baidu_sms_kafka_consumer.dspy' + header = { + "Host": "www.opencomputing.cn", + "Content-Type": "application/json" + } + data = { + 'msg': msg + } + async with aiohttp_client.request( + method=method, + url=url, + headers=header, + json=data) as res: + data_ = await res.json() except json.JSONDecodeError: - print("错误:消息内容非有效JSON") return { 'status': False, 'msg': '错误:消息内容非有效JSON' } except Exception as e: - print(f"处理异常: {str(e)}") - import traceback - with open('baidu_kafka_error.txt', 'w') as f: - f.write(str(e)+ traceback.format_exc()) - traceback.print_exc() return { 'status': False, 'msg': f"处理异常: {str(e)}" } - # 记录total_count - # with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f: - # f.write(f"本次轮询共处理消息数:{total_count}\n") consumer.close() # 确保消费者关闭 return { @@ -271,11 +159,10 @@ async def baidu_sms_kafka_consumer(ns={}): if __name__ == '__main__': # p = os.getcwd() - current_dir = os.getcwd() - path_parts = current_dir.split('/') - two_levels_up = '/'.join(path_parts[:-2]) - config = getConfig(two_levels_up) - DBPools(config.databases) - print(config.databases) + # current_dir = os.getcwd() + # path_parts = current_dir.split('/') + # two_levels_up = '/'.join(path_parts[:-2]) + # config = getConfig(two_levels_up) + # DBPools(config.databases) loop = asyncio.get_event_loop() print(loop.run_until_complete(baidu_sms_kafka_consumer())) From fcbbb6056faf7f0bda59579af4f0f95f5d24e1f1 Mon Sep 17 00:00:00 2001 From: ping <1017253325@qq.com> Date: Fri, 21 Nov 2025 15:25:42 +0800 Subject: [PATCH 03/10] =?UTF-8?q?=E7=99=BE=E5=BA=A6kafka=20=E6=8E=A5?= =?UTF-8?q?=E6=94=B6=E6=B6=88=E6=81=AF=E6=8E=A8=E9=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- b/baiduc/baidu_sms_kafka_consumer.dspy | 303 ++++++++++++------------- 1 file changed, 145 insertions(+), 158 deletions(-) diff --git a/b/baiduc/baidu_sms_kafka_consumer.dspy b/b/baiduc/baidu_sms_kafka_consumer.dspy index d73988d..74271e0 100644 --- a/b/baiduc/baidu_sms_kafka_consumer.dspy +++ b/b/baiduc/baidu_sms_kafka_consumer.dspy @@ -6,29 +6,30 @@ async def time_convert(resoucetime=None): return beijing_time.strftime("%Y-%m-%d %H:%M:%S") async def baidu_sms_kafka_consumer(ns={}): - import os - consumer = BaiduKafKaConsumer({ - # 接入点 - 'bootstrap.servers': '120.48.10.223:9095,180.76.96.108:9095,180.76.147.36:9095', - # 接入协议 - 'security.protocol': 'SASL_SSL', - 'ssl.endpoint.identification.algorithm': 'none', - # 证书文件路径 - 'ssl.ca.location': 'baidu_kafka_ca.pem', - # SASL 机制 - 'sasl.mechanism': 'SCRAM-SHA-512', - # SASL 用户名 - 'sasl.username': 'kaiyuanyun', - # SASL 用户密码 - 'sasl.password': 'Kyy250609#', - # 消费组id - 'group.id': 'kaiyuanyun_msg_group', - 'auto.offset.reset': 'latest', - 'fetch.message.max.bytes': '1024*512', - }) + # consumer = BaiduKafKaConsumer({ + # # 接入点 + # 'bootstrap.servers': '120.48.10.223:9095,180.76.96.108:9095,180.76.147.36:9095', + # # 接入协议 + # 'security.protocol': 'SASL_SSL', + # 'ssl.endpoint.identification.algorithm': 'none', + # # 证书文件路径 + # 'ssl.ca.location': 'baidu_kafka_ca.pem', + # # SASL 机制 + # 'sasl.mechanism': 'SCRAM-SHA-512', + # # SASL 用户名 + # 'sasl.username': 'kaiyuanyun', + # # SASL 用户密码 + # 'sasl.password': 'Kyy250609#', + # # 消费组id + # 'group.id': 'kaiyuanyun_msg_group', + # 'auto.offset.reset': 'latest', + # 'fetch.message.max.bytes': '1024*512', + # }) - # 订阅的主题名称 - consumer.subscribe(['kaiyuanyun_msg_topic']) + # # 订阅的主题名称 + # consumer.subscribe(['kaiyuanyun_msg_topic']) + + import os files = ["baidu_kafka_msg.txt", "baidu_kafka_id.txt", "baidu_kafka_error.txt"] for filename in files: @@ -38,143 +39,129 @@ async def baidu_sms_kafka_consumer(ns={}): else: pass - total_count = 0 - for i in range(10): - # if i == 0: - # # 写入文件记录轮询开始时间 时间格式: YYYY-MM-DD HH:MM:SS - # with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f: - # f.write(f"轮询开始时间:{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") - - msg = consumer.poll(0.01) # 单次轮询获取消息 - - if msg is None: - if i == 10: - with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f: - f.write(f"轮询第10次消息为None,{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") - continue - elif msg.error(): - # 写入日志文件记录错误信息 - with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f: - f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 消费者错误: {msg.error()}\n") - else: - total_count += 1 - try: - # 解析消息内容为字典(避免变量名冲突) - msg_data_ori = json.loads(msg.value().decode('utf-8')) - msg_data = msg_data_ori['messages'][0] - messageid = msg_data.get('id') - taskid = msg_data.get('taskId') - - # 读取文件内容进行检查 - with open('baidu_kafka_id.txt', 'r', encoding='utf-8') as f: - content = f.read() - - if messageid in content: - print(f"文件中已存在 '{messageid}',跳过写入") - continue - else: - # 追加写入目标内容 - with open('baidu_kafka_id.txt', 'a', encoding='utf-8') as f: - f.write(messageid + '\n') - print(f"已写入 '{messageid}' 到文件") - - with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f: - f.write(str(msg_data) + '\n') - - db = DBPools() - async with db.sqlorContext('kboss') as sor: - # 1. 去重检查(messageid和taskid) - exist_msg = await sor.R('baidu_kafka_msg', {'messageid': messageid, 'taskid': taskid}) - if exist_msg: - print(f"消息id {messageid} 已存在,跳过处理") - continue - # consumer.close() - # return - - # 2. 构建小写key的ns字典(完整映射所有字段) - ns_msg = { - 'id': uuid(), - 'messageid': messageid, - 'taskid': taskid, - 'userid': msg_data.get('userId'), - 'accountid': msg_data.get('accountId'), - 'usertype': msg_data.get('userType'), - 'receiverid': msg_data.get('receiverId'), - 'contentvar': msg_data.get('contentVar'), - 'sendchannel': msg_data.get('sendChannel'), - 'content': msg_data.get('content'), - 'messagetemplateid': msg_data.get('messageTemplateId'), - 'isvirtualstore': msg_data.get('isVirtualStore'), - 'channelmessageid': msg_data.get('channelMessageId'), - 'channelstatus': msg_data.get('channelStatus'), - 'majorcategory': msg_data.get('majorCategory'), - 'minorcategory': msg_data.get('minorCategory'), - 'status': msg_data.get('status'), - 'createtime': await time_convert(msg_data.get('createTime')) if msg_data.get('createTime') else None, - 'updatetime': await time_convert(msg_data.get('updateTime')) if msg_data.get('updateTime') else None, - 'expiretime': await time_convert(msg_data.get('expireTime')) if msg_data.get('expireTime') else None, - 'windowtime': await time_convert(msg_data.get('windowTime')) if msg_data.get('windowTime') else None, - 'valid': msg_data.get('valid'), - 'complete': msg_data.get('complete'), - 'disturbhold': msg_data.get('disturbHold'), - 'sendcomplete': msg_data.get('sendComplete') - } - - # 3. 执行存库 - await sor.C('baidu_kafka_msg', ns_msg) - print(f"消息id {messageid} 存储成功") - - # 4. 触发短信发送(当sendChannel为MOBILE时) - send_channel = msg_data.get('sendChannel') - if send_channel == 'MOBILE': - msg_content = msg_data.get('content') - - # 判断验证码类短信 | 通知类短信 - if '验证码' in msg_content: - sms_stype = '百度kafka普通验证码' - else: - sms_stype = '百度kafka普通通知' - - # 查询用户手机号 - account_id = msg_data.get('accountId') - user_local_id_li = await sor.R('baidu_users', {'baidu_id': account_id}) - if user_local_id_li: - user_local_id = user_local_id_li[0]['user_id'] - user_mobile_li = await sor.R('users', {'id': user_local_id, 'del_flg': '0'}) - mobile = user_mobile_li[0]['mobile'] - - # 调用短信发送接口 - kyy_send_status = await send_vcode(mobile, sms_stype, {'content': msg_content}) - if kyy_send_status.get('status'): - await sor.U('baidu_kafka_msg', {'id': ns_msg['id'], 'kyysendstatus': '1'}) - else: - # 记录错误日志 短信发送失败 - with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f: - f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 短信发送失败至 {mobile},状态: {kyy_send_status}\n") - print(f"短信发送失败至 {mobile},状态: {kyy_send_status}") - else: - # 记录错误日志 用户未找到 - with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f: - f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 未找到百度用户ID {account_id} 对应的本地用户\n") - print(f"未找到百度用户ID {account_id} 对应的本地用户") - - except json.JSONDecodeError: - print("错误:消息内容非有效JSON") - return { - 'status': False, - 'msg': '错误:消息内容非有效JSON' - } - except Exception as e: - print(f"处理异常: {str(e)}") - return { - 'status': False, - 'msg': f"处理异常: {str(e)}" - } - # 记录total_count - # with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f: - # f.write(f"本次轮询共处理消息数:{total_count}\n") + msg = ns.get('msg') - consumer.close() # 确保消费者关闭 + if msg.error(): + # 写入日志文件记录错误信息 + with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f: + f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 消费者错误: {msg.error()}\n") + else: + try: + # 解析消息内容为字典(避免变量名冲突) + msg_data_ori = json.loads(msg.value().decode('utf-8')) + msg_data = msg_data_ori['messages'][0] + messageid = msg_data.get('id') + taskid = msg_data.get('taskId') + + # 读取文件内容进行检查 + with open('baidu_kafka_id.txt', 'r', encoding='utf-8') as f: + content = f.read() + + if messageid in content: + print(f"文件中已存在 '{messageid}',跳过写入") + return + else: + # 追加写入目标内容 + with open('baidu_kafka_id.txt', 'a', encoding='utf-8') as f: + f.write(messageid + '\n') + print(f"已写入 '{messageid}' 到文件") + + with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f: + f.write(str(msg_data) + '\n') + + db = DBPools() + async with db.sqlorContext('kboss') as sor: + # 1. 去重检查(messageid和taskid) + exist_msg = await sor.R('baidu_kafka_msg', {'messageid': messageid, 'taskid': taskid}) + if exist_msg: + print(f"消息id {messageid} 已存在,跳过处理") + return + + # 2. 构建小写key的ns字典(完整映射所有字段) + ns_msg = { + 'id': uuid(), + 'messageid': messageid, + 'taskid': taskid, + 'userid': msg_data.get('userId'), + 'accountid': msg_data.get('accountId'), + 'usertype': msg_data.get('userType'), + 'receiverid': msg_data.get('receiverId'), + 'contentvar': msg_data.get('contentVar'), + 'sendchannel': msg_data.get('sendChannel'), + 'content': msg_data.get('content'), + 'messagetemplateid': msg_data.get('messageTemplateId'), + 'isvirtualstore': msg_data.get('isVirtualStore'), + 'channelmessageid': msg_data.get('channelMessageId'), + 'channelstatus': msg_data.get('channelStatus'), + 'majorcategory': msg_data.get('majorCategory'), + 'minorcategory': msg_data.get('minorCategory'), + 'status': msg_data.get('status'), + 'createtime': await time_convert(msg_data.get('createTime')) if msg_data.get('createTime') else None, + 'updatetime': await time_convert(msg_data.get('updateTime')) if msg_data.get('updateTime') else None, + 'expiretime': await time_convert(msg_data.get('expireTime')) if msg_data.get('expireTime') else None, + 'windowtime': await time_convert(msg_data.get('windowTime')) if msg_data.get('windowTime') else None, + 'valid': msg_data.get('valid'), + 'complete': msg_data.get('complete'), + 'disturbhold': msg_data.get('disturbHold'), + 'sendcomplete': msg_data.get('sendComplete') + } + + # 3. 执行存库 + await sor.C('baidu_kafka_msg', ns_msg) + print(f"消息id {messageid} 存储成功") + + # 4. 触发短信发送(当sendChannel为MOBILE时) + send_channel = msg_data.get('sendChannel') + if send_channel == 'MOBILE': + msg_content = msg_data.get('content') + + # 判断验证码类短信 | 通知类短信 + if '验证码' in msg_content: + sms_stype = '百度kafka普通验证码' + else: + sms_stype = '百度kafka普通通知' + + # 查询用户手机号 + account_id = msg_data.get('accountId') + user_local_id_li = await sor.R('baidu_users', {'baidu_id': account_id}) + if user_local_id_li: + user_local_id = user_local_id_li[0]['user_id'] + user_mobile_li = await sor.R('users', {'id': user_local_id, 'del_flg': '0'}) + mobile = user_mobile_li[0]['mobile'] + + # 调用短信发送接口 + kyy_send_status = await send_vcode(mobile, sms_stype, {'content': msg_content}) + if kyy_send_status.get('status'): + await sor.U('baidu_kafka_msg', {'id': ns_msg['id'], 'kyysendstatus': '1'}) + else: + # 记录错误日志 短信发送失败 + with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f: + f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 短信发送失败至 {mobile},状态: {kyy_send_status}\n") + print(f"短信发送失败至 {mobile},状态: {kyy_send_status}") + else: + # 记录错误日志 用户未找到 + with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f: + f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 未找到百度用户ID {account_id} 对应的本地用户\n") + print(f"未找到百度用户ID {account_id} 对应的本地用户") + + except json.JSONDecodeError: + print("错误:消息内容非有效JSON") + return { + 'status': False, + 'msg': '错误:消息内容非有效JSON' + } + except Exception as e: + print(f"处理异常: {str(e)}") + import traceback + with open('baidu_kafka_error.txt', 'w') as f: + f.write(str(e)+ traceback.format_exc()) + traceback.print_exc() + return { + 'status': False, + 'msg': f"处理异常: {str(e)}" + } + + # consumer.close() # 确保消费者关闭 return { 'status': True, 'msg': '获取信息执行结束' From 9cbda277fb6e27e596b3f9a3507dcbbc9e1a9a33 Mon Sep 17 00:00:00 2001 From: ping <1017253325@qq.com> Date: Fri, 21 Nov 2025 16:15:13 +0800 Subject: [PATCH 04/10] update --- b/baiduc/baidu_sms_kafka_consumer.dspy | 219 ++++++++++++------------ kgadget/src/baidu_sms_kafka_consumer.py | 15 +- 2 files changed, 120 insertions(+), 114 deletions(-) diff --git a/b/baiduc/baidu_sms_kafka_consumer.dspy b/b/baiduc/baidu_sms_kafka_consumer.dspy index 74271e0..dbefa7c 100644 --- a/b/baiduc/baidu_sms_kafka_consumer.dspy +++ b/b/baiduc/baidu_sms_kafka_consumer.dspy @@ -41,125 +41,126 @@ async def baidu_sms_kafka_consumer(ns={}): msg = ns.get('msg') - if msg.error(): - # 写入日志文件记录错误信息 - with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f: - f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 消费者错误: {msg.error()}\n") - else: - try: - # 解析消息内容为字典(避免变量名冲突) - msg_data_ori = json.loads(msg.value().decode('utf-8')) - msg_data = msg_data_ori['messages'][0] - messageid = msg_data.get('id') - taskid = msg_data.get('taskId') + # if msg.error(): + # # 写入日志文件记录错误信息 + # with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f: + # f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 消费者错误: {msg.error()}\n") + # else: + try: + # 解析消息内容为字典(避免变量名冲突) + # msg_data_ori = json.loads(msg.value().decode('utf-8')) + msg_data_ori = json.loads(msg) + msg_data = msg_data_ori['messages'][0] + messageid = msg_data.get('id') + taskid = msg_data.get('taskId') - # 读取文件内容进行检查 - with open('baidu_kafka_id.txt', 'r', encoding='utf-8') as f: - content = f.read() + # 读取文件内容进行检查 + with open('baidu_kafka_id.txt', 'r', encoding='utf-8') as f: + content = f.read() - if messageid in content: - print(f"文件中已存在 '{messageid}',跳过写入") + if messageid in content: + print(f"文件中已存在 '{messageid}',跳过写入") + return + else: + # 追加写入目标内容 + with open('baidu_kafka_id.txt', 'a', encoding='utf-8') as f: + f.write(messageid + '\n') + print(f"已写入 '{messageid}' 到文件") + + with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f: + f.write(str(msg_data) + '\n') + + db = DBPools() + async with db.sqlorContext('kboss') as sor: + # 1. 去重检查(messageid和taskid) + exist_msg = await sor.R('baidu_kafka_msg', {'messageid': messageid, 'taskid': taskid}) + if exist_msg: + print(f"消息id {messageid} 已存在,跳过处理") return - else: - # 追加写入目标内容 - with open('baidu_kafka_id.txt', 'a', encoding='utf-8') as f: - f.write(messageid + '\n') - print(f"已写入 '{messageid}' 到文件") - - with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f: - f.write(str(msg_data) + '\n') - db = DBPools() - async with db.sqlorContext('kboss') as sor: - # 1. 去重检查(messageid和taskid) - exist_msg = await sor.R('baidu_kafka_msg', {'messageid': messageid, 'taskid': taskid}) - if exist_msg: - print(f"消息id {messageid} 已存在,跳过处理") - return + # 2. 构建小写key的ns字典(完整映射所有字段) + ns_msg = { + 'id': uuid(), + 'messageid': messageid, + 'taskid': taskid, + 'userid': msg_data.get('userId'), + 'accountid': msg_data.get('accountId'), + 'usertype': msg_data.get('userType'), + 'receiverid': msg_data.get('receiverId'), + 'contentvar': msg_data.get('contentVar'), + 'sendchannel': msg_data.get('sendChannel'), + 'content': msg_data.get('content'), + 'messagetemplateid': msg_data.get('messageTemplateId'), + 'isvirtualstore': msg_data.get('isVirtualStore'), + 'channelmessageid': msg_data.get('channelMessageId'), + 'channelstatus': msg_data.get('channelStatus'), + 'majorcategory': msg_data.get('majorCategory'), + 'minorcategory': msg_data.get('minorCategory'), + 'status': msg_data.get('status'), + 'createtime': await time_convert(msg_data.get('createTime')) if msg_data.get('createTime') else None, + 'updatetime': await time_convert(msg_data.get('updateTime')) if msg_data.get('updateTime') else None, + 'expiretime': await time_convert(msg_data.get('expireTime')) if msg_data.get('expireTime') else None, + 'windowtime': await time_convert(msg_data.get('windowTime')) if msg_data.get('windowTime') else None, + 'valid': msg_data.get('valid'), + 'complete': msg_data.get('complete'), + 'disturbhold': msg_data.get('disturbHold'), + 'sendcomplete': msg_data.get('sendComplete') + } - # 2. 构建小写key的ns字典(完整映射所有字段) - ns_msg = { - 'id': uuid(), - 'messageid': messageid, - 'taskid': taskid, - 'userid': msg_data.get('userId'), - 'accountid': msg_data.get('accountId'), - 'usertype': msg_data.get('userType'), - 'receiverid': msg_data.get('receiverId'), - 'contentvar': msg_data.get('contentVar'), - 'sendchannel': msg_data.get('sendChannel'), - 'content': msg_data.get('content'), - 'messagetemplateid': msg_data.get('messageTemplateId'), - 'isvirtualstore': msg_data.get('isVirtualStore'), - 'channelmessageid': msg_data.get('channelMessageId'), - 'channelstatus': msg_data.get('channelStatus'), - 'majorcategory': msg_data.get('majorCategory'), - 'minorcategory': msg_data.get('minorCategory'), - 'status': msg_data.get('status'), - 'createtime': await time_convert(msg_data.get('createTime')) if msg_data.get('createTime') else None, - 'updatetime': await time_convert(msg_data.get('updateTime')) if msg_data.get('updateTime') else None, - 'expiretime': await time_convert(msg_data.get('expireTime')) if msg_data.get('expireTime') else None, - 'windowtime': await time_convert(msg_data.get('windowTime')) if msg_data.get('windowTime') else None, - 'valid': msg_data.get('valid'), - 'complete': msg_data.get('complete'), - 'disturbhold': msg_data.get('disturbHold'), - 'sendcomplete': msg_data.get('sendComplete') - } + # 3. 执行存库 + await sor.C('baidu_kafka_msg', ns_msg) + print(f"消息id {messageid} 存储成功") - # 3. 执行存库 - await sor.C('baidu_kafka_msg', ns_msg) - print(f"消息id {messageid} 存储成功") + # 4. 触发短信发送(当sendChannel为MOBILE时) + send_channel = msg_data.get('sendChannel') + if send_channel == 'MOBILE': + msg_content = msg_data.get('content') - # 4. 触发短信发送(当sendChannel为MOBILE时) - send_channel = msg_data.get('sendChannel') - if send_channel == 'MOBILE': - msg_content = msg_data.get('content') + # 判断验证码类短信 | 通知类短信 + if '验证码' in msg_content: + sms_stype = '百度kafka普通验证码' + else: + sms_stype = '百度kafka普通通知' - # 判断验证码类短信 | 通知类短信 - if '验证码' in msg_content: - sms_stype = '百度kafka普通验证码' + # 查询用户手机号 + account_id = msg_data.get('accountId') + user_local_id_li = await sor.R('baidu_users', {'baidu_id': account_id}) + if user_local_id_li: + user_local_id = user_local_id_li[0]['user_id'] + user_mobile_li = await sor.R('users', {'id': user_local_id, 'del_flg': '0'}) + mobile = user_mobile_li[0]['mobile'] + + # 调用短信发送接口 + kyy_send_status = await send_vcode(mobile, sms_stype, {'content': msg_content}) + if kyy_send_status.get('status'): + await sor.U('baidu_kafka_msg', {'id': ns_msg['id'], 'kyysendstatus': '1'}) else: - sms_stype = '百度kafka普通通知' - - # 查询用户手机号 - account_id = msg_data.get('accountId') - user_local_id_li = await sor.R('baidu_users', {'baidu_id': account_id}) - if user_local_id_li: - user_local_id = user_local_id_li[0]['user_id'] - user_mobile_li = await sor.R('users', {'id': user_local_id, 'del_flg': '0'}) - mobile = user_mobile_li[0]['mobile'] - - # 调用短信发送接口 - kyy_send_status = await send_vcode(mobile, sms_stype, {'content': msg_content}) - if kyy_send_status.get('status'): - await sor.U('baidu_kafka_msg', {'id': ns_msg['id'], 'kyysendstatus': '1'}) - else: - # 记录错误日志 短信发送失败 - with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f: - f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 短信发送失败至 {mobile},状态: {kyy_send_status}\n") - print(f"短信发送失败至 {mobile},状态: {kyy_send_status}") - else: - # 记录错误日志 用户未找到 + # 记录错误日志 短信发送失败 with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f: - f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 未找到百度用户ID {account_id} 对应的本地用户\n") - print(f"未找到百度用户ID {account_id} 对应的本地用户") - - except json.JSONDecodeError: - print("错误:消息内容非有效JSON") - return { - 'status': False, - 'msg': '错误:消息内容非有效JSON' - } - except Exception as e: - print(f"处理异常: {str(e)}") - import traceback - with open('baidu_kafka_error.txt', 'w') as f: - f.write(str(e)+ traceback.format_exc()) - traceback.print_exc() - return { - 'status': False, - 'msg': f"处理异常: {str(e)}" - } + f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 短信发送失败至 {mobile},状态: {kyy_send_status}\n") + print(f"短信发送失败至 {mobile},状态: {kyy_send_status}") + else: + # 记录错误日志 用户未找到 + with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f: + f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 未找到百度用户ID {account_id} 对应的本地用户\n") + print(f"未找到百度用户ID {account_id} 对应的本地用户") + + except json.JSONDecodeError: + print("错误:消息内容非有效JSON") + return { + 'status': False, + 'msg': '错误:消息内容非有效JSON' + } + except Exception as e: + print(f"处理异常: {str(e)}") + import traceback + with open('baidu_kafka_error.txt', 'w') as f: + f.write(str(e)+ traceback.format_exc()) + traceback.print_exc() + return { + 'status': False, + 'msg': f"处理异常: {str(e)}" + } # consumer.close() # 确保消费者关闭 return { diff --git a/kgadget/src/baidu_sms_kafka_consumer.py b/kgadget/src/baidu_sms_kafka_consumer.py index 399c421..03c6d48 100644 --- a/kgadget/src/baidu_sms_kafka_consumer.py +++ b/kgadget/src/baidu_sms_kafka_consumer.py @@ -100,7 +100,7 @@ async def baidu_sms_kafka_consumer(ns={}): 'security.protocol': 'SASL_SSL', 'ssl.endpoint.identification.algorithm': 'none', # 证书文件路径 - 'ssl.ca.location': 'baidu_kafka_ca.pem', + 'ssl.ca.location': 'D:/Code/kboss/kgadget/src/baidu_kafka_ca.pem', # SASL 机制 'sasl.mechanism': 'SCRAM-SHA-512', # SASL 用户名 @@ -124,21 +124,23 @@ async def baidu_sms_kafka_consumer(ns={}): pass else: try: + # print('Received message: {}'.format(msg.value().decode('utf-8'))) method = "POST" url = 'https://www.opencomputing.cn/baiduc/baidu_sms_kafka_consumer.dspy' header = { "Host": "www.opencomputing.cn", - "Content-Type": "application/json" + # "Content-Type": "application/json" } data = { - 'msg': msg + 'msg': msg.value().decode('utf-8') } async with aiohttp_client.request( method=method, url=url, headers=header, - json=data) as res: - data_ = await res.json() + data=data) as res: + data_ = await res.text() + # print("Response data:", data_) except json.JSONDecodeError: return { @@ -146,6 +148,9 @@ async def baidu_sms_kafka_consumer(ns={}): 'msg': '错误:消息内容非有效JSON' } except Exception as e: + import traceback + print(str(e)+ traceback.format_exc()) + traceback.print_exc() return { 'status': False, 'msg': f"处理异常: {str(e)}" From 1c16423a9879c6f4c565916a49a344b6468468b8 Mon Sep 17 00:00:00 2001 From: ping <1017253325@qq.com> Date: Fri, 21 Nov 2025 16:32:53 +0800 Subject: [PATCH 05/10] update --- kgadget/src/baidu_sms_kafka_consumer.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/kgadget/src/baidu_sms_kafka_consumer.py b/kgadget/src/baidu_sms_kafka_consumer.py index 03c6d48..f79c419 100644 --- a/kgadget/src/baidu_sms_kafka_consumer.py +++ b/kgadget/src/baidu_sms_kafka_consumer.py @@ -113,6 +113,14 @@ async def baidu_sms_kafka_consumer(ns={}): 'fetch.message.max.bytes': '1024*512', }) + # 创建日志文件 + files = "baidu_kafka_msg.txt" + if not os.path.exists(filename): + with open(filename, 'w', encoding='utf-8') as f: # 'w' 模式会覆盖已有文件,但检查后使用是安全的 + pass # 创建空文件 + else: + pass + # 订阅的主题名称 consumer.subscribe(['kaiyuanyun_msg_topic']) while True: @@ -124,6 +132,9 @@ async def baidu_sms_kafka_consumer(ns={}): pass else: try: + with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f: + f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}-{msg.value().decode('utf-8')}\n") + # print('Received message: {}'.format(msg.value().decode('utf-8'))) method = "POST" url = 'https://www.opencomputing.cn/baiduc/baidu_sms_kafka_consumer.dspy' From af535e7c712106289d2146b36ff7c2f3af15652c Mon Sep 17 00:00:00 2001 From: ping <1017253325@qq.com> Date: Fri, 21 Nov 2025 16:37:23 +0800 Subject: [PATCH 06/10] update --- kgadget/src/baidu_sms_kafka_consumer.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kgadget/src/baidu_sms_kafka_consumer.py b/kgadget/src/baidu_sms_kafka_consumer.py index f79c419..4acac5f 100644 --- a/kgadget/src/baidu_sms_kafka_consumer.py +++ b/kgadget/src/baidu_sms_kafka_consumer.py @@ -114,9 +114,9 @@ async def baidu_sms_kafka_consumer(ns={}): }) # 创建日志文件 - files = "baidu_kafka_msg.txt" - if not os.path.exists(filename): - with open(filename, 'w', encoding='utf-8') as f: # 'w' 模式会覆盖已有文件,但检查后使用是安全的 + filename = "baidu_kafka_msg.txt" + if not os.path.exists('/d/zhc/' + filename): + with open('/d/zhc/' + filename, 'w', encoding='utf-8') as f: # 'w' 模式会覆盖已有文件,但检查后使用是安全的 pass # 创建空文件 else: pass From 2c52b062da7cf25d8b0c9b42b7050f4055f11add Mon Sep 17 00:00:00 2001 From: ping <1017253325@qq.com> Date: Fri, 21 Nov 2025 16:50:26 +0800 Subject: [PATCH 07/10] update --- kgadget/src/baidu_sms_kafka_consumer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kgadget/src/baidu_sms_kafka_consumer.py b/kgadget/src/baidu_sms_kafka_consumer.py index 4acac5f..8434f5d 100644 --- a/kgadget/src/baidu_sms_kafka_consumer.py +++ b/kgadget/src/baidu_sms_kafka_consumer.py @@ -100,7 +100,7 @@ async def baidu_sms_kafka_consumer(ns={}): 'security.protocol': 'SASL_SSL', 'ssl.endpoint.identification.algorithm': 'none', # 证书文件路径 - 'ssl.ca.location': 'D:/Code/kboss/kgadget/src/baidu_kafka_ca.pem', + 'ssl.ca.location': 'baidu_kafka_ca.pem', # SASL 机制 'sasl.mechanism': 'SCRAM-SHA-512', # SASL 用户名 From 104a5b4bc5c319c5bb4a4bcba863a012d4f5efbd Mon Sep 17 00:00:00 2001 From: ping <1017253325@qq.com> Date: Fri, 21 Nov 2025 17:21:02 +0800 Subject: [PATCH 08/10] updsate --- kgadget/src/baidu_sms_kafka_consumer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kgadget/src/baidu_sms_kafka_consumer.py b/kgadget/src/baidu_sms_kafka_consumer.py index 8434f5d..d247015 100644 --- a/kgadget/src/baidu_sms_kafka_consumer.py +++ b/kgadget/src/baidu_sms_kafka_consumer.py @@ -124,7 +124,7 @@ async def baidu_sms_kafka_consumer(ns={}): # 订阅的主题名称 consumer.subscribe(['kaiyuanyun_msg_topic']) while True: - msg = consumer.poll(2) # 单次轮询获取消息 + msg = consumer.poll(1) # 单次轮询获取消息 if msg is None: continue @@ -132,7 +132,7 @@ async def baidu_sms_kafka_consumer(ns={}): pass else: try: - with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f: + with open('/d/zhc/baidu_kafka_msg.txt', 'a', encoding='utf-8') as f: f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}-{msg.value().decode('utf-8')}\n") # print('Received message: {}'.format(msg.value().decode('utf-8'))) From d4c85a4db3784490500abcf41282068b6de8966a Mon Sep 17 00:00:00 2001 From: ping <1017253325@qq.com> Date: Fri, 21 Nov 2025 17:23:54 +0800 Subject: [PATCH 09/10] updsate --- kgadget/src/baidu_sms_kafka_consumer.py | 1 + 1 file changed, 1 insertion(+) diff --git a/kgadget/src/baidu_sms_kafka_consumer.py b/kgadget/src/baidu_sms_kafka_consumer.py index d247015..5ab15ad 100644 --- a/kgadget/src/baidu_sms_kafka_consumer.py +++ b/kgadget/src/baidu_sms_kafka_consumer.py @@ -21,6 +21,7 @@ from appPublic.argsConvert import ArgsConvert # from baiDuSmsClient import send_baidu_vcode as send_vcode from aiohttp import client as aiohttp_client +import datetime # def key2ansi(dict): From 18e5021c6485a4d24b567260ffae0ff651dbef3a Mon Sep 17 00:00:00 2001 From: ping <1017253325@qq.com> Date: Fri, 21 Nov 2025 18:19:02 +0800 Subject: [PATCH 10/10] update --- b/baiduc/baidu_confirm_refund_order.dspy | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/b/baiduc/baidu_confirm_refund_order.dspy b/b/baiduc/baidu_confirm_refund_order.dspy index 6312104..ed3ad96 100644 --- a/b/baiduc/baidu_confirm_refund_order.dspy +++ b/b/baiduc/baidu_confirm_refund_order.dspy @@ -72,15 +72,20 @@ async def affirmbz_order(ns={}): resource_find_sql = """select id, resourceid, expire_resourceid from customer_goods where FIND_IN_SET('%s', resourceid) and del_flg = '0';""" % j['resourceids'] resource_find_li = await sor.sqlExe(resource_find_sql, {}) resource_find_id = resource_find_li[0]['id'] + expire_resourceid = resource_find_li[0]['expire_resourceid'] - expire_resourceid += expire_resourceid + ',' + j['resourceids'] if expire_resourceid else j['resourceids'] + if expire_resourceid: + new_expire_resourceid = expire_resourceid + ',' + j['resourceids'] + else: + new_expire_resourceid = j['resourceids'] + items_refund = resource_find_li[0]['resourceid'].split(',') if resource_find_li[0]['resourceid'] else [] filtered_items = [item for item in items_refund if item != j['resourceids']] result = ','.join(filtered_items) if not result: await sor.U('customer_goods', {'id': resource_find_id, 'del_flg': '1'}) else: - await sor.U('customer_goods', {'id': resource_find_id, 'resourceid': result, 'expire_resourceid': expire_resourceid}) + await sor.U('customer_goods', {'id': resource_find_id, 'resourceid': result, 'expire_resourceid': new_expire_resourceid}) # 处理续费逻辑 elif order_type == 'RENEW': @@ -118,7 +123,11 @@ async def affirmbz_order(ns={}): return {'status': True, 'msg': '支付成功'} except Exception as error: await sor.rollback() - return {'status': False, 'msg': str(error)} + import traceback + with open('baiducloud_err.txt', 'w') as f: + f.write(str(error) + str(traceback.format_exc())) + traceback.print_exc() + return {'status': False, 'msg': str(error) + str(traceback.format_exc())} async def baidu_new_update_resouce(ns={}): @@ -514,7 +523,8 @@ async def get_baidu_orderlist(ns={}): await user_action_record(ns_record) return { 'status': False, - 'msg': '支付错误, 请联系售后' + 'msg': '支付错误, 请联系售后,', + 'data': affirmbz_order_res } # 预配置local_refund用于本地操作 @@ -737,8 +747,8 @@ async def update_baidu_order_list(ns={}): async def baidu_confirm_refund_order(ns={}): # ns = { - # 'order_id': ["13f3518648054796abf7f3d00ed611bf"], - # 'userid': 'KsKhCUPizQyGiw3L1WVRy' + # 'order_id': ["2996f0baf34c4a0a98e1da0b4e290a35"], + # 'userid': 'y_xQK0G62dtZT5EneMQFT' # } import asyncio # 把 NEED_CONFIRM的订单同步到本地库,用于后续状态更新