- 支持火山方舟(Volcengine Ark)真人人像素材API - AK/SK HMAC-SHA256签名(纯stdlib实现) - 素材组合(Asset Group)管理: 创建认证、查询、删除 - 素材资产(Asset)管理: 上传、状态同步、删除 - 多供应商可扩展架构 - 完整CRUD + 前端UI + uapi SQL配置 - 12个API端点 + 6个前端页面 - 数据库表: rl_asset_group, rl_asset
270 lines
9.0 KiB
Python
270 lines
9.0 KiB
Python
"""
|
|
Volcengine Ark API Client for Real Person Portrait Asset Management.
|
|
Implements AK/SK HMAC-SHA256 signing (Volcengine V4 signature).
|
|
Uses only Python stdlib (no external dependencies).
|
|
"""
|
|
import json
|
|
import hashlib
|
|
import hmac
|
|
import datetime
|
|
import urllib.request
|
|
import urllib.error
|
|
import urllib.parse
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)

# Service name: part of the credential scope and of the signing-key derivation.
SERVICE = "ark"
# Default region for VolcengineArkClient when the caller does not pass one.
REGION = "cn-beijing"
# Sent as the ``Version`` query parameter on every request.
VERSION = "2024-01-01"
# Gateway host: signed as the ``host`` header and used to build BASE_URL.
HOST = "open.volcengineapi.com"
BASE_URL = f"https://{HOST}"
|
|
|
|
|
|
def _sign(key, msg):
    """Return the raw HMAC-SHA256 digest of ``msg`` under ``key``.

    Args:
        key: Secret key as bytes.
        msg: Message as ``str``; it is UTF-8 encoded before hashing.

    Returns:
        The 32-byte binary digest (not hex-encoded), so it can be fed
        directly as the key of the next HMAC round.
    """
    return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()
|
|
|
|
|
|
def _get_signature_key(secret_key, date_stamp, region, service):
    """Derive the signing key by chaining HMAC-SHA256 over the scope parts.

    The secret key (UTF-8 encoded) keys the first round; each round's digest
    keys the next one, in the order date, region, service and finally the
    literal ``"request"``.

    Args:
        secret_key: Secret access key as ``str``.
        date_stamp: Date component of the credential scope.
        region: Region component of the credential scope.
        service: Service component of the credential scope.

    Returns:
        The derived signing key as raw bytes.
    """
    key = secret_key.encode("utf-8")
    for scope_part in (date_stamp, region, service, "request"):
        key = _sign(key, scope_part)
    return key
|
|
|
|
|
|
class VolcengineArkClient:
    """Client for Volcengine Ark API (Real Person Portrait Assets).

    Each public method sends one signed request through ``_call`` and returns
    the decoded JSON response unchanged. HTTP and transport failures are
    logged and returned as a dict rather than raised.
    """

    def __init__(self, access_key, secret_key, region=REGION):
        """Store the AK/SK credentials and the region used for signing."""
        self.access_key = access_key
        self.secret_key = secret_key
        self.region = region

    def _build_signed_request(self, action, body_dict, method="POST"):
        """Build a signed urllib Request for the given API action.

        Args:
            action: Value of the ``Action`` query parameter.
            body_dict: JSON-serialisable request body.
            method: HTTP method; also the first line of the canonical request.

        Returns:
            A ``urllib.request.Request`` carrying the JSON payload and the
            computed ``Authorization`` header.
        """
        # Timezone-aware replacement for the deprecated ``utcnow()``; the
        # formatted strings are identical.
        now = datetime.datetime.now(datetime.timezone.utc)
        date_stamp = now.strftime("%Y%m%d")
        x_date = now.strftime("%Y%m%dT%H%M%SZ")

        # Query string with Action and Version. ``quote`` percent-encodes
        # (space -> %20) where the default ``quote_plus`` would emit ``+``.
        query_params = {"Action": action, "Version": VERSION}
        canonical_querystring = urllib.parse.urlencode(
            sorted(query_params.items()), quote_via=urllib.parse.quote
        )

        # Payload: encode once so the hashed bytes are exactly the sent bytes.
        content_type = "application/json"
        payload_bytes = json.dumps(body_dict, ensure_ascii=False).encode("utf-8")
        payload_hash = hashlib.sha256(payload_bytes).hexdigest()

        # Headers covered by the signature (lower-case names, sorted).
        headers_to_sign = {
            "host": HOST,
            "x-date": x_date,
            "x-content-sha256": payload_hash,
            "content-type": content_type,
        }
        sorted_headers = sorted(headers_to_sign.items())
        signed_headers = ";".join(name for name, _ in sorted_headers)
        # Every header line ends with "\n", so the join below leaves an empty
        # line between the header block and the signed-header list.
        canonical_headers = "".join(
            f"{name}:{value}\n" for name, value in sorted_headers
        )

        # Canonical request
        canonical_request = "\n".join([
            method,
            "/",
            canonical_querystring,
            canonical_headers,
            signed_headers,
            payload_hash,
        ])

        # String to sign
        credential_scope = f"{date_stamp}/{self.region}/{SERVICE}/request"
        string_to_sign = "\n".join([
            "HMAC-SHA256",
            x_date,
            credential_scope,
            hashlib.sha256(canonical_request.encode("utf-8")).hexdigest(),
        ])

        # Signing key and signature
        signing_key = _get_signature_key(
            self.secret_key, date_stamp, self.region, SERVICE
        )
        signature = hmac.new(
            signing_key, string_to_sign.encode("utf-8"), hashlib.sha256
        ).hexdigest()

        # Authorization header
        authorization = (
            f"HMAC-SHA256 "
            f"Credential={self.access_key}/{credential_scope}, "
            f"SignedHeaders={signed_headers}, "
            f"Signature={signature}"
        )

        # Build request
        url = f"{BASE_URL}/?{canonical_querystring}"
        req_headers = {
            "Host": HOST,
            "X-Date": x_date,
            "X-Content-Sha256": payload_hash,
            "Content-Type": content_type,
            "Authorization": authorization,
        }
        return urllib.request.Request(
            url,
            data=payload_bytes,
            headers=req_headers,
            method=method,
        )

    def _call(self, action, body_dict):
        """Execute a signed API call and return the parsed JSON response.

        Never raises for HTTP or transport errors:

        * On ``HTTPError`` the error body is logged and returned parsed as
          JSON, or as ``{"error": <body>, "code": <status>}`` when it is not
          valid JSON.
        * Any other exception raised while sending or decoding is logged and
          returned as ``{"error": <message>}``.
        """
        req = self._build_signed_request(action, body_dict)
        try:
            with urllib.request.urlopen(req, timeout=30) as resp:
                return json.loads(resp.read().decode("utf-8"))
        except urllib.error.HTTPError as e:
            # HTTPError wraps the open response; close it once it is read.
            try:
                body = e.read().decode("utf-8", errors="replace")
            finally:
                e.close()
            logger.error("Volcengine API error: %s %s -> %s", action, e.code, body)
            try:
                return json.loads(body)
            except json.JSONDecodeError:
                return {"error": body, "code": e.code}
        except Exception as e:
            # Deliberate catch-all: callers get an error dict, not an exception.
            logger.error("Volcengine API exception: %s -> %s", action, e)
            return {"error": str(e)}

    # ---- Real Person Portrait Asset APIs ----

    def create_visual_validate_session(self, callback_url, project_name="default"):
        """Create H5 verification page link for real person authentication."""
        return self._call("CreateVisualValidateSession", {
            "CallbackURL": callback_url,
            "ProjectName": project_name,
        })

    def get_visual_validate_result(self, byted_token, project_name="default"):
        """Get Asset Group ID after real person authentication."""
        return self._call("GetVisualValidateResult", {
            "BytedToken": byted_token,
            "ProjectName": project_name,
        })

    def create_asset(self, group_id, url, asset_type="Image",
                     name="", project_name="default"):
        """Upload a new asset (async - returns asset ID immediately).

        ``Name`` is only sent when ``name`` is non-empty.
        """
        body = {
            "GroupId": group_id,
            "URL": url,
            "AssetType": asset_type,
            "ProjectName": project_name,
        }
        if name:
            body["Name"] = name
        return self._call("CreateAsset", body)

    def get_asset(self, asset_id, project_name="default"):
        """Get single asset info including status."""
        return self._call("GetAsset", {
            "Id": asset_id,
            "ProjectName": project_name,
        })

    def list_assets(self, group_ids=None, statuses=None, name=None,
                    group_type="LivenessFace", page_number=1, page_size=10,
                    sort_by="CreateTime", sort_order="Desc"):
        """List assets with optional filters.

        ``GroupType`` is always part of the filter; ``GroupIds``, ``Statuses``
        and ``Name`` are added only when the matching argument is truthy.
        """
        filter_obj = {"GroupType": group_type}
        if group_ids:
            filter_obj["GroupIds"] = group_ids
        if statuses:
            filter_obj["Statuses"] = statuses
        if name:
            filter_obj["Name"] = name
        return self._call("ListAssets", {
            "Filter": filter_obj,
            "PageNumber": page_number,
            "PageSize": page_size,
            "SortBy": sort_by,
            "SortOrder": sort_order,
        })

    def list_asset_groups(self, name=None, group_ids=None,
                          group_type="LivenessFace",
                          page_number=1, page_size=10):
        """List asset groups with optional filters.

        ``GroupType`` is always part of the filter; ``Name`` and ``GroupIds``
        are added only when the matching argument is truthy.
        """
        filter_obj = {"GroupType": group_type}
        if name:
            filter_obj["Name"] = name
        if group_ids:
            filter_obj["GroupIds"] = group_ids
        return self._call("ListAssetGroups", {
            "Filter": filter_obj,
            "PageNumber": page_number,
            "PageSize": page_size,
        })

    def get_asset_group(self, group_id, project_name="default"):
        """Get single asset group info."""
        return self._call("GetAssetGroup", {
            "Id": group_id,
            "ProjectName": project_name,
        })

    def update_asset(self, asset_id, name=None, project_name="default"):
        """Update asset info (e.g. name).

        ``Name`` is sent whenever ``name`` is not ``None``, so an empty
        string is forwarded too.
        """
        body = {"Id": asset_id, "ProjectName": project_name}
        if name is not None:
            body["Name"] = name
        return self._call("UpdateAsset", body)

    def update_asset_group(self, group_id, name=None, title=None,
                           description=None, project_name="default"):
        """Update asset group info.

        Only the fields whose argument is not ``None`` are included in the
        request body.
        """
        body = {"Id": group_id, "ProjectName": project_name}
        if name is not None:
            body["Name"] = name
        if title is not None:
            body["Title"] = title
        if description is not None:
            body["Description"] = description
        return self._call("UpdateAssetGroup", body)

    def delete_asset(self, asset_id, project_name="default"):
        """Delete a single asset."""
        return self._call("DeleteAsset", {
            "Id": asset_id,
            "ProjectName": project_name,
        })

    def delete_asset_group(self, group_id, project_name="default"):
        """Delete an asset group."""
        return self._call("DeleteAssetGroup", {
            "Id": group_id,
            "ProjectName": project_name,
        })
|
|
|
|
|
|
# ---- Provider Registry (extensible for future vendors) ----

# Maps a vendor key to its client class; get_vendor_client() looks vendors up
# here.
_PROVIDERS = {
    "volcengine": VolcengineArkClient,
}
|
|
|
|
|
|
def get_vendor_client(vendor, access_key, secret_key, **kwargs):
    """Factory: get API client for a given vendor.

    Args:
        vendor: Key into the provider registry (e.g. ``"volcengine"``).
        access_key: Access key passed to the client constructor.
        secret_key: Secret key passed to the client constructor.
        **kwargs: Extra keyword arguments forwarded to the client constructor.

    Returns:
        A new instance of the client class registered for ``vendor``.

    Raises:
        ValueError: If ``vendor`` is not in the registry; the message lists
            the available vendor keys.
    """
    client_cls = _PROVIDERS.get(vendor)
    if client_cls is None:
        raise ValueError(f"Unsupported vendor: {vendor}. Available: {list(_PROVIDERS)}")
    return client_cls(access_key, secret_key, **kwargs)
|