async def hpc_sync_user(ns={}): # if ns.get('loggers'): # loggers = ns['loggers'] # else: # loggers = await loggerconfig(logname='hpc_jncs.log', logger="hpc_saleman_sync") db = DBPools() async with db.sqlorContext('kboss') as sor: # 如果已经同步过, 不再执行同步操作 sync_already_li = await sor.R('jncs_syncinfo', {'id': ns.get('syncinfoid'), 'del_flg': '0'}) if sync_already_li and sync_already_li[0]['sync_date']: if sync_already_li[0]['sync_status'] == '1': sync_already = { 'status': False, 'id': ns.get('syncinfoid'), 'msg': '这个账单已经同步过' } # loggers.info(str(sync_already)) return sync_already url = 'https://hpc.kaiyuancloud.cn/rms/api/biz/user/addCTimeInfo' data = { "loginId": ns.get('orgid'), "loginName": ns.get('orgname'), "clusterCode": ns.get('clustercode'), "ldapUid": ns.get('ldapuid'), "queueCode": ns.get('queuecode'), "ctime": ns.get('quantity') } # get cookie cookie_url = "https://hpc.kaiyuancloud.cn/rms/api/sign" cipher = "WIfHaMnen2WaRw4Agwenoe6TwDD3LxBbLEgMjVKDHADPE7xDJicjSG3yAfK4iNes1vUmVcoTxV2/+gMfla8ZDg==" # response_cookie = requests.post(url=cookie_url, data={"cipher": cipher}) async with aiohttp_client.request( method='POST', url=cookie_url, data={"cipher": cipher}) as res_cipher: res_cipher_cookie = res_cipher.headers.get("Set-Cookie") # res = requests.post(url=url, headers=headerss, data=data) async with aiohttp_client.request( method='POST', url=url, headers={"Cookie": res_cipher_cookie}, data=data) as res: res_json = await res.json() if res.status == 200: if res_json.get('code') == 200: # update orgid status syncinfo_update = { 'id': ns.get('syncinfoid'), 'queuecode': ns.get('queuecode'), 'clustercode': ns.get('clustercode'), 'sync_date': time.strftime('%Y-%m-%d'), 'sync_status': '1' } usermap_udpate_sql = """UPDATE jncs_usermapping SET sync_status = '1' WHERE ldapuid = '%s' and clustercode = '%s' and queuecode = '%s' and orgid = '%s';""" % (ns.get('ldapuid'), ns.get('clustercode'), ns.get('queuecode'), ns.get('orgid')) await sor.U('jncs_syncinfo', syncinfo_update) await sor.sqlExe(usermap_udpate_sql, {}) return_res = { 'status': True, 'msg': '超算账号: %s, 核时: %s, 同步成功' % (data.get('loginName'), data.get('ctime')), 'sync_time': time.strftime('%Y-%m-%d %H:%M:%S'), 'data': res_json } # loggers.info(str(return_res)) return return_res else: return_res = { 'status': False, 'msg': 'jncs account: %s, ctime: %s, 同步失败' % (data.get('loginName'), data.get('ctime')), 'data': res_json, 'sync_time': time.strftime('%Y-%m-%d %H:%M:%S') } # loggers.info(str(return_res)) return return_res async def get_role_user_from_org(orgid=None, role=None): """ 通过机构获取对应的角色 :param orgid: :return: """ db = DBPools() async with db.sqlorContext('kboss') as sor: # 找出所有业主机构和分销商userid role_li = await sor.R('role', {'role': role, 'del_flg': '0'}) role_ids = [i['id'] for i in role_li] role_all_sql = """select userid from userrole where roleid in ({});""".format(','.join(["'{}'".format(val) for val in role_ids])) role_all_li = await sor.sqlExe(role_all_sql, {}) role_all = [i['userid'] for i in role_all_li] # 找出userid表中orgid为当前机构的所有用户 user_ids_li = await sor.R('users', {'orgid': orgid, 'del_flg': '0'}) user_ids = [i['id'] for i in user_ids_li] current_li = list(set(role_all).intersection(set(user_ids))) if current_li: current_role_userid = current_li[0] else: current_role_userid = None return current_role_userid async def get_valid_ldap_sync_user_time(loggers=None, nss=None): """ 筛选有效ldap账号并同步核时 :return: """ db = DBPools() async with db.sqlorContext('kboss') as sor: activate_ldap_li = await sor.R('jncs_usermapping', {'orgid': '', 'sync_status': 0, 'del_flg': '0'}) # have activate ldap if activate_ldap_li: user_org_name = (await sor.R('organization', {'id': nss['customerid']}))[0]['orgname'] new_id = activate_ldap_li[0].get('id') clustercode, queuecode, ldapuid = activate_ldap_li[0]['clustercode'], activate_ldap_li[0]['queuecode'], activate_ldap_li[0]['ldapuid'] loggers.info('未绑定账号: %s, 获取新的ldapuid: %s 将要绑定并尝试同步核时...' % (user_org_name, ldapuid)) else: # TODO 发送短信给销售 # 查找客户所在的机构 # who_is_org = (await sor.R('organization', {'id': nss['customerid']}))[0]['parentid'] # # 查找销售的电话 # saleman_phone = 0 # saleman_id_li = await sor.R('customer', {'customerid': nss['customerid'], 'del_flg': '0'}) # if saleman_id_li and saleman_id_li[0]['salemanid']: # saleman_phone_li = await sor.R('users', {'id': saleman_id_li[0]['salemanid']}) # if saleman_phone_li and saleman_phone_li[0]['mobile']: # saleman_phone = saleman_phone_li[0]['mobile'] # await send_vcode(saleman_phone, '注册登录验证', {'code': '209898'}) # # if (not saleman_phone) or (not saleman_id_li): # # 首先查找运营电话 # yunying_mobile = 0 # current_yunying_id = await get_role_user_from_org(orgid=who_is_org, role='运营') # if current_yunying_id: # yunying_mobile = (await sor.R('users', {'id': current_yunying_id, 'del_flg': '0'}))[0]['mobile'] # if yunying_mobile: # await send_vcode(yunying_mobile, '注册登录验证', {'code': '209898'}) # if (not yunying_mobile) or (not current_yunying_id): # # 查找管理员电话 # current_admin_id = await get_role_user_from_org(orgid=who_is_org, role='管理员') # if current_admin_id: # admin_mobile = (await sor.R('users', {'id': current_admin_id, 'del_flg': '0'}))[0]['mobile'] # if admin_mobile: # send_vcode(admin_mobile, '注册登录验证', {'code': '209898'}) # TODO 发送站内信给销售 # TODO 发送站内信给客户 valid_res = { 'status': False, 'msg': '%s 新用户: %s, 可用ldap账号已经消耗完, 请添加新的ldap账号' % (time.strftime('%Y-%m-%d %H:%M:%S'), nss['customerid']) } # loggers.info(str(valid_res)) return valid_res sync_dict = { "syncinfoid": nss['id'], "customerid": nss['customerid'], "productid": nss['productid'], "quantity": nss['quantity'], "ldapuid": ldapuid, "clustercode": clustercode, "queuecode": queuecode, "orgid": nss['customerid'], # 'loggers': loggers } loggers.info('新用户绑定新账号以后开始尝试账单同步信息: %s' % str(sync_dict)) sync_dict["orgname"] = user_org_name sync_status = await hpc_sync_user(sync_dict) if sync_status and sync_status['status']: # TODO 同步成功绑定对应账号 sync_res = { 'status': True, 'msg': '未绑定账号: %s, 获取账号并同步核时成功, 绑定账号成功' % sync_dict["orgname"] } # loggers.info(str(sync_res)) return sync_res else: db = DBPools() async with db.sqlorContext('kboss') as sor: # 没有同步成功 就把报错信息填入sync_msg await sor.U('jncs_usermapping', {'id': new_id, 'orgid': 1, 'sync_status': 2, 'sync_msg': sync_status['data']['msg']}) # loggers.info('未绑定账号: %s, 获取新账号id: %s 绑定失败' % (sync_dict["orgname"], new_id)) # 递归该函数 直到同步成功为止 或 没有可用的ldap账号 return await get_valid_ldap_sync_user_time(loggers=loggers, nss=nss) async def hpc_save_bill(ns={}): """ 保存付费成功购买了济南超算产品 hpc_sync_user/get_valid_ldap_sync_user_time :param ns: :return: """ # orderid, customerid, providerid, productid, quantity # await HpcSaveBill(orderid, orgid[0]['customerid'],product[0]['providerpid'],product[0]['id'],j['quantity']) nss = { 'id': uuid(), 'orderid': ns.get('orderid'), 'customerid': ns.get('customerid'), 'productid': ns.get('productid'), 'quantity': ns.get('quantity'), 'sync_status': 0 } try: # 获取济南超算id db = DBPools() async with db.sqlorContext('kboss') as sor: jncs_li = await sor.R('organization', {'orgname': '济南超算', 'id': ns.get('providerid'), 'del_flg': '0'}) if len(jncs_li) >= 1: await sor.C('jncs_syncinfo', nss) if jncs_li: # loggers = await loggerconfig(logname='hpc_jncs.log', logger="hpc") db = DBPools() async with db.sqlorContext('kboss') as sor: # 查看用户是否已经绑定过 # get ldap account acc_li = await sor.R('jncs_usermapping', {'orgid': ns.get('customerid'), 'del_flg': '0'}) # already sync if acc_li: clustercode, queuecode, ldapuid = acc_li[0]['clustercode'], acc_li[0]['queuecode'], acc_li[0]['ldapuid'] sync_dict = { "syncinfoid": nss['id'], "customerid": nss['customerid'], "productid": nss['productid'], "quantity": nss['quantity'], "ldapuid": ldapuid, "clustercode": clustercode, "queuecode": queuecode, "orgid": nss['customerid'], 'orderid': nss['orderid'], 'loggers': loggers } # loggers.info('用户此前已经绑定过ldap账号, 账单将要同步内容: %s' % str(sync_dict)) sync_dict["orgname"] = (await sor.R('organization', {'id': nss['customerid']}))[0]['orgname'] sync_status = await hpc_sync_user(sync_dict) if sync_status['status']: sync_res = { 'status': True, 'msg': sync_status['msg'], 'data': '%s 已经绑定过ldap账号, 同步核时成功' % nss['customerid'] } return sync_res else: sync_res = { 'status': False, 'msg': '%s 已经绑定过ldap账号, 同步核时失败' % nss['customerid'] } # loggers.info(str(sync_res)) return sync_res # not sync else: sync_ldap_ctime = await get_valid_ldap_sync_user_time(loggers=loggers, nss=nss) if sync_ldap_ctime and sync_ldap_ctime['status']: return { 'status': True, 'msg': sync_ldap_ctime['msg'] } else: return { 'status': False, 'msg': sync_ldap_ctime['msg'] } else: return { 'status': False, 'msg': 'the provider is not jncs' } except Exception as e: print(e) raise e print('params_kw=', params_kw) ret = await hpc_save_bill(params_kw) return ret