"""Human task operations — complete, reject, approve. When the executor encounters an interactive step_type (human_task/approval_gate), it creates a pipeline_human_tasks record and puts the step into WAITING state. This module handles the human-side operations to resume execution. """ import json import logging from appPublic.uniqueID import getID from .state import ( STATE_WAITING, STATE_COMPLETED, STATE_REJECTED, STATE_PENDING, TASK_RUNNING, TASK_WAITING, TASK_FAILED, HUMAN_PENDING, HUMAN_SUBMITTED, HUMAN_APPROVED, HUMAN_REJECTED, build_step_graph, find_next_step, get_rerun_from_next, ) from .storage import ( get_task, get_step_states, get_pipeline_steps, update_step_state, update_task_state, save_artifact, reset_steps, ) from .executor import resume_task logger = logging.getLogger("pipeline.human") async def human_complete(tenant_id, task_id, step_name, result_data, operator_id=None): """Complete a human_task step — submit form data and resume execution. Args: tenant_id: tenant task_id: task step_name: the waiting step result_data: dict — form submission data operator_id: who completed it Returns: JSON string """ result = {"success": False} try: task = await get_task(tenant_id, task_id) if not task: result["message"] = "任务不存在" return json.dumps(result, ensure_ascii=False) # Verify step is waiting step_states = await get_step_states(task_id) if step_states.get(step_name) != STATE_WAITING: result["message"] = f"步骤 {step_name} 不在等待状态 (当前: {step_states.get(step_name)})" return json.dumps(result, ensure_ascii=False) # Save result as output artifact version = task.get("current_version", task.get("current_Version", 1)) if isinstance(version, str): version = int(version) await save_artifact(task_id, version, step_name, "output", result_data) # Mark step completed await update_step_state(task_id, step_name, STATE_COMPLETED) # Update human_tasks record await _update_human_task(task_id, step_name, HUMAN_SUBMITTED, result_data, operator_id) # Resume execution await update_task_state(task_id, TASK_RUNNING) await resume_task(task_id) result["success"] = True result["message"] = f"步骤 {step_name} 已完成,产线继续执行" except Exception as e: result["message"] = str(e) return json.dumps(result, ensure_ascii=False, default=str) async def approval_approve(tenant_id, task_id, step_name, reviewer_id, comments=None): """Approve an approval_gate step. Args: reviewer_id: who approved comments: optional comments Returns: JSON string """ result = {"success": False} try: task = await get_task(tenant_id, task_id) if not task: result["message"] = "任务不存在" return json.dumps(result, ensure_ascii=False) step_states = await get_step_states(task_id) if step_states.get(step_name) != STATE_WAITING: result["message"] = f"步骤 {step_name} 不在等待状态" return json.dumps(result, ensure_ascii=False) # Save approval as artifact version = task.get("current_version", task.get("current_Version", 1)) if isinstance(version, str): version = int(version) approval_data = {"approved": True, "reviewer_id": reviewer_id, "comments": comments} await save_artifact(task_id, version, step_name, "output", approval_data) # Mark step completed await update_step_state(task_id, step_name, STATE_COMPLETED) await _update_human_task(task_id, step_name, HUMAN_APPROVED, approval_data, reviewer_id) # Resume execution await update_task_state(task_id, TASK_RUNNING) await resume_task(task_id) result["success"] = True result["message"] = f"步骤 {step_name} 审批通过,产线继续执行" except Exception as e: result["message"] = str(e) return json.dumps(result, ensure_ascii=False, default=str) async def approval_reject(tenant_id, task_id, step_name, reviewer_id, comments=None, rollback_to=None): """Reject an approval_gate step — optionally rollback to a previous step. Args: reviewer_id: who rejected comments: rejection reason rollback_to: step_name to rollback to (if None, just mark rejected) Returns: JSON string """ result = {"success": False} try: task = await get_task(tenant_id, task_id) if not task: result["message"] = "任务不存在" return json.dumps(result, ensure_ascii=False) step_states = await get_step_states(task_id) if step_states.get(step_name) != STATE_WAITING: result["message"] = f"步骤 {step_name} 不在等待状态" return json.dumps(result, ensure_ascii=False) rejection_data = {"approved": False, "reviewer_id": reviewer_id, "comments": comments} if rollback_to: # Rollback: reset target step and all steps between pipeline_id = task.get("pipeline_id", task.get("Pipeline_id", "")) step_records = await get_pipeline_steps(pipeline_id) step_graph = build_step_graph(step_records) # Get all steps from rollback_to to step_name (inclusive) affected = _get_steps_between(step_graph, rollback_to, step_name) if not affected: result["message"] = f"无法计算从 {rollback_to} 到 {step_name} 的回退路径" return json.dumps(result, ensure_ascii=False) # Reset affected steps to pending await reset_steps(task_id, affected) # Save rejection info as artifact on the rollback target version = task.get("current_version", task.get("current_Version", 1)) if isinstance(version, str): version = int(version) await save_artifact(task_id, version, rollback_to, "input", { "__rejection__": rejection_data }) # Resume from rollback point await update_task_state(task_id, TASK_RUNNING) await resume_task(task_id) result["success"] = True result["message"] = f"审批驳回,回退到 {rollback_to},重跑 {len(affected)} 个步骤" result["rerun_steps"] = affected else: # Just reject — task fails await update_step_state(task_id, step_name, STATE_REJECTED, comments) await _update_human_task(task_id, step_name, HUMAN_REJECTED, rejection_data, reviewer_id) await update_task_state(task_id, TASK_FAILED) result["success"] = True result["message"] = f"审批驳回,任务已标记失败" except Exception as e: result["message"] = str(e) return json.dumps(result, ensure_ascii=False, default=str) async def human_list(tenant_id=None, assignee_role=None, assignee_id=None, status=None): """List human tasks, optionally filtered by role/user/status. Returns: JSON string with human_tasks list """ result = {"success": False} try: from .storage import list_human_tasks tasks = await list_human_tasks(tenant_id, assignee_role, assignee_id, status) result["success"] = True result["tasks"] = tasks result["total"] = len(tasks) except Exception as e: result["message"] = str(e) return json.dumps(result, ensure_ascii=False, default=str) def _get_steps_between(step_graph: dict, from_step: str, to_step: str) -> list: """BFS from from_step to to_step through dependents. Returns ordered list including both endpoints.""" if from_step not in step_graph or to_step not in step_graph: return [] visited = set() queue = [from_step] found = False while queue: current = queue.pop(0) if current in visited: continue visited.add(current) if current == to_step: found = True break for dep in step_graph.get(current, {}).get("dependents", []): if dep not in visited: queue.append(dep) if not found: return [] result = sorted(visited, key=lambda s: step_graph.get(s, {}).get("order", 999)) return result async def _update_human_task(task_id, step_name, status, result_data, operator_id=None): """Update the human_tasks record for this step.""" from .storage import update_human_task_record await update_human_task_record(task_id, step_name, status, result_data, operator_id)