Hermes Agent b9a5810d85 feat(v3): human-in-the-loop — interactive steps, pluggable step_type registry
- New states: waiting (step/task), rejected (step)
- New tables: pipeline_human_tasks, pipeline_step_types
- New module: step_registry.py — pluggable step_type metadata
- New module: human.py — human_complete, approval_approve, approval_reject
- Executor: detects interactive step_types, creates human_tasks, enters waiting
- Reject with rollback: approval_reject(rollback_to=step) resets steps and re-runs
- API: human_task_complete, approval_approve, approval_reject, human_task_list
- API: pipeline_step_types, pipeline_register_step_type, pipeline_unregister_step_type
- Built-in interactive types: human_task, approval_gate
- Updated DDL and appcodes
2026-06-16 11:05:45 +08:00

203 lines
6.9 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# pipeline-service v3.0 — 通用产线执行引擎
## 定位
通用产线执行引擎模块。把 Hermes Agent 验证过的业务流程固化为可重复、可并发的产线业务环境。
**v3.0 新增:** 人工交互步骤human_task/approval_gate、step_type 可装卸注册、多角色协作。
## 核心价值
- Hermes Agent 中用 cron/delegate/terminal 跑通的流程 → 固化为产线步骤定义 → pipeline-service 自动调度执行
- 一次验证,无限次自动执行
- 多租户并发:同一产线,不同租户同时使用,数据完全隔离
- **人机协作:** 自动步骤 + 人工步骤混合执行,支持审批驳回回退
## 架构
```
宿主应用 (pipeline-app / 其他)
├── load_pipeline_service() ← 注册函数到 ServerEnv
├── 任务生命周期
│ ├── pipeline_submit(tenant_id, pipeline_id, owner_id, title, params)
│ ├── pipeline_list(tenant_id, pipeline_id?)
│ ├── pipeline_detail(tenant_id, task_id)
│ ├── pipeline_node(tenant_id, task_id, step_name, version?)
│ ├── pipeline_modify(tenant_id, task_id, updates, rerun_from)
│ ├── pipeline_pause(tenant_id, task_id)
│ ├── pipeline_resume(tenant_id, task_id)
│ └── pipeline_cancel(tenant_id, task_id)
├── 步骤类型注册(可装卸)
│ ├── pipeline_step_types() ← 列出所有类型
│ ├── pipeline_register_step_type(type, meta) ← 注册新类型
│ └── pipeline_unregister_step_type(type) ← 卸载类型
├── 人工交互
│ ├── human_task_complete(tenant_id, task_id, step_name, data, operator)
│ ├── approval_approve(tenant_id, task_id, step_name, reviewer, comments)
│ ├── approval_reject(tenant_id, task_id, step_name, reviewer, comments, rollback_to)
│ └── human_task_list(tenant_id?, role?, user?, status?)
└── Handler管理
└── pipeline_register_handler(step_type, fn)
```
## 引擎工作原理
1. **提交任务** → 读取 pipeline_steps 表的步骤定义 → 创建 pipeline_task_steps 记录 → 启动执行
2. **执行循环** → 解析 DAG 依赖图 → 找到可执行步骤 → 判断步骤类型:
- **自动步骤:** 调用 handler → 存 artifact → 继续下一步
- **交互步骤human_task/approval_gate** 创建 human_tasks 记录 → 步骤进入 waiting → 任务进入 waiting
3. **人工完成** → 调用 human_task_complete/approval_approve → 步骤标记完成 → 恢复执行
4. **审批驳回** → 调用 approval_reject(rollback_to=步骤名) → 回退指定步骤 → 级联重跑
5. **多租户** → 所有查询按 tenant_id 隔离
## 状态机
### 任务状态
```
submitted → running → completed
→ failed
→ paused → running (resume)
→ waiting → running (human complete)
→ cancelled
```
### 步骤状态
```
pending → running → completed
→ failed
→ skipped
→ waiting → completed (human complete)
→ rejected (approval reject)
```
## step_type 可装卸
每条产线可以注册自己的 step_type引擎按类型匹配 handler 和交互协议。
```python
# 注册一个 SDLC 产线的步骤类型
from pipeline_service import register_step_type, register_handler
# 注册自动步骤
register_step_type("code_review_auto", {
"display_name": "自动代码审查",
"category": "devops",
"is_interactive": False,
})
# 注册人工步骤
register_step_type("code_review_manual", {
"display_name": "人工代码审查",
"category": "interactive",
"is_interactive": True,
"form_schema": {
"type": "object",
"properties": {
"approved": {"type": "boolean", "title": "是否通过"},
"comments": {"type": "string", "title": "审查意见"}
}
},
"timeout_hours": 48,
"on_timeout": "escalate",
})
# 注册 handler自动步骤需要交互步骤不需要
register_handler("code_review_auto", auto_review_handler)
```
### 内置交互类型
| step_type | 用途 | 行为 |
|-----------|------|------|
| human_task | 人工填写表单/执行操作 | 步骤 waiting → 人提交 → 继续 |
| approval_gate | 审批关卡 | 步骤 waiting → 通过继续 / 驳回回退 |
### 步骤定义中的配置
在 pipeline_steps 表的 step_config JSON 中指定交互参数:
```json
{
"deps": ["develop"],
"assignee_role": "reviewer",
"assignee_id": "user123",
"form_schema": {"type": "object", "properties": {...}},
"timeout_hours": 48,
"on_timeout": "escalate"
}
```
## 数据表
| 表名 | 用途 |
|------|------|
| pipeline_tasks | 任务主表tenant_id 隔离) |
| pipeline_task_steps | 任务步骤执行记录 |
| pipeline_artifacts | 步骤产物input/output支持版本 |
| **pipeline_human_tasks** | **人工任务记录v3新增** |
| **pipeline_step_types** | **步骤类型注册表v3新增** |
| pipeline_steps | 产线步骤定义(由 pipeline_core 模块管理) |
| pipelines | 产线定义(由 pipeline_core 模块管理) |
## 宿主集成
任何应用只需一行代码即可使用:
```python
from pipeline_service.init import load_pipeline_service
load_pipeline_service()
```
宿主负责:
- HTTP 路由ahserver 管)
- 用户认证RBAC 管)
- 前端交互bricks 管)
- 产线定义和定价pipeline_core/ops/dist 管)
pipeline-service 只做:调度 + 执行 + 存储 + 人工交互。
## 目录结构
```
pipeline-service/
├── pipeline_service/
│ ├── __init__.py # 包导出
│ ├── init.py # load_pipeline_service() + ServerEnv 注册
│ ├── state.py # DAG 解析、步骤状态机
│ ├── handler.py # 步骤处理器注册表
│ ├── step_registry.py # 步骤类型注册表v3新增
│ ├── human.py # 人工任务操作v3新增
│ ├── storage.py # MySQL 存储层sqlor
│ ├── executor.py # 执行循环
│ └── handlers_ktv.py # KTV产线专用 handlers
├── models/
│ ├── pipeline_tasks.json
│ ├── pipeline_task_steps.json
│ ├── pipeline_artifacts.json
│ ├── pipeline_human_tasks.json (v3新增)
│ └── pipeline_step_types.json (v3新增)
├── init/
│ └── data.json # appcodes 初始化数据
├── pyproject.toml
└── README.md
```
## 构建与部署
```bash
cd ~/repos/pipeline-service
pip install .
# 建表
json2ddl mysql models/ > mysql.ddl.sql
mysql -u root -p pipeline < mysql.ddl.sql
# 加载 appcodes
# (通过宿主应用的 build.sh 自动加载 init/data.json)
```