yumoqing dbf8473b1b fix: remove downapp_id parameter from all APIs, use Bearer token auth
- All APIs now identify caller via Bearer Token, dapi module
  automatically provides user_id and org_id
- rl_verify.dspy: use (await get_user()) instead of downapp_id param
- rl_verify_user: rename downapp_id -> user_id param
- rl_upload_user: rename downapp_id -> user_id param
- rl_sync_asset_status_user: rename downapp_id -> user_id param
- Update docs/api_downapp.md: remove all downapp_id references,
  add Bearer token auth description, add rl_query_groups and
  rl_callback documentation
2026-05-28 17:21:02 +08:00

776 lines
27 KiB
Python

"""
reallife_asset module - Real Person Portrait Asset Management.
Supports multiple vendors (Volcengine Ark, etc.) for managing
real person portrait asset groups and assets.
"""
import json
from datetime import datetime
from traceback import format_exc
from sqlor.dbpools import DBPools
from ahserver.serverenv import ServerEnv
from appPublic.log import debug, exception, error
from appPublic.uniqueID import getID
from appPublic.dictObject import DictObject
from .volcengine_client import get_vendor_client
MODULE_NAME = "reallife_asset"
def _get_dbname():
f = ServerEnv().get_module_dbname
return f(MODULE_NAME)
def _get_client(vendor, apikey, secretkey):
"""Get vendor API client."""
return get_vendor_client(vendor, apikey, secretkey)
# ============================================================
# Asset Group operations
# ============================================================
async def rl_create_validate_session(org_id, vendor, callback_url,
project_name="default",
apikey=None, secretkey=None,
user_id=None):
"""Create H5 verification session for real person auth."""
client = _get_client(vendor, apikey, secretkey)
result = client.create_visual_validate_session(callback_url, project_name)
if "error" in result:
return {"success": False, "message": result.get("error", "API调用失败")}
byted_token = result.get("BytedToken", "")
h5_link = result.get("H5Link", "")
# Save to local DB
dbname = _get_dbname()
db = DBPools()
async with db.sqlorContext(dbname) as sor:
gid = getID()
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
await sor.I("rl_asset_group", {
"id": gid,
"org_id": org_id,
"vendor": vendor,
"name": f"待认证-{now}",
"title": f"待认证-{now}",
"group_type": "LivenessFace",
"project_name": project_name,
"status": "pending",
"byted_token": byted_token,
"h5_link": h5_link,
"callback_url": callback_url,
"created_by": user_id or "",
"create_time": now,
"update_time": now,
})
return {
"success": True,
"id": gid,
"byted_token": byted_token,
"h5_link": h5_link,
}
async def rl_check_validate_result(local_group_id, vendor,
apikey=None, secretkey=None):
"""Check real person validation result and get vendor Group ID."""
dbname = _get_dbname()
db = DBPools()
async with db.sqlorContext(dbname) as sor:
recs = await sor.R("rl_asset_group", {"id": local_group_id})
if not recs:
return {"success": False, "message": "本地记录不存在"}
rec = recs[0]
byted_token = rec.byted_token
project_name = rec.project_name or "default"
client = _get_client(vendor, apikey, secretkey)
result = client.get_visual_validate_result(byted_token, project_name)
if "error" in result:
return {"success": False, "message": result.get("error", "查询失败")}
vendor_group_id = result.get("GroupId", "")
if not vendor_group_id:
return {"success": False, "message": "尚未完成认证或认证失败"}
# Update local record
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
async with db.sqlorContext(dbname) as sor:
await sor.U("rl_asset_group", {
"vendor_group_id": vendor_group_id,
"status": "active",
"name": f"已认证-{vendor_group_id}",
"title": f"已认证-{vendor_group_id}",
"update_time": now,
}, {"id": local_group_id})
return {"success": True, "vendor_group_id": vendor_group_id}
async def rl_create_asset(org_id, local_group_id, source_url,
asset_type="Image", name="",
vendor=None, apikey=None, secretkey=None,
user_id=None):
"""Upload asset to vendor and create local record."""
dbname = _get_dbname()
db = DBPools()
async with db.sqlorContext(dbname) as sor:
recs = await sor.R("rl_asset_group", {"id": local_group_id})
if not recs:
return {"success": False, "message": "素材组合不存在"}
grp = recs[0]
vendor = vendor or grp.vendor
vendor_group_id = grp.vendor_group_id
project_name = grp.project_name or "default"
if not vendor_group_id:
return {"success": False, "message": "素材组合尚未完成真人认证"}
client = _get_client(vendor, apikey, secretkey)
result = client.create_asset(
vendor_group_id, source_url, asset_type, name, project_name
)
vendor_asset_id = result.get("Id", "")
if not vendor_asset_id and "error" not in result:
# Try nested result structure
r = result.get("Result", {})
vendor_asset_id = r.get("Id", "")
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
asset_id = getID()
asset_uri = f"asset://{vendor_asset_id}" if vendor_asset_id else ""
async with db.sqlorContext(dbname) as sor:
await sor.I("rl_asset", {
"id": asset_id,
"org_id": org_id,
"group_id": local_group_id,
"vendor": vendor,
"vendor_asset_id": vendor_asset_id,
"asset_type": asset_type,
"name": name or source_url.split("/")[-1][:50],
"status": "Processing",
"source_url": source_url,
"asset_uri": asset_uri,
"project_name": project_name,
"vendor_response": json.dumps(result, ensure_ascii=False),
"created_by": user_id or "",
"create_time": now,
"update_time": now,
})
return {
"success": "error" not in result,
"id": asset_id,
"vendor_asset_id": vendor_asset_id,
"status": "Processing",
"message": result.get("error", ""),
}
async def rl_sync_asset_status(asset_id, vendor=None,
apikey=None, secretkey=None):
"""Sync asset status from vendor."""
dbname = _get_dbname()
db = DBPools()
async with db.sqlorContext(dbname) as sor:
recs = await sor.R("rl_asset", {"id": asset_id})
if not recs:
return {"success": False, "message": "素材不存在"}
rec = recs[0]
vendor = vendor or rec.vendor
vendor_asset_id = rec.vendor_asset_id
project_name = rec.project_name or "default"
if not vendor_asset_id:
return {"success": False, "message": "无供应商端资产ID"}
client = _get_client(vendor, apikey, secretkey)
result = client.get_asset(vendor_asset_id, project_name)
if "error" in result:
return {"success": False, "message": result.get("error", "查询失败")}
# Extract status from result (may be nested under Result)
status = result.get("Status", "")
if not status:
r = result.get("Result", {})
status = r.get("Status", result.get("status", ""))
url = result.get("URL", result.get("Result", {}).get("URL", ""))
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
async with db.sqlorContext(dbname) as sor:
upd = {
"status": status,
"update_time": now,
"vendor_response": json.dumps(result, ensure_ascii=False),
}
if url:
upd["url"] = url
await sor.U("rl_asset", upd, {"id": asset_id})
return {"success": True, "status": status, "url": url}
async def rl_delete_asset(asset_id, vendor=None,
apikey=None, secretkey=None):
"""Delete asset from vendor and local DB."""
dbname = _get_dbname()
db = DBPools()
async with db.sqlorContext(dbname) as sor:
recs = await sor.R("rl_asset", {"id": asset_id})
if not recs:
return {"success": False, "message": "素材不存在"}
rec = recs[0]
vendor = vendor or rec.vendor
vendor_asset_id = rec.vendor_asset_id
project_name = rec.project_name or "default"
# Delete from vendor
if vendor_asset_id:
client = _get_client(vendor, apikey, secretkey)
result = client.delete_asset(vendor_asset_id, project_name)
debug(f"vendor delete asset: {result}")
# Delete local
async with db.sqlorContext(dbname) as sor:
await sor.D("rl_asset", {"id": asset_id})
return {"success": True}
async def rl_delete_group(local_group_id, vendor=None,
apikey=None, secretkey=None):
"""Delete asset group from vendor and local DB (cascade)."""
dbname = _get_dbname()
db = DBPools()
async with db.sqlorContext(dbname) as sor:
recs = await sor.R("rl_asset_group", {"id": local_group_id})
if not recs:
return {"success": False, "message": "素材组合不存在"}
rec = recs[0]
vendor = vendor or rec.vendor
vendor_group_id = rec.vendor_group_id
project_name = rec.project_name or "default"
# Delete from vendor
if vendor_group_id:
client = _get_client(vendor, apikey, secretkey)
result = client.delete_asset_group(vendor_group_id, project_name)
debug(f"vendor delete group: {result}")
# Delete local (cascade)
async with db.sqlorContext(dbname) as sor:
await sor.D("rl_asset", {"group_id": local_group_id})
await sor.D("rl_asset_group", {"id": local_group_id})
return {"success": True}
async def rl_sync_group_from_vendor(org_id, vendor,
apikey=None, secretkey=None,
project_name="default"):
"""Sync asset groups from vendor to local DB."""
client = _get_client(vendor, apikey, secretkey)
result = client.list_asset_groups(project_name=project_name)
items = result.get("Items", result.get("Result", {}).get("Items", []))
synced = 0
dbname = _get_dbname()
db = DBPools()
async with db.sqlorContext(dbname) as sor:
for item in items:
vgid = item.get("Id", "")
if not vgid:
continue
# Check if exists
existing = await sor.R("rl_asset_group", {
"vendor": vendor,
"vendor_group_id": vgid,
})
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if existing:
await sor.U("rl_asset_group", {
"name": item.get("Name", ""),
"title": item.get("Title", item.get("Name", "")),
"description": item.get("Description", ""),
"update_time": now,
}, {"id": existing[0].id})
else:
gid = getID()
await sor.I("rl_asset_group", {
"id": gid,
"org_id": org_id,
"vendor": vendor,
"vendor_group_id": vgid,
"name": item.get("Name", ""),
"title": item.get("Title", item.get("Name", "")),
"description": item.get("Description", ""),
"group_type": item.get("GroupType", "LivenessFace"),
"project_name": item.get("ProjectName", project_name),
"status": "active",
"create_time": item.get("CreateTime", now),
"update_time": now,
})
synced += 1
return {"success": True, "synced": synced}
async def rl_sync_assets_from_vendor(org_id, local_group_id,
vendor=None, apikey=None, secretkey=None):
"""Sync assets for a group from vendor to local DB."""
dbname = _get_dbname()
db = DBPools()
async with db.sqlorContext(dbname) as sor:
recs = await sor.R("rl_asset_group", {"id": local_group_id})
if not recs:
return {"success": False, "message": "素材组合不存在"}
grp = recs[0]
vendor = vendor or grp.vendor
vendor_group_id = grp.vendor_group_id
project_name = grp.project_name or "default"
if not vendor_group_id:
return {"success": False, "message": "无供应商端组合ID"}
client = _get_client(vendor, apikey, secretkey)
result = client.list_assets(group_ids=[vendor_group_id])
items = result.get("Items", result.get("Result", {}).get("Items", []))
synced = 0
async with db.sqlorContext(dbname) as sor:
for item in items:
vaid = item.get("Id", "")
if not vaid:
continue
existing = await sor.R("rl_asset", {
"vendor": vendor,
"vendor_asset_id": vaid,
})
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if existing:
await sor.U("rl_asset", {
"status": item.get("Status", ""),
"url": item.get("URL", ""),
"name": item.get("Name", existing[0].name),
"update_time": now,
}, {"id": existing[0].id})
else:
aid = getID()
await sor.I("rl_asset", {
"id": aid,
"org_id": org_id,
"group_id": local_group_id,
"vendor": vendor,
"vendor_asset_id": vaid,
"asset_type": item.get("AssetType", "Image"),
"name": item.get("Name", ""),
"status": item.get("Status", "Processing"),
"url": item.get("URL", ""),
"asset_uri": f"asset://{vaid}",
"project_name": item.get("ProjectName", project_name),
"create_time": item.get("CreateTime", now),
"update_time": now,
})
synced += 1
return {"success": True, "synced": synced}
# ============================================================
# Downapp User API Proxies
# ============================================================
async def _get_vendor_keys(vendor="volcengine"):
"""Helper: Get vendor AK/SK from config table."""
dbname = _get_dbname()
db = DBPools()
async with db.sqlorContext(dbname) as sor:
recs = await sor.R("rl_vendor_config", {"vendor": vendor})
if not recs:
return {"success": False, "message": "供应商配置不存在"}
rec = recs[0]
if rec.status != "active":
return {"success": False, "message": f"供应商服务已停用"}
env = ServerEnv()
ak = env.password_decode(rec.ak)
sk = env.password_decode(rec.sk)
return {"success": True, "ak": ak, "sk": sk, "callback_url": rec.callback_url}
async def rl_verify_user(org_id, user_id, project_name="default"):
"""User proxy: Verify vendor config -> Call vendor -> Save org-group mapping."""
keys = await _get_vendor_keys()
if not keys.get("success"):
return keys
# Call Vendor API
client = _get_client("volcengine", keys["ak"], keys["sk"])
callback_url = keys.get("callback_url", "")
result = client.create_visual_validate_session(callback_url, project_name)
if "error" in result:
return {"success": False, "message": result.get("error", "API调用失败")}
byted_token = result.get("BytedToken", "")
h5_link = result.get("H5Link", "")
# Save local record for checking status later
# We use rl_asset_group for temporary state tracking
gid = getID()
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
db = DBPools()
async with db.sqlorContext(_get_dbname()) as sor:
await sor.I("rl_asset_group", {
"id": gid,
"org_id": org_id,
"vendor": "volcengine",
"name": f"待认证-{user_id}",
"title": f"待认证",
"group_type": "LivenessFace",
"project_name": project_name,
"status": "pending",
"byted_token": byted_token,
"h5_link": h5_link,
"callback_url": callback_url,
"created_by": user_id or "",
"create_time": now,
"update_time": now,
})
return {
"success": True,
"id": gid,
"byted_token": byted_token,
"h5_link": h5_link,
}
async def rl_check_validate_and_map(local_group_id, project_name="default"):
"""Internal: Check validation result and update rl_org_group mapping."""
keys = await _get_vendor_keys()
if not keys.get("success"):
return keys
dbname = _get_dbname()
db = DBPools()
async with db.sqlorContext(dbname) as sor:
recs = await sor.R("rl_asset_group", {"id": local_group_id})
if not recs:
return {"success": False, "message": "本地记录不存在"}
rec = recs[0]
byted_token = rec.byted_token
org_id = rec.org_id
# Call Vendor
client = _get_client("volcengine", keys["ak"], keys["sk"])
result = client.get_visual_validate_result(byted_token, project_name)
if "error" in result:
return {"success": False, "message": result.get("error", "查询失败")}
vendor_group_id = result.get("GroupId", "")
if not vendor_group_id:
return {"success": False, "message": "尚未完成认证或认证失败"}
# Update mapping table rl_org_group
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
mapping_id = getID()
async with db.sqlorContext(dbname) as sor:
# Check if mapping exists
existing = await sor.R("rl_org_group", {"org_id": org_id, "vendor": "volcengine"})
if existing:
await sor.U("rl_org_group", {
"vendor_group_id": vendor_group_id,
"local_group_id": local_group_id,
"update_time": now # add update_time field if needed, or just update
}, {"id": existing[0].id})
else:
await sor.I("rl_org_group", {
"id": mapping_id,
"org_id": org_id,
"vendor": "volcengine",
"vendor_group_id": vendor_group_id,
"local_group_id": local_group_id,
"status": "active",
"create_time": now
})
# Update rl_asset_group status
async with db.sqlorContext(dbname) as sor:
await sor.U("rl_asset_group", {
"vendor_group_id": vendor_group_id,
"status": "active",
"update_time": now,
}, {"id": local_group_id})
return {"success": True, "vendor_group_id": vendor_group_id}
async def rl_upload_user(org_id, group_id, source_url, asset_type, name, user_id):
"""User proxy: Validate Org-Group mapping -> Get Keys -> Upload."""
# 1. Validate Group Ownership
dbname = _get_dbname()
db = DBPools()
async with db.sqlorContext(dbname) as sor:
# Check rl_org_group
recs = await sor.R("rl_org_group", {"org_id": org_id}) # Get all groups for org
valid = False
vendor_group_id = ""
for r in recs:
if r.local_group_id == group_id:
valid = True
vendor_group_id = r.vendor_group_id
break
if not valid:
return {"success": False, "message": "无效的素材组合ID或无权访问"}
# 2. Get Keys
keys = await _get_vendor_keys()
if not keys.get("success"):
return keys
# 3. Upload
client = _get_client("volcengine", keys["ak"], keys["sk"])
result = client.create_asset(
vendor_group_id, source_url, asset_type, name
)
vendor_asset_id = result.get("Id", "")
if not vendor_asset_id and "error" not in result:
r = result.get("Result", {})
vendor_asset_id = r.get("Id", "")
# 4. Save Asset Record
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
asset_id = getID()
asset_uri = f"asset://{vendor_asset_id}" if vendor_asset_id else ""
async with db.sqlorContext(dbname) as sor:
await sor.I("rl_asset", {
"id": asset_id,
"org_id": org_id,
"group_id": group_id,
"vendor": "volcengine",
"vendor_asset_id": vendor_asset_id,
"asset_type": asset_type,
"name": name or source_url.split("/")[-1][:50],
"status": "Processing",
"source_url": source_url,
"asset_uri": asset_uri,
"project_name": "default",
"vendor_response": json.dumps(result, ensure_ascii=False),
"created_by": user_id or "",
"create_time": now,
"update_time": now,
})
return {
"success": "error" not in result,
"id": asset_id,
"vendor_asset_id": vendor_asset_id,
"status": "Processing",
"message": result.get("error", ""),
}
async def rl_sync_asset_status_user(org_id, asset_id, user_id):
"""User proxy: Validate ownership -> Get Keys -> Sync Status."""
dbname = _get_dbname()
db = DBPools()
# Validate asset belongs to org
async with db.sqlorContext(dbname) as sor:
recs = await sor.R("rl_asset", {"id": asset_id, "org_id": org_id})
if not recs:
return {"success": False, "message": "素材不存在或无权访问"}
rec = recs[0]
vendor_asset_id = rec.vendor_asset_id
keys = await _get_vendor_keys()
if not keys.get("success"):
return keys
# Sync with vendor
client = _get_client("volcengine", keys["ak"], keys["sk"])
result = client.get_asset(vendor_asset_id)
if "error" in result:
return {"success": False, "message": result.get("error", "查询失败")}
status = result.get("Status", "")
if not status:
r = result.get("Result", {})
status = r.get("Status", result.get("status", ""))
url = result.get("URL", result.get("Result", {}).get("URL", ""))
# Update local
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
async with db.sqlorContext(dbname) as sor:
upd = {"status": status, "update_time": now, "vendor_response": json.dumps(result, ensure_ascii=False)}
if url: upd["url"] = url
await sor.U("rl_asset", upd, {"id": asset_id})
return {"success": True, "status": status, "url": url}
async def rl_handle_callback(byted_token, project_name="default"):
"""
Callback handler: Volcengine POSTs here after H5 auth completes.
Looks up local group by byted_token, queries vendor for result,
then registers rl_org_group mapping.
"""
dbname = _get_dbname()
db = DBPools()
# 1. Find local group by byted_token
async with db.sqlorContext(dbname) as sor:
recs = await sor.R("rl_asset_group", {"byted_token": byted_token})
if not recs:
return {"success": False, "message": "未找到对应的认证会话"}
rec = recs[0]
local_group_id = rec.id
org_id = rec.org_id
# Already processed?
if rec.status == "active" and rec.vendor_group_id:
debug(f"callback already processed for group {local_group_id}")
return {
"success": True,
"local_group_id": local_group_id,
"vendor_group_id": rec.vendor_group_id,
"message": "已处理",
}
# 2. Get vendor keys
async with db.sqlorContext(dbname) as sor:
vrecs = await sor.R("rl_vendor_config", {"vendor": "volcengine"})
if not vrecs:
return {"success": False, "message": "供应商配置不存在"}
vrec = vrecs[0]
if vrec.status != "active":
return {"success": False, "message": "供应商服务已停用"}
env = ServerEnv()
ak = env.password_decode(vrec.ak)
sk = env.password_decode(vrec.sk)
# 3. Query vendor for result
client = _get_client("volcengine", ak, sk)
result = client.get_visual_validate_result(byted_token, project_name)
if "error" in result:
error(f"callback query vendor error: {result}")
return {"success": False, "message": result.get("error", "查询失败")}
vendor_group_id = result.get("GroupId", "")
if not vendor_group_id:
return {"success": False, "message": "尚未完成认证或认证失败"}
# 4. Register rl_org_group mapping
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
mapping_id = getID()
async with db.sqlorContext(dbname) as sor:
existing = await sor.R("rl_org_group", {
"org_id": org_id,
"vendor": "volcengine",
})
if existing:
await sor.U("rl_org_group", {
"vendor_group_id": vendor_group_id,
"local_group_id": local_group_id,
"update_time": now,
}, {"id": existing[0].id})
else:
await sor.I("rl_org_group", {
"id": mapping_id,
"org_id": org_id,
"vendor": "volcengine",
"vendor_group_id": vendor_group_id,
"local_group_id": local_group_id,
"status": "active",
"create_time": now,
})
# 5. Update rl_asset_group status
async with db.sqlorContext(dbname) as sor:
await sor.U("rl_asset_group", {
"vendor_group_id": vendor_group_id,
"status": "active",
"update_time": now,
}, {"id": local_group_id})
debug(f"callback processed: org={org_id}, group={local_group_id}, "
f"vendor_group={vendor_group_id}")
return {
"success": True,
"local_group_id": local_group_id,
"vendor_group_id": vendor_group_id,
}
async def rl_query_groups(org_id):
"""
Client API: Query authenticated group_ids for an org.
Returns list of (local_group_id, vendor_group_id, status) mappings.
"""
dbname = _get_dbname()
db = DBPools()
async with db.sqlorContext(dbname) as sor:
recs = await sor.R("rl_org_group", {"org_id": org_id})
if not recs:
return {"success": True, "groups": []}
groups = []
for r in recs:
groups.append({
"local_group_id": r.local_group_id,
"vendor_group_id": r.vendor_group_id,
"vendor": r.vendor,
"status": r.status,
"create_time": getattr(r, "create_time", ""),
})
return {"success": True, "groups": groups}
# ============================================================
# Module loader
# ============================================================
def load_reallife_asset():
"""Register all functions with ServerEnv."""
g = ServerEnv()
g.rl_create_validate_session = rl_create_validate_session
g.rl_check_validate_result = rl_check_validate_result
g.rl_create_asset = rl_create_asset
g.rl_sync_asset_status = rl_sync_asset_status
g.rl_delete_asset = rl_delete_asset
g.rl_delete_group = rl_delete_group
g.rl_sync_group_from_vendor = rl_sync_group_from_vendor
g.rl_sync_assets_from_vendor = rl_sync_assets_from_vendor
# Downapp user APIs
g.rl_verify_user = rl_verify_user
g.rl_check_validate_and_map = rl_check_validate_and_map
g.rl_upload_user = rl_upload_user
g.rl_sync_asset_status_user = rl_sync_asset_status_user
g.rl_handle_callback = rl_handle_callback
g.rl_query_groups = rl_query_groups
return True