# -*- coding: utf-8 -*- # @Time: 2023/4/23 17:18 """ 环境要求 Python 3 安装 SDK 核心库 OpenAPI ,使用pip安装包依赖: pip install alibabacloud_tea_openapi pip install alibabacloud_dysmsapi20170525 """ """ 资源到期预警 [开元云科技] 尊敬的用户, 您好!您的云服务器账号中将于 2023-01-14 23:59:59 到期, 为避免影响到您的使用, 请及时关注并续费! 感谢您的支持。 资源到期通知 [开元云科技] 尊敬的用户, 您好!您的云服务器已于昨天到期, 将于今天中午12:00关停服务, 为避免影响到您的使用, 请及时关注并续费! 新产品上线通知 [开元云科技] 惠上云,更简单,新产品火热开售,王牌性价比,享触底尝鲜价,立即抢购: http://... 余额不足预警 [开元云科技] 尊敬的用户,您好!您的余额还有${amount}$元, 为避免影响到您的使用, 请及时关注并续费! """ import random import re import asyncio # from core.exception import CustomException from alibabacloud_dysmsapi20170525.client import Client as Dysmsapi20170525Client from alibabacloud_tea_openapi import models as open_api_models from alibabacloud_dysmsapi20170525 import models as dysmsapi_20170525_models from alibabacloud_tea_util import models as util_models from alibabacloud_tea_util.client import Client as UtilClient import json import logging import datetime from sqlor.dbpools import DBPools from appPublic.uniqueID import getID as uuid class AliyunSMS: # 返回错误码对应: doc = "https://help.aliyun.com/document_detail/101346.html" def __init__(self): self.access_key = 'LTAI5t5w7xsZgueod6uZ3TCD' self.access_key_secret = 'n1HttSbQvgEbjvf62Gzl1aagfKyIyS' self.sms_client = self.create_client(self.access_key, self.access_key_secret) self.sign_name = '开元云科技' self.sms_types = { "SMS_460770222": send_vcode, } async def send(self, stype, type_code, phone, ns) -> dict: """ 主程序入口,异步方式 """ if isinstance(ns, dict): ns = json.dumps(ns) send_sms_request = dysmsapi_20170525_models.SendSmsRequest( phone_numbers=phone, sign_name=self.sign_name, template_code=type_code, template_param=ns ) runtime = util_models.RuntimeOptions() try: # 复制代码运行请自行打印 API 的返回值 resp = await self.sms_client.send_sms_with_options_async(send_sms_request, runtime) return await self.__validation(stype=stype, type_code=type_code, ns=ns, phone=phone, resp=resp) except Exception as error: print(error) return { 'status': False, 'msg': error } # 如有需要,请打印 error async def send_vcode(self, phone: str, stype: str, vcode) -> dict: db = DBPools() async with db.sqlorContext('kboss') as sor: type_code_li = await sor.R('sms_template', {'name': stype}) if type_code_li: type_code = type_code_li[0]['code'] else: sms_record_log = { 'id': uuid(), 'send_type': stype, 'mobile': phone, 'message': str(vcode), 'send_time': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), 'send_status': 0, 'remark': '无法找到:%s, 请查看字符串是否一致' % stype } await sor.C('sms_record', sms_record_log) return { 'status': False, 'msg': 'can not get type code from sms_template, please check...' } return await self.send(stype, type_code, phone, vcode) async def __validation(self, stype, type_code, ns, phone, resp: dysmsapi_20170525_models.SendSmsResponse) -> dict: """ 验证结果并返回 """ db = DBPools() async with db.sqlorContext('kboss') as sor: send_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") sms_record_log = { 'id': uuid(), 'send_type': stype, 'mobile': phone, 'message': str(ns), 'send_time': send_time, 'send_status': 1 } if resp.body.code == "OK": msg = f'{send_time} {phone} 短信发送成功,返回code:{resp.body.code}' print(msg) sms_record_log['send_status'] = 1 await sor.C('sms_record', sms_record_log) return { 'status': True, 'msg': msg } else: msg = f'{send_time} {phone} 短信发送失败,返回code:{resp.body.code},请参考文档:{self.doc}' print(msg) sms_record_log['send_status'] = 0 sms_record_log['remark'] = msg await sor.C('sms_record', sms_record_log) return { 'status': False, 'msg': msg } def create_client(self, access_key_id: str, access_key_secret: str, ) -> Dysmsapi20170525Client: """ 使用AK&SK初始化账号Client @param access_key_id: @param access_key_secret: @return: Client @throws Exception """ config = open_api_models.Config( # 您的 AccessKey ID, access_key_id=access_key_id, # 您的 AccessKey Secret, access_key_secret=access_key_secret ) # 访问的域名 config.endpoint = f'dysmsapi.aliyuncs.com' return Dysmsapi20170525Client(config) async def send_vcode(phone: str, stype:str, vcode: dict) -> dict: result = await AliyunSMS().send_vcode(phone, stype, vcode) return result # if __name__ == '__main__': # loop = asyncio.get_event_loop() # loop.run_until_complete(send_vcode('13801015291', '注册登录验证', {'code': '209898'})) # loop.run_until_complete(send_vcode('13801015291', '供应商结算提醒', {'name': '中金超算 济南超算'})) # loop.run_until_complete(send_vcode('13801015291', '到期续费通知', {'time': '2023-10-10'})) # AliyunSMS("13801015291").main_async()