kboss/b/user/logintype.dspy
2026-04-28 17:47:26 +08:00

247 lines
11 KiB
Plaintext
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

async def handle_login_failed(user_name):
from datetime import datetime, timedelta
db = DBPools()
async with db.sqlorContext('kboss') as sor:
try:
current_time = datetime.now()
# 1. 查询该用户当前的失败记录
record = await sor.sqlExe("SELECT first_failed_time, failed_count FROM login_fail_log WHERE user_name = '%s'" % user_name, {})
if not record:
# === 第一次失败 ===
await sor.sqlExe("""
INSERT INTO login_fail_log (user_name, first_failed_time, failed_count)
VALUES ('%s', '%s', 1)
""" % (user_name, current_time), {})
return # 正常失败,不需要等待
first_failed_time = record[0]['first_failed_time']
failed_count = record[0]['failed_count']
first_failed_time = datetime.strptime(first_failed_time, '%Y-%m-%d %H:%M:%S')
# 2. 判断是否还在10分钟窗口内
time_diff = (current_time - first_failed_time).total_seconds() / 60 # 分钟
if time_diff <= 10:
# 还在10分钟窗口内失败次数+1
new_count = failed_count + 1
if new_count >= 3:
# === 第三次及以上失败:触发锁定 ===
# 更新失败次数,并记录锁定开始时间
await sor.sqlExe("""
UPDATE login_fail_log
SET failed_count = 3, last_failed_time = '%s', lock_until = '%s'
WHERE user_name = '%s'
""" % (current_time, current_time + timedelta(minutes=20), user_name), {})
print("十分钟内登录失败三次请等待20分钟后再试")
else:
# 第二次失败
await sor.sqlExe("""
UPDATE login_fail_log
SET failed_count = '%s', last_failed_time = '%s'
WHERE user_name = '%s'
""" % (new_count, current_time, user_name), {})
# 可选:返回还剩几次机会
if new_count == 2:
print("登录失败还剩1次机会10分钟内")
else:
# === 已经超过10分钟窗口重置计数器 ===
await sor.sqlExe("""
UPDATE login_fail_log
SET first_failed_time = '%s', failed_count = 1, last_failed_time = '%s', lock_until = NULL
WHERE user_name = '%s'
""" % (current_time, current_time, user_name), {})
except Exception as e:
print("exception:", user_name, e)
return {'status': False, 'msg': '登录操作失败', 'error': str(e)}
async def check_login_allowed(user_name):
from datetime import datetime, timedelta
db = DBPools()
async with db.sqlorContext('kboss') as sor:
try:
record = await sor.sqlExe("SELECT lock_until, first_failed_time, failed_count FROM login_fail_log WHERE user_name = '%s'" % user_name, {})
current_time = datetime.now()
if not record:
return {'status': True} # 允许登录
record = record[0]
if record.get('lock_until'):
record['lock_until'] = datetime.strptime(record['lock_until'], '%Y-%m-%d %H:%M:%S')
# 检查是否处于锁定状态
if record.get('lock_until') and record['lock_until'] > current_time:
wait_minutes = (record['lock_until'] - current_time).total_seconds() // 60
msg = f"十分钟内登录失败三次,请等待{int(wait_minutes)}分钟后再试"
return {
'status': False,
'msg': msg
}
# 如果锁定时间已过,但还在检查窗口,也可以重置(可选)
return {'status': True} # 允许登录
except Exception as e:
print("exception:", user_name, e)
return {'status': False, 'msg': '登录操作失败, %s' % str(e)}
async def logintype(ns):
"""
1、判断用户是否为主级如果在reseller没要找到数据证明就是主级
2、判断主级的域名跟前段传递的域名是否一致
3、
:param ns:
:return:
"""
# 兼容手机号登录
if ns.get('password'):
# 密码解密
data = """-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEApJ3ThUWT3CgvH0O8rrT6qycpqX0NTq4Q3CxBrvNxo9//qX2b
KvhomoLNd+vdti8xNOK6/3zuTJIVt0RoNKwE0HWMR8H0jgp7ING54DtT5B8bhmUp
bs/hownGzIBGOedhqeOPiv0Q5oSi9OIEE+PK2L8KdgFF2Z6Q1DQdv5Y1qvD/t2mJ
VjR+NPTwcwBIT8UJ0Cfu8lqHjjJbNF//smTjQ8v2pnqp19jItuHeD4G4u7a8fWC3
/IGEv4+uc5rq5qhwdzRxHUveNmoE+nyh0T8RC8Y8/XLkEiD0nMvZZjBn7Bof6f1s
t0aqJX8R1VGvzdTJ8eTvJuyMNsR4wLoF5PvxhQIDAQABAoIBAAkz6MMdL6kk9sI8
F2YUKHG9cvI07Obs8XTAy9Wlag34raJ8RI3Z002hnS3K5tssUKDqdMloFff744qA
ulHV6AyYKOtK2yQk6KgRPkX13lIoSj6jx/XLWb7mnGskV+JQa1pQpkUYsenHmOCX
auKKko8cpwLcpI4IJx3MSBMRqUoEFgDAZd98tpKyLHzqJN4CL9MkC3jkJcenIV6V
CDXFWaf2l+tjlBOU6RbOvNi3WSDav0ZEJa58Irw077YajF9tp+7/TRqEXvvrYjJG
rneKYlhUpsHA6ROgEHNKFYePqcWl83SB8nLQjFFYxs12rIQVB/+f0Q3rkf8z/oS5
Q2tSNEUCgYEAuBUjQw5FhNCZpSd6L7x47iIXw/qlz4Ovs3riapwBBhMkvmIL8c4N
rDZVk5VuiGL0RzUMV17Br+vdWP2+LGTDPqNJywQ49zrUYFd29hXeIuUMXQZWKTnG
QTjKKu8gcWAu2aa3S21YO5GKRuaF+LDO5Ny6CRqyyvjkA+Joh0pHQtMCgYEA5O3O
lKiKZSKfvh4o/60QB3a7phaEua0RmXLc7m03gcKyqL/yYfUujKPGqKz06rYXJiEz
BhY1d8KoGKBXPWXm+NmLwSbDgXcYD0Iv3S+2mjKz2IClKVGLu3F3+Y+sDLPSYOh+
uuZc5EBIr394FDcIi1yzybY14C7pIFBobkh6U0cCgYEAp/fhrRYgqQgOqnxTKau4
TNIfDVxwX+GxOPwwDUc8R+a/6Fhevc1e/Hy3qWr46MoiS7dEA5Ua9wMpfkJKfQeU
gFtOwkigEjdEAoTMr89UWlLV7SwTApsNJAZFstdvqwQwu7k8Fz82+2PZ4MgIBtyO
bJZbymLqMfrZVBILfydvzrECgYEAptMdLXu8joKywgqceeUVvzqo+BmE0wHpx1bK
5Py17FuDcjGeYgAX0ekvRY/0Um4rZF/hHNhmUkWJ1ZhSuq1dqY5up2ymH9w3rjbf
RmIUwT2djXIB6aBBIXFIqpYmHIOWFYSXhX+FxdeKoElVqfWIxnXhlmav0BSfUisG
TweGDNUCgYAQjIbPI2/dsVmb1dkV0rohVaARO0wQGcIMfc5qXzQOIwrprYcBGSa9
hos4TY/TZt/Kw0yAqnwiJ/6k89I67DRx30mu5ORgrzyolH5syKgbuJuRK7hE7708
48mkFM5MR+RavHbQt3IY8GN4x05bm8uzaiV3WsAGUNukkkyN6VW1WA==
-----END RSA PRIVATE KEY-----"""
private_key = RSA.importKey(data)
cipher = PKCS1_v1_5.new(private_key)
back_text = cipher.decrypt(base64.b64decode(ns['password']), 0)
ns['password'] = back_text.decode('utf-8')
db = DBPools()
async with db.sqlorContext('kboss') as sor:
domain_name = ns.get('domain_name')
if domain_name in ['www.opencomputing.cn', 'dev.opencomputing.cn', 'localhost:9527'] and ns.get('username') not in ['开元云(北京)科技有限公司', 'admin', 'kyy_root']:
# 登录失败次数限制
login_allowed = await check_login_allowed(ns.get('username'))
if not login_allowed.get('status'):
return {'status': False, 'msg': login_allowed.get('msg')}
if not ns.get('mobile'):
return {
'status': False,
'msg': '请输入手机号'
}
real_mobile_li = await sor.R('users', {'username': ns['username']})
if not real_mobile_li:
return {
'status': False,
'msg': '用户名有误'
}
real_mobile = real_mobile_li[0]['mobile'] if real_mobile_li else None
if not real_mobile:
return {
'status': False,
'msg': '未查询到匹配得手机号码'
}
if real_mobile and ns['mobile'] != real_mobile:
return {
'status': False,
'msg': '您的手机号与用户名不匹配'
}
if not ns.get('codeid'):
return {
'status': False,
'msg': 'codeid不存在, 请输入验证码'
}
code = await sor.R('validatecode', {'id': ns.get('codeid'), 'vcode': ns.get('vcode')})
if len(code) < 1:
await handle_login_failed(ns.get('username'))
return {'status': False, 'msg': '验证码不正确'}
password = password_encode(ns['password'])
users = await sor.R('users', {'username': ns.get('username'), 'password': password})
if len(users) < 1:
await handle_login_failed(ns.get('username'))
return {"status": False,'msg':'用户名或密码错误'}
return {'status': True}
else:
type = 0
if ns.get('codeid'):
type += 1
code = await sor.R('validatecode', {'id': ns.get('codeid'), 'vcode': ns.get('vcode')})
if len(code) < 1:
return {'status': False, 'msg': '验证码不正确'}
if type == 1:
# 手机号登录
users = await sor.R('users', {'mobile': ns.get('username')})
else:
password = password_encode(ns['password'])
users = await sor.R('users', {'username': ns.get('username'), 'password': password})
if len(users) < 1:
await handle_login_failed(ns.get('username'))
return {"status": False,'msg':'用户名或密码错误'}
elif ns.get('username') == "admin":
return {'status': True}
else:
reseller = await sor.R('reseller', {'domain_name': ns.get('domain_name')})
# 查到用户的所在机构
user_org = await sor.R('organization', {'id': users[0]['orgid']})
#0代表用户为主 1代表用户为子
user_type = 0
# 子页面判断用户
if len(reseller) >= 1:
if reseller[0]['orgid'] == user_org[0]['parentid']:
user_type += 1
if reseller[0]['orgid'] == users[0]['orgid']:
user_type += 1
# 主页面判断用户
else:
resellers = await sor.R('reseller', {'orgid': user_org[0]['parentid']})
if len(resellers) >= 1:
user_type += 1
if users[0]['user_reseller'] == '1':
user_type += 1
# 证明是主级页面
if len(reseller) < 1:
# 用户为主
if user_type == 0:
return {'status': True}
else:
# 用户为子
return {'status': False,'msg':'用户名或密码错误'}
# 子级页面
else:
# 用户为主
if user_type == 0:
return {'status': False,'msg':'用户名或密码错误'}
# 用户为子
else:
return {'status': True}
ret = await logintype(params_kw)
return ret