- New states: waiting (step/task), rejected (step) - New tables: pipeline_human_tasks, pipeline_step_types - New module: step_registry.py — pluggable step_type metadata - New module: human.py — human_complete, approval_approve, approval_reject - Executor: detects interactive step_types, creates human_tasks, enters waiting - Reject with rollback: approval_reject(rollback_to=step) resets steps and re-runs - API: human_task_complete, approval_approve, approval_reject, human_task_list - API: pipeline_step_types, pipeline_register_step_type, pipeline_unregister_step_type - Built-in interactive types: human_task, approval_gate - Updated DDL and appcodes
240 lines
8.7 KiB
Python
240 lines
8.7 KiB
Python
"""Human task operations — complete, reject, approve.
|
|
|
|
When the executor encounters an interactive step_type (human_task/approval_gate),
|
|
it creates a pipeline_human_tasks record and puts the step into WAITING state.
|
|
This module handles the human-side operations to resume execution.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
from appPublic.uniqueID import getID
|
|
|
|
from .state import (
|
|
STATE_WAITING, STATE_COMPLETED, STATE_REJECTED, STATE_PENDING,
|
|
TASK_RUNNING, TASK_WAITING, TASK_FAILED,
|
|
HUMAN_PENDING, HUMAN_SUBMITTED, HUMAN_APPROVED, HUMAN_REJECTED,
|
|
build_step_graph, find_next_step, get_rerun_from_next,
|
|
)
|
|
from .storage import (
|
|
get_task, get_step_states, get_pipeline_steps,
|
|
update_step_state, update_task_state, save_artifact,
|
|
reset_steps,
|
|
)
|
|
from .executor import resume_task
|
|
|
|
logger = logging.getLogger("pipeline.human")
|
|
|
|
|
|
async def human_complete(tenant_id, task_id, step_name, result_data, operator_id=None):
|
|
"""Complete a human_task step — submit form data and resume execution.
|
|
|
|
Args:
|
|
tenant_id: tenant
|
|
task_id: task
|
|
step_name: the waiting step
|
|
result_data: dict — form submission data
|
|
operator_id: who completed it
|
|
|
|
Returns:
|
|
JSON string
|
|
"""
|
|
result = {"success": False}
|
|
try:
|
|
task = await get_task(tenant_id, task_id)
|
|
if not task:
|
|
result["message"] = "任务不存在"
|
|
return json.dumps(result, ensure_ascii=False)
|
|
|
|
# Verify step is waiting
|
|
step_states = await get_step_states(task_id)
|
|
if step_states.get(step_name) != STATE_WAITING:
|
|
result["message"] = f"步骤 {step_name} 不在等待状态 (当前: {step_states.get(step_name)})"
|
|
return json.dumps(result, ensure_ascii=False)
|
|
|
|
# Save result as output artifact
|
|
version = task.get("current_version", task.get("current_Version", 1))
|
|
if isinstance(version, str):
|
|
version = int(version)
|
|
await save_artifact(task_id, version, step_name, "output", result_data)
|
|
|
|
# Mark step completed
|
|
await update_step_state(task_id, step_name, STATE_COMPLETED)
|
|
|
|
# Update human_tasks record
|
|
await _update_human_task(task_id, step_name, HUMAN_SUBMITTED, result_data, operator_id)
|
|
|
|
# Resume execution
|
|
await update_task_state(task_id, TASK_RUNNING)
|
|
await resume_task(task_id)
|
|
|
|
result["success"] = True
|
|
result["message"] = f"步骤 {step_name} 已完成,产线继续执行"
|
|
except Exception as e:
|
|
result["message"] = str(e)
|
|
return json.dumps(result, ensure_ascii=False, default=str)
|
|
|
|
|
|
async def approval_approve(tenant_id, task_id, step_name, reviewer_id, comments=None):
|
|
"""Approve an approval_gate step.
|
|
|
|
Args:
|
|
reviewer_id: who approved
|
|
comments: optional comments
|
|
|
|
Returns:
|
|
JSON string
|
|
"""
|
|
result = {"success": False}
|
|
try:
|
|
task = await get_task(tenant_id, task_id)
|
|
if not task:
|
|
result["message"] = "任务不存在"
|
|
return json.dumps(result, ensure_ascii=False)
|
|
|
|
step_states = await get_step_states(task_id)
|
|
if step_states.get(step_name) != STATE_WAITING:
|
|
result["message"] = f"步骤 {step_name} 不在等待状态"
|
|
return json.dumps(result, ensure_ascii=False)
|
|
|
|
# Save approval as artifact
|
|
version = task.get("current_version", task.get("current_Version", 1))
|
|
if isinstance(version, str):
|
|
version = int(version)
|
|
approval_data = {"approved": True, "reviewer_id": reviewer_id, "comments": comments}
|
|
await save_artifact(task_id, version, step_name, "output", approval_data)
|
|
|
|
# Mark step completed
|
|
await update_step_state(task_id, step_name, STATE_COMPLETED)
|
|
await _update_human_task(task_id, step_name, HUMAN_APPROVED, approval_data, reviewer_id)
|
|
|
|
# Resume execution
|
|
await update_task_state(task_id, TASK_RUNNING)
|
|
await resume_task(task_id)
|
|
|
|
result["success"] = True
|
|
result["message"] = f"步骤 {step_name} 审批通过,产线继续执行"
|
|
except Exception as e:
|
|
result["message"] = str(e)
|
|
return json.dumps(result, ensure_ascii=False, default=str)
|
|
|
|
|
|
async def approval_reject(tenant_id, task_id, step_name, reviewer_id, comments=None, rollback_to=None):
|
|
"""Reject an approval_gate step — optionally rollback to a previous step.
|
|
|
|
Args:
|
|
reviewer_id: who rejected
|
|
comments: rejection reason
|
|
rollback_to: step_name to rollback to (if None, just mark rejected)
|
|
|
|
Returns:
|
|
JSON string
|
|
"""
|
|
result = {"success": False}
|
|
try:
|
|
task = await get_task(tenant_id, task_id)
|
|
if not task:
|
|
result["message"] = "任务不存在"
|
|
return json.dumps(result, ensure_ascii=False)
|
|
|
|
step_states = await get_step_states(task_id)
|
|
if step_states.get(step_name) != STATE_WAITING:
|
|
result["message"] = f"步骤 {step_name} 不在等待状态"
|
|
return json.dumps(result, ensure_ascii=False)
|
|
|
|
rejection_data = {"approved": False, "reviewer_id": reviewer_id, "comments": comments}
|
|
|
|
if rollback_to:
|
|
# Rollback: reset target step and all steps between
|
|
pipeline_id = task.get("pipeline_id", task.get("Pipeline_id", ""))
|
|
step_records = await get_pipeline_steps(pipeline_id)
|
|
step_graph = build_step_graph(step_records)
|
|
|
|
# Get all steps from rollback_to to step_name (inclusive)
|
|
affected = _get_steps_between(step_graph, rollback_to, step_name)
|
|
if not affected:
|
|
result["message"] = f"无法计算从 {rollback_to} 到 {step_name} 的回退路径"
|
|
return json.dumps(result, ensure_ascii=False)
|
|
|
|
# Reset affected steps to pending
|
|
await reset_steps(task_id, affected)
|
|
|
|
# Save rejection info as artifact on the rollback target
|
|
version = task.get("current_version", task.get("current_Version", 1))
|
|
if isinstance(version, str):
|
|
version = int(version)
|
|
await save_artifact(task_id, version, rollback_to, "input", {
|
|
"__rejection__": rejection_data
|
|
})
|
|
|
|
# Resume from rollback point
|
|
await update_task_state(task_id, TASK_RUNNING)
|
|
await resume_task(task_id)
|
|
|
|
result["success"] = True
|
|
result["message"] = f"审批驳回,回退到 {rollback_to},重跑 {len(affected)} 个步骤"
|
|
result["rerun_steps"] = affected
|
|
else:
|
|
# Just reject — task fails
|
|
await update_step_state(task_id, step_name, STATE_REJECTED, comments)
|
|
await _update_human_task(task_id, step_name, HUMAN_REJECTED, rejection_data, reviewer_id)
|
|
await update_task_state(task_id, TASK_FAILED)
|
|
|
|
result["success"] = True
|
|
result["message"] = f"审批驳回,任务已标记失败"
|
|
|
|
except Exception as e:
|
|
result["message"] = str(e)
|
|
return json.dumps(result, ensure_ascii=False, default=str)
|
|
|
|
|
|
async def human_list(tenant_id=None, assignee_role=None, assignee_id=None, status=None):
|
|
"""List human tasks, optionally filtered by role/user/status.
|
|
|
|
Returns:
|
|
JSON string with human_tasks list
|
|
"""
|
|
result = {"success": False}
|
|
try:
|
|
from .storage import list_human_tasks
|
|
tasks = await list_human_tasks(tenant_id, assignee_role, assignee_id, status)
|
|
result["success"] = True
|
|
result["tasks"] = tasks
|
|
result["total"] = len(tasks)
|
|
except Exception as e:
|
|
result["message"] = str(e)
|
|
return json.dumps(result, ensure_ascii=False, default=str)
|
|
|
|
|
|
def _get_steps_between(step_graph: dict, from_step: str, to_step: str) -> list:
|
|
"""BFS from from_step to to_step through dependents. Returns ordered list including both endpoints."""
|
|
if from_step not in step_graph or to_step not in step_graph:
|
|
return []
|
|
|
|
visited = set()
|
|
queue = [from_step]
|
|
found = False
|
|
|
|
while queue:
|
|
current = queue.pop(0)
|
|
if current in visited:
|
|
continue
|
|
visited.add(current)
|
|
if current == to_step:
|
|
found = True
|
|
break
|
|
for dep in step_graph.get(current, {}).get("dependents", []):
|
|
if dep not in visited:
|
|
queue.append(dep)
|
|
|
|
if not found:
|
|
return []
|
|
|
|
result = sorted(visited, key=lambda s: step_graph.get(s, {}).get("order", 999))
|
|
return result
|
|
|
|
|
|
async def _update_human_task(task_id, step_name, status, result_data, operator_id=None):
|
|
"""Update the human_tasks record for this step."""
|
|
from .storage import update_human_task_record
|
|
await update_human_task_record(task_id, step_name, status, result_data, operator_id)
|