140 lines
5.4 KiB
Plaintext
140 lines
5.4 KiB
Plaintext
import datetime
|
||
import hashlib
|
||
import hmac
|
||
import json
|
||
from urllib.parse import quote
|
||
import aiohttp
|
||
import asyncio
|
||
import requests
|
||
|
||
# 以下参数视服务不同而不同,一个服务内通常是一致的
|
||
Service = "iam"
|
||
# Version = "2021-08-01"
|
||
Version = "2018-01-01"
|
||
Region = "cn-beijing"
|
||
Host = "iam.volcengineapi.com"
|
||
ContentType = "application/x-www-form-urlencoded"
|
||
|
||
# 请求的凭证,从IAM或者STS服务中获取
|
||
AK = "AKLTZTllYTNlMTUxOGNlNDQ2NmIzZDYwM2I0ZTExNjEwNGU"
|
||
SK = "TXpjNVpXWTFNVGd6TkRsa05EWmpPVGs0T1RVeU1EUm1Oekl4TVRNNFpURQ=="
|
||
# 当使用临时凭证时,需要使用到SessionToken传入Header,并计算进SignedHeader中,请自行在header参数中添加X-Security-Token头
|
||
# SessionToken = ""
|
||
|
||
|
||
async def norm_query(params):
|
||
query = ""
|
||
for key in sorted(params.keys()):
|
||
if type(params[key]) == list:
|
||
for k in params[key]:
|
||
query = (
|
||
query + quote(key, safe="-_.~") + "=" + quote(k, safe="-_.~") + "&"
|
||
)
|
||
else:
|
||
query = (query + quote(key, safe="-_.~") + "=" + quote(params[key], safe="-_.~") + "&")
|
||
query = query[:-1]
|
||
return query.replace("+", "%20")
|
||
|
||
|
||
# 第一步:准备辅助函数。
|
||
# sha256 非对称加密
|
||
async def hmac_sha256(key: bytes, content: str):
|
||
return hmac.new(key, content.encode("utf-8"), hashlib.sha256).digest()
|
||
|
||
|
||
# sha256 hash算法
|
||
async def hash_sha256(content: str):
|
||
return hashlib.sha256(content.encode("utf-8")).hexdigest()
|
||
|
||
|
||
# 第二步:签名请求函数
|
||
async def volcengine_requestapi(method=None, query={}, header={}, action=None, body=None, version=None):
|
||
# 第三步:创建身份证明。其中的 Service 和 Region 字段是固定的。ak 和 sk 分别代表
|
||
# AccessKeyID 和 SecretAccessKey。同时需要初始化签名结构体。一些签名计算时需要的属性也在这里处理。
|
||
# 初始化身份证明结构体
|
||
date = datetime.datetime.utcnow()
|
||
credential = {
|
||
"access_key_id": "AKLTZjgwZjlkNGIwMzEyNDZlZDgyN2E3NWViYTJiMjk4OTY",
|
||
"secret_access_key": "T0dOaVpXWm1OMlZtTURBek5HWTBPR0V6TUdWa09EbGpZMlEwTUdFME5ESQ==",
|
||
"service": Service,
|
||
"region": Region,
|
||
}
|
||
# 初始化签名结构体
|
||
request_param = {
|
||
"body": body,
|
||
"host": Host,
|
||
"path": "/",
|
||
"method": method,
|
||
"content_type": ContentType,
|
||
"date": date,
|
||
"query": {"Action": action, "Version": version, **query},
|
||
}
|
||
if body is None:
|
||
request_param["body"] = ""
|
||
# 第四步:接下来开始计算签名。在计算签名前,先准备好用于接收签算结果的 signResult 变量,并设置一些参数。
|
||
# 初始化签名结果的结构体
|
||
x_date = request_param["date"].strftime("%Y%m%dT%H%M%SZ")
|
||
short_x_date = x_date[:8]
|
||
x_content_sha256 = await hash_sha256(request_param["body"])
|
||
sign_result = {
|
||
"Host": request_param["host"],
|
||
"X-Content-Sha256": x_content_sha256,
|
||
"X-Date": x_date,
|
||
"Content-Type": request_param["content_type"],
|
||
}
|
||
# 第五步:计算 Signature 签名。
|
||
signed_headers_str = ";".join(
|
||
["content-type", "host", "x-content-sha256", "x-date"]
|
||
)
|
||
# signed_headers_str = signed_headers_str + ";x-security-token"
|
||
canonical_request_str = "\n".join(
|
||
[request_param["method"].upper(),
|
||
request_param["path"],
|
||
await norm_query(request_param["query"]),
|
||
"\n".join(
|
||
[
|
||
"content-type:" + request_param["content_type"],
|
||
"host:" + request_param["host"],
|
||
"x-content-sha256:" + x_content_sha256,
|
||
"x-date:" + x_date,
|
||
]
|
||
),
|
||
"",
|
||
signed_headers_str,
|
||
x_content_sha256,
|
||
]
|
||
)
|
||
|
||
# 打印正规化的请求用于调试比对
|
||
# print(canonical_request_str)
|
||
hashed_canonical_request = await hash_sha256(canonical_request_str)
|
||
|
||
# 打印hash值用于调试比对
|
||
# print(hashed_canonical_request)
|
||
credential_scope = "/".join([short_x_date, credential["region"], credential["service"], "request"])
|
||
string_to_sign = "\n".join(["HMAC-SHA256", x_date, credential_scope, hashed_canonical_request])
|
||
|
||
# 打印最终计算的签名字符串用于调试比对
|
||
# print(string_to_sign)
|
||
k_date = await hmac_sha256(credential["secret_access_key"].encode("utf-8"), short_x_date)
|
||
k_region = await hmac_sha256(k_date, credential["region"])
|
||
k_service = await hmac_sha256(k_region, credential["service"])
|
||
k_signing = await hmac_sha256(k_service, "request")
|
||
signature = await hmac_sha256(k_signing, string_to_sign)
|
||
signature = signature.hex()
|
||
|
||
sign_result["Authorization"] = "HMAC-SHA256 Credential={}, SignedHeaders={}, Signature={}".format(
|
||
credential["access_key_id"] + "/" + credential_scope,
|
||
signed_headers_str,
|
||
signature,
|
||
)
|
||
header = {**header, **sign_result}
|
||
# header = {**header, **{"X-Security-Token": SessionToken}}
|
||
# 第六步:将 Signature 签名写入 HTTP Header 中,并发送 HTTP 请求。
|
||
async with aiohttp.ClientSession() as session:
|
||
async with session.request(method=method,
|
||
url="https://{}{}".format(request_param["host"], request_param["path"]),
|
||
headers=header,
|
||
params=request_param["query"],
|
||
data=request_param["body"]) as resp:
|
||
return await resp.json() |