# -*- coding: utf-8 -*- # @Time: 2025/6/23 14:20 import os import json import asyncio # import datetime # from sqlor.dbpools import DBPools # from appPublic.jsonConfig import getConfig from appPublic.uniqueID import getID as uuid from confluent_kafka import Consumer as BaiduKafKaConsumer import os,sys import json from pathlib import Path from appPublic.dictObject import DictObject from appPublic.Singleton import SingletonDecorator from appPublic.folderUtils import ProgramPath from appPublic.argsConvert import ArgsConvert # from baiDuSmsClient import send_baidu_vcode as send_vcode from aiohttp import client as aiohttp_client # def key2ansi(dict): # # print dict # return dict # a = {} # for k, v in dict.items(): # k = k.encode('utf-8') # # if type(v) == type(u" "): # # v = v.encode('utf-8') # a[k] = v # return a # class JsonObject(DictObject): # """ # JsonObject class load json from a json file # """ # def __init__(self, jsonholder, keytype='ansi', NS=None): # jhtype = type(jsonholder) # if jhtype == type("") or jhtype == type(u''): # f = open(jsonholder, 'r') # else: # f = jsonholder # try: # a = json.load(f) # except Exception as e: # print("exception:", self.__jsonholder__, e) # raise e # finally: # if type(jsonholder) == type(""): # f.close() # if NS is not None: # ac = ArgsConvert('$[', ']$') # a = ac.convert(a, NS) # a['__jsonholder__'] = jsonholder # a['NS'] = NS # DictObject.__init__(self, **a) # @SingletonDecorator # class JsonConfig(JsonObject): # pass # def getConfig(path=None, NS=None): # pp = ProgramPath() # if path == None: # path = os.getcwd() # cfname = os.path.abspath(os.path.join(path, "conf", "config.prod.json")) # # print __name__,cfname # ns = { # 'home': str(Path.home()), # 'workdir': path, # 'ProgramPath': pp # } # if NS is not None: # ns.update(NS) # a = JsonConfig(cfname, NS=ns) # return a # async def time_convert(resoucetime=None): # if not resoucetime: # return # utc_time = datetime.datetime.strptime(resoucetime, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=datetime.timezone.utc) # beijing_time = utc_time.astimezone(datetime.timezone(datetime.timedelta(hours=8))) # return beijing_time.strftime("%Y-%m-%d %H:%M:%S") async def baidu_sms_kafka_consumer(ns={}): import os consumer = BaiduKafKaConsumer({ # 接入点 'bootstrap.servers': '120.48.10.223:9095,180.76.96.108:9095,180.76.147.36:9095', # 接入协议 'security.protocol': 'SASL_SSL', 'ssl.endpoint.identification.algorithm': 'none', # 证书文件路径 'ssl.ca.location': 'baidu_kafka_ca.pem', # SASL 机制 'sasl.mechanism': 'SCRAM-SHA-512', # SASL 用户名 'sasl.username': 'kaiyuanyun', # SASL 用户密码 'sasl.password': 'Kyy250609#', # 消费组id 'group.id': 'kaiyuanyun_msg_group', 'auto.offset.reset': 'latest', 'fetch.message.max.bytes': '1024*512', }) # 创建日志文件 filename = "baidu_kafka_msg.txt" if not os.path.exists('/d/zhc/' + filename): with open('/d/zhc/' + filename, 'w', encoding='utf-8') as f: # 'w' 模式会覆盖已有文件,但检查后使用是安全的 pass # 创建空文件 else: pass # 订阅的主题名称 consumer.subscribe(['kaiyuanyun_msg_topic']) while True: msg = consumer.poll(2) # 单次轮询获取消息 if msg is None: continue elif msg.error(): pass else: try: with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f: f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}-{msg.value().decode('utf-8')}\n") # print('Received message: {}'.format(msg.value().decode('utf-8'))) method = "POST" url = 'https://www.opencomputing.cn/baiduc/baidu_sms_kafka_consumer.dspy' header = { "Host": "www.opencomputing.cn", # "Content-Type": "application/json" } data = { 'msg': msg.value().decode('utf-8') } async with aiohttp_client.request( method=method, url=url, headers=header, data=data) as res: data_ = await res.text() # print("Response data:", data_) except json.JSONDecodeError: return { 'status': False, 'msg': '错误:消息内容非有效JSON' } except Exception as e: import traceback print(str(e)+ traceback.format_exc()) traceback.print_exc() return { 'status': False, 'msg': f"处理异常: {str(e)}" } consumer.close() # 确保消费者关闭 return { 'status': True, 'msg': '获取信息执行结束' } if __name__ == '__main__': # p = os.getcwd() # current_dir = os.getcwd() # path_parts = current_dir.split('/') # two_levels_up = '/'.join(path_parts[:-2]) # config = getConfig(two_levels_up) # DBPools(config.databases) loop = asyncio.get_event_loop() print(loop.run_until_complete(baidu_sms_kafka_consumer()))