#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Helpers for signing HTTP requests with the ``bce-auth-v1`` scheme."""
import hashlib
import hmac
import json
import time
import urllib
import urllib.parse  # `import urllib` alone does not guarantee the submodule is loaded

# 1. AK/SK, host, method, absolute URL path, query string
# NOTE(review): credentials are hard-coded in source; consider loading them
# from configuration or the environment instead.
# AK = 'ALTAKHxKxmjjm7Z19TpUxPAinY'
# SK = '5db9420661a549ee84440284ad07c229'
AK = 'ALTAKPk92fX9cgGDax83yNL8mP'
SK = '9b16b8efd4dc463d8bbd5462db1db8a5'

# 2. x-bce-date
# NOTE(review): this timestamp is computed once, at import time, and reused by
# every call to get_auth_header(); a long-running process keeps signing with
# the import-time value. Confirm whether callers rely on that before changing.
x_bce_date = time.gmtime()
x_bce_date = time.strftime('%Y-%m-%dT%H:%M:%SZ', x_bce_date)


def _canonical_query_string(query):
    """Return the sorted, percent-encoded ``key=value`` pairs joined by ``&``.

    The ``authorization`` key (any letter case) is left out. Keys and values
    are fully percent-encoded, including ``/``.
    """
    pairs = [
        '%s=%s' % (
            urllib.parse.quote(str(key), safe=""),
            urllib.parse.quote(str(value), safe=""),
        )
        for key, value in query.items()
        if str(key).lower() != 'authorization'
    ]
    pairs.sort()
    return '&'.join(pairs)


def _canonical_headers(header):
    """Return ``(canonical_headers, signed_headers)`` built from *header*.

    Header names are lower-cased; names and values are percent-encoded.
    ``canonical_headers`` is the sorted ``name:value`` lines joined by a
    newline, ``signed_headers`` the sorted names joined by ``;``.
    """
    names = []
    lines = []
    for key, value in header.items():
        name = urllib.parse.quote(key.lower(), safe="")
        names.append(name)
        lines.append(name + ":" + urllib.parse.quote(value, safe=""))
    lines.sort()
    return "\n".join(lines), ';'.join(sorted(names))


async def get_auth_header(method: str, url: str, header: dict, query=None, get_res=False):
    """Sign a request and store the result in ``header['Authorization']``.

    Args:
        method: HTTP method; it is placed verbatim on the first line of the
            canonical request.
        url: Request URL. Its path is signed; if it carries a query string,
            that query string replaces ``query``.
        header: Headers to sign. Every entry is included in the signature and
            the dict is modified in place (the ``Authorization`` key is set).
        query: Mapping of query parameters. Defaults to an empty mapping.
        get_res: When true, also send the request and return the response
            object instead of the headers.

    Returns:
        ``header`` (the same dict object) when ``get_res`` is false, otherwise
        the object yielded by ``aiohttp_client.request(...)``.
    """
    # A fresh dict per call instead of a shared mutable default.
    if query is None:
        query = {}

    # Split the URL into path and query string.
    parsed_url = urllib.parse.urlparse(url)
    uri = parsed_url.path
    url_query = parsed_url.query
    # A query string embedded in the URL takes precedence over `query`.
    if url_query:
        query = dict(urllib.parse.parse_qsl(url_query))

    # 4. Auth string prefix. "1800" is the trailing field of the prefix
    # (presumably the validity period in seconds -- confirm against the spec).
    auth_string_prefix = "bce-auth-v1" + "/" + AK + "/" + x_bce_date + "/" + "1800"

    # 5. Build the CanonicalRequest.
    # urllib.parse.quote is the Python 3 name (urllib.quote on Python 2).
    # 5.1 CanonicalURI: the path, percent-encoded with "/" kept as-is.
    canonical_uri = urllib.parse.quote(uri)
    # 5.2 CanonicalQueryString
    canonical_query_string = _canonical_query_string(query)
    # 5.3 CanonicalHeaders and the list of signed header names
    canonical_headers, signed_headers = _canonical_headers(header)
    # 5.4 Join the parts into the CanonicalRequest.
    canonical_request = "\n".join(
        [method, canonical_uri, canonical_query_string, canonical_headers]
    )

    # 6. SigningKey = HMAC-SHA256(SK, auth string prefix)
    signing_key = hmac.new(
        SK.encode('utf-8'), auth_string_prefix.encode('utf-8'), hashlib.sha256
    )
    # 7. Signature = HMAC-SHA256(hex(SigningKey), CanonicalRequest)
    signature = hmac.new(
        signing_key.hexdigest().encode('utf-8'),
        canonical_request.encode('utf-8'),
        hashlib.sha256,
    )
    # 8. Assemble the Authorization value and put it into the headers.
    header['Authorization'] = (
        auth_string_prefix + "/" + signed_headers + "/" + signature.hexdigest()
    )

    if get_res:
        # Send the request asynchronously and return the result.
        # NOTE(review): `aiohttp_client` is not defined or imported in this
        # file as seen here -- confirm where it is expected to come from.
        # NOTE(review): `res` is returned after the `async with` block has
        # exited; confirm the caller can still use it at that point.
        async with aiohttp_client.request(
                method=method,
                url=url,
                headers=header,
                allow_redirects=True,
                json=query) as res:
            return res
    return header