Hermes Agent a44130b23b refactor(v3.1): decouple KTV handlers — pipeline-service is pure engine
- Remove handlers_ktv.py (943 lines) — KTV is now an external adapter
- Remove register_ktv_handlers() from load_pipeline_service()
- Remove KTV imports from __init__.py
- pipeline-service no longer knows about any specific pipeline type
- Version bump: 3.0.0 → 3.1.0

Each pipeline type should be a separate adapter package that calls:
  register_handler(step_type, fn)
  register_step_type(step_type, metadata)

This keeps pipeline-service stable and lets adapters evolve independently.
2026-06-16 11:11:05 +08:00

pipeline-service v3.0 — 通用产线执行引擎

定位

通用产线执行引擎模块。把 Hermes Agent 验证过的业务流程固化为可重复、可并发的产线业务环境。

v3.0 新增: 人工交互步骤human_task/approval_gate、step_type 可装卸注册、多角色协作。

核心价值

  • Hermes Agent 中用 cron/delegate/terminal 跑通的流程 → 固化为产线步骤定义 → pipeline-service 自动调度执行
  • 一次验证,无限次自动执行
  • 多租户并发:同一产线,不同租户同时使用,数据完全隔离
  • 人机协作: 自动步骤 + 人工步骤混合执行,支持审批驳回回退

架构

宿主应用 (pipeline-app / 其他)
  │
  ├── load_pipeline_service()  ← 注册函数到 ServerEnv
  │
  ├── 任务生命周期
  │   ├── pipeline_submit(tenant_id, pipeline_id, owner_id, title, params)
  │   ├── pipeline_list(tenant_id, pipeline_id?)
  │   ├── pipeline_detail(tenant_id, task_id)
  │   ├── pipeline_node(tenant_id, task_id, step_name, version?)
  │   ├── pipeline_modify(tenant_id, task_id, updates, rerun_from)
  │   ├── pipeline_pause(tenant_id, task_id)
  │   ├── pipeline_resume(tenant_id, task_id)
  │   └── pipeline_cancel(tenant_id, task_id)
  │
  ├── 步骤类型注册(可装卸)
  │   ├── pipeline_step_types()                    ← 列出所有类型
  │   ├── pipeline_register_step_type(type, meta)  ← 注册新类型
  │   └── pipeline_unregister_step_type(type)      ← 卸载类型
  │
  ├── 人工交互
  │   ├── human_task_complete(tenant_id, task_id, step_name, data, operator)
  │   ├── approval_approve(tenant_id, task_id, step_name, reviewer, comments)
  │   ├── approval_reject(tenant_id, task_id, step_name, reviewer, comments, rollback_to)
  │   └── human_task_list(tenant_id?, role?, user?, status?)
  │
  └── Handler管理
      └── pipeline_register_handler(step_type, fn)

引擎工作原理

  1. 提交任务 → 读取 pipeline_steps 表的步骤定义 → 创建 pipeline_task_steps 记录 → 启动执行
  2. 执行循环 → 解析 DAG 依赖图 → 找到可执行步骤 → 判断步骤类型:
    • 自动步骤: 调用 handler → 存 artifact → 继续下一步
    • 交互步骤human_task/approval_gate 创建 human_tasks 记录 → 步骤进入 waiting → 任务进入 waiting
  3. 人工完成 → 调用 human_task_complete/approval_approve → 步骤标记完成 → 恢复执行
  4. 审批驳回 → 调用 approval_reject(rollback_to=步骤名) → 回退指定步骤 → 级联重跑
  5. 多租户 → 所有查询按 tenant_id 隔离

状态机

任务状态

submitted → running → completed
                    → failed
                    → paused → running (resume)
                    → waiting → running (human complete)
                    → cancelled

步骤状态

pending → running → completed
                  → failed
                  → skipped
                  → waiting → completed (human complete)
                            → rejected (approval reject)

step_type 可装卸

每条产线可以注册自己的 step_type引擎按类型匹配 handler 和交互协议。

# 注册一个 SDLC 产线的步骤类型
from pipeline_service import register_step_type, register_handler

# 注册自动步骤
register_step_type("code_review_auto", {
    "display_name": "自动代码审查",
    "category": "devops",
    "is_interactive": False,
})

# 注册人工步骤
register_step_type("code_review_manual", {
    "display_name": "人工代码审查",
    "category": "interactive",
    "is_interactive": True,
    "form_schema": {
        "type": "object",
        "properties": {
            "approved": {"type": "boolean", "title": "是否通过"},
            "comments": {"type": "string", "title": "审查意见"}
        }
    },
    "timeout_hours": 48,
    "on_timeout": "escalate",
})

# 注册 handler自动步骤需要交互步骤不需要
register_handler("code_review_auto", auto_review_handler)

内置交互类型

step_type 用途 行为
human_task 人工填写表单/执行操作 步骤 waiting → 人提交 → 继续
approval_gate 审批关卡 步骤 waiting → 通过继续 / 驳回回退

步骤定义中的配置

在 pipeline_steps 表的 step_config JSON 中指定交互参数:

{
    "deps": ["develop"],
    "assignee_role": "reviewer",
    "assignee_id": "user123",
    "form_schema": {"type": "object", "properties": {...}},
    "timeout_hours": 48,
    "on_timeout": "escalate"
}

数据表

表名 用途
pipeline_tasks 任务主表tenant_id 隔离)
pipeline_task_steps 任务步骤执行记录
pipeline_artifacts 步骤产物input/output支持版本
pipeline_human_tasks 人工任务记录v3新增
pipeline_step_types 步骤类型注册表v3新增
pipeline_steps 产线步骤定义(由 pipeline_core 模块管理)
pipelines 产线定义(由 pipeline_core 模块管理)

宿主集成

任何应用只需一行代码即可使用:

from pipeline_service.init import load_pipeline_service
load_pipeline_service()

宿主负责:

  • HTTP 路由ahserver 管)
  • 用户认证RBAC 管)
  • 前端交互bricks 管)
  • 产线定义和定价pipeline_core/ops/dist 管)

pipeline-service 只做:调度 + 执行 + 存储 + 人工交互。

目录结构

pipeline-service/
├── pipeline_service/
│   ├── __init__.py        # 包导出
│   ├── init.py            # load_pipeline_service() + ServerEnv 注册
│   ├── state.py           # DAG 解析、步骤状态机
│   ├── handler.py         # 步骤处理器注册表
│   ├── step_registry.py   # 步骤类型注册表v3新增
│   ├── human.py           # 人工任务操作v3新增
│   ├── storage.py         # MySQL 存储层sqlor
│   ├── executor.py        # 执行循环
│   └── handlers_ktv.py    # KTV产线专用 handlers
├── models/
│   ├── pipeline_tasks.json
│   ├── pipeline_task_steps.json
│   ├── pipeline_artifacts.json
│   ├── pipeline_human_tasks.json    (v3新增)
│   └── pipeline_step_types.json     (v3新增)
├── init/
│   └── data.json          # appcodes 初始化数据
├── pyproject.toml
└── README.md

构建与部署

cd ~/repos/pipeline-service
pip install .

# 建表
json2ddl mysql models/ > mysql.ddl.sql
mysql -u root -p pipeline < mysql.ddl.sql

# 加载 appcodes
# (通过宿主应用的 build.sh 自动加载 init/data.json)
Description
No description provided
Readme 109 KiB
Languages
Python 97.6%
Shell 2.4%