# -*- coding: utf-8 -*- # @Time: 2023/4/23 17:18 """ 环境要求 Python 3 安装 SDK 核心库 OpenAPI ,使用pip安装包依赖: pip install alibabacloud_tea_openapi pip install alibabacloud_dysmsapi20170525 """ """ 资源到期预警 [开元云科技] 尊敬的用户, 您好!您的云服务器账号中将于 2023-01-14 23:59:59 到期, 为避免影响到您的使用, 请及时关注并续费! 感谢您的支持。 资源到期通知 [开元云科技] 尊敬的用户, 您好!您的云服务器已于昨天到期, 将于今天中午12:00关停服务, 为避免影响到您的使用, 请及时关注并续费! 新产品上线通知 [开元云科技] 惠上云,更简单,新产品火热开售,王牌性价比,享触底尝鲜价,立即抢购: http://... 余额不足预警 [开元云科技] 尊敬的用户,您好!您的余额还有${amount}$元, 为避免影响到您的使用, 请及时关注并续费! """ import random import re import asyncio from alibabacloud_dysmsapi20170525.client import Client as Dysmsapi20170525Client from alibabacloud_tea_openapi import models as open_api_models from alibabacloud_dysmsapi20170525 import models as dysmsapi_20170525_models from alibabacloud_tea_util import models as util_models from alibabacloud_tea_util.client import Client as UtilClient import json from sqlor.dbpools import DBPools from appPublic.uniqueID import getID as uuid from appPublic.timeUtils import timestampstr from appPublic.log import info, debug, error, warning, critical, exception """ self.access_key = 'LTAI5t5w7xsZgueod6uZ3TCD' self.access_key_secret = 'n1HttSbQvgEbjvf62Gzl1aagfKyIyS' """ class AliSMS: # 返回错误码对应: doc = "https://help.aliyun.com/document_detail/101346.html" def __init__(self, apikey, secretkey): self.access_key = apikey self.access_key_secret = secretkey self.sms_client = self.create_client(self.access_key, self.access_key_secret) self.sign_name = '开元云科技' async def send_validate_code(self, cell_no, vcode): info(f'{cell_no=}, {vcode=}') return await self.send('SMS_460770222', cell_no, {'code':vcode}) async def send(self, type_code, phone, ns) -> dict: """ 主程序入口,异步方式 """ if isinstance(ns, dict): ns = json.dumps(ns) send_sms_request = dysmsapi_20170525_models.SendSmsRequest( phone_numbers=phone, sign_name=self.sign_name, template_code=type_code, template_param=ns ) runtime = util_models.RuntimeOptions() try: # 复制代码运行请自行打印 API 的返回值 resp = await self.sms_client.send_sms_with_options_async(send_sms_request, runtime) if resp.body.code == "OK": return { 'status': True } return { 'status': False, 'msg':f'{timestampstr()} {phone} 短信发送失败,返回code:{resp.body.code},请参考文档:{self.doc}' } except Exception as error: print(error) return { 'status': False, 'msg': error } def create_client(self, access_key_id: str, access_key_secret: str, ) -> Dysmsapi20170525Client: """ 使用AK&SK初始化账号Client @param access_key_id: @param access_key_secret: @return: Client @throws Exception """ config = open_api_models.Config( # 您的 AccessKey ID, access_key_id=access_key_id, # 您的 AccessKey Secret, access_key_secret=access_key_secret ) # 访问的域名 config.endpoint = f'dysmsapi.aliyuncs.com' return Dysmsapi20170525Client(config) if __name__ == '__main__': async def main(sms): r = await sms.send('SMS_460770222', '13801015292', {'code':'993344'}) print(f'return {r}') sms = AliSMS('LTAI5t5w7xsZgueod6uZ3TCD', 'n1HttSbQvgEbjvf62Gzl1aagfKyIyS') asyncio.get_event_loop().run_until_complete(main(sms))