pcapi/app/slurm/parse_job.py
2025-07-16 14:46:24 +08:00

202 lines
6.2 KiB
Python

import re
from datetime import datetime
from typing import List
import json
class jobJsonVO:
    """Plain record holding the fields reported for one Slurm job.

    Every attribute starts out as None and is filled in by the parsing
    helpers in this module. process_data() emits ``instance.__dict__``, so the
    order in which the attributes are first created here is the key order
    that callers of process_data() observe.
    """

    # Attribute names in creation order (see class docstring).
    _FIELD_NAMES = (
        "jobId",
        "jobName",
        "accountUserName",
        "status",
        "queueName",
        "execHosts",
        "numProcessors",
        "submitTime",
        "startTime",
        "endTime",
        "userGroup",
        "workDir",
        "userPriority",
        "gpuCardNum",
        "runningTime",
        "formatRunningTime",
        "jobProcessorTime",
        "jobGpuCardTime",
    )

    def __init__(self):
        for field_name in self._FIELD_NAMES:
            setattr(self, field_name, None)

    def get_start_time(self):
        """Return the stored start time."""
        return self.startTime

    def set_start_time(self, startTime):
        """Store *startTime* as the job's start time."""
        self.startTime = startTime

    def get_end_time(self):
        """Return the stored end time."""
        return self.endTime

    def set_end_time(self, endTime):
        """Store *endTime* as the job's end time."""
        self.endTime = endTime

    def get_status(self):
        """Return the stored status code."""
        return self.status

    def getNumProcessors(self):
        """Return the stored processor count."""
        return self.numProcessors

    def getRunningTime(self):
        """Return the stored running time."""
        return self.runningTime
class JobConstants:
    """Short status codes produced by parse_status() and tested by get_running_time()."""

    # Mapped from Slurm "COMPLETED".
    DONE = "DONE"
    # Mapped from Slurm "FAILED".
    EXIT = "EXIT"
    # Mapped from Slurm "RUNNING".
    RUN = "RUN"
    # Mapped from Slurm "PENDING".
    PEND = "PEND"
    # Mapped from any Slurm state containing "CANCELLED".
    CANCELLED = "CANCELLED"
def parse_status(status: str) -> str:
    """Map a Slurm job state string onto a JobConstants code.

    Exact matches: COMPLETED -> DONE, FAILED -> EXIT, RUNNING -> RUN,
    PENDING -> PEND. Any other state that contains the substring "CANCELLED"
    (e.g. "CANCELLED by 0") becomes CANCELLED. Every remaining state is
    returned unchanged.
    """
    exact_matches = (
        ("COMPLETED", JobConstants.DONE),
        ("FAILED", JobConstants.EXIT),
        ("RUNNING", JobConstants.RUN),
        ("PENDING", JobConstants.PEND),
    )
    for slurm_state, code in exact_matches:
        if status == slurm_state:
            return code
    return JobConstants.CANCELLED if "CANCELLED" in status else status
def parse_slurm_str_to_str(date_str: str) -> str:
    """Normalise a Slurm ISO-style timestamp into 'YYYY-MM-DD HH:MM:SS' form.

    Expects values like '2025-07-16T14:46:24', optionally with a trailing
    'Z'. The 'T' separator becomes a space and the 'Z' is removed, so the
    result matches parse_date()'s format. Strings without those characters
    (e.g. 'Unknown' or '-') pass through unchanged apart from surrounding
    whitespace being stripped. The function is idempotent.

    NOTE(review): a 'Z' suffix conventionally marks UTC, but the zone is only
    dropped here, not converted; confirm the cluster reports local time.
    """
    # Remove 'Z' outright rather than replacing it with a space: a trailing
    # blank would make datetime.strptime() fail with "unconverted data remains".
    return date_str.replace("T", " ").replace("Z", "").strip()
