# ============================================================ # 上传素材到指定认证组合 # 参数: vendor_group_id(必填), source_url(必填, 素材URL), # asset_type(选填, Image/Video/Audio, 自动检测), name(选填) # curl 示例: # curl -X POST 'https://ai.atvoe.com/reallife_asset/api/rl_upload.dspy' \ # -H 'Authorization: Bearer YOUR_TOKEN' \ # -d 'vendor_group_id=group-xxx&source_url=https://example.com/photo.jpg&asset_type=Image&name=测试素材' # ============================================================ vendor_group_id = params_kw.get('vendor_group_id', '') source_url = params_kw.get('source_url', '') asset_type = params_kw.get('asset_type', '') name = params_kw.get('name', '') if not vendor_group_id or not source_url: return json.dumps({"status": "error", "data": {"message": "请选择认证组合并上传素材文件"}}, ensure_ascii=False) # Validate media file type from path extension import os ext = os.path.splitext(source_url.split('?')[0])[1].lower() if source_url else '' media_map = { '.jpg': 'Image', '.jpeg': 'Image', '.png': 'Image', '.gif': 'Image', '.bmp': 'Image', '.webp': 'Image', '.svg': 'Image', '.mp4': 'Video', '.avi': 'Video', '.mov': 'Video', '.wmv': 'Video', '.flv': 'Video', '.mkv': 'Video', '.webm': 'Video', '.mp3': 'Audio', '.wav': 'Audio', '.aac': 'Audio', '.flac': 'Audio', '.ogg': 'Audio', '.wma': 'Audio', '.m4a': 'Audio', } detected_type = media_map.get(ext, '') if ext and not detected_type: return json.dumps({"status": "error", "data": {"message": f"不支持的文件类型: {ext},请上传图片、音频或视频文件"}}, ensure_ascii=False) if not asset_type and detected_type: asset_type = detected_type if not asset_type: asset_type = 'Image' # If source_url is base64 data or local path, convert to public URL if source_url.startswith('data:') or (not source_url.startswith('http') and len(source_url) < 8000): source_url = await b64media2url(request, source_url) if not source_url: return json.dumps({"status": "error", "data": {"message": "素材文件转换失败"}}, ensure_ascii=False) org_id = (await get_userorgid()) or '0' user_id = (await get_user()) or '' result = await rl_upload_user(org_id, vendor_group_id, source_url, asset_type, name, user_id) if result.get('success'): return json.dumps({ "status": "ok", "data": { "id": result.get('id', ''), "vendor_asset_id": result.get('vendor_asset_id', ''), "status": result.get('status', 'Processing'), } }, ensure_ascii=False) else: return json.dumps({ "status": "error", "data": {"message": result.get('message', '上传失败')} }, ensure_ascii=False)