# -*- coding: utf-8 -*- # @Time: 2025/6/23 14:20 import os import json import asyncio import datetime from sqlor.dbpools import DBPools # from appPublic.jsonConfig import getConfig from appPublic.uniqueID import getID as uuid from confluent_kafka import Consumer as BaiduKafKaConsumer import os,sys import json from pathlib import Path from appPublic.dictObject import DictObject from appPublic.Singleton import SingletonDecorator from appPublic.folderUtils import ProgramPath from appPublic.argsConvert import ArgsConvert from baiDuSmsClient import send_baidu_vcode as send_vcode def key2ansi(dict): # print dict return dict a = {} for k, v in dict.items(): k = k.encode('utf-8') # if type(v) == type(u" "): # v = v.encode('utf-8') a[k] = v return a class JsonObject(DictObject): """ JsonObject class load json from a json file """ def __init__(self, jsonholder, keytype='ansi', NS=None): jhtype = type(jsonholder) if jhtype == type("") or jhtype == type(u''): f = open(jsonholder, 'r') else: f = jsonholder try: a = json.load(f) except Exception as e: print("exception:", self.__jsonholder__, e) raise e finally: if type(jsonholder) == type(""): f.close() if NS is not None: ac = ArgsConvert('$[', ']$') a = ac.convert(a, NS) a['__jsonholder__'] = jsonholder a['NS'] = NS DictObject.__init__(self, **a) @SingletonDecorator class JsonConfig(JsonObject): pass def getConfig(path=None, NS=None): pp = ProgramPath() if path == None: path = os.getcwd() cfname = os.path.abspath(os.path.join(path, "conf", "config.prod.json")) # print __name__,cfname ns = { 'home': str(Path.home()), 'workdir': path, 'ProgramPath': pp } if NS is not None: ns.update(NS) a = JsonConfig(cfname, NS=ns) return a async def time_convert(resoucetime=None): if not resoucetime: return utc_time = datetime.datetime.strptime(resoucetime, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=datetime.timezone.utc) beijing_time = utc_time.astimezone(datetime.timezone(datetime.timedelta(hours=8))) return beijing_time.strftime("%Y-%m-%d %H:%M:%S") async def baidu_sms_kafka_consumer(ns={}): consumer = BaiduKafKaConsumer({ # 接入点 'bootstrap.servers': '120.48.10.223:9095,180.76.96.108:9095,180.76.147.36:9095', # 接入协议 'security.protocol': 'SASL_SSL', 'ssl.endpoint.identification.algorithm': 'none', # 证书文件路径 'ssl.ca.location': 'baidu_kafka_ca.pem', # SASL 机制 'sasl.mechanism': 'SCRAM-SHA-512', # SASL 用户名 'sasl.username': 'kaiyuanyun', # SASL 用户密码 'sasl.password': 'Kyy250609#', # 消费组id 'group.id': 'kaiyuanyun_msg_group', 'auto.offset.reset': 'latest', 'fetch.message.max.bytes': '1024*512', }) # 订阅的主题名称 consumer.subscribe(['kaiyuanyun_msg_topic']) while True: msg = consumer.poll(1) # 单次轮询获取消息 if msg is None: continue elif msg.error(): print(f"消费者错误: {msg.error()}") else: try: # 解析消息内容为字典(避免变量名冲突) msg_data_ori = json.loads(msg.value().decode('utf-8')) msg_data = msg_data_ori['messages'][0] messageid = msg_data.get('id') taskid = msg_data.get('taskId') filename = 'baidu_kafka_msg.txt' # 检查文件是否存在,不存在则创建 if not os.path.exists(filename): with open(filename, 'w', encoding='utf-8') as f: print(f"文件不存在,已创建 {filename}") # 读取文件内容进行检查 with open(filename, 'r', encoding='utf-8') as f: content = f.read() if messageid in content: print(f"文件中已存在 '{messageid}',跳过写入") continue else: # 追加写入目标内容 with open(filename, 'a', encoding='utf-8') as f: f.write(messageid + '\n') print(f"已写入 '{messageid}' 到文件") db = DBPools() async with db.sqlorContext('kboss') as sor: # 1. 去重检查(messageid和taskid) exist_msg = await sor.R('baidu_kafka_msg', {'messageid': messageid, 'taskid': taskid}) if exist_msg: print(f"消息id {messageid} 已存在,跳过处理") consumer.close() return # 2. 构建小写key的ns字典(完整映射所有字段) ns_msg = { 'id': uuid(), 'messageid': messageid, 'taskid': taskid, 'userid': msg_data.get('userId'), 'accountid': msg_data.get('accountId'), 'usertype': msg_data.get('userType'), 'receiverid': msg_data.get('receiverId'), 'contentvar': msg_data.get('contentVar'), 'sendchannel': msg_data.get('sendChannel'), 'content': msg_data.get('content'), 'messagetemplateid': msg_data.get('messageTemplateId'), 'isvirtualstore': msg_data.get('isVirtualStore'), 'channelmessageid': msg_data.get('channelMessageId'), 'channelstatus': msg_data.get('channelStatus'), 'majorcategory': msg_data.get('majorCategory'), 'minorcategory': msg_data.get('minorCategory'), 'status': msg_data.get('status'), 'createtime': await time_convert(msg_data.get('createTime')) if msg_data.get('createTime') else None, 'updatetime': await time_convert(msg_data.get('updateTime')) if msg_data.get('updateTime') else None, 'expiretime': await time_convert(msg_data.get('expireTime')) if msg_data.get('expireTime') else None, 'windowtime': await time_convert(msg_data.get('windowTime')) if msg_data.get('windowTime') else None, 'valid': msg_data.get('valid'), 'complete': msg_data.get('complete'), 'disturbhold': msg_data.get('disturbHold'), 'sendcomplete': msg_data.get('sendComplete') } # 3. 执行存库 await sor.C('baidu_kafka_msg', ns_msg) print(f"消息id {messageid} 存储成功") # 4. 触发短信发送(当sendChannel为MOBILE时) send_channel = msg_data.get('sendChannel') if send_channel == 'MOBILE': msg_content = msg_data.get('content') # 判断验证码类短信 | 通知类短信 if '验证码' in msg_content: sms_stype = '百度kafka普通验证码' else: sms_stype = '百度kafka普通通知' # 查询用户手机号 account_id = msg_data.get('accountId') user_local_id_li = await sor.R('baidu_users', {'baidu_id': account_id}) user_local_id = user_local_id_li[0]['user_id'] user_mobile_li = await sor.R('users', {'id': user_local_id, 'del_flg': '0'}) mobile = user_mobile_li[0]['mobile'] # 调用短信发送接口 kyy_send_status = await send_vcode(mobile, sms_stype, {'content': msg_content}) if kyy_send_status.get('status'): await sor.U('baidu_kafka_msg', {'id': ns_msg['id'], 'kyysendstatus': '1'}) print(f"已触发短信发送至 {mobile}") except json.JSONDecodeError: print("错误:消息内容非有效JSON") return { 'status': False, 'msg': '错误:消息内容非有效JSON' } except Exception as e: print(f"处理异常: {str(e)}") return { 'status': False, 'msg': f"处理异常: {str(e)}" } consumer.close() # 确保消费者关闭 return { 'status': True, 'msg': '获取信息执行结束' } if __name__ == '__main__': # p = os.getcwd() current_dir = os.getcwd() path_parts = current_dir.split('/') two_levels_up = '/'.join(path_parts[:-2]) config = getConfig(two_levels_up) DBPools(config.databases) loop = asyncio.get_event_loop() print(loop.run_until_complete(baidu_sms_kafka_consumer()))