- New states: waiting (step/task), rejected (step) - New tables: pipeline_human_tasks, pipeline_step_types - New module: step_registry.py — pluggable step_type metadata - New module: human.py — human_complete, approval_approve, approval_reject - Executor: detects interactive step_types, creates human_tasks, enters waiting - Reject with rollback: approval_reject(rollback_to=step) resets steps and re-runs - API: human_task_complete, approval_approve, approval_reject, human_task_list - API: pipeline_step_types, pipeline_register_step_type, pipeline_unregister_step_type - Built-in interactive types: human_task, approval_gate - Updated DDL and appcodes
203 lines
6.9 KiB
Markdown
203 lines
6.9 KiB
Markdown
# pipeline-service v3.0 — 通用产线执行引擎
|
||
|
||
## 定位
|
||
|
||
通用产线执行引擎模块。把 Hermes Agent 验证过的业务流程固化为可重复、可并发的产线业务环境。
|
||
|
||
**v3.0 新增:** 人工交互步骤(human_task/approval_gate)、step_type 可装卸注册、多角色协作。
|
||
|
||
## 核心价值
|
||
|
||
- Hermes Agent 中用 cron/delegate/terminal 跑通的流程 → 固化为产线步骤定义 → pipeline-service 自动调度执行
|
||
- 一次验证,无限次自动执行
|
||
- 多租户并发:同一产线,不同租户同时使用,数据完全隔离
|
||
- **人机协作:** 自动步骤 + 人工步骤混合执行,支持审批驳回回退
|
||
|
||
## 架构
|
||
|
||
```
|
||
宿主应用 (pipeline-app / 其他)
|
||
│
|
||
├── load_pipeline_service() ← 注册函数到 ServerEnv
|
||
│
|
||
├── 任务生命周期
|
||
│ ├── pipeline_submit(tenant_id, pipeline_id, owner_id, title, params)
|
||
│ ├── pipeline_list(tenant_id, pipeline_id?)
|
||
│ ├── pipeline_detail(tenant_id, task_id)
|
||
│ ├── pipeline_node(tenant_id, task_id, step_name, version?)
|
||
│ ├── pipeline_modify(tenant_id, task_id, updates, rerun_from)
|
||
│ ├── pipeline_pause(tenant_id, task_id)
|
||
│ ├── pipeline_resume(tenant_id, task_id)
|
||
│ └── pipeline_cancel(tenant_id, task_id)
|
||
│
|
||
├── 步骤类型注册(可装卸)
|
||
│ ├── pipeline_step_types() ← 列出所有类型
|
||
│ ├── pipeline_register_step_type(type, meta) ← 注册新类型
|
||
│ └── pipeline_unregister_step_type(type) ← 卸载类型
|
||
│
|
||
├── 人工交互
|
||
│ ├── human_task_complete(tenant_id, task_id, step_name, data, operator)
|
||
│ ├── approval_approve(tenant_id, task_id, step_name, reviewer, comments)
|
||
│ ├── approval_reject(tenant_id, task_id, step_name, reviewer, comments, rollback_to)
|
||
│ └── human_task_list(tenant_id?, role?, user?, status?)
|
||
│
|
||
└── Handler管理
|
||
└── pipeline_register_handler(step_type, fn)
|
||
```
|
||
|
||
## 引擎工作原理
|
||
|
||
1. **提交任务** → 读取 pipeline_steps 表的步骤定义 → 创建 pipeline_task_steps 记录 → 启动执行
|
||
2. **执行循环** → 解析 DAG 依赖图 → 找到可执行步骤 → 判断步骤类型:
|
||
- **自动步骤:** 调用 handler → 存 artifact → 继续下一步
|
||
- **交互步骤(human_task/approval_gate):** 创建 human_tasks 记录 → 步骤进入 waiting → 任务进入 waiting
|
||
3. **人工完成** → 调用 human_task_complete/approval_approve → 步骤标记完成 → 恢复执行
|
||
4. **审批驳回** → 调用 approval_reject(rollback_to=步骤名) → 回退指定步骤 → 级联重跑
|
||
5. **多租户** → 所有查询按 tenant_id 隔离
|
||
|
||
## 状态机
|
||
|
||
### 任务状态
|
||
```
|
||
submitted → running → completed
|
||
→ failed
|
||
→ paused → running (resume)
|
||
→ waiting → running (human complete)
|
||
→ cancelled
|
||
```
|
||
|
||
### 步骤状态
|
||
```
|
||
pending → running → completed
|
||
→ failed
|
||
→ skipped
|
||
→ waiting → completed (human complete)
|
||
→ rejected (approval reject)
|
||
```
|
||
|
||
## step_type 可装卸
|
||
|
||
每条产线可以注册自己的 step_type,引擎按类型匹配 handler 和交互协议。
|
||
|
||
```python
|
||
# 注册一个 SDLC 产线的步骤类型
|
||
from pipeline_service import register_step_type, register_handler
|
||
|
||
# 注册自动步骤
|
||
register_step_type("code_review_auto", {
|
||
"display_name": "自动代码审查",
|
||
"category": "devops",
|
||
"is_interactive": False,
|
||
})
|
||
|
||
# 注册人工步骤
|
||
register_step_type("code_review_manual", {
|
||
"display_name": "人工代码审查",
|
||
"category": "interactive",
|
||
"is_interactive": True,
|
||
"form_schema": {
|
||
"type": "object",
|
||
"properties": {
|
||
"approved": {"type": "boolean", "title": "是否通过"},
|
||
"comments": {"type": "string", "title": "审查意见"}
|
||
}
|
||
},
|
||
"timeout_hours": 48,
|
||
"on_timeout": "escalate",
|
||
})
|
||
|
||
# 注册 handler(自动步骤需要,交互步骤不需要)
|
||
register_handler("code_review_auto", auto_review_handler)
|
||
```
|
||
|
||
### 内置交互类型
|
||
|
||
| step_type | 用途 | 行为 |
|
||
|-----------|------|------|
|
||
| human_task | 人工填写表单/执行操作 | 步骤 waiting → 人提交 → 继续 |
|
||
| approval_gate | 审批关卡 | 步骤 waiting → 通过继续 / 驳回回退 |
|
||
|
||
### 步骤定义中的配置
|
||
|
||
在 pipeline_steps 表的 step_config JSON 中指定交互参数:
|
||
|
||
```json
|
||
{
|
||
"deps": ["develop"],
|
||
"assignee_role": "reviewer",
|
||
"assignee_id": "user123",
|
||
"form_schema": {"type": "object", "properties": {...}},
|
||
"timeout_hours": 48,
|
||
"on_timeout": "escalate"
|
||
}
|
||
```
|
||
|
||
## 数据表
|
||
|
||
| 表名 | 用途 |
|
||
|------|------|
|
||
| pipeline_tasks | 任务主表(tenant_id 隔离) |
|
||
| pipeline_task_steps | 任务步骤执行记录 |
|
||
| pipeline_artifacts | 步骤产物(input/output,支持版本) |
|
||
| **pipeline_human_tasks** | **人工任务记录(v3新增)** |
|
||
| **pipeline_step_types** | **步骤类型注册表(v3新增)** |
|
||
| pipeline_steps | 产线步骤定义(由 pipeline_core 模块管理) |
|
||
| pipelines | 产线定义(由 pipeline_core 模块管理) |
|
||
|
||
## 宿主集成
|
||
|
||
任何应用只需一行代码即可使用:
|
||
|
||
```python
|
||
from pipeline_service.init import load_pipeline_service
|
||
load_pipeline_service()
|
||
```
|
||
|
||
宿主负责:
|
||
- HTTP 路由(ahserver 管)
|
||
- 用户认证(RBAC 管)
|
||
- 前端交互(bricks 管)
|
||
- 产线定义和定价(pipeline_core/ops/dist 管)
|
||
|
||
pipeline-service 只做:调度 + 执行 + 存储 + 人工交互。
|
||
|
||
## 目录结构
|
||
|
||
```
|
||
pipeline-service/
|
||
├── pipeline_service/
|
||
│ ├── __init__.py # 包导出
|
||
│ ├── init.py # load_pipeline_service() + ServerEnv 注册
|
||
│ ├── state.py # DAG 解析、步骤状态机
|
||
│ ├── handler.py # 步骤处理器注册表
|
||
│ ├── step_registry.py # 步骤类型注册表(v3新增)
|
||
│ ├── human.py # 人工任务操作(v3新增)
|
||
│ ├── storage.py # MySQL 存储层(sqlor)
|
||
│ ├── executor.py # 执行循环
|
||
│ └── handlers_ktv.py # KTV产线专用 handlers
|
||
├── models/
|
||
│ ├── pipeline_tasks.json
|
||
│ ├── pipeline_task_steps.json
|
||
│ ├── pipeline_artifacts.json
|
||
│ ├── pipeline_human_tasks.json (v3新增)
|
||
│ └── pipeline_step_types.json (v3新增)
|
||
├── init/
|
||
│ └── data.json # appcodes 初始化数据
|
||
├── pyproject.toml
|
||
└── README.md
|
||
```
|
||
|
||
## 构建与部署
|
||
|
||
```bash
|
||
cd ~/repos/pipeline-service
|
||
pip install .
|
||
|
||
# 建表
|
||
json2ddl mysql models/ > mysql.ddl.sql
|
||
mysql -u root -p pipeline < mysql.ddl.sql
|
||
|
||
# 加载 appcodes
|
||
# (通过宿主应用的 build.sh 自动加载 init/data.json)
|
||
```
|