diff --git a/dagflow/dagflow.py b/dagflow/dagflow.py index e962a77..3bf8d89 100644 --- a/dagflow/dagflow.py +++ b/dagflow/dagflow.py @@ -108,7 +108,6 @@ class FlowEngine: env = ServerEnv() self.aio = [] async with (env, 'workflow') as sor: - fid = env.uuid() self.aio = await sor.R('flow_instance', {'status': 'running'}) while True: for r in self.aio: diff --git a/dagflow/init.py b/dagflow/init.py index e3b201f..efa55fb 100644 --- a/dagflow/init.py +++ b/dagflow/init.py @@ -1,10 +1,17 @@ +from functools import partial from ahserver.serverenv import ServerEnv -from ahserver.configuredServer import add_startup +from ahserver.configuredServer import add_cleanupctx from .dagflow import FlowEngine +async def dagbacktask(engine, app): + task = asyncio.create_task(engine.bgtask()) + yield + task.cancel() + def load_dagflow(): engine = FlowEngine() - add_startup(engine.bgtask) + f = partial(dagbacktask, engine) + add_cleanupctx(f) env = ServerEnv() env.workflow_engine = engine