fix: rewrite V4 signing to match official Volcengine Python example

This commit is contained in:
yumoqing 2026-05-29 18:43:10 +08:00
parent 073bd711c8
commit 2fd15eeb1e

View File

@ -1,13 +1,13 @@
""" """
Volcengine Ark API Client for Real Person Portrait Asset Management. Volcengine Ark API Client for Real Person Portrait Asset Management.
Implements V4 HMAC-SHA256 signing. Implements V4 HMAC-SHA256 signing based on official Volcengine docs:
Uses StreamHttpClient for HTTP requests. https://github.com/volcengine/volc-openapi-demos/blob/main/signature/python/sign.py
Reads AK/SK from rl_vendor_config table.
""" """
import json import json
import hashlib import hashlib
import hmac import hmac
import datetime import datetime
from urllib.parse import quote
from appPublic.log import debug, error from appPublic.log import debug, error
from appPublic.streamhttpclient import StreamHttpClient from appPublic.streamhttpclient import StreamHttpClient
from ahserver.serverenv import ServerEnv from ahserver.serverenv import ServerEnv
@ -19,37 +19,37 @@ HOST = "open.volcengineapi.com"
BASE_URL = f"https://{HOST}" BASE_URL = f"https://{HOST}"
def _sign(key, msg): def _hmac_sha256(key: bytes, content: str):
"""HMAC-SHA256 sign.""" return hmac.new(key, content.encode("utf-8"), hashlib.sha256).digest()
return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()
def _get_signature_key(secret_key, date_stamp, region, service): def _hash_sha256(content: str):
"""Derive the signing key.""" return hashlib.sha256(content.encode("utf-8")).hexdigest()
k_date = _sign(secret_key.encode("utf-8"), date_stamp)
k_region = _sign(k_date, region)
k_service = _sign(k_region, service)
k_signing = _sign(k_service, "request")
return k_signing
def _uri_encode(s, encode_slash=True): def _norm_query(params):
"""URI encode string per V4 spec.""" """Build canonical query string per Volcengine V4 spec."""
import urllib.parse query = ""
if encode_slash: for key in sorted(params.keys()):
return urllib.parse.quote(s, safe='') if isinstance(params[key], list):
else: for v in params[key]:
return urllib.parse.quote(s, safe='/') query += quote(key, safe="-_.~") + "=" + quote(v, safe="-_.~") + "&"
else:
query += quote(key, safe="-_.~") + "=" + quote(str(params[key]), safe="-_.~") + "&"
query = query[:-1] # remove trailing &
return query.replace("+", "%20")
async def call_volcengine_api(vendor, operation, params, api_mapping): async def call_volcengine_api(vendor, operation, params, api_mapping):
""" """
Call Volcengine Ark API with V4 signing. Call Volcengine Ark API with V4 signing.
Follows official signing code:
https://github.com/volcengine/volc-openapi-demos/blob/main/signature/python/sign.py
""" """
action = api_mapping.get(operation) action = api_mapping.get(operation)
if not action: if not action:
return {"error": f"未配置操作: {operation}"} return {"error": f"未配置操作: {operation}"}
# Read AK/SK from vendor config # Read AK/SK from vendor config
from sqlor.dbpools import DBPools from sqlor.dbpools import DBPools
dbname = ServerEnv().get_module_dbname("reallife_asset") dbname = ServerEnv().get_module_dbname("reallife_asset")
@ -61,93 +61,92 @@ async def call_volcengine_api(vendor, operation, params, api_mapping):
rec = recs[0] rec = recs[0]
ak = getattr(rec, "ak", "") ak = getattr(rec, "ak", "")
sk = getattr(rec, "sk", "") sk = getattr(rec, "sk", "")
if not ak or not sk: if not ak or not sk:
return {"error": "AK/SK未配置"} return {"error": "AK/SK未配置"}
# Timestamps # Timestamps
now = datetime.datetime.utcnow() now = datetime.datetime.utcnow()
date_stamp = now.strftime("%Y%m%d") x_date = now.strftime("%Y%m%dT%H%M%SZ")
amz_date = now.strftime("%Y%m%dT%H%M%SZ") short_x_date = x_date[:8]
# Query string (sorted and URI-encoded) # Payload
body = json.dumps(params, ensure_ascii=False)
x_content_sha256 = _hash_sha256(body)
# Query params: Action + Version
query_params = { query_params = {
"Action": action, "Action": action,
"Version": VERSION, "Version": VERSION,
} }
canonical_querystring = "&".join( canonical_querystring = _norm_query(query_params)
f"{_uri_encode(k)}={_uri_encode(v)}"
for k, v in sorted(query_params.items()) # Signed headers (hardcoded order matches official example)
content_type = "application/json"
signed_headers_str = ";".join(
["content-type", "host", "x-content-sha256", "x-date"]
) )
# Payload
payload = json.dumps(params, ensure_ascii=False, separators=(',', ':'))
payload_hash = hashlib.sha256(payload.encode("utf-8")).hexdigest()
# Headers to sign (lowercase keys, trimmed values)
headers_to_sign = {
"content-type": "application/json",
"host": HOST,
"x-content-sha256": payload_hash,
"x-date": amz_date,
}
# Canonical headers (sorted, each ends with \n)
canonical_headers = ""
for key in sorted(headers_to_sign.keys()):
canonical_headers += f"{key}:{headers_to_sign[key]}\n"
# Signed headers list
signed_headers = ";".join(sorted(headers_to_sign.keys()))
# Canonical request # Canonical request
canonical_request = "\n".join([ canonical_request_str = "\n".join([
"POST", "POST",
"/", "/",
canonical_querystring, canonical_querystring,
canonical_headers, "\n".join([
signed_headers, "content-type:" + content_type,
payload_hash, "host:" + HOST,
"x-content-sha256:" + x_content_sha256,
"x-date:" + x_date,
]),
"",
signed_headers_str,
x_content_sha256,
]) ])
hashed_canonical_request = _hash_sha256(canonical_request_str)
# String to sign # String to sign
credential_scope = f"{date_stamp}/{REGION}/{SERVICE}/request" credential_scope = "/".join([short_x_date, REGION, SERVICE, "request"])
string_to_sign = "\n".join([ string_to_sign = "\n".join([
"HMAC-SHA256", "HMAC-SHA256",
amz_date, x_date,
credential_scope, credential_scope,
hashlib.sha256(canonical_request.encode("utf-8")).hexdigest(), hashed_canonical_request,
]) ])
# Signing key and signature # Signing key derivation
signing_key = _get_signature_key(sk, date_stamp, REGION, SERVICE) k_date = _hmac_sha256(sk.encode("utf-8"), short_x_date)
signature = hmac.new( k_region = _hmac_sha256(k_date, REGION)
signing_key, string_to_sign.encode("utf-8"), hashlib.sha256 k_service = _hmac_sha256(k_region, SERVICE)
).hexdigest() k_signing = _hmac_sha256(k_service, "request")
# Signature
signature = _hmac_sha256(k_signing, string_to_sign).hex()
# Authorization header # Authorization header
authorization = ( authorization = "HMAC-SHA256 Credential={}, SignedHeaders={}, Signature={}".format(
f"HMAC-SHA256 " ak + "/" + credential_scope,
f"Credential={ak}/{credential_scope}, " signed_headers_str,
f"SignedHeaders={signed_headers}, " signature,
f"Signature={signature}"
) )
# Build request # Build HTTP request
url = f"{BASE_URL}/?{canonical_querystring}" url = f"{BASE_URL}/?{canonical_querystring}"
req_headers = { req_headers = {
"Host": HOST, "Host": HOST,
"X-Date": amz_date, "X-Date": x_date,
"X-Content-Sha256": payload_hash, "X-Content-Sha256": x_content_sha256,
"Content-Type": "application/json", "Content-Type": content_type,
"Authorization": authorization, "Authorization": authorization,
} }
# Make HTTP request debug(f"volcengine {operation} url={url}")
debug(f"volcengine {operation} canonical_request:\n{canonical_request_str}")
try: try:
hc = StreamHttpClient() hc = StreamHttpClient()
raw = await hc.request("POST", url, headers=req_headers, data=payload.encode("utf-8")) raw = await hc.request("POST", url, headers=req_headers, data=body.encode("utf-8"))
if isinstance(raw, bytes): if isinstance(raw, bytes):
raw = raw.decode("utf-8") raw = raw.decode("utf-8")
result = json.loads(raw) if raw else {} result = json.loads(raw) if raw else {}