kboss/kgadget/src/get_id_by_phone_dd.py
2025-07-16 14:27:17 +08:00

51 lines
1.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from aiohttp import client as aiohttp_client
from appPublic.rc4 import unpassword
from sqlor.dbpools import DBPools
# 获取token
async def get_token(appkey, appsecret):
url = f"https://oapi.dingtalk.com/gettoken?appkey={appkey}&appsecret={appsecret}"
async with aiohttp_client.request("GET", url) as res:
if res.status == 200:
data = await res.json()
if data['errcode'] != 0:
return False, data['errmsg']
return True, data['access_token']
else:
return False
# 钉钉通过手机号获取id
async def get_id_by_phone(orgid, phone):
if not phone.isdigit():
return {"status": False, "msg": f"手机号格式错误:{phone}"}
db = DBPools()
data = []
async with db.sqlorContext('kboss') as sor:
data = await sor.R("apv_key", {"orgid": orgid, "del_flg": 0})
if not data:
return {"status": False, "msg": f"未找到密钥,请检查参数,orgid:{orgid}"}
data = data[0]
# 解密
appkey = unpassword(data['app_key'])
appsecret = unpassword(data['app_secret'])
ok, token = await get_token(appkey, appsecret)
if not ok:
return {"status": False, "msg": f"获取token失败,{token}"}
url = f"https://oapi.dingtalk.com/topapi/v2/user/getbymobile?access_token={token}"
body = {
"mobile": phone,
"support_exclusive_account_search": "true"
}
async with aiohttp_client.request("POST", url, json=body) as res:
if res.status == 200:
data = await res.json()
if data['errcode'] == 0:
return {"status": True, "user_id": data['result']['userid']}
else:
return {"status": False, "msg": f"通过手机号获取userid失败,{data['errmsg']},phone:{phone}"}
else:
return {"status": False, "msg": f"获取userid失败:token:{token},phone:{phone}"}