151 lines
8.5 KiB
Plaintext
151 lines
8.5 KiB
Plaintext
async def get_font_style(cell):
|
|
"""获取单元格的字体样式信息"""
|
|
font = cell.font
|
|
style = []
|
|
if font.bold:
|
|
style.append("font-weight: bold;")
|
|
if font.italic:
|
|
style.append("font-style: italic;")
|
|
if font.underline:
|
|
style.append("text-decoration: underline;")
|
|
if font.color and font.color.rgb:
|
|
style.append(f"color: #{font.color.rgb};")
|
|
if font.size:
|
|
style.append(f"font-size: {font.size}pt;")
|
|
return " ".join(style)
|
|
|
|
async def excel_to_html_with_merges(nss={}):
|
|
try:
|
|
update_id = nss.get('update_id')
|
|
excel_file = file_realpath(nss.get('filepath'))
|
|
# excel_file = 'C:/Users/Admin/Desktop/算力业务供应商评估表-模版.xlsx'
|
|
|
|
workbook = openpyxl.load_workbook(filename=excel_file, data_only=True)
|
|
sheet = workbook.active
|
|
basic_info_init = [
|
|
{"id": "1", "label": "算力型号", "type": "select", "options": ["4090", "A100", "A800", "H100", "H800", "A100-40G", "A800-40G", "A100-PCIE-40G", "A100-PCIE-80G", "A800-PCIE-40G", "A800-PCIE-80G"], "value": sheet['B3'].value, "is_new": 0},
|
|
{"id": "2", "label": "计算网络类型", "type": "select", "options": ["IB", "ROCE", "以太"], "value": sheet['F3'].value, "is_new": 0},
|
|
{"id": "3", "label": "资源类型", "type": "select", "options": ["可控", "临时"], "value": sheet['J3'].value, "is_new": 0},
|
|
{"id": "4", "label": "集群物理位置", "type": "input", "value": sheet['N3'].value, "is_new": 0},
|
|
{"id": "5", "label": "算力提供方式", "type": "select", "options": ["裸金属", "容器化"], "value": sheet['B4'].value, "is_new": 0},
|
|
{"id": "6", "label": "剩余GPU数量", "type": "input", "value": sheet['F4'].value, "is_new": 0},
|
|
{"id": "7", "label": "是否现货", "type": "input", "value": sheet['J4'].value, "is_new": 0},
|
|
{"id": "8", "label": "资源入库时间", "type": "input", "value": sheet['N4'].value, "is_new": 0},
|
|
{"id": "9", "label": "起始租期", "type": "input", "value": sheet['B5'].value, "is_new": 0},
|
|
{"id": "10", "label": "付款方式", "type": "input", "value": sheet['F5'].value, "is_new": 0},
|
|
{"id": "11", "label": "月租(建议价)", "type": "input", "value": sheet['J5'].value, "is_new": 0},
|
|
{"id": "12", "label": "交付周期", "type": "input", "value": sheet['N5'].value, "is_new": 0},
|
|
{"id": "13", "label": "基本配置", "type": "input", "value": sheet['B6'].value, "is_new": 0},
|
|
{"id": "14", "label": "集群GPU总量", "type": "input", "value": sheet['B7'].value, "is_new": 0},
|
|
{"id": "15", "label": "是否有配套存储", "type": "select", "options": ["有", "无"], "value": sheet['F7'].value, "is_new": 0},
|
|
{"id": "16", "label": "存储类型", "type": "select", "options": ["全闪", "混闪"], "value": sheet['J7'].value, "is_new": 0},
|
|
{"id": "17", "label": "存储总量", "type": "input", "value": sheet['N7'].value, "is_new": 0},
|
|
{"id": "18", "label": "剩余存储数量", "type": "input", "value": sheet['B8'].value, "is_new": 0},
|
|
{"id": "19", "label": "有无云平台", "type": "select", "options": ["有", "无"], "value": sheet['F8'].value, "is_new": 0},
|
|
{"id": "20", "label": "月租(成本)", "type": "input", "value": sheet['J8'].value, "is_new": 0},
|
|
{"id": "21", "label": "集群是否支持改造", "type": "select", "options": ["支持", "不支持"], "value": sheet['N8'].value, "is_new": 0},
|
|
{"id": "22", "label": "是否支持测试", "type": "select", "options": ["支持", "不支持"], "value": sheet['B9'].value, "is_new": 0},
|
|
{"id": "23", "label": "最大测试台数", "type": "input", "value": sheet['F9'].value, "is_new": 0},
|
|
{"id": "24", "label": "是否已经测试验证", "type": "select", "options": ["是", "否"], "value": sheet['J9'].value, "is_new": 0}
|
|
]
|
|
index_id = update_id if update_id else uuid()
|
|
db = DBPools()
|
|
async with db.sqlorContext('kboss') as sor:
|
|
ns_dic = {
|
|
'id': index_id,
|
|
'orgid': nss.get('orgid'),
|
|
'basic_info': json.dumps(basic_info_init) if not isinstance(basic_info_init, str) else basic_info_init,
|
|
'remark': nss.get('remark'),
|
|
'del_flg': '0'
|
|
}
|
|
if update_id:
|
|
await sor.U('kyy_supply', ns_dic)
|
|
else:
|
|
await sor.C('kyy_supply', ns_dic)
|
|
|
|
# 加载工作簿和活动工作表
|
|
# wb = openpyxl.load_workbook(excel_file)
|
|
# sheet = wb.active
|
|
|
|
# 获取合并单元格的信息
|
|
merged_ranges = sheet.merged_cells.ranges
|
|
html_content = '<table border="1" style="border-collapse:collapse;">'
|
|
|
|
# 逐行处理
|
|
for row in sheet.iter_rows():
|
|
html_content += '<tr>'
|
|
for cell in row:
|
|
# 获取字体样式
|
|
font_style = await get_font_style(cell)
|
|
|
|
# 获取当前单元格的行列数
|
|
cell_row, cell_col = cell.row, cell.column
|
|
|
|
# 检查当前单元格是否是合并单元格的起始单元格
|
|
if any((cell_row, cell_col) == (merge.min_row, merge.min_col) for merge in merged_ranges):
|
|
# 获取合并单元格的范围
|
|
for merge in merged_ranges:
|
|
if (cell_row, cell_col) == (merge.min_row, merge.min_col):
|
|
rowspan = merge.max_row - merge.min_row + 1
|
|
colspan = merge.max_col - merge.min_col + 1
|
|
cell_value = cell.value if cell.value is not None else ""
|
|
cell_value = str(cell_value).replace('\n', '<br>')
|
|
html_content += f'<td rowspan="{rowspan}" colspan="{colspan}" style="text-align: center; {font_style}">{cell_value}</td>'
|
|
break
|
|
elif not any((cell_row, cell_col) in [(merge.min_row + r, merge.min_col + c) for r in
|
|
range(merge.max_row - merge.min_row + 1) for c in
|
|
range(merge.max_col - merge.min_col + 1)] for merge in merged_ranges):
|
|
# 不是合并单元格的后续单元格
|
|
cell_value = cell.value if cell.value is not None else ""
|
|
html_content += f'<td style="text-align: center; {font_style}">{cell_value}</td>'
|
|
html_content += '</tr>'
|
|
html_content += '</table>'
|
|
html_content = html_content.replace("color: #Values must be of type <class 'str'>; ", '')
|
|
await sor.U('kyy_supply', {'id': index_id, 'excel_info': html_content})
|
|
return {
|
|
'status': True,
|
|
'msg': 'excel覆盖填写信息成功, 上传文档成功'
|
|
}
|
|
except Exception as e:
|
|
return {
|
|
'status': False,
|
|
'msg': 'excel覆盖填写信息报错, 上传文档报错, %s' % str(e)
|
|
}
|
|
|
|
async def kyy_add_supply(ns={}):
|
|
try:
|
|
if ns.get('filepath'):
|
|
# excel_file = ns.get('filepath')
|
|
excel_file = file_realpath(ns.get('filepath'))
|
|
nss = {
|
|
'orgid': ns.get('orgid'),
|
|
'update_id': ns.get('update_id'),
|
|
'filepath': ns.get('filepath'),
|
|
'remark': ns.get('remark')
|
|
}
|
|
res = await excel_to_html_with_merges(nss)
|
|
return res
|
|
|
|
db = DBPools()
|
|
async with db.sqlorContext('kboss') as sor:
|
|
ns_dic = {
|
|
'id': uuid(),
|
|
'orgid': ns.get('orgid'),
|
|
'basic_info': json.dumps(ns.get('basic_info')) if not isinstance(ns.get('basic_info'), str) else ns.get('basic_info'),
|
|
'file_path': ns.get('filepath'),
|
|
'remark': ns.get('remark'),
|
|
'del_flg': '0'
|
|
}
|
|
await sor.C('kyy_supply', ns_dic)
|
|
return {
|
|
'status': True,
|
|
'msg': '文档上传成功, id: %s' % ns_dic['id']
|
|
}
|
|
except Exception as e:
|
|
return {
|
|
'status': False,
|
|
'msg': '文档上传失败, %s' % str(e)
|
|
}
|
|
|
|
ret = await kyy_add_supply(params_kw)
|
|
return ret |