From d6a54bca578ee4412d8475692a47568a2fac7e55 Mon Sep 17 00:00:00 2001 From: ping <1017253325@qq.com> Date: Thu, 20 Nov 2025 15:03:47 +0800 Subject: [PATCH] update --- kgadget/src/more_task_scheduler.py | 226 +++++++++++++++++++++++++++++ 1 file changed, 226 insertions(+) create mode 100644 kgadget/src/more_task_scheduler.py diff --git a/kgadget/src/more_task_scheduler.py b/kgadget/src/more_task_scheduler.py new file mode 100644 index 0000000..42b57c9 --- /dev/null +++ b/kgadget/src/more_task_scheduler.py @@ -0,0 +1,226 @@ +#!/usr/bin/env python3 +""" +多任务定时脚本 - 带进程检测功能 +适用于 Linux crontab 部署 +""" + +import os +import sys +import time +import logging +import requests +import subprocess +from apscheduler.schedulers.blocking import BlockingScheduler +from apscheduler.executors.pool import ThreadPoolExecutor +from datetime import datetime + +# 配置日志 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler('/tmp/multi_task_scheduler.log'), + logging.StreamHandler(sys.stdout) + ] +) +logger = logging.getLogger(__name__) + +class ProcessChecker: + """进程检测类""" + + def __init__(self, pid_file='/tmp/multi_task_scheduler.pid'): + self.pid_file = pid_file + + def is_running(self): + """检查进程是否已经在运行""" + if not os.path.exists(self.pid_file): + return False + + try: + with open(self.pid_file, 'r') as f: + pid = int(f.read().strip()) + + # 检查PID是否存在 + try: + os.kill(pid, 0) + return True + except OSError: + os.remove(self.pid_file) + return False + + except (ValueError, IOError) as e: + logger.warning(f"读取PID文件失败: {e}") + if os.path.exists(self.pid_file): + os.remove(self.pid_file) + return False + + def create_pid_file(self): + """创建PID文件""" + try: + with open(self.pid_file, 'w') as f: + f.write(str(os.getpid())) + logger.info(f"创建PID文件: {self.pid_file}") + except IOError as e: + logger.error(f"创建PID文件失败: {e}") + + def remove_pid_file(self): + """删除PID文件""" + try: + if os.path.exists(self.pid_file): + os.remove(self.pid_file) + logger.info(f"删除PID文件: {self.pid_file}") + except IOError as e: + logger.error(f"删除PID文件失败: {e}") + +class TaskExecutor: + """任务执行器""" + + def __init__(self): + self.session = requests.Session() + self.session.headers.update({ + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' + }) + + def visit_baidu(self): + """任务1: 访问百度(每5秒)""" + start_time = time.time() + try: + response = self.session.get( + 'https://www.baidu.com/', + timeout=10, + verify=True + ) + duration = time.time() - start_time + + logger.info( + f"[百度访问] 成功 - 状态码: {response.status_code} - " + f"响应时间: {duration:.2f}秒" + ) + + return True + + except Exception as e: + logger.error(f"[百度访问] 失败 - 错误: {e}") + return False + + def execute_curl_command(self, url, task_name, log_file='/d/zhc/crontab_prod.log'): + """执行curl命令的通用方法""" + start_time = time.time() + try: + # 构建curl命令 + command = ['curl', '-s', url] + + # 执行命令 + result = subprocess.run( + command, + capture_output=True, + text=True, + timeout=300 # 5分钟超时 + ) + + duration = time.time() - start_time + + # 记录结果到日志文件 + with open(log_file, 'a', encoding='utf-8') as f: + f.write(f"\n{'='*50}\n") + f.write(f"任务: {task_name}\n") + f.write(f"执行时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") + f.write(f"URL: {url}\n") + f.write(f"返回码: {result.returncode}\n") + f.write(f"执行时间: {duration:.2f}秒\n") + f.write(f"输出: {result.stdout}\n") + if result.stderr: + f.write(f"错误: {result.stderr}\n") + f.write(f"{'='*50}\n") + + if result.returncode == 0: + logger.info(f"[{task_name}] 成功 - 执行时间: {duration:.2f}秒") + return True + else: + logger.error(f"[{task_name}] 失败 - 返回码: {result.returncode}, 错误: {result.stderr}") + return False + + except subprocess.TimeoutExpired: + logger.error(f"[{task_name}] 超时 - 执行超过5分钟") + return False + except Exception as e: + logger.error(f"[{task_name}] 异常 - 错误: {e}") + return False + + def get_baidu_kafka_info(self): + """任务2: 访问百度kafka信息源""" + return self.execute_curl_command( + 'https://www.opencomputing.cn/baiduc/baidu_sms_kafka_consumer.dspy', + '访问百度kafka信息源' + ) + +def setup_scheduler(): + """设置和启动调度器""" + # 配置执行器 + executors = { + 'default': ThreadPoolExecutor(5) + } + + # 创建调度器 + scheduler = BlockingScheduler(executors=executors) + + # 创建任务执行器实例 + task_executor = TaskExecutor() + + # 添加任务1: 每5秒访问百度 + # scheduler.add_job( + # task_executor.visit_baidu, + # 'interval', + # seconds=5, + # id='visit_baidu_job', + # replace_existing=True + # ) + + # 添加任务2: + scheduler.add_job( + task_executor.get_baidu_kafka_info, + 'interval', + seconds=5, + id='get_baidu_kafka_info_job', + replace_existing=True + ) + return scheduler + +def main(): + """主函数""" + # 创建进程检测器 + process_checker = ProcessChecker() + + # 检查是否已有实例在运行 + if process_checker.is_running(): + logger.info("检测到脚本已在运行,退出当前实例") + sys.exit(0) + + # 创建PID文件 + process_checker.create_pid_file() + + logger.info("多任务定时脚本启动...") + logger.info(f"进程PID: {os.getpid()}") + logger.info("任务列表:") + logger.info("1. 每5秒访问百度kafka信息源") + logger.info("按 Ctrl+C 退出程序") + + scheduler = None + try: + # 设置并启动调度器 + scheduler = setup_scheduler() + scheduler.start() + + except KeyboardInterrupt: + logger.info("收到中断信号,正在关闭...") + except Exception as e: + logger.error(f"程序运行出错: {e}") + finally: + # 清理资源 + if scheduler and scheduler.running: + scheduler.shutdown() + process_checker.remove_pid_file() + logger.info("程序已退出") + +if __name__ == "__main__": + main() \ No newline at end of file