sage/app/alisms.py
2025-07-16 14:28:41 +08:00

123 lines
3.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
# @Time: 2023/4/23 17:18
"""
环境要求
Python 3
安装 SDK 核心库 OpenAPI 使用pip安装包依赖:
pip install alibabacloud_tea_openapi
pip install alibabacloud_dysmsapi20170525
"""
"""
资源到期预警
[开元云科技] 尊敬的用户, 您好!您的云服务器账号中将于 2023-01-14 23:59:59 到期, 为避免影响到您的使用, 请及时关注并续费!
感谢您的支持。
资源到期通知
[开元云科技] 尊敬的用户, 您好!您的云服务器已于昨天到期, 将于今天中午12:00关停服务 为避免影响到您的使用, 请及时关注并续费!
新产品上线通知
[开元云科技] 惠上云,更简单,新产品火热开售,王牌性价比,享触底尝鲜价,立即抢购: http://...
余额不足预警
[开元云科技] 尊敬的用户,您好!您的余额还有${amount}$元, 为避免影响到您的使用, 请及时关注并续费!
"""
import random
import re
import asyncio
from alibabacloud_dysmsapi20170525.client import Client as Dysmsapi20170525Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_dysmsapi20170525 import models as dysmsapi_20170525_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
import json
from sqlor.dbpools import DBPools
from appPublic.uniqueID import getID as uuid
from appPublic.timeUtils import timestampstr
from appPublic.log import info, debug, error, warning, critical, exception
"""
self.access_key = 'LTAI5t5w7xsZgueod6uZ3TCD'
self.access_key_secret = 'n1HttSbQvgEbjvf62Gzl1aagfKyIyS'
"""
class AliSMS:
# 返回错误码对应:
doc = "https://help.aliyun.com/document_detail/101346.html"
def __init__(self, apikey, secretkey):
self.access_key = apikey
self.access_key_secret = secretkey
self.sms_client = self.create_client(self.access_key,
self.access_key_secret)
self.sign_name = '开元云科技'
async def send_validate_code(self, cell_no, vcode):
info(f'{cell_no=}, {vcode=}')
return await self.send('SMS_460770222', cell_no, {'code':vcode})
async def send(self, type_code, phone, ns) -> dict:
"""
主程序入口,异步方式
"""
if isinstance(ns, dict):
ns = json.dumps(ns)
send_sms_request = dysmsapi_20170525_models.SendSmsRequest(
phone_numbers=phone,
sign_name=self.sign_name,
template_code=type_code,
template_param=ns
)
runtime = util_models.RuntimeOptions()
try:
# 复制代码运行请自行打印 API 的返回值
resp = await self.sms_client.send_sms_with_options_async(send_sms_request, runtime)
if resp.body.code == "OK":
return {
'status': True
}
return {
'status': False,
'msg':f'{timestampstr()} {phone} 短信发送失败返回code{resp.body.code},请参考文档:{self.doc}'
}
except Exception as error:
print(error)
return {
'status': False,
'msg': error
}
def create_client(self,
access_key_id: str,
access_key_secret: str,
) -> Dysmsapi20170525Client:
"""
使用AK&SK初始化账号Client
@param access_key_id:
@param access_key_secret:
@return: Client
@throws Exception
"""
config = open_api_models.Config(
# 您的 AccessKey ID,
access_key_id=access_key_id,
# 您的 AccessKey Secret,
access_key_secret=access_key_secret
)
# 访问的域名
config.endpoint = f'dysmsapi.aliyuncs.com'
return Dysmsapi20170525Client(config)
if __name__ == '__main__':
async def main(sms):
r = await sms.send('SMS_460770222', '13801015292', {'code':'993344'})
print(f'return {r}')
sms = AliSMS('LTAI5t5w7xsZgueod6uZ3TCD', 'n1HttSbQvgEbjvf62Gzl1aagfKyIyS')
asyncio.get_event_loop().run_until_complete(main(sms))