kboss/kgadget/src/more_task_scheduler.py
2025-11-20 17:43:11 +08:00

229 lines
7.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
多任务定时脚本 - 带进程检测功能
适用于 Linux crontab 部署
"""
import os
import sys
import time
import logging
import requests
import subprocess
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ThreadPoolExecutor
from datetime import datetime
# Logging setup: every record at INFO or above is written both to a log file
# under /tmp and to stdout, using one shared line format.
logging.basicConfig(
    handlers=[
        logging.FileHandler('/tmp/multi_task_scheduler.log'),
        logging.StreamHandler(sys.stdout),
    ],
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-level logger used by everything in this script.
logger = logging.getLogger(__name__)

# Keep APScheduler's own framework messages below WARNING out of the log.
logging.getLogger('apscheduler').setLevel(logging.WARNING)
class ProcessChecker:
    """Single-instance guard backed by a PID file.

    The file at ``pid_file`` stores the PID of the instance that created it.
    """

    def __init__(self, pid_file='/tmp/multi_task_scheduler.pid'):
        """Remember which PID file to use.

        Args:
            pid_file: Path of the file holding the running instance's PID.
        """
        self.pid_file = pid_file

    def is_running(self):
        """Return True if the PID recorded in the PID file is a live process.

        A missing file means no instance is running. An unreadable file, a
        non-integer or non-positive PID, or a PID with no matching process is
        treated as stale: the file is removed (best effort) and False is
        returned.
        """
        try:
            with open(self.pid_file, 'r') as f:
                pid = int(f.read().strip())
            if pid <= 0:
                # os.kill() gives 0 and negative PIDs a process-group meaning,
                # so probing them would "succeed" without proving anything.
                raise ValueError(f"无效的PID: {pid}")
        except FileNotFoundError:
            return False
        except (ValueError, OSError) as e:
            logger.warning(f"读取PID文件失败: {e}")
            self._discard_pid_file()
            return False

        try:
            # Signal 0 delivers nothing; it only performs the existence and
            # permission checks.
            os.kill(pid, 0)
        except PermissionError:
            # The process exists but belongs to another user: still running.
            return True
        except (OSError, OverflowError):
            # No such process, or a PID too large to be valid: stale file.
            self._discard_pid_file()
            return False
        return True

    def create_pid_file(self):
        """Write the current process's PID to the PID file.

        Failures are logged and not raised.
        """
        try:
            with open(self.pid_file, 'w') as f:
                f.write(str(os.getpid()))
            logger.info(f"创建PID文件: {self.pid_file}")
        except OSError as e:
            logger.error(f"创建PID文件失败: {e}")

    def remove_pid_file(self):
        """Delete the PID file if it exists.

        Failures are logged and not raised.
        """
        try:
            os.remove(self.pid_file)
        except FileNotFoundError:
            # Nothing to remove.
            return
        except OSError as e:
            logger.error(f"删除PID文件失败: {e}")
        else:
            logger.info(f"删除PID文件: {self.pid_file}")

    def _discard_pid_file(self):
        """Best-effort removal of a stale or unreadable PID file."""
        try:
            os.remove(self.pid_file)
        except FileNotFoundError:
            pass
        except OSError as e:
            # Do not let cleanup failure abort the liveness check.
            logger.warning(f"删除PID文件失败: {e}")
class TaskExecutor:
    """Holds the task callables that the scheduler runs."""

    def __init__(self):
        """Create one HTTP session, with a browser-like User-Agent, for reuse."""
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })

    def visit_baidu(self):
        """Task 1: GET the Baidu home page and log the status code and timing.

        Returns:
            True if a response was received (the status code is logged but
            not checked), False if the request raised.
        """
        start_time = time.time()
        try:
            response = self.session.get(
                'https://www.baidu.com/',
                timeout=10,
                verify=True
            )
            duration = time.time() - start_time
            logger.info(
                f"[百度访问] 成功 - 状态码: {response.status_code} - "
                f"响应时间: {duration:.2f}"
            )
            return True
        except Exception as e:
            logger.error(f"[百度访问] 失败 - 错误: {e}")
            return False

    def execute_curl_command(self, url, task_name, log_file='/d/zhc/crontab_prod.log'):
        """Fetch ``url`` with ``curl -s`` and append the outcome to ``log_file``.

        Args:
            url: URL passed to curl.
            task_name: Label used in log messages and in the result record.
            log_file: File the result record is appended to.

        Returns:
            True if curl exited with code 0, otherwise False (non-zero exit,
            timeout after 5 minutes, or failure to run curl at all).
        """
        start_time = time.time()
        try:
            # Argument list, no shell: the URL is never shell-interpreted.
            result = subprocess.run(
                ['curl', '-s', url],
                capture_output=True,
                text=True,
                timeout=300  # 5-minute limit
            )
        except subprocess.TimeoutExpired:
            logger.error(f"[{task_name}] 超时 - 执行超过5分钟")
            return False
        except Exception as e:
            logger.error(f"[{task_name}] 异常 - 错误: {e}")
            return False

        duration = time.time() - start_time

        # Recording the result is best-effort: a missing or unwritable log
        # file must not turn a successful curl run into a reported failure.
        self._append_curl_log(log_file, task_name, url, result, duration)

        if result.returncode == 0:
            logger.info(f"[{task_name}] 成功 - 执行时间: {duration:.2f}")
            return True
        logger.error(f"[{task_name}] 失败 - 返回码: {result.returncode}, 错误: {result.stderr}")
        return False

    def _append_curl_log(self, log_file, task_name, url, result, duration):
        """Append one curl result record to ``log_file``; log write failures."""
        separator = '=' * 50
        lines = [
            f"\n{separator}\n",
            f"任务: {task_name}\n",
            # First "执行时间" is the wall-clock timestamp of the record.
            f"执行时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n",
            f"URL: {url}\n",
            f"返回码: {result.returncode}\n",
            # Second "执行时间" is the elapsed time in seconds.
            f"执行时间: {duration:.2f}\n",
            f"输出: {result.stdout}\n",
        ]
        if result.stderr:
            lines.append(f"错误: {result.stderr}\n")
        lines.append(f"{separator}\n")
        try:
            with open(log_file, 'a', encoding='utf-8') as f:
                f.writelines(lines)
        except OSError as e:
            logger.error(f"[{task_name}] 写入日志文件失败 - 错误: {e}")

    def get_baidu_kafka_info(self):
        """Task 2: call the Baidu Kafka consumer endpoint through curl.

        Returns:
            The boolean result of ``execute_curl_command``.
        """
        return self.execute_curl_command(
            'https://www.opencomputing.cn/baiduc/baidu_sms_kafka_consumer.dspy',
            '访问百度kafka信息源'
        )
def setup_scheduler():
    """Build the blocking scheduler and register its jobs without starting it.

    Returns:
        A BlockingScheduler whose default executor is a 5-worker thread pool,
        with the Kafka-info job registered on a 2-second interval.
    """
    scheduler = BlockingScheduler(
        executors={'default': ThreadPoolExecutor(max_workers=5)}
    )
    tasks = TaskExecutor()

    # Task 1 (visit Baidu every 5 seconds) is currently disabled:
    # scheduler.add_job(
    #     tasks.visit_baidu,
    #     'interval',
    #     seconds=5,
    #     id='visit_baidu_job',
    #     replace_existing=True
    # )

    # Task 2: poll the Baidu Kafka consumer endpoint.
    scheduler.add_job(
        tasks.get_baidu_kafka_info,
        trigger='interval',
        seconds=2,
        id='get_baidu_kafka_info_job',
        replace_existing=True,
    )
    return scheduler
def main():
    """Run the scheduler as a single instance guarded by a PID file.

    Exits with status 0 straight away if another instance is detected.
    Otherwise writes the PID file, starts the blocking scheduler, and on
    Ctrl+C or an unexpected error shuts the scheduler down and removes the
    PID file.
    """
    process_checker = ProcessChecker()

    # Refuse to start a second copy (e.g. when crontab fires again).
    if process_checker.is_running():
        logger.info("检测到脚本已在运行,退出当前实例")
        sys.exit(0)

    process_checker.create_pid_file()
    logger.info("多任务定时脚本启动...")
    logger.info(f"进程PID: {os.getpid()}")
    logger.info("任务列表:")
    # The Kafka job is registered with seconds=2, so report 2 seconds here.
    logger.info("1. 每2秒访问百度kafka信息源")
    logger.info("按 Ctrl+C 退出程序")

    scheduler = None
    try:
        scheduler = setup_scheduler()
        scheduler.start()  # blocks until interrupted or failed
    except KeyboardInterrupt:
        logger.info("收到中断信号,正在关闭...")
    except Exception as e:
        # Keep the traceback so the failure can be diagnosed from the log.
        logger.error(f"程序运行出错: {e}", exc_info=True)
    finally:
        try:
            if scheduler and scheduler.running:
                scheduler.shutdown()
        finally:
            # Remove the PID file even if the scheduler shutdown itself fails.
            process_checker.remove_pid_file()
            logger.info("程序已退出")


if __name__ == "__main__":
    main()