From 88b67eb5958f19237d40796362218450b4a6125a Mon Sep 17 00:00:00 2001 From: ping <1017253325@qq.com> Date: Thu, 20 Nov 2025 14:47:37 +0800 Subject: [PATCH 1/5] update --- b/baiduc/baidu_sms_kafka_consumer.dspy | 76 ++++++++++++++++++-------- 1 file changed, 54 insertions(+), 22 deletions(-) diff --git a/b/baiduc/baidu_sms_kafka_consumer.dspy b/b/baiduc/baidu_sms_kafka_consumer.dspy index 4181447..a03b978 100644 --- a/b/baiduc/baidu_sms_kafka_consumer.dspy +++ b/b/baiduc/baidu_sms_kafka_consumer.dspy @@ -6,6 +6,7 @@ async def time_convert(resoucetime=None): return beijing_time.strftime("%Y-%m-%d %H:%M:%S") async def baidu_sms_kafka_consumer(ns={}): + import os consumer = BaiduKafKaConsumer({ # 接入点 'bootstrap.servers': '120.48.10.223:9095,180.76.96.108:9095,180.76.147.36:9095', @@ -28,15 +29,35 @@ async def baidu_sms_kafka_consumer(ns={}): # 订阅的主题名称 consumer.subscribe(['kaiyuanyun_msg_topic']) - + + files = ["baidu_kafka_msg.txt", "baidu_kafka_id.txt", "baidu_kafka_error.txt"] + for filename in files: + if not os.path.exists(filename): + with open(filename, 'w', encoding='utf-8') as f: # 'w' 模式会覆盖已有文件,但检查后使用是安全的 + pass # 创建空文件 + else: + pass + + total_count = 0 for i in range(30): + if i == 0: + # 写入文件记录轮询开始时间 时间格式: YYYY-MM-DD HH:MM:SS + with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f: + f.write(f"轮询开始时间:{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") + msg = consumer.poll(0.1) # 单次轮询获取消息 if msg is None: + if i == 10: + with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f: + f.write(f"轮询第10次消息为None,{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") continue elif msg.error(): - print(f"消费者错误: {msg.error()}") + # 写入日志文件记录错误信息 + with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f: + f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 消费者错误: {msg.error()}\n") else: + total_count += 1 try: # 解析消息内容为字典(避免变量名冲突) msg_data_ori = json.loads(msg.value().decode('utf-8')) @@ -44,14 +65,8 @@ async def baidu_sms_kafka_consumer(ns={}): messageid = msg_data.get('id') taskid = msg_data.get('taskId') - filename = 'baidu_kafka_msg.txt' - # 检查文件是否存在,不存在则创建 - if not os.path.exists(filename): - with open(filename, 'w', encoding='utf-8') as f: - print(f"文件不存在,已创建 {filename}") - # 读取文件内容进行检查 - with open(filename, 'r', encoding='utf-8') as f: + with open('baidu_kafka_id.txt', 'r', encoding='utf-8') as f: content = f.read() if messageid in content: @@ -59,9 +74,12 @@ async def baidu_sms_kafka_consumer(ns={}): continue else: # 追加写入目标内容 - with open(filename, 'a', encoding='utf-8') as f: + with open('baidu_kafka_id.txt', 'a', encoding='utf-8') as f: f.write(messageid + '\n') print(f"已写入 '{messageid}' 到文件") + + with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f: + f.write(str(msg_data) + '\n') db = DBPools() async with db.sqlorContext('kboss') as sor: @@ -69,8 +87,9 @@ async def baidu_sms_kafka_consumer(ns={}): exist_msg = await sor.R('baidu_kafka_msg', {'messageid': messageid, 'taskid': taskid}) if exist_msg: print(f"消息id {messageid} 已存在,跳过处理") - consumer.close() - return + continue + # consumer.close() + # return # 2. 构建小写key的ns字典(完整映射所有字段) ns_msg = { @@ -119,16 +138,26 @@ async def baidu_sms_kafka_consumer(ns={}): # 查询用户手机号 account_id = msg_data.get('accountId') user_local_id_li = await sor.R('baidu_users', {'baidu_id': account_id}) - user_local_id = user_local_id_li[0]['user_id'] - user_mobile_li = await sor.R('users', {'id': user_local_id, 'del_flg': '0'}) - mobile = user_mobile_li[0]['mobile'] - - # 调用短信发送接口 - kyy_send_status = await send_vcode(mobile, sms_stype, {'content': msg_content}) - if kyy_send_status.get('status'): - await sor.U('baidu_kafka_msg', {'id': ns_msg['id'], 'kyysendstatus': '1'}) - print(f"已触发短信发送至 {mobile}") + if user_local_id_li: + user_local_id = user_local_id_li[0]['user_id'] + user_mobile_li = await sor.R('users', {'id': user_local_id, 'del_flg': '0'}) + mobile = user_mobile_li[0]['mobile'] + # 调用短信发送接口 + kyy_send_status = await send_vcode(mobile, sms_stype, {'content': msg_content}) + if kyy_send_status.get('status'): + await sor.U('baidu_kafka_msg', {'id': ns_msg['id'], 'kyysendstatus': '1'}) + else: + # 记录错误日志 短信发送失败 + with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f: + f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 短信发送失败至 {mobile},状态: {kyy_send_status}\n") + print(f"短信发送失败至 {mobile},状态: {kyy_send_status}") + else: + # 记录错误日志 用户未找到 + with open('baidu_kafka_error.txt', 'a', encoding='utf-8') as f: + f.write(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 错误: 未找到百度用户ID {account_id} 对应的本地用户\n") + print(f"未找到百度用户ID {account_id} 对应的本地用户") + except json.JSONDecodeError: print("错误:消息内容非有效JSON") return { @@ -141,7 +170,10 @@ async def baidu_sms_kafka_consumer(ns={}): 'status': False, 'msg': f"处理异常: {str(e)}" } - + # 记录total_count + with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f: + f.write(f"本次轮询共处理消息数:{total_count}\n") + consumer.close() # 确保消费者关闭 return { 'status': True, From d6a54bca578ee4412d8475692a47568a2fac7e55 Mon Sep 17 00:00:00 2001 From: ping <1017253325@qq.com> Date: Thu, 20 Nov 2025 15:03:47 +0800 Subject: [PATCH 2/5] update --- kgadget/src/more_task_scheduler.py | 226 +++++++++++++++++++++++++++++ 1 file changed, 226 insertions(+) create mode 100644 kgadget/src/more_task_scheduler.py diff --git a/kgadget/src/more_task_scheduler.py b/kgadget/src/more_task_scheduler.py new file mode 100644 index 0000000..42b57c9 --- /dev/null +++ b/kgadget/src/more_task_scheduler.py @@ -0,0 +1,226 @@ +#!/usr/bin/env python3 +""" +多任务定时脚本 - 带进程检测功能 +适用于 Linux crontab 部署 +""" + +import os +import sys +import time +import logging +import requests +import subprocess +from apscheduler.schedulers.blocking import BlockingScheduler +from apscheduler.executors.pool import ThreadPoolExecutor +from datetime import datetime + +# 配置日志 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler('/tmp/multi_task_scheduler.log'), + logging.StreamHandler(sys.stdout) + ] +) +logger = logging.getLogger(__name__) + +class ProcessChecker: + """进程检测类""" + + def __init__(self, pid_file='/tmp/multi_task_scheduler.pid'): + self.pid_file = pid_file + + def is_running(self): + """检查进程是否已经在运行""" + if not os.path.exists(self.pid_file): + return False + + try: + with open(self.pid_file, 'r') as f: + pid = int(f.read().strip()) + + # 检查PID是否存在 + try: + os.kill(pid, 0) + return True + except OSError: + os.remove(self.pid_file) + return False + + except (ValueError, IOError) as e: + logger.warning(f"读取PID文件失败: {e}") + if os.path.exists(self.pid_file): + os.remove(self.pid_file) + return False + + def create_pid_file(self): + """创建PID文件""" + try: + with open(self.pid_file, 'w') as f: + f.write(str(os.getpid())) + logger.info(f"创建PID文件: {self.pid_file}") + except IOError as e: + logger.error(f"创建PID文件失败: {e}") + + def remove_pid_file(self): + """删除PID文件""" + try: + if os.path.exists(self.pid_file): + os.remove(self.pid_file) + logger.info(f"删除PID文件: {self.pid_file}") + except IOError as e: + logger.error(f"删除PID文件失败: {e}") + +class TaskExecutor: + """任务执行器""" + + def __init__(self): + self.session = requests.Session() + self.session.headers.update({ + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' + }) + + def visit_baidu(self): + """任务1: 访问百度(每5秒)""" + start_time = time.time() + try: + response = self.session.get( + 'https://www.baidu.com/', + timeout=10, + verify=True + ) + duration = time.time() - start_time + + logger.info( + f"[百度访问] 成功 - 状态码: {response.status_code} - " + f"响应时间: {duration:.2f}秒" + ) + + return True + + except Exception as e: + logger.error(f"[百度访问] 失败 - 错误: {e}") + return False + + def execute_curl_command(self, url, task_name, log_file='/d/zhc/crontab_prod.log'): + """执行curl命令的通用方法""" + start_time = time.time() + try: + # 构建curl命令 + command = ['curl', '-s', url] + + # 执行命令 + result = subprocess.run( + command, + capture_output=True, + text=True, + timeout=300 # 5分钟超时 + ) + + duration = time.time() - start_time + + # 记录结果到日志文件 + with open(log_file, 'a', encoding='utf-8') as f: + f.write(f"\n{'='*50}\n") + f.write(f"任务: {task_name}\n") + f.write(f"执行时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") + f.write(f"URL: {url}\n") + f.write(f"返回码: {result.returncode}\n") + f.write(f"执行时间: {duration:.2f}秒\n") + f.write(f"输出: {result.stdout}\n") + if result.stderr: + f.write(f"错误: {result.stderr}\n") + f.write(f"{'='*50}\n") + + if result.returncode == 0: + logger.info(f"[{task_name}] 成功 - 执行时间: {duration:.2f}秒") + return True + else: + logger.error(f"[{task_name}] 失败 - 返回码: {result.returncode}, 错误: {result.stderr}") + return False + + except subprocess.TimeoutExpired: + logger.error(f"[{task_name}] 超时 - 执行超过5分钟") + return False + except Exception as e: + logger.error(f"[{task_name}] 异常 - 错误: {e}") + return False + + def get_baidu_kafka_info(self): + """任务2: 访问百度kafka信息源""" + return self.execute_curl_command( + 'https://www.opencomputing.cn/baiduc/baidu_sms_kafka_consumer.dspy', + '访问百度kafka信息源' + ) + +def setup_scheduler(): + """设置和启动调度器""" + # 配置执行器 + executors = { + 'default': ThreadPoolExecutor(5) + } + + # 创建调度器 + scheduler = BlockingScheduler(executors=executors) + + # 创建任务执行器实例 + task_executor = TaskExecutor() + + # 添加任务1: 每5秒访问百度 + # scheduler.add_job( + # task_executor.visit_baidu, + # 'interval', + # seconds=5, + # id='visit_baidu_job', + # replace_existing=True + # ) + + # 添加任务2: + scheduler.add_job( + task_executor.get_baidu_kafka_info, + 'interval', + seconds=5, + id='get_baidu_kafka_info_job', + replace_existing=True + ) + return scheduler + +def main(): + """主函数""" + # 创建进程检测器 + process_checker = ProcessChecker() + + # 检查是否已有实例在运行 + if process_checker.is_running(): + logger.info("检测到脚本已在运行,退出当前实例") + sys.exit(0) + + # 创建PID文件 + process_checker.create_pid_file() + + logger.info("多任务定时脚本启动...") + logger.info(f"进程PID: {os.getpid()}") + logger.info("任务列表:") + logger.info("1. 每5秒访问百度kafka信息源") + logger.info("按 Ctrl+C 退出程序") + + scheduler = None + try: + # 设置并启动调度器 + scheduler = setup_scheduler() + scheduler.start() + + except KeyboardInterrupt: + logger.info("收到中断信号,正在关闭...") + except Exception as e: + logger.error(f"程序运行出错: {e}") + finally: + # 清理资源 + if scheduler and scheduler.running: + scheduler.shutdown() + process_checker.remove_pid_file() + logger.info("程序已退出") + +if __name__ == "__main__": + main() \ No newline at end of file From 4f3ba55d4e5bdbb96e138314ce9e8e7275d45275 Mon Sep 17 00:00:00 2001 From: ping <1017253325@qq.com> Date: Thu, 20 Nov 2025 15:05:17 +0800 Subject: [PATCH 3/5] update crontab --- crontab/crontab.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crontab/crontab.txt b/crontab/crontab.txt index a004546..e044ba0 100644 --- a/crontab/crontab.txt +++ b/crontab/crontab.txt @@ -33,4 +33,4 @@ */1 * * * * curl https://www.opencomputing.cn/account/email_info.dspy > /d/zhc/crontab_prod.log 2>&1 */1 * * * * curl https://www.opencomputing.cn/jdcloud/get_partner_order_list.dspy?page=1 > /d/zhc/crontab_prod.log 2>&1 * * * * * curl https://www.opencomputing.cn/baiduc/baidu_new_update_resouce.dspy > /d/zhc/crontab_prod.log 2>&1 -* * * * * curl https://www.opencomputing.cn/baiduc/baidu_sms_kafka_consumer.dspy > /d/zhc/crontab_dev.log 2>&1 +#* * * * * curl https://www.opencomputing.cn/baiduc/baidu_sms_kafka_consumer.dspy > /d/zhc/crontab_dev.log 2>&1 From de9e3c56e7de7af758bb64aef485dd9761e27de5 Mon Sep 17 00:00:00 2001 From: ping <1017253325@qq.com> Date: Thu, 20 Nov 2025 15:11:13 +0800 Subject: [PATCH 4/5] update crontab --- crontab/crontab.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/crontab/crontab.txt b/crontab/crontab.txt index e044ba0..975e172 100644 --- a/crontab/crontab.txt +++ b/crontab/crontab.txt @@ -34,3 +34,4 @@ */1 * * * * curl https://www.opencomputing.cn/jdcloud/get_partner_order_list.dspy?page=1 > /d/zhc/crontab_prod.log 2>&1 * * * * * curl https://www.opencomputing.cn/baiduc/baidu_new_update_resouce.dspy > /d/zhc/crontab_prod.log 2>&1 #* * * * * curl https://www.opencomputing.cn/baiduc/baidu_sms_kafka_consumer.dspy > /d/zhc/crontab_dev.log 2>&1 +*/5 * * * * /usr/bin/python3 /d/zhc/kboss_prod/kgadget/src/more_task_scheduler.py >> /tmp/multi_task_scheduler_cron.log 2>&1 From 3fa768f35a6c85aaa0268b7f3a7ae471f713731e Mon Sep 17 00:00:00 2001 From: ping <1017253325@qq.com> Date: Thu, 20 Nov 2025 15:17:11 +0800 Subject: [PATCH 5/5] update --- b/baiduc/baidu_sms_kafka_consumer.dspy | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/b/baiduc/baidu_sms_kafka_consumer.dspy b/b/baiduc/baidu_sms_kafka_consumer.dspy index a03b978..7729cbc 100644 --- a/b/baiduc/baidu_sms_kafka_consumer.dspy +++ b/b/baiduc/baidu_sms_kafka_consumer.dspy @@ -39,13 +39,13 @@ async def baidu_sms_kafka_consumer(ns={}): pass total_count = 0 - for i in range(30): + for i in range(2): if i == 0: # 写入文件记录轮询开始时间 时间格式: YYYY-MM-DD HH:MM:SS with open('baidu_kafka_msg.txt', 'a', encoding='utf-8') as f: f.write(f"轮询开始时间:{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") - msg = consumer.poll(0.1) # 单次轮询获取消息 + msg = consumer.poll(0.2) # 单次轮询获取消息 if msg is None: if i == 10: