kboss/b/cpcc/utils/stock_overflow.py
2025-07-16 14:27:17 +08:00

61 lines
2.2 KiB
Python

import unittest
import logging
class MyTestCase(unittest.TestCase):
    """Placeholder test case; it does not contain a real assertion yet."""

    # The stub assertion below can never pass, so running it would make every
    # discovery run that reaches this class fail. Skip it explicitly until a
    # real assertion is written.
    @unittest.skip("placeholder test: no real assertion implemented yet")
    def test_something(self):
        """Stub test; replace the assertion and remove the skip decorator."""
        self.assertEqual(True, False)  # add assertion here
# Binary size suffixes expressed relative to 'Gi'. The conversion treats 'Gi'
# as the base unit ('7Gi' -> 7.0), so the other suffixes are scaled to match.
_GI_PER_SUFFIX = {
    'Ki': 1 / (1024 * 1024),
    'Mi': 1 / 1024,
    'Gi': 1.0,
    'Ti': 1024.0,
}


def _convert_unit(value):
    """Convert a requested amount to a plain number.

    Integers are returned unchanged. Strings ending in ``m`` are read as
    millicores and divided by 1000. Strings ending in ``Ki``, ``Mi``, ``Gi``
    or ``Ti`` are scaled so that the result is expressed in Gi. Any other
    value is handed to ``float()``, which raises ``ValueError`` for text it
    cannot parse.
    """
    if isinstance(value, int):
        return value
    if isinstance(value, str):
        if value.endswith('m'):  # CPU millicores
            return float(value[:-1]) / 1000
        suffix = value[-2:]
        if suffix in _GI_PER_SUFFIX:  # binary size suffix
            return float(value[:-2]) * _GI_PER_SUFFIX[suffix]
    return float(value)


def test_something2(produce_pm=None, result=None):
    """Report which requested parts do not fit into the remaining stock.

    Args:
        produce_pm: Requested parts, each a dict with ``type``, ``model`` and
            ``amount`` keys. Defaults to a built-in sample request.
        result: Stock rows, each a dict with ``type``, ``model``, ``stock``
            and ``consumed`` keys. Defaults to built-in sample stock. The
            rows are not modified.

    Returns:
        A list with one dict per over-requested part, holding ``type``,
        ``model``, ``requested`` and ``usable(stock-consumed)``.

    Raises:
        ValueError: If an ``amount`` string cannot be converted to a number.
    """
    # Input data
    if produce_pm is None:
        produce_pm = [
            {'type': 'cpu', 'model': 'STANDARD', 'amount': '1000m'},
            {'type': 'memory', 'model': 'STANDARD', 'amount': '7Gi'},
            {'type': 'gpu', 'model': 'STANDARD', 'amount': 0},
            {'type': 'disk', 'model': 'SYS', 'amount': 10},
            {'type': 'disk', 'model': 'DATA', 'amount': '20Gi'},
        ]
    if result is None:
        result = [
            {'type': 'disk', 'model': 'SYS', 'stock': 99, "consumed": 30},
            {'type': 'disk', 'model': 'DATA', 'stock': 99, "consumed": 31},
            {'type': 'memory', 'model': 'STANDARD', 'stock': 16, "consumed": 1},
            {'type': 'cpu', 'model': 'INTEL(R) XEON(R) PLATINUM 8582C',
             'stock': 4, "consumed": 1},
        ]

    # Pre-process the stock rows: every non-disk part is keyed under the
    # 'STANDARD' model so it lines up with the request entries. The key is
    # computed locally instead of rewriting the caller's rows.
    # NOTE(review): if several rows collapse to the same key the last one
    # wins; confirm whether they should be summed instead.
    usable_map = {}
    for item in result:
        model = item['model'] if item['type'] == 'disk' else 'STANDARD'
        usable_map[(item['type'], model)] = item['stock'] - item['consumed']
    print(f"部件剩余库存数:{usable_map}")

    # Collect the requests that exceed what is left.
    exceeded_resources = []
    for req in produce_pm:
        amount = _convert_unit(req['amount'])
        if amount <= 0:
            # Nothing is requested, so a part without stock must not be
            # reported as exceeded (0 >= 0 would otherwise match).
            continue
        usable = usable_map.get((req['type'], req['model']), 0)
        # By rule a request may not use up the remaining stock completely; it
        # has to stay below it, because other services on the node may take
        # resources dynamically.
        if amount >= usable:
            exceeded_resources.append({
                'type': req['type'],
                'model': req['model'],
                'requested': amount,
                'usable(stock-consumed)': usable,
            })
    print("超出库存的部件请求:", exceeded_resources)
    return exceeded_resources


# Despite its name this is a script-style check that returns a value, not a
# pytest test; opt out of collection so importing it into a test module does
# not register it as one.
test_something2.__test__ = False
if __name__ == "__main__":
    # unittest.main() stays disabled here; running the file as a script
    # executes the stock check directly.
    test_something2()