# Source path: kboss/kgadget/src/send_apv.py
# Snapshot: 2025-07-16 14:27:17 +08:00 - 441 lines - 18 KiB - Python
#
# Repository viewer notice: this file contains Unicode characters that might be
# confused with other characters. If this is intentional, the warning can be
# safely ignored; use the viewer's Escape button to reveal them.

import datetime
from string import Template
from typing import Optional, Tuple, Union

from alibabacloud_dingtalk.oauth2_1_0 import models as dingtalkoauth_2__1__0_models
from alibabacloud_dingtalk.oauth2_1_0.client import Client as dingtalkoauth2_1_0Client
from alibabacloud_dingtalk.workflow_1_0 import models as dingtalkworkflow__1__0_models
from alibabacloud_dingtalk.workflow_1_0.client import Client as dingtalkworkflow_1_0Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_tea_util import models as util_models
from appPublic.rc4 import unpassword
from appPublic.uniqueID import getID as uuid
from sqlor.dbpools import DBPools

from get_id_by_phone_dd import get_id_by_phone
class APV:
    """
    Approval workflow helper.

    Looks up per-organisation configuration in the ``kboss`` database,
    obtains a DingTalk access token, starts a DingTalk approval process
    instance and records the started instance in the ``apv_data`` table.

    Most lookup methods follow a ``(value, error_message)`` convention where
    an empty error string means success.
    """

    # Approval levels recognised in the ``apv_flow`` table, in execution order.
    _LEVELS = ("1", "2", "3")

    def __init__(self):
        self.client_auth2 = self.create_client_auto2()
        self.client_workflow = self.create_client_workflow()

    async def get_key(self, orgid=None) -> Tuple[dict, str]:
        """
        Load the approval credentials configured for an organisation.

        Reads one non-deleted row from ``apv_key`` and passes the ``app_key``,
        ``app_secret``, ``http_aes_key`` and ``http_token`` columns through
        ``unpassword`` (presumably decrypting them -- confirm against
        appPublic.rc4).

        :param orgid: organisation id
        :return: (row, "") on success, ({}, error message) otherwise
        """
        db = DBPools()
        async with db.sqlorContext('kboss') as sor:
            try:
                sql = "select * from apv_key where del_flg = ${del_flg}$ and orgid = ${orgid}$ limit 1"
                rows = await sor.sqlExe(sql, {"del_flg": "0", "orgid": orgid})
                if not rows:
                    # f-string instead of "+" so that orgid=None (the default)
                    # yields the intended message rather than a TypeError.
                    return {}, f"未找到审批配置,orgid:{orgid},请先配置key"
                data = rows[0]
                for field in ("app_key", "app_secret", "http_aes_key", "http_token"):
                    data[field] = unpassword(data[field])
                return data, ""
            except Exception as e:
                return {}, f"orgid:{orgid},获取key失败:{e}"

    async def get_process_code(self, orgid, business_id):
        """
        Look up the approval template code for an organisation and business.

        :param orgid: organisation id
        :param business_id: business id
        :return: (row containing ``process_code``, "") on success,
                 ({}, error message) otherwise
        """
        db = DBPools()
        async with db.sqlorContext('kboss') as sor:
            try:
                sql = "select process_code from apv_send_key where del_flg = ${del_flg}$ and orgid = ${orgid}$ and business_id=${business_id}$ limit 1"
                rows = await sor.sqlExe(sql, {"del_flg": "0", "orgid": orgid, "business_id": business_id})
                if not rows:
                    return {}, "未找到审批单,orgid:{},business_id:{}".format(orgid, business_id)
                return rows[0], ""
            except Exception as e:
                return {}, f"未找到审批单:{orgid},获取失败:{e}"

    def create_client_auto2(self) -> "dingtalkoauth2_1_0Client":
        """
        Build the DingTalk OAuth2 client (HTTPS, region ``central``).

        :return: Client
        """
        config = open_api_models.Config()
        config.protocol = 'https'
        config.region_id = 'central'
        return dingtalkoauth2_1_0Client(config)

    def create_client_workflow(self) -> "dingtalkworkflow_1_0Client":
        """
        Build the DingTalk workflow client (HTTPS, region ``central``).

        :return: Client
        """
        config = open_api_models.Config()
        config.protocol = 'https'
        config.region_id = 'central'
        return dingtalkworkflow_1_0Client(config)

    async def get_token(self, app_key, app_secret) -> Tuple[str, str]:
        """
        Request a DingTalk access token.

        :param app_key: app_key
        :param app_secret: app_secret
        :return: (token, "") on success, ("", error message) otherwise
        """
        if not app_key or not app_secret:
            return "", "app_key或app_secret不能为空"
        request = dingtalkoauth_2__1__0_models.GetAccessTokenRequest(
            app_key=app_key,
            app_secret=app_secret,
        )
        try:
            # NOTE(review): this is a synchronous SDK call inside a coroutine;
            # it looks like it blocks the event loop for the duration of the
            # HTTP request -- consider the SDK's async variant.
            resp = self.client_auth2.get_access_token(request)
            token = resp.body.to_map()['accessToken']
        except Exception as e:
            return "", "获取token失败,err:" + str(e)
        return token, ""

    async def start_process_instance(self, token: str, process_code: str, originator_user_id: str, approvers: list, form_component: dict) -> Tuple[Union[dict, str], str]:
        """
        Start a DingTalk approval process instance.

        :param token: access token
        :param process_code: unique identifier of the approval template
        :param originator_user_id: DingTalk user id of the originator
        :param approvers: approver groups, see ``get_process_config``
        :param form_component: dict with ``title`` and ``detail`` values
        :return: (response body as a dict, "") on success,
                 ("", error message) otherwise
        """
        headers = dingtalkworkflow__1__0_models.StartProcessInstanceHeaders()
        headers.x_acs_dingtalk_access_token = token
        request = dingtalkworkflow__1__0_models.StartProcessInstanceRequest(
            # originator
            originator_user_id=originator_user_id,
            # approval template id
            process_code=process_code,
            # explicitly designated approvers (original comment: at most 20 entries)
            approvers=self.get_process_config(approvers),
            # form values
            form_component_values=self.get_form_component_values(form_component),
        )
        try:
            # NOTE(review): synchronous SDK call inside a coroutine, same
            # concern as in get_token.
            resp = self.client_workflow.start_process_instance_with_options(request, headers, util_models.RuntimeOptions())
            return resp.body.to_map(), ""
        except Exception as err:
            return "", "钉钉报错:" + str(err)

    # Approver configuration
    def get_process_config(self, approvers_data: list) -> list:
        """
        Convert approver groups into DingTalk request models.

        :param approvers_data: list of lists; in each inner list the first
                               element is the action type and the remaining
                               elements are approver user ids
        :return: list of ``StartProcessInstanceRequestApprovers``
        """
        return [
            dingtalkworkflow__1__0_models.StartProcessInstanceRequestApprovers(
                action_type=group[0],
                user_ids=group[1:],
            )
            for group in approvers_data
        ]

    # Load the form template and fill it in
    async def get_form_component_data(self, business_id: str, from_data: dict) -> Tuple[dict, str]:
        """
        Load the content template for a business and render it.

        The ``title_template`` and ``detail_template`` columns are rendered
        with ``string.Template.substitute`` using ``from_data["title"]`` and
        ``from_data["detail"]`` as the respective mappings.

        :param business_id: business id
        :param from_data: dict with ``title`` and ``detail`` mappings
        :return: ({"title": ..., "detail": ...}, "") on success,
                 ({}, error message) otherwise
        :raises Exception: if the database query itself fails
        """
        db = DBPools()
        async with db.sqlorContext('kboss') as sor:
            try:
                # NOTE(review): the key below is "sord" while get_approvals
                # uses "sort"; this looks like a typo that would leave the
                # result unordered -- confirm against sqlor before changing.
                rows = await sor.R("apv_content_template", {"del_flg": "0", "business_id": business_id, "sord": "update_at desc"})
            except Exception as e:
                raise Exception(f"获取模板失败:business_id:{business_id}:error:{e}") from e
        if not rows:
            return {}, f"未找到模板,请检查数据库:business_id:{business_id}"
        template_row = rows[0]
        # Template construction lives inside the try so that a missing
        # template column is reported through the error tuple as well.
        try:
            form_component_data = {
                "title": Template(template_row['title_template']).substitute(from_data["title"]),
                "detail": Template(template_row['detail_template']).substitute(from_data["detail"]),
            }
        except Exception as e:
            return {}, f"匹配模板参数失败:business_id:{business_id}:error:{e}"
        return form_component_data, ""

    # Form value configuration
    def get_form_component_values(self, form_component_values_data: dict) -> list:
        """
        Build the two DingTalk form fields from rendered form data.

        :param form_component_values_data: dict with ``title`` and ``detail``
        :return: list of ``StartProcessInstanceRequestFormComponentValues``
        """
        field_cls = dingtalkworkflow__1__0_models.StartProcessInstanceRequestFormComponentValues
        return [
            field_cls(name='申请内容', value=form_component_values_data['title']),
            field_cls(name='审批详情', value=form_component_values_data['detail']),
        ]

    @staticmethod
    def _group_approvers(rows, orgid) -> Tuple[list, str]:
        """Group ``apv_flow`` rows into per-level ``[mode, user_id, ...]`` lists."""
        buckets = {level: [] for level in APV._LEVELS}
        for row in rows:
            # str() so integer levels coming back from the database also match.
            bucket = buckets.get(str(row["level"]))
            if bucket is None:
                return [], f"orgid:{orgid},审批人配置错误:级别不存在:lavel:{row['level']}"
            if not bucket:
                # The first row seen for a level decides that level's mode.
                bucket.append(row["mode"])
            bucket.append(row["apv_dd_user_id"])
        if not buckets["1"]:
            return [], f"orgid:{orgid},审批人配置错误:第一级审批人列表为长度0:请检查数据库"
        # Levels without any approver are skipped.
        return [buckets[level] for level in APV._LEVELS if buckets[level]], ""

    # Load the approval flow
    async def get_approvals(self, orgid: str, business_id: str) -> Tuple[list, str]:
        """
        Load the approver chain configured for an organisation and business.

        :param orgid: organisation id
        :param business_id: business id
        :return: (list of ``[mode, user_id, ...]`` per level, "") on success,
                 ([], error message) otherwise
        :raises Exception: if the database query itself fails
        """
        db = DBPools()
        async with db.sqlorContext('kboss') as sor:
            try:
                rows = await sor.R("apv_flow", {"orgid": orgid, "del_flg": "0", "business_id": business_id, "sort": "level,apv_dd_user_id"})
            except Exception as e:
                raise Exception(f"orgid:{orgid},获取审批流数据失败:{e}") from e
        if not rows:
            return [], f"未找到审批流数据,请检查数据库:orgid:{orgid},business_id:{business_id}"
        return self._group_approvers(rows, orgid)

    # Look up the owner organisation id
    async def get_base_orgid(self, orgid: str) -> Optional[str]:
        """
        Return the id of the first non-deleted organisation with ``org_type`` 0.

        :param orgid: id of the requesting organisation; only used in the
                      error message
        :return: the organisation id, or None when no such row exists
        :raises Exception: if the database query fails
        """
        db = DBPools()
        async with db.sqlorContext('kboss') as sor:
            try:
                rows = await sor.R("organization", {"del_flg": "0", "org_type": 0})
            except Exception as e:
                raise Exception(f"获取组织id失败:orgid:{orgid}:error:{e}") from e
        if not rows:
            return None
        return rows[0]['id']

    # Send an approval
    async def send_apv(self, phone: str, orgid: str, user_id: str, business_id: str, form_component: dict, result_org: str, original_id: str, apv_json: str) -> Tuple[str, str]:
        """
        Start an approval in DingTalk and persist it.

        :param phone: phone number used to resolve the originator's DingTalk id
        :param orgid: organisation id
        :param user_id: internal user id
        :param business_id: business id
        :param form_component: dict with ``title`` and ``detail`` mappings
        :param result_org: stored as ``result_org`` in ``apv_data``
        :param original_id: stored as ``original_id`` in ``apv_data``
        :param apv_json: stored as ``apv_json`` in ``apv_data``
        :return: (instance id, "") on success, ("", error message) otherwise
        """
        # Business ids "1" and "2" are handled under the owner organisation.
        if str(business_id) in ("1", "2"):
            orgid = await self.get_base_orgid(orgid=orgid)
            if not orgid:
                return "", f"未找到业主机构id,business_id:{business_id}"

        # Resolve the originator's DingTalk user id from the phone number.
        lookup = await get_id_by_phone(orgid=orgid, phone=phone)
        if not lookup['status']:
            return "", lookup['msg']
        originator_user_id = lookup['user_id']

        key_data, err = await self.get_key(orgid=orgid)
        if err != "":
            return "", "get key error:" + err
        # Only the organisation is logged: key_data holds decrypted secrets
        # that must not be written to stdout.
        print(f"use keydata for orgid:{orgid}")
        token, err = await self.get_token(app_key=key_data['app_key'], app_secret=key_data['app_secret'])
        if err != "":
            return "", "get token error:" + err

        process_row, err = await self.get_process_code(orgid=orgid, business_id=business_id)
        if err != "":
            return "", "get process_code error:" + err
        process_code = process_row['process_code']

        approvers, err = await self.get_approvals(orgid=orgid, business_id=business_id)
        if err != "":
            return "", "get approvers error:" + err

        form_component_data, err = await self.get_form_component_data(business_id=business_id, from_data=form_component)
        if err != "":
            return "", "get form_component_data error:" + err

        resp, err = await self.start_process_instance(token=token, process_code=process_code, originator_user_id=originator_user_id, approvers=approvers, form_component=form_component_data)
        if err != "":
            return "", "start process instance error:" + err
        # Check the key itself rather than searching the stringified response.
        instance_id = resp.get("instanceId") if isinstance(resp, dict) else None
        if not instance_id:
            return "", f"发送审批失败:resp:{resp}"

        try:
            await self.save_data(orgid=orgid, user_id=user_id, business_id=business_id, process_code=process_code, apv_id=instance_id, apv_title=form_component_data['title'], apv_detail=form_component_data['detail'],
                                 apv_sender=user_id, apv_status="start", result_org=result_org, original_id=original_id, apv_json=apv_json)
        except Exception as e:
            # NOTE(review): at this point the DingTalk instance already
            # exists; the caller only sees the persistence error.
            return "", f"保存审批数据失败:{e}"
        return instance_id, ""

    async def save_data(self, orgid, user_id, business_id, process_code, apv_id, apv_title, apv_detail, apv_sender, apv_status, result_org, original_id, apv_json) -> None:
        """
        Insert a row describing a started approval into ``apv_data``.

        :param orgid: organisation id
        :param user_id: user id
        :param business_id: business id
        :param process_code: approval template code
        :param apv_id: approval instance id
        :param apv_title: title
        :param apv_detail: detail text (stored as ``apv_text``)
        :param apv_sender: originator
        :param apv_status: process status
        :param result_org: stored as ``result_org``
        :param original_id: stored as ``original_id``
        :param apv_json: stored as ``apv_json``
        """
        record = {
            "id": uuid(),
            "user_id": user_id,
            "orgid": orgid,
            "result_org": result_org,
            "business_id": business_id,
            "process_code": process_code,
            "apv_id": apv_id,
            "apv_title": apv_title,
            "apv_text": apv_detail,
            "apv_sender": apv_sender,
            "apv_start_time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "apv_finish_time": None,
            "apv_status": apv_status,
            "original_id": original_id,
            "apv_json": apv_json,
        }
        db = DBPools()
        async with db.sqlorContext('kboss') as sor:
            await sor.C("apv_data", record)
async def get_phone_by_id(orgid: str, business_id: str):
    """
    Look up the sender phone number stored for an organisation and business.

    Reads the first non-deleted ``apv_send_key`` row and returns its
    ``send_dd_user_phone`` column.

    :param orgid: organisation id
    :param business_id: business id
    :return: (phone, "") on success, (None, error message) otherwise
    """
    def _not_found():
        # Same message whether the row is missing or its phone is empty.
        return None, f"get_phone_by_id ,未找到审批发送key,orgid:{orgid},business_id:{business_id}"

    pool = DBPools()
    async with pool.sqlorContext('kboss') as sor:
        try:
            rows = await sor.R("apv_send_key", {"del_flg": "0", "orgid": orgid, "business_id": business_id})
            if not rows:
                return _not_found()
            sender_phone = rows[0]['send_dd_user_phone']
            if sender_phone:
                return sender_phone, ""
            return _not_found()
        except Exception as exc:
            return None, f"get_phone_by_id :orgid:{orgid}:error:{exc}"
# Send an approval (module-level entry point)
async def issue_approve(phone: str, orgid: str, user_id: str, business_id: str, form_component: dict, result_org: str = "", original_id: str = "", apv_json: str = "") -> dict:
    """
    Send an approval and report the outcome as a status dict.

    When ``phone`` is empty, the sender phone configured in ``apv_send_key``
    for the organisation and business is used instead.

    :param phone: sender phone number (may be empty, see above)
    :param orgid: organisation id
    :param user_id: internal user id
    :param business_id: business id
    :param form_component: form parameters
    :param result_org: organisation the approval is executed against
    :param original_id: original recharge log id
    :param apv_json: json
    :return: ``{"status": True, "instanceId": ...}`` on success,
             ``{"status": False, "msg": ...}`` otherwise
    """
    response = {
        "status": False
    }
    print(f"send apv args,"
          f"phone:{phone},"
          f"orgid:{orgid},"
          f"user_id:{user_id},"
          f"business_id:{business_id},"
          )
    if any(arg is None for arg in (phone, orgid, user_id, business_id, form_component)):
        response['msg'] = "参数不能为空, 请检查参数"
        return response
    if not phone:
        sql_phone, err = await get_phone_by_id(orgid=orgid, business_id=business_id)
        if err:
            response['msg'] = err
            return response
        phone = sql_phone
    try:
        # Built only after validation and inside the try, so that invalid
        # input costs no SDK clients and a constructor failure is reported
        # through the status dict like every other error.
        apv = APV()
        instance_id, err = await apv.send_apv(phone, orgid, user_id, business_id, form_component, result_org, original_id, apv_json)
        if err != "":
            response['msg'] = f"发送审批失败:send_apv:,{err}"
            return response
        response['status'] = True
        response['instanceId'] = instance_id
    except Exception as e:
        response['msg'] = f"发送审批失败:{e}"
    print(f"send_response:{response}")
    return response