This commit is contained in:
hrx 2025-11-20 15:23:50 +08:00
commit 668da1b96a
3 changed files with 284 additions and 25 deletions

View File

@ -6,6 +6,7 @@ async def time_convert(resoucetime=None):
return beijing_time.strftime("%Y-%m-%d %H:%M:%S")
async def baidu_sms_kafka_consumer(ns={}):
import os
consumer = BaiduKafKaConsumer({
# 接入点
'bootstrap.servers': '120.48.10.223:9095,180.76.96.108:9095,180.76.147.36:9095',
@ -28,15 +29,35 @@ async def baidu_sms_kafka_consumer(ns={}):
# 订阅的主题名称
consumer.subscribe(['kaiyuanyun_msg_topic'])
for i in range(30):
msg = consumer.poll(0.1) # 单次轮询获取消息
files = ["baidu_kafka_msg.txt", "baidu_kafka_id.txt", "baidu_kafka_error.txt"]
for filename in files:
if not os.path.exists(filename):
with open(filename, 'w', encoding='utf-8') as f: # 'w' 模式会覆盖已有文件,但检查后使用是安全的
pass # 创建空文件
else:
pass
total_count = 0
for i in range(2):
if i == 0:
# 写入文件记录轮询开始时间 时间格式: YYYY-MM-DD HH:MM:SS
with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
f.write(f"轮询开始时间:{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
msg = consumer.poll(0.2) # 单次轮询获取消息
if msg is None:
if i == 10:
with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
f.write(f"轮询第10次消息为None{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
continue
elif msg.error():
print(f"消费者错误: {msg.error()}")
# 写入日志文件记录错误信息
with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f:
f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 消费者错误: {msg.error()}\n")
else:
total_count += 1
try:
# 解析消息内容为字典(避免变量名冲突)
msg_data_ori = json.loads(msg.value().decode('utf-8'))
@ -44,14 +65,8 @@ async def baidu_sms_kafka_consumer(ns={}):
messageid = msg_data.get('id')
taskid = msg_data.get('taskId')
filename = 'baidu_kafka_msg.txt'
# 检查文件是否存在,不存在则创建
if not os.path.exists(filename):
with open(filename, 'w', encoding='utf-8') as f:
print(f"文件不存在,已创建 {filename}")
# 读取文件内容进行检查
with open(filename, 'r', encoding='utf-8') as f:
with open('baidu_kafka_id.txt', 'r', encoding='utf-8') as f:
content = f.read()
if messageid in content:
@ -59,9 +74,12 @@ async def baidu_sms_kafka_consumer(ns={}):
continue
else:
# 追加写入目标内容
with open(filename, 'a', encoding='utf-8') as f:
with open('baidu_kafka_id.txt', 'a', encoding='utf-8') as f:
f.write(messageid + '\n')
print(f"已写入 '{messageid}' 到文件")
with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
f.write(str(msg_data) + '\n')
db = DBPools()
async with db.sqlorContext('kboss') as sor:
@ -69,8 +87,9 @@ async def baidu_sms_kafka_consumer(ns={}):
exist_msg = await sor.R('baidu_kafka_msg', {'messageid': messageid, 'taskid': taskid})
if exist_msg:
print(f"消息id {messageid} 已存在,跳过处理")
consumer.close()
return
continue
# consumer.close()
# return
# 2. 构建小写key的ns字典完整映射所有字段
ns_msg = {
@ -119,16 +138,26 @@ async def baidu_sms_kafka_consumer(ns={}):
# 查询用户手机号
account_id = msg_data.get('accountId')
user_local_id_li = await sor.R('baidu_users', {'baidu_id': account_id})
user_local_id = user_local_id_li[0]['user_id']
user_mobile_li = await sor.R('users', {'id': user_local_id, 'del_flg': '0'})
mobile = user_mobile_li[0]['mobile']
# 调用短信发送接口
kyy_send_status = await send_vcode(mobile, sms_stype, {'content': msg_content})
if kyy_send_status.get('status'):
await sor.U('baidu_kafka_msg', {'id': ns_msg['id'], 'kyysendstatus': '1'})
print(f"已触发短信发送至 {mobile}")
if user_local_id_li:
user_local_id = user_local_id_li[0]['user_id']
user_mobile_li = await sor.R('users', {'id': user_local_id, 'del_flg': '0'})
mobile = user_mobile_li[0]['mobile']
# 调用短信发送接口
kyy_send_status = await send_vcode(mobile, sms_stype, {'content': msg_content})
if kyy_send_status.get('status'):
await sor.U('baidu_kafka_msg', {'id': ns_msg['id'], 'kyysendstatus': '1'})
else:
# 记录错误日志 短信发送失败
with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f:
f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 短信发送失败至 {mobile},状态: {kyy_send_status}\n")
print(f"短信发送失败至 {mobile},状态: {kyy_send_status}")
else:
# 记录错误日志 用户未找到
with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f:
f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 未找到百度用户ID {account_id} 对应的本地用户\n")
print(f"未找到百度用户ID {account_id} 对应的本地用户")
except json.JSONDecodeError:
print("错误消息内容非有效JSON")
return {
@ -141,7 +170,10 @@ async def baidu_sms_kafka_consumer(ns={}):
'status': False,
'msg': f"处理异常: {str(e)}"
}
# 记录total_count
with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f:
f.write(f"本次轮询共处理消息数:{total_count}\n")
consumer.close() # 确保消费者关闭
return {
'status': True,

View File

@ -33,4 +33,5 @@
*/1 * * * * curl https://www.opencomputing.cn/account/email_info.dspy > /d/zhc/crontab_prod.log 2>&1
*/1 * * * * curl https://www.opencomputing.cn/jdcloud/get_partner_order_list.dspy?page=1 > /d/zhc/crontab_prod.log 2>&1
* * * * * curl https://www.opencomputing.cn/baiduc/baidu_new_update_resouce.dspy > /d/zhc/crontab_prod.log 2>&1
* * * * * curl https://www.opencomputing.cn/baiduc/baidu_sms_kafka_consumer.dspy > /d/zhc/crontab_dev.log 2>&1
#* * * * * curl https://www.opencomputing.cn/baiduc/baidu_sms_kafka_consumer.dspy > /d/zhc/crontab_dev.log 2>&1
*/5 * * * * /usr/bin/python3 /d/zhc/kboss_prod/kgadget/src/more_task_scheduler.py >> /tmp/multi_task_scheduler_cron.log 2>&1

View File

@ -0,0 +1,226 @@
#!/usr/bin/env python3
"""
多任务定时脚本 - 带进程检测功能
适用于 Linux crontab 部署
"""
import os
import sys
import time
import logging
import requests
import subprocess
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ThreadPoolExecutor
from datetime import datetime
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('/tmp/multi_task_scheduler.log'),
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger(__name__)
class ProcessChecker:
"""进程检测类"""
def __init__(self, pid_file='/tmp/multi_task_scheduler.pid'):
self.pid_file = pid_file
def is_running(self):
"""检查进程是否已经在运行"""
if not os.path.exists(self.pid_file):
return False
try:
with open(self.pid_file, 'r') as f:
pid = int(f.read().strip())
# 检查PID是否存在
try:
os.kill(pid, 0)
return True
except OSError:
os.remove(self.pid_file)
return False
except (ValueError, IOError) as e:
logger.warning(f"读取PID文件失败: {e}")
if os.path.exists(self.pid_file):
os.remove(self.pid_file)
return False
def create_pid_file(self):
"""创建PID文件"""
try:
with open(self.pid_file, 'w') as f:
f.write(str(os.getpid()))
logger.info(f"创建PID文件: {self.pid_file}")
except IOError as e:
logger.error(f"创建PID文件失败: {e}")
def remove_pid_file(self):
"""删除PID文件"""
try:
if os.path.exists(self.pid_file):
os.remove(self.pid_file)
logger.info(f"删除PID文件: {self.pid_file}")
except IOError as e:
logger.error(f"删除PID文件失败: {e}")
class TaskExecutor:
"""任务执行器"""
def __init__(self):
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
})
def visit_baidu(self):
"""任务1: 访问百度每5秒"""
start_time = time.time()
try:
response = self.session.get(
'https://www.baidu.com/',
timeout=10,
verify=True
)
duration = time.time() - start_time
logger.info(
f"[百度访问] 成功 - 状态码: {response.status_code} - "
f"响应时间: {duration:.2f}"
)
return True
except Exception as e:
logger.error(f"[百度访问] 失败 - 错误: {e}")
return False
def execute_curl_command(self, url, task_name, log_file='/d/zhc/crontab_prod.log'):
"""执行curl命令的通用方法"""
start_time = time.time()
try:
# 构建curl命令
command = ['curl', '-s', url]
# 执行命令
result = subprocess.run(
command,
capture_output=True,
text=True,
timeout=300 # 5分钟超时
)
duration = time.time() - start_time
# 记录结果到日志文件
with open(log_file, 'a', encoding='utf-8') as f:
f.write(f"\n{'='*50}\n")
f.write(f"任务: {task_name}\n")
f.write(f"执行时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write(f"URL: {url}\n")
f.write(f"返回码: {result.returncode}\n")
f.write(f"执行时间: {duration:.2f}\n")
f.write(f"输出: {result.stdout}\n")
if result.stderr:
f.write(f"错误: {result.stderr}\n")
f.write(f"{'='*50}\n")
if result.returncode == 0:
logger.info(f"[{task_name}] 成功 - 执行时间: {duration:.2f}")
return True
else:
logger.error(f"[{task_name}] 失败 - 返回码: {result.returncode}, 错误: {result.stderr}")
return False
except subprocess.TimeoutExpired:
logger.error(f"[{task_name}] 超时 - 执行超过5分钟")
return False
except Exception as e:
logger.error(f"[{task_name}] 异常 - 错误: {e}")
return False
def get_baidu_kafka_info(self):
"""任务2: 访问百度kafka信息源"""
return self.execute_curl_command(
'https://www.opencomputing.cn/baiduc/baidu_sms_kafka_consumer.dspy',
'访问百度kafka信息源'
)
def setup_scheduler():
"""设置和启动调度器"""
# 配置执行器
executors = {
'default': ThreadPoolExecutor(5)
}
# 创建调度器
scheduler = BlockingScheduler(executors=executors)
# 创建任务执行器实例
task_executor = TaskExecutor()
# 添加任务1: 每5秒访问百度
# scheduler.add_job(
# task_executor.visit_baidu,
# 'interval',
# seconds=5,
# id='visit_baidu_job',
# replace_existing=True
# )
# 添加任务2:
scheduler.add_job(
task_executor.get_baidu_kafka_info,
'interval',
seconds=5,
id='get_baidu_kafka_info_job',
replace_existing=True
)
return scheduler
def main():
"""主函数"""
# 创建进程检测器
process_checker = ProcessChecker()
# 检查是否已有实例在运行
if process_checker.is_running():
logger.info("检测到脚本已在运行,退出当前实例")
sys.exit(0)
# 创建PID文件
process_checker.create_pid_file()
logger.info("多任务定时脚本启动...")
logger.info(f"进程PID: {os.getpid()}")
logger.info("任务列表:")
logger.info("1. 每5秒访问百度kafka信息源")
logger.info("按 Ctrl+C 退出程序")
scheduler = None
try:
# 设置并启动调度器
scheduler = setup_scheduler()
scheduler.start()
except KeyboardInterrupt:
logger.info("收到中断信号,正在关闭...")
except Exception as e:
logger.error(f"程序运行出错: {e}")
finally:
# 清理资源
if scheduler and scheduler.running:
scheduler.shutdown()
process_checker.remove_pid_file()
logger.info("程序已退出")
if __name__ == "__main__":
main()