def handle_alloc_tres_get_gpus(tres_str: str) -> int:
    """Return the number of GPUs listed in a Slurm TRES string.

    The input is a comma-separated list of ``name=count`` entries, e.g.
    ``"billing=8,cpu=8,gres/gpu=2,mem=32G,node=1"`` -> 2.

    When typed GPUs are tracked, the same allocation may be listed both as a
    generic ``gres/gpu=N`` entry and as typed ``gres/gpu:<type>=N`` entries.
    The generic entry is therefore preferred; typed entries are summed only
    when no generic entry is present, so GPUs are not double-counted.

    Args:
        tres_str: TRES text for one job; may be empty or None.

    Returns:
        The GPU count, or 0 when the input is empty, None, or lists no GPUs.
    """
    if not tres_str:
        return 0
    generic_count = None
    typed_total = 0
    for entry in tres_str.split(","):
        name, sep, value = entry.strip().partition("=")
        if not sep:
            continue
        try:
            count = int(value)
        except ValueError:
            # Non-integer values (e.g. "mem=32G") are never GPU counts.
            continue
        if name == "gres/gpu":
            generic_count = count
        elif name.startswith("gres/gpu:"):
            typed_total += count
    return generic_count if generic_count is not None else typed_total
def calculate_processor_time(job: jobJsonVO) -> float:
    """Return processor time: the job's processor count times its running time.

    The result is in the same unit as the running time. When both inputs are
    ints, the product is an int despite the float annotation. Either value
    being None raises TypeError.
    """
    return job.getNumProcessors() * job.getRunningTime()
def calculate_gpu_card_time(job: jobJsonVO) -> float:
    """Return GPU-card time: allocated GPU cards times running time.

    This mirrors calculate_processor_time() (processors x running time), so
    the result is in the running time's unit: milliseconds when runningTime
    comes from get_running_time().

    Missing values (None) count as 0, so an unpopulated record yields 0.0.
    """
    gpu_cards = job.gpuCardNum or 0
    running_time = job.getRunningTime() or 0
    return float(gpu_cards * running_time)
def parse_date(date_str: str) -> datetime:
    """Parse a 'YYYY-MM-DD HH:MM:SS' string into a naive datetime.

    Raises ValueError when *date_str* does not match that exact layout.
    """
    layout = "%Y-%m-%d %H:%M:%S"
    parsed = datetime.strptime(date_str, layout)
    return parsed
def get_now_date() -> datetime:
    """Return the current local time as a naive datetime (``datetime.now()``)."""
    current = datetime.now()
    return current
# Placeholder strings that may appear instead of a parseable timestamp.
_NO_TIMESTAMP = frozenset({"-", "Unknown", "None", ""})


def _elapsed_ms(start: datetime, end: datetime) -> int:
    """Return the whole milliseconds from *start* to *end*."""
    return int((end - start).total_seconds() * 1000)


def get_running_time(item: jobJsonVO) -> int:
    """Return how long *item* has run, in milliseconds, normalising its timestamps.

    Side effects:
        ``item.startTime`` is rewritten through parse_slurm_str_to_str().
        When an end time is present, ``item.endTime`` is rewritten the same
        way. A DONE/EXIT job without an end time gets its start time copied
        into ``endTime``, as before.

    Rules:
        * no start time (None) or a placeholder start ("-", "Unknown",
          "None", empty) -> 0
        * PEND -> 0
        * RUN -> now - start
        * any other status (DONE, EXIT, CANCELLED, and unmapped Slurm states
          such as TIMEOUT) -> end - start. A missing or placeholder end time
          yields 0.

    Raises:
        ValueError: if a non-placeholder timestamp does not match parse_date().
    """
    raw_start = item.get_start_time()
    if raw_start is None:
        return 0
    start_text = parse_slurm_str_to_str(raw_start)
    item.set_start_time(start_text)
    status = item.get_status()
    # Check placeholders before parsing; they previously crashed parse_date()
    # for non-pending jobs (e.g. a job cancelled before it started).
    if start_text in _NO_TIMESTAMP or status == JobConstants.PEND:
        return 0
    start = parse_date(start_text)

    if status == JobConstants.RUN:
        return _elapsed_ms(start, get_now_date())

    raw_end = item.get_end_time()
    if raw_end is None:
        if status in (JobConstants.DONE, JobConstants.EXIT):
            # Preserve legacy behaviour: report the end as the start time.
            item.set_end_time(start_text)
        return 0
    # Normalise for every finished status; previously CANCELLED parsed the
    # raw value, which fails on Slurm's 'T'-separated format.
    end_text = parse_slurm_str_to_str(raw_end)
    item.set_end_time(end_text)
    if end_text in _NO_TIMESTAMP:
        return 0
    return _elapsed_ms(start, parse_date(end_text))
def format_running_time(job: jobJsonVO) -> str:
    """Return a human-readable running time for *job*.

    Placeholder: formatting is not implemented, so this always returns an
    empty string and ignores *job*.
    """
    placeholder = ""
    return placeholder
def _parse_job_row(line: str):
    """Build a populated jobJsonVO from one '|'-separated row, or return None to skip it."""
    # NOTE(review): blank fields are discarded before indexing, so an empty
    # column in the middle of a row shifts every later column by one (and a
    # row left with fewer than 14 fields is skipped). This matches the
    # previous behaviour; confirm the sacct format never emits empty
    # interior fields.
    fields = [word.strip() for word in line.split("|") if word.strip()]
    if len(fields) < 14:
        return None
    # Keep only rows whose first column is a plain integer job id; anything
    # int() rejects (e.g. "123.batch") is skipped.
    try:
        int(fields[0])
    except ValueError:
        return None
    try:
        num_processors = int(fields[6])
        user_priority = int(fields[12])
    except ValueError:
        # A malformed numeric column skips only this row instead of
        # aborting the whole listing.
        return None

    job = jobJsonVO()
    job.jobId = fields[0]
    job.jobName = fields[1]
    job.accountUserName = fields[2]
    job.status = parse_status(fields[3])
    job.queueName = fields[4]
    job.execHosts = fields[5]
    job.numProcessors = num_processors
    job.submitTime = parse_slurm_str_to_str(fields[7])
    job.startTime = parse_slurm_str_to_str(fields[8])
    # An "Unknown" end time (presumably a job that has not ended) leaves
    # endTime as None.
    if fields[9] != "Unknown":
        job.endTime = parse_slurm_str_to_str(fields[9])
    job.userGroup = fields[10]
    job.workDir = fields[11]
    job.userPriority = user_priority
    job.gpuCardNum = handle_alloc_tres_get_gpus(fields[13])
    # Derived values read the fields above, so they are computed last.
    job.runningTime = get_running_time(job)
    job.formatRunningTime = format_running_time(job)
    job.jobProcessorTime = calculate_processor_time(job)
    job.jobGpuCardTime = calculate_gpu_card_time(job)
    return job


def process_data(data: dict) -> List[dict]:
    """Parse '|'-separated sacct-style output into a list of job dictionaries.

    Args:
        data: mapping whose "stdout" entry holds the raw text, one job per line.
            The first line is skipped.

    Returns:
        One dict per accepted row (each a jobJsonVO's ``__dict__``), in
        reverse order of the input rows.

    Raises:
        Exception: with message "CLUSTER_ERROR" for any unexpected failure
            (missing "stdout", an unparseable timestamp, ...). The original
            error is chained as ``__cause__``.
    """
    try:
        lines = data["stdout"].split("\n")
        jobs = []
        # Line 0 is skipped; presumably the column header.
        for line in lines[1:]:
            job = _parse_job_row(line)
            if job is not None:
                jobs.append(job.__dict__)
        jobs.reverse()
        return jobs
    except Exception as e:
        print(f"An error occurred: {e}")
        raise Exception("CLUSTER_ERROR") from e