async def handle_login_failed(mobile: str) -> bool: """检查短信发送限制,十分钟内最多发送三次""" from datetime import datetime, timedelta db = DBPools() async with db.sqlorContext('kboss') as sor: # 查询该手机号的发送记录 records = await sor.R('sms_limit', {'mobile': mobile}) current_time = datetime.now() if len(records) == 0: # 首次发送,创建记录 await sor.C('sms_limit', { 'mobile': mobile, 'first_send_time': current_time, 'send_count': 1, 'last_send_time': current_time, 'lock_until': None }) return True record = records[0] lock_until = record.get('lock_until', None) if lock_until: lock_until = datetime.strptime(lock_until, '%Y-%m-%d %H:%M:%S') # 检查是否在锁定时间内 if record.get('lock_until') and current_time < lock_until: return False # 检查十分钟内的发送次数 if record.get('first_send_time'): first_send_time = datetime.strptime(record['first_send_time'], '%Y-%m-%d %H:%M:%S') time_diff = current_time - first_send_time if time_diff < timedelta(minutes=10): # 十分钟内,检查发送次数 if record.get('send_count', 0) >= 3: # 超过三次,锁定10分钟 lock_time = current_time + timedelta(minutes=10) sql = "update sms_limit set lock_until='%s' where mobile='%s'" % ( lock_time, mobile ) await sor.sqlExe(sql, {}) return False else: # 未超过三次,增加计数 sql = "update sms_limit set send_count='%s', last_send_time='%s' where mobile='%s'" % ( record['send_count'] + 1, current_time, mobile ) await sor.sqlExe(sql, {}) return True else: # 超过十分钟,重置计数 sql = "update sms_limit set first_send_time='%s', send_count='%s', last_send_time='%s', lock_until=NULL where mobile='%s'" % ( current_time, 1, current_time, mobile ) await sor.sqlExe(sql, {}) return True return True async def mobilecode(ns): """发送短信验证码,支持注册和登录筛选""" db = DBPools() async with db.sqlorContext('kboss') as sor: # 获取操作类型:register 或 login action_type = ns.get('action_type') # register 或 login if not action_type: return {'status': False, 'msg': '操作类型action_type不能为空'} # 通过手机号查找用户 mobile = ns.get('mobile') if not mobile: return {'status': False, 'msg': '手机号不能为空'} # 检查短信发送限制 can_send = await handle_login_failed(mobile) if not can_send: return {'status': False, 'msg': '发送过于频繁,请10分钟后再试'} userreacs = await sor.R('users', {'mobile': mobile, 'del_flg': '0'}) # 注册逻辑:检查手机号是否已存在 if action_type == 'register': if len(userreacs) >= 1: return {'status': False, 'msg': '手机号已注册,请直接登录'} else: # 注册时手机号不存在,可以发送验证码 code = await generate_vcode() nss = await send_vcode(mobile, '用户注册登录验证', {'SMSvCode': code.get('vcode')}) if nss['status']: return {'status': True, 'msg': '注册验证码发送成功', 'codeid': code.get('id')} else: return {'status': False, 'msg': '发送失败'} # 登录逻辑:检查手机号是否存在 elif action_type == 'login': if len(userreacs) >= 1: # 登录时手机号存在,可以发送验证码 code = await generate_vcode() nss = await send_vcode(userreacs[0]['mobile'], '用户注册登录验证', {'SMSvCode': code.get('vcode')}) if nss['status']: return {'status': True, 'msg': '登录验证码发送成功', 'codeid': code.get('id')} else: return {'status': False, 'msg': '发送失败'} else: return {'status': False, 'action': 'redirect', 'msg': '用户未注册, 请到注册页面注册'} # 原有逻辑:如果没有指定action_type,保持原有逻辑 else: type = 0 ns['del_flg'] = '0' userreacs = await sor.R('users', ns) if len(userreacs) >= 1: type += 1 else: userreacs = await sor.R('users', {'mobile': ns['username'], 'del_flg': '0'}) if len(userreacs) >= 1: type += 1 if type >= 1: code = await generate_vcode() nss = await send_vcode(userreacs[0]['mobile'], '用户注册登录验证', {'SMSvCode': code.get('vcode')}) # return {'1':nss} if nss: return {'status': True, 'msg': '发送成功', 'codeid': code.get('id')} else: return {'status': False, 'msg': '发送失败'} else: return {'status': False, 'action': 'redirect', 'msg': '用户未注册, 请到注册页面注册'} ret = await mobilecode(params_kw) return ret