From 21e9e9b08bf6046ce5cff21f32653bef21caae7f Mon Sep 17 00:00:00 2001 From: ping <1017253325@qq.com> Date: Fri, 21 Nov 2025 09:26:50 +0800 Subject: [PATCH] update --- kgadget/src/baidu_sms_kafka_consumer.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/kgadget/src/baidu_sms_kafka_consumer.py b/kgadget/src/baidu_sms_kafka_consumer.py index 90d2153..fbc1202 100644 --- a/kgadget/src/baidu_sms_kafka_consumer.py +++ b/kgadget/src/baidu_sms_kafka_consumer.py @@ -110,7 +110,7 @@ async def baidu_sms_kafka_consumer(ns={}): 'auto.offset.reset': 'latest', 'fetch.message.max.bytes': '1024*512', }) - + print('开始创建文件夹') # 订阅的主题名称 consumer.subscribe(['kaiyuanyun_msg_topic']) @@ -125,10 +125,10 @@ async def baidu_sms_kafka_consumer(ns={}): # total_count = 0 while True: i = 0 - if i == 0: - # 写入文件记录轮询开始时间 时间格式: YYYY-MM-DD HH:MM:SS - with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f: - f.write(f"轮询开始时间:{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") + # if i == 0: + # # 写入文件记录轮询开始时间 时间格式: YYYY-MM-DD HH:MM:SS + # with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f: + # f.write(f"轮询开始时间:{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") msg = consumer.poll(0.1) # 单次轮询获取消息 @@ -251,6 +251,10 @@ async def baidu_sms_kafka_consumer(ns={}): } except Exception as e: print(f"处理异常: {str(e)}") + import traceback + with open('baidu_kafka_error.txt', 'w') as f: + f.write(str(e)+ traceback.format_exc()) + traceback.print_exc() return { 'status': False, 'msg': f"处理异常: {str(e)}" @@ -272,5 +276,6 @@ if __name__ == '__main__': two_levels_up = '/'.join(path_parts[:-2]) config = getConfig(two_levels_up) DBPools(config.databases) + print(config.databases) loop = asyncio.get_event_loop() print(loop.run_until_complete(baidu_sms_kafka_consumer()))