This commit is contained in:
yumoqing 2026-03-12 13:33:24 +08:00
parent 034d6fe4d4
commit 172b7115cb
5 changed files with 104 additions and 11 deletions

View File

@ -49,13 +49,21 @@ NODE_EXECUTION_TABLE = {
"summary": [{"name": "node_execution", "primary": "id"}], "summary": [{"name": "node_execution", "primary": "id"}],
"fields": [ "fields": [
{"name": "id", "type": "str", "length": 32, "nullable": "no"}, {"name": "id", "type": "str", "length": 32, "nullable": "no"},
{"name": "type", "type": "str", "length": 32},
{"name": "org_id", "type": "str", "length": 32}, {"name": "org_id", "type": "str", "length": 32},
{"name": "instance_id", "type": "str", "length": 32}, {"name": "instance_id", "type": "str", "length": 32},
{"name": "node_id", "type": "str", "length": 64}, {"name": "node_id", "type": "str", "length": 64},
{"name": "input_ctx", "type": "text"}, {"name": "input_ctx", "type": "text"},
{"name": "output_ctx", "type": "text"}, {"name": "output_ctx", "type": "text"},
{"name": "ctx", "type": "text"},
{"name": "ctx_ext", "type": "text"},
{"name": "status", "type": "str", "length": 16}, {"name": "status", "type": "str", "length": 16},
{"name": "error", "type": "text"}, {"name": "error", "type": "text"},
{"name": "input", "type": "text"},
{"name": "output", "type": "text"},
{"name": "subinst_id", "type": "str", "length": 32},
{"name": "role", "type": "str", "length": 64},
{"name": "assignee", "type": "str", "length": 64},
{"name": "running_at", "type": "timestamp"}, {"name": "running_at", "type": "timestamp"},
{"name": "stopping_at", "type": "timestamp"}, {"name": "stopping_at", "type": "timestamp"},
{"name": "created_at", "type": "timestamp"} {"name": "created_at", "type": "timestamp"}
@ -176,6 +184,13 @@ class FlowEngine:
# ------------------------------ # ------------------------------
async def step(self, org_id, instance_id): async def step(self, org_id, instance_id):
"""
从节点执行表中取出所有状态不是结束的执行节点检查是否可以推进一步
结束状态为
* succeeded
* failed
* cancelled
推进如果
env = ServerEnv() env = ServerEnv()
async with get_sor_context(env, 'dagflow') as sor: async with get_sor_context(env, 'dagflow') as sor:
insts = await sor.R('flow_instance', { insts = await sor.R('flow_instance', {
@ -224,7 +239,7 @@ class FlowEngine:
next_nodes.add(node_id) next_nodes.add(node_id)
continue continue
if ntype == 'task': if ntype == 'task':
if node.get('join') if node.get('join'):
await sor.C('node_execution', { await sor.C('node_execution', {
'id': env.uuid(), 'id': env.uuid(),
'org_id': org_id, 'org_id': org_id,
@ -292,6 +307,30 @@ class FlowEngine:
nnodes.add((edge['to'], str(e))) nnodes.add((edge['to'], str(e)))
return nnodes return nnodes
async def is_ok_to_create_new_node_exe(self, sor, node, edges, inst):
join = node.get('join)
if join is None:
return True
return False
async def create_new_node_exe(self, sor, node, edges, inst):
flg = await self.is_ok_to_create_new_node_exe(sor, node, edges, inst)
if flg:
ctx = inst['ctx'].copy,
ctx_ext = ext_data or {}
id = env.uuid()
await sor.C('node_execution', {
'id': id,
'org_id': inst['org_id'],
'instance_id': inst['id],
'node_id': node['id'],
'ctx': json.dumps(ctx, ensure_ascii=False),
'ctx_ext': json.dumps(ctx_ext, ensure_ascii=False),
'input_ctx': node['input_ctx'],
'status': 'pendding'
})
return id
async def is_ok_to_start(self, sor, node, edges, inst): async def is_ok_to_start(self, sor, node, edges, inst):
join = node.get('join') join = node.get('join')
if not join: if not join:
@ -401,5 +440,27 @@ where instance_id=${instance_id}$
}) })
async def add_new_workflow(request, params_kw={}): async def add_new_workflow(request, params_kw={}):
arams_kw.name name = params_kw.name
params_kw.dsl dsl = params_kw.dsl
env = request._run_ns
orgid = await env.get_userorgid()
async with get_sor_context(env, 'dagflow') as sor:
ns = {
'id': env.uuid(),
'name': name,
'dsl': dsl,
'version': '1',
'created_at': timestampstr(),
'org_id': orgid
}
await sor.C('dagflow_definition', ns)
return {
'status': 'SUCCEEDED',
'data': {
'dagflow_id': ns['id']
}
}
return {
'status': 'FAILED',
'error': 'add workflow error'
}

View File

@ -8,8 +8,10 @@
"fields": [ "fields": [
{"name": "id", "title": "定义ID", "type": "str", "length": 32, "nullable": "no"}, {"name": "id", "title": "定义ID", "type": "str", "length": 32, "nullable": "no"},
{"name": "name", "title": "流程名称", "type": "str", "length": 128}, {"name": "name", "title": "流程名称", "type": "str", "length": 128},
{"name": "description", "title": "流程描述", "type": "text"},
{"name": "version", "title": "版本", "type": "str", "length": 32}, {"name": "version", "title": "版本", "type": "str", "length": 32},
{"name": "dsl", "title": "YAML DSL", "type": "text"}, {"name": "dsl", "title": "YAML DSL", "type": "text"},
{"name": "ctxfields", "title": "上下文属性集", "type": "text"},
{"name": "created_at", "title": "创建时间", "type": "timestamp"} {"name": "created_at", "title": "创建时间", "type": "timestamp"}
], ],
"indexes": [ "indexes": [

View File

@ -7,6 +7,7 @@
}], }],
"fields": [ "fields": [
{"name": "id", "title": "实例ID", "type": "str", "length": 32, "nullable": "no"}, {"name": "id", "title": "实例ID", "type": "str", "length": 32, "nullable": "no"},
{"name": "org_id", "title": "机构id", "type": "str", "length": 32, "nullable": "no"},
{"name": "flow_def_id", "title": "流程定义ID", "type": "str", "length": 32}, {"name": "flow_def_id", "title": "流程定义ID", "type": "str", "length": 32},
{"name": "status", "title": "状态", "type": "str", "length": 16}, {"name": "status", "title": "状态", "type": "str", "length": 16},
{"name": "ctx", "title": "上下文(JSON)", "type": "text"}, {"name": "ctx", "title": "上下文(JSON)", "type": "text"},

17
models/nod_transfer.json Normal file
View File

@ -0,0 +1,17 @@
{
"summary": [{
"name": "node_transfer",
"title": "节点转移表",
"primary": "id",
"catelog": "relation"
}],
"fields": [
{"name": "id", "title": "执行ID", "type": "str", "length": 32, "nullable": "no"},
{"name": "instance_id", "title": "流程实例ID", "type": "str", "length": 32},
{"name": "from_neid", "title": "从节点id", "type": "str", "length": 32},
{"name": "to_neid", "title": "到节点id", "type": "str", "length": 32},
],
"indexes": [
{"name": "idx_node_exec_inst", "idxtype": "index", "idxfields": ["instance_id"]}
]
}

View File

@ -6,14 +6,26 @@
"catelog": "relation" "catelog": "relation"
}], }],
"fields": [ "fields": [
{"name": "id", "title": "执行ID", "type": "str", "length": 32, "nullable": "no"}, {"name": "id", "title": "执行ID", "type": "str", "length": 32, "nullable": "no"},
{"name": "instance_id", "title": "流程实例ID", "type": "str", "length": 32}, {"name": "type", "title": "节点类型", "type": "str", "length": 32},
{"name": "node_id", "title": "节点ID", "type": "str", "length": 64}, {"name": "org_id", "title": "机构id", "type": "str", "length": 32},
{"name": "input_ctx", "title": "输入上下文", "type": "text"}, {"name": "instance_id", "title": "流程实例ID", "type": "str", "length": 32},
{"name": "output_ctx", "title": "输出上下文", "type": "text"}, {"name": "node_id", "title": "节点ID", "type": "str", "length": 64},
{"name": "status", "title": "状态", "type": "str", "length": 16}, {"name": "input_ctx", "title": "输入上下文", "type": "text"},
{"name": "error", "title": "错误信息", "type": "text"}, {"name": "output_ctx", "title": "输出上下文", "type": "text"},
{"name": "created_at", "title": "执行时间", "type": "timestamp"} {"name": "ctx", "title": "执行时上下文", "type": "text"},
{"name": "ctx_ext", "title": "执行时附加上下文", "type": "text"},
{"name": "status", "title": "状态", "type": "str", "length": 16},
{"name": "error", "title": "错误信息", "type": "text"},
{"name": "input", "title": "执行实际输入", "type": "text"},
{"name": "output", "title": "执行实际输出", "type": "text"},
{"name": "subinst_id", "title": "子流程实例id", "type": "str", "length": 32},
{"name": "role", "title": "执行人角色", "type": "str", "length": 64},
{"name": "assignee", "title": "执行人id", "type": "str", "length": 64},
{"name": "running_at", "title": "执行时点", "type": "timestamp"},
{"name": "stopping_at", "title": "结束时点", "type": "timestamp"},
{"name": "created_at", "title": "执行时间", "type": "timestamp"}
{"name": "next_nodeid", "title": "下一节点id", "type", "str", "length": 32}
], ],
"indexes": [ "indexes": [
{"name": "idx_node_exec_inst", "idxtype": "index", "idxfields": ["instance_id"]} {"name": "idx_node_exec_inst", "idxtype": "index", "idxfields": ["instance_id"]}