async def handle_login_failed(user_name): from datetime import datetime, timedelta db = DBPools() async with db.sqlorContext('kboss') as sor: try: current_time = datetime.now() # 1. 查询该用户当前的失败记录 record = await sor.sqlExe("SELECT first_failed_time, failed_count FROM login_fail_log WHERE user_name = '%s'" % user_name, {}) if not record: # === 第一次失败 === await sor.sqlExe(""" INSERT INTO login_fail_log (user_name, first_failed_time, failed_count) VALUES ('%s', '%s', 1) """ % (user_name, current_time), {}) return # 正常失败,不需要等待 first_failed_time = record[0]['first_failed_time'] failed_count = record[0]['failed_count'] first_failed_time = datetime.strptime(first_failed_time, '%Y-%m-%d %H:%M:%S') # 2. 判断是否还在10分钟窗口内 time_diff = (current_time - first_failed_time).total_seconds() / 60 # 分钟 if time_diff <= 10: # 还在10分钟窗口内,失败次数+1 new_count = failed_count + 1 if new_count >= 3: # === 第三次及以上失败:触发锁定 === # 更新失败次数,并记录锁定开始时间 await sor.sqlExe(""" UPDATE login_fail_log SET failed_count = 3, last_failed_time = '%s', lock_until = '%s' WHERE user_name = '%s' """ % (current_time, current_time + timedelta(minutes=20), user_name), {}) print("十分钟内登录失败三次,请等待20分钟后再试") else: # 第二次失败 await sor.sqlExe(""" UPDATE login_fail_log SET failed_count = '%s', last_failed_time = '%s' WHERE user_name = '%s' """ % (new_count, current_time, user_name), {}) # 可选:返回还剩几次机会 if new_count == 2: print("登录失败,还剩1次机会(10分钟内)") else: # === 已经超过10分钟窗口,重置计数器 === await sor.sqlExe(""" UPDATE login_fail_log SET first_failed_time = '%s', failed_count = 1, last_failed_time = '%s', lock_until = NULL WHERE user_name = '%s' """ % (current_time, current_time, user_name), {}) except Exception as e: print("exception:", user_name, e) return {'status': False, 'msg': '登录操作失败', 'error': str(e)} async def check_login_allowed(user_name): from datetime import datetime, timedelta db = DBPools() async with db.sqlorContext('kboss') as sor: try: record = await sor.sqlExe("SELECT lock_until, first_failed_time, failed_count FROM login_fail_log WHERE user_name = '%s'" % user_name, {}) current_time = datetime.now() if not record: return {'status': True} # 允许登录 record = record[0] if record.get('lock_until'): record['lock_until'] = datetime.strptime(record['lock_until'], '%Y-%m-%d %H:%M:%S') # 检查是否处于锁定状态 if record.get('lock_until') and record['lock_until'] > current_time: wait_minutes = (record['lock_until'] - current_time).total_seconds() // 60 msg = f"十分钟内登录失败三次,请等待{int(wait_minutes)}分钟后再试" return { 'status': False, 'msg': msg } # 如果锁定时间已过,但还在检查窗口,也可以重置(可选) return {'status': True} # 允许登录 except Exception as e: print("exception:", user_name, e) return {'status': False, 'msg': '登录操作失败, %s' % str(e)} async def logintype(ns): """ 1、判断用户是否为主级(如果在reseller没要找到数据,证明就是主级) 2、判断主级的域名跟前段传递的域名是否一致 3、 :param ns: :return: """ # 兼容手机号登录 if ns.get('password'): # 密码解密 data = """-----BEGIN RSA PRIVATE KEY----- MIIEpAIBAAKCAQEApJ3ThUWT3CgvH0O8rrT6qycpqX0NTq4Q3CxBrvNxo9//qX2b KvhomoLNd+vdti8xNOK6/3zuTJIVt0RoNKwE0HWMR8H0jgp7ING54DtT5B8bhmUp bs/hownGzIBGOedhqeOPiv0Q5oSi9OIEE+PK2L8KdgFF2Z6Q1DQdv5Y1qvD/t2mJ VjR+NPTwcwBIT8UJ0Cfu8lqHjjJbNF//smTjQ8v2pnqp19jItuHeD4G4u7a8fWC3 /IGEv4+uc5rq5qhwdzRxHUveNmoE+nyh0T8RC8Y8/XLkEiD0nMvZZjBn7Bof6f1s t0aqJX8R1VGvzdTJ8eTvJuyMNsR4wLoF5PvxhQIDAQABAoIBAAkz6MMdL6kk9sI8 F2YUKHG9cvI07Obs8XTAy9Wlag34raJ8RI3Z002hnS3K5tssUKDqdMloFff744qA ulHV6AyYKOtK2yQk6KgRPkX13lIoSj6jx/XLWb7mnGskV+JQa1pQpkUYsenHmOCX auKKko8cpwLcpI4IJx3MSBMRqUoEFgDAZd98tpKyLHzqJN4CL9MkC3jkJcenIV6V CDXFWaf2l+tjlBOU6RbOvNi3WSDav0ZEJa58Irw077YajF9tp+7/TRqEXvvrYjJG rneKYlhUpsHA6ROgEHNKFYePqcWl83SB8nLQjFFYxs12rIQVB/+f0Q3rkf8z/oS5 Q2tSNEUCgYEAuBUjQw5FhNCZpSd6L7x47iIXw/qlz4Ovs3riapwBBhMkvmIL8c4N rDZVk5VuiGL0RzUMV17Br+vdWP2+LGTDPqNJywQ49zrUYFd29hXeIuUMXQZWKTnG QTjKKu8gcWAu2aa3S21YO5GKRuaF+LDO5Ny6CRqyyvjkA+Joh0pHQtMCgYEA5O3O lKiKZSKfvh4o/60QB3a7phaEua0RmXLc7m03gcKyqL/yYfUujKPGqKz06rYXJiEz BhY1d8KoGKBXPWXm+NmLwSbDgXcYD0Iv3S+2mjKz2IClKVGLu3F3+Y+sDLPSYOh+ uuZc5EBIr394FDcIi1yzybY14C7pIFBobkh6U0cCgYEAp/fhrRYgqQgOqnxTKau4 TNIfDVxwX+GxOPwwDUc8R+a/6Fhevc1e/Hy3qWr46MoiS7dEA5Ua9wMpfkJKfQeU gFtOwkigEjdEAoTMr89UWlLV7SwTApsNJAZFstdvqwQwu7k8Fz82+2PZ4MgIBtyO bJZbymLqMfrZVBILfydvzrECgYEAptMdLXu8joKywgqceeUVvzqo+BmE0wHpx1bK 5Py17FuDcjGeYgAX0ekvRY/0Um4rZF/hHNhmUkWJ1ZhSuq1dqY5up2ymH9w3rjbf RmIUwT2djXIB6aBBIXFIqpYmHIOWFYSXhX+FxdeKoElVqfWIxnXhlmav0BSfUisG TweGDNUCgYAQjIbPI2/dsVmb1dkV0rohVaARO0wQGcIMfc5qXzQOIwrprYcBGSa9 hos4TY/TZt/Kw0yAqnwiJ/6k89I67DRx30mu5ORgrzyolH5syKgbuJuRK7hE7708 48mkFM5MR+RavHbQt3IY8GN4x05bm8uzaiV3WsAGUNukkkyN6VW1WA== -----END RSA PRIVATE KEY-----""" private_key = RSA.importKey(data) cipher = PKCS1_v1_5.new(private_key) back_text = cipher.decrypt(base64.b64decode(ns['password']), 0) ns['password'] = back_text.decode('utf-8') db = DBPools() async with db.sqlorContext('kboss') as sor: domain_name = ns.get('domain_name') if domain_name in ['www.opencomputing.cn', 'dev.opencomputing.cn', 'localhost:9527'] and ns.get('username') not in ['开元云(北京)科技有限公司', 'admin', 'kyy_root', 'kyy_kaiyuan']: # 登录失败次数限制 login_allowed = await check_login_allowed(ns.get('username')) if not login_allowed.get('status'): return {'status': False, 'msg': login_allowed.get('msg')} if not ns.get('mobile'): return { 'status': False, 'msg': '请输入手机号' } real_mobile_li = await sor.R('users', {'username': ns['username']}) if not real_mobile_li: return { 'status': False, 'msg': '用户名有误' } real_mobile = real_mobile_li[0]['mobile'] if real_mobile_li else None if not real_mobile: return { 'status': False, 'msg': '未查询到匹配得手机号码' } if real_mobile and ns['mobile'] != real_mobile: return { 'status': False, 'msg': '您的手机号与用户名不匹配' } if not ns.get('codeid'): return { 'status': False, 'msg': 'codeid不存在, 请输入验证码' } code = await sor.R('validatecode', {'id': ns.get('codeid'), 'vcode': ns.get('vcode')}) if len(code) < 1: await handle_login_failed(ns.get('username')) return {'status': False, 'msg': '验证码不正确'} password = password_encode(ns['password']) users = await sor.R('users', {'username': ns.get('username'), 'password': password}) if len(users) < 1: await handle_login_failed(ns.get('username')) return {"status": False,'msg':'用户名或密码错误'} return {'status': True} else: type = 0 if ns.get('codeid'): type += 1 code = await sor.R('validatecode', {'id': ns.get('codeid'), 'vcode': ns.get('vcode')}) if len(code) < 1: return {'status': False, 'msg': '验证码不正确'} if type == 1: # 手机号登录 users = await sor.R('users', {'mobile': ns.get('username')}) else: password = password_encode(ns['password']) users = await sor.R('users', {'username': ns.get('username'), 'password': password}) if len(users) < 1: await handle_login_failed(ns.get('username')) return {"status": False,'msg':'用户名或密码错误'} elif ns.get('username') == "admin": return {'status': True} else: reseller = await sor.R('reseller', {'domain_name': ns.get('domain_name')}) # 查到用户的所在机构 user_org = await sor.R('organization', {'id': users[0]['orgid']}) #0代表用户为主 1代表用户为子 user_type = 0 # 子页面判断用户 if len(reseller) >= 1: if reseller[0]['orgid'] == user_org[0]['parentid']: user_type += 1 if reseller[0]['orgid'] == users[0]['orgid']: user_type += 1 # 主页面判断用户 else: resellers = await sor.R('reseller', {'orgid': user_org[0]['parentid']}) if len(resellers) >= 1: user_type += 1 if users[0]['user_reseller'] == '1': user_type += 1 # 证明是主级页面 if len(reseller) < 1: # 用户为主 if user_type == 0: return {'status': True} else: # 用户为子 return {'status': False,'msg':'用户名或密码错误'} # 子级页面 else: # 用户为主 if user_type == 0: return {'status': False,'msg':'用户名或密码错误'} # 用户为子 else: return {'status': True} ret = await logintype(params_kw) return ret