kboss/b/pub/get_auth_header_res.py
2025-07-16 14:27:17 +08:00

79 lines
2.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import hashlib
import hmac
import json
import time
import urllib
import urllib.parse
# 1. AK/SK, host, method, absolute URL path, query string.
# NOTE(review): the access key / secret key pair is hard-coded in source;
# consider loading it from configuration or the environment instead.
# Previous key pair, left commented out:
# AK = 'ALTAKHxKxmjjm7Z19TpUxPAinY'
# SK = '5db9420661a549ee84440284ad07c229'
AK = 'ALTAKPk92fX9cgGDax83yNL8mP'
SK = '9b16b8efd4dc463d8bbd5462db1db8a5'
# 2. x-bce-date: the current UTC time, formatted once when this module is
# imported (the value is not refreshed afterwards).
x_bce_date = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
async def get_auth_header(method: str, url: str, header: dict, query=None, get_res=False):
    """Sign a request with a ``bce-auth-v1`` Authorization header and optionally send it.

    The signature covers the HTTP method, the URL path, the query parameters
    and every entry of ``header``. The resulting value is stored in
    ``header['Authorization']``, so ``header`` is modified in place.

    Args:
        method: HTTP method; used verbatim in the canonical request.
        url: Full request URL. When it carries a query string, the parameters
            parsed from it replace ``query``.
        header: Headers to sign (string keys and values). All of them are
            signed and listed in the signed-headers part of the result.
        query: Query parameters used when ``url`` has no query string.
            Defaults to no parameters.
        get_res: When true, send the request (with the query parameters also
            passed as the JSON body) and return the response object instead
            of the headers.

    Returns:
        ``header`` with ``Authorization`` added, or the response object when
        ``get_res`` is true.
    """
    if query is None:
        query = {}

    # Parse the URL; a query string embedded in it takes precedence over `query`.
    parsed_url = urllib.parse.urlparse(url)
    if parsed_url.query:
        query = dict(urllib.parse.parse_qsl(parsed_url.query))

    # 4. Auth string prefix: version / access key / signing time / validity (s).
    # The timestamp is taken per call: a value frozen at import time would fall
    # outside the 1800-second validity window in a long-running process.
    timestamp = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
    auth_string_prefix = "/".join(("bce-auth-v1", AK, timestamp, "1800"))

    # 5. Build the canonical request.
    # 5.1 Canonical URI: the percent-encoded path ('/' is kept as-is).
    canonical_uri = urllib.parse.quote(parsed_url.path)

    # 5.2 Canonical query string: sorted "key=value" pairs, skipping any
    # "authorization" parameter. Values are fully percent-encoded ('/'
    # included), matching how header values are encoded below; keys are used
    # unencoded.
    query_parts = sorted(
        '%s=%s' % (key, urllib.parse.quote(str(value), safe=""))
        for key, value in query.items()
        if key.lower() != 'authorization'
    )
    canonical_query_string = '&'.join(query_parts)

    # 5.3 Canonical headers: lower-cased, percent-encoded "name:value" lines.
    encoded_headers = [
        (urllib.parse.quote(name.lower(), safe=""), urllib.parse.quote(value, safe=""))
        for name, value in header.items()
    ]
    signed_headers = ';'.join(sorted(name for name, _ in encoded_headers))
    canonical_headers = "\n".join(
        sorted(name + ":" + value for name, value in encoded_headers)
    )

    # 5.4 Join the four parts into the canonical request.
    canonical_request = "\n".join(
        (method, canonical_uri, canonical_query_string, canonical_headers)
    )

    # 6. Signing key: hex HMAC-SHA256 of the auth string prefix, keyed by SK.
    signing_key = hmac.new(
        SK.encode('utf-8'), auth_string_prefix.encode('utf-8'), hashlib.sha256
    ).hexdigest()

    # 7. Signature: hex HMAC-SHA256 of the canonical request, keyed by the
    # hex signing key.
    signature = hmac.new(
        signing_key.encode('utf-8'), canonical_request.encode('utf-8'), hashlib.sha256
    ).hexdigest()

    # 8. Assemble the Authorization value and store it in the caller's headers.
    header['Authorization'] = auth_string_prefix + "/" + signed_headers + "/" + signature

    if not get_res:
        return header

    # Send the request asynchronously and return the response.
    # NOTE(review): `aiohttp_client` is not imported or defined in the visible
    # part of this file - confirm where it comes from.
    # NOTE(review): the response is returned as the `async with` block exits;
    # confirm callers can still read it at that point.
    async with aiohttp_client.request(
            method=method,
            url=url,
            headers=header,
            allow_redirects=True,
            json=query) as res:
        return res