From b9a5810d8570f14579a802e421deb12ed328530e Mon Sep 17 00:00:00 2001 From: Hermes Agent Date: Tue, 16 Jun 2026 11:05:45 +0800 Subject: [PATCH] =?UTF-8?q?feat(v3):=20human-in-the-loop=20=E2=80=94=20int?= =?UTF-8?q?eractive=20steps,=20pluggable=20step=5Ftype=20registry?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - New states: waiting (step/task), rejected (step) - New tables: pipeline_human_tasks, pipeline_step_types - New module: step_registry.py — pluggable step_type metadata - New module: human.py — human_complete, approval_approve, approval_reject - Executor: detects interactive step_types, creates human_tasks, enters waiting - Reject with rollback: approval_reject(rollback_to=step) resets steps and re-runs - API: human_task_complete, approval_approve, approval_reject, human_task_list - API: pipeline_step_types, pipeline_register_step_type, pipeline_unregister_step_type - Built-in interactive types: human_task, approval_gate - Updated DDL and appcodes --- README.md | 155 ++++++++++++++----- init/data.json | 51 ++++++- models/pipeline_human_tasks.json | 38 +++++ models/pipeline_step_types.json | 31 ++++ pipeline_service/__init__.py | 11 +- pipeline_service/executor.py | 89 +++++++++-- pipeline_service/human.py | 239 ++++++++++++++++++++++++++++++ pipeline_service/init.py | 52 ++++++- pipeline_service/state.py | 33 ++++- pipeline_service/step_registry.py | 81 ++++++++++ pipeline_service/storage.py | 106 +++++++++++++ pyproject.toml | 4 +- 12 files changed, 831 insertions(+), 59 deletions(-) create mode 100644 models/pipeline_human_tasks.json create mode 100644 models/pipeline_step_types.json create mode 100644 pipeline_service/human.py create mode 100644 pipeline_service/step_registry.py diff --git a/README.md b/README.md index 1d25a64..ef9b567 100644 --- a/README.md +++ b/README.md @@ -1,14 +1,17 @@ -# pipeline-service 通用产线执行引擎 +# pipeline-service v3.0 — 通用产线执行引擎 ## 定位 通用产线执行引擎模块。把 Hermes Agent 
验证过的业务流程固化为可重复、可并发的产线业务环境。 +**v3.0 新增:** 人工交互步骤(human_task/approval_gate)、step_type 可装卸注册、多角色协作。 + ## 核心价值 - Hermes Agent 中用 cron/delegate/terminal 跑通的流程 → 固化为产线步骤定义 → pipeline-service 自动调度执行 - 一次验证,无限次自动执行 - 多租户并发:同一产线,不同租户同时使用,数据完全隔离 +- **人机协作:** 自动步骤 + 人工步骤混合执行,支持审批驳回回退 ## 架构 @@ -17,23 +20,117 @@ │ ├── load_pipeline_service() ← 注册函数到 ServerEnv │ - ├── pipeline_submit(tenant_id, pipeline_id, owner_id, title, params) - ├── pipeline_list(tenant_id, pipeline_id?) - ├── pipeline_detail(tenant_id, task_id) - ├── pipeline_node(tenant_id, task_id, step_name, version?) - ├── pipeline_modify(tenant_id, task_id, updates, rerun_from) - ├── pipeline_pause(tenant_id, task_id) - ├── pipeline_resume(tenant_id, task_id) - ├── pipeline_cancel(tenant_id, task_id) - └── pipeline_register_handler(step_type, fn) ← 注册步骤处理器 + ├── 任务生命周期 + │ ├── pipeline_submit(tenant_id, pipeline_id, owner_id, title, params) + │ ├── pipeline_list(tenant_id, pipeline_id?) + │ ├── pipeline_detail(tenant_id, task_id) + │ ├── pipeline_node(tenant_id, task_id, step_name, version?) + │ ├── pipeline_modify(tenant_id, task_id, updates, rerun_from) + │ ├── pipeline_pause(tenant_id, task_id) + │ ├── pipeline_resume(tenant_id, task_id) + │ └── pipeline_cancel(tenant_id, task_id) + │ + ├── 步骤类型注册(可装卸) + │ ├── pipeline_step_types() ← 列出所有类型 + │ ├── pipeline_register_step_type(type, meta) ← 注册新类型 + │ └── pipeline_unregister_step_type(type) ← 卸载类型 + │ + ├── 人工交互 + │ ├── human_task_complete(tenant_id, task_id, step_name, data, operator) + │ ├── approval_approve(tenant_id, task_id, step_name, reviewer, comments) + │ ├── approval_reject(tenant_id, task_id, step_name, reviewer, comments, rollback_to) + │ └── human_task_list(tenant_id?, role?, user?, status?) + │ + └── Handler管理 + └── pipeline_register_handler(step_type, fn) ``` ## 引擎工作原理 1. **提交任务** → 读取 pipeline_steps 表的步骤定义 → 创建 pipeline_task_steps 记录 → 启动执行 -2. **执行循环** → 解析 DAG 依赖图 → 找到可执行步骤(所有前置完成)→ 调用 handler → 存 artifact → 继续下一步 -3. 
**多租户** → 所有查询按 tenant_id 隔离 → 同一产线多租户并发不冲突 -4. **人工干预** → 修改节点 artifact → 创建新版本 → BFS 计算受影响步骤 → 级联重跑 +2. **执行循环** → 解析 DAG 依赖图 → 找到可执行步骤 → 判断步骤类型: + - **自动步骤:** 调用 handler → 存 artifact → 继续下一步 + - **交互步骤(human_task/approval_gate):** 创建 human_tasks 记录 → 步骤进入 waiting → 任务进入 waiting +3. **人工完成** → 调用 human_task_complete/approval_approve → 步骤标记完成 → 恢复执行 +4. **审批驳回** → 调用 approval_reject(rollback_to=步骤名) → 回退指定步骤 → 级联重跑 +5. **多租户** → 所有查询按 tenant_id 隔离 + +## 状态机 + +### 任务状态 +``` +submitted → running → completed + → failed + → paused → running (resume) + → waiting → running (human complete) + → cancelled +``` + +### 步骤状态 +``` +pending → running → completed + → failed + → skipped + → waiting → completed (human complete) + → rejected (approval reject) +``` + +## step_type 可装卸 + +每条产线可以注册自己的 step_type,引擎按类型匹配 handler 和交互协议。 + +```python +# 注册一个 SDLC 产线的步骤类型 +from pipeline_service import register_step_type, register_handler + +# 注册自动步骤 +register_step_type("code_review_auto", { + "display_name": "自动代码审查", + "category": "devops", + "is_interactive": False, +}) + +# 注册人工步骤 +register_step_type("code_review_manual", { + "display_name": "人工代码审查", + "category": "interactive", + "is_interactive": True, + "form_schema": { + "type": "object", + "properties": { + "approved": {"type": "boolean", "title": "是否通过"}, + "comments": {"type": "string", "title": "审查意见"} + } + }, + "timeout_hours": 48, + "on_timeout": "escalate", +}) + +# 注册 handler(自动步骤需要,交互步骤不需要) +register_handler("code_review_auto", auto_review_handler) +``` + +### 内置交互类型 + +| step_type | 用途 | 行为 | +|-----------|------|------| +| human_task | 人工填写表单/执行操作 | 步骤 waiting → 人提交 → 继续 | +| approval_gate | 审批关卡 | 步骤 waiting → 通过继续 / 驳回回退 | + +### 步骤定义中的配置 + +在 pipeline_steps 表的 step_config JSON 中指定交互参数: + +```json +{ + "deps": ["develop"], + "assignee_role": "reviewer", + "assignee_id": "user123", + "form_schema": {"type": "object", "properties": {...}}, + "timeout_hours": 48, + "on_timeout": "escalate" +} +``` ## 数据表 @@ -42,25 
+139,11 @@ | pipeline_tasks | 任务主表(tenant_id 隔离) | | pipeline_task_steps | 任务步骤执行记录 | | pipeline_artifacts | 步骤产物(input/output,支持版本) | +| **pipeline_human_tasks** | **人工任务记录(v3新增)** | +| **pipeline_step_types** | **步骤类型注册表(v3新增)** | | pipeline_steps | 产线步骤定义(由 pipeline_core 模块管理) | | pipelines | 产线定义(由 pipeline_core 模块管理) | -## 步骤处理器 - -可插拔注册,统一接口: - -```python -async def handler(tenant_id, task_id, step_name, input_data, config) -> dict: - # 处理逻辑 - return output_data - -# 注册 -from pipeline_service import register_handler -register_handler("llm_generate", handler) -``` - -处理器按 `step_type` 匹配。步骤定义中的 `step_type` 对应 handler 注册名。 - ## 宿主集成 任何应用只需一行代码即可使用: @@ -76,7 +159,7 @@ load_pipeline_service() - 前端交互(bricks 管) - 产线定义和定价(pipeline_core/ops/dist 管) -pipeline-service 只做:调度 + 执行 + 存储。 +pipeline-service 只做:调度 + 执行 + 存储 + 人工交互。 ## 目录结构 @@ -87,18 +170,20 @@ pipeline-service/ │ ├── init.py # load_pipeline_service() + ServerEnv 注册 │ ├── state.py # DAG 解析、步骤状态机 │ ├── handler.py # 步骤处理器注册表 +│ ├── step_registry.py # 步骤类型注册表(v3新增) +│ ├── human.py # 人工任务操作(v3新增) │ ├── storage.py # MySQL 存储层(sqlor) -│ └── executor.py # 执行循环 +│ ├── executor.py # 执行循环 +│ └── handlers_ktv.py # KTV产线专用 handlers ├── models/ │ ├── pipeline_tasks.json │ ├── pipeline_task_steps.json -│ └── pipeline_artifacts.json +│ ├── pipeline_artifacts.json +│ ├── pipeline_human_tasks.json (v3新增) +│ └── pipeline_step_types.json (v3新增) ├── init/ │ └── data.json # appcodes 初始化数据 -├── scripts/ -│ └── load_path.py # RBAC 权限注册 ├── pyproject.toml -├── build.sh └── README.md ``` diff --git a/init/data.json b/init/data.json index 0b4f724..8976674 100644 --- a/init/data.json +++ b/init/data.json @@ -9,7 +9,8 @@ {"k": "completed", "v": "已完成"}, {"k": "failed", "v": "失败"}, {"k": "paused", "v": "已暂停"}, - {"k": "cancelled", "v": "已取消"} + {"k": "cancelled", "v": "已取消"}, + {"k": "waiting", "v": "等待人工"} ] }, { @@ -20,7 +21,9 @@ {"k": "running", "v": "执行中"}, {"k": "completed", "v": "已完成"}, {"k": "failed", "v": "失败"}, - {"k": "skipped", 
"v": "已跳过"} + {"k": "skipped", "v": "已跳过"}, + {"k": "waiting", "v": "等待人工"}, + {"k": "rejected", "v": "已驳回"} ] }, { @@ -42,7 +45,49 @@ {"k": "api_call", "v": "外部API调用"}, {"k": "file_process", "v": "文件处理"}, {"k": "composite", "v": "合成"}, - {"k": "custom", "v": "自定义"} + {"k": "custom", "v": "自定义"}, + {"k": "human_task", "v": "人工任务"}, + {"k": "approval_gate", "v": "审批关卡"} + ] + }, + { + "parentid": "human_task_type", + "parentname": "人工任务类型", + "items": [ + {"k": "human_task", "v": "人工任务"}, + {"k": "approval_gate", "v": "审批关卡"} + ] + }, + { + "parentid": "human_task_status", + "parentname": "人工任务状态", + "items": [ + {"k": "pending", "v": "待处理"}, + {"k": "submitted", "v": "已提交"}, + {"k": "approved", "v": "已通过"}, + {"k": "rejected", "v": "已驳回"}, + {"k": "expired", "v": "已过期"} + ] + }, + { + "parentid": "step_type_category", + "parentname": "步骤类型分类", + "items": [ + {"k": "media", "v": "媒体处理"}, + {"k": "llm", "v": "AI生成"}, + {"k": "interactive", "v": "人工交互"}, + {"k": "devops", "v": "开发运维"}, + {"k": "testing", "v": "测试验证"}, + {"k": "general", "v": "通用"} + ] + }, + { + "parentid": "timeout_policy", + "parentname": "超时策略", + "items": [ + {"k": "skip", "v": "跳过"}, + {"k": "escalate", "v": "升级通知"}, + {"k": "fail", "v": "标记失败"} ] } ] diff --git a/models/pipeline_human_tasks.json b/models/pipeline_human_tasks.json new file mode 100644 index 0000000..60a39a6 --- /dev/null +++ b/models/pipeline_human_tasks.json @@ -0,0 +1,38 @@ +{ + "summary": [ + { + "name": "pipeline_human_tasks", + "title": "人工任务表", + "primary": ["id"], + "catelog": "entity" + } + ], + "fields": [ + {"name": "id", "title": "主键ID", "type": "str", "length": 32, "nullable": "no"}, + {"name": "task_id", "title": "产线任务ID", "type": "str", "length": 32, "nullable": "no"}, + {"name": "step_name", "title": "步骤名称", "type": "str", "length": 64, "nullable": "no"}, + {"name": "version", "title": "版本号", "type": "int", "nullable": "no", "default": "1"}, + {"name": "task_type", "title": "任务类型", "type": "str", "length": 32, 
"nullable": "no"}, + {"name": "assignee_role", "title": "指派角色", "type": "str", "length": 64}, + {"name": "assignee_id", "title": "指派人ID", "type": "str", "length": 32}, + {"name": "form_schema", "title": "表单Schema(JSON)", "type": "longtext"}, + {"name": "result_data", "title": "提交结果(JSON)", "type": "longtext"}, + {"name": "status", "title": "任务状态", "type": "str", "length": 32, "nullable": "no", "default": "pending"}, + {"name": "submitted_by", "title": "提交人ID", "type": "str", "length": 32}, + {"name": "submitted_at", "title": "提交时间", "type": "timestamp"}, + {"name": "comments", "title": "备注", "type": "text"}, + {"name": "created_at", "title": "创建时间", "type": "timestamp", "nullable": "no"}, + {"name": "expired_at", "title": "过期时间", "type": "timestamp"} + ], + "indexes": [ + {"name": "idx_pht_task", "idxtype": "index", "idxfields": ["task_id"]}, + {"name": "idx_pht_step", "idxtype": "index", "idxfields": ["task_id", "step_name"]}, + {"name": "idx_pht_role", "idxtype": "index", "idxfields": ["assignee_role"]}, + {"name": "idx_pht_assignee", "idxtype": "index", "idxfields": ["assignee_id"]}, + {"name": "idx_pht_status", "idxtype": "index", "idxfields": ["status"]} + ], + "codes": [ + {"field": "task_type", "table": "appcodes_kv", "valuefield": "k", "textfield": "v", "cond": "parentid='human_task_type'"}, + {"field": "status", "table": "appcodes_kv", "valuefield": "k", "textfield": "v", "cond": "parentid='human_task_status'"} + ] +} diff --git a/models/pipeline_step_types.json b/models/pipeline_step_types.json new file mode 100644 index 0000000..fe01af2 --- /dev/null +++ b/models/pipeline_step_types.json @@ -0,0 +1,31 @@ +{ + "summary": [ + { + "name": "pipeline_step_types", + "title": "步骤类型注册表", + "primary": ["step_type"], + "catelog": "entity" + } + ], + "fields": [ + {"name": "step_type", "title": "步骤类型Key", "type": "str", "length": 64, "nullable": "no"}, + {"name": "display_name", "title": "显示名称", "type": "str", "length": 128, "nullable": "no"}, + {"name": 
"category", "title": "分类", "type": "str", "length": 32, "nullable": "no"}, + {"name": "is_interactive", "title": "是否交互式", "type": "int", "nullable": "no", "default": "0"}, + {"name": "description", "title": "描述", "type": "text"}, + {"name": "form_schema", "title": "表单Schema(JSON)", "type": "longtext"}, + {"name": "on_timeout", "title": "超时策略", "type": "str", "length": 16, "default": "fail"}, + {"name": "timeout_hours", "title": "超时小时数", "type": "int", "default": "72"}, + {"name": "pipeline_id", "title": "所属产线(NULL=全局)", "type": "str", "length": 32}, + {"name": "created_at", "title": "创建时间", "type": "timestamp", "nullable": "no"}, + {"name": "updated_at", "title": "更新时间", "type": "timestamp", "nullable": "no"} + ], + "indexes": [ + {"name": "idx_pst_category", "idxtype": "index", "idxfields": ["category"]}, + {"name": "idx_pst_pipeline", "idxtype": "index", "idxfields": ["pipeline_id"]} + ], + "codes": [ + {"field": "category", "table": "appcodes_kv", "valuefield": "k", "textfield": "v", "cond": "parentid='step_type_category'"}, + {"field": "on_timeout", "table": "appcodes_kv", "valuefield": "k", "textfield": "v", "cond": "parentid='timeout_policy'"} + ] +} diff --git a/pipeline_service/__init__.py b/pipeline_service/__init__.py index 4e5e7ff..9262e24 100644 --- a/pipeline_service/__init__.py +++ b/pipeline_service/__init__.py @@ -11,12 +11,19 @@ from .init import ( pipeline_resume, pipeline_cancel, pipeline_handlers, + pipeline_step_types, + pipeline_register_step_type, + pipeline_unregister_step_type, ) from .handler import register_handler, list_handlers, register_default_handler from .handlers_ktv import register_ktv_handlers +from .step_registry import register_step_type, get_step_type, list_step_types, load_builtin_types +from .human import human_complete, approval_approve, approval_reject, human_list from .state import ( STATE_PENDING, STATE_RUNNING, STATE_COMPLETED, STATE_FAILED, STATE_SKIPPED, - TASK_SUBMITTED, TASK_RUNNING, TASK_COMPLETED, TASK_FAILED, 
TASK_PAUSED, TASK_CANCELLED, + STATE_WAITING, STATE_REJECTED, + TASK_SUBMITTED, TASK_RUNNING, TASK_COMPLETED, TASK_FAILED, TASK_PAUSED, TASK_CANCELLED, TASK_WAITING, + HUMAN_PENDING, HUMAN_SUBMITTED, HUMAN_APPROVED, HUMAN_REJECTED, HUMAN_EXPIRED, ) -__version__ = "2.0.0" +__version__ = "3.0.0" diff --git a/pipeline_service/executor.py b/pipeline_service/executor.py index 9333f49..df4c7e5 100644 --- a/pipeline_service/executor.py +++ b/pipeline_service/executor.py @@ -1,22 +1,28 @@ -"""Pipeline execution engine - schedules and runs steps.""" +"""Pipeline execution engine - schedules and runs steps. + +Supports both automatic steps (machine handlers) and interactive steps +(human_task, approval_gate) that pause execution waiting for human input. +""" import asyncio +import json import logging import traceback from typing import Dict from .state import ( - STATE_PENDING, STATE_RUNNING, STATE_COMPLETED, STATE_FAILED, - TASK_RUNNING, TASK_COMPLETED, TASK_FAILED, TASK_PAUSED, - build_step_graph, find_next_step, + STATE_PENDING, STATE_RUNNING, STATE_COMPLETED, STATE_FAILED, STATE_WAITING, + TASK_RUNNING, TASK_COMPLETED, TASK_FAILED, TASK_PAUSED, TASK_WAITING, + build_step_graph, find_next_step, has_waiting_steps, check_all_completed, check_any_failed, ) from .storage import ( get_pipeline_steps, get_step_states, update_task_state, update_step_state, - save_artifact, get_artifact, + save_artifact, get_artifact, create_human_task, ) from .handler import get_handler +from .step_registry import is_interactive, get_step_type logger = logging.getLogger("pipeline.executor") @@ -32,7 +38,7 @@ async def start_task(task_id: str): async def resume_task(task_id: str): - """Resume a paused task.""" + """Resume a paused or waiting task.""" task = asyncio.create_task(_run_task(task_id)) _active_tasks[task_id] = task return task @@ -100,10 +106,16 @@ async def _run_task(task_id: str): logger.warning(f"Task {task_id} failed (step failure)") break + # Check if any step is waiting for human 
input + if has_waiting_steps(step_states): + await update_task_state(task_id, TASK_WAITING) + logger.info(f"Task {task_id} waiting for human input") + break + # Find next executable step next_step = find_next_step(step_graph, step_states) if not next_step: - # No executable step but not all completed - deadlock or waiting + # No executable step and no waiting steps - deadlock logger.warning(f"Task {task_id}: no executable step, states={step_states}") await update_task_state(task_id, TASK_FAILED) break @@ -134,7 +146,16 @@ async def _get_task_raw(task_id: str) -> dict: async def _execute_step(task_id: str, step_name: str, step_graph: dict, task_info: dict): - """Execute a single step.""" + """Execute a single step. + + For interactive step_types (human_task, approval_gate): + - Creates a human_tasks record + - Sets step to WAITING state + - Execution loop will detect waiting and pause the task + + For automatic step_types: + - Calls handler, saves artifact, marks completed + """ step_info = step_graph[step_name] step_type = step_info["step_type"] version = task_info.get('current_version', task_info.get('current_Version', 1)) @@ -149,6 +170,13 @@ async def _execute_step(task_id: str, step_name: str, step_graph: dict, task_inf # Save input artifact await save_artifact(task_id, version, step_name, "input", input_data) + # Check if this is an interactive step type + if is_interactive(step_type): + await _handle_interactive_step( + task_id, step_name, step_type, version, input_data, step_info + ) + return + # Look up handler by step_type handler = get_handler(step_type) if not handler: @@ -156,8 +184,16 @@ async def _execute_step(task_id: str, step_name: str, step_graph: dict, task_inf if not handler: raise ValueError(f"No handler for step_type '{step_type}' and no default handler") + # Load step config for handler + step_config = step_info.get("step_config", {}) + if isinstance(step_config, str): + try: + step_config = json.loads(step_config) + except 
(json.JSONDecodeError, TypeError): + step_config = {} + # Execute handler - output_data = await handler(tenant_id, task_id, step_name, input_data, {}) + output_data = await handler(tenant_id, task_id, step_name, input_data, step_config) # Save output artifact await save_artifact(task_id, version, step_name, "output", output_data) @@ -170,6 +206,41 @@ async def _execute_step(task_id: str, step_name: str, step_graph: dict, task_inf await update_step_state(task_id, step_name, STATE_FAILED, error_msg) +async def _handle_interactive_step(task_id, step_name, step_type, version, input_data, step_info): + """Handle an interactive step — create human task record and enter WAITING.""" + # Get step type metadata + meta = get_step_type(step_type) or {} + + # Extract assignment info from step_config + step_config = step_info.get("step_config", {}) + if isinstance(step_config, str): + try: + step_config = json.loads(step_config) + except (json.JSONDecodeError, TypeError): + step_config = {} + + assignee_role = step_config.get("assignee_role", "") + assignee_id = step_config.get("assignee_id", "") + form_schema = step_config.get("form_schema") or meta.get("form_schema") + timeout_hours = step_config.get("timeout_hours") or meta.get("timeout_hours") + + # Create human task record + await create_human_task( + task_id=task_id, + step_name=step_name, + version=version, + task_type=step_type, + assignee_role=assignee_role, + assignee_id=assignee_id, + form_schema=form_schema, + timeout_hours=timeout_hours, + ) + + # Set step to waiting + await update_step_state(task_id, step_name, STATE_WAITING) + logger.info(f"Step {step_name} waiting for human input (type={step_type}, role={assignee_role})") + + async def _gather_inputs(task_id: str, version: int, deps: list) -> dict: """Gather input data from dependency step outputs.""" inputs = {} diff --git a/pipeline_service/human.py b/pipeline_service/human.py new file mode 100644 index 0000000..339f772 --- /dev/null +++ 
b/pipeline_service/human.py @@ -0,0 +1,239 @@ +"""Human task operations — complete, reject, approve. + +When the executor encounters an interactive step_type (human_task/approval_gate), +it creates a pipeline_human_tasks record and puts the step into WAITING state. +This module handles the human-side operations to resume execution. +""" + +import json +import logging +from appPublic.uniqueID import getID + +from .state import ( + STATE_WAITING, STATE_COMPLETED, STATE_REJECTED, STATE_PENDING, + TASK_RUNNING, TASK_WAITING, TASK_FAILED, + HUMAN_PENDING, HUMAN_SUBMITTED, HUMAN_APPROVED, HUMAN_REJECTED, + build_step_graph, find_next_step, get_rerun_from_next, +) +from .storage import ( + get_task, get_step_states, get_pipeline_steps, + update_step_state, update_task_state, save_artifact, + reset_steps, +) +from .executor import resume_task + +logger = logging.getLogger("pipeline.human") + + +async def human_complete(tenant_id, task_id, step_name, result_data, operator_id=None): + """Complete a human_task step — submit form data and resume execution. 
+ + Args: + tenant_id: tenant + task_id: task + step_name: the waiting step + result_data: dict — form submission data + operator_id: who completed it + + Returns: + JSON string + """ + result = {"success": False} + try: + task = await get_task(tenant_id, task_id) + if not task: + result["message"] = "任务不存在" + return json.dumps(result, ensure_ascii=False) + + # Verify step is waiting + step_states = await get_step_states(task_id) + if step_states.get(step_name) != STATE_WAITING: + result["message"] = f"步骤 {step_name} 不在等待状态 (当前: {step_states.get(step_name)})" + return json.dumps(result, ensure_ascii=False) + + # Save result as output artifact + version = task.get("current_version", task.get("current_Version", 1)) + if isinstance(version, str): + version = int(version) + await save_artifact(task_id, version, step_name, "output", result_data) + + # Mark step completed + await update_step_state(task_id, step_name, STATE_COMPLETED) + + # Update human_tasks record + await _update_human_task(task_id, step_name, HUMAN_SUBMITTED, result_data, operator_id) + + # Resume execution + await update_task_state(task_id, TASK_RUNNING) + await resume_task(task_id) + + result["success"] = True + result["message"] = f"步骤 {step_name} 已完成,产线继续执行" + except Exception as e: + result["message"] = str(e) + return json.dumps(result, ensure_ascii=False, default=str) + + +async def approval_approve(tenant_id, task_id, step_name, reviewer_id, comments=None): + """Approve an approval_gate step. 
+ + Args: + reviewer_id: who approved + comments: optional comments + + Returns: + JSON string + """ + result = {"success": False} + try: + task = await get_task(tenant_id, task_id) + if not task: + result["message"] = "任务不存在" + return json.dumps(result, ensure_ascii=False) + + step_states = await get_step_states(task_id) + if step_states.get(step_name) != STATE_WAITING: + result["message"] = f"步骤 {step_name} 不在等待状态" + return json.dumps(result, ensure_ascii=False) + + # Save approval as artifact + version = task.get("current_version", task.get("current_Version", 1)) + if isinstance(version, str): + version = int(version) + approval_data = {"approved": True, "reviewer_id": reviewer_id, "comments": comments} + await save_artifact(task_id, version, step_name, "output", approval_data) + + # Mark step completed + await update_step_state(task_id, step_name, STATE_COMPLETED) + await _update_human_task(task_id, step_name, HUMAN_APPROVED, approval_data, reviewer_id) + + # Resume execution + await update_task_state(task_id, TASK_RUNNING) + await resume_task(task_id) + + result["success"] = True + result["message"] = f"步骤 {step_name} 审批通过,产线继续执行" + except Exception as e: + result["message"] = str(e) + return json.dumps(result, ensure_ascii=False, default=str) + + +async def approval_reject(tenant_id, task_id, step_name, reviewer_id, comments=None, rollback_to=None): + """Reject an approval_gate step — optionally rollback to a previous step. 
+ + Args: + reviewer_id: who rejected + comments: rejection reason + rollback_to: step_name to rollback to (if None, just mark rejected) + + Returns: + JSON string + """ + result = {"success": False} + try: + task = await get_task(tenant_id, task_id) + if not task: + result["message"] = "任务不存在" + return json.dumps(result, ensure_ascii=False) + + step_states = await get_step_states(task_id) + if step_states.get(step_name) != STATE_WAITING: + result["message"] = f"步骤 {step_name} 不在等待状态" + return json.dumps(result, ensure_ascii=False) + + rejection_data = {"approved": False, "reviewer_id": reviewer_id, "comments": comments} + + if rollback_to: + # Rollback: reset target step and all steps between + pipeline_id = task.get("pipeline_id", task.get("Pipeline_id", "")) + step_records = await get_pipeline_steps(pipeline_id) + step_graph = build_step_graph(step_records) + + # Get all steps from rollback_to to step_name (inclusive) + affected = _get_steps_between(step_graph, rollback_to, step_name) + if not affected: + result["message"] = f"无法计算从 {rollback_to} 到 {step_name} 的回退路径" + return json.dumps(result, ensure_ascii=False) + + # Reset affected steps to pending + await reset_steps(task_id, affected) + + # Save rejection info as artifact on the rollback target + version = task.get("current_version", task.get("current_Version", 1)) + if isinstance(version, str): + version = int(version) + await save_artifact(task_id, version, rollback_to, "input", { + "__rejection__": rejection_data + }) + + # Resume from rollback point + await update_task_state(task_id, TASK_RUNNING) + await resume_task(task_id) + + result["success"] = True + result["message"] = f"审批驳回,回退到 {rollback_to},重跑 {len(affected)} 个步骤" + result["rerun_steps"] = affected + else: + # Just reject — task fails + await update_step_state(task_id, step_name, STATE_REJECTED, comments) + await _update_human_task(task_id, step_name, HUMAN_REJECTED, rejection_data, reviewer_id) + await update_task_state(task_id, TASK_FAILED) 
+ + result["success"] = True + result["message"] = f"审批驳回,任务已标记失败" + + except Exception as e: + result["message"] = str(e) + return json.dumps(result, ensure_ascii=False, default=str) + + +async def human_list(tenant_id=None, assignee_role=None, assignee_id=None, status=None): + """List human tasks, optionally filtered by role/user/status. + + Returns: + JSON string with human_tasks list + """ + result = {"success": False} + try: + from .storage import list_human_tasks + tasks = await list_human_tasks(tenant_id, assignee_role, assignee_id, status) + result["success"] = True + result["tasks"] = tasks + result["total"] = len(tasks) + except Exception as e: + result["message"] = str(e) + return json.dumps(result, ensure_ascii=False, default=str) + + +def _get_steps_between(step_graph: dict, from_step: str, to_step: str) -> list: + """BFS from from_step to to_step through dependents. Returns ordered list including both endpoints.""" + if from_step not in step_graph or to_step not in step_graph: + return [] + + visited = set() + queue = [from_step] + found = False + + while queue: + current = queue.pop(0) + if current in visited: + continue + visited.add(current) + if current == to_step: + found = True + break + for dep in step_graph.get(current, {}).get("dependents", []): + if dep not in visited: + queue.append(dep) + + if not found: + return [] + + result = sorted(visited, key=lambda s: step_graph.get(s, {}).get("order", 999)) + return result + + +async def _update_human_task(task_id, step_name, status, result_data, operator_id=None): + """Update the human_tasks record for this step.""" + from .storage import update_human_task_record + await update_human_task_record(task_id, step_name, status, result_data, operator_id) diff --git a/pipeline_service/init.py b/pipeline_service/init.py index ed7db20..9553cbe 100644 --- a/pipeline_service/init.py +++ b/pipeline_service/init.py @@ -2,6 +2,7 @@ 把 Hermes Agent 验证过的业务流程固化为可重复、可并发的产线业务环境。 支持多租户隔离、DAG 步骤调度、可插拔步骤处理器、artifact 
版本管理。 +支持人工交互步骤(human_task/approval_gate):人机协作产线。 任何宿主应用都可以通过 load_pipeline_service() 加载本模块。 """ @@ -12,7 +13,7 @@ from appPublic.uniqueID import getID from appPublic.log import debug from .state import ( - TASK_SUBMITTED, TASK_RUNNING, TASK_COMPLETED, TASK_FAILED, TASK_PAUSED, TASK_CANCELLED, + TASK_SUBMITTED, TASK_RUNNING, TASK_COMPLETED, TASK_FAILED, TASK_PAUSED, TASK_CANCELLED, TASK_WAITING, build_step_graph, get_cascade_rerun_steps, get_rerun_from_next, ) from .storage import ( @@ -20,13 +21,19 @@ from .storage import ( get_artifact, get_all_artifacts, list_tasks, update_task_state, update_task_version, get_pipeline_steps, reset_steps, save_artifact, + get_human_task, ) from .executor import start_task, resume_task, stop_task, is_running from .handler import register_handler, list_handlers, register_default_handler +from .step_registry import ( + register_step_type, get_step_type, list_step_types, + unregister_step_type, load_builtin_types, +) +from .human import human_complete, approval_approve, approval_reject, human_list from .handlers_ktv import register_ktv_handlers MODULE_NAME = "pipeline_service" -MODULE_VERSION = "2.0.0" +MODULE_VERSION = "3.0.0" async def pipeline_submit(tenant_id, pipeline_id, owner_id, title, params=None): @@ -94,6 +101,14 @@ async def pipeline_detail(tenant_id, task_id): return json.dumps(result, ensure_ascii=False) steps = await get_task_steps(task_id) + + # Enrich steps with human task info for interactive steps + for step in steps: + if step.get('state') == 'waiting': + ht = await get_human_task(task_id, step['step_name']) + if ht: + step['human_task'] = ht + task["steps"] = steps task["is_running"] = is_running(task_id) @@ -258,6 +273,23 @@ def pipeline_handlers(): return json.dumps(list_handlers(), ensure_ascii=False) +def pipeline_step_types(): + """查看所有注册的步骤类型(含元数据)。""" + return json.dumps(list_step_types(), ensure_ascii=False) + + +def pipeline_register_step_type(step_type, metadata): + """注册步骤类型(可装卸)。""" + 
register_step_type(step_type, metadata) + return json.dumps({"success": True, "step_type": step_type}, ensure_ascii=False) + + +def pipeline_unregister_step_type(step_type): + """卸载步骤类型。""" + unregister_step_type(step_type) + return json.dumps({"success": True, "step_type": step_type}, ensure_ascii=False) + + def load_pipeline_service(): """注册所有函数到 ServerEnv。任何宿主应用调用此函数即可使用产线引擎。""" env = ServerEnv() @@ -276,11 +308,25 @@ def load_pipeline_service(): env.pipeline_register_handler = register_handler env.pipeline_handlers = pipeline_handlers + # Step type registry (pluggable) + env.pipeline_step_types = pipeline_step_types + env.pipeline_register_step_type = pipeline_register_step_type + env.pipeline_unregister_step_type = pipeline_unregister_step_type + + # Human task operations + env.human_task_complete = human_complete + env.approval_approve = approval_approve + env.approval_reject = approval_reject + env.human_task_list = human_list + # Register default handler register_default_handler() + # Load built-in interactive step types + load_builtin_types() + # Register KTV handlers register_ktv_handlers() - debug(f"[{MODULE_NAME}] v{MODULE_VERSION} loaded — generic pipeline execution engine") + debug(f"[{MODULE_NAME}] v{MODULE_VERSION} loaded — pipeline engine with human-in-the-loop support") return True diff --git a/pipeline_service/state.py b/pipeline_service/state.py index 29b5267..9bfaead 100644 --- a/pipeline_service/state.py +++ b/pipeline_service/state.py @@ -9,6 +9,8 @@ STATE_RUNNING = "running" STATE_COMPLETED = "completed" STATE_FAILED = "failed" STATE_SKIPPED = "skipped" +STATE_WAITING = "waiting" # 等待人工输入 +STATE_REJECTED = "rejected" # 审批驳回 # Pipeline task states TASK_SUBMITTED = "submitted" @@ -17,6 +19,14 @@ TASK_COMPLETED = "completed" TASK_FAILED = "failed" TASK_PAUSED = "paused" TASK_CANCELLED = "cancelled" +TASK_WAITING = "waiting" # 等待人工步骤完成 + +# Human task statuses +HUMAN_PENDING = "pending" +HUMAN_SUBMITTED = "submitted" +HUMAN_APPROVED = "approved" 
+HUMAN_REJECTED = "rejected" +HUMAN_EXPIRED = "expired" def build_step_graph(step_records: list) -> Dict[str, dict]: @@ -57,21 +67,24 @@ def build_step_graph(step_records: list) -> Dict[str, dict]: def find_next_step(step_graph: Dict[str, dict], step_states: Dict[str, str]) -> Optional[str]: - """Find the next step to execute: pending + all deps completed. + """Find the next step to execute: pending + all deps completed/skipped. Args: step_graph: from build_step_graph() step_states: {step_name: current_state} Returns: - step_name or None if all done / blocked + step_name or None if all done / blocked / waiting """ + # Terminal states that count as "done" for dependency resolution + done_states = {STATE_COMPLETED, STATE_SKIPPED} + candidates = [] for name, info in step_graph.items(): if step_states.get(name) != STATE_PENDING: continue deps_ok = all( - step_states.get(dep) == STATE_COMPLETED + step_states.get(dep) in done_states for dep in info["deps"] ) if deps_ok: @@ -85,6 +98,16 @@ def find_next_step(step_graph: Dict[str, dict], step_states: Dict[str, str]) -> return candidates[0] +def has_waiting_steps(step_states: Dict[str, str]) -> bool: + """Check if any step is in waiting state.""" + return any(s == STATE_WAITING for s in step_states.values()) + + +def has_rejected_steps(step_states: Dict[str, str]) -> bool: + """Check if any step has been rejected.""" + return any(s == STATE_REJECTED for s in step_states.values()) + + def get_cascade_rerun_steps(step_graph: Dict[str, dict], from_step: str) -> List[str]: """BFS from modified step through dependents. 
# ── pipeline_service/step_registry.py ──
"""Step type registry — pluggable step_type metadata.

Each pipeline can define its own step_types. The registry tracks:
- handler function (already in handler.py)
- metadata: display_name, category, is_interactive, form_schema, on_timeout

Built-in interactive types: human_task, approval_gate
"""

import logging
from typing import Dict, Optional

logger = logging.getLogger("pipeline.step_registry")

# step_type -> metadata dict. Module-level and in-memory; the accessors below
# hand out copies so callers cannot mutate registry state by accident.
_REGISTRY: Dict[str, dict] = {}

# Built-in interactive step types
BUILTIN_INTERACTIVE = {
    "human_task": {
        "display_name": "人工任务",
        "category": "interactive",
        "is_interactive": True,
        "description": "需要人工填写表单或执行操作后继续",
    },
    "approval_gate": {
        "display_name": "审批关卡",
        "category": "interactive",
        "is_interactive": True,
        "description": "需要审批人通过后继续,可驳回",
    },
}


def register_step_type(step_type: str, metadata: dict):
    """Register a step type, merging *metadata* into any existing entry.

    Re-registering an existing step_type updates only the keys present in
    *metadata*; keys registered earlier are kept.

    Args:
        step_type: unique key matching pipeline_steps.step_type
        metadata: dict (``None`` is treated as empty) with keys:
            - display_name (str): display name
            - category (str): category (media/llm/interactive/devops/...)
            - is_interactive (bool): whether human intervention is needed,
              default False
            - description (str): description
            - form_schema (dict): JSON Schema of the human-task form (optional)
            - on_timeout (str): timeout policy skip/escalate/fail (optional)
            - timeout_hours (int): timeout in hours (optional)
    """
    # Build a fresh dict so neither the caller's dict nor a previously
    # returned snapshot is aliased by the registry.
    merged = dict(_REGISTRY.get(step_type, {}))
    merged.update(metadata or {})
    _REGISTRY[step_type] = merged
    # Log the merged flag: a partial update (e.g. only form_schema) must not
    # report an interactive type as non-interactive.
    logger.info(
        "Registered step_type: %s (interactive=%s)",
        step_type,
        bool(merged.get("is_interactive", False)),
    )


def get_step_type(step_type: str) -> Optional[dict]:
    """Return a shallow copy of the metadata, or None if not registered."""
    meta = _REGISTRY.get(step_type)
    return None if meta is None else dict(meta)


def is_interactive(step_type: str) -> bool:
    """Return True if the registered metadata marks the type as interactive.

    Unregistered types and entries without ``is_interactive`` yield False.
    The stored value is coerced with ``bool()`` so the result is always a bool.
    """
    meta = _REGISTRY.get(step_type)
    return bool(meta and meta.get("is_interactive", False))


def list_step_types() -> Dict[str, dict]:
    """Return all registered step types as ``{step_type: metadata copy}``."""
    return {name: dict(meta) for name, meta in _REGISTRY.items()}


def unregister_step_type(step_type: str) -> bool:
    """Remove a step type from the registry.

    Returns:
        True if an entry was removed, False if the type was not registered.
    """
    removed = _REGISTRY.pop(step_type, None)
    # Compare against None: an entry registered with empty metadata is falsy
    # but was still removed.
    if removed is None:
        return False
    logger.info("Unregistered step_type: %s", step_type)
    return True


def load_builtin_types():
    """Register every entry of BUILTIN_INTERACTIVE."""
    for st, meta in BUILTIN_INTERACTIVE.items():
        register_step_type(st, meta)


# ── pipeline_service/storage.py ──
# ── Human tasks storage ──

# Columns of pipeline_human_tasks that hold JSON text.
_HUMAN_TASK_JSON_FIELDS = ('form_schema', 'result_data')
# Timestamp text format written to expired_at / submitted_at.
_HUMAN_TASK_TS_FORMAT = "%Y-%m-%d %H:%M:%S"


def _human_task_row_to_dict(rec):
    """Convert one pipeline_human_tasks row to a plain dict, decoding JSON columns."""
    from collections.abc import Mapping

    if isinstance(rec, Mapping):
        # Covers plain dicts and dict subclasses; going through dir() on a
        # dict subclass would pull in bound methods such as keys()/items().
        row = dict(rec)
    elif hasattr(rec, '__dict__'):
        # Attribute-style row: keep public data attributes, skip methods.
        row = {}
        for attr in dir(rec):
            if attr.startswith('_'):
                continue
            value = getattr(rec, attr)
            if not callable(value):
                row[attr] = value
    else:
        row = dict(rec)
    for field in _HUMAN_TASK_JSON_FIELDS:
        raw = row.get(field)
        if raw and isinstance(raw, str):
            try:
                row[field] = json.loads(raw)
            except (json.JSONDecodeError, TypeError):
                # Best effort: leave undecodable text as the raw string.
                pass
    return row


async def create_human_task(task_id, step_name, version, task_type, assignee_role=None,
                            assignee_id=None, form_schema=None, timeout_hours=None):
    """Create a pipeline_human_tasks record with status "pending".

    Args:
        task_id: pipeline task id the human task belongs to.
        step_name: step the human task is attached to.
        version: stored as-is in the ``version`` column.
        task_type: stored as-is in the ``task_type`` column.
        assignee_role: stored as "" when falsy.
        assignee_id: stored as "" when falsy.
        form_schema: JSON-encoded when truthy, otherwise stored as None.
        timeout_hours: when truthy, ``expired_at`` is set to the current
            local time plus this many hours.

    Returns:
        The generated id of the new record.
    """
    from datetime import datetime, timedelta

    db, dbname = _get_db()
    async with db.sqlorContext(dbname) as sor:
        data = {
            "id": getID(),
            "task_id": task_id,
            "step_name": step_name,
            "version": version,
            "task_type": task_type,
            "assignee_role": assignee_role or "",
            "assignee_id": assignee_id or "",
            "form_schema": json.dumps(form_schema, ensure_ascii=False) if form_schema else None,
            "status": "pending",
        }
        if timeout_hours:
            expired = datetime.now() + timedelta(hours=timeout_hours)
            data["expired_at"] = expired.strftime(_HUMAN_TASK_TS_FORMAT)
        await sor.C('pipeline_human_tasks', data)
        return data["id"]


async def update_human_task_record(task_id, step_name, status, result_data=None, operator_id=None):
    """Update the human task record for (task_id, step_name).

    Does nothing when no record matches. ``result_data`` is JSON-encoded when
    it is not None. When ``operator_id`` is truthy, ``submitted_by`` and
    ``submitted_at`` are written as well.
    """
    from datetime import datetime

    db, dbname = _get_db()
    async with db.sqlorContext(dbname) as sor:
        recs = await sor.R('pipeline_human_tasks', {'task_id': task_id, 'step_name': step_name})
        if not recs:
            return
        # NOTE(review): if several rows share task_id/step_name (e.g. one per
        # version), which one comes first depends on sor.R — confirm.
        rec = recs[0]
        rec_id = rec.id if hasattr(rec, 'id') else rec['id']
        data = {"id": rec_id, "status": status}
        if result_data is not None:
            data["result_data"] = json.dumps(result_data, ensure_ascii=False, default=str)
        if operator_id:
            data["submitted_by"] = operator_id
            # Write a concrete timestamp in the same format used for
            # expired_at. The text "NOW()" is a plain string value here, not
            # SQL. NOTE(review): assumes sor.U binds values as parameters.
            data["submitted_at"] = datetime.now().strftime(_HUMAN_TASK_TS_FORMAT)
        await sor.U('pipeline_human_tasks', data)


async def list_human_tasks(tenant_id=None, assignee_role=None, assignee_id=None, status=None):
    """List human tasks, newest first, with optional filters.

    Each truthy argument adds an equality filter. ``tenant_id`` is matched
    through a subquery on pipeline_tasks. Returns a list of dicts with the
    ``form_schema`` / ``result_data`` columns JSON-decoded where possible.
    """
    db, dbname = _get_db()
    async with db.sqlorContext(dbname) as sor:
        conditions = []
        params = {}
        if assignee_role:
            conditions.append("assignee_role=${assignee_role}$")
            params["assignee_role"] = assignee_role
        if assignee_id:
            conditions.append("assignee_id=${assignee_id}$")
            params["assignee_id"] = assignee_id
        if status:
            conditions.append("status=${status}$")
            params["status"] = status

        if tenant_id:
            # Join with pipeline_tasks to filter by tenant
            conditions.append("task_id IN (SELECT id FROM pipeline_tasks WHERE tenant_id=${tenant_id}$)")
            params["tenant_id"] = tenant_id

        # Only fixed fragments are interpolated; values travel in params.
        where = " AND ".join(conditions) if conditions else "1=1"
        sql = f"SELECT * FROM pipeline_human_tasks WHERE {where} ORDER BY created_at DESC"
        recs = await sor.sqlExe(sql, params)
        return [_human_task_row_to_dict(rec) for rec in (recs or [])]


async def get_human_task(task_id, step_name):
    """Get the human task record for (task_id, step_name) as a dict.

    Returns None when no record matches; otherwise the first matching row
    with the ``form_schema`` / ``result_data`` columns JSON-decoded where
    possible.
    """
    db, dbname = _get_db()
    async with db.sqlorContext(dbname) as sor:
        recs = await sor.R('pipeline_human_tasks', {'task_id': task_id, 'step_name': step_name})
        if not recs:
            return None
        return _human_task_row_to_dict(recs[0])