update
This commit is contained in:
parent
ea432c25f8
commit
d6a54bca57
226
kgadget/src/more_task_scheduler.py
Normal file
226
kgadget/src/more_task_scheduler.py
Normal file
@ -0,0 +1,226 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
多任务定时脚本 - 带进程检测功能
|
||||
适用于 Linux crontab 部署
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import logging
|
||||
import requests
|
||||
import subprocess
|
||||
from apscheduler.schedulers.blocking import BlockingScheduler
|
||||
from apscheduler.executors.pool import ThreadPoolExecutor
|
||||
from datetime import datetime
|
||||
|
||||
# 配置日志
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
||||
handlers=[
|
||||
logging.FileHandler('/tmp/multi_task_scheduler.log'),
|
||||
logging.StreamHandler(sys.stdout)
|
||||
]
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class ProcessChecker:
|
||||
"""进程检测类"""
|
||||
|
||||
def __init__(self, pid_file='/tmp/multi_task_scheduler.pid'):
|
||||
self.pid_file = pid_file
|
||||
|
||||
def is_running(self):
|
||||
"""检查进程是否已经在运行"""
|
||||
if not os.path.exists(self.pid_file):
|
||||
return False
|
||||
|
||||
try:
|
||||
with open(self.pid_file, 'r') as f:
|
||||
pid = int(f.read().strip())
|
||||
|
||||
# 检查PID是否存在
|
||||
try:
|
||||
os.kill(pid, 0)
|
||||
return True
|
||||
except OSError:
|
||||
os.remove(self.pid_file)
|
||||
return False
|
||||
|
||||
except (ValueError, IOError) as e:
|
||||
logger.warning(f"读取PID文件失败: {e}")
|
||||
if os.path.exists(self.pid_file):
|
||||
os.remove(self.pid_file)
|
||||
return False
|
||||
|
||||
def create_pid_file(self):
|
||||
"""创建PID文件"""
|
||||
try:
|
||||
with open(self.pid_file, 'w') as f:
|
||||
f.write(str(os.getpid()))
|
||||
logger.info(f"创建PID文件: {self.pid_file}")
|
||||
except IOError as e:
|
||||
logger.error(f"创建PID文件失败: {e}")
|
||||
|
||||
def remove_pid_file(self):
|
||||
"""删除PID文件"""
|
||||
try:
|
||||
if os.path.exists(self.pid_file):
|
||||
os.remove(self.pid_file)
|
||||
logger.info(f"删除PID文件: {self.pid_file}")
|
||||
except IOError as e:
|
||||
logger.error(f"删除PID文件失败: {e}")
|
||||
|
||||
class TaskExecutor:
|
||||
"""任务执行器"""
|
||||
|
||||
def __init__(self):
|
||||
self.session = requests.Session()
|
||||
self.session.headers.update({
|
||||
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
|
||||
})
|
||||
|
||||
def visit_baidu(self):
|
||||
"""任务1: 访问百度(每5秒)"""
|
||||
start_time = time.time()
|
||||
try:
|
||||
response = self.session.get(
|
||||
'https://www.baidu.com/',
|
||||
timeout=10,
|
||||
verify=True
|
||||
)
|
||||
duration = time.time() - start_time
|
||||
|
||||
logger.info(
|
||||
f"[百度访问] 成功 - 状态码: {response.status_code} - "
|
||||
f"响应时间: {duration:.2f}秒"
|
||||
)
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[百度访问] 失败 - 错误: {e}")
|
||||
return False
|
||||
|
||||
def execute_curl_command(self, url, task_name, log_file='/d/zhc/crontab_prod.log'):
|
||||
"""执行curl命令的通用方法"""
|
||||
start_time = time.time()
|
||||
try:
|
||||
# 构建curl命令
|
||||
command = ['curl', '-s', url]
|
||||
|
||||
# 执行命令
|
||||
result = subprocess.run(
|
||||
command,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=300 # 5分钟超时
|
||||
)
|
||||
|
||||
duration = time.time() - start_time
|
||||
|
||||
# 记录结果到日志文件
|
||||
with open(log_file, 'a', encoding='utf-8') as f:
|
||||
f.write(f"\n{'='*50}\n")
|
||||
f.write(f"任务: {task_name}\n")
|
||||
f.write(f"执行时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
|
||||
f.write(f"URL: {url}\n")
|
||||
f.write(f"返回码: {result.returncode}\n")
|
||||
f.write(f"执行时间: {duration:.2f}秒\n")
|
||||
f.write(f"输出: {result.stdout}\n")
|
||||
if result.stderr:
|
||||
f.write(f"错误: {result.stderr}\n")
|
||||
f.write(f"{'='*50}\n")
|
||||
|
||||
if result.returncode == 0:
|
||||
logger.info(f"[{task_name}] 成功 - 执行时间: {duration:.2f}秒")
|
||||
return True
|
||||
else:
|
||||
logger.error(f"[{task_name}] 失败 - 返回码: {result.returncode}, 错误: {result.stderr}")
|
||||
return False
|
||||
|
||||
except subprocess.TimeoutExpired:
|
||||
logger.error(f"[{task_name}] 超时 - 执行超过5分钟")
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.error(f"[{task_name}] 异常 - 错误: {e}")
|
||||
return False
|
||||
|
||||
def get_baidu_kafka_info(self):
|
||||
"""任务2: 访问百度kafka信息源"""
|
||||
return self.execute_curl_command(
|
||||
'https://www.opencomputing.cn/baiduc/baidu_sms_kafka_consumer.dspy',
|
||||
'访问百度kafka信息源'
|
||||
)
|
||||
|
||||
def setup_scheduler():
|
||||
"""设置和启动调度器"""
|
||||
# 配置执行器
|
||||
executors = {
|
||||
'default': ThreadPoolExecutor(5)
|
||||
}
|
||||
|
||||
# 创建调度器
|
||||
scheduler = BlockingScheduler(executors=executors)
|
||||
|
||||
# 创建任务执行器实例
|
||||
task_executor = TaskExecutor()
|
||||
|
||||
# 添加任务1: 每5秒访问百度
|
||||
# scheduler.add_job(
|
||||
# task_executor.visit_baidu,
|
||||
# 'interval',
|
||||
# seconds=5,
|
||||
# id='visit_baidu_job',
|
||||
# replace_existing=True
|
||||
# )
|
||||
|
||||
# 添加任务2:
|
||||
scheduler.add_job(
|
||||
task_executor.get_baidu_kafka_info,
|
||||
'interval',
|
||||
seconds=5,
|
||||
id='get_baidu_kafka_info_job',
|
||||
replace_existing=True
|
||||
)
|
||||
return scheduler
|
||||
|
||||
def main():
|
||||
"""主函数"""
|
||||
# 创建进程检测器
|
||||
process_checker = ProcessChecker()
|
||||
|
||||
# 检查是否已有实例在运行
|
||||
if process_checker.is_running():
|
||||
logger.info("检测到脚本已在运行,退出当前实例")
|
||||
sys.exit(0)
|
||||
|
||||
# 创建PID文件
|
||||
process_checker.create_pid_file()
|
||||
|
||||
logger.info("多任务定时脚本启动...")
|
||||
logger.info(f"进程PID: {os.getpid()}")
|
||||
logger.info("任务列表:")
|
||||
logger.info("1. 每5秒访问百度kafka信息源")
|
||||
logger.info("按 Ctrl+C 退出程序")
|
||||
|
||||
scheduler = None
|
||||
try:
|
||||
# 设置并启动调度器
|
||||
scheduler = setup_scheduler()
|
||||
scheduler.start()
|
||||
|
||||
except KeyboardInterrupt:
|
||||
logger.info("收到中断信号,正在关闭...")
|
||||
except Exception as e:
|
||||
logger.error(f"程序运行出错: {e}")
|
||||
finally:
|
||||
# 清理资源
|
||||
if scheduler and scheduler.running:
|
||||
scheduler.shutdown()
|
||||
process_checker.remove_pid_file()
|
||||
logger.info("程序已退出")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Loading…
x
Reference in New Issue
Block a user