This commit is contained in:
ping 2025-11-21 09:26:50 +08:00
parent a77cba4dfe
commit 21e9e9b08b

View File

@ -110,7 +110,7 @@ async def baidu_sms_kafka_consumer(ns={}):
'auto.offset.reset': 'latest', 'auto.offset.reset': 'latest',
'fetch.message.max.bytes': '1024*512', 'fetch.message.max.bytes': '1024*512',
}) })
print('开始创建文件夹')
# 订阅的主题名称 # 订阅的主题名称
consumer.subscribe(['kaiyuanyun_msg_topic']) consumer.subscribe(['kaiyuanyun_msg_topic'])
@ -125,10 +125,10 @@ async def baidu_sms_kafka_consumer(ns={}):
# total_count = 0 # total_count = 0
while True: while True:
i = 0 i = 0
if i == 0: # if i == 0:
# 写入文件记录轮询开始时间 时间格式: YYYY-MM-DD HH:MM:SS # # 写入文件记录轮询开始时间 时间格式: YYYY-MM-DD HH:MM:SS
with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f: # with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
f.write(f"轮询开始时间:{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") # f.write(f"轮询开始时间:{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
msg = consumer.poll(0.1) # 单次轮询获取消息 msg = consumer.poll(0.1) # 单次轮询获取消息
@ -251,6 +251,10 @@ async def baidu_sms_kafka_consumer(ns={}):
} }
except Exception as e: except Exception as e:
print(f"处理异常: {str(e)}") print(f"处理异常: {str(e)}")
import traceback
with open('baidu_kafka_error.txt', 'w') as f:
f.write(str(e)+ traceback.format_exc())
traceback.print_exc()
return { return {
'status': False, 'status': False,
'msg': f"处理异常: {str(e)}" 'msg': f"处理异常: {str(e)}"
@ -272,5 +276,6 @@ if __name__ == '__main__':
two_levels_up = '/'.join(path_parts[:-2]) two_levels_up = '/'.join(path_parts[:-2])
config = getConfig(two_levels_up) config = getConfig(two_levels_up)
DBPools(config.databases) DBPools(config.databases)
print(config.databases)
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
print(loop.run_until_complete(baidu_sms_kafka_consumer())) print(loop.run_until_complete(baidu_sms_kafka_consumer()))