diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..403bf6e --- /dev/null +++ b/.gitignore @@ -0,0 +1,21 @@ +# Python +__pycache__/ +*.py[cod] +*.egg-info/ +dist/ +build/ +*.egg + +# Virtual environment +py3/ +pkgs/ + +# IDE +.vscode/ +.idea/ + +# OS +.DS_Store + +# Logs +logs/ diff --git a/pipeline_task/__init__.py b/pipeline_task/__init__.py new file mode 100644 index 0000000..94fec18 --- /dev/null +++ b/pipeline_task/__init__.py @@ -0,0 +1,5 @@ +"""pipeline_task — 产线任务交互层""" + +from .init import load_pipeline_task + +__version__ = "1.0.0" diff --git a/pipeline_task/init.py b/pipeline_task/init.py new file mode 100644 index 0000000..f80180f --- /dev/null +++ b/pipeline_task/init.py @@ -0,0 +1,22 @@ +"""pipeline_task 交互模块 — 产线任务的用户交互层。 + +纯薄交互层,无数据表。通过 ServerEnv 调用 pipeline-service 的函数。 +任何宿主加载 pipeline_service 后,再加载 pipeline_task 即可使用。 +""" + +from ahserver.serverenv import ServerEnv + +MODULE_NAME = "pipeline_task" +MODULE_VERSION = "1.0.0" + + +def load_pipeline_task(): + """注册交互模块到 ServerEnv。""" + env = ServerEnv() + # 模块元信息 + env.pipeline_task_info = lambda: { + "module": MODULE_NAME, + "version": MODULE_VERSION, + "depends_on": "pipeline_service", + } + return True diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..a1f66c7 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,16 @@ +[build-system] +requires = ["setuptools>=45", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "pipeline_task" +version = "1.0.0" +description = "产线任务交互模块 — 提交任务、查看状态、修改节点、暂停恢复" +requires-python = ">=3.8" +dependencies = [ + "bricks_for_python", +] + +[tool.setuptools.packages.find] +where = ["."] +include = ["pipeline_task*"] diff --git a/scripts/load_path.py b/scripts/load_path.py new file mode 100644 index 0000000..91c20c5 --- /dev/null +++ b/scripts/load_path.py @@ -0,0 +1,63 @@ +#!/usr/bin/env python3 +"""pipeline_task 模块 RBAC 权限注册。""" + +import os +import sys +import subprocess + +SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) + +# Find Sage root +SAGE_ROOT = None +for candidate in [ + os.path.join(SCRIPT_DIR, "..", ".."), + os.path.expanduser("~/repos/sage"), + os.path.expanduser("~/sage"), + os.path.expanduser("~/test/pipeline-app"), +]: + if os.path.isdir(os.path.join(candidate, "wwwroot")) and os.path.isdir(os.path.join(candidate, "py3")): + SAGE_ROOT = os.path.abspath(candidate) + break + +if not SAGE_ROOT: + print("ERROR: Cannot find Sage/pipeline-app root directory") + sys.exit(1) + +SET_ROLE_PERM = os.path.join(SAGE_ROOT, "set_role_perm.py") +PYTHON = os.path.join(SAGE_ROOT, "py3", "bin", "python") + +MOD = "pipeline_task" + +PATHS_LOGINED = [ + f"/{MOD}/index.ui", + f"/{MOD}/task_list.ui", + f"/{MOD}/task_detail.ui", + f"/{MOD}/task_submit.ui", + f"/{MOD}/api/task_submit.dspy", + f"/{MOD}/api/task_list.dspy", + f"/{MOD}/api/task_detail.dspy", + f"/{MOD}/api/task_node.dspy", + f"/{MOD}/api/task_modify.dspy", + f"/{MOD}/api/task_control.dspy", +] + +PATHS_ANY = [ + f"/{MOD}/pipeline_task.js", +] + + +def register_paths(): + for path in PATHS_ANY: + subprocess.run([PYTHON, SET_ROLE_PERM, "any", path], cwd=SAGE_ROOT) + print(f" any: {path}") + + for path in PATHS_LOGINED: + subprocess.run([PYTHON, SET_ROLE_PERM, "logined", path], cwd=SAGE_ROOT) + print(f" logined: {path}") + + +if __name__ == "__main__": + print(f"=== pipeline_task RBAC registration ===") + print(f"Root: {SAGE_ROOT}") + register_paths() + print("Done.") diff --git a/wwwroot/api/task_control.dspy b/wwwroot/api/task_control.dspy new file mode 100644 index 0000000..f491452 --- /dev/null +++ b/wwwroot/api/task_control.dspy @@ -0,0 +1,20 @@ +tenant_id = (await get_userorgid()) or '0' +task_id = params_kw.get('task_id', '') +action = params_kw.get('action', '') + +if not task_id: + return json.dumps({"success": False, "message": "缺少task_id"}, ensure_ascii=False) + +if action not in ('pause', 'resume', 'cancel'): + return json.dumps({"success": False, "message": "action必须是pause/resume/cancel"}, ensure_ascii=False) + +try: + if action == 'pause': + result = await pipeline_pause(tenant_id, task_id) + elif action == 'resume': + result = await pipeline_resume(tenant_id, task_id) + else: + result = await pipeline_cancel(tenant_id, task_id) + return result +except Exception as e: + return json.dumps({"success": False, "message": str(e)}, ensure_ascii=False) diff --git a/wwwroot/api/task_detail.dspy b/wwwroot/api/task_detail.dspy new file mode 100644 index 0000000..6baa232 --- /dev/null +++ b/wwwroot/api/task_detail.dspy @@ -0,0 +1,11 @@ +tenant_id = (await get_userorgid()) or '0' +task_id = params_kw.get('task_id', '') + +if not task_id: + return json.dumps({"success": False, "message": "缺少task_id"}, ensure_ascii=False) + +try: + result = await pipeline_detail(tenant_id, task_id) + return result +except Exception as e: + return json.dumps({"success": False, "message": str(e)}, ensure_ascii=False) diff --git a/wwwroot/api/task_list.dspy b/wwwroot/api/task_list.dspy new file mode 100644 index 0000000..64b7271 --- /dev/null +++ b/wwwroot/api/task_list.dspy @@ -0,0 +1,9 @@ +tenant_id = (await get_userorgid()) or '0' +pipeline_id = params_kw.get('pipeline_id', None) +limit = int(params_kw.get('limit', 100)) + +try: + result = await pipeline_list(tenant_id, pipeline_id, limit) + return result +except Exception as e: + return json.dumps({"success": False, "message": str(e)}, ensure_ascii=False) diff --git a/wwwroot/api/task_modify.dspy b/wwwroot/api/task_modify.dspy new file mode 100644 index 0000000..73c4bb9 --- /dev/null +++ b/wwwroot/api/task_modify.dspy @@ -0,0 +1,25 @@ +tenant_id = (await get_userorgid()) or '0' +task_id = params_kw.get('task_id', '') +step_name = params_kw.get('step_name', '') +content_raw = params_kw.get('content', '') +rerun_from = params_kw.get('rerun_from', 'node') + +if not task_id or not step_name: + return json.dumps({"success": False, "message": "缺少task_id或step_name"}, ensure_ascii=False) + +# Parse content +if isinstance(content_raw, str): + try: + content = json.loads(content_raw) + except Exception: + content = {"text": content_raw} +else: + content = content_raw + +updates = {step_name: {"content": content}} + +try: + result = await pipeline_modify(tenant_id, task_id, updates, rerun_from) + return result +except Exception as e: + return json.dumps({"success": False, "message": str(e)}, ensure_ascii=False) diff --git a/wwwroot/api/task_node.dspy b/wwwroot/api/task_node.dspy new file mode 100644 index 0000000..48d3847 --- /dev/null +++ b/wwwroot/api/task_node.dspy @@ -0,0 +1,13 @@ +tenant_id = (await get_userorgid()) or '0' +task_id = params_kw.get('task_id', '') +step_name = params_kw.get('step_name', '') +version = params_kw.get('version', None) + +if not task_id or not step_name: + return json.dumps({"success": False, "message": "缺少task_id或step_name"}, ensure_ascii=False) + +try: + result = await pipeline_node(tenant_id, task_id, step_name, version) + return result +except Exception as e: + return json.dumps({"success": False, "message": str(e)}, ensure_ascii=False) diff --git a/wwwroot/api/task_submit.dspy b/wwwroot/api/task_submit.dspy new file mode 100644 index 0000000..6cf8d81 --- /dev/null +++ b/wwwroot/api/task_submit.dspy @@ -0,0 +1,22 @@ +tenant_id = (await get_userorgid()) or '0' +pipeline_id = params_kw.get('pipeline_id', '') +title = params_kw.get('title', '') +owner_id = await get_user() + +if not pipeline_id: + return json.dumps({"success": False, "message": "缺少产线ID"}, ensure_ascii=False) +if not title: + return json.dumps({"success": False, "message": "缺少任务标题"}, ensure_ascii=False) + +# Collect optional params +task_params = {} +for key in ['input_audio', 'input_video', 'input_text', 'lyrics', 'mode', 'scene']: + val = params_kw.get(key, '') + if val: + task_params[key] = val + +try: + result = await pipeline_submit(tenant_id, pipeline_id, owner_id, title, task_params) + return result +except Exception as e: + return json.dumps({"success": False, "message": str(e)}, ensure_ascii=False) diff --git a/wwwroot/index.ui b/wwwroot/index.ui new file mode 100644 index 0000000..98e08b8 --- /dev/null +++ b/wwwroot/index.ui @@ -0,0 +1,28 @@ +{ + "widgettype": "VBox", + "options": {"width": "100%", "height": "100%", "padding": "20px"}, + "subwidgets": [ + { + "widgettype": "HBox", + "options": {"width": "100%", "justifyContent": "space-between", "alignItems": "center", "marginBottom": "16px"}, + "subwidgets": [ + {"widgettype": "Title", "options": {"text": "产线任务", "cfontsize": 24}}, + { + "widgettype": "Button", + "options": {"label": "提交新任务", "icon": "plus"}, + "binds": [{ + "wid": "self", "event": "click", "actiontype": "urlwidget", + "target": "app.pipeline_task_content", + "options": {"url": "{{entire_url('task_submit.ui')}}"}, + "mode": "replace" + }] + } + ] + }, + { + "widgettype": "VBox", + "id": "pipeline_task_content", + "options": {"width": "100%", "flex": "1"} + } + ] +} diff --git a/wwwroot/pipeline_task.js b/wwwroot/pipeline_task.js new file mode 100644 index 0000000..ab6b8b3 --- /dev/null +++ b/wwwroot/pipeline_task.js @@ -0,0 +1,74 @@ +// pipeline_task.js — 产线任务交互辅助函数 + +var currentTaskId = ''; + +// 加载任务列表 +function loadTaskList() { + var filterPipeline = $('[name="filter_pipeline"]').val() || ''; + var url = entire_url('api/task_list.dspy'); + if (filterPipeline) { + url += '?pipeline_id=' + filterPipeline; + } + + $.get(url, function(resp) { + var data = typeof resp === 'string' ? JSON.parse(resp) : resp; + if (!data.success) { + $('#task_table_area').html('
| 任务ID | 标题 | 状态 | 版本 | 创建时间 | 操作 | '; + html += '
|---|---|---|---|---|---|
| ' + (t.id || '').substring(0, 8) + ' | '; + html += '' + (t.title || '') + ' | '; + html += '' + (t.state || '') + ' | '; + html += 'v' + (t.current_version || 1) + ' | '; + html += '' + (t.created_at || '') + ' | '; + html += ''; + html += ' |