import datetime
from string import Template
from typing import Optional, Tuple, Union

from alibabacloud_dingtalk.oauth2_1_0 import models as dingtalkoauth_2__1__0_models
from alibabacloud_dingtalk.oauth2_1_0.client import Client as dingtalkoauth2_1_0Client
from alibabacloud_dingtalk.workflow_1_0 import models as dingtalkworkflow__1__0_models
from alibabacloud_dingtalk.workflow_1_0.client import Client as dingtalkworkflow_1_0Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_tea_util import models as util_models
from appPublic.rc4 import unpassword
from appPublic.uniqueID import getID as uuid
from sqlor.dbpools import DBPools

from get_id_by_phone_dd import get_id_by_phone


class APV:
    """
    Approval workflow helper built on the DingTalk OAuth2 and workflow clients.
    """

    # Columns of an `apv_key` row that are passed through `unpassword`
    # before the row is handed to callers (order matches the original code).
    _ENCRYPTED_KEY_FIELDS = ("app_key", "app_secret", "http_aes_key", "http_token")

    # Approval levels recognised by `get_approvals`, in the order they are
    # emitted. Levels are compared as strings, exactly as stored in `apv_flow`.
    _APPROVAL_LEVELS = ("1", "2", "3")

    def __init__(self):
        self.client_auth2 = self.create_client_auto2()
        self.client_workflow = self.create_client_workflow()

    async def get_key(self, orgid=None) -> Tuple[dict, str]:
        """
        Load the `apv_key` row for an organization and decrypt its key fields.

        :param orgid: organization id
        :return: (row, "") on success, ({}, error message) on failure
        """
        db = DBPools()
        async with db.sqlorContext('kboss') as sor:
            try:
                sql = "select * from apv_key where del_flg = ${del_flg}$ and orgid = ${orgid}$ limit 1"
                rows = await sor.sqlExe(sql, {"del_flg": "0", "orgid": orgid})
                if not rows:
                    # f-string instead of `+` so a None / non-str orgid still
                    # produces this message rather than a TypeError.
                    return {}, f"未找到审批配置,orgid:{orgid},请先配置key"
                record = rows[0]
                for field in self._ENCRYPTED_KEY_FIELDS:
                    record[field] = unpassword(record[field])
                return record, ""
            except Exception as e:
                return {}, f"orgid:{orgid},获取key失败:{e}"

    async def get_process_code(self, orgid, business_id):
        """
        Look up the `process_code` configured in `apv_send_key`.

        :param orgid: organization id
        :param business_id: business id
        :return: (row containing `process_code`, "") on success,
                 ({}, error message) on failure
        """
        db = DBPools()
        async with db.sqlorContext('kboss') as sor:
            try:
                sql = ("select process_code from apv_send_key where del_flg = ${del_flg}$ "
                       "and orgid = ${orgid}$ and business_id=${business_id}$ limit 1")
                rows = await sor.sqlExe(
                    sql, {"del_flg": "0", "orgid": orgid, "business_id": business_id})
                if not rows:
                    return {}, f"未找到审批单,orgid:{orgid},business_id:{business_id}"
                return rows[0], ""
            except Exception as e:
                return {}, f"未找到审批单:{orgid},获取失败:{e}"

    def create_client_auto2(self) -> dingtalkoauth2_1_0Client:
        """
        Build the DingTalk OAuth2 client.

        :return: Client
        """
        config = open_api_models.Config()
        config.protocol = 'https'
        config.region_id = 'central'
        return dingtalkoauth2_1_0Client(config)

    def create_client_workflow(self) -> dingtalkworkflow_1_0Client:
        """
        Build the DingTalk workflow client.

        :return: Client
        """
        config = open_api_models.Config()
        config.protocol = 'https'
        config.region_id = 'central'
        return dingtalkworkflow_1_0Client(config)

    async def get_token(self, app_key, app_secret) -> Tuple[str, str]:
        """
        Request an access token from DingTalk.

        :param app_key: app_key
        :param app_secret: app_secret
        :return: (token, "") on success, ("", error message) on failure
        """
        if not app_key or not app_secret:
            return "", "app_key或app_secret不能为空"

        request = dingtalkoauth_2__1__0_models.GetAccessTokenRequest(
            app_key=app_key,
            app_secret=app_secret,
        )
        try:
            # NOTE(review): this SDK call is not awaited, so it runs
            # synchronously inside the coroutine.
            resp = self.client_auth2.get_access_token(request)
            token = resp.body.to_map()['accessToken']
        except Exception as e:
            return "", "获取token失败,err:" + str(e)
        return token, ""

    async def start_process_instance(self, token: str, process_code: str,
                                     originator_user_id: str, approvers: list,
                                     form_component: dict) -> Tuple[Union[dict, str], str]:
        """
        Start an approval process instance.

        :param token: access token
        :param process_code: identifier of the approval template
        :param originator_user_id: user id of the originator
        :param approvers: approver rows, see `get_process_config`
        :param form_component: dict with `title` and `detail`
        :return: (response body as a map, "") on success,
                 ("", error message) on failure
        """
        headers = dingtalkworkflow__1__0_models.StartProcessInstanceHeaders()
        headers.x_acs_dingtalk_access_token = token
        request = dingtalkworkflow__1__0_models.StartProcessInstanceRequest(
            # Originator
            originator_user_id=originator_user_id,
            # Approval template id
            process_code=process_code,
            # Directly specified approver list
            approvers=self.get_process_config(approvers),
            # Form values
            form_component_values=self.get_form_component_values(form_component),
        )
        try:
            # NOTE(review): synchronous SDK call inside a coroutine.
            resp = self.client_workflow.start_process_instance_with_options(
                request, headers, util_models.RuntimeOptions())
            return resp.body.to_map(), ""
        except Exception as err:
            return "", "钉钉报错:" + str(err)

    def get_process_config(self, approvers_data: list) -> list:
        """
        Convert approver rows into SDK approver objects.

        :param approvers_data: list of lists; in each inner list the first
            element is the action type and the remaining elements are user ids
        :return: list of StartProcessInstanceRequestApprovers
        """
        return [
            dingtalkworkflow__1__0_models.StartProcessInstanceRequestApprovers(
                action_type=entry[0],
                user_ids=entry[1:],
            )
            for entry in approvers_data
        ]

    async def get_form_component_data(self, business_id: str, from_data: dict) -> Tuple[dict, str]:
        """
        Load the content template for a business and fill it with form data.

        :param business_id: business id
        :param from_data: dict whose `title` and `detail` entries are the
            substitution mappings for the title and detail templates
        :return: ({"title": ..., "detail": ...}, "") on success,
                 ({}, error message) when no template exists or substitution fails
        :raises Exception: when reading the template from the database fails
        """
        db = DBPools()
        async with db.sqlorContext('kboss') as sor:
            try:
                # NOTE(review): the key is spelled "sord" here while
                # get_approvals uses "sort" -- confirm whether ordering by
                # update_at is actually applied.
                rows = await sor.R(
                    "apv_content_template",
                    {"del_flg": "0", "business_id": business_id, "sord": "update_at desc"})
                if not rows:
                    return {}, f"未找到模板,请检查数据库:business_id:{business_id}"
                template_row = rows[0]
            except Exception as e:
                raise Exception(f"获取模板失败:business_id:{business_id}:error:{e}") from e

        title_template = Template(template_row['title_template'])
        detail_template = Template(template_row['detail_template'])
        try:
            form_component_data = {
                "title": title_template.substitute(from_data["title"]),
                "detail": detail_template.substitute(from_data["detail"]),
            }
        except Exception as e:
            return {}, f"匹配模板参数失败:business_id:{business_id}:error:{e}"
        return form_component_data, ""

    def get_form_component_values(self, form_component_values_data: dict) -> list:
        """
        Build the two form component values sent with the approval.

        :param form_component_values_data: dict with `title` and `detail`
        :return: list of StartProcessInstanceRequestFormComponentValues
        """
        form_value = dingtalkworkflow__1__0_models.StartProcessInstanceRequestFormComponentValues
        return [
            form_value(name='申请内容', value=form_component_values_data['title']),
            form_value(name='审批详情', value=form_component_values_data['detail']),
        ]

    async def get_approvals(self, orgid: str, business_id: str) -> Tuple[list, str]:
        """
        Load the approval flow and group approvers by level.

        :param orgid: organization id
        :param business_id: business id
        :return: (approver rows, "") on success, where each row is
                 [mode, user_id, user_id, ...] for one level in level order;
                 ([], error message) when the configuration is missing or invalid
        :raises Exception: when reading the flow from the database fails
        """
        db = DBPools()
        async with db.sqlorContext('kboss') as sor:
            try:
                rows = await sor.R(
                    "apv_flow",
                    {"orgid": orgid, "del_flg": "0", "business_id": business_id,
                     "sort": "level,apv_dd_user_id"})
            except Exception as e:
                raise Exception(f"orgid:{orgid},获取审批流数据失败:{e}") from e
        if not rows:
            return [], f"未找到审批流数据,请检查数据库:orgid:{orgid},business_id:{business_id}"

        by_level = {level: [] for level in self._APPROVAL_LEVELS}
        for row in rows:
            bucket = by_level.get(row["level"])
            if bucket is None:
                return [], f"orgid:{orgid},审批人配置错误:级别不存在:lavel:{row['level']}"
            if not bucket:
                # The first row seen for a level supplies the approval mode.
                bucket.append(row["mode"])
            bucket.append(row["apv_dd_user_id"])

        if not by_level["1"]:
            return [], f"orgid:{orgid},审批人配置错误:第一级审批人列表为长度0:请检查数据库"

        # A non-empty bucket always holds the mode plus at least one user id,
        # so every non-empty level is a valid approver row.
        approvers = [by_level[level] for level in self._APPROVAL_LEVELS if by_level[level]]
        return approvers, ""

    async def get_base_orgid(self, orgid: str) -> Optional[str]:
        """
        Return the id of the first organization row with `org_type` 0.

        :param orgid: id of the requesting organization (used only in the
            error message)
        :return: organization id, or None when no such row exists
        :raises Exception: when the database read fails
        """
        db = DBPools()
        async with db.sqlorContext('kboss') as sor:
            try:
                rows = await sor.R("organization", {"del_flg": "0", "org_type": 0})
            except Exception as e:
                raise Exception(f"获取组织id失败:orgid:{orgid}:error:{e}") from e
        if not rows:
            return None
        return rows[0]['id']

    async def send_apv(self, phone: str, orgid: str, user_id: str, business_id: str,
                       form_component: dict, result_org: str, original_id: str,
                       apv_json: str) -> Tuple[str, str]:
        """
        Start an approval and persist it to `apv_data`.

        :param phone: phone number used to resolve the originator's user id
        :param orgid: organization id
        :param user_id: internal user id
        :param business_id: business id
        :param form_component: form data, see `get_form_component_data`
        :param result_org: stored as `result_org`
        :param original_id: stored as `original_id`
        :param apv_json: stored as `apv_json`
        :return: (instance id, "") on success, ("", error message) on failure
        """
        # Businesses "1" and "2" are handled under the org_type-0 organization.
        if str(business_id) in ["1", "2"]:
            orgid = await self.get_base_orgid(orgid=orgid)
            if not orgid:
                return "", f"未找到业主机构id,business_id:{business_id}"

        # Resolve the originator's user id from the phone number.
        lookup = await get_id_by_phone(orgid=orgid, phone=phone)
        if not lookup['status']:
            return "", lookup['msg']
        originator_user_id = lookup['user_id']

        key_data, err = await self.get_key(orgid=orgid)
        if err != "":
            return "", "get key error:" + err
        # Do not print key_data itself: it holds the decrypted app secret,
        # AES key and token.
        print(f"use keydata:orgid:{orgid}")

        token, err = await self.get_token(app_key=key_data['app_key'],
                                          app_secret=key_data['app_secret'])
        if err != "":
            return "", "get token error:" + err

        process_row, err = await self.get_process_code(orgid=orgid, business_id=business_id)
        if err != "":
            return "", "get process_code error:" + err
        process_code = process_row['process_code']

        approvers, err = await self.get_approvals(orgid=orgid, business_id=business_id)
        if err != "":
            return "", "get approvers error:" + err

        form_component_data, err = await self.get_form_component_data(
            business_id=business_id, from_data=form_component)
        if err != "":
            return "", "get form_component_data error:" + err

        resp, err = await self.start_process_instance(
            token=token,
            process_code=process_code,
            originator_user_id=originator_user_id,
            approvers=approvers,
            form_component=form_component_data,
        )
        if err != "":
            return "", "start process instance error:" + err
        # Check for the key itself rather than a substring of the dict's repr.
        if not isinstance(resp, dict) or "instanceId" not in resp:
            return "", f"发送审批失败:resp:{resp}"
        instance_id = resp['instanceId']

        try:
            await self.save_data(
                orgid=orgid,
                user_id=user_id,
                business_id=business_id,
                process_code=process_code,
                apv_id=instance_id,
                apv_title=form_component_data['title'],
                apv_detail=form_component_data['detail'],
                apv_sender=user_id,
                apv_status="start",
                result_org=result_org,
                original_id=original_id,
                apv_json=apv_json,
            )
        except Exception as e:
            return "", f"保存审批数据失败:{e}"
        return instance_id, ""

    async def save_data(self, orgid, user_id, business_id, process_code, apv_id,
                        apv_title, apv_detail, apv_sender, apv_status, result_org,
                        original_id, apv_json) -> None:
        """
        Insert one approval record into `apv_data`.

        :param orgid: organization id
        :param user_id: user id
        :param business_id: business id
        :param process_code: approval template id
        :param apv_id: approval instance id
        :param apv_title: title
        :param apv_detail: detail text (stored as `apv_text`)
        :param apv_sender: originator
        :param apv_status: process status
        :param result_org: stored as `result_org`
        :param original_id: stored as `original_id`
        :param apv_json: stored as `apv_json`
        """
        record = {
            "id": uuid(),
            "user_id": user_id,
            "orgid": orgid,
            "result_org": result_org,
            "business_id": business_id,
            "process_code": process_code,
            "apv_id": apv_id,
            "apv_title": apv_title,
            "apv_text": apv_detail,
            "apv_sender": apv_sender,
            "apv_start_time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "apv_finish_time": None,
            "apv_status": apv_status,
            "original_id": original_id,
            "apv_json": apv_json,
        }
        db = DBPools()
        async with db.sqlorContext('kboss') as sor:
            await sor.C("apv_data", record)


