dagflow/dagflow/init.py
2026-03-16 16:29:15 +08:00

43 lines
1.0 KiB
Python

import asyncio
from functools import partial
from appPublic.jsonConfig import getConfig
from ahserver.serverenv import ServerEnv
from ahserver.configuredServer import add_cleanupctx
from .dagflow import (
FlowEngine,
get_org_flow_definition,
get_exists_workflows,
get_my_flow_works,
new_instance,
list_org_instances,
add_new_workflow
)
async def dagbacktask(engines, app):
tasks = []
for e in engines:
task = asyncio.create_task(e.bgtask())
tasks.append(task)
yield
for task in tasks:
task.cancel()
def load_dagflow():
config = getConfig()
flow_engines = []
cnt = config.dagflow_job_cnt or 1
for i in range(cnt):
e = FlowEngine(i)
flow_engines.append(e)
f = partial(dagbacktask, flow_engines)
# add_cleanupctx(f)
env = ServerEnv()
env.flow_engines = flow_engines
env.add_new_workflow = add_new_workflow
env.new_instance = new_instance
env.get_exists_workflows = get_exists_workflows
env.get_org_flow_definition = get_org_flow_definition
env.get_my_flow_works = get_my_flow_works
env.list_org_instances = list_org_instances