"""DAG workflow engine built on ahserver's data-access layer.

NOTE(review): this file was checked in as a raw unified diff of
``dagflow/dagflow.py`` with its newlines collapsed, so it was neither a
valid patch nor valid Python.  The body below reconstructs the post-image
(the ``+`` side) of that diff and fixes the concrete defects called out
inline.  Anything outside the diff's visible hunks (e.g. the import that
provides ``get_sor_context``) is marked with a TODO to confirm.

Tables:
    flow_definition   -- versioned YAML DSL documents
    flow_instance     -- one row per running/finished flow
    node_execution    -- audit log of executed nodes
    subflow_instance  -- parent/child links for subflows
    human_task        -- pending/completed manual approval tasks
"""

import asyncio
import json

# PyYAML and the ahserver runtime are only needed when the engine actually
# runs; importing them defensively keeps the table schemas importable in
# environments (schema tooling, unit tests) without the server stack.
try:
    import yaml
    from ahserver.serverenv import ServerEnv
    # NOTE(review): the diff uses get_sor_context() in step() but its import
    # statement is outside the visible hunks -- confirm the module path.
    from ahserver.serverenv import get_sor_context
except ImportError:  # pragma: no cover - schema-only import path
    yaml = None
    ServerEnv = None
    get_sor_context = None

# ---------------------------------------------------------------------
# Table schemas (consumed by the project's DDL generator)
# ---------------------------------------------------------------------

FLOW_DEFINITION_TABLE = {
    "summary": [{"name": "flow_definition", "primary": "id"}],
    "fields": [
        {"name": "id", "type": "str", "length": 32, "nullable": "no"},
        {"name": "org_id", "type": "str", "length": 32},
        {"name": "name", "type": "str", "length": 128},
        {"name": "version", "type": "str", "length": 32},
        {"name": "dsl", "type": "text"},
        {"name": "created_at", "type": "timestamp"}
    ]
}

FLOW_INSTANCE_TABLE = {
    "summary": [{"name": "flow_instance", "primary": "id"}],
    "fields": [
        {"name": "id", "type": "str", "length": 32, "nullable": "no"},
        {"name": "org_id", "type": "str", "length": 32, "nullable": "no"},
        {"name": "flow_def_id", "type": "str", "length": 32},
        {"name": "status", "type": "str", "length": 16},
        {"name": "ctx", "type": "text"},
        {"name": "active_nodes", "type": "text"},
        {"name": "created_at", "type": "timestamp"}
    ]
}

NODE_EXECUTION_TABLE = {
    "summary": [{"name": "node_execution", "primary": "id"}],
    "fields": [
        {"name": "id", "type": "str", "length": 32, "nullable": "no"},
        {"name": "org_id", "type": "str", "length": 32},
        {"name": "instance_id", "type": "str", "length": 32},
        {"name": "node_id", "type": "str", "length": 64},
        {"name": "input_ctx", "type": "text"},
        {"name": "output_ctx", "type": "text"},
        {"name": "status", "type": "str", "length": 16},
        {"name": "error", "type": "text"},
        {"name": "created_at", "type": "timestamp"}
    ]
}

SUBFLOW_INSTANCE_TABLE = {
    "summary": [{"name": "subflow_instance", "primary": "id"}],
    "fields": [
        {"name": "id", "type": "str", "length": 32, "nullable": "no"},
        {"name": "org_id", "type": "str", "length": 32},
        {"name": "parent_instance_id", "type": "str", "length": 32},
        {"name": "parent_node_id", "type": "str", "length": 64},
        {"name": "child_instance_id", "type": "str", "length": 32},
        {"name": "status", "type": "str", "length": 16},
        {"name": "created_at", "type": "timestamp"}
    ]
}

HUMAN_TASK_TABLE = {
    "summary": [{"name": "human_task", "primary": "id"}],
    "fields": [
        {"name": "id", "type": "str", "length": 32, "nullable": "no"},
        {"name": "org_id", "type": "str", "length": 32},
        {"name": "instance_id", "type": "str", "length": 32},
        {"name": "node_id", "type": "str", "length": 64},
        {"name": "role", "type": "str", "length": 64},
        {"name": "assignee", "type": "str", "length": 64},
        {"name": "status", "type": "str", "length": 16},
        {"name": "input", "type": "text"},
        {"name": "output", "type": "text"},
        {"name": "timeout_at", "type": "timestamp"},
        {"name": "created_at", "type": "timestamp"},
        {"name": "completed_at", "type": "timestamp"}
    ]
}

# ---------------------------------------------------------------------
# Engine
# ---------------------------------------------------------------------


class FlowEngine:
    """Drives flow instances through their DSL-defined node graph."""

    def __init__(self):
        # Cache of currently-running flow_instance rows, refreshed by bgtask().
        self.aio = []

    # ------------------------------
    # Background driver
    # ------------------------------

    async def bgtask(self, app):
        """Background poller: repeatedly advance every running instance.

        Args:
            app: the ahserver application object (passed by the server's
                 background-task plumbing; unused here).
        """
        env = ServerEnv()
        while True:
            # FIX: the original queried running instances ONCE, before the
            # loop -- newly created instances were never picked up and
            # finished ones never dropped.  Re-query every cycle instead.
            # FIX: the original used `async with (env, 'workflow')` -- a bare
            # tuple is not an async context manager; step() already uses
            # get_sor_context(), so this now matches.
            async with get_sor_context(env, 'workflow') as sor:
                self.aio = await sor.R('flow_instance', {'status': 'running'})
            for row in self.aio:
                # FIX: step() is a coroutine; the original called it without
                # `await`, so instances were never actually advanced.
                # NOTE(review): rows are subscripted everywhere else
                # (inst['status']) but the original used row.org_id here --
                # confirm the row type returned by sor.R().
                await self.step(row['org_id'], row['id'])
            await asyncio.sleep(0.5)

    # ------------------------------
    # Definition / Instance
    # ------------------------------

    async def create_definition(self, org_id, name, version, dsl_text):
        """Store a flow definition (YAML DSL text) and return its new id."""
        env = ServerEnv()
        # FIX: tuple-as-async-context-manager replaced with get_sor_context()
        # for correctness and consistency with step().
        async with get_sor_context(env, 'workflow') as sor:
            fid = env.uuid()
            await sor.C('flow_definition', {
                'id': fid,
                'org_id': org_id,
                'name': name,
                'version': version,
                'dsl': dsl_text
            })
            return fid

    async def create_instance(self, org_id, flow_def_id, ctx=None):
        """Create a running instance of a definition; return the new id.

        ``ctx`` is the initial context dict (defaults to empty); it is stored
        as JSON, as is the initially-empty active-node list.
        """
        env = ServerEnv()
        async with get_sor_context(env, 'workflow') as sor:
            iid = env.uuid()
            await sor.C('flow_instance', {
                'id': iid,
                'org_id': org_id,
                'flow_def_id': flow_def_id,
                'status': 'running',
                'ctx': json.dumps(ctx or {}),
                'active_nodes': json.dumps([])
            })
            return iid

    # ------------------------------
    # Execution
    # ------------------------------

    async def step(self, org_id, instance_id):
        """Advance one instance by one wave of active nodes.

        Human nodes create a pending human_task (once) and stay active until
        completed; every other node type is recorded as executed and its
        outgoing edges (filtered by their ``when`` condition) determine the
        next active set.  When every next node is an ``end`` node the
        instance is marked finished.
        """
        env = ServerEnv()
        async with get_sor_context(env, 'workflow') as sor:
            insts = await sor.R('flow_instance', {
                'id': instance_id,
                'org_id': org_id
            })
            if not insts:
                return

            inst = insts[0]
            if inst['status'] != 'running':
                return

            flow_def = (await sor.R(
                'flow_definition',
                {'id': inst['flow_def_id']}
            ))[0]

            dsl = yaml.safe_load(flow_def['dsl'])
            ctx = json.loads(inst['ctx'])
            # An empty active set means the instance has not started yet;
            # seed it with the DSL's declared start node.
            active = set(json.loads(inst['active_nodes']) or [dsl['start']])
            next_nodes = set()

            for node_id in active:
                node = dsl['nodes'][node_id]
                ntype = node['type']

                # -------- Human node --------
                if ntype == 'human':
                    rows = await sor.R('human_task', {
                        'instance_id': instance_id,
                        'node_id': node_id,
                        'status': 'pending'
                    })
                    if not rows:
                        # First visit: create the pending task exactly once.
                        await sor.C('human_task', {
                            'id': env.uuid(),
                            'org_id': org_id,
                            'instance_id': instance_id,
                            'node_id': node_id,
                            'role': node.get('role'),
                            'status': 'pending',
                            'input': json.dumps(ctx),
                            'timeout_at': env.after(node.get('timeout', 0))
                        })
                    # Human node stays active until its task is completed.
                    next_nodes.add(node_id)
                    continue

                # -------- Normal node --------
                await sor.C('node_execution', {
                    'id': env.uuid(),
                    'org_id': org_id,
                    'instance_id': instance_id,
                    'node_id': node_id,
                    'input_ctx': json.dumps(ctx),
                    'status': 'success'
                })

                for edge in dsl.get('edges', []):
                    if edge['from'] != node_id:
                        continue
                    cond = edge.get('when')
                    # SECURITY: eval() on a DSL-supplied expression executes
                    # arbitrary code (empty globals do NOT block builtins).
                    # Safe only while flow authors are fully trusted.
                    # TODO(review): replace with a restricted evaluator.
                    if cond and not eval(cond, {}, {'ctx': ctx}):
                        continue
                    next_nodes.add(edge['to'])

            if next_nodes and all(dsl['nodes'][n]['type'] == 'end' for n in next_nodes):
                await sor.U('flow_instance', {
                    'id': instance_id,
                    'status': 'finished',
                    'active_nodes': json.dumps(list(next_nodes))
                })
            else:
                await sor.U('flow_instance', {
                    'id': instance_id,
                    'active_nodes': json.dumps(list(next_nodes))
                })

    # ------------------------------
    # Human task APIs
    # ------------------------------

    async def list_human_tasks(self, org_id, user_roles):
        """Return pending human tasks visible to any of ``user_roles``."""
        env = ServerEnv()
        async with get_sor_context(env, 'workflow') as sor:
            return await sor.R('human_task', {
                'org_id': org_id,
                'status': 'pending',
                'role': ('in', user_roles)
            })

    async def complete_human_task(self, org_id, task_id, user_id, output):
        """Mark a pending task done and merge its output into the flow ctx.

        A no-op when the task does not exist, belongs to another org, or is
        no longer pending.  The instance is NOT stepped here; the bgtask
        poller picks up the updated context on its next cycle.
        """
        env = ServerEnv()
        async with get_sor_context(env, 'workflow') as sor:
            rows = await sor.R('human_task', {
                'id': task_id,
                'org_id': org_id,
                'status': 'pending'
            })
            if not rows:
                return

            task = rows[0]
            await sor.U('human_task', {
                'id': task_id,
                'assignee': user_id,
                'status': 'done',
                'output': json.dumps(output),
                'completed_at': env.now()
            })

            inst = (await sor.R(
                'flow_instance',
                {'id': task['instance_id']}
            ))[0]

            ctx = json.loads(inst['ctx'])
            ctx.update(output)

            await sor.U('flow_instance', {
                'id': inst['id'],
                'ctx': json.dumps(ctx)
            })