salescrm/b/pub/volcengine_requestapi.dspy
2025-10-27 15:50:44 +08:00

140 lines
5.4 KiB
Plaintext
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import datetime
import hashlib
import hmac
import json
from urllib.parse import quote
import aiohttp
import asyncio
import requests
# The following parameters differ from service to service; within a single
# service they are normally the same for every request.
Service = "iam"
# Version = "2021-08-01"
Version = "2018-01-01"
Region = "cn-beijing"
Host = "iam.volcengineapi.com"
ContentType = "application/x-www-form-urlencoded"
# Request credentials are obtained from the IAM or STS service.
# NOTE(review): these credentials are hard-coded in source, and within the
# visible file nothing reads AK or SK (volcengine_requestapi defines its own
# credential literals) -- confirm which pair is intended and consider loading
# them from configuration instead.
AK = "AKLTZTllYTNlMTUxOGNlNDQ2NmIzZDYwM2I0ZTExNjEwNGU"
SK = "TXpjNVpXWTFNVGd6TkRsa05EWmpPVGs0T1RVeU1EUm1Oekl4TVRNNFpURQ=="
# When temporary credentials are used, the SessionToken must be passed in the
# request headers and included in the SignedHeaders computation; add the
# X-Security-Token header to the `header` argument yourself.
# SessionToken = ""
async def norm_query(params):
    """Build the canonical query string used when signing a request.

    Keys are emitted in sorted order. A list or tuple value is expanded into
    one ``key=value`` pair per element, in element order; an empty sequence
    contributes nothing. Keys and values are percent-encoded with only
    ``-``, ``_``, ``.`` and ``~`` left unescaped.

    Args:
        params: mapping of query parameter names to a value or a list/tuple
            of values. Values are expected to be ``str`` (or ``bytes``);
            plain ``int`` values are converted with ``str()``. Any other
            type is handed to ``quote`` unchanged, which raises ``TypeError``.

    Returns:
        The encoded pairs joined with ``&``, or ``""`` when there are none.
    """

    def _as_text(value):
        # quote() only accepts str/bytes. Plain integers (e.g. a page size)
        # are stringified; bool is deliberately excluded so that True/False
        # still fail loudly instead of being signed as "True"/"False".
        if isinstance(value, int) and not isinstance(value, bool):
            return str(value)
        return value

    pairs = []
    for key in sorted(params):
        value = params[key]
        # isinstance (not an exact type comparison) so that list subclasses
        # and tuples are expanded as multi-valued parameters too.
        items = value if isinstance(value, (list, tuple)) else [value]
        for item in items:
            pairs.append(
                quote(key, safe="-_.~") + "=" + quote(_as_text(item), safe="-_.~")
            )
    # Joining avoids the trailing "&" the pairs would otherwise need trimmed.
    return "&".join(pairs).replace("+", "%20")
# Step 1: helper functions.
# HMAC-SHA256: a keyed message digest (not encryption).
async def hmac_sha256(key: bytes, content: str):
    """Return the raw HMAC-SHA256 digest bytes of ``content`` under ``key``.

    ``content`` is encoded as UTF-8 before being authenticated.
    """
    message = content.encode("utf-8")
    mac = hmac.new(key, digestmod=hashlib.sha256)
    mac.update(message)
    return mac.digest()
# SHA-256 hash helper.
async def hash_sha256(content: str):
    """Return the lowercase hex SHA-256 digest of ``content`` encoded as UTF-8."""
    hasher = hashlib.sha256()
    hasher.update(content.encode("utf-8"))
    return hasher.hexdigest()
# Step 2: sign the request and send it.
async def volcengine_requestapi(method=None, query=None, header=None, action=None, body=None, version=None):
    """Sign a request with HMAC-SHA256 and send it to ``Host`` over HTTPS.

    The signature covers the content-type, host, x-content-sha256 and x-date
    headers, the canonical query string and the SHA-256 of the body. The
    resulting ``Authorization`` header is added to the outgoing request.

    Args:
        method: HTTP method name. Effectively required: it is upper-cased
            for signing, so leaving it as ``None`` raises ``AttributeError``.
        query: optional mapping of extra query parameters. It is merged
            after ``Action`` and ``Version``, so same-named keys override.
        header: optional mapping of extra HTTP headers. The signing headers
            (Host, X-Content-Sha256, X-Date, Content-Type, Authorization)
            override same-named entries.
        action: value of the ``Action`` query parameter. Effectively
            required, because it is percent-encoded into the signed query.
        body: request body as ``str``; ``None`` is sent and signed as ``""``.
        version: value of the ``Version`` query parameter. Defaults to the
            module-level ``Version`` constant when ``None``.

    Returns:
        The response body as decoded by ``resp.json()``.
    """
    # Step 3: build the credential and the request description. Service and
    # Region are fixed module-level values.
    # Timezone-aware replacement for the deprecated utcnow(); the strftime
    # call below renders exactly the same text.
    date = datetime.datetime.now(datetime.timezone.utc)
    # NOTE(review): these credentials are hard-coded and differ from the
    # module-level AK/SK constants -- confirm which pair is intended and
    # consider loading them from configuration.
    credential = {
        "access_key_id": "AKLTZjgwZjlkNGIwMzEyNDZlZDgyN2E3NWViYTJiMjk4OTY",
        "secret_access_key": "T0dOaVpXWm1OMlZtTURBek5HWTBPR0V6TUdWa09EbGpZMlEwTUdFME5ESQ==",
        "service": Service,
        "region": Region,
    }
    request_param = {
        "body": "" if body is None else body,
        "host": Host,
        "path": "/",
        "method": method,
        "content_type": ContentType,
        "date": date,
        "query": {
            "Action": action,
            # Fall back to the module default so a None never reaches the
            # canonical query string.
            "Version": Version if version is None else version,
            **(query or {}),
        },
    }

    # Step 4: values shared by the signed headers and the canonical request.
    x_date = request_param["date"].strftime("%Y%m%dT%H%M%SZ")
    short_x_date = x_date[:8]
    x_content_sha256 = await hash_sha256(request_param["body"])
    sign_result = {
        "Host": request_param["host"],
        "X-Content-Sha256": x_content_sha256,
        "X-Date": x_date,
        "Content-Type": request_param["content_type"],
    }

    # Step 5: compute the signature.
    # When temporary credentials are used, ";x-security-token" would also
    # have to be appended here and the X-Security-Token header sent.
    signed_headers_str = ";".join(
        ["content-type", "host", "x-content-sha256", "x-date"]
    )
    canonical_headers = "\n".join(
        [
            "content-type:" + request_param["content_type"],
            "host:" + request_param["host"],
            "x-content-sha256:" + x_content_sha256,
            "x-date:" + x_date,
        ]
    )
    canonical_request_str = "\n".join(
        [
            request_param["method"].upper(),
            request_param["path"],
            await norm_query(request_param["query"]),
            canonical_headers,
            # Empty element: yields the blank line after the header block.
            "",
            signed_headers_str,
            x_content_sha256,
        ]
    )
    hashed_canonical_request = await hash_sha256(canonical_request_str)
    credential_scope = "/".join(
        [short_x_date, credential["region"], credential["service"], "request"]
    )
    string_to_sign = "\n".join(
        ["HMAC-SHA256", x_date, credential_scope, hashed_canonical_request]
    )

    # Derive the signing key by chaining HMACs over date, region, service
    # and the literal "request", then sign the string-to-sign with it.
    k_date = await hmac_sha256(credential["secret_access_key"].encode("utf-8"), short_x_date)
    k_region = await hmac_sha256(k_date, credential["region"])
    k_service = await hmac_sha256(k_region, credential["service"])
    k_signing = await hmac_sha256(k_service, "request")
    signature = (await hmac_sha256(k_signing, string_to_sign)).hex()

    sign_result["Authorization"] = "HMAC-SHA256 Credential={}, SignedHeaders={}, Signature={}".format(
        credential["access_key_id"] + "/" + credential_scope,
        signed_headers_str,
        signature,
    )
    # Signing headers win over caller-supplied headers of the same name.
    header = {**(header or {}), **sign_result}

    # Step 6: send the HTTP request carrying the signature headers.
    url = "https://{}{}".format(request_param["host"], request_param["path"])
    async with aiohttp.ClientSession() as session:
        async with session.request(
            method=method,
            url=url,
            headers=header,
            params=request_param["query"],
            data=request_param["body"],
        ) as resp:
            return await resp.json()