# -*- coding: utf-8 -*- # @Time: 2025/6/16 15:43 # -*- coding: utf-8 -*- # @Time: 2023/4/23 17:18 import datetime #从Python SDK导入SMS配置管理模块以及安全认证模块 from baidubce.bce_client_configuration import BceClientConfiguration from baidubce.auth.bce_credentials import BceCredentials # 导入SMS相关模块 import baidubce.services.sms.sms_client as sms import baidubce.exception as ex from sqlor.dbpools import DBPools from appPublic.uniqueID import getID as uuid class BaiduSMS: # 百度短信错误码文档(需替换为实际文档链接) doc = "https://cloud.baidu.com/doc/SMS/s/zjwvxry6e" def __init__(self): # 替换为您的百度AK/SK self.access_key = 'ALTAKPk92fX9cgGDax83yNL8mP' self.access_key_secret = '9b16b8efd4dc463d8bbd5462db1db8a5' self.host = 'sms.bj.baidubce.com' self.sms_client = self.create_client(self.access_key, self.access_key_secret, self.host) # 替换为您的百度短信签名名称 # self.signature_id = 'sms-sign-qQHYeC17077' # 开元云科技 self.signature_id = 'sms-sign-BqOhYB33019' # 开元云 # 短信模板类型映射(键为业务类型,值为对应模板ID) self.sms_types = { "注册登录验证": "sms-tpl-123", # 示例模板ID } async def send(self, stype, template_id, phone, params) -> dict: """ 主发送逻辑(适配百度API) """ # 百度短信请求参数(根据实际API调整字段名) try: resp = self.sms_client.send_message(signature_id=self.signature_id, template_id=template_id, mobile=phone, content_var_dict=params ) # 百度异步发送方法 return await self.__validation(stype=stype, template_id=template_id, params=params, phone=phone, resp=resp) except ex.BceHttpClientError as e: if isinstance(e.last_error, ex.BceServerError): print('send request failed. Response %s, code: %s, request_id: %s' % (e.last_error.status_code, e.last_error.code, e.last_error.request_id)) else: print('send request failed. Unknown exception: %s' % e) return {'status': False, 'msg': str(e)} async def send_vcode(self, phone: str, stype: str, vcode) -> dict: """ 发送验证码 """ db = DBPools() async with db.sqlorContext('kboss') as sor: # 假设数据库表结构与原代码一致(sms_template表) template_info = await sor.R('sms_template', {'name': stype, 'del_flg': '0'}) if template_info: template_id = template_info[0]['code'] # 假设code字段存储百度模板ID else: log = { 'id': uuid(), 'send_type': stype, 'mobile': phone, 'message': str(vcode), 'send_time': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), 'send_status': 0, 'remark': f'未找到模板类型: {stype}' } await sor.C('sms_record', log) return {'status': False, 'msg': '模板未配置,请检查sms_template表'} return await self.send(stype, template_id, phone, vcode) async def __validation(self, stype, template_id, params, phone, resp) -> dict: """ 验证百度短信返回结果 """ db = DBPools() async with db.sqlorContext('kboss') as sor: send_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") log = { 'id': uuid(), 'send_type': stype, 'mobile': phone, 'message': str(params), 'send_time': send_time, 'send_status': 1 } # 百度通常用0表示成功(需根据实际API调整) if resp.code == '1000': msg = f'{send_time} {phone} 百度短信发送成功,code: {resp.code}' log['send_status'] = 1 await sor.C('sms_record', log) return {'status': True, 'msg': msg} else: msg = f'{send_time} {phone} 百度短信发送失败,code: {resp.code},参考文档: {self.doc}' log['send_status'] = 0 log['remark'] = msg await sor.C('sms_record', log) return {'status': False, 'msg': msg} def create_client(self, access_key: str, access_key_secret: str, host: str): """ 创建百度短信客户端 """ baiDuSmsConfig = BceClientConfiguration(credentials=BceCredentials(access_key, access_key_secret), endpoint=host) sms_client = sms.SmsClient(baiDuSmsConfig) return sms_client async def send_baidu_vcode(phone: str, stype: str, vcode: dict) -> dict: """ 外部调用入口(与原代码结构一致) """ result = await BaiduSMS().send_vcode(phone, stype, vcode) return result # if __name__ == '__main__': # loop = asyncio.get_event_loop() # loop.run_until_complete(send_baidu_vcode('18053191417', '用户注册登录验证', {'SMSvCode': '989898'})) # loop.run_until_complete(send_baidu_vcode('18053191417', '余额不足提醒', {'time': '2025-06-18 17:47:00'})) # loop.run_until_complete(send_baidu_vcode('18053191417', '用户欠费通知', {'time': '2025-06-18 17:47:00', 'productname': 'BCC_云主机'})) # loop.run_until_complete(send_baidu_vcode('18053191417', '百度kafka普通验证码', {'content': '验证码为:636814,感谢您使用百度云服务,请填写完整完成验证。'})) # loop.run_until_complete(send_baidu_vcode('18053191417', '百度kafka普通通知', {'content': '尊敬的用户,您的百度智能云账户(登录名:-)的 弹性公网IP资源(区域:华北-北京 IP:120.48.5.167 ,回收站中 )将于2025-06-18释放, 截至当前仅剩1天。 为保证正常使用,请及时续费或保留好数据,资源详情请查看邮件,感谢支持! '}))