import asyncio
import datetime
import hashlib
import hmac
import json
from urllib.parse import quote

import aiohttp
import requests

# The following parameters differ between services, but are normally the same
# for every request made to a single service.
Service = "iam"
# Version = "2021-08-01"
Version = "2018-01-01"
Region = "cn-beijing"
Host = "iam.volcengineapi.com"
ContentType = "application/x-www-form-urlencoded"

# Request credentials, obtained from the IAM or STS service.
AK = "AKLTZTllYTNlMTUxOGNlNDQ2NmIzZDYwM2I0ZTExNjEwNGU"
SK = "TXpjNVpXWTFNVGd6TkRsa05EWmpPVGs0T1RVeU1EUm1Oekl4TVRNNFpURQ=="
# When temporary credentials are used, the SessionToken must be sent as the
# X-Security-Token header and included in SignedHeaders; add that header
# yourself through the ``header`` argument.
# SessionToken = ""


def _quote_component(value):
    """Percent-encode one query key or value, leaving only ``-_.~`` unescaped.

    Plain numbers (int/float, but not bool) are converted to their decimal
    text first so that parameters such as ``Limit=10`` can be signed; any
    other non-string type is passed to ``quote`` unchanged and raises
    ``TypeError`` there.
    """
    if isinstance(value, (int, float)) and not isinstance(value, bool):
        value = str(value)
    return quote(value, safe="-_.~")


async def norm_query(params):
    """Build the canonical (sorted, percent-encoded) query string.

    Args:
        params: Mapping of query parameter names to a value or a list of
            values. A list produces one ``key=value`` pair per element, in
            list order; an empty list produces no pair.

    Returns:
        The ``key=value`` pairs joined with ``&``, with keys in sorted order.
        An empty mapping yields an empty string.
    """
    pairs = []
    for key in sorted(params):
        value = params[key]
        values = value if isinstance(value, list) else [value]
        encoded_key = _quote_component(key)
        pairs.extend(
            "{}={}".format(encoded_key, _quote_component(item)) for item in values
        )
    # Defensive: quote() already encodes spaces as %20 and "+" as %2B, so this
    # replacement is not expected to match anything.
    return "&".join(pairs).replace("+", "%20")


# Step 1: helper functions.
async def hmac_sha256(key: bytes, content: str):
    """Return the raw HMAC-SHA256 digest (a keyed hash) of ``content``."""
    return hmac.new(key, content.encode("utf-8"), hashlib.sha256).digest()


async def hash_sha256(content: str):
    """Return the hex-encoded SHA-256 hash of the UTF-8 encoded ``content``."""
    return hashlib.sha256(content.encode("utf-8")).hexdigest()


# Step 2: the request-signing function.
async def volcengine_requestapi(
    method=None, query=None, header=None, action=None, body=None, version=None
):
    """Sign a request with HMAC-SHA256 and send it to ``Host``.

    Args:
        method: HTTP method name (e.g. ``"GET"``). Required; it is
            upper-cased for the canonical request.
        query: Extra query parameters. They are merged after ``Action`` and
            ``Version`` and therefore override them on key collision.
        header: Extra request headers. The signing headers (``Host``,
            ``X-Content-Sha256``, ``X-Date``, ``Content-Type``,
            ``Authorization``) override entries with the same name.
        action: Value of the ``Action`` query parameter.
        body: Request body as a string; ``None`` is treated as ``""``.
        version: Value of the ``Version`` query parameter. Defaults to the
            module-level ``Version`` when omitted.

    Returns:
        The response body decoded by ``aiohttp``'s ``resp.json()``.
    """
    # Fresh containers per call instead of shared mutable defaults.
    query = {} if query is None else query
    header = {} if header is None else header
    if version is None:
        # Without a fallback, ``Version=None`` cannot be percent-encoded.
        version = Version
    if body is None:
        body = ""

    # Step 3: build the credential. Service and Region are fixed for this
    # module; access_key_id / secret_access_key are the AccessKeyID and
    # SecretAccessKey.
    # NOTE(review): these hard-coded keys differ from the module-level AK/SK
    # constants, which are unused here. Confirm which pair is intended, and
    # load secrets from the environment or a secret store rather than source.
    credential = {
        "access_key_id": "AKLTZjgwZjlkNGIwMzEyNDZlZDgyN2E3NWViYTJiMjk4OTY",
        "secret_access_key": "T0dOaVpXWm1OMlZtTURBek5HWTBPR0V6TUdWa09EbGpZMlEwTUdFME5ESQ==",
        "service": Service,
        "region": Region,
    }

    # Request description used for both signing and sending.
    request_param = {
        "body": body,
        "host": Host,
        "path": "/",
        "method": method,
        "content_type": ContentType,
        # Timezone-aware UTC "now"; formats identically to the deprecated
        # datetime.utcnow().
        "date": datetime.datetime.now(datetime.timezone.utc),
        "query": {"Action": action, "Version": version, **query},
    }

    # Step 4: prepare the values that take part in the signature.
    x_date = request_param["date"].strftime("%Y%m%dT%H%M%SZ")
    short_x_date = x_date[:8]
    x_content_sha256 = await hash_sha256(request_param["body"])
    sign_result = {
        "Host": request_param["host"],
        "X-Content-Sha256": x_content_sha256,
        "X-Date": x_date,
        "Content-Type": request_param["content_type"],
    }

    # Step 5: compute the signature.
    signed_headers_str = ";".join(
        ["content-type", "host", "x-content-sha256", "x-date"]
    )
    # signed_headers_str = signed_headers_str + ";x-security-token"
    canonical_headers = "\n".join(
        [
            "content-type:" + request_param["content_type"],
            "host:" + request_param["host"],
            "x-content-sha256:" + x_content_sha256,
            "x-date:" + x_date,
        ]
    )
    canonical_request_str = "\n".join(
        [
            request_param["method"].upper(),
            request_param["path"],
            await norm_query(request_param["query"]),
            canonical_headers,
            "",
            signed_headers_str,
            x_content_sha256,
        ]
    )
    # Uncomment to print the canonical request when debugging a mismatch.
    # print(canonical_request_str)
    hashed_canonical_request = await hash_sha256(canonical_request_str)
    # print(hashed_canonical_request)

    credential_scope = "/".join(
        [short_x_date, credential["region"], credential["service"], "request"]
    )
    string_to_sign = "\n".join(
        ["HMAC-SHA256", x_date, credential_scope, hashed_canonical_request]
    )
    # print(string_to_sign)

    # Derive the signing key by chaining HMACs: date -> region -> service ->
    # the literal "request".
    k_date = await hmac_sha256(
        credential["secret_access_key"].encode("utf-8"), short_x_date
    )
    k_region = await hmac_sha256(k_date, credential["region"])
    k_service = await hmac_sha256(k_region, credential["service"])
    k_signing = await hmac_sha256(k_service, "request")
    signature = (await hmac_sha256(k_signing, string_to_sign)).hex()

    sign_result["Authorization"] = (
        "HMAC-SHA256 Credential={}, SignedHeaders={}, Signature={}".format(
            credential["access_key_id"] + "/" + credential_scope,
            signed_headers_str,
            signature,
        )
    )
    header = {**header, **sign_result}
    # header = {**header, **{"X-Security-Token": SessionToken}}

    # Step 6: put the signature into the HTTP headers and send the request.
    url = "https://{}{}".format(request_param["host"], request_param["path"])
    async with aiohttp.ClientSession() as session:
        async with session.request(
            method=method,
            url=url,
            headers=header,
            params=request_param["query"],
            data=request_param["body"],
        ) as resp:
            return await resp.json()