diff --git a/dagflow/dagflow.py b/dagflow/dagflow.py index e39ebe4..103b8c1 100644 --- a/dagflow/dagflow.py +++ b/dagflow/dagflow.py @@ -112,7 +112,7 @@ class FlowEngine: self.aio = [] cnt = 0 while True: - async with (env, 'workflow') as sor: + async with get_sor_context(env, 'dagflow') as sor: self.aio = await sor.R('flow_instance', {'status': 'running'}) if cnt >= 60: debug(f'{self.aio=}') @@ -124,7 +124,7 @@ class FlowEngine: async def create_definition(self, org_id, name, version, dsl_text): env = ServerEnv() - async with (env, 'workflow') as sor: + async with get_sor_context(env, 'dagflow') as sor: fid = env.uuid() await sor.C('flow_definition', { 'id': fid, @@ -238,7 +238,7 @@ class FlowEngine: async def list_human_tasks(self, org_id, user_roles): env = ServerEnv() - async with (env, 'workflow') as sor: + async with get_sor_context(env, 'dagflow') as sor: return await sor.R('human_task', { 'org_id': org_id, 'status': 'pending', @@ -247,7 +247,7 @@ class FlowEngine: async def complete_human_task(self, org_id, task_id, user_id, output): env = ServerEnv() - async with (env, 'workflow') as sor: + async with get_sor_context(env, 'dagflow') as sor: rows = await sor.R('human_task', { 'id': task_id, 'org_id': org_id,