diff --git a/reallife_asset/rl_volcengine_client.py b/reallife_asset/rl_volcengine_client.py index 697bc18..55dda9f 100644 --- a/reallife_asset/rl_volcengine_client.py +++ b/reallife_asset/rl_volcengine_client.py @@ -1,13 +1,13 @@ """ Volcengine Ark API Client for Real Person Portrait Asset Management. -Implements V4 HMAC-SHA256 signing. -Uses StreamHttpClient for HTTP requests. -Reads AK/SK from rl_vendor_config table. +Implements V4 HMAC-SHA256 signing based on official Volcengine docs: +https://github.com/volcengine/volc-openapi-demos/blob/main/signature/python/sign.py """ import json import hashlib import hmac import datetime +from urllib.parse import quote from appPublic.log import debug, error from appPublic.streamhttpclient import StreamHttpClient from ahserver.serverenv import ServerEnv @@ -19,37 +19,37 @@ HOST = "open.volcengineapi.com" BASE_URL = f"https://{HOST}" -def _sign(key, msg): - """HMAC-SHA256 sign.""" - return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest() +def _hmac_sha256(key: bytes, content: str): + return hmac.new(key, content.encode("utf-8"), hashlib.sha256).digest() -def _get_signature_key(secret_key, date_stamp, region, service): - """Derive the signing key.""" - k_date = _sign(secret_key.encode("utf-8"), date_stamp) - k_region = _sign(k_date, region) - k_service = _sign(k_region, service) - k_signing = _sign(k_service, "request") - return k_signing +def _hash_sha256(content: str): + return hashlib.sha256(content.encode("utf-8")).hexdigest() -def _uri_encode(s, encode_slash=True): - """URI encode string per V4 spec.""" - import urllib.parse - if encode_slash: - return urllib.parse.quote(s, safe='') - else: - return urllib.parse.quote(s, safe='/') +def _norm_query(params): + """Build canonical query string per Volcengine V4 spec.""" + query = "" + for key in sorted(params.keys()): + if isinstance(params[key], list): + for v in params[key]: + query += quote(key, safe="-_.~") + "=" + quote(v, safe="-_.~") + "&" + else: + query += quote(key, safe="-_.~") + "=" + quote(str(params[key]), safe="-_.~") + "&" + query = query[:-1] # remove trailing & + return query.replace("+", "%20") async def call_volcengine_api(vendor, operation, params, api_mapping): """ Call Volcengine Ark API with V4 signing. + Follows official signing code: + https://github.com/volcengine/volc-openapi-demos/blob/main/signature/python/sign.py """ action = api_mapping.get(operation) if not action: return {"error": f"未配置操作: {operation}"} - + # Read AK/SK from vendor config from sqlor.dbpools import DBPools dbname = ServerEnv().get_module_dbname("reallife_asset") @@ -61,93 +61,92 @@ async def call_volcengine_api(vendor, operation, params, api_mapping): rec = recs[0] ak = getattr(rec, "ak", "") sk = getattr(rec, "sk", "") - + if not ak or not sk: return {"error": "AK/SK未配置"} - + # Timestamps now = datetime.datetime.utcnow() - date_stamp = now.strftime("%Y%m%d") - amz_date = now.strftime("%Y%m%dT%H%M%SZ") - - # Query string (sorted and URI-encoded) + x_date = now.strftime("%Y%m%dT%H%M%SZ") + short_x_date = x_date[:8] + + # Payload + body = json.dumps(params, ensure_ascii=False) + x_content_sha256 = _hash_sha256(body) + + # Query params: Action + Version query_params = { "Action": action, "Version": VERSION, } - canonical_querystring = "&".join( - f"{_uri_encode(k)}={_uri_encode(v)}" - for k, v in sorted(query_params.items()) + canonical_querystring = _norm_query(query_params) + + # Signed headers (hardcoded order matches official example) + content_type = "application/json" + signed_headers_str = ";".join( + ["content-type", "host", "x-content-sha256", "x-date"] ) - - # Payload - payload = json.dumps(params, ensure_ascii=False, separators=(',', ':')) - payload_hash = hashlib.sha256(payload.encode("utf-8")).hexdigest() - - # Headers to sign (lowercase keys, trimmed values) - headers_to_sign = { - "content-type": "application/json", - "host": HOST, - "x-content-sha256": payload_hash, - "x-date": amz_date, - } - - # Canonical headers (sorted, each ends with \n) - canonical_headers = "" - for key in sorted(headers_to_sign.keys()): - canonical_headers += f"{key}:{headers_to_sign[key]}\n" - - # Signed headers list - signed_headers = ";".join(sorted(headers_to_sign.keys())) - + # Canonical request - canonical_request = "\n".join([ + canonical_request_str = "\n".join([ "POST", "/", canonical_querystring, - canonical_headers, - signed_headers, - payload_hash, + "\n".join([ + "content-type:" + content_type, + "host:" + HOST, + "x-content-sha256:" + x_content_sha256, + "x-date:" + x_date, + ]), + "", + signed_headers_str, + x_content_sha256, ]) - + + hashed_canonical_request = _hash_sha256(canonical_request_str) + # String to sign - credential_scope = f"{date_stamp}/{REGION}/{SERVICE}/request" + credential_scope = "/".join([short_x_date, REGION, SERVICE, "request"]) string_to_sign = "\n".join([ "HMAC-SHA256", - amz_date, + x_date, credential_scope, - hashlib.sha256(canonical_request.encode("utf-8")).hexdigest(), + hashed_canonical_request, ]) - - # Signing key and signature - signing_key = _get_signature_key(sk, date_stamp, REGION, SERVICE) - signature = hmac.new( - signing_key, string_to_sign.encode("utf-8"), hashlib.sha256 - ).hexdigest() - + + # Signing key derivation + k_date = _hmac_sha256(sk.encode("utf-8"), short_x_date) + k_region = _hmac_sha256(k_date, REGION) + k_service = _hmac_sha256(k_region, SERVICE) + k_signing = _hmac_sha256(k_service, "request") + + # Signature + signature = _hmac_sha256(k_signing, string_to_sign).hex() + # Authorization header - authorization = ( - f"HMAC-SHA256 " - f"Credential={ak}/{credential_scope}, " - f"SignedHeaders={signed_headers}, " - f"Signature={signature}" + authorization = "HMAC-SHA256 Credential={}, SignedHeaders={}, Signature={}".format( + ak + "/" + credential_scope, + signed_headers_str, + signature, ) - - # Build request + + # Build HTTP request url = f"{BASE_URL}/?{canonical_querystring}" req_headers = { "Host": HOST, - "X-Date": amz_date, - "X-Content-Sha256": payload_hash, - "Content-Type": "application/json", + "X-Date": x_date, + "X-Content-Sha256": x_content_sha256, + "Content-Type": content_type, "Authorization": authorization, } - - # Make HTTP request + + debug(f"volcengine {operation} url={url}") + debug(f"volcengine {operation} canonical_request:\n{canonical_request_str}") + try: hc = StreamHttpClient() - raw = await hc.request("POST", url, headers=req_headers, data=payload.encode("utf-8")) - + raw = await hc.request("POST", url, headers=req_headers, data=body.encode("utf-8")) + if isinstance(raw, bytes): raw = raw.decode("utf-8") result = json.loads(raw) if raw else {}