""" reallife_asset module - Real Person Portrait Asset Management. Supports multiple vendors (Volcengine Ark, etc.) for managing real person portrait asset groups and assets. """ import json from datetime import datetime from traceback import format_exc from sqlor.dbpools import DBPools from ahserver.serverenv import ServerEnv from appPublic.log import debug, exception, error from appPublic.uniqueID import getID from appPublic.dictObject import DictObject from .volcengine_client import get_vendor_client MODULE_NAME = "reallife_asset" def _get_dbname(): f = ServerEnv().get_module_dbname return f(MODULE_NAME) def _get_client(vendor, apikey, secretkey): """Get vendor API client.""" return get_vendor_client(vendor, apikey, secretkey) # ============================================================ # Asset Group operations # ============================================================ async def rl_create_validate_session(org_id, vendor, callback_url, project_name="default", apikey=None, secretkey=None, user_id=None): """Create H5 verification session for real person auth.""" client = _get_client(vendor, apikey, secretkey) result = client.create_visual_validate_session(callback_url, project_name) if "error" in result: return {"success": False, "message": result.get("error", "API调用失败")} byted_token = result.get("BytedToken", "") h5_link = result.get("H5Link", "") # Save to local DB dbname = _get_dbname() db = DBPools() async with db.sqlorContext(dbname) as sor: gid = getID() now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") await sor.I("rl_asset_group", { "id": gid, "org_id": org_id, "vendor": vendor, "name": f"待认证-{now}", "title": f"待认证-{now}", "group_type": "LivenessFace", "project_name": project_name, "status": "pending", "byted_token": byted_token, "h5_link": h5_link, "callback_url": callback_url, "created_by": user_id or "", "create_time": now, "update_time": now, }) return { "success": True, "id": gid, "byted_token": byted_token, "h5_link": h5_link, } async def rl_check_validate_result(local_group_id, vendor, apikey=None, secretkey=None): """Check real person validation result and get vendor Group ID.""" dbname = _get_dbname() db = DBPools() async with db.sqlorContext(dbname) as sor: recs = await sor.R("rl_asset_group", {"id": local_group_id}) if not recs: return {"success": False, "message": "本地记录不存在"} rec = recs[0] byted_token = rec.byted_token project_name = rec.project_name or "default" client = _get_client(vendor, apikey, secretkey) result = client.get_visual_validate_result(byted_token, project_name) if "error" in result: return {"success": False, "message": result.get("error", "查询失败")} vendor_group_id = result.get("GroupId", "") if not vendor_group_id: return {"success": False, "message": "尚未完成认证或认证失败"} # Update local record now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") async with db.sqlorContext(dbname) as sor: await sor.U("rl_asset_group", { "vendor_group_id": vendor_group_id, "status": "active", "name": f"已认证-{vendor_group_id}", "title": f"已认证-{vendor_group_id}", "update_time": now, }, {"id": local_group_id}) return {"success": True, "vendor_group_id": vendor_group_id} async def rl_create_asset(org_id, local_group_id, source_url, asset_type="Image", name="", vendor=None, apikey=None, secretkey=None, user_id=None): """Upload asset to vendor and create local record.""" dbname = _get_dbname() db = DBPools() async with db.sqlorContext(dbname) as sor: recs = await sor.R("rl_asset_group", {"id": local_group_id}) if not recs: return {"success": False, "message": "素材组合不存在"} grp = recs[0] vendor = vendor or grp.vendor vendor_group_id = grp.vendor_group_id project_name = grp.project_name or "default" if not vendor_group_id: return {"success": False, "message": "素材组合尚未完成真人认证"} client = _get_client(vendor, apikey, secretkey) result = client.create_asset( vendor_group_id, source_url, asset_type, name, project_name ) vendor_asset_id = result.get("Id", "") if not vendor_asset_id and "error" not in result: # Try nested result structure r = result.get("Result", {}) vendor_asset_id = r.get("Id", "") now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") asset_id = getID() asset_uri = f"asset://{vendor_asset_id}" if vendor_asset_id else "" async with db.sqlorContext(dbname) as sor: await sor.I("rl_asset", { "id": asset_id, "org_id": org_id, "group_id": local_group_id, "vendor": vendor, "vendor_asset_id": vendor_asset_id, "asset_type": asset_type, "name": name or source_url.split("/")[-1][:50], "status": "Processing", "source_url": source_url, "asset_uri": asset_uri, "project_name": project_name, "vendor_response": json.dumps(result, ensure_ascii=False), "created_by": user_id or "", "create_time": now, "update_time": now, }) return { "success": "error" not in result, "id": asset_id, "vendor_asset_id": vendor_asset_id, "status": "Processing", "message": result.get("error", ""), } async def rl_sync_asset_status(asset_id, vendor=None, apikey=None, secretkey=None): """Sync asset status from vendor.""" dbname = _get_dbname() db = DBPools() async with db.sqlorContext(dbname) as sor: recs = await sor.R("rl_asset", {"id": asset_id}) if not recs: return {"success": False, "message": "素材不存在"} rec = recs[0] vendor = vendor or rec.vendor vendor_asset_id = rec.vendor_asset_id project_name = rec.project_name or "default" if not vendor_asset_id: return {"success": False, "message": "无供应商端资产ID"} client = _get_client(vendor, apikey, secretkey) result = client.get_asset(vendor_asset_id, project_name) if "error" in result: return {"success": False, "message": result.get("error", "查询失败")} # Extract status from result (may be nested under Result) status = result.get("Status", "") if not status: r = result.get("Result", {}) status = r.get("Status", result.get("status", "")) url = result.get("URL", result.get("Result", {}).get("URL", "")) now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") async with db.sqlorContext(dbname) as sor: upd = { "status": status, "update_time": now, "vendor_response": json.dumps(result, ensure_ascii=False), } if url: upd["url"] = url await sor.U("rl_asset", upd, {"id": asset_id}) return {"success": True, "status": status, "url": url} async def rl_delete_asset(asset_id, vendor=None, apikey=None, secretkey=None): """Delete asset from vendor and local DB.""" dbname = _get_dbname() db = DBPools() async with db.sqlorContext(dbname) as sor: recs = await sor.R("rl_asset", {"id": asset_id}) if not recs: return {"success": False, "message": "素材不存在"} rec = recs[0] vendor = vendor or rec.vendor vendor_asset_id = rec.vendor_asset_id project_name = rec.project_name or "default" # Delete from vendor if vendor_asset_id: client = _get_client(vendor, apikey, secretkey) result = client.delete_asset(vendor_asset_id, project_name) debug(f"vendor delete asset: {result}") # Delete local async with db.sqlorContext(dbname) as sor: await sor.D("rl_asset", {"id": asset_id}) return {"success": True} async def rl_delete_group(local_group_id, vendor=None, apikey=None, secretkey=None): """Delete asset group from vendor and local DB (cascade).""" dbname = _get_dbname() db = DBPools() async with db.sqlorContext(dbname) as sor: recs = await sor.R("rl_asset_group", {"id": local_group_id}) if not recs: return {"success": False, "message": "素材组合不存在"} rec = recs[0] vendor = vendor or rec.vendor vendor_group_id = rec.vendor_group_id project_name = rec.project_name or "default" # Delete from vendor if vendor_group_id: client = _get_client(vendor, apikey, secretkey) result = client.delete_asset_group(vendor_group_id, project_name) debug(f"vendor delete group: {result}") # Delete local (cascade) async with db.sqlorContext(dbname) as sor: await sor.D("rl_asset", {"group_id": local_group_id}) await sor.D("rl_asset_group", {"id": local_group_id}) return {"success": True} async def rl_sync_group_from_vendor(org_id, vendor, apikey=None, secretkey=None, project_name="default"): """Sync asset groups from vendor to local DB.""" client = _get_client(vendor, apikey, secretkey) result = client.list_asset_groups(project_name=project_name) items = result.get("Items", result.get("Result", {}).get("Items", [])) synced = 0 dbname = _get_dbname() db = DBPools() async with db.sqlorContext(dbname) as sor: for item in items: vgid = item.get("Id", "") if not vgid: continue # Check if exists existing = await sor.R("rl_asset_group", { "vendor": vendor, "vendor_group_id": vgid, }) now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") if existing: await sor.U("rl_asset_group", { "name": item.get("Name", ""), "title": item.get("Title", item.get("Name", "")), "description": item.get("Description", ""), "update_time": now, }, {"id": existing[0].id}) else: gid = getID() await sor.I("rl_asset_group", { "id": gid, "org_id": org_id, "vendor": vendor, "vendor_group_id": vgid, "name": item.get("Name", ""), "title": item.get("Title", item.get("Name", "")), "description": item.get("Description", ""), "group_type": item.get("GroupType", "LivenessFace"), "project_name": item.get("ProjectName", project_name), "status": "active", "create_time": item.get("CreateTime", now), "update_time": now, }) synced += 1 return {"success": True, "synced": synced} async def rl_sync_assets_from_vendor(org_id, local_group_id, vendor=None, apikey=None, secretkey=None): """Sync assets for a group from vendor to local DB.""" dbname = _get_dbname() db = DBPools() async with db.sqlorContext(dbname) as sor: recs = await sor.R("rl_asset_group", {"id": local_group_id}) if not recs: return {"success": False, "message": "素材组合不存在"} grp = recs[0] vendor = vendor or grp.vendor vendor_group_id = grp.vendor_group_id project_name = grp.project_name or "default" if not vendor_group_id: return {"success": False, "message": "无供应商端组合ID"} client = _get_client(vendor, apikey, secretkey) result = client.list_assets(group_ids=[vendor_group_id]) items = result.get("Items", result.get("Result", {}).get("Items", [])) synced = 0 async with db.sqlorContext(dbname) as sor: for item in items: vaid = item.get("Id", "") if not vaid: continue existing = await sor.R("rl_asset", { "vendor": vendor, "vendor_asset_id": vaid, }) now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") if existing: await sor.U("rl_asset", { "status": item.get("Status", ""), "url": item.get("URL", ""), "name": item.get("Name", existing[0].name), "update_time": now, }, {"id": existing[0].id}) else: aid = getID() await sor.I("rl_asset", { "id": aid, "org_id": org_id, "group_id": local_group_id, "vendor": vendor, "vendor_asset_id": vaid, "asset_type": item.get("AssetType", "Image"), "name": item.get("Name", ""), "status": item.get("Status", "Processing"), "url": item.get("URL", ""), "asset_uri": f"asset://{vaid}", "project_name": item.get("ProjectName", project_name), "create_time": item.get("CreateTime", now), "update_time": now, }) synced += 1 return {"success": True, "synced": synced} # ============================================================ # Module loader # ============================================================ def load_reallife_asset(): """Register all functions with ServerEnv.""" g = ServerEnv() g.rl_create_validate_session = rl_create_validate_session g.rl_check_validate_result = rl_check_validate_result g.rl_create_asset = rl_create_asset g.rl_sync_asset_status = rl_sync_asset_status g.rl_delete_asset = rl_delete_asset g.rl_delete_group = rl_delete_group g.rl_sync_group_from_vendor = rl_sync_group_from_vendor g.rl_sync_assets_from_vendor = rl_sync_assets_from_vendor return True