commit 649f93e02a
Author: hrx
Date: 2025-11-25 15:53:20 +08:00
3 changed files with 265 additions and 358 deletions

View File

@@ -72,15 +72,20 @@ async def affirmbz_order(ns={}):
             resource_find_sql = """select id, resourceid, expire_resourceid from customer_goods where FIND_IN_SET('%s', resourceid) and del_flg = '0';""" % j['resourceids']
             resource_find_li = await sor.sqlExe(resource_find_sql, {})
             resource_find_id = resource_find_li[0]['id']
             expire_resourceid = resource_find_li[0]['expire_resourceid']
-            expire_resourceid += expire_resourceid + ',' + j['resourceids'] if expire_resourceid else j['resourceids']
+            if expire_resourceid:
+                new_expire_resourceid = expire_resourceid + ',' + j['resourceids']
+            else:
+                new_expire_resourceid = j['resourceids']
             items_refund = resource_find_li[0]['resourceid'].split(',') if resource_find_li[0]['resourceid'] else []
             filtered_items = [item for item in items_refund if item != j['resourceids']]
             result = ','.join(filtered_items)
             if not result:
                 await sor.U('customer_goods', {'id': resource_find_id, 'del_flg': '1'})
             else:
                 await sor.U('customer_goods', {'id': resource_find_id, 'resourceid': result, 'expire_resourceid': new_expire_resourceid})
         # handle the renewal flow
         elif order_type == 'RENEW':
@@ -118,7 +123,11 @@ async def affirmbz_order(ns={}):
             return {'status': True, 'msg': '支付成功'}
     except Exception as error:
         await sor.rollback()
-        return {'status': False, 'msg': str(error)}
+        import traceback
+        with open('baiducloud_err.txt', 'w') as f:
+            f.write(str(error) + str(traceback.format_exc()))
+        traceback.print_exc()
+        return {'status': False, 'msg': str(error) + str(traceback.format_exc())}


 async def baidu_new_update_resouce(ns={}):
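
The new error handlers above capture traceback.format_exc() alongside str(error), dump both to a file, and echo them in the returned msg. A minimal sketch of that capture step, assuming it is called from inside an except block; the helper name and default log path are illustrative, not part of the commit:

    import traceback

    def capture_error(error: BaseException, logfile: str = 'baiducloud_err.txt') -> str:
        # Combine the exception text with the full traceback, as the hunk above does.
        # format_exc() only yields a real traceback while an exception is being handled.
        detail = str(error) + traceback.format_exc()
        with open(logfile, 'w') as f:  # 'w' keeps only the most recent failure
            f.write(detail)
        return detail

Whether the traceback should also be returned to the API caller, as the diff does, is a separate design choice; it is convenient while debugging but exposes implementation detail in responses.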
@@ -514,7 +523,8 @@ async def get_baidu_orderlist(ns={}):
             await user_action_record(ns_record)
             return {
                 'status': False,
-                'msg': '支付错误, 请联系售后'
+                'msg': '支付错误, 请联系售后,',
+                'data': affirmbz_order_res
             }
     # pre-configure local_refund for the local operations
@@ -737,8 +747,8 @@ async def update_baidu_order_list(ns={}):

 async def baidu_confirm_refund_order(ns={}):
     # ns = {
-    #     'order_id': ["13f3518648054796abf7f3d00ed611bf"],
-    #     'userid': 'KsKhCUPizQyGiw3L1WVRy'
+    #     'order_id': ["2996f0baf34c4a0a98e1da0b4e290a35"],
+    #     'userid': 'y_xQK0G62dtZT5EneMQFT'
     # }
     import asyncio
     # sync NEED_CONFIRM orders to the local DB for subsequent status updates
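
The first hunk replaces a one-line augmented assignment with an explicit if/else: the old expression parses as expire_resourceid += (expire_resourceid + ',' + j['resourceids'] if expire_resourceid else j['resourceids']), so a non-empty value is prepended to itself before the new id is appended. A minimal sketch of the corrected concatenation, with existing and new_id as stand-in names (hypothetical, not the project's API):

    def append_resource_id(existing: str, new_id: str) -> str:
        # The old one-liner duplicated a non-empty value: 'r1' became 'r1r1,r2'.
        if existing:
            return existing + ',' + new_id
        return new_id

    assert append_resource_id('', 'r2') == 'r2'
    assert append_resource_id('r1', 'r2') == 'r1,r2'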

View File

@@ -6,29 +6,30 @@ async def time_convert(resoucetime=None):
     return beijing_time.strftime("%Y-%m-%d %H:%M:%S")

 async def baidu_sms_kafka_consumer(ns={}):
-    import os
-    consumer = BaiduKafKaConsumer({
-        # access endpoint
-        'bootstrap.servers': '120.48.10.223:9095,180.76.96.108:9095,180.76.147.36:9095',
-        # access protocol
-        'security.protocol': 'SASL_SSL',
-        'ssl.endpoint.identification.algorithm': 'none',
-        # certificate file path
-        'ssl.ca.location': 'baidu_kafka_ca.pem',
-        # SASL mechanism
-        'sasl.mechanism': 'SCRAM-SHA-512',
-        # SASL username
-        'sasl.username': 'kaiyuanyun',
-        # SASL password
-        'sasl.password': 'Kyy250609#',
-        # consumer group id
-        'group.id': 'kaiyuanyun_msg_group',
-        'auto.offset.reset': 'latest',
-        'fetch.message.max.bytes': '1024*512',
-    })
-    # topic name to subscribe to
-    consumer.subscribe(['kaiyuanyun_msg_topic'])
+    # consumer = BaiduKafKaConsumer({
+    #     # access endpoint
+    #     'bootstrap.servers': '120.48.10.223:9095,180.76.96.108:9095,180.76.147.36:9095',
+    #     # access protocol
+    #     'security.protocol': 'SASL_SSL',
+    #     'ssl.endpoint.identification.algorithm': 'none',
+    #     # certificate file path
+    #     'ssl.ca.location': 'baidu_kafka_ca.pem',
+    #     # SASL mechanism
+    #     'sasl.mechanism': 'SCRAM-SHA-512',
+    #     # SASL username
+    #     'sasl.username': 'kaiyuanyun',
+    #     # SASL password
+    #     'sasl.password': 'Kyy250609#',
+    #     # consumer group id
+    #     'group.id': 'kaiyuanyun_msg_group',
+    #     'auto.offset.reset': 'latest',
+    #     'fetch.message.max.bytes': '1024*512',
+    # })
+    # # topic name to subscribe to
+    # consumer.subscribe(['kaiyuanyun_msg_topic'])
+    import os

     files = ["baidu_kafka_msg.txt", "baidu_kafka_id.txt", "baidu_kafka_error.txt"]
     for filename in files:
@@ -38,29 +39,17 @@ async def baidu_sms_kafka_consumer(ns={}):
         else:
             pass
-    total_count = 0
-    for i in range(10):
-        # if i == 0:
-        #     # record the polling start time in the file, format: YYYY-MM-DD HH:MM:SS
-        #     with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
-        #         f.write(f"轮询开始时间:{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
-        msg = consumer.poll(0.01)  # single poll for a message
-
-        if msg is None:
-            if i == 10:
-                with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
-                    f.write(f"轮询第10次消息为None{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
-            continue
-        elif msg.error():
-            # write the error to the log file
-            with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f:
-                f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 消费者错误: {msg.error()}\n")
-        else:
-            total_count += 1
+    msg = ns.get('msg')
+    # if msg.error():
+    #     # write the error to the log file
+    #     with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f:
+    #         f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 消费者错误: {msg.error()}\n")
+    # else:
     try:
         # parse the message content into a dict (avoid variable-name clashes)
-        msg_data_ori = json.loads(msg.value().decode('utf-8'))
+        # msg_data_ori = json.loads(msg.value().decode('utf-8'))
+        msg_data_ori = json.loads(msg)
         msg_data = msg_data_ori['messages'][0]
         messageid = msg_data.get('id')
         taskid = msg_data.get('taskId')
@@ -71,7 +60,7 @@ async def baidu_sms_kafka_consumer(ns={}):
         if messageid in content:
             print(f"文件中已存在 '{messageid}',跳过写入")
-            continue
+            return
         else:
             # append the target content
             with open('baidu_kafka_id.txt', 'a', encoding='utf-8') as f:
@@ -87,9 +76,7 @@ async def baidu_sms_kafka_consumer(ns={}):
             exist_msg = await sor.R('baidu_kafka_msg', {'messageid': messageid, 'taskid': taskid})
             if exist_msg:
                 print(f"消息id {messageid} 已存在,跳过处理")
-                continue
-                # consumer.close()
-                # return
+                return

             # 2. build an ns dict with lowercase keys, mapping every field
             ns_msg = {
@@ -166,15 +153,16 @@ async def baidu_sms_kafka_consumer(ns={}):
             }
     except Exception as e:
         print(f"处理异常: {str(e)}")
+        import traceback
+        with open('baidu_kafka_error.txt', 'w') as f:
+            f.write(str(e) + traceback.format_exc())
+        traceback.print_exc()
         return {
             'status': False,
             'msg': f"处理异常: {str(e)}"
         }

-    # record total_count
-    # with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
-    #     f.write(f"本次轮询共处理消息数:{total_count}\n")
-    consumer.close()  # make sure the consumer is closed
+    # consumer.close()  # make sure the consumer is closed
     return {
         'status': True,
         'msg': '获取信息执行结束'
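
Taken together, the hunks above turn baidu_sms_kafka_consumer in this file from a Kafka polling loop into a request handler: the raw payload arrives as a JSON string in ns['msg'], is parsed once, and is deduplicated on (messageid, taskid) before being stored. A condensed sketch of that handler shape, assuming the payload layout shown in the diff; the function name and early returns are illustrative:

    import json

    async def handle_kafka_payload(ns={}):
        # The payload is pushed in as a JSON string instead of being polled from Kafka.
        raw = ns.get('msg')
        if not raw:
            return {'status': False, 'msg': 'no message supplied'}
        try:
            msg_data = json.loads(raw)['messages'][0]
        except (json.JSONDecodeError, KeyError, IndexError, TypeError) as e:
            return {'status': False, 'msg': f'invalid payload: {e}'}
        messageid = msg_data.get('id')
        taskid = msg_data.get('taskId')
        # ... dedupe on (messageid, taskid) and persist, as in the hunks above ...
        return {'status': True, 'msg': f'processed {messageid}/{taskid}'}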

View File

@@ -4,8 +4,8 @@
 import os
 import json
 import asyncio
-import datetime
-from sqlor.dbpools import DBPools
+# import datetime
+# from sqlor.dbpools import DBPools
 # from appPublic.jsonConfig import getConfig
 from appPublic.uniqueID import getID as uuid
 from confluent_kafka import Consumer as BaiduKafKaConsumer
@@ -18,76 +18,79 @@ from appPublic.Singleton import SingletonDecorator
 from appPublic.folderUtils import ProgramPath
 from appPublic.argsConvert import ArgsConvert
-from baiDuSmsClient import send_baidu_vcode as send_vcode
+# from baiDuSmsClient import send_baidu_vcode as send_vcode
+from aiohttp import client as aiohttp_client
+import datetime


-def key2ansi(dict):
-    # print dict
-    return dict
-    a = {}
-    for k, v in dict.items():
-        k = k.encode('utf-8')
-        # if type(v) == type(u" "):
-        #     v = v.encode('utf-8')
-        a[k] = v
-    return a
+# def key2ansi(dict):
+#     # print dict
+#     return dict
+#     a = {}
+#     for k, v in dict.items():
+#         k = k.encode('utf-8')
+#         # if type(v) == type(u" "):
+#         #     v = v.encode('utf-8')
+#         a[k] = v
+#     return a


-class JsonObject(DictObject):
-    """
-    JsonObject class load json from a json file
-    """
-    def __init__(self, jsonholder, keytype='ansi', NS=None):
-        jhtype = type(jsonholder)
-        if jhtype == type("") or jhtype == type(u''):
-            f = open(jsonholder, 'r')
-        else:
-            f = jsonholder
-        try:
-            a = json.load(f)
-        except Exception as e:
-            print("exception:", self.__jsonholder__, e)
-            raise e
-        finally:
-            if type(jsonholder) == type(""):
-                f.close()
-        if NS is not None:
-            ac = ArgsConvert('$[', ']$')
-            a = ac.convert(a, NS)
-        a['__jsonholder__'] = jsonholder
-        a['NS'] = NS
-        DictObject.__init__(self, **a)
+# class JsonObject(DictObject):
+#     """
+#     JsonObject class load json from a json file
+#     """
+#     def __init__(self, jsonholder, keytype='ansi', NS=None):
+#         jhtype = type(jsonholder)
+#         if jhtype == type("") or jhtype == type(u''):
+#             f = open(jsonholder, 'r')
+#         else:
+#             f = jsonholder
+#         try:
+#             a = json.load(f)
+#         except Exception as e:
+#             print("exception:", self.__jsonholder__, e)
+#             raise e
+#         finally:
+#             if type(jsonholder) == type(""):
+#                 f.close()
+#         if NS is not None:
+#             ac = ArgsConvert('$[', ']$')
+#             a = ac.convert(a, NS)
+#         a['__jsonholder__'] = jsonholder
+#         a['NS'] = NS
+#         DictObject.__init__(self, **a)


-@SingletonDecorator
-class JsonConfig(JsonObject):
-    pass
+# @SingletonDecorator
+# class JsonConfig(JsonObject):
+#     pass


-def getConfig(path=None, NS=None):
-    pp = ProgramPath()
-    if path == None:
-        path = os.getcwd()
-    cfname = os.path.abspath(os.path.join(path, "conf", "config.prod.json"))
-    # print __name__,cfname
-    ns = {
-        'home': str(Path.home()),
-        'workdir': path,
-        'ProgramPath': pp
-    }
-    if NS is not None:
-        ns.update(NS)
-    a = JsonConfig(cfname, NS=ns)
-    return a
+# def getConfig(path=None, NS=None):
+#     pp = ProgramPath()
+#     if path == None:
+#         path = os.getcwd()
+#     cfname = os.path.abspath(os.path.join(path, "conf", "config.prod.json"))
+#     # print __name__,cfname
+#     ns = {
+#         'home': str(Path.home()),
+#         'workdir': path,
+#         'ProgramPath': pp
+#     }
+#     if NS is not None:
+#         ns.update(NS)
+#     a = JsonConfig(cfname, NS=ns)
+#     return a


-async def time_convert(resoucetime=None):
-    if not resoucetime:
-        return
-    utc_time = datetime.datetime.strptime(resoucetime, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=datetime.timezone.utc)
-    beijing_time = utc_time.astimezone(datetime.timezone(datetime.timedelta(hours=8)))
-    return beijing_time.strftime("%Y-%m-%d %H:%M:%S")
+# async def time_convert(resoucetime=None):
+#     if not resoucetime:
+#         return
+#     utc_time = datetime.datetime.strptime(resoucetime, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=datetime.timezone.utc)
+#     beijing_time = utc_time.astimezone(datetime.timezone(datetime.timedelta(hours=8)))
+#     return beijing_time.strftime("%Y-%m-%d %H:%M:%S")


 async def baidu_sms_kafka_consumer(ns={}):
     import os
@@ -111,153 +114,59 @@ async def baidu_sms_kafka_consumer(ns={}):
         'fetch.message.max.bytes': '1024*512',
     })
-    # topic name to subscribe to
-    consumer.subscribe(['kaiyuanyun_msg_topic'])
-
-    files = ["baidu_kafka_msg.txt", "baidu_kafka_id.txt", "baidu_kafka_error.txt"]
-    for filename in files:
-        if not os.path.exists(filename):
-            with open(filename, 'w', encoding='utf-8') as f:  # 'w' mode overwrites an existing file, but it is safe after the existence check
+    # create the log file
+    filename = "baidu_kafka_msg.txt"
+    if not os.path.exists('/d/zhc/' + filename):
+        with open('/d/zhc/' + filename, 'w', encoding='utf-8') as f:  # 'w' mode overwrites an existing file, but it is safe after the existence check
             pass  # create an empty file
         else:
             pass
-    # total_count = 0
+    # topic name to subscribe to
+    consumer.subscribe(['kaiyuanyun_msg_topic'])

     while True:
-        i = 0
-        if i == 0:
-            # record the polling start time in the file, format: YYYY-MM-DD HH:MM:SS
-            with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
-                f.write(f"轮询开始时间:{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
-        msg = consumer.poll(0.1)  # single poll for a message
+        msg = consumer.poll(1)  # single poll for a message

         if msg is None:
-            if i == 10:
-                with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
-                    f.write(f"轮询第10次消息为None{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
             continue
         elif msg.error():
-            # write the error to the log file
-            with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f:
-                f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 消费者错误: {msg.error()}\n")
+            pass
         else:
-            # total_count += 1
             try:
-                # parse the message content into a dict (avoid variable-name clashes)
-                msg_data_ori = json.loads(msg.value().decode('utf-8'))
-                msg_data = msg_data_ori['messages'][0]
-                messageid = msg_data.get('id')
-                taskid = msg_data.get('taskId')
-
-                # read the file content for the duplicate check
-                with open('baidu_kafka_id.txt', 'r', encoding='utf-8') as f:
-                    content = f.read()
-
-                if messageid in content:
-                    print(f"文件中已存在 '{messageid}',跳过写入")
-                    continue
-                else:
-                    # append the target content
-                    with open('baidu_kafka_id.txt', 'a', encoding='utf-8') as f:
-                        f.write(messageid + '\n')
-                    print(f"已写入 '{messageid}' 到文件")
-                with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
-                    f.write(str(msg_data) + '\n')
-                db = DBPools()
-                async with db.sqlorContext('kboss') as sor:
-                    # 1. duplicate check on messageid and taskid
-                    exist_msg = await sor.R('baidu_kafka_msg', {'messageid': messageid, 'taskid': taskid})
-                    if exist_msg:
-                        print(f"消息id {messageid} 已存在,跳过处理")
-                        continue
-                        # consumer.close()
-                        # return
-
-                    # 2. build an ns dict with lowercase keys, mapping every field
-                    ns_msg = {
-                        'id': uuid(),
-                        'messageid': messageid,
-                        'taskid': taskid,
-                        'userid': msg_data.get('userId'),
-                        'accountid': msg_data.get('accountId'),
-                        'usertype': msg_data.get('userType'),
-                        'receiverid': msg_data.get('receiverId'),
-                        'contentvar': msg_data.get('contentVar'),
-                        'sendchannel': msg_data.get('sendChannel'),
-                        'content': msg_data.get('content'),
-                        'messagetemplateid': msg_data.get('messageTemplateId'),
-                        'isvirtualstore': msg_data.get('isVirtualStore'),
-                        'channelmessageid': msg_data.get('channelMessageId'),
-                        'channelstatus': msg_data.get('channelStatus'),
-                        'majorcategory': msg_data.get('majorCategory'),
-                        'minorcategory': msg_data.get('minorCategory'),
-                        'status': msg_data.get('status'),
-                        'createtime': await time_convert(msg_data.get('createTime')) if msg_data.get('createTime') else None,
-                        'updatetime': await time_convert(msg_data.get('updateTime')) if msg_data.get('updateTime') else None,
-                        'expiretime': await time_convert(msg_data.get('expireTime')) if msg_data.get('expireTime') else None,
-                        'windowtime': await time_convert(msg_data.get('windowTime')) if msg_data.get('windowTime') else None,
-                        'valid': msg_data.get('valid'),
-                        'complete': msg_data.get('complete'),
-                        'disturbhold': msg_data.get('disturbHold'),
-                        'sendcomplete': msg_data.get('sendComplete')
-                    }
-
-                    # 3. store it in the database
-                    await sor.C('baidu_kafka_msg', ns_msg)
-                    # print(f"消息id {messageid} 存储成功")
-
-                    # 4. trigger SMS sending when sendChannel is MOBILE
-                    send_channel = msg_data.get('sendChannel')
-                    if send_channel == 'MOBILE':
-                        msg_content = msg_data.get('content')
-
-                        # verification-code SMS | notification SMS
-                        if '验证码' in msg_content:
-                            sms_stype = '百度kafka普通验证码'
-                        else:
-                            sms_stype = '百度kafka普通通知'
-                        # look up the user's mobile number
-                        account_id = msg_data.get('accountId')
-                        user_local_id_li = await sor.R('baidu_users', {'baidu_id': account_id})
-                        if user_local_id_li:
-                            user_local_id = user_local_id_li[0]['user_id']
-                            user_mobile_li = await sor.R('users', {'id': user_local_id, 'del_flg': '0'})
-                            mobile = user_mobile_li[0]['mobile']
-                            # call the SMS sending interface
-                            kyy_send_status = await send_vcode(mobile, sms_stype, {'content': msg_content})
-                            if kyy_send_status.get('status'):
-                                await sor.U('baidu_kafka_msg', {'id': ns_msg['id'], 'kyysendstatus': '1'})
-                            else:
-                                # log the error: SMS sending failed
-                                with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f:
-                                    f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 短信发送失败至 {mobile},状态: {kyy_send_status}\n")
-                                print(f"短信发送失败至 {mobile},状态: {kyy_send_status}")
-                        else:
-                            # log the error: user not found
-                            with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f:
-                                f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 未找到百度用户ID {account_id} 对应的本地用户\n")
-                            print(f"未找到百度用户ID {account_id} 对应的本地用户")
+                with open('/d/zhc/baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
+                    f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}-{msg.value().decode('utf-8')}\n")
+
+                # print('Received message: {}'.format(msg.value().decode('utf-8')))
+                method = "POST"
+                url = 'https://www.opencomputing.cn/baiduc/baidu_sms_kafka_consumer.dspy'
+                header = {
+                    "Host": "www.opencomputing.cn",
+                    # "Content-Type": "application/json"
+                }
+                data = {
+                    'msg': msg.value().decode('utf-8')
+                }
+                async with aiohttp_client.request(
+                        method=method,
+                        url=url,
+                        headers=header,
+                        data=data) as res:
+                    data_ = await res.text()
+                    # print("Response data:", data_)
             except json.JSONDecodeError:
-                print("错误消息内容非有效JSON")
                 return {
                     'status': False,
                     'msg': '错误消息内容非有效JSON'
                 }
             except Exception as e:
-                print(f"处理异常: {str(e)}")
+                import traceback
+                print(str(e)+ traceback.format_exc())
+                traceback.print_exc()
                 return {
                     'status': False,
                     'msg': f"处理异常: {str(e)}"
                 }
-    # record total_count
-    # with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
-    #     f.write(f"本次轮询共处理消息数:{total_count}\n")

     consumer.close()  # make sure the consumer is closed
     return {
@@ -267,10 +176,10 @@ async def baidu_sms_kafka_consumer(ns={}):

 if __name__ == '__main__':
     # p = os.getcwd()
-    current_dir = os.getcwd()
-    path_parts = current_dir.split('/')
-    two_levels_up = '/'.join(path_parts[:-2])
-    config = getConfig(two_levels_up)
-    DBPools(config.databases)
+    # current_dir = os.getcwd()
+    # path_parts = current_dir.split('/')
+    # two_levels_up = '/'.join(path_parts[:-2])
+    # config = getConfig(two_levels_up)
+    # DBPools(config.databases)
     loop = asyncio.get_event_loop()
     print(loop.run_until_complete(baidu_sms_kafka_consumer()))
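
With the database work moved behind the web endpoint, the standalone script in this last file keeps only the poll-and-relay loop: consume from Kafka and POST each record to the .dspy handler as {'msg': <raw JSON string>}. A condensed sketch of that loop under the same assumptions (consumer configuration omitted; the function name and the skip-on-error behaviour are illustrative):

    import asyncio
    from aiohttp import client as aiohttp_client
    from confluent_kafka import Consumer

    async def forward_kafka_messages(consumer: Consumer, url: str):
        # Relay every Kafka record to the web handler as form data {'msg': <raw payload>}.
        consumer.subscribe(['kaiyuanyun_msg_topic'])
        try:
            while True:
                msg = consumer.poll(1)
                if msg is None or msg.error():
                    continue  # nothing fetched, or a consumer-side error: skip
                payload = {'msg': msg.value().decode('utf-8')}
                async with aiohttp_client.request('POST', url, data=payload) as res:
                    await res.text()  # drain the response; HTTP errors surface as exceptions
        finally:
            consumer.close()

Note that confluent_kafka's poll() is blocking, so inside a coroutine it stalls the event loop for up to the timeout; that matches the diff as written, and running the poll in a thread executor would be the non-blocking alternative.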