sage/app/ali_sms_send.py
2025-07-16 14:28:41 +08:00

175 lines
6.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
# @Time: 2023/4/23 17:18
"""
环境要求
Python 3
安装 SDK 核心库 OpenAPI 使用pip安装包依赖:
pip install alibabacloud_tea_openapi
pip install alibabacloud_dysmsapi20170525
"""
"""
资源到期预警
[开元云科技] 尊敬的用户, 您好!您的云服务器账号中将于 2023-01-14 23:59:59 到期, 为避免影响到您的使用, 请及时关注并续费!
感谢您的支持。
资源到期通知
[开元云科技] 尊敬的用户, 您好!您的云服务器已于昨天到期, 将于今天中午12:00关停服务 为避免影响到您的使用, 请及时关注并续费!
新产品上线通知
[开元云科技] 惠上云,更简单,新产品火热开售,王牌性价比,享触底尝鲜价,立即抢购: http://...
余额不足预警
[开元云科技] 尊敬的用户,您好!您的余额还有${amount}$元, 为避免影响到您的使用, 请及时关注并续费!
"""
import random
import re
import asyncio
# from core.exception import CustomException
from alibabacloud_dysmsapi20170525.client import Client as Dysmsapi20170525Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_dysmsapi20170525 import models as dysmsapi_20170525_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
import json
import logging
import datetime
from sqlor.dbpools import DBPools
from appPublic.uniqueID import getID as uuid
class AliyunSMS:
# 返回错误码对应:
doc = "https://help.aliyun.com/document_detail/101346.html"
def __init__(self):
self.access_key = 'LTAI5t5w7xsZgueod6uZ3TCD'
self.access_key_secret = 'n1HttSbQvgEbjvf62Gzl1aagfKyIyS'
self.sms_client = self.create_client(self.access_key,
self.access_key_secret)
self.sign_name = '开元云科技'
self.sms_types = {
"SMS_460770222": send_vcode,
}
async def send(self, stype, type_code, phone, ns) -> dict:
"""
主程序入口,异步方式
"""
if isinstance(ns, dict):
ns = json.dumps(ns)
send_sms_request = dysmsapi_20170525_models.SendSmsRequest(
phone_numbers=phone,
sign_name=self.sign_name,
template_code=type_code,
template_param=ns
)
runtime = util_models.RuntimeOptions()
try:
# 复制代码运行请自行打印 API 的返回值
resp = await self.sms_client.send_sms_with_options_async(send_sms_request, runtime)
return await self.__validation(stype=stype, type_code=type_code, ns=ns, phone=phone, resp=resp)
except Exception as error:
print(error)
return {
'status': False,
'msg': error
}
# 如有需要,请打印 error
async def send_vcode(self, phone: str, stype: str, vcode) -> dict:
db = DBPools()
async with db.sqlorContext('kboss') as sor:
type_code_li = await sor.R('sms_template', {'name': stype})
if type_code_li:
type_code = type_code_li[0]['code']
else:
sms_record_log = {
'id': uuid(),
'send_type': stype,
'mobile': phone,
'message': str(vcode),
'send_time': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
'send_status': 0,
'remark': '无法找到:%s, 请查看字符串是否一致' % stype
}
await sor.C('sms_record', sms_record_log)
return {
'status': False,
'msg': 'can not get type code from sms_template, please check...'
}
return await self.send(stype, type_code, phone, vcode)
async def __validation(self, stype, type_code, ns, phone, resp: dysmsapi_20170525_models.SendSmsResponse) -> dict:
"""
验证结果并返回
"""
db = DBPools()
async with db.sqlorContext('kboss') as sor:
send_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
sms_record_log = {
'id': uuid(),
'send_type': stype,
'mobile': phone,
'message': str(ns),
'send_time': send_time,
'send_status': 1
}
if resp.body.code == "OK":
msg = f'{send_time} {phone} 短信发送成功返回code{resp.body.code}'
print(msg)
sms_record_log['send_status'] = 1
await sor.C('sms_record', sms_record_log)
return {
'status': True,
'msg': msg
}
else:
msg = f'{send_time} {phone} 短信发送失败返回code{resp.body.code},请参考文档:{self.doc}'
print(msg)
sms_record_log['send_status'] = 0
sms_record_log['remark'] = msg
await sor.C('sms_record', sms_record_log)
return {
'status': False,
'msg': msg
}
def create_client(self,
access_key_id: str,
access_key_secret: str,
) -> Dysmsapi20170525Client:
"""
使用AK&SK初始化账号Client
@param access_key_id:
@param access_key_secret:
@return: Client
@throws Exception
"""
config = open_api_models.Config(
# 您的 AccessKey ID,
access_key_id=access_key_id,
# 您的 AccessKey Secret,
access_key_secret=access_key_secret
)
# 访问的域名
config.endpoint = f'dysmsapi.aliyuncs.com'
return Dysmsapi20170525Client(config)
async def send_vcode(phone: str, stype:str, vcode: dict) -> dict:
result = await AliyunSMS().send_vcode(phone, stype, vcode)
return result
# if __name__ == '__main__':
# loop = asyncio.get_event_loop()
# loop.run_until_complete(send_vcode('13801015291', '注册登录验证', {'code': '209898'}))
# loop.run_until_complete(send_vcode('13801015291', '供应商结算提醒', {'name': '中金超算 济南超算'}))
# loop.run_until_complete(send_vcode('13801015291', '到期续费通知', {'time': '2023-10-10'}))
# AliyunSMS("13801015291").main_async()