async def get_phone_by_id(orgid: str, business_id: str):
    """
    Read `send_dd_user_phone` from the `apv_send_key` row of a business.

    :param orgid: organization id
    :param business_id: business id
    :return: (phone, "") on success, (None, error message) on failure
    """
    db = DBPools()
    async with db.sqlorContext('kboss') as sor:
        try:
            rows = await sor.R(
                "apv_send_key",
                {"del_flg": "0", "orgid": orgid, "business_id": business_id})
            if not rows:
                return None, f"get_phone_by_id ,未找到审批发送key,orgid:{orgid},business_id:{business_id}"
            phone = rows[0]['send_dd_user_phone']
            if not phone:
                return None, f"get_phone_by_id ,未找到审批发送key,orgid:{orgid},business_id:{business_id}"
            return phone, ""
        except Exception as e:
            return None, f"get_phone_by_id :orgid:{orgid}:error:{e}"


async def issue_approve(phone: str, orgid: str, user_id: str, business_id: str,
                        form_component: dict, result_org: str = "",
                        original_id: str = "", apv_json: str = "") -> dict:
    """
    Send an approval request.

    :param phone: originator's phone number; when empty it is looked up via
        `get_phone_by_id`
    :param orgid: organization id
    :param user_id: internal user id
    :param business_id: business id
    :param form_component: form data
    :param result_org: stored as `result_org`
    :param original_id: stored as `original_id`
    :param apv_json: stored as `apv_json`
    :return: {"status": True, "instanceId": ...} on success,
             {"status": False, "msg": ...} on failure
    """
    apv = APV()
    response = {
        "status": False
    }
    print(f"send apv args,"
          f"phone:{phone},"
          f"orgid:{orgid},"
          f"user_id:{user_id},"
          f"business_id:{business_id},"
          )
    required = (phone, orgid, user_id, business_id, form_component)
    if any(arg is None for arg in required):
        response['msg'] = "参数不能为空, 请检查参数"
        return response

    if not phone:
        sql_phone, err = await get_phone_by_id(orgid=orgid, business_id=business_id)
        if err:
            response['msg'] = err
            return response
        phone = sql_phone

    try:
        instance_id, err = await apv.send_apv(phone, orgid, user_id, business_id,
                                              form_component, result_org,
                                              original_id, apv_json)
        if err != "":
            response['msg'] = f"发送审批失败:send_apv:,{err}"
            return response
        response['status'] = True
        response['instanceId'] = instance_id
    except Exception as e:
        response['msg'] = f"发送审批失败:{e}"
    print(f"send_response:{response}")
    return response