pcapi/app/ldap/ldapOperate.py
2025-07-16 14:46:24 +08:00

101 lines
2.9 KiB
Python

from ldap3 import Server, Connection, ALL, NTLM, SUBTREE,MODIFY_REPLACE
import json
# LDAP server settings
# ldap_server_uri = 'ldap://127.0.0.1:7389' # or ldaps://your-ldap-server-secure
ldap_server_uri = 'ldap://10.8.64.15' # or ldaps://your-ldap-server-secure
# NOTE(review): the bind DN and password are hard-coded in source; consider
# loading them from configuration instead.
ldap_user = 'cn=admin,dc=test,dc=com'
ldap_password = '123456'
ldap_base = 'dc=test,dc=com'
# Build the LDAP server object for the URI above.
server = Server(ldap_server_uri, get_info=ALL)
# Create the connection and bind as ldap_user. This is a module-level statement,
# so it executes when the module is imported; the functions below all use `conn`.
conn = Connection(server, user=ldap_user, password=ldap_password, auto_bind=True)
def get_all_ldap_user():
    """Return every ``person`` entry found under ``ldap_base``.

    Runs a subtree search on the module-level connection, requesting the
    ``cn``, ``sn`` and ``mail`` attributes.

    Returns:
        A list with one item per entry in ``conn.entries``: the entry's JSON
        form decoded with ``json.loads``.
    """
    conn.search(
        search_base=ldap_base,
        search_filter='(objectClass=person)',
        search_scope=SUBTREE,
        attributes=['cn', 'sn', 'mail'],
    )
    users = []
    for entry in conn.entries:
        users.append(json.loads(entry.entry_to_json()))
    return users
def get_all_ldap_cn():
    """Return every ``posixGroup`` entry found under ``ldap_base``.

    Runs a subtree search on the module-level connection, requesting the
    ``cn``, ``objectClass`` and ``gidNumber`` attributes.

    Returns:
        A list with one item per entry in ``conn.entries``: the entry's JSON
        form decoded with ``json.loads``.
    """
    wanted_attributes = ['cn', 'objectClass', 'gidNumber']
    group_filter = '(objectClass=posixGroup)'
    conn.search(
        search_base=ldap_base,
        search_filter=group_filter,
        search_scope=SUBTREE,
        attributes=wanted_attributes,
    )
    return list(json.loads(entry.entry_to_json()) for entry in conn.entries)
def get_one_cn(cn):
    """Look up a single ``posixGroup`` entry by its ``cn``.

    Args:
        cn: Common name of the group. It is interpolated into the search
            filter exactly as given.

    Returns:
        The first matching entry, as its JSON form decoded with
        ``json.loads`` (requested attributes: ``cn``, ``objectClass``,
        ``gidNumber``), or ``None`` when the search matches nothing.
    """
    # NOTE(review): ``cn`` is not escaped before being placed in the filter.
    # If it can come from untrusted input, escape it first (ldap3 provides
    # ``ldap3.utils.conv.escape_filter_chars``) -- confirm against callers.
    search_filter = f'(&(cn={cn})(objectClass=posixGroup))'
    search_attribute = ['cn', 'objectClass', 'gidNumber']
    conn.search(
        search_base=ldap_base,
        search_filter=search_filter,
        search_scope=SUBTREE,
        attributes=search_attribute,
    )
    # ``conn.entries`` is a list, so an unsuccessful search leaves it empty
    # rather than ``None``; test for emptiness so the "not found" case returns
    # ``None`` instead of raising IndexError on ``entries[0]``.
    if not conn.entries:
        return None
    return json.loads(conn.entries[0].entry_to_json())
'''
Example arguments for add_ldap_user:
# uid = "test_add1"
# uid_number = 123456
# plaintext_password = "654321"
# cn = "test"
# add_ldap_user(uid, uid_number, plaintext_password, cn)
'''
def add_ldap_user(uid, uid_number, plaintext_password, cn):
    """Create a POSIX user entry under ``ou=test`` and then set its password.

    Args:
        uid: Login name; used for the entry's RDN, its ``sn`` and the tail of
            its home directory (``/srv/nfs/<uid>``).
        uid_number: Value stored as ``uidNumber``.
        plaintext_password: Password passed to ``modify_password`` once the
            entry has been added.
        cn: ``cn`` of an existing posixGroup whose ``gidNumber`` is copied to
            the new user; the same value is also stored as the user's ``cn``.

    Returns:
        The return value of ``modify_password`` when the add succeeds,
        otherwise ``conn.result`` of the failed add operation.

    Raises:
        LookupError: If no posixGroup with the given ``cn`` is found.
    """
    cn_attr = get_one_cn(cn)
    if cn_attr is None:
        # get_one_cn signals "not found" with None; fail with a clear error
        # instead of a TypeError when subscripting None below.
        raise LookupError(f"posixGroup with cn={cn!r} not found under {ldap_base}")

    # NOTE(review): ``uid`` is placed into the DN unescaped -- confirm callers
    # only pass values that are safe as an RDN value.
    new_user_dn = f"uid={uid},ou=test,{ldap_base}"
    new_user_attrs = {
        "objectClass": ["top", "posixAccount", "inetOrgPerson", "shadowAccount"],
        "uidNumber": uid_number,
        "gidNumber": cn_attr["attributes"]["gidNumber"],
        'sn': [uid],
        'loginShell': ["/bin/bash"],
        'homeDirectory': ["/srv/nfs/" + uid],
        'cn': [cn],
    }
    flag = conn.add(new_user_dn, new_user_attrs["objectClass"], new_user_attrs)
    print(conn.result)
    if flag is True:
        # The password is written in a second operation, only after the entry exists.
        return modify_password(new_user_dn, plaintext_password)
    return conn.result
def modify_password(new_user_dn, plaintext_password):
    """Replace the ``userPassword`` attribute of the entry at ``new_user_dn``.

    Args:
        new_user_dn: Distinguished name of the entry to modify.
        plaintext_password: Value written to ``userPassword`` as given.

    Returns:
        ``conn.result`` as it stands after the modify operation.
    """
    replacement = (MODIFY_REPLACE, [plaintext_password])
    conn.modify(new_user_dn, {'userPassword': replacement})
    return conn.result
def delete_ldap_user(uid):
    """Delete the entry ``uid=<uid>,ou=test,<ldap_base>``.

    Returns:
        ``conn.result`` as it stands after the delete operation.
    """
    target_dn = "uid={},ou=test,{}".format(uid, ldap_base)
    conn.delete(target_dn)
    return conn.result