async def get_valid_admin_person(nss={}): db = DBPools() async with db.sqlorContext('kboss') as sor: # 查找客户所在的机构 who_is_org = (await sor.R('organization', {'id': nss['customerid']}))[0]['parentid'] # 查找销售的电话 saleman_phone = 0 saleman_id_li = await sor.R('customer', {'customerid': nss['customerid'], 'del_flg': '0'}) if saleman_id_li and saleman_id_li[0]['salemanid']: saleman_phone_li = await sor.R('users', {'id': saleman_id_li[0]['salemanid']}) if saleman_phone_li and saleman_phone_li[0]['mobile']: saleman_phone = saleman_phone_li[0]['mobile'] return saleman_id_li[0]['salemanid'], saleman_phone if (not saleman_phone) or (not saleman_id_li): # 首先查找运营电话 yunying_mobile = 0 current_yunying_id = await get_role_user_from_org(orgid=who_is_org, role='运营') if current_yunying_id: yunying_mobile = (await sor.R('users', {'id': current_yunying_id, 'del_flg': '0'}))[0]['mobile'] if yunying_mobile: return current_yunying_id, yunying_mobile if (not yunying_mobile) or (not current_yunying_id): # 查找管理员电话 current_admin_id = await get_role_user_from_org(orgid=who_is_org, role='管理员') if current_admin_id: admin_mobile = (await sor.R('users', {'id': current_admin_id, 'del_flg': '0'}))[0]['mobile'] if admin_mobile: return current_admin_id, admin_mobile print('%s 没有找到对应的管理人员进行电话,站内信发送' % nss['customerid']) return None, None async def innder_mail_add_msg(nss={}): """ :param ns: :return: senderid发送人id receiverid收件人id msgtitle消息标头 msgtext消息内容 """ user_orgid = nss['user_orgid'] user_id = nss['userid'] db = DBPools() async with db.sqlorContext('kboss') as sor: customer_name = (await sor.R('organization', {'id': user_orgid}))[0]['orgname'] saleman_id, saleman_mobile = await get_valid_admin_person({'customerid': user_orgid}) ns_customer = { 'id': uuid(), 'senderid': 'inner', 'receiverid': user_id, 'msgtitle': '系统消息通知', 'msgtype': '信息同步异常', 'msgtext': '尊敬的用户,由于资源升级,您购买的产品没有同步完成,请提交工单或联系售后完善操作,由此给您带来的不便深表歉意!感谢您的支持。', } ns_saleman = { 'id': uuid(), 'senderid': 'inner', 'receiverid': saleman_id, 'msgtitle': '系统新消息通知', 'msgtype': '账号补充通知', 'msgtext': '您好! 由于超算ldap账号消耗完, 用户%s购买的产品超算产品同步到供应商信息没有成功, 请及时登录查看处理!' % customer_name } await sor.C('message', ns_customer) await sor.C('message', ns_saleman) return {'status': True, 'msg': '添加成功'} async def diff_sms_send_save(sor=None, user_orgid=None): try: # customer_phone = (await sor.R('organization', {'id': user_orgid}))[0]['contactor_phone'] customer_name = (await sor.R('organization', {'id': user_orgid}))[0]['orgname'] # 给相关管理人员发送短信 saleman_id, saleman_mobile = await get_valid_admin_person({'customerid': user_orgid}) if saleman_mobile: sms_send_dict_saleman = { 'reason': '可用ldap账号消耗完', 'name': customer_name, 'productname': '济南超算产品' } print('短信发送 sms_send_dict_saleman', sms_send_dict_saleman) await send_vcode(saleman_mobile, '信息同步异常角色提醒', sms_send_dict_saleman) except Exception as e: print('发送短信失败: %s' % e) return { 'status': False, 'msg': '发送短信失败', 'data': e } async def hpc_sync_user(ns={}): if ns.get('loggers'): loggers = ns['loggers'] else: loggers = await loggerconfig(logname='hpc_jncs.log', logger="hpc_saleman_sync") db = DBPools() async with db.sqlorContext('kboss') as sor: # 如果已经同步过, 不再执行同步操作 sync_already_li = await sor.R('jncs_syncinfo', {'id': ns.get('syncinfoid'), 'del_flg': '0'}) if sync_already_li and sync_already_li[0]['sync_date']: if sync_already_li[0]['sync_status'] == '1': sync_already = { 'status': False, 'id': ns.get('syncinfoid'), 'msg': '这个账单已经同步过' } loggers.info(str(sync_already)) return sync_already url = 'https://hpc.kaiyuancloud.cn/rms/api/biz/user/addCTimeInfo' data = { "loginId": ns.get('orgid'), "loginName": ns.get('orgname'), "clusterCode": ns.get('clustercode'), "ldapUid": ns.get('ldapuid'), "queueCode": ns.get('queuecode'), "ctime": ns.get('quantity') } # get cookie cookie_url = "https://hpc.kaiyuancloud.cn/rms/api/sign" cipher = "WIfHaMnen2WaRw4Agwenoe6TwDD3LxBbLEgMjVKDHADPE7xDJicjSG3yAfK4iNes1vUmVcoTxV2/+gMfla8ZDg==" # response_cookie = requests.post(url=cookie_url, data={"cipher": cipher}) async with aiohttp_client.request( method='POST', url=cookie_url, data={"cipher": cipher}) as res_cipher: res_cipher_cookie = res_cipher.headers.get("Set-Cookie") # res = requests.post(url=url, headers=headerss, data=data) async with aiohttp_client.request( method='POST', url=url, headers={"Cookie": res_cipher_cookie}, data=data) as res: res_json = await res.json() if res.status == 200: if res_json.get('code') == 200: # update orgid status syncinfo_update = { 'id': ns.get('syncinfoid'), 'queuecode': ns.get('queuecode'), 'clustercode': ns.get('clustercode'), 'sync_date': time.strftime('%Y-%m-%d'), 'sync_status': '1' } usermap_udpate_sql = """UPDATE jncs_usermapping SET sync_status = '1' WHERE ldapuid = '%s' and clustercode = '%s' and queuecode = '%s' and orgid = '%s';""" % (ns.get('ldapuid'), ns.get('clustercode'), ns.get('queuecode'), ns.get('orgid')) await sor.U('jncs_syncinfo', syncinfo_update) await sor.sqlExe(usermap_udpate_sql, {}) return_res = { 'status': True, 'msg': '超算账号: %s, 核时: %s, 同步成功' % (data.get('loginName'), data.get('ctime')), 'sync_time': time.strftime('%Y-%m-%d %H:%M:%S'), 'data': res_json } loggers.info(str(return_res)) return return_res else: return_res = { 'status': False, 'msg': 'jncs account: %s, ctime: %s, 同步失败' % (data.get('loginName'), data.get('ctime')), 'data': res_json, 'sync_time': time.strftime('%Y-%m-%d %H:%M:%S') } loggers.info(str(return_res)) return return_res async def get_role_user_from_org(orgid=None, role=None): """ 通过机构获取对应的角色 :param orgid: :return: """ db = DBPools() async with db.sqlorContext('kboss') as sor: # 找出所有业主机构和分销商userid role_li = await sor.R('role', {'role': role, 'del_flg': '0'}) role_ids = [i['id'] for i in role_li] role_all_sql = """select userid from userrole where roleid in ({});""".format(','.join(["'{}'".format(val) for val in role_ids])) role_all_li = await sor.sqlExe(role_all_sql, {}) role_all = [i['userid'] for i in role_all_li] # 找出userid表中orgid为当前机构的所有用户 user_ids_li = await sor.R('users', {'orgid': orgid, 'del_flg': '0'}) user_ids = [i['id'] for i in user_ids_li] current_li = list(set(role_all).intersection(set(user_ids))) if current_li: current_role_userid = current_li[0] else: current_role_userid = None return current_role_userid async def get_valid_ldap_sync_user_time(loggers=None, nss=None, productname=None): """ 筛选有效ldap账号并同步核时 :return: """ db = DBPools() async with db.sqlorContext('kboss') as sor: activate_ldap_li = await sor.R('jncs_usermapping', {'orgid': '', 'relate_config': productname, 'sync_status': 0, 'del_flg': '0'}) user_org_name = (await sor.R('organization', {'id': nss['customerid']}))[0]['orgname'] user_id = (await sor.R('users', {'orgid': nss['customerid']}))[0]['id'] # have activate ldap if activate_ldap_li: new_id = activate_ldap_li[0].get('id') clustercode, queuecode, ldapuid = activate_ldap_li[0]['clustercode'], activate_ldap_li[0]['queuecode'], activate_ldap_li[0]['ldapuid'] loggers.info('未绑定账号: %s, 获取新的ldapuid: %s 将要绑定并尝试同步核时...' % (user_org_name, ldapuid)) else: # 发送短信给销售 # await diff_sms_send_save(sor=sor, user_orgid=nss['customerid']) await innder_mail_add_msg({'user_orgid': nss['customerid'], 'userid': user_id, 'productname': '济南超算产品'}) valid_res = { 'status': False, 'msg': '%s 新用户: %s, 可用ldap账号已经消耗完, 请添加新的ldap账号' % (time.strftime('%Y-%m-%d %H:%M:%S'), nss['customerid']) } loggers.info(str(valid_res)) return valid_res sync_dict = { "syncinfoid": nss['id'], "customerid": nss['customerid'], "productid": nss['productid'], "quantity": nss['quantity'], "ldapuid": ldapuid, "clustercode": clustercode, "queuecode": queuecode, "orgid": nss['customerid'], 'loggers': loggers } loggers.info('新用户绑定新账号以后开始尝试账单同步信息: %s' % str(sync_dict)) sync_dict["orgname"] = user_org_name sync_status = await hpc_sync_user(sync_dict) if sync_status and sync_status['status']: # TODO 同步成功绑定对应账号 sync_res = { 'status': True, 'msg': '未绑定账号: %s, 获取账号并同步核时成功, 绑定账号成功' % sync_dict["orgname"] } loggers.info(str(sync_res)) return sync_res else: db = DBPools() async with db.sqlorContext('kboss') as sor: # 没有同步成功 就把报错信息填入sync_msg await sor.U('jncs_usermapping', {'id': new_id, 'orgid': 1, 'sync_status': 2, 'sync_msg': sync_status['data']['msg']}) loggers.info('未绑定账号: %s, 获取新账号id: %s 绑定失败' % (sync_dict["orgname"], new_id)) # 递归该函数 直到同步成功为止 或 没有可用的ldap账号 return await get_valid_ldap_sync_user_time(loggers=loggers, nss=nss, productname=productname) async def hpc_save_bill(ns={}): """ 保存付费成功购买了济南超算产品 hpc_sync_user/get_valid_ldap_sync_user_time/get_role_user_from_org 1. 用户原来同步过 是否同步成功 同步成功 2. 用户原来没有同步过 2.1 数据库还有剩余有效账号 2.1.1 同步了一次就绑定了有效账号 是否核时同步成功 2.1.2 先绑定了无效账号 无效账号状态是否改变 2.2 数据库没有剩余有效账号 2.2.1 用户/管理人员是否发送短信/站内信 成功 3. 用户绑定过 3.1 用户绑定过关联产品 同步成功 3.2 用户绑定过其他产品 没有绑定关联产品 productname=None 修复 3.3 用户绑定过其他产品 没有绑定关联产品 不存在可用 短信/站内信发送成功 不可用状态更新成功 4. 用户没有绑定过 :param ns: :return: """ # orderid, customerid, providerid, productid, quantity # await HpcSaveBill(orderid, orgid[0]['customerid'],product[0]['providerpid'],product[0]['id'],j['quantity']) nss = { 'id': uuid(), 'orderid': ns.get('orderid'), 'customerid': ns.get('customerid'), 'productid': ns.get('productid'), 'quantity': ns.get('quantity'), 'sync_status': 0 } try: # 获取济南超算id db = DBPools() async with db.sqlorContext('kboss') as sor: jncs_li = await sor.R('organization', {'orgname': '济南超算', 'id': ns.get('providerid'), 'del_flg': '0'}) product_name = (await sor.R('product', {'id': nss['productid']}))[0]['name'] if len(jncs_li) >= 1: await sor.C('jncs_syncinfo', nss) if jncs_li: loggers = await loggerconfig(logname='hpc_jncs.log', logger="hpc") db = DBPools() async with db.sqlorContext('kboss') as sor: # 查看用户是否已经绑定过 # get ldap account acc_li = await sor.R('jncs_usermapping', {'orgid': ns.get('customerid'), 'relate_config': product_name, 'del_flg': '0'}) # already sync if acc_li: clustercode, queuecode, ldapuid = acc_li[0]['clustercode'], acc_li[0]['queuecode'], acc_li[0]['ldapuid'] sync_dict = { "syncinfoid": nss['id'], "customerid": nss['customerid'], "productid": nss['productid'], "quantity": nss['quantity'], "ldapuid": ldapuid, "clustercode": clustercode, "queuecode": queuecode, "orgid": nss['customerid'], 'orderid': nss['orderid'], 'loggers': loggers # 'from_save_bill': 1 } loggers.info('用户此前已经绑定过ldap账号, 账单将要同步内容: %s' % str(sync_dict)) sync_dict["orgname"] = (await sor.R('organization', {'id': nss['customerid']}))[0]['orgname'] sync_status = await hpc_sync_user(sync_dict) if sync_status['status']: sync_res = { 'status': True, 'msg': sync_status['msg'], 'data': '%s 已经绑定过ldap账号, 同步核时成功' % nss['customerid'] } return sync_res else: sync_res = { 'status': False, 'msg': '%s 已经绑定过ldap账号, 同步核时失败' % nss['customerid'] } loggers.info(str(sync_res)) return sync_res # not sync else: sync_ldap_ctime = await get_valid_ldap_sync_user_time(loggers=loggers, nss=nss, productname=product_name) if sync_ldap_ctime and sync_ldap_ctime['status']: return { 'status': True, 'msg': sync_ldap_ctime['msg'] } else: return { 'status': False, 'msg': sync_ldap_ctime['msg'] } else: return { 'status': False, 'msg': 'the provider is not jncs' } except Exception as e: print(e) raise e print('params_kw=', params_kw) ret = await hpc_save_bill(params_kw) return ret