@ -6,29 +6,30 @@ async def time_convert(resoucetime=None):
|
|||||||
return beijing_time.strftime("%Y-%m-%d %H:%M:%S")
|
return beijing_time.strftime("%Y-%m-%d %H:%M:%S")
|
||||||
|
|
||||||
async def baidu_sms_kafka_consumer(ns={}):
|
async def baidu_sms_kafka_consumer(ns={}):
|
||||||
import os
|
# consumer = BaiduKafKaConsumer({
|
||||||
consumer = BaiduKafKaConsumer({
|
# # 接入点
|
||||||
# 接入点
|
# 'bootstrap.servers': '120.48.10.223:9095,180.76.96.108:9095,180.76.147.36:9095',
|
||||||
'bootstrap.servers': '120.48.10.223:9095,180.76.96.108:9095,180.76.147.36:9095',
|
# # 接入协议
|
||||||
# 接入协议
|
# 'security.protocol': 'SASL_SSL',
|
||||||
'security.protocol': 'SASL_SSL',
|
# 'ssl.endpoint.identification.algorithm': 'none',
|
||||||
'ssl.endpoint.identification.algorithm': 'none',
|
# # 证书文件路径
|
||||||
# 证书文件路径
|
# 'ssl.ca.location': 'baidu_kafka_ca.pem',
|
||||||
'ssl.ca.location': 'baidu_kafka_ca.pem',
|
# # SASL 机制
|
||||||
# SASL 机制
|
# 'sasl.mechanism': 'SCRAM-SHA-512',
|
||||||
'sasl.mechanism': 'SCRAM-SHA-512',
|
# # SASL 用户名
|
||||||
# SASL 用户名
|
# 'sasl.username': 'kaiyuanyun',
|
||||||
'sasl.username': 'kaiyuanyun',
|
# # SASL 用户密码
|
||||||
# SASL 用户密码
|
# 'sasl.password': 'Kyy250609#',
|
||||||
'sasl.password': 'Kyy250609#',
|
# # 消费组id
|
||||||
# 消费组id
|
# 'group.id': 'kaiyuanyun_msg_group',
|
||||||
'group.id': 'kaiyuanyun_msg_group',
|
# 'auto.offset.reset': 'latest',
|
||||||
'auto.offset.reset': 'latest',
|
# 'fetch.message.max.bytes': '1024*512',
|
||||||
'fetch.message.max.bytes': '1024*512',
|
# })
|
||||||
})
|
|
||||||
|
|
||||||
# 订阅的主题名称
|
# # 订阅的主题名称
|
||||||
consumer.subscribe(['kaiyuanyun_msg_topic'])
|
# consumer.subscribe(['kaiyuanyun_msg_topic'])
|
||||||
|
|
||||||
|
import os
|
||||||
|
|
||||||
files = ["baidu_kafka_msg.txt", "baidu_kafka_id.txt", "baidu_kafka_error.txt"]
|
files = ["baidu_kafka_msg.txt", "baidu_kafka_id.txt", "baidu_kafka_error.txt"]
|
||||||
for filename in files:
|
for filename in files:
|
||||||
@ -38,29 +39,17 @@ async def baidu_sms_kafka_consumer(ns={}):
|
|||||||
else:
|
else:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
total_count = 0
|
msg = ns.get('msg')
|
||||||
for i in range(10):
|
|
||||||
# if i == 0:
|
|
||||||
# # 写入文件记录轮询开始时间 时间格式: YYYY-MM-DD HH:MM:SS
|
|
||||||
# with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
|
|
||||||
# f.write(f"轮询开始时间:{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
|
|
||||||
|
|
||||||
msg = consumer.poll(0.01) # 单次轮询获取消息
|
# if msg.error():
|
||||||
|
# # 写入日志文件记录错误信息
|
||||||
if msg is None:
|
# with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f:
|
||||||
if i == 10:
|
# f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 消费者错误: {msg.error()}\n")
|
||||||
with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
|
# else:
|
||||||
f.write(f"轮询第10次消息为None,{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
|
|
||||||
continue
|
|
||||||
elif msg.error():
|
|
||||||
# 写入日志文件记录错误信息
|
|
||||||
with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f:
|
|
||||||
f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 消费者错误: {msg.error()}\n")
|
|
||||||
else:
|
|
||||||
total_count += 1
|
|
||||||
try:
|
try:
|
||||||
# 解析消息内容为字典(避免变量名冲突)
|
# 解析消息内容为字典(避免变量名冲突)
|
||||||
msg_data_ori = json.loads(msg.value().decode('utf-8'))
|
# msg_data_ori = json.loads(msg.value().decode('utf-8'))
|
||||||
|
msg_data_ori = json.loads(msg)
|
||||||
msg_data = msg_data_ori['messages'][0]
|
msg_data = msg_data_ori['messages'][0]
|
||||||
messageid = msg_data.get('id')
|
messageid = msg_data.get('id')
|
||||||
taskid = msg_data.get('taskId')
|
taskid = msg_data.get('taskId')
|
||||||
@ -71,7 +60,7 @@ async def baidu_sms_kafka_consumer(ns={}):
|
|||||||
|
|
||||||
if messageid in content:
|
if messageid in content:
|
||||||
print(f"文件中已存在 '{messageid}',跳过写入")
|
print(f"文件中已存在 '{messageid}',跳过写入")
|
||||||
continue
|
return
|
||||||
else:
|
else:
|
||||||
# 追加写入目标内容
|
# 追加写入目标内容
|
||||||
with open('baidu_kafka_id.txt', 'a', encoding='utf-8') as f:
|
with open('baidu_kafka_id.txt', 'a', encoding='utf-8') as f:
|
||||||
@ -87,9 +76,7 @@ async def baidu_sms_kafka_consumer(ns={}):
|
|||||||
exist_msg = await sor.R('baidu_kafka_msg', {'messageid': messageid, 'taskid': taskid})
|
exist_msg = await sor.R('baidu_kafka_msg', {'messageid': messageid, 'taskid': taskid})
|
||||||
if exist_msg:
|
if exist_msg:
|
||||||
print(f"消息id {messageid} 已存在,跳过处理")
|
print(f"消息id {messageid} 已存在,跳过处理")
|
||||||
continue
|
return
|
||||||
# consumer.close()
|
|
||||||
# return
|
|
||||||
|
|
||||||
# 2. 构建小写key的ns字典(完整映射所有字段)
|
# 2. 构建小写key的ns字典(完整映射所有字段)
|
||||||
ns_msg = {
|
ns_msg = {
|
||||||
@ -166,15 +153,16 @@ async def baidu_sms_kafka_consumer(ns={}):
|
|||||||
}
|
}
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"处理异常: {str(e)}")
|
print(f"处理异常: {str(e)}")
|
||||||
|
import traceback
|
||||||
|
with open('baidu_kafka_error.txt', 'w') as f:
|
||||||
|
f.write(str(e)+ traceback.format_exc())
|
||||||
|
traceback.print_exc()
|
||||||
return {
|
return {
|
||||||
'status': False,
|
'status': False,
|
||||||
'msg': f"处理异常: {str(e)}"
|
'msg': f"处理异常: {str(e)}"
|
||||||
}
|
}
|
||||||
# 记录total_count
|
|
||||||
# with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
|
|
||||||
# f.write(f"本次轮询共处理消息数:{total_count}\n")
|
|
||||||
|
|
||||||
consumer.close() # 确保消费者关闭
|
# consumer.close() # 确保消费者关闭
|
||||||
return {
|
return {
|
||||||
'status': True,
|
'status': True,
|
||||||
'msg': '获取信息执行结束'
|
'msg': '获取信息执行结束'
|
||||||
|
|||||||
@ -100,7 +100,7 @@ Vue.use(HappyScroll)
|
|||||||
// });
|
// });
|
||||||
|
|
||||||
// // 禁止F12和开发者工具快捷键
|
// // 禁止F12和开发者工具快捷键
|
||||||
// document.addEventListener('keydown', function(e) {
|
// document.addEventListener('keydown', function(e ) {
|
||||||
// // 禁止F12
|
// // 禁止F12
|
||||||
// if (e.key === 'F12') {
|
// if (e.key === 'F12') {
|
||||||
// e.preventDefault();
|
// e.preventDefault();
|
||||||
|
|||||||
|
Before Width: | Height: | Size: 2.1 KiB After Width: | Height: | Size: 8.3 KiB |
|
Before Width: | Height: | Size: 2.4 KiB After Width: | Height: | Size: 8.7 KiB |
|
Before Width: | Height: | Size: 2.0 KiB After Width: | Height: | Size: 8.3 KiB |
|
Before Width: | Height: | Size: 2.3 KiB After Width: | Height: | Size: 8.3 KiB |
|
Before Width: | Height: | Size: 2.2 KiB After Width: | Height: | Size: 8.3 KiB |
@ -550,8 +550,8 @@ export default Vue.extend({
|
|||||||
}
|
}
|
||||||
|
|
||||||
.function-icon {
|
.function-icon {
|
||||||
width: 30px;
|
width: 100%;
|
||||||
height: 30px;
|
height: 100%;
|
||||||
}
|
}
|
||||||
|
|
||||||
.function-content {
|
.function-content {
|
||||||
|
|||||||
@ -142,8 +142,8 @@ export default Vue.extend({
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
created() {
|
created() {
|
||||||
// 移除 initMybalance 调用,因为现在使用 computed 的 mybalance
|
|
||||||
this.initMybalance(); // ❌ 删除这一行
|
this.initMybalance();
|
||||||
this.getUnreadMsgCount();
|
this.getUnreadMsgCount();
|
||||||
this.fetchTodoCount();
|
this.fetchTodoCount();
|
||||||
},
|
},
|
||||||
@ -186,8 +186,8 @@ export default Vue.extend({
|
|||||||
const iconIndex = index % this.navIcons.length;
|
const iconIndex = index % this.navIcons.length;
|
||||||
return this.navIcons[iconIndex];
|
return this.navIcons[iconIndex];
|
||||||
},
|
},
|
||||||
// 移除 initMybalance 方法,因为现在使用 computed 的 mybalance
|
|
||||||
async initMybalance() { // ❌ 删除这个方法
|
async initMybalance() {
|
||||||
const res = await editReachargelogAPI()
|
const res = await editReachargelogAPI()
|
||||||
if (res.status) {
|
if (res.status) {
|
||||||
this.mybalance = res.data
|
this.mybalance = res.data
|
||||||
|
|||||||
@ -4,8 +4,8 @@
|
|||||||
import os
|
import os
|
||||||
import json
|
import json
|
||||||
import asyncio
|
import asyncio
|
||||||
import datetime
|
# import datetime
|
||||||
from sqlor.dbpools import DBPools
|
# from sqlor.dbpools import DBPools
|
||||||
# from appPublic.jsonConfig import getConfig
|
# from appPublic.jsonConfig import getConfig
|
||||||
from appPublic.uniqueID import getID as uuid
|
from appPublic.uniqueID import getID as uuid
|
||||||
from confluent_kafka import Consumer as BaiduKafKaConsumer
|
from confluent_kafka import Consumer as BaiduKafKaConsumer
|
||||||
@ -18,76 +18,78 @@ from appPublic.Singleton import SingletonDecorator
|
|||||||
from appPublic.folderUtils import ProgramPath
|
from appPublic.folderUtils import ProgramPath
|
||||||
from appPublic.argsConvert import ArgsConvert
|
from appPublic.argsConvert import ArgsConvert
|
||||||
|
|
||||||
from baiDuSmsClient import send_baidu_vcode as send_vcode
|
# from baiDuSmsClient import send_baidu_vcode as send_vcode
|
||||||
|
|
||||||
|
from aiohttp import client as aiohttp_client
|
||||||
|
|
||||||
|
|
||||||
def key2ansi(dict):
|
# def key2ansi(dict):
|
||||||
# print dict
|
# # print dict
|
||||||
return dict
|
# return dict
|
||||||
a = {}
|
# a = {}
|
||||||
for k, v in dict.items():
|
# for k, v in dict.items():
|
||||||
k = k.encode('utf-8')
|
# k = k.encode('utf-8')
|
||||||
# if type(v) == type(u" "):
|
# # if type(v) == type(u" "):
|
||||||
# v = v.encode('utf-8')
|
# # v = v.encode('utf-8')
|
||||||
a[k] = v
|
# a[k] = v
|
||||||
|
|
||||||
return a
|
# return a
|
||||||
|
|
||||||
class JsonObject(DictObject):
|
# class JsonObject(DictObject):
|
||||||
"""
|
# """
|
||||||
JsonObject class load json from a json file
|
# JsonObject class load json from a json file
|
||||||
"""
|
# """
|
||||||
|
|
||||||
def __init__(self, jsonholder, keytype='ansi', NS=None):
|
# def __init__(self, jsonholder, keytype='ansi', NS=None):
|
||||||
jhtype = type(jsonholder)
|
# jhtype = type(jsonholder)
|
||||||
if jhtype == type("") or jhtype == type(u''):
|
# if jhtype == type("") or jhtype == type(u''):
|
||||||
f = open(jsonholder, 'r')
|
# f = open(jsonholder, 'r')
|
||||||
else:
|
# else:
|
||||||
f = jsonholder
|
# f = jsonholder
|
||||||
try:
|
# try:
|
||||||
a = json.load(f)
|
# a = json.load(f)
|
||||||
except Exception as e:
|
# except Exception as e:
|
||||||
print("exception:", self.__jsonholder__, e)
|
# print("exception:", self.__jsonholder__, e)
|
||||||
raise e
|
# raise e
|
||||||
finally:
|
# finally:
|
||||||
if type(jsonholder) == type(""):
|
# if type(jsonholder) == type(""):
|
||||||
f.close()
|
# f.close()
|
||||||
|
|
||||||
if NS is not None:
|
# if NS is not None:
|
||||||
ac = ArgsConvert('$[', ']$')
|
# ac = ArgsConvert('$[', ']$')
|
||||||
a = ac.convert(a, NS)
|
# a = ac.convert(a, NS)
|
||||||
a['__jsonholder__'] = jsonholder
|
# a['__jsonholder__'] = jsonholder
|
||||||
a['NS'] = NS
|
# a['NS'] = NS
|
||||||
DictObject.__init__(self, **a)
|
# DictObject.__init__(self, **a)
|
||||||
|
|
||||||
|
|
||||||
@SingletonDecorator
|
# @SingletonDecorator
|
||||||
class JsonConfig(JsonObject):
|
# class JsonConfig(JsonObject):
|
||||||
pass
|
# pass
|
||||||
|
|
||||||
|
|
||||||
def getConfig(path=None, NS=None):
|
# def getConfig(path=None, NS=None):
|
||||||
pp = ProgramPath()
|
# pp = ProgramPath()
|
||||||
if path == None:
|
# if path == None:
|
||||||
path = os.getcwd()
|
# path = os.getcwd()
|
||||||
cfname = os.path.abspath(os.path.join(path, "conf", "config.prod.json"))
|
# cfname = os.path.abspath(os.path.join(path, "conf", "config.prod.json"))
|
||||||
# print __name__,cfname
|
# # print __name__,cfname
|
||||||
ns = {
|
# ns = {
|
||||||
'home': str(Path.home()),
|
# 'home': str(Path.home()),
|
||||||
'workdir': path,
|
# 'workdir': path,
|
||||||
'ProgramPath': pp
|
# 'ProgramPath': pp
|
||||||
}
|
# }
|
||||||
if NS is not None:
|
# if NS is not None:
|
||||||
ns.update(NS)
|
# ns.update(NS)
|
||||||
a = JsonConfig(cfname, NS=ns)
|
# a = JsonConfig(cfname, NS=ns)
|
||||||
return a
|
# return a
|
||||||
|
|
||||||
async def time_convert(resoucetime=None):
|
# async def time_convert(resoucetime=None):
|
||||||
if not resoucetime:
|
# if not resoucetime:
|
||||||
return
|
# return
|
||||||
utc_time = datetime.datetime.strptime(resoucetime, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=datetime.timezone.utc)
|
# utc_time = datetime.datetime.strptime(resoucetime, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=datetime.timezone.utc)
|
||||||
beijing_time = utc_time.astimezone(datetime.timezone(datetime.timedelta(hours=8)))
|
# beijing_time = utc_time.astimezone(datetime.timezone(datetime.timedelta(hours=8)))
|
||||||
return beijing_time.strftime("%Y-%m-%d %H:%M:%S")
|
# return beijing_time.strftime("%Y-%m-%d %H:%M:%S")
|
||||||
|
|
||||||
async def baidu_sms_kafka_consumer(ns={}):
|
async def baidu_sms_kafka_consumer(ns={}):
|
||||||
import os
|
import os
|
||||||
@ -98,7 +100,7 @@ async def baidu_sms_kafka_consumer(ns={}):
|
|||||||
'security.protocol': 'SASL_SSL',
|
'security.protocol': 'SASL_SSL',
|
||||||
'ssl.endpoint.identification.algorithm': 'none',
|
'ssl.endpoint.identification.algorithm': 'none',
|
||||||
# 证书文件路径
|
# 证书文件路径
|
||||||
'ssl.ca.location': 'baidu_kafka_ca.pem',
|
'ssl.ca.location': 'D:/Code/kboss/kgadget/src/baidu_kafka_ca.pem',
|
||||||
# SASL 机制
|
# SASL 机制
|
||||||
'sasl.mechanism': 'SCRAM-SHA-512',
|
'sasl.mechanism': 'SCRAM-SHA-512',
|
||||||
# SASL 用户名
|
# SASL 用户名
|
||||||
@ -111,153 +113,59 @@ async def baidu_sms_kafka_consumer(ns={}):
|
|||||||
'fetch.message.max.bytes': '1024*512',
|
'fetch.message.max.bytes': '1024*512',
|
||||||
})
|
})
|
||||||
|
|
||||||
# 订阅的主题名称
|
# 创建日志文件
|
||||||
consumer.subscribe(['kaiyuanyun_msg_topic'])
|
files = "baidu_kafka_msg.txt"
|
||||||
|
|
||||||
files = ["baidu_kafka_msg.txt", "baidu_kafka_id.txt", "baidu_kafka_error.txt"]
|
|
||||||
for filename in files:
|
|
||||||
if not os.path.exists(filename):
|
if not os.path.exists(filename):
|
||||||
with open(filename, 'w', encoding='utf-8') as f: # 'w' 模式会覆盖已有文件,但检查后使用是安全的
|
with open(filename, 'w', encoding='utf-8') as f: # 'w' 模式会覆盖已有文件,但检查后使用是安全的
|
||||||
pass # 创建空文件
|
pass # 创建空文件
|
||||||
else:
|
else:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
# total_count = 0
|
# 订阅的主题名称
|
||||||
|
consumer.subscribe(['kaiyuanyun_msg_topic'])
|
||||||
while True:
|
while True:
|
||||||
i = 0
|
msg = consumer.poll(2) # 单次轮询获取消息
|
||||||
if i == 0:
|
|
||||||
# 写入文件记录轮询开始时间 时间格式: YYYY-MM-DD HH:MM:SS
|
|
||||||
with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
|
|
||||||
f.write(f"轮询开始时间:{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
|
|
||||||
|
|
||||||
msg = consumer.poll(0.1) # 单次轮询获取消息
|
|
||||||
|
|
||||||
if msg is None:
|
if msg is None:
|
||||||
if i == 10:
|
|
||||||
with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
|
|
||||||
f.write(f"轮询第10次消息为None,{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
|
|
||||||
continue
|
continue
|
||||||
elif msg.error():
|
elif msg.error():
|
||||||
# 写入日志文件记录错误信息
|
pass
|
||||||
with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f:
|
|
||||||
f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 消费者错误: {msg.error()}\n")
|
|
||||||
else:
|
else:
|
||||||
# total_count += 1
|
|
||||||
try:
|
try:
|
||||||
# 解析消息内容为字典(避免变量名冲突)
|
|
||||||
msg_data_ori = json.loads(msg.value().decode('utf-8'))
|
|
||||||
msg_data = msg_data_ori['messages'][0]
|
|
||||||
messageid = msg_data.get('id')
|
|
||||||
taskid = msg_data.get('taskId')
|
|
||||||
|
|
||||||
# 读取文件内容进行检查
|
|
||||||
with open('baidu_kafka_id.txt', 'r', encoding='utf-8') as f:
|
|
||||||
content = f.read()
|
|
||||||
|
|
||||||
if messageid in content:
|
|
||||||
print(f"文件中已存在 '{messageid}',跳过写入")
|
|
||||||
continue
|
|
||||||
else:
|
|
||||||
# 追加写入目标内容
|
|
||||||
with open('baidu_kafka_id.txt', 'a', encoding='utf-8') as f:
|
|
||||||
f.write(messageid + '\n')
|
|
||||||
print(f"已写入 '{messageid}' 到文件")
|
|
||||||
|
|
||||||
with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
|
with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
|
||||||
f.write(str(msg_data) + '\n')
|
f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}-{msg.value().decode('utf-8')}\n")
|
||||||
|
|
||||||
db = DBPools()
|
# print('Received message: {}'.format(msg.value().decode('utf-8')))
|
||||||
async with db.sqlorContext('kboss') as sor:
|
method = "POST"
|
||||||
# 1. 去重检查(messageid和taskid)
|
url = 'https://www.opencomputing.cn/baiduc/baidu_sms_kafka_consumer.dspy'
|
||||||
exist_msg = await sor.R('baidu_kafka_msg', {'messageid': messageid, 'taskid': taskid})
|
header = {
|
||||||
if exist_msg:
|
"Host": "www.opencomputing.cn",
|
||||||
print(f"消息id {messageid} 已存在,跳过处理")
|
# "Content-Type": "application/json"
|
||||||
continue
|
|
||||||
# consumer.close()
|
|
||||||
# return
|
|
||||||
|
|
||||||
# 2. 构建小写key的ns字典(完整映射所有字段)
|
|
||||||
ns_msg = {
|
|
||||||
'id': uuid(),
|
|
||||||
'messageid': messageid,
|
|
||||||
'taskid': taskid,
|
|
||||||
'userid': msg_data.get('userId'),
|
|
||||||
'accountid': msg_data.get('accountId'),
|
|
||||||
'usertype': msg_data.get('userType'),
|
|
||||||
'receiverid': msg_data.get('receiverId'),
|
|
||||||
'contentvar': msg_data.get('contentVar'),
|
|
||||||
'sendchannel': msg_data.get('sendChannel'),
|
|
||||||
'content': msg_data.get('content'),
|
|
||||||
'messagetemplateid': msg_data.get('messageTemplateId'),
|
|
||||||
'isvirtualstore': msg_data.get('isVirtualStore'),
|
|
||||||
'channelmessageid': msg_data.get('channelMessageId'),
|
|
||||||
'channelstatus': msg_data.get('channelStatus'),
|
|
||||||
'majorcategory': msg_data.get('majorCategory'),
|
|
||||||
'minorcategory': msg_data.get('minorCategory'),
|
|
||||||
'status': msg_data.get('status'),
|
|
||||||
'createtime': await time_convert(msg_data.get('createTime')) if msg_data.get('createTime') else None,
|
|
||||||
'updatetime': await time_convert(msg_data.get('updateTime')) if msg_data.get('updateTime') else None,
|
|
||||||
'expiretime': await time_convert(msg_data.get('expireTime')) if msg_data.get('expireTime') else None,
|
|
||||||
'windowtime': await time_convert(msg_data.get('windowTime')) if msg_data.get('windowTime') else None,
|
|
||||||
'valid': msg_data.get('valid'),
|
|
||||||
'complete': msg_data.get('complete'),
|
|
||||||
'disturbhold': msg_data.get('disturbHold'),
|
|
||||||
'sendcomplete': msg_data.get('sendComplete')
|
|
||||||
}
|
}
|
||||||
|
data = {
|
||||||
# 3. 执行存库
|
'msg': msg.value().decode('utf-8')
|
||||||
await sor.C('baidu_kafka_msg', ns_msg)
|
}
|
||||||
# print(f"消息id {messageid} 存储成功")
|
async with aiohttp_client.request(
|
||||||
|
method=method,
|
||||||
# 4. 触发短信发送(当sendChannel为MOBILE时)
|
url=url,
|
||||||
send_channel = msg_data.get('sendChannel')
|
headers=header,
|
||||||
if send_channel == 'MOBILE':
|
data=data) as res:
|
||||||
msg_content = msg_data.get('content')
|
data_ = await res.text()
|
||||||
|
# print("Response data:", data_)
|
||||||
# 判断验证码类短信 | 通知类短信
|
|
||||||
if '验证码' in msg_content:
|
|
||||||
sms_stype = '百度kafka普通验证码'
|
|
||||||
else:
|
|
||||||
sms_stype = '百度kafka普通通知'
|
|
||||||
|
|
||||||
# 查询用户手机号
|
|
||||||
account_id = msg_data.get('accountId')
|
|
||||||
user_local_id_li = await sor.R('baidu_users', {'baidu_id': account_id})
|
|
||||||
if user_local_id_li:
|
|
||||||
user_local_id = user_local_id_li[0]['user_id']
|
|
||||||
user_mobile_li = await sor.R('users', {'id': user_local_id, 'del_flg': '0'})
|
|
||||||
mobile = user_mobile_li[0]['mobile']
|
|
||||||
|
|
||||||
# 调用短信发送接口
|
|
||||||
kyy_send_status = await send_vcode(mobile, sms_stype, {'content': msg_content})
|
|
||||||
if kyy_send_status.get('status'):
|
|
||||||
await sor.U('baidu_kafka_msg', {'id': ns_msg['id'], 'kyysendstatus': '1'})
|
|
||||||
else:
|
|
||||||
# 记录错误日志 短信发送失败
|
|
||||||
with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f:
|
|
||||||
f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 短信发送失败至 {mobile},状态: {kyy_send_status}\n")
|
|
||||||
print(f"短信发送失败至 {mobile},状态: {kyy_send_status}")
|
|
||||||
else:
|
|
||||||
# 记录错误日志 用户未找到
|
|
||||||
with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f:
|
|
||||||
f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 未找到百度用户ID {account_id} 对应的本地用户\n")
|
|
||||||
print(f"未找到百度用户ID {account_id} 对应的本地用户")
|
|
||||||
|
|
||||||
except json.JSONDecodeError:
|
except json.JSONDecodeError:
|
||||||
print("错误:消息内容非有效JSON")
|
|
||||||
return {
|
return {
|
||||||
'status': False,
|
'status': False,
|
||||||
'msg': '错误:消息内容非有效JSON'
|
'msg': '错误:消息内容非有效JSON'
|
||||||
}
|
}
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"处理异常: {str(e)}")
|
import traceback
|
||||||
|
print(str(e)+ traceback.format_exc())
|
||||||
|
traceback.print_exc()
|
||||||
return {
|
return {
|
||||||
'status': False,
|
'status': False,
|
||||||
'msg': f"处理异常: {str(e)}"
|
'msg': f"处理异常: {str(e)}"
|
||||||
}
|
}
|
||||||
# 记录total_count
|
|
||||||
# with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
|
|
||||||
# f.write(f"本次轮询共处理消息数:{total_count}\n")
|
|
||||||
|
|
||||||
consumer.close() # 确保消费者关闭
|
consumer.close() # 确保消费者关闭
|
||||||
return {
|
return {
|
||||||
@ -267,10 +175,10 @@ async def baidu_sms_kafka_consumer(ns={}):
|
|||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
# p = os.getcwd()
|
# p = os.getcwd()
|
||||||
current_dir = os.getcwd()
|
# current_dir = os.getcwd()
|
||||||
path_parts = current_dir.split('/')
|
# path_parts = current_dir.split('/')
|
||||||
two_levels_up = '/'.join(path_parts[:-2])
|
# two_levels_up = '/'.join(path_parts[:-2])
|
||||||
config = getConfig(two_levels_up)
|
# config = getConfig(two_levels_up)
|
||||||
DBPools(config.databases)
|
# DBPools(config.databases)
|
||||||
loop = asyncio.get_event_loop()
|
loop = asyncio.get_event_loop()
|
||||||
print(loop.run_until_complete(baidu_sms_kafka_consumer()))
|
print(loop.run_until_complete(baidu_sms_kafka_consumer()))
|
||||||
|
|||||||