debug(f'{params_kw=}') ns = { "username":params_kw.username, "password":password_encode(params_kw.password) } info(f'{ns=}') db = DBPools() dbname = get_module_dbname('rbac') async with db.sqlorContext(dbname) as sor: # Get user record with lockout fields (standard SQL) r = await sor.sqlExe('select * from users where username=${username}$', ns.copy()) if len(r) == 0: return { "widgettype":"Error", "options":{ "timeout":3, "title":"Login Error", "message":"user name or password error" } } user = r[0] # Lockout check in Python (DB-agnostic) fail_count = getattr(user, 'login_fail_count', 0) or 0 last_fail = getattr(user, 'last_login_fail', None) if fail_count >= 3 and last_fail: try: stored = datetime.strptime(str(last_fail), '%Y-%m-%d %H:%M:%S') elapsed = (datetime.now() - stored).total_seconds() if elapsed < 300: return { "widgettype":"Error", "options":{ "timeout":5, "title":"Account Locked", "message":"Account locked due to too many failed login attempts. Please try again in 5 minutes." } } else: # Lockout expired, reset in background (best-effort) await sor.sqlExe(""" UPDATE users SET login_fail_count = 0, last_login_fail = NULL WHERE id = ${id}$ """, {'id': user.id}) except (ValueError, TypeError): pass # Verify password r = await sor.sqlExe('select * from users where username=${username}$ and password=${password}$', ns.copy()) if len(r) == 0: # Atomic increment -- standard SQL now_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S') await sor.sqlExe(""" UPDATE users SET login_fail_count = login_fail_count + 1, last_login_fail = ${now}$ WHERE id = ${id}$ """, {'id': user.id, 'now': now_str}) new_fail_count = fail_count + 1 if new_fail_count >= 3: msg = "Too many failed attempts. Account locked for 5 minutes." else: msg = f"user name or password error ({3 - new_fail_count} attempts remaining)" return { "widgettype":"Error", "options":{ "timeout":3, "title":"Login Error", "message": msg } } # Success -- atomic reset now_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S') await sor.sqlExe(""" UPDATE users SET login_fail_count = 0, last_login_fail = NULL, last_login = ${now}$ WHERE id = ${id}$ """, {'id': user.id, 'now': now_str}) await remember_user(r[0].id, username=r[0].username, userorgid=r[0].orgid) return { "widgettype":"Message", "options":{ "timeout":3, "auto_open":True, "title":"Login", "message":f"{r[0].username} Welcome back" }, "binds":[ { "wid":"self", "event":"dismissed", "actiontype":"urlwidget", "target":"window.user_container", "options":{ "url":entire_url('/rbac/user/userinfo.ui') } }, { "wid":"self", "event":"dismissed", "actiontype":"script", "target": f'body.login_window', "script":"this.destroy()" } ] } return { "widgettype":"Error", "options":{ "timeout":3, "title":"Login Error", "message":"system error" } }