diff --git a/dagflow/dagflow.py b/dagflow/dagflow.py index 3bf8d89..479bc34 100644 --- a/dagflow/dagflow.py +++ b/dagflow/dagflow.py @@ -11,6 +11,8 @@ Features: - RBAC-based task assignment - Query APIs for UI """ +from appPublic.log import debug +from sqlor.dbpools import get_sor_context # --------------------------------------------------------------------- # Table definitions @@ -104,15 +106,20 @@ class FlowEngine: def __init__(self): self.aio = [] - async def bgtask(self, app): + async def bgtask(self): env = ServerEnv() self.aio = [] - async with (env, 'workflow') as sor: - self.aio = await sor.R('flow_instance', {'status': 'running'}) + cnt = 0 while True: + async with (env, 'workflow') as sor: + self.aio = await sor.R('flow_instance', {'status': 'running'}) + if cnt >= 60: + debug(f'{self.aio=}') + cnt = 0 for r in self.aio: self.step(r.org_id, r.id) - await asyncio.sleep(0.5) + await asyncio.sleep(2) + cnt += 1 async def create_definition(self, org_id, name, version, dsl_text): env = ServerEnv() @@ -129,7 +136,7 @@ class FlowEngine: async def create_instance(self, org_id, flow_def_id, ctx=None): env = ServerEnv() - async with (env, 'workflow') as sor: + async with get_sor_context(env, 'dagflow') as sor: iid = env.uuid() await sor.C('flow_instance', { 'id': iid, @@ -147,7 +154,7 @@ class FlowEngine: async def step(self, org_id, instance_id): env = ServerEnv() - async with get_sor_context(env, 'workflow') as sor: + async with get_sor_context(env, 'dagflow') as sor: insts = await sor.R('flow_instance', { 'id': instance_id, 'org_id': org_id