726 lines
25 KiB
Python

"""
reallife_asset module - Real Person Portrait Asset Management.
Supports multiple vendors via direct API calls with V4 signing.
Vendor API routing: rl_vendor_config.api_mapping JSON maps internal
operations to API action names. AK/SK stored in rl_vendor_config.
"""
import json
from datetime import datetime
from traceback import format_exc
from sqlor.dbpools import DBPools
from ahserver.serverenv import ServerEnv
from appPublic.log import debug, exception, error
from appPublic.uniqueID import getID
from appPublic.dictObject import DictObject
from .rl_volcengine_client import call_volcengine_api
MODULE_NAME = "reallife_asset"
def _get_dbname():
f = ServerEnv().get_module_dbname
return f(MODULE_NAME)
async def _get_vendor_config(vendor):
"""Look up vendor config: api_mapping + callback_url from rl_vendor_config."""
dbname = _get_dbname()
db = DBPools()
async with db.sqlorContext(dbname) as sor:
recs = await sor.R("rl_vendor_config", {"vendor": vendor})
if not recs:
return {"success": False, "message": f"供应商配置不存在: {vendor}"}
rec = recs[0]
if rec.status != "active":
return {"success": False, "message": f"供应商服务已停用: {vendor}"}
try:
api_mapping = json.loads(rec.api_mapping) if rec.api_mapping else {}
except (json.JSONDecodeError, TypeError):
api_mapping = {}
return {
"success": True,
"api_mapping": api_mapping,
"callback_url": getattr(rec, "callback_url", ""),
}
async def _call_vendor(vendor, operation, params={}):
"""
Call vendor API with V4 signing.
Reads AK/SK and api_mapping from rl_vendor_config.
Returns parsed dict from response.
"""
cfg = await _get_vendor_config(vendor)
if not cfg.get("success"):
return cfg
api_mapping = cfg["api_mapping"]
try:
result = await call_volcengine_api(vendor, operation, params, api_mapping)
return result
except Exception as e:
error(f"_call_vendor {vendor}/{operation} error: {e}\n{format_exc()}")
return {"error": str(e)}
# ============================================================
# Asset Group operations (admin-side, uses vendor config)
# ============================================================
async def rl_create_validate_session(org_id, vendor, callback_url,
project_name="default",
user_id=None):
"""Create H5 verification session for real person auth."""
params = {
"CallbackURL": callback_url or "",
"ProjectName": project_name,
}
result = await _call_vendor(vendor, "create_session", params)
if "error" in result or "Error" in result:
return {"success": False, "message": result.get("error", result.get("Message", "API调用失败"))}
byted_token = result.get("BytedToken", result.get("Result", {}).get("BytedToken", ""))
h5_link = result.get("H5Link", result.get("Result", {}).get("H5Link", ""))
# Save to local DB
dbname = _get_dbname()
db = DBPools()
async with db.sqlorContext(dbname) as sor:
gid = getID()
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
await sor.C("rl_asset_group", {
"id": gid,
"org_id": org_id,
"vendor": vendor,
"name": f"待认证-{now}",
"title": f"待认证-{now}",
"group_type": "LivenessFace",
"project_name": project_name,
"status": "pending",
"byted_token": byted_token,
"h5_link": h5_link,
"callback_url": callback_url or "",
"created_by": user_id or "",
"create_time": now,
"update_time": now,
})
return {
"success": True,
"id": gid,
"byted_token": byted_token,
"h5_link": h5_link,
}
async def rl_check_validate_result(local_group_id, vendor, project_name="default"):
"""Check real person validation result and get vendor Group ID."""
dbname = _get_dbname()
db = DBPools()
async with db.sqlorContext(dbname) as sor:
recs = await sor.R("rl_asset_group", {"id": local_group_id})
if not recs:
return {"success": False, "message": "本地记录不存在"}
rec = recs[0]
byted_token = rec.byted_token
params = {
"BytedToken": byted_token,
"ProjectName": project_name,
}
result = await _call_vendor(vendor, "check_session", params)
if "error" in result or "Error" in result:
return {"success": False, "message": result.get("error", result.get("Message", "查询失败"))}
r = result.get("Result", result)
vendor_group_id = r.get("GroupId", result.get("GroupId", ""))
if not vendor_group_id:
return {"success": False, "message": "尚未完成认证或认证失败"}
# Update local record
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
async with db.sqlorContext(dbname) as sor:
await sor.U("rl_asset_group", {
"id": local_group_id,
"vendor_group_id": vendor_group_id,
"status": "active",
"name": f"已认证-{vendor_group_id}",
"title": f"已认证-{vendor_group_id}",
"update_time": now,
})
return {"success": True, "vendor_group_id": vendor_group_id}
async def rl_create_asset(org_id, vendor_group_id, source_url,
asset_type="Image", name="",
vendor=None, project_name="default",
user_id=None):
"""Upload asset to vendor and create local record."""
dbname = _get_dbname()
db = DBPools()
# Find the group to get vendor and local_group_id
async with db.sqlorContext(dbname) as sor:
recs = await sor.R("rl_asset_group", {"vendor_group_id": vendor_group_id})
if not recs:
return {"success": False, "message": "素材组合不存在"}
grp = recs[0]
vendor = vendor or grp.vendor
local_group_id = grp.id
project_name = grp.project_name or project_name
# Call vendor API
params = {
"GroupId": vendor_group_id,
"URL": source_url,
"AssetType": asset_type,
"ProjectName": project_name,
}
if name:
params["Name"] = name
result = await _call_vendor(vendor, "upload_asset", params)
vendor_asset_id = result.get("Id", result.get("Result", {}).get("Id", ""))
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
asset_id = getID()
asset_uri = f"asset://{vendor_asset_id}" if vendor_asset_id else ""
async with db.sqlorContext(dbname) as sor:
await sor.C("rl_asset", {
"id": asset_id,
"org_id": org_id,
"group_id": local_group_id,
"vendor": vendor,
"vendor_asset_id": vendor_asset_id,
"asset_type": asset_type,
"name": name or source_url.split("/")[-1][:50],
"status": "Processing",
"source_url": source_url,
"asset_uri": asset_uri,
"project_name": project_name,
"vendor_response": json.dumps(result, ensure_ascii=False),
"created_by": user_id or "",
"create_time": now,
"update_time": now,
})
has_error = "error" in result or "Error" in result
return {
"success": not has_error,
"id": asset_id,
"vendor_asset_id": vendor_asset_id,
"status": "Processing",
"message": result.get("error", result.get("Message", "")),
}
async def rl_sync_asset_status(asset_id):
"""Sync asset status from vendor."""
dbname = _get_dbname()
db = DBPools()
async with db.sqlorContext(dbname) as sor:
recs = await sor.R("rl_asset", {"id": asset_id})
if not recs:
return {"success": False, "message": "素材不存在"}
rec = recs[0]
vendor = rec.vendor
vendor_asset_id = rec.vendor_asset_id
project_name = rec.project_name or "default"
if not vendor_asset_id:
return {"success": False, "message": "无供应商端资产ID"}
params = {
"Id": vendor_asset_id,
"ProjectName": project_name,
}
result = await _call_vendor(vendor, "get_asset", params)
if "error" in result or "Error" in result:
return {"success": False, "message": result.get("error", result.get("Message", "查询失败"))}
r = result.get("Result", result)
status = r.get("Status", result.get("Status", ""))
url = r.get("URL", result.get("URL", ""))
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
async with db.sqlorContext(dbname) as sor:
upd = {
"id": asset_id,
"status": status,
"update_time": now,
"vendor_response": json.dumps(result, ensure_ascii=False),
}
if url:
upd["url"] = url
await sor.U("rl_asset", upd)
return {"success": True, "status": status, "url": url}
async def rl_delete_asset(asset_id):
"""Delete asset from vendor and local DB."""
dbname = _get_dbname()
db = DBPools()
async with db.sqlorContext(dbname) as sor:
recs = await sor.R("rl_asset", {"id": asset_id})
if not recs:
return {"success": False, "message": "素材不存在"}
rec = recs[0]
vendor = rec.vendor
vendor_asset_id = rec.vendor_asset_id
project_name = rec.project_name or "default"
# Delete from vendor
if vendor_asset_id:
params = {"Id": vendor_asset_id, "ProjectName": project_name}
await _call_vendor(vendor, "delete_asset", params)
# Delete local
async with db.sqlorContext(dbname) as sor:
await sor.D("rl_asset", {"id": asset_id})
return {"success": True}
async def rl_delete_group(local_group_id):
"""Delete asset group from vendor and local DB (cascade)."""
dbname = _get_dbname()
db = DBPools()
async with db.sqlorContext(dbname) as sor:
recs = await sor.R("rl_asset_group", {"id": local_group_id})
if not recs:
return {"success": False, "message": "素材组合不存在"}
rec = recs[0]
vendor = rec.vendor
vendor_group_id = rec.vendor_group_id
project_name = rec.project_name or "default"
# Delete from vendor
if vendor_group_id:
params = {"Id": vendor_group_id, "ProjectName": project_name}
await _call_vendor(vendor, "delete_group", params)
# Delete local (cascade)
async with db.sqlorContext(dbname) as sor:
await sor.D("rl_asset", {"group_id": local_group_id})
await sor.D("rl_asset_group", {"id": local_group_id})
return {"success": True}
async def rl_sync_group_from_vendor(org_id, vendor, project_name="default"):
"""Sync asset groups from vendor to local DB."""
params = {
"Filter": {"GroupType": "LivenessFace"},
"PageNumber": 1,
"PageSize": 100,
}
result = await _call_vendor(vendor, "list_groups", params)
items = result.get("Items", result.get("Result", {}).get("Items", []))
synced = 0
dbname = _get_dbname()
db = DBPools()
async with db.sqlorContext(dbname) as sor:
for item in items:
vgid = item.get("Id", "")
if not vgid:
continue
existing = await sor.R("rl_asset_group", {
"vendor": vendor,
"vendor_group_id": vgid,
})
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if existing:
await sor.U("rl_asset_group", {
"id": existing[0].id,
"name": item.get("Name", ""),
"title": item.get("Title", item.get("Name", "")),
"description": item.get("Description", ""),
"update_time": now,
})
else:
gid = getID()
await sor.C("rl_asset_group", {
"id": gid,
"org_id": org_id,
"vendor": vendor,
"vendor_group_id": vgid,
"name": item.get("Name", ""),
"title": item.get("Title", item.get("Name", "")),
"description": item.get("Description", ""),
"group_type": item.get("GroupType", "LivenessFace"),
"project_name": item.get("ProjectName", project_name),
"status": "active",
"create_time": item.get("CreateTime", now),
"update_time": now,
})
synced += 1
return {"success": True, "synced": synced}
async def rl_sync_assets_from_vendor(org_id, local_group_id):
"""Sync assets for a group from vendor to local DB."""
dbname = _get_dbname()
db = DBPools()
async with db.sqlorContext(dbname) as sor:
recs = await sor.R("rl_asset_group", {"id": local_group_id})
if not recs:
return {"success": False, "message": "素材组合不存在"}
grp = recs[0]
vendor = grp.vendor
vendor_group_id = grp.vendor_group_id
project_name = grp.project_name or "default"
if not vendor_group_id:
return {"success": False, "message": "无供应商端组合ID"}
params = {
"Filter": {"GroupType": "LivenessFace", "GroupIds": [vendor_group_id]},
"PageNumber": 1,
"PageSize": 100,
}
result = await _call_vendor(vendor, "list_assets", params)
items = result.get("Items", result.get("Result", {}).get("Items", []))
synced = 0
async with db.sqlorContext(dbname) as sor:
for item in items:
vaid = item.get("Id", "")
if not vaid:
continue
existing = await sor.R("rl_asset", {
"vendor": vendor,
"vendor_asset_id": vaid,
})
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if existing:
await sor.U("rl_asset", {
"id": existing[0].id,
"status": item.get("Status", ""),
"url": item.get("URL", ""),
"name": item.get("Name", existing[0].name),
"update_time": now,
})
else:
aid = getID()
await sor.C("rl_asset", {
"id": aid,
"org_id": org_id,
"group_id": local_group_id,
"vendor": vendor,
"vendor_asset_id": vaid,
"asset_type": item.get("AssetType", "Image"),
"name": item.get("Name", ""),
"status": item.get("Status", "Processing"),
"url": item.get("URL", ""),
"asset_uri": f"asset://{vaid}",
"project_name": item.get("ProjectName", project_name),
"create_time": item.get("CreateTime", now),
"update_time": now,
})
synced += 1
return {"success": True, "synced": synced}
# ============================================================
# Downapp User API Proxies (client-facing via dapi Bearer auth)
# ============================================================
async def rl_verify_user(org_id, user_id, vendor, project_name="default", name=""):
"""User proxy: Create H5 verification session via uapi gateway."""
callback_cfg = await _get_vendor_config(vendor)
if not callback_cfg.get("success"):
return callback_cfg
callback_url = callback_cfg.get("callback_url", "")
params = {
"CallbackURL": callback_url,
"ProjectName": project_name,
}
result = await _call_vendor(vendor, "create_session", params)
if "error" in result or "Error" in result:
return {"success": False, "message": result.get("error", result.get("Message", "API调用失败"))}
byted_token = result.get("BytedToken", result.get("Result", {}).get("BytedToken", ""))
h5_link = result.get("H5Link", result.get("Result", {}).get("H5Link", ""))
display_name = name or f"待认证-{user_id}"
gid = getID()
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
db = DBPools()
async with db.sqlorContext(_get_dbname()) as sor:
await sor.C("rl_asset_group", {
"id": gid,
"org_id": org_id,
"vendor": vendor,
"name": display_name,
"title": display_name,
"group_type": "LivenessFace",
"project_name": project_name,
"status": "pending",
"byted_token": byted_token,
"h5_link": h5_link,
"callback_url": callback_url,
"created_by": user_id or "",
"create_time": now,
"update_time": now,
})
return {
"success": True,
"id": gid,
"byted_token": byted_token,
"h5_link": h5_link,
}
async def rl_upload_user(org_id, vendor_group_id, source_url, asset_type, name, user_id):
"""User proxy: Validate Org-Group mapping -> Upload via uapi."""
dbname = _get_dbname()
db = DBPools()
# 1. Validate: vendor_group_id belongs to this org
async with db.sqlorContext(dbname) as sor:
recs = await sor.R("rl_org_group", {
"org_id": org_id,
"vendor_group_id": vendor_group_id,
})
if not recs:
return {"success": False, "message": "无效的素材组合ID或无权访问"}
local_group_id = recs[0].local_group_id
vendor = recs[0].vendor
# 2. Upload via uapi
params = {
"GroupId": vendor_group_id,
"URL": source_url,
"AssetType": asset_type,
"ProjectName": "default",
}
if name:
params["Name"] = name
result = await _call_vendor(vendor, "upload_asset", params)
vendor_asset_id = result.get("Id", result.get("Result", {}).get("Id", ""))
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
asset_id = getID()
asset_uri = f"asset://{vendor_asset_id}" if vendor_asset_id else ""
async with db.sqlorContext(dbname) as sor:
await sor.C("rl_asset", {
"id": asset_id,
"org_id": org_id,
"group_id": local_group_id,
"vendor": vendor,
"vendor_asset_id": vendor_asset_id,
"asset_type": asset_type,
"name": name or source_url.split("/")[-1][:50],
"status": "Processing",
"source_url": source_url,
"asset_uri": asset_uri,
"project_name": "default",
"vendor_response": json.dumps(result, ensure_ascii=False),
"created_by": user_id or "",
"create_time": now,
"update_time": now,
})
has_error = "error" in result or "Error" in result
return {
"success": not has_error,
"id": asset_id,
"vendor_asset_id": vendor_asset_id,
"status": "Processing",
"message": result.get("error", result.get("Message", "")),
}
async def rl_sync_asset_status_user(org_id, asset_id, user_id):
"""User proxy: Validate ownership -> Sync Status via uapi."""
dbname = _get_dbname()
db = DBPools()
async with db.sqlorContext(dbname) as sor:
recs = await sor.R("rl_asset", {"id": asset_id, "org_id": org_id})
if not recs:
return {"success": False, "message": "素材不存在或无权访问"}
rec = recs[0]
vendor = rec.vendor
vendor_asset_id = rec.vendor_asset_id
project_name = rec.project_name or "default"
params = {
"Id": vendor_asset_id,
"ProjectName": project_name,
}
result = await _call_vendor(vendor, "get_asset", params)
if "error" in result or "Error" in result:
return {"success": False, "message": result.get("error", result.get("Message", "查询失败"))}
r = result.get("Result", result)
status = r.get("Status", result.get("Status", ""))
url = r.get("URL", result.get("URL", ""))
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
async with db.sqlorContext(dbname) as sor:
upd = {"id": asset_id, "status": status, "update_time": now, "vendor_response": json.dumps(result, ensure_ascii=False)}
if url:
upd["url"] = url
await sor.U("rl_asset", upd)
return {"success": True, "status": status, "url": url}
async def rl_handle_callback(byted_token, project_name="default"):
"""
Callback handler: vendor POSTs here after H5 auth completes.
Looks up local group by byted_token (which includes vendor field),
queries vendor for result, then registers rl_org_group mapping.
"""
dbname = _get_dbname()
db = DBPools()
# 1. Find local group by byted_token — vendor comes from this record
async with db.sqlorContext(dbname) as sor:
recs = await sor.R("rl_asset_group", {"byted_token": byted_token})
if not recs:
return {"success": False, "message": "未找到对应的认证会话"}
rec = recs[0]
local_group_id = rec.id
org_id = rec.org_id
vendor = rec.vendor
group_name = getattr(rec, "name", "") or getattr(rec, "title", "")
if rec.status == "active" and rec.vendor_group_id:
debug(f"callback already processed for group {local_group_id}")
return {
"success": True,
"local_group_id": local_group_id,
"vendor_group_id": rec.vendor_group_id,
"message": "已处理",
}
# 2. Query vendor for result via uapi
params = {
"BytedToken": byted_token,
"ProjectName": project_name,
}
result = await _call_vendor(vendor, "check_session", params)
if "error" in result or "Error" in result:
error(f"callback query vendor error: {result}")
return {"success": False, "message": result.get("error", result.get("Message", "查询失败"))}
r = result.get("Result", result)
vendor_group_id = r.get("GroupId", result.get("GroupId", ""))
if not vendor_group_id:
return {"success": False, "message": "尚未完成认证或认证失败"}
# 3. Register rl_org_group mapping
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
mapping_id = getID()
async with db.sqlorContext(dbname) as sor:
existing = await sor.R("rl_org_group", {
"org_id": org_id,
"vendor": vendor,
})
if existing:
await sor.U("rl_org_group", {
"id": existing[0].id,
"vendor_group_id": vendor_group_id,
"local_group_id": local_group_id,
"name": group_name,
"update_time": now,
})
else:
await sor.C("rl_org_group", {
"id": mapping_id,
"org_id": org_id,
"vendor": vendor,
"vendor_group_id": vendor_group_id,
"local_group_id": local_group_id,
"name": group_name,
"status": "active",
"create_time": now,
})
# 4. Update rl_asset_group status
async with db.sqlorContext(dbname) as sor:
await sor.U("rl_asset_group", {
"id": local_group_id,
"vendor_group_id": vendor_group_id,
"status": "active",
"update_time": now,
})
debug(f"callback processed: org={org_id}, group={local_group_id}, "
f"vendor_group={vendor_group_id}")
return {
"success": True,
"vendor_group_id": vendor_group_id,
}
async def rl_query_groups(org_id):
"""
Client API: Query authenticated vendor_group_ids for an org.
"""
dbname = _get_dbname()
db = DBPools()
async with db.sqlorContext(dbname) as sor:
recs = await sor.R("rl_org_group", {"org_id": org_id})
if not recs:
return {"success": True, "groups": []}
groups = []
for r in recs:
groups.append({
"vendor_group_id": r.vendor_group_id,
"vendor": r.vendor,
"name": getattr(r, "name", ""),
"status": r.status,
"create_time": getattr(r, "create_time", ""),
})
return {"success": True, "groups": groups}
# ============================================================
# Module loader
# ============================================================
def load_reallife_asset():
"""Register all functions with ServerEnv."""
g = ServerEnv()
g.rl_create_validate_session = rl_create_validate_session
g.rl_check_validate_result = rl_check_validate_result
g.rl_create_asset = rl_create_asset
g.rl_sync_asset_status = rl_sync_asset_status
g.rl_delete_asset = rl_delete_asset
g.rl_delete_group = rl_delete_group
g.rl_sync_group_from_vendor = rl_sync_group_from_vendor
g.rl_sync_assets_from_vendor = rl_sync_assets_from_vendor
# Downapp user APIs
g.rl_verify_user = rl_verify_user
g.rl_upload_user = rl_upload_user
g.rl_sync_asset_status_user = rl_sync_asset_status_user
g.rl_handle_callback = rl_handle_callback
g.rl_query_groups = rl_query_groups
return True