diff --git a/b/baiduc/baidu_sms_kafka_consumer.dspy b/b/baiduc/baidu_sms_kafka_consumer.dspy
index d73988d..dbefa7c 100644
--- a/b/baiduc/baidu_sms_kafka_consumer.dspy
+++ b/b/baiduc/baidu_sms_kafka_consumer.dspy
@@ -6,29 +6,30 @@ async def time_convert(resoucetime=None):
return beijing_time.strftime("%Y-%m-%d %H:%M:%S")
async def baidu_sms_kafka_consumer(ns={}):
- import os
- consumer = BaiduKafKaConsumer({
- # 接入点
- 'bootstrap.servers': '120.48.10.223:9095,180.76.96.108:9095,180.76.147.36:9095',
- # 接入协议
- 'security.protocol': 'SASL_SSL',
- 'ssl.endpoint.identification.algorithm': 'none',
- # 证书文件路径
- 'ssl.ca.location': 'baidu_kafka_ca.pem',
- # SASL 机制
- 'sasl.mechanism': 'SCRAM-SHA-512',
- # SASL 用户名
- 'sasl.username': 'kaiyuanyun',
- # SASL 用户密码
- 'sasl.password': 'Kyy250609#',
- # 消费组id
- 'group.id': 'kaiyuanyun_msg_group',
- 'auto.offset.reset': 'latest',
- 'fetch.message.max.bytes': '1024*512',
- })
+ # consumer = BaiduKafKaConsumer({
+ # # 接入点
+ # 'bootstrap.servers': '120.48.10.223:9095,180.76.96.108:9095,180.76.147.36:9095',
+ # # 接入协议
+ # 'security.protocol': 'SASL_SSL',
+ # 'ssl.endpoint.identification.algorithm': 'none',
+ # # 证书文件路径
+ # 'ssl.ca.location': 'baidu_kafka_ca.pem',
+ # # SASL 机制
+ # 'sasl.mechanism': 'SCRAM-SHA-512',
+ # # SASL 用户名
+ # 'sasl.username': 'kaiyuanyun',
+ # # SASL 用户密码
+ # 'sasl.password': 'Kyy250609#',
+ # # 消费组id
+ # 'group.id': 'kaiyuanyun_msg_group',
+ # 'auto.offset.reset': 'latest',
+ # 'fetch.message.max.bytes': '1024*512',
+ # })
- # 订阅的主题名称
- consumer.subscribe(['kaiyuanyun_msg_topic'])
+ # # 订阅的主题名称
+ # consumer.subscribe(['kaiyuanyun_msg_topic'])
+
+ import os
files = ["baidu_kafka_msg.txt", "baidu_kafka_id.txt", "baidu_kafka_error.txt"]
for filename in files:
@@ -38,143 +39,130 @@ async def baidu_sms_kafka_consumer(ns={}):
else:
pass
- total_count = 0
- for i in range(10):
- # if i == 0:
- # # 写入文件记录轮询开始时间 时间格式: YYYY-MM-DD HH:MM:SS
- # with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
- # f.write(f"轮询开始时间:{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
-
- msg = consumer.poll(0.01) # 单次轮询获取消息
-
- if msg is None:
- if i == 10:
- with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
- f.write(f"轮询第10次消息为None,{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
- continue
- elif msg.error():
- # 写入日志文件记录错误信息
- with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f:
- f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 消费者错误: {msg.error()}\n")
- else:
- total_count += 1
- try:
- # 解析消息内容为字典(避免变量名冲突)
- msg_data_ori = json.loads(msg.value().decode('utf-8'))
- msg_data = msg_data_ori['messages'][0]
- messageid = msg_data.get('id')
- taskid = msg_data.get('taskId')
-
- # 读取文件内容进行检查
- with open('baidu_kafka_id.txt', 'r', encoding='utf-8') as f:
- content = f.read()
-
- if messageid in content:
- print(f"文件中已存在 '{messageid}',跳过写入")
- continue
- else:
- # 追加写入目标内容
- with open('baidu_kafka_id.txt', 'a', encoding='utf-8') as f:
- f.write(messageid + '\n')
- print(f"已写入 '{messageid}' 到文件")
-
- with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
- f.write(str(msg_data) + '\n')
-
- db = DBPools()
- async with db.sqlorContext('kboss') as sor:
- # 1. 去重检查(messageid和taskid)
- exist_msg = await sor.R('baidu_kafka_msg', {'messageid': messageid, 'taskid': taskid})
- if exist_msg:
- print(f"消息id {messageid} 已存在,跳过处理")
- continue
- # consumer.close()
- # return
-
- # 2. 构建小写key的ns字典(完整映射所有字段)
- ns_msg = {
- 'id': uuid(),
- 'messageid': messageid,
- 'taskid': taskid,
- 'userid': msg_data.get('userId'),
- 'accountid': msg_data.get('accountId'),
- 'usertype': msg_data.get('userType'),
- 'receiverid': msg_data.get('receiverId'),
- 'contentvar': msg_data.get('contentVar'),
- 'sendchannel': msg_data.get('sendChannel'),
- 'content': msg_data.get('content'),
- 'messagetemplateid': msg_data.get('messageTemplateId'),
- 'isvirtualstore': msg_data.get('isVirtualStore'),
- 'channelmessageid': msg_data.get('channelMessageId'),
- 'channelstatus': msg_data.get('channelStatus'),
- 'majorcategory': msg_data.get('majorCategory'),
- 'minorcategory': msg_data.get('minorCategory'),
- 'status': msg_data.get('status'),
- 'createtime': await time_convert(msg_data.get('createTime')) if msg_data.get('createTime') else None,
- 'updatetime': await time_convert(msg_data.get('updateTime')) if msg_data.get('updateTime') else None,
- 'expiretime': await time_convert(msg_data.get('expireTime')) if msg_data.get('expireTime') else None,
- 'windowtime': await time_convert(msg_data.get('windowTime')) if msg_data.get('windowTime') else None,
- 'valid': msg_data.get('valid'),
- 'complete': msg_data.get('complete'),
- 'disturbhold': msg_data.get('disturbHold'),
- 'sendcomplete': msg_data.get('sendComplete')
- }
-
- # 3. 执行存库
- await sor.C('baidu_kafka_msg', ns_msg)
- print(f"消息id {messageid} 存储成功")
-
- # 4. 触发短信发送(当sendChannel为MOBILE时)
- send_channel = msg_data.get('sendChannel')
- if send_channel == 'MOBILE':
- msg_content = msg_data.get('content')
-
- # 判断验证码类短信 | 通知类短信
- if '验证码' in msg_content:
- sms_stype = '百度kafka普通验证码'
- else:
- sms_stype = '百度kafka普通通知'
-
- # 查询用户手机号
- account_id = msg_data.get('accountId')
- user_local_id_li = await sor.R('baidu_users', {'baidu_id': account_id})
- if user_local_id_li:
- user_local_id = user_local_id_li[0]['user_id']
- user_mobile_li = await sor.R('users', {'id': user_local_id, 'del_flg': '0'})
- mobile = user_mobile_li[0]['mobile']
-
- # 调用短信发送接口
- kyy_send_status = await send_vcode(mobile, sms_stype, {'content': msg_content})
- if kyy_send_status.get('status'):
- await sor.U('baidu_kafka_msg', {'id': ns_msg['id'], 'kyysendstatus': '1'})
- else:
- # 记录错误日志 短信发送失败
- with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f:
- f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 短信发送失败至 {mobile},状态: {kyy_send_status}\n")
- print(f"短信发送失败至 {mobile},状态: {kyy_send_status}")
- else:
- # 记录错误日志 用户未找到
- with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f:
- f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 未找到百度用户ID {account_id} 对应的本地用户\n")
- print(f"未找到百度用户ID {account_id} 对应的本地用户")
-
- except json.JSONDecodeError:
- print("错误:消息内容非有效JSON")
- return {
- 'status': False,
- 'msg': '错误:消息内容非有效JSON'
- }
- except Exception as e:
- print(f"处理异常: {str(e)}")
- return {
- 'status': False,
- 'msg': f"处理异常: {str(e)}"
- }
- # 记录total_count
- # with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
- # f.write(f"本次轮询共处理消息数:{total_count}\n")
+ msg = ns.get('msg')
- consumer.close() # 确保消费者关闭
+ # if msg.error():
+ # # 写入日志文件记录错误信息
+ # with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f:
+ # f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 消费者错误: {msg.error()}\n")
+ # else:
+ try:
+ # 解析消息内容为字典(避免变量名冲突)
+ # msg_data_ori = json.loads(msg.value().decode('utf-8'))
+ msg_data_ori = json.loads(msg)
+ msg_data = msg_data_ori['messages'][0]
+ messageid = msg_data.get('id')
+ taskid = msg_data.get('taskId')
+
+ # 读取文件内容进行检查
+ with open('baidu_kafka_id.txt', 'r', encoding='utf-8') as f:
+ content = f.read()
+
+ if messageid in content:
+ print(f"文件中已存在 '{messageid}',跳过写入")
+ return
+ else:
+ # 追加写入目标内容
+ with open('baidu_kafka_id.txt', 'a', encoding='utf-8') as f:
+ f.write(messageid + '\n')
+ print(f"已写入 '{messageid}' 到文件")
+
+ with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
+ f.write(str(msg_data) + '\n')
+
+ db = DBPools()
+ async with db.sqlorContext('kboss') as sor:
+ # 1. 去重检查(messageid和taskid)
+ exist_msg = await sor.R('baidu_kafka_msg', {'messageid': messageid, 'taskid': taskid})
+ if exist_msg:
+ print(f"消息id {messageid} 已存在,跳过处理")
+ return
+
+ # 2. 构建小写key的ns字典(完整映射所有字段)
+ ns_msg = {
+ 'id': uuid(),
+ 'messageid': messageid,
+ 'taskid': taskid,
+ 'userid': msg_data.get('userId'),
+ 'accountid': msg_data.get('accountId'),
+ 'usertype': msg_data.get('userType'),
+ 'receiverid': msg_data.get('receiverId'),
+ 'contentvar': msg_data.get('contentVar'),
+ 'sendchannel': msg_data.get('sendChannel'),
+ 'content': msg_data.get('content'),
+ 'messagetemplateid': msg_data.get('messageTemplateId'),
+ 'isvirtualstore': msg_data.get('isVirtualStore'),
+ 'channelmessageid': msg_data.get('channelMessageId'),
+ 'channelstatus': msg_data.get('channelStatus'),
+ 'majorcategory': msg_data.get('majorCategory'),
+ 'minorcategory': msg_data.get('minorCategory'),
+ 'status': msg_data.get('status'),
+ 'createtime': await time_convert(msg_data.get('createTime')) if msg_data.get('createTime') else None,
+ 'updatetime': await time_convert(msg_data.get('updateTime')) if msg_data.get('updateTime') else None,
+ 'expiretime': await time_convert(msg_data.get('expireTime')) if msg_data.get('expireTime') else None,
+ 'windowtime': await time_convert(msg_data.get('windowTime')) if msg_data.get('windowTime') else None,
+ 'valid': msg_data.get('valid'),
+ 'complete': msg_data.get('complete'),
+ 'disturbhold': msg_data.get('disturbHold'),
+ 'sendcomplete': msg_data.get('sendComplete')
+ }
+
+ # 3. 执行存库
+ await sor.C('baidu_kafka_msg', ns_msg)
+ print(f"消息id {messageid} 存储成功")
+
+ # 4. 触发短信发送(当sendChannel为MOBILE时)
+ send_channel = msg_data.get('sendChannel')
+ if send_channel == 'MOBILE':
+ msg_content = msg_data.get('content')
+
+ # 判断验证码类短信 | 通知类短信
+ if '验证码' in msg_content:
+ sms_stype = '百度kafka普通验证码'
+ else:
+ sms_stype = '百度kafka普通通知'
+
+ # 查询用户手机号
+ account_id = msg_data.get('accountId')
+ user_local_id_li = await sor.R('baidu_users', {'baidu_id': account_id})
+ if user_local_id_li:
+ user_local_id = user_local_id_li[0]['user_id']
+ user_mobile_li = await sor.R('users', {'id': user_local_id, 'del_flg': '0'})
+ mobile = user_mobile_li[0]['mobile']
+
+ # 调用短信发送接口
+ kyy_send_status = await send_vcode(mobile, sms_stype, {'content': msg_content})
+ if kyy_send_status.get('status'):
+ await sor.U('baidu_kafka_msg', {'id': ns_msg['id'], 'kyysendstatus': '1'})
+ else:
+ # 记录错误日志 短信发送失败
+ with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f:
+ f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 短信发送失败至 {mobile},状态: {kyy_send_status}\n")
+ print(f"短信发送失败至 {mobile},状态: {kyy_send_status}")
+ else:
+ # 记录错误日志 用户未找到
+ with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f:
+ f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 未找到百度用户ID {account_id} 对应的本地用户\n")
+ print(f"未找到百度用户ID {account_id} 对应的本地用户")
+
+ except json.JSONDecodeError:
+ print("错误:消息内容非有效JSON")
+ return {
+ 'status': False,
+ 'msg': '错误:消息内容非有效JSON'
+ }
+ except Exception as e:
+ print(f"处理异常: {str(e)}")
+ import traceback
+ with open('baidu_kafka_error.txt', 'w') as f:
+ f.write(str(e)+ traceback.format_exc())
+ traceback.print_exc()
+ return {
+ 'status': False,
+ 'msg': f"处理异常: {str(e)}"
+ }
+
+ # consumer.close() # 确保消费者关闭
return {
'status': True,
'msg': '获取信息执行结束'
diff --git a/f/web-kboss/src/main.js b/f/web-kboss/src/main.js
index 7cab792..9796b5d 100644
--- a/f/web-kboss/src/main.js
+++ b/f/web-kboss/src/main.js
@@ -100,7 +100,7 @@ Vue.use(HappyScroll)
// });
// // 禁止F12和开发者工具快捷键
-// document.addEventListener('keydown', function(e) {
+// document.addEventListener('keydown', function(e ) {
// // 禁止F12
// if (e.key === 'F12') {
// e.preventDefault();
diff --git a/f/web-kboss/src/views/homePage/detail/img/1.png b/f/web-kboss/src/views/homePage/detail/img/1.png
index 85e5a4a..3c1d090 100644
Binary files a/f/web-kboss/src/views/homePage/detail/img/1.png and b/f/web-kboss/src/views/homePage/detail/img/1.png differ
diff --git a/f/web-kboss/src/views/homePage/detail/img/4.png b/f/web-kboss/src/views/homePage/detail/img/4.png
index 64172fc..4363d65 100644
Binary files a/f/web-kboss/src/views/homePage/detail/img/4.png and b/f/web-kboss/src/views/homePage/detail/img/4.png differ
diff --git a/f/web-kboss/src/views/homePage/detail/img/5.png b/f/web-kboss/src/views/homePage/detail/img/5.png
index de53a08..8e1a5f1 100644
Binary files a/f/web-kboss/src/views/homePage/detail/img/5.png and b/f/web-kboss/src/views/homePage/detail/img/5.png differ
diff --git a/f/web-kboss/src/views/homePage/detail/img/6.png b/f/web-kboss/src/views/homePage/detail/img/6.png
index c74c671..d1fcd78 100644
Binary files a/f/web-kboss/src/views/homePage/detail/img/6.png and b/f/web-kboss/src/views/homePage/detail/img/6.png differ
diff --git a/f/web-kboss/src/views/homePage/detail/img/7.png b/f/web-kboss/src/views/homePage/detail/img/7.png
index e838b20..15e949b 100644
Binary files a/f/web-kboss/src/views/homePage/detail/img/7.png and b/f/web-kboss/src/views/homePage/detail/img/7.png differ
diff --git a/f/web-kboss/src/views/homePage/detail/index.vue b/f/web-kboss/src/views/homePage/detail/index.vue
index eeaa2e4..88278a0 100644
--- a/f/web-kboss/src/views/homePage/detail/index.vue
+++ b/f/web-kboss/src/views/homePage/detail/index.vue
@@ -550,8 +550,8 @@ export default Vue.extend({
}
.function-icon {
- width: 30px;
- height: 30px;
+ width: 100%;
+ height: 100%;
}
.function-content {
diff --git a/f/web-kboss/src/views/product/mainPage/index.vue b/f/web-kboss/src/views/product/mainPage/index.vue
index 130f0a9..30ed540 100644
--- a/f/web-kboss/src/views/product/mainPage/index.vue
+++ b/f/web-kboss/src/views/product/mainPage/index.vue
@@ -83,7 +83,7 @@
v-for="(item, index) in todoList"
:key="index"
class="todo-item"
- @click="handleTodoClick(item.name)"
+ @click="handleTodoClick(item.name)"
>
{{ item.name }}
{{ item.count }}
@@ -142,9 +142,9 @@ export default Vue.extend({
}
},
created() {
- // 移除 initMybalance 调用,因为现在使用 computed 的 mybalance
- this.initMybalance(); // ❌ 删除这一行
- this.getUnreadMsgCount();
+
+ this.initMybalance();
+ this.getUnreadMsgCount();
this.fetchTodoCount();
},
mounted() {
@@ -186,8 +186,8 @@ export default Vue.extend({
const iconIndex = index % this.navIcons.length;
return this.navIcons[iconIndex];
},
- // 移除 initMybalance 方法,因为现在使用 computed 的 mybalance
- async initMybalance() { // ❌ 删除这个方法
+
+ async initMybalance() {
const res = await editReachargelogAPI()
if (res.status) {
this.mybalance = res.data
diff --git a/kgadget/src/baidu_sms_kafka_consumer.py b/kgadget/src/baidu_sms_kafka_consumer.py
index 90d2153..f79c419 100644
--- a/kgadget/src/baidu_sms_kafka_consumer.py
+++ b/kgadget/src/baidu_sms_kafka_consumer.py
@@ -4,8 +4,8 @@
import os
import json
import asyncio
-import datetime
-from sqlor.dbpools import DBPools
+# import datetime
+# from sqlor.dbpools import DBPools
# from appPublic.jsonConfig import getConfig
from appPublic.uniqueID import getID as uuid
from confluent_kafka import Consumer as BaiduKafKaConsumer
@@ -18,76 +18,78 @@ from appPublic.Singleton import SingletonDecorator
from appPublic.folderUtils import ProgramPath
from appPublic.argsConvert import ArgsConvert
-from baiDuSmsClient import send_baidu_vcode as send_vcode
+# from baiDuSmsClient import send_baidu_vcode as send_vcode
+
+from aiohttp import client as aiohttp_client
-def key2ansi(dict):
- # print dict
- return dict
- a = {}
- for k, v in dict.items():
- k = k.encode('utf-8')
- # if type(v) == type(u" "):
- # v = v.encode('utf-8')
- a[k] = v
+# def key2ansi(dict):
+# # print dict
+# return dict
+# a = {}
+# for k, v in dict.items():
+# k = k.encode('utf-8')
+# # if type(v) == type(u" "):
+# # v = v.encode('utf-8')
+# a[k] = v
- return a
+# return a
-class JsonObject(DictObject):
- """
- JsonObject class load json from a json file
- """
+# class JsonObject(DictObject):
+# """
+# JsonObject class load json from a json file
+# """
- def __init__(self, jsonholder, keytype='ansi', NS=None):
- jhtype = type(jsonholder)
- if jhtype == type("") or jhtype == type(u''):
- f = open(jsonholder, 'r')
- else:
- f = jsonholder
- try:
- a = json.load(f)
- except Exception as e:
- print("exception:", self.__jsonholder__, e)
- raise e
- finally:
- if type(jsonholder) == type(""):
- f.close()
+# def __init__(self, jsonholder, keytype='ansi', NS=None):
+# jhtype = type(jsonholder)
+# if jhtype == type("") or jhtype == type(u''):
+# f = open(jsonholder, 'r')
+# else:
+# f = jsonholder
+# try:
+# a = json.load(f)
+# except Exception as e:
+# print("exception:", self.__jsonholder__, e)
+# raise e
+# finally:
+# if type(jsonholder) == type(""):
+# f.close()
- if NS is not None:
- ac = ArgsConvert('$[', ']$')
- a = ac.convert(a, NS)
- a['__jsonholder__'] = jsonholder
- a['NS'] = NS
- DictObject.__init__(self, **a)
+# if NS is not None:
+# ac = ArgsConvert('$[', ']$')
+# a = ac.convert(a, NS)
+# a['__jsonholder__'] = jsonholder
+# a['NS'] = NS
+# DictObject.__init__(self, **a)
-@SingletonDecorator
-class JsonConfig(JsonObject):
- pass
+# @SingletonDecorator
+# class JsonConfig(JsonObject):
+# pass
-def getConfig(path=None, NS=None):
- pp = ProgramPath()
- if path == None:
- path = os.getcwd()
- cfname = os.path.abspath(os.path.join(path, "conf", "config.prod.json"))
- # print __name__,cfname
- ns = {
- 'home': str(Path.home()),
- 'workdir': path,
- 'ProgramPath': pp
- }
- if NS is not None:
- ns.update(NS)
- a = JsonConfig(cfname, NS=ns)
- return a
+# def getConfig(path=None, NS=None):
+# pp = ProgramPath()
+# if path == None:
+# path = os.getcwd()
+# cfname = os.path.abspath(os.path.join(path, "conf", "config.prod.json"))
+# # print __name__,cfname
+# ns = {
+# 'home': str(Path.home()),
+# 'workdir': path,
+# 'ProgramPath': pp
+# }
+# if NS is not None:
+# ns.update(NS)
+# a = JsonConfig(cfname, NS=ns)
+# return a
-async def time_convert(resoucetime=None):
- if not resoucetime:
- return
- utc_time = datetime.datetime.strptime(resoucetime, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=datetime.timezone.utc)
- beijing_time = utc_time.astimezone(datetime.timezone(datetime.timedelta(hours=8)))
- return beijing_time.strftime("%Y-%m-%d %H:%M:%S")
+# async def time_convert(resoucetime=None):
+# if not resoucetime:
+# return
+# utc_time = datetime.datetime.strptime(resoucetime, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=datetime.timezone.utc)
+# beijing_time = utc_time.astimezone(datetime.timezone(datetime.timedelta(hours=8)))
+# return beijing_time.strftime("%Y-%m-%d %H:%M:%S")
async def baidu_sms_kafka_consumer(ns={}):
import os
@@ -98,7 +100,7 @@ async def baidu_sms_kafka_consumer(ns={}):
'security.protocol': 'SASL_SSL',
'ssl.endpoint.identification.algorithm': 'none',
# 证书文件路径
- 'ssl.ca.location': 'baidu_kafka_ca.pem',
+ 'ssl.ca.location': 'D:/Code/kboss/kgadget/src/baidu_kafka_ca.pem',
# SASL 机制
'sasl.mechanism': 'SCRAM-SHA-512',
# SASL 用户名
@@ -110,154 +112,60 @@ async def baidu_sms_kafka_consumer(ns={}):
'auto.offset.reset': 'latest',
'fetch.message.max.bytes': '1024*512',
})
-
+
+ # 创建日志文件
+ files = "baidu_kafka_msg.txt"
+ if not os.path.exists(filename):
+ with open(filename, 'w', encoding='utf-8') as f: # 'w' 模式会覆盖已有文件,但检查后使用是安全的
+ pass # 创建空文件
+ else:
+ pass
+
# 订阅的主题名称
consumer.subscribe(['kaiyuanyun_msg_topic'])
-
- files = ["baidu_kafka_msg.txt", "baidu_kafka_id.txt", "baidu_kafka_error.txt"]
- for filename in files:
- if not os.path.exists(filename):
- with open(filename, 'w', encoding='utf-8') as f: # 'w' 模式会覆盖已有文件,但检查后使用是安全的
- pass # 创建空文件
- else:
- pass
-
- # total_count = 0
while True:
- i = 0
- if i == 0:
- # 写入文件记录轮询开始时间 时间格式: YYYY-MM-DD HH:MM:SS
- with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
- f.write(f"轮询开始时间:{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
-
- msg = consumer.poll(0.1) # 单次轮询获取消息
+ msg = consumer.poll(2) # 单次轮询获取消息
if msg is None:
- if i == 10:
- with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
- f.write(f"轮询第10次消息为None,{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
continue
elif msg.error():
- # 写入日志文件记录错误信息
- with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f:
- f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 消费者错误: {msg.error()}\n")
+ pass
else:
- # total_count += 1
try:
- # 解析消息内容为字典(避免变量名冲突)
- msg_data_ori = json.loads(msg.value().decode('utf-8'))
- msg_data = msg_data_ori['messages'][0]
- messageid = msg_data.get('id')
- taskid = msg_data.get('taskId')
-
- # 读取文件内容进行检查
- with open('baidu_kafka_id.txt', 'r', encoding='utf-8') as f:
- content = f.read()
-
- if messageid in content:
- print(f"文件中已存在 '{messageid}',跳过写入")
- continue
- else:
- # 追加写入目标内容
- with open('baidu_kafka_id.txt', 'a', encoding='utf-8') as f:
- f.write(messageid + '\n')
- print(f"已写入 '{messageid}' 到文件")
-
- with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
- f.write(str(msg_data) + '\n')
-
- db = DBPools()
- async with db.sqlorContext('kboss') as sor:
- # 1. 去重检查(messageid和taskid)
- exist_msg = await sor.R('baidu_kafka_msg', {'messageid': messageid, 'taskid': taskid})
- if exist_msg:
- print(f"消息id {messageid} 已存在,跳过处理")
- continue
- # consumer.close()
- # return
-
- # 2. 构建小写key的ns字典(完整映射所有字段)
- ns_msg = {
- 'id': uuid(),
- 'messageid': messageid,
- 'taskid': taskid,
- 'userid': msg_data.get('userId'),
- 'accountid': msg_data.get('accountId'),
- 'usertype': msg_data.get('userType'),
- 'receiverid': msg_data.get('receiverId'),
- 'contentvar': msg_data.get('contentVar'),
- 'sendchannel': msg_data.get('sendChannel'),
- 'content': msg_data.get('content'),
- 'messagetemplateid': msg_data.get('messageTemplateId'),
- 'isvirtualstore': msg_data.get('isVirtualStore'),
- 'channelmessageid': msg_data.get('channelMessageId'),
- 'channelstatus': msg_data.get('channelStatus'),
- 'majorcategory': msg_data.get('majorCategory'),
- 'minorcategory': msg_data.get('minorCategory'),
- 'status': msg_data.get('status'),
- 'createtime': await time_convert(msg_data.get('createTime')) if msg_data.get('createTime') else None,
- 'updatetime': await time_convert(msg_data.get('updateTime')) if msg_data.get('updateTime') else None,
- 'expiretime': await time_convert(msg_data.get('expireTime')) if msg_data.get('expireTime') else None,
- 'windowtime': await time_convert(msg_data.get('windowTime')) if msg_data.get('windowTime') else None,
- 'valid': msg_data.get('valid'),
- 'complete': msg_data.get('complete'),
- 'disturbhold': msg_data.get('disturbHold'),
- 'sendcomplete': msg_data.get('sendComplete')
- }
-
- # 3. 执行存库
- await sor.C('baidu_kafka_msg', ns_msg)
- # print(f"消息id {messageid} 存储成功")
-
- # 4. 触发短信发送(当sendChannel为MOBILE时)
- send_channel = msg_data.get('sendChannel')
- if send_channel == 'MOBILE':
- msg_content = msg_data.get('content')
-
- # 判断验证码类短信 | 通知类短信
- if '验证码' in msg_content:
- sms_stype = '百度kafka普通验证码'
- else:
- sms_stype = '百度kafka普通通知'
-
- # 查询用户手机号
- account_id = msg_data.get('accountId')
- user_local_id_li = await sor.R('baidu_users', {'baidu_id': account_id})
- if user_local_id_li:
- user_local_id = user_local_id_li[0]['user_id']
- user_mobile_li = await sor.R('users', {'id': user_local_id, 'del_flg': '0'})
- mobile = user_mobile_li[0]['mobile']
-
- # 调用短信发送接口
- kyy_send_status = await send_vcode(mobile, sms_stype, {'content': msg_content})
- if kyy_send_status.get('status'):
- await sor.U('baidu_kafka_msg', {'id': ns_msg['id'], 'kyysendstatus': '1'})
- else:
- # 记录错误日志 短信发送失败
- with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f:
- f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 短信发送失败至 {mobile},状态: {kyy_send_status}\n")
- print(f"短信发送失败至 {mobile},状态: {kyy_send_status}")
- else:
- # 记录错误日志 用户未找到
- with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f:
- f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 未找到百度用户ID {account_id} 对应的本地用户\n")
- print(f"未找到百度用户ID {account_id} 对应的本地用户")
+ with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
+ f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}-{msg.value().decode('utf-8')}\n")
+
+ # print('Received message: {}'.format(msg.value().decode('utf-8')))
+ method = "POST"
+ url = 'https://www.opencomputing.cn/baiduc/baidu_sms_kafka_consumer.dspy'
+ header = {
+ "Host": "www.opencomputing.cn",
+ # "Content-Type": "application/json"
+ }
+ data = {
+ 'msg': msg.value().decode('utf-8')
+ }
+ async with aiohttp_client.request(
+ method=method,
+ url=url,
+ headers=header,
+ data=data) as res:
+ data_ = await res.text()
+ # print("Response data:", data_)
except json.JSONDecodeError:
- print("错误:消息内容非有效JSON")
return {
'status': False,
'msg': '错误:消息内容非有效JSON'
}
except Exception as e:
- print(f"处理异常: {str(e)}")
+ import traceback
+ print(str(e)+ traceback.format_exc())
+ traceback.print_exc()
return {
'status': False,
'msg': f"处理异常: {str(e)}"
}
- # 记录total_count
- # with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
- # f.write(f"本次轮询共处理消息数:{total_count}\n")
consumer.close() # 确保消费者关闭
return {
@@ -267,10 +175,10 @@ async def baidu_sms_kafka_consumer(ns={}):
if __name__ == '__main__':
# p = os.getcwd()
- current_dir = os.getcwd()
- path_parts = current_dir.split('/')
- two_levels_up = '/'.join(path_parts[:-2])
- config = getConfig(two_levels_up)
- DBPools(config.databases)
+ # current_dir = os.getcwd()
+ # path_parts = current_dir.split('/')
+ # two_levels_up = '/'.join(path_parts[:-2])
+ # config = getConfig(two_levels_up)
+ # DBPools(config.databases)
loop = asyncio.get_event_loop()
print(loop.run_until_complete(baidu_sms_kafka_consumer()))