This commit is contained in:
yumoqing 2025-09-23 12:15:24 +08:00
parent ad2b47a890
commit 38d44e61f3

View File

@ -12,18 +12,18 @@ Host = "visual.volcengineapi.com"
ContentType = "application/json" ContentType = "application/json"
def utc_now(): def utc_now():
try: try:
from datetime import timezone from datetime import timezone
return datetime.datetime.now(timezone.utc) return datetime.datetime.now(timezone.utc)
except ImportError: except ImportError:
class UTC(datetime.tzinfo): class UTC(datetime.tzinfo):
def utcoffset(self, dt): def utcoffset(self, dt):
return datetime.timedelta(0) return datetime.timedelta(0)
def tzname(self, dt): def tzname(self, dt):
return "UTC" return "UTC"
def dst(self, dt): def dst(self, dt):
return datetime.timedelta(0) return datetime.timedelta(0)
return datetime.datetime.now(UTC()) return datetime.datetime.now(UTC())
def jm_timestamp(): def jm_timestamp():
dt = utc_new() dt = utc_new()
@ -31,11 +31,11 @@ def jm_timestamp():
# sha256 非对称加密 # sha256 非对称加密
def hmac_sha256(key: bytes, content: str): def hmac_sha256(key: bytes, content: str):
return hmac.new(key, content.encode("utf-8"), hashlib.sha256).digest() return hmac.new(key, content.encode("utf-8"), hashlib.sha256).digest()
# sha256 hash算法 # sha256 hash算法
def hash_sha256(content: str): def hash_sha256(content: str):
return hashlib.sha256(content.encode("utf-8")).hexdigest() return hashlib.sha256(content.encode("utf-8")).hexdigest()
def jimeng_auth_headers(opts): def jimeng_auth_headers(opts):
apikey = opts.get('apikey') apikey = opts.get('apikey')
@ -48,51 +48,51 @@ def jimeng_auth_headers(opts):
content_type = headers.get('Content-Type') content_type = headers.get('Content-Type')
x_date = jm_timestamp() x_date = jm_timestamp()
short_x_date = DT[:8] short_x_date = DT[:8]
credential = { credential = {
"access_key_id": apikey, "access_key_id": apikey,
"secret_access_key": secretkey, "secret_access_key": secretkey,
"service": Service, "service": Service,
"region": Region, "region": Region,
} }
x_content_sha256 = hash_sha256(body) x_content_sha256 = hash_sha256(body)
sign_result = { sign_result = {
"Host": Host, "Host": Host,
"X-Content-Sha256": x_content_sha256, "X-Content-Sha256": x_content_sha256,
"X-Date": x_date, "X-Date": x_date,
"Content-Type": ContentType "Content-Type": ContentType
} }
headers.update(sign_result) headers.update(sign_result)
signed_headers_str = ";".join( signed_headers_str = ";".join(
["content-type", "host", "x-content-sha256", "x-date"] ["content-type", "host", "x-content-sha256", "x-date"]
) )
canonical_request_str = "\n".join( canonical_request_str = "\n".join(
[method.upper(), [method.upper(),
path, path,
norm_query(params), norm_query(params),
"\n".join( "\n".join(
[ [
"content-type:" + content_type, "content-type:" + content_type,
"host:" + Host, "host:" + Host,
"x-content-sha256:" + x_content_sha256, "x-content-sha256:" + x_content_sha256,
"x-date:" + x_date, "x-date:" + x_date,
] ]
), ),
"", "",
signed_headers_str, signed_headers_str,
x_content_sha256, x_content_sha256,
] ]
) )
hashed_canonical_request = hash_sha256(canonical_request_str) hashed_canonical_request = hash_sha256(canonical_request_str)
credential_scope = "/".join([short_x_date, credential["region"], credential["service"], "request"]) credential_scope = "/".join([short_x_date, credential["region"], credential["service"], "request"])
string_to_sign = "\n".join(["HMAC-SHA256", x_date, credential_scope, hashed_canonical_request]) string_to_sign = "\n".join(["HMAC-SHA256", x_date, credential_scope, hashed_canonical_request])
k_date = hmac_sha256(secretkey.encode("utf-8"), short_x_date) k_date = hmac_sha256(secretkey.encode("utf-8"), short_x_date)
k_region = hmac_sha256(k_date, credential["region"]) k_region = hmac_sha256(k_date, credential["region"])
k_service = hmac_sha256(k_region, credential["service"]) k_service = hmac_sha256(k_region, credential["service"])
k_signing = hmac_sha256(k_service, "request") k_signing = hmac_sha256(k_service, "request")
signature = hmac_sha256(k_signing, string_to_sign).hex() signature = hmac_sha256(k_signing, string_to_sign).hex()
headers['Authorization'] = "HMAC-SHA256 Credential={}, SignedHeaders={}, Signature={}".format( headers['Authorization'] = "HMAC-SHA256 Credential={}, SignedHeaders={}, Signature={}".format(
apikey + "/" + credential_scope, apikey + "/" + credential_scope,
signed_headers_str, signed_headers_str,
signature, signature,
) )