"""pipeline_service - 通用产线执行引擎模块。 把 Hermes Agent 验证过的业务流程固化为可重复、可并发的产线业务环境。 支持多租户隔离、DAG 步骤调度、可插拔步骤处理器、artifact 版本管理。 支持人工交互步骤(human_task/approval_gate):人机协作产线。 任何宿主应用都可以通过 load_pipeline_service() 加载本模块。 """ import json from ahserver.serverenv import ServerEnv from appPublic.uniqueID import getID from appPublic.log import debug from .state import ( TASK_SUBMITTED, TASK_RUNNING, TASK_COMPLETED, TASK_FAILED, TASK_PAUSED, TASK_CANCELLED, TASK_WAITING, build_step_graph, get_cascade_rerun_steps, get_rerun_from_next, ) from .storage import ( create_task, init_task_steps, get_task, get_task_steps, get_artifact, get_all_artifacts, list_tasks, update_task_state, update_task_version, get_pipeline_steps, reset_steps, save_artifact, get_human_task, ) from .executor import start_task, resume_task, stop_task, is_running from .handler import register_handler, list_handlers, register_default_handler from .step_registry import ( register_step_type, get_step_type, list_step_types, unregister_step_type, load_builtin_types, ) from .human import human_complete, approval_approve, approval_reject, human_list MODULE_NAME = "pipeline_service" MODULE_VERSION = "3.1.0" async def pipeline_submit(tenant_id, pipeline_id, owner_id, title, params=None): """提交新产线任务。 Args: tenant_id: 租户ID(由宿主应用提供,可以是 org_id、user_id 等) pipeline_id: 产线定义ID(来自 pipelines 表) owner_id: 提交人ID title: 任务标题 params: 提交参数(dict) Returns: JSON string with status, task_id """ result = {"success": False} try: if not tenant_id or not pipeline_id: result["message"] = "缺少 tenant_id 或 pipeline_id" return json.dumps(result, ensure_ascii=False) params = params or {} task_id = await create_task(tenant_id, pipeline_id, owner_id, title, params) # Read step definitions from pipeline_steps table step_records = await get_pipeline_steps(pipeline_id) if not step_records: result["message"] = f"产线 {pipeline_id} 没有步骤定义" return json.dumps(result, ensure_ascii=False) # Create step execution records await init_task_steps(task_id, step_records) # Start execution await start_task(task_id) result["success"] = True result["task_id"] = task_id result["message"] = "任务已提交并开始执行" except Exception as e: result["message"] = str(e) return json.dumps(result, ensure_ascii=False) async def pipeline_list(tenant_id, pipeline_id=None, limit=100): """查询租户的任务列表。""" result = {"success": False} try: tasks = await list_tasks(tenant_id, pipeline_id, limit) result["success"] = True result["tasks"] = tasks result["total"] = len(tasks) except Exception as e: result["message"] = str(e) return json.dumps(result, ensure_ascii=False, default=str) async def pipeline_detail(tenant_id, task_id): """获取任务详情 + 步骤状态树。""" result = {"success": False} try: task = await get_task(tenant_id, task_id) if not task: result["message"] = "任务不存在" return json.dumps(result, ensure_ascii=False) steps = await get_task_steps(task_id) # Enrich steps with human task info for interactive steps for step in steps: if step.get('state') == 'waiting': ht = await get_human_task(task_id, step['step_name']) if ht: step['human_task'] = ht task["steps"] = steps task["is_running"] = is_running(task_id) result["success"] = True result["task"] = task except Exception as e: result["message"] = str(e) return json.dumps(result, ensure_ascii=False, default=str) async def pipeline_node(tenant_id, task_id, step_name, version=None): """获取某节点某版本的 input/output artifact。""" result = {"success": False} try: task = await get_task(tenant_id, task_id) if not task: result["message"] = "任务不存在" return json.dumps(result, ensure_ascii=False) v = version or task.get("current_version", 1) if isinstance(v, str): v = int(v) input_data = await get_artifact(task_id, v, step_name, "input") output_data = await get_artifact(task_id, v, step_name, "output") result["success"] = True result["step_name"] = step_name result["version"] = v result["input"] = input_data result["output"] = output_data except Exception as e: result["message"] = str(e) return json.dumps(result, ensure_ascii=False, default=str) async def pipeline_modify(tenant_id, task_id, updates, rerun_from="node"): """修改节点 artifact 并触发级联重跑。 Args: updates: {step_name: {content: {...}}, ...} rerun_from: "node" = 从该节点重跑, "next" = 从下游节点重跑 """ result = {"success": False} try: task = await get_task(tenant_id, task_id) if not task: result["message"] = "任务不存在" return json.dumps(result, ensure_ascii=False) if is_running(task_id): result["message"] = "任务正在执行中,请先暂停" return json.dumps(result, ensure_ascii=False) pipeline_id = task.get("pipeline_id", task.get("Pipeline_id", "")) current_version = task.get("current_version", task.get("current_Version", 1)) if isinstance(current_version, str): current_version = int(current_version) # Load step graph step_records = await get_pipeline_steps(pipeline_id) step_graph = build_step_graph(step_records) # Calculate affected steps all_rerun = set() for step_name in updates: if step_name not in step_graph: result["message"] = f"未知步骤: {step_name}" return json.dumps(result, ensure_ascii=False) if rerun_from == "node": affected = get_cascade_rerun_steps(step_graph, step_name) else: affected = get_rerun_from_next(step_graph, step_name) all_rerun.update(affected) # Create new version new_version = current_version + 1 await update_task_version(task_id, new_version) # Save modified artifacts for step_name, step_update in updates.items(): content = step_update.get("content", step_update) io_type = "input" if rerun_from == "node" else "output" await save_artifact(task_id, new_version, step_name, io_type, content) # Reset affected steps all_rerun_list = sorted(all_rerun, key=lambda s: step_graph.get(s, {}).get("order", 999)) await reset_steps(task_id, all_rerun_list) # Update task state and resume await update_task_state(task_id, TASK_RUNNING) await resume_task(task_id) result["success"] = True result["new_version"] = new_version result["rerun_steps"] = all_rerun_list result["message"] = f"创建 v{new_version},重跑 {len(all_rerun_list)} 个步骤" except Exception as e: result["message"] = str(e) return json.dumps(result, ensure_ascii=False, default=str) async def pipeline_pause(tenant_id, task_id): """暂停任务。""" result = {"success": False} try: task = await get_task(tenant_id, task_id) if not task: result["message"] = "任务不存在" return json.dumps(result, ensure_ascii=False) await stop_task(task_id) await update_task_state(task_id, TASK_PAUSED) result["success"] = True result["message"] = "任务已暂停" except Exception as e: result["message"] = str(e) return json.dumps(result, ensure_ascii=False) async def pipeline_resume(tenant_id, task_id): """恢复任务。""" result = {"success": False} try: task = await get_task(tenant_id, task_id) if not task: result["message"] = "任务不存在" return json.dumps(result, ensure_ascii=False) await update_task_state(task_id, TASK_RUNNING) await resume_task(task_id) result["success"] = True result["message"] = "任务已恢复" except Exception as e: result["message"] = str(e) return json.dumps(result, ensure_ascii=False) async def pipeline_cancel(tenant_id, task_id): """取消任务。""" result = {"success": False} try: task = await get_task(tenant_id, task_id) if not task: result["message"] = "任务不存在" return json.dumps(result, ensure_ascii=False) await stop_task(task_id) await update_task_state(task_id, TASK_CANCELLED) result["success"] = True result["message"] = "任务已取消" except Exception as e: result["message"] = str(e) return json.dumps(result, ensure_ascii=False) def pipeline_handlers(): """查看已注册的步骤处理器。""" return json.dumps(list_handlers(), ensure_ascii=False) def pipeline_step_types(): """查看所有注册的步骤类型(含元数据)。""" return json.dumps(list_step_types(), ensure_ascii=False) def pipeline_register_step_type(step_type, metadata): """注册步骤类型(可装卸)。""" register_step_type(step_type, metadata) return json.dumps({"success": True, "step_type": step_type}, ensure_ascii=False) def pipeline_unregister_step_type(step_type): """卸载步骤类型。""" unregister_step_type(step_type) return json.dumps({"success": True, "step_type": step_type}, ensure_ascii=False) def load_pipeline_service(): """注册所有函数到 ServerEnv。任何宿主应用调用此函数即可使用产线引擎。""" env = ServerEnv() # Task lifecycle env.pipeline_submit = pipeline_submit env.pipeline_list = pipeline_list env.pipeline_detail = pipeline_detail env.pipeline_node = pipeline_node env.pipeline_modify = pipeline_modify env.pipeline_pause = pipeline_pause env.pipeline_resume = pipeline_resume env.pipeline_cancel = pipeline_cancel # Handler management env.pipeline_register_handler = register_handler env.pipeline_handlers = pipeline_handlers # Step type registry (pluggable) env.pipeline_step_types = pipeline_step_types env.pipeline_register_step_type = pipeline_register_step_type env.pipeline_unregister_step_type = pipeline_unregister_step_type # Human task operations env.human_task_complete = human_complete env.approval_approve = approval_approve env.approval_reject = approval_reject env.human_task_list = human_list # Register default handler register_default_handler() # Load built-in interactive step types load_builtin_types() debug(f"[{MODULE_NAME}] v{MODULE_VERSION} loaded — pipeline engine with human-in-the-loop support") return True