kboss/b/baiduc/get_orderdetail_by_uuid.dspy
2025-12-08 16:30:30 +08:00

97 lines
3.3 KiB
Plaintext
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

AK = 'ALTAKPk92fX9cgGDax83yNL8mP'
SK = '9b16b8efd4dc463d8bbd5462db1db8a5'
async def get_auth_header(method: str, url: str, header: dict, query={}, get_res=False):
import urllib
import hmac
# 解析uri
url_parse = urllib.parse.urlparse(url)
uri = url_parse.path
url_query = url_parse.query
# 获取query
if url_query:
query = dict(urllib.parse.parse_qsl(url_query))
# 2.x-bce-date
x_bce_date = time.gmtime()
x_bce_date = time.strftime('%Y-%m-%dT%H:%M:%SZ', x_bce_date)
# 4.认证字符串前缀
authStringPrefix = "bce-auth-v1" + "/" + AK + "/" + x_bce_date + "/" + "1800"
# windows下为urllib.parse.quoteLinux下为urllib.quote
# 5.生成CanonicalRequest
# 5.1生成CanonicalURI
CanonicalURI = urllib.parse.quote(uri)
# 如果您调用的接口的query比较复杂的话需要做额外处理
# 5.2生成CanonicalQueryString
# CanonicalQueryString = query
result = ['%s=%s' % (k, urllib.parse.quote(str(v))) for k, v in query.items() if k.lower != 'authorization']
result.sort()
CanonicalQueryString = '&'.join(result)
# 5.3生成CanonicalHeaders
result = []
signedHeaders_list = []
for key,value in header.items():
tempStr = str(urllib.parse.quote(key.lower(),safe="")) + ":" + str(urllib.parse.quote(value,safe=""))
signedHeaders_list.append(str(urllib.parse.quote(key.lower(),safe="")))
result.append(tempStr)
result.sort()
signedHeaders = ';'.join(sorted(signedHeaders_list))
CanonicalHeaders = "\n".join(result)
# 5.4拼接得到CanonicalRequest
CanonicalRequest = method + "\n" + CanonicalURI + "\n" + CanonicalQueryString +"\n" + CanonicalHeaders
# 6.生成signingKey
signingKey = hmac.new(SK.encode('utf-8'), authStringPrefix.encode('utf-8'), hashlib.sha256)
# 7.生成Signature
Signature = hmac.new((signingKey.hexdigest()).encode('utf-8'), CanonicalRequest.encode('utf-8'), hashlib.sha256)
# 8.生成Authorization并放到header里
header['Authorization'] = authStringPrefix + "/" + signedHeaders + "/" + Signature.hexdigest()
if get_res:
# 异步请求链接 返回结果
async with aiohttp_client.request(
method=method,
url=url,
headers=header,
allow_redirects=True,
json=query) as res:
return res
else:
return header
async def get_orderdetail_by_uuid(ns={}):
order_id = ns.get('order_id')
baidu_id = ns.get('baidu_id')
method = 'POST'
nss = {'uuids': [order_id], 'queryAccountId': baidu_id}
ns_format = '&'.join(['%s=%s' % (k, v) for k, v in ns.items()])
url = 'https://billing.baidubce.com/v1/order/getByUuid?%s' % ns_format
method = 'POST'
header = {
"Host": "billing.baidubce.com",
"ContentType": "application/json;charset=UTF-8"
}
header = await get_auth_header(method=method, url=url, header=header)
async with aiohttp_client.request(
method=method,
url=url,
headers=header,
json=nss) as res:
data_ = await res.json()
return {
'status': True,
'msg': 'get baidu order detail success',
'data': data_
}
ret = await get_orderdetail_by_uuid(params_kw)
return ret