# File listing metadata (viewer residue): 97 lines, 3.3 KiB, plaintext.
# Baidu Cloud (BCE) credentials used by get_auth_header():
#   AK is embedded verbatim in the "bce-auth-v1/..." authorization prefix,
#   SK is the HMAC-SHA256 key from which the signing key is derived.
# SECURITY(review): these secrets are hard-coded in source. Anyone who can
# read this file can sign requests with them; consider loading them from a
# secret store / environment and rotating the pair.
AK = 'ALTAKPk92fX9cgGDax83yNL8mP'
SK = '9b16b8efd4dc463d8bbd5462db1db8a5'

async def get_auth_header(method: str, url: str, header: dict, query=None, get_res=False):
    """Sign a request with the Baidu Cloud ``bce-auth-v1`` scheme.

    The function builds the canonical request (method, path, query string,
    headers), derives a signing key from the module-level ``SK`` and stores
    the resulting ``Authorization`` value in ``header``.

    Args:
        method: HTTP method, used verbatim as the first canonical line.
        url: Full request URL. Its path is signed; if it carries a query
            string, that query string replaces ``query``.
        header: Request headers. Every entry is signed. The dict is
            modified in place (an ``Authorization`` key is added).
        query: Query parameters to sign when ``url`` has no query string.
            Defaults to an empty mapping.
        get_res: When true, also send the request through
            ``aiohttp_client.request`` (with ``query`` as the JSON body)
            and return the response instead of the headers.

    Returns:
        ``header`` (the same dict object, now containing ``Authorization``)
        when ``get_res`` is false; otherwise the response object.
    """
    # Local imports keep the block self-contained, matching the original
    # style of importing inside the function.
    import hashlib
    import hmac
    import time
    import urllib.parse  # "import urllib" alone does not guarantee .parse

    if query is None:
        query = {}

    # Split the URL into the path to sign and its query string.
    url_parse = urllib.parse.urlparse(url)
    uri = url_parse.path
    url_query = url_parse.query

    # A query string present in the URL takes precedence over ``query``.
    if url_query:
        query = dict(urllib.parse.parse_qsl(url_query))

    # x-bce-date: current UTC time in ISO-8601 "Z" form.
    x_bce_date = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())

    # Authorization prefix: version / access key / timestamp / expiration
    # period (the fixed "1800" field of the bce-auth-v1 format).
    authStringPrefix = "bce-auth-v1" + "/" + AK + "/" + x_bce_date + "/" + "1800"

    # Canonical URI: percent-encode the path but keep "/" separators.
    CanonicalURI = urllib.parse.quote(uri)

    # Canonical query string: every key and value is fully percent-encoded
    # (no character, not even "/", is exempt), the "authorization" parameter
    # is skipped, and the encoded pairs are sorted.
    pairs = sorted(
        '%s=%s' % (
            urllib.parse.quote(str(k), safe=""),
            urllib.parse.quote(str(v), safe=""),
        )
        for k, v in query.items()
        if str(k).lower() != 'authorization'
    )
    CanonicalQueryString = '&'.join(pairs)

    # Canonical headers: "lowercased-name:value", both percent-encoded,
    # sorted; the signed-header list is the sorted encoded names.
    header_lines = []
    signedHeaders_list = []
    for key, value in header.items():
        encoded_name = urllib.parse.quote(key.lower(), safe="")
        signedHeaders_list.append(encoded_name)
        header_lines.append(encoded_name + ":" + urllib.parse.quote(value, safe=""))
    header_lines.sort()
    signedHeaders = ';'.join(sorted(signedHeaders_list))
    CanonicalHeaders = "\n".join(header_lines)

    CanonicalRequest = "\n".join(
        [method, CanonicalURI, CanonicalQueryString, CanonicalHeaders]
    )

    # Signing key = hex(HMAC-SHA256(SK, prefix)); the hex text itself is
    # then used as the key for the signature over the canonical request.
    signingKey = hmac.new(
        SK.encode('utf-8'), authStringPrefix.encode('utf-8'), hashlib.sha256
    ).hexdigest()
    Signature = hmac.new(
        signingKey.encode('utf-8'), CanonicalRequest.encode('utf-8'), hashlib.sha256
    ).hexdigest()

    # Publish the result in the caller's header dict.
    header['Authorization'] = authStringPrefix + "/" + signedHeaders + "/" + Signature

    if get_res:
        # Send the request asynchronously and hand back the response.
        async with aiohttp_client.request(
                method=method,
                url=url,
                headers=header,
                allow_redirects=True,
                json=query) as res:
            # Leaving the context releases the connection, so buffer the
            # body first; otherwise the caller could not read it.
            # NOTE(review): assumes ``res`` is an aiohttp-style response
            # exposing ``read()`` -- confirm what aiohttp_client is.
            await res.read()
            return res
    else:
        return header


async def get_orderdetail_by_uuid(ns=None):
    """Fetch a Baidu Cloud billing order by UUID.

    POSTs to ``https://billing.baidubce.com/v1/order/getByUuid`` with a
    signed header obtained from ``get_auth_header``. All entries of ``ns``
    are appended to the URL as query parameters, and the JSON body carries
    ``uuids`` / ``queryAccountId`` taken from ``ns``.

    Args:
        ns: Mapping of request parameters; ``order_id`` and ``baidu_id``
            are read from it (each is ``None`` when absent). Defaults to
            an empty mapping.

    Returns:
        A dict with ``status`` (always ``True``), a fixed ``msg`` and
        ``data`` holding the decoded JSON response.
        NOTE(review): the HTTP status is not inspected, so an error payload
        from the service is still reported with ``status: True``.
    """
    import urllib.parse

    if ns is None:
        ns = {}

    method = 'POST'
    body = {'uuids': [ns.get('order_id')], 'queryAccountId': ns.get('baidu_id')}

    # Percent-encode keys and values so that characters such as "&", "="
    # or spaces cannot corrupt the query string that is sent and signed.
    ns_format = urllib.parse.urlencode(ns, quote_via=urllib.parse.quote)
    url = 'https://billing.baidubce.com/v1/order/getByUuid?%s' % ns_format

    # NOTE(review): "ContentType" is sent and signed exactly as spelled
    # here; it is not the standard "Content-Type" header name -- confirm
    # whether that is intentional before changing it.
    header = {
        "Host": "billing.baidubce.com",
        "ContentType": "application/json;charset=UTF-8",
    }
    header = await get_auth_header(method=method, url=url, header=header)

    async with aiohttp_client.request(
            method=method,
            url=url,
            headers=header,
            json=body) as res:
        data_ = await res.json()

    return {
        'status': True,
        'msg': 'get baidu order detail success',
        'data': data_,
    }


ret = await get_orderdetail_by_uuid(params_kw)
|
||
return ret |