kboss/b/product_smart/batch_add_zhisuan.dspy
2025-07-16 14:27:17 +08:00

110 lines
6.0 KiB
Plaintext

async def batch_add_zhisuan(ns):
    """Bulk-import ``product_smart`` records from an Excel workbook.

    The workbook's active sheet is read in a transposed layout: column 1
    carries the 24 field labels (one per row) and every column from 2 up to
    ``sheet.max_column`` is one record, with the record id in row 1.

    The whole sheet is validated before anything is written, so a rejected
    file never leaves a partial import behind. A record whose id is found by
    ``sor.R`` is updated with ``sor.U``; otherwise it is inserted with
    ``sor.C``. A failing write is counted and printed, and the import
    continues with the next column.

    Args:
        ns: Parameter object exposing ``get``; ``ns.get('filepath')`` is
            passed to ``file_realpath`` to locate the workbook.

    Returns:
        ``{'status': False, 'msg': ...}`` when validation fails; otherwise a
        dict with ``status`` True, ``msg``, ``success_count`` (inserted),
        ``update_success_count`` (updated) and ``error_count`` (failed
        writes).
    """
    # Label expected in column 1 of each row, in row order.
    expected_headers = [
        "序号", "产品型号", "设备所在地", "规模", "交付时间", "设备品牌",
        "资源类型", "算卡型号", "算卡接口", "CPU", "内存", "系统盘",
        "数据盘", "数据网卡", "管理网卡", "外网带宽", "共享存储", "IP地址",
        "测试要求", "年租金(元/月)", "季度租金(元/月)", "月租金(元/月)",
        "备注", "类型",
    ]
    # Record field filled from each row, in the same row order as the labels.
    # NOTE(review): 'yearly_rant' is spelled exactly as in the original
    # mapping; presumably it matches the table column - confirm against the
    # schema before renaming.
    field_names = [
        'id', 'product_model', 'location', 'scale', 'leadtime', 'brand',
        'resource_type', 'card_type', 'card_interface', 'cpu', 'memory',
        'system_disk', 'data_disk', 'data_network', 'card_network',
        'outer_bandwidth', 'shared_storage', 'address', 'test_claim',
        'yearly_rant', 'quarterly_rent', 'monthly_rent', 'remark', 'type',
    ]

    db = DBPools()
    # File loading and validation stay inside the context (as before) so that
    # whatever exception handling the context performs still applies to them.
    async with db.sqlorContext('kboss') as sor:
        file_path = ns.get('filepath')  # path of the uploaded workbook
        file_path_detail = file_realpath(file_path)
        workbook = openpyxl.load_workbook(filename=file_path_detail)
        sheet = workbook.active
        data_columns = range(2, sheet.max_column + 1)

        # --- Validation pass: reject the file before touching the table. ---
        # Every data column needs a non-empty id in row 1.
        for col in data_columns:
            if not sheet.cell(row=1, column=col).value:
                return {'status': False, 'msg': '必填标头为空'}
        # The sheet must contain exactly one row per expected label.
        if sheet.max_row != len(expected_headers):
            return {'status': False, 'msg': '标头数量有误'}
        # Column 1 must list the labels in the expected order.
        for row, expected in enumerate(expected_headers, start=1):
            header = str(sheet.cell(row=row, column=1).value)
            if header != expected:
                return {'status': False, 'msg': f'第{row}行标头应为 "{expected}"'}

        # --- Write pass: one record per data column. ---
        success_count = 0
        update_success_count = 0
        error_count = 0
        for col in data_columns:
            data = {
                name: sheet.cell(row=row, column=col).value
                for row, name in enumerate(field_names, start=1)
            }
            data['del_flg'] = 0

            existing_data = await sor.R('product_smart', {'id': data['id']})
            if existing_data:
                # The id is already present: update the record.
                try:
                    await sor.U('product_smart', data)
                    update_success_count += 1
                except Exception as e:
                    # Best effort: count the failure and keep importing.
                    error_count += 1
                    print(f"更新数据失败:{e}")
            else:
                # Unknown id: add a new record.
                try:
                    await sor.C('product_smart', data)
                    success_count += 1
                except Exception as e:
                    # Best effort: count the failure and keep importing.
                    error_count += 1
                    print(f"添加数据失败:{e}")

        return {
            'status': True,
            'msg': '数据更新成功',
            'success_count': success_count,
            'update_success_count': update_success_count,
            'error_count': error_count
        }
ret = await batch_add_zhisuan(params_kw)
return ret