Compare commits

..

No commits in common. "109e1156c8c16e2c6e2a5375354191c88331dd5f" and "9e5d0466608f8828509af842581263ececed0766" have entirely different histories.

10 changed files with 359 additions and 255 deletions

View File

@ -6,30 +6,29 @@ async def time_convert(resoucetime=None):
return beijing_time.strftime("%Y-%m-%d %H:%M:%S") return beijing_time.strftime("%Y-%m-%d %H:%M:%S")
async def baidu_sms_kafka_consumer(ns={}): async def baidu_sms_kafka_consumer(ns={}):
# consumer = BaiduKafKaConsumer({
# # 接入点
# 'bootstrap.servers': '120.48.10.223:9095,180.76.96.108:9095,180.76.147.36:9095',
# # 接入协议
# 'security.protocol': 'SASL_SSL',
# 'ssl.endpoint.identification.algorithm': 'none',
# # 证书文件路径
# 'ssl.ca.location': 'baidu_kafka_ca.pem',
# # SASL 机制
# 'sasl.mechanism': 'SCRAM-SHA-512',
# # SASL 用户名
# 'sasl.username': 'kaiyuanyun',
# # SASL 用户密码
# 'sasl.password': 'Kyy250609#',
# # 消费组id
# 'group.id': 'kaiyuanyun_msg_group',
# 'auto.offset.reset': 'latest',
# 'fetch.message.max.bytes': '1024*512',
# })
# # 订阅的主题名称
# consumer.subscribe(['kaiyuanyun_msg_topic'])
import os import os
consumer = BaiduKafKaConsumer({
# 接入点
'bootstrap.servers': '120.48.10.223:9095,180.76.96.108:9095,180.76.147.36:9095',
# 接入协议
'security.protocol': 'SASL_SSL',
'ssl.endpoint.identification.algorithm': 'none',
# 证书文件路径
'ssl.ca.location': 'baidu_kafka_ca.pem',
# SASL 机制
'sasl.mechanism': 'SCRAM-SHA-512',
# SASL 用户名
'sasl.username': 'kaiyuanyun',
# SASL 用户密码
'sasl.password': 'Kyy250609#',
# 消费组id
'group.id': 'kaiyuanyun_msg_group',
'auto.offset.reset': 'latest',
'fetch.message.max.bytes': '1024*512',
})
# 订阅的主题名称
consumer.subscribe(['kaiyuanyun_msg_topic'])
files = ["baidu_kafka_msg.txt", "baidu_kafka_id.txt", "baidu_kafka_error.txt"] files = ["baidu_kafka_msg.txt", "baidu_kafka_id.txt", "baidu_kafka_error.txt"]
for filename in files: for filename in files:
@ -39,130 +38,143 @@ async def baidu_sms_kafka_consumer(ns={}):
else: else:
pass pass
msg = ns.get('msg') total_count = 0
for i in range(10):
# if msg.error(): # if i == 0:
# # 写入日志文件记录错误信息 # # 写入文件记录轮询开始时间 时间格式: YYYY-MM-DD HH:MM:SS
# with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f: # with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
# f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 消费者错误: {msg.error()}\n") # f.write(f"轮询开始时间:{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
# else:
try: msg = consumer.poll(0.01) # 单次轮询获取消息
# 解析消息内容为字典(避免变量名冲突)
# msg_data_ori = json.loads(msg.value().decode('utf-8'))
msg_data_ori = json.loads(msg)
msg_data = msg_data_ori['messages'][0]
messageid = msg_data.get('id')
taskid = msg_data.get('taskId')
# 读取文件内容进行检查 if msg is None:
with open('baidu_kafka_id.txt', 'r', encoding='utf-8') as f: if i == 10:
content = f.read() with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
f.write(f"轮询第10次消息为None{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
if messageid in content: continue
print(f"文件中已存在 '{messageid}',跳过写入") elif msg.error():
return # 写入日志文件记录错误信息
with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f:
f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 消费者错误: {msg.error()}\n")
else: else:
# 追加写入目标内容 total_count += 1
with open('baidu_kafka_id.txt', 'a', encoding='utf-8') as f: try:
f.write(messageid + '\n') # 解析消息内容为字典(避免变量名冲突)
print(f"已写入 '{messageid}' 到文件") msg_data_ori = json.loads(msg.value().decode('utf-8'))
msg_data = msg_data_ori['messages'][0]
messageid = msg_data.get('id')
taskid = msg_data.get('taskId')
# 读取文件内容进行检查
with open('baidu_kafka_id.txt', 'r', encoding='utf-8') as f:
content = f.read()
if messageid in content:
print(f"文件中已存在 '{messageid}',跳过写入")
continue
else:
# 追加写入目标内容
with open('baidu_kafka_id.txt', 'a', encoding='utf-8') as f:
f.write(messageid + '\n')
print(f"已写入 '{messageid}' 到文件")
with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
f.write(str(msg_data) + '\n')
db = DBPools()
async with db.sqlorContext('kboss') as sor:
# 1. 去重检查messageid和taskid
exist_msg = await sor.R('baidu_kafka_msg', {'messageid': messageid, 'taskid': taskid})
if exist_msg:
print(f"消息id {messageid} 已存在,跳过处理")
continue
# consumer.close()
# return
# 2. 构建小写key的ns字典完整映射所有字段
ns_msg = {
'id': uuid(),
'messageid': messageid,
'taskid': taskid,
'userid': msg_data.get('userId'),
'accountid': msg_data.get('accountId'),
'usertype': msg_data.get('userType'),
'receiverid': msg_data.get('receiverId'),
'contentvar': msg_data.get('contentVar'),
'sendchannel': msg_data.get('sendChannel'),
'content': msg_data.get('content'),
'messagetemplateid': msg_data.get('messageTemplateId'),
'isvirtualstore': msg_data.get('isVirtualStore'),
'channelmessageid': msg_data.get('channelMessageId'),
'channelstatus': msg_data.get('channelStatus'),
'majorcategory': msg_data.get('majorCategory'),
'minorcategory': msg_data.get('minorCategory'),
'status': msg_data.get('status'),
'createtime': await time_convert(msg_data.get('createTime')) if msg_data.get('createTime') else None,
'updatetime': await time_convert(msg_data.get('updateTime')) if msg_data.get('updateTime') else None,
'expiretime': await time_convert(msg_data.get('expireTime')) if msg_data.get('expireTime') else None,
'windowtime': await time_convert(msg_data.get('windowTime')) if msg_data.get('windowTime') else None,
'valid': msg_data.get('valid'),
'complete': msg_data.get('complete'),
'disturbhold': msg_data.get('disturbHold'),
'sendcomplete': msg_data.get('sendComplete')
}
# 3. 执行存库
await sor.C('baidu_kafka_msg', ns_msg)
print(f"消息id {messageid} 存储成功")
# 4. 触发短信发送当sendChannel为MOBILE时
send_channel = msg_data.get('sendChannel')
if send_channel == 'MOBILE':
msg_content = msg_data.get('content')
# 判断验证码类短信 | 通知类短信
if '验证码' in msg_content:
sms_stype = '百度kafka普通验证码'
else:
sms_stype = '百度kafka普通通知'
# 查询用户手机号
account_id = msg_data.get('accountId')
user_local_id_li = await sor.R('baidu_users', {'baidu_id': account_id})
if user_local_id_li:
user_local_id = user_local_id_li[0]['user_id']
user_mobile_li = await sor.R('users', {'id': user_local_id, 'del_flg': '0'})
mobile = user_mobile_li[0]['mobile']
# 调用短信发送接口
kyy_send_status = await send_vcode(mobile, sms_stype, {'content': msg_content})
if kyy_send_status.get('status'):
await sor.U('baidu_kafka_msg', {'id': ns_msg['id'], 'kyysendstatus': '1'})
else:
# 记录错误日志 短信发送失败
with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f:
f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 短信发送失败至 {mobile},状态: {kyy_send_status}\n")
print(f"短信发送失败至 {mobile},状态: {kyy_send_status}")
else:
# 记录错误日志 用户未找到
with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f:
f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 未找到百度用户ID {account_id} 对应的本地用户\n")
print(f"未找到百度用户ID {account_id} 对应的本地用户")
with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f: except json.JSONDecodeError:
f.write(str(msg_data) + '\n') print("错误消息内容非有效JSON")
return {
db = DBPools() 'status': False,
async with db.sqlorContext('kboss') as sor: 'msg': '错误消息内容非有效JSON'
# 1. 去重检查messageid和taskid }
exist_msg = await sor.R('baidu_kafka_msg', {'messageid': messageid, 'taskid': taskid}) except Exception as e:
if exist_msg: print(f"处理异常: {str(e)}")
print(f"消息id {messageid} 已存在,跳过处理") return {
return 'status': False,
'msg': f"处理异常: {str(e)}"
# 2. 构建小写key的ns字典完整映射所有字段 }
ns_msg = { # 记录total_count
'id': uuid(), # with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
'messageid': messageid, # f.write(f"本次轮询共处理消息数:{total_count}\n")
'taskid': taskid,
'userid': msg_data.get('userId'),
'accountid': msg_data.get('accountId'),
'usertype': msg_data.get('userType'),
'receiverid': msg_data.get('receiverId'),
'contentvar': msg_data.get('contentVar'),
'sendchannel': msg_data.get('sendChannel'),
'content': msg_data.get('content'),
'messagetemplateid': msg_data.get('messageTemplateId'),
'isvirtualstore': msg_data.get('isVirtualStore'),
'channelmessageid': msg_data.get('channelMessageId'),
'channelstatus': msg_data.get('channelStatus'),
'majorcategory': msg_data.get('majorCategory'),
'minorcategory': msg_data.get('minorCategory'),
'status': msg_data.get('status'),
'createtime': await time_convert(msg_data.get('createTime')) if msg_data.get('createTime') else None,
'updatetime': await time_convert(msg_data.get('updateTime')) if msg_data.get('updateTime') else None,
'expiretime': await time_convert(msg_data.get('expireTime')) if msg_data.get('expireTime') else None,
'windowtime': await time_convert(msg_data.get('windowTime')) if msg_data.get('windowTime') else None,
'valid': msg_data.get('valid'),
'complete': msg_data.get('complete'),
'disturbhold': msg_data.get('disturbHold'),
'sendcomplete': msg_data.get('sendComplete')
}
# 3. 执行存库
await sor.C('baidu_kafka_msg', ns_msg)
print(f"消息id {messageid} 存储成功")
# 4. 触发短信发送当sendChannel为MOBILE时
send_channel = msg_data.get('sendChannel')
if send_channel == 'MOBILE':
msg_content = msg_data.get('content')
# 判断验证码类短信 | 通知类短信
if '验证码' in msg_content:
sms_stype = '百度kafka普通验证码'
else:
sms_stype = '百度kafka普通通知'
# 查询用户手机号
account_id = msg_data.get('accountId')
user_local_id_li = await sor.R('baidu_users', {'baidu_id': account_id})
if user_local_id_li:
user_local_id = user_local_id_li[0]['user_id']
user_mobile_li = await sor.R('users', {'id': user_local_id, 'del_flg': '0'})
mobile = user_mobile_li[0]['mobile']
# 调用短信发送接口
kyy_send_status = await send_vcode(mobile, sms_stype, {'content': msg_content})
if kyy_send_status.get('status'):
await sor.U('baidu_kafka_msg', {'id': ns_msg['id'], 'kyysendstatus': '1'})
else:
# 记录错误日志 短信发送失败
with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f:
f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 短信发送失败至 {mobile},状态: {kyy_send_status}\n")
print(f"短信发送失败至 {mobile},状态: {kyy_send_status}")
else:
# 记录错误日志 用户未找到
with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f:
f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 未找到百度用户ID {account_id} 对应的本地用户\n")
print(f"未找到百度用户ID {account_id} 对应的本地用户")
except json.JSONDecodeError: consumer.close() # 确保消费者关闭
print("错误消息内容非有效JSON")
return {
'status': False,
'msg': '错误消息内容非有效JSON'
}
except Exception as e:
print(f"处理异常: {str(e)}")
import traceback
with open('baidu_kafka_error.txt', 'w') as f:
f.write(str(e)+ traceback.format_exc())
traceback.print_exc()
return {
'status': False,
'msg': f"处理异常: {str(e)}"
}
# consumer.close() # 确保消费者关闭
return { return {
'status': True, 'status': True,
'msg': '获取信息执行结束' 'msg': '获取信息执行结束'

View File

@ -100,7 +100,7 @@ Vue.use(HappyScroll)
// }); // });
// // 禁止F12和开发者工具快捷键 // // 禁止F12和开发者工具快捷键
// document.addEventListener('keydown', function(e ) { // document.addEventListener('keydown', function(e) {
// // 禁止F12 // // 禁止F12
// if (e.key === 'F12') { // if (e.key === 'F12') {
// e.preventDefault(); // e.preventDefault();

Binary file not shown.

Before

Width:  |  Height:  |  Size: 8.3 KiB

After

Width:  |  Height:  |  Size: 2.1 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 8.7 KiB

After

Width:  |  Height:  |  Size: 2.4 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 8.3 KiB

After

Width:  |  Height:  |  Size: 2.0 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 8.3 KiB

After

Width:  |  Height:  |  Size: 2.3 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 8.3 KiB

After

Width:  |  Height:  |  Size: 2.2 KiB

View File

@ -550,8 +550,8 @@ export default Vue.extend({
} }
.function-icon { .function-icon {
width: 100%; width: 30px;
height: 100%; height: 30px;
} }
.function-content { .function-content {

View File

@ -83,7 +83,7 @@
v-for="(item, index) in todoList" v-for="(item, index) in todoList"
:key="index" :key="index"
class="todo-item" class="todo-item"
@click="handleTodoClick(item.name)" @click="handleTodoClick(item.name)"
> >
<span class="todo-name">{{ item.name }}</span> <span class="todo-name">{{ item.name }}</span>
<span class="todo-count">{{ item.count }}</span> <span class="todo-count">{{ item.count }}</span>
@ -142,9 +142,9 @@ export default Vue.extend({
} }
}, },
created() { created() {
// initMybalance 使 computed mybalance
this.initMybalance(); this.initMybalance(); //
this.getUnreadMsgCount(); this.getUnreadMsgCount();
this.fetchTodoCount(); this.fetchTodoCount();
}, },
mounted() { mounted() {
@ -186,8 +186,8 @@ export default Vue.extend({
const iconIndex = index % this.navIcons.length; const iconIndex = index % this.navIcons.length;
return this.navIcons[iconIndex]; return this.navIcons[iconIndex];
}, },
// initMybalance 使 computed mybalance
async initMybalance() { async initMybalance() { //
const res = await editReachargelogAPI() const res = await editReachargelogAPI()
if (res.status) { if (res.status) {
this.mybalance = res.data this.mybalance = res.data

View File

@ -4,8 +4,8 @@
import os import os
import json import json
import asyncio import asyncio
# import datetime import datetime
# from sqlor.dbpools import DBPools from sqlor.dbpools import DBPools
# from appPublic.jsonConfig import getConfig # from appPublic.jsonConfig import getConfig
from appPublic.uniqueID import getID as uuid from appPublic.uniqueID import getID as uuid
from confluent_kafka import Consumer as BaiduKafKaConsumer from confluent_kafka import Consumer as BaiduKafKaConsumer
@ -18,78 +18,76 @@ from appPublic.Singleton import SingletonDecorator
from appPublic.folderUtils import ProgramPath from appPublic.folderUtils import ProgramPath
from appPublic.argsConvert import ArgsConvert from appPublic.argsConvert import ArgsConvert
# from baiDuSmsClient import send_baidu_vcode as send_vcode from baiDuSmsClient import send_baidu_vcode as send_vcode
from aiohttp import client as aiohttp_client
# def key2ansi(dict): def key2ansi(dict):
# # print dict # print dict
# return dict return dict
# a = {} a = {}
# for k, v in dict.items(): for k, v in dict.items():
# k = k.encode('utf-8') k = k.encode('utf-8')
# # if type(v) == type(u" "): # if type(v) == type(u" "):
# # v = v.encode('utf-8') # v = v.encode('utf-8')
# a[k] = v a[k] = v
# return a return a
# class JsonObject(DictObject): class JsonObject(DictObject):
# """ """
# JsonObject class load json from a json file JsonObject class load json from a json file
# """ """
# def __init__(self, jsonholder, keytype='ansi', NS=None): def __init__(self, jsonholder, keytype='ansi', NS=None):
# jhtype = type(jsonholder) jhtype = type(jsonholder)
# if jhtype == type("") or jhtype == type(u''): if jhtype == type("") or jhtype == type(u''):
# f = open(jsonholder, 'r') f = open(jsonholder, 'r')
# else: else:
# f = jsonholder f = jsonholder
# try: try:
# a = json.load(f) a = json.load(f)
# except Exception as e: except Exception as e:
# print("exception:", self.__jsonholder__, e) print("exception:", self.__jsonholder__, e)
# raise e raise e
# finally: finally:
# if type(jsonholder) == type(""): if type(jsonholder) == type(""):
# f.close() f.close()
# if NS is not None: if NS is not None:
# ac = ArgsConvert('$[', ']$') ac = ArgsConvert('$[', ']$')
# a = ac.convert(a, NS) a = ac.convert(a, NS)
# a['__jsonholder__'] = jsonholder a['__jsonholder__'] = jsonholder
# a['NS'] = NS a['NS'] = NS
# DictObject.__init__(self, **a) DictObject.__init__(self, **a)
# @SingletonDecorator @SingletonDecorator
# class JsonConfig(JsonObject): class JsonConfig(JsonObject):
# pass pass
# def getConfig(path=None, NS=None): def getConfig(path=None, NS=None):
# pp = ProgramPath() pp = ProgramPath()
# if path == None: if path == None:
# path = os.getcwd() path = os.getcwd()
# cfname = os.path.abspath(os.path.join(path, "conf", "config.prod.json")) cfname = os.path.abspath(os.path.join(path, "conf", "config.prod.json"))
# # print __name__,cfname # print __name__,cfname
# ns = { ns = {
# 'home': str(Path.home()), 'home': str(Path.home()),
# 'workdir': path, 'workdir': path,
# 'ProgramPath': pp 'ProgramPath': pp
# } }
# if NS is not None: if NS is not None:
# ns.update(NS) ns.update(NS)
# a = JsonConfig(cfname, NS=ns) a = JsonConfig(cfname, NS=ns)
# return a return a
# async def time_convert(resoucetime=None): async def time_convert(resoucetime=None):
# if not resoucetime: if not resoucetime:
# return return
# utc_time = datetime.datetime.strptime(resoucetime, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=datetime.timezone.utc) utc_time = datetime.datetime.strptime(resoucetime, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=datetime.timezone.utc)
# beijing_time = utc_time.astimezone(datetime.timezone(datetime.timedelta(hours=8))) beijing_time = utc_time.astimezone(datetime.timezone(datetime.timedelta(hours=8)))
# return beijing_time.strftime("%Y-%m-%d %H:%M:%S") return beijing_time.strftime("%Y-%m-%d %H:%M:%S")
async def baidu_sms_kafka_consumer(ns={}): async def baidu_sms_kafka_consumer(ns={}):
import os import os
@ -100,7 +98,7 @@ async def baidu_sms_kafka_consumer(ns={}):
'security.protocol': 'SASL_SSL', 'security.protocol': 'SASL_SSL',
'ssl.endpoint.identification.algorithm': 'none', 'ssl.endpoint.identification.algorithm': 'none',
# 证书文件路径 # 证书文件路径
'ssl.ca.location': 'D:/Code/kboss/kgadget/src/baidu_kafka_ca.pem', 'ssl.ca.location': 'baidu_kafka_ca.pem',
# SASL 机制 # SASL 机制
'sasl.mechanism': 'SCRAM-SHA-512', 'sasl.mechanism': 'SCRAM-SHA-512',
# SASL 用户名 # SASL 用户名
@ -112,60 +110,154 @@ async def baidu_sms_kafka_consumer(ns={}):
'auto.offset.reset': 'latest', 'auto.offset.reset': 'latest',
'fetch.message.max.bytes': '1024*512', 'fetch.message.max.bytes': '1024*512',
}) })
# 创建日志文件
files = "baidu_kafka_msg.txt"
if not os.path.exists(filename):
with open(filename, 'w', encoding='utf-8') as f: # 'w' 模式会覆盖已有文件,但检查后使用是安全的
pass # 创建空文件
else:
pass
# 订阅的主题名称 # 订阅的主题名称
consumer.subscribe(['kaiyuanyun_msg_topic']) consumer.subscribe(['kaiyuanyun_msg_topic'])
files = ["baidu_kafka_msg.txt", "baidu_kafka_id.txt", "baidu_kafka_error.txt"]
for filename in files:
if not os.path.exists(filename):
with open(filename, 'w', encoding='utf-8') as f: # 'w' 模式会覆盖已有文件,但检查后使用是安全的
pass # 创建空文件
else:
pass
# total_count = 0
while True: while True:
msg = consumer.poll(2) # 单次轮询获取消息 i = 0
if i == 0:
# 写入文件记录轮询开始时间 时间格式: YYYY-MM-DD HH:MM:SS
with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
f.write(f"轮询开始时间:{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
msg = consumer.poll(0.1) # 单次轮询获取消息
if msg is None: if msg is None:
if i == 10:
with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
f.write(f"轮询第10次消息为None{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
continue continue
elif msg.error(): elif msg.error():
pass # 写入日志文件记录错误信息
with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f:
f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 消费者错误: {msg.error()}\n")
else: else:
# total_count += 1
try: try:
with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f: # 解析消息内容为字典(避免变量名冲突)
f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}-{msg.value().decode('utf-8')}\n") msg_data_ori = json.loads(msg.value().decode('utf-8'))
msg_data = msg_data_ori['messages'][0]
# print('Received message: {}'.format(msg.value().decode('utf-8'))) messageid = msg_data.get('id')
method = "POST" taskid = msg_data.get('taskId')
url = 'https://www.opencomputing.cn/baiduc/baidu_sms_kafka_consumer.dspy'
header = { # 读取文件内容进行检查
"Host": "www.opencomputing.cn", with open('baidu_kafka_id.txt', 'r', encoding='utf-8') as f:
# "Content-Type": "application/json" content = f.read()
}
data = { if messageid in content:
'msg': msg.value().decode('utf-8') print(f"文件中已存在 '{messageid}',跳过写入")
} continue
async with aiohttp_client.request( else:
method=method, # 追加写入目标内容
url=url, with open('baidu_kafka_id.txt', 'a', encoding='utf-8') as f:
headers=header, f.write(messageid + '\n')
data=data) as res: print(f"已写入 '{messageid}' 到文件")
data_ = await res.text()
# print("Response data:", data_) with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
f.write(str(msg_data) + '\n')
db = DBPools()
async with db.sqlorContext('kboss') as sor:
# 1. 去重检查messageid和taskid
exist_msg = await sor.R('baidu_kafka_msg', {'messageid': messageid, 'taskid': taskid})
if exist_msg:
print(f"消息id {messageid} 已存在,跳过处理")
continue
# consumer.close()
# return
# 2. 构建小写key的ns字典完整映射所有字段
ns_msg = {
'id': uuid(),
'messageid': messageid,
'taskid': taskid,
'userid': msg_data.get('userId'),
'accountid': msg_data.get('accountId'),
'usertype': msg_data.get('userType'),
'receiverid': msg_data.get('receiverId'),
'contentvar': msg_data.get('contentVar'),
'sendchannel': msg_data.get('sendChannel'),
'content': msg_data.get('content'),
'messagetemplateid': msg_data.get('messageTemplateId'),
'isvirtualstore': msg_data.get('isVirtualStore'),
'channelmessageid': msg_data.get('channelMessageId'),
'channelstatus': msg_data.get('channelStatus'),
'majorcategory': msg_data.get('majorCategory'),
'minorcategory': msg_data.get('minorCategory'),
'status': msg_data.get('status'),
'createtime': await time_convert(msg_data.get('createTime')) if msg_data.get('createTime') else None,
'updatetime': await time_convert(msg_data.get('updateTime')) if msg_data.get('updateTime') else None,
'expiretime': await time_convert(msg_data.get('expireTime')) if msg_data.get('expireTime') else None,
'windowtime': await time_convert(msg_data.get('windowTime')) if msg_data.get('windowTime') else None,
'valid': msg_data.get('valid'),
'complete': msg_data.get('complete'),
'disturbhold': msg_data.get('disturbHold'),
'sendcomplete': msg_data.get('sendComplete')
}
# 3. 执行存库
await sor.C('baidu_kafka_msg', ns_msg)
# print(f"消息id {messageid} 存储成功")
# 4. 触发短信发送当sendChannel为MOBILE时
send_channel = msg_data.get('sendChannel')
if send_channel == 'MOBILE':
msg_content = msg_data.get('content')
# 判断验证码类短信 | 通知类短信
if '验证码' in msg_content:
sms_stype = '百度kafka普通验证码'
else:
sms_stype = '百度kafka普通通知'
# 查询用户手机号
account_id = msg_data.get('accountId')
user_local_id_li = await sor.R('baidu_users', {'baidu_id': account_id})
if user_local_id_li:
user_local_id = user_local_id_li[0]['user_id']
user_mobile_li = await sor.R('users', {'id': user_local_id, 'del_flg': '0'})
mobile = user_mobile_li[0]['mobile']
# 调用短信发送接口
kyy_send_status = await send_vcode(mobile, sms_stype, {'content': msg_content})
if kyy_send_status.get('status'):
await sor.U('baidu_kafka_msg', {'id': ns_msg['id'], 'kyysendstatus': '1'})
else:
# 记录错误日志 短信发送失败
with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f:
f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 短信发送失败至 {mobile},状态: {kyy_send_status}\n")
print(f"短信发送失败至 {mobile},状态: {kyy_send_status}")
else:
# 记录错误日志 用户未找到
with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f:
f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 未找到百度用户ID {account_id} 对应的本地用户\n")
print(f"未找到百度用户ID {account_id} 对应的本地用户")
except json.JSONDecodeError: except json.JSONDecodeError:
print("错误消息内容非有效JSON")
return { return {
'status': False, 'status': False,
'msg': '错误消息内容非有效JSON' 'msg': '错误消息内容非有效JSON'
} }
except Exception as e: except Exception as e:
import traceback print(f"处理异常: {str(e)}")
print(str(e)+ traceback.format_exc())
traceback.print_exc()
return { return {
'status': False, 'status': False,
'msg': f"处理异常: {str(e)}" 'msg': f"处理异常: {str(e)}"
} }
# 记录total_count
# with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
# f.write(f"本次轮询共处理消息数:{total_count}\n")
consumer.close() # 确保消费者关闭 consumer.close() # 确保消费者关闭
return { return {
@ -175,10 +267,10 @@ async def baidu_sms_kafka_consumer(ns={}):
if __name__ == '__main__': if __name__ == '__main__':
# p = os.getcwd() # p = os.getcwd()
# current_dir = os.getcwd() current_dir = os.getcwd()
# path_parts = current_dir.split('/') path_parts = current_dir.split('/')
# two_levels_up = '/'.join(path_parts[:-2]) two_levels_up = '/'.join(path_parts[:-2])
# config = getConfig(two_levels_up) config = getConfig(two_levels_up)
# DBPools(config.databases) DBPools(config.databases)
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
print(loop.run_until_complete(baidu_sms_kafka_consumer())) print(loop.run_until_complete(baidu_sms_kafka_consumer()))