rbac/wwwroot/phone_login.dspy
yumoqing 3fdd4efeff feat(rbac): add login tracking, lockout, secure cache
- Add created_at, last_login, login_fail_count, last_login_fail fields
- 3 failed logins locks account for 5 minutes
- LRU+TTL cache for UserPermissions, thread-safe
- All login methods update last_login
- Migration SQL for existing databases
2026-04-26 10:49:01 +08:00

95 lines
2.0 KiB
Plaintext

# 用短信模块检查验证码是否正确
debug(f'phone_login.dspy:{params_kw=}')
if params_kw.cellphone is None:
return {
"status": "error",
"data":{
"message": "需输入手机号"
}
}
if params_kw.sms_code is None:
return {
"status": "error",
"data": {
"message": "需输入验证码"
}
}
if params_kw.key is None:
return {
"status": "error",
"data": {
"message": "需要短信验证key"
}
}
f = await sms_engine.check_sms_code(params_kw.key, params_kw.sms_code)
if not f:
return {
"status": "error",
"data": {
"message": "手机短信验证码出错"
}
}
ns = {
"username": params_kw.cellphone,
"password": "^&%UHI",
"cfm_password": "^&%UHI",
"mobile": params_kw.cellphone,
"user_status": "0"
}
udata = DictObject(**ns)
async with get_sor_context(request._run_ns, 'rbac') as sor:
recs = await sor.R('users', {'mobile': params_kw.cellphone})
if recs:
if len(recs) == 1:
r = recs[0]
# Update last_login
await sor.U('users', {'id': r.id}, {
'last_login': curDateString('%Y-%m-%d %H:%M:%S'),
'login_fail_count': 0,
'last_login_fail': None
})
await remember_user(r.id, username=r.username, userorgid=r.orgid)
return {
"status": "ok",
"data":{
"user": r
}
}
if params_kw.selected_id:
for r in recs:
if r.id == params_kw.selected_id:
# Update last_login
await sor.U('users', {'id': r.id}, {
'last_login': curDateString('%Y-%m-%d %H:%M:%S'),
'login_fail_count': 0,
'last_login_fail': None
})
await remember_user(r.id, username=r.username, userorgid=r.orgid)
return {
"status": "ok",
"data":{
"user": r
}
}
else:
return {
"status": "choose",
"data": {
"users": recs
}
}
d = await register_user(sor, udata)
if d['status'] == 'error':
return d
try:
ownerid = await get_owner_orgid(sor, orgid)
await openCustomerAccounts(sor, ownerid, orgid)
except Exception as e:
exception(f'{e}')
r = d['data']['user']
await remember_user(r.id, username=r.username, userorgid=r.orgid)