# `XtermProcessor` 技术文档 ## 概述 `XtermProcessor` 是一个基于 WebSocket 的终端处理器类,继承自 `PythonScriptProcessor`。它用于通过 Web 界面提供交互式 SSH 终端服务(类似 xterm.js 的后端支持),允许用户通过浏览器连接到远程服务器并执行命令。 该模块利用 `aiohttp` 提供异步 HTTP 和 WebSocket 通信能力,并结合 `appPublic.sshx.SSHServer` 实现与远程主机的 SSH 连接和终端会话管理。 --- ## 模块依赖 ```python from traceback import format_exc import asyncio import aiohttp import aiofiles import json import codecs from aiohttp import web import aiohttp_cors from appPublic.sshx import SSHServer from appPublic.dictObject import DictObject from appPublic.log import info, debug, warning, error, exception, critical from .baseProcessor import BaseProcessor, PythonScriptProcessor ``` ### 外部依赖说明: | 包/模块 | 用途 | |--------|------| | `aiohttp`, `web`, `aiohttp_cors` | 构建异步 Web 服务及处理 WebSocket 请求 | | `asyncio` | 异步任务调度 | | `aiofiles` | 异步文件操作(本代码中未直接使用) | | `json`, `codecs` | 数据序列化与编码处理 | | `appPublic.sshx.SSHServer` | 封装了 SSH 客户端功能,用于建立远程连接 | | `appPublic.dictObject.DictObject` | 字典对象封装,支持属性访问 | | `appPublic.log.*` | 日志输出接口 | --- ## 自定义异常:`ResizeException` ```python class ResizeException(Exception): def __init__(self, rows, cols): self.rows = rows self.cols = cols ``` > ⚠️ **注意**:当前代码中定义了 `ResizeException` 类但并未实际抛出或捕获此异常。可能为预留扩展功能。 --- ## 核心类:`XtermProcessor` 继承自 `PythonScriptProcessor`,实现了一个基于 WebSocket 的伪终端(PTY)代理,将前端输入转发至 SSH 子进程,并将输出回传给客户端。 ### 类方法 #### `isMe(name) -> bool` 判断当前处理器是否匹配指定名称。 ```python @classmethod def isMe(self, name): return name == 'xterm' ``` - **参数**: - `name` (str): 要检查的处理器名。 - **返回值**: - 若 `name == 'xterm'` 返回 `True`,否则 `False`。 - **用途**: - 在路由分发时用于选择合适的处理器。 --- ## 实例方法 ### `datahandle(request)` HTTP 请求入口点,调用路径处理逻辑。 ```python async def datahandle(self, request): await self.path_call(request) ``` - **参数**: - `request`: `aiohttp.web.Request` 对象。 - **说明**: - 执行 `path_call()` 方法以启动流程。 --- ### `path_call(request, params={})` 主处理入口,准备运行环境、获取登录信息并启动终端会话。 ```python async def path_call(self, request, params={}): await self.set_run_env(request, params=params) login_info = await super().path_call(request, params=params) if login_info is None: raise Exception('data error') ws = web.WebSocketResponse() await ws.prepare(request) await self.run_xterm(ws, login_info) self.retResponse = ws return ws ``` #### 流程说明: 1. 设置运行环境变量(如上下文、参数等)。 2. 调用父类 `PythonScriptProcessor.path_call()` 执行脚本逻辑,预期返回包含 SSH 登录信息的 `DictObject`。 3. 若 `login_info` 为空,则抛出异常。 4. 创建 WebSocket 响应对象并准备握手。 5. 启动 `run_xterm` 处理终端会话。 6. 保存响应对象并返回。 > ✅ **期望 `login_info` 结构示例**: ```python { "host": "192.168.1.100", "port": 22, "username": "admin", "password": "secret", # 或 key_filename / pkey 等字段 } ``` --- ### `run_xterm(ws, login_info)` 核心方法:建立 SSH 连接并启动双向数据流。 ```python async def run_xterm(self, ws, login_info): self.sshnode = SSHServer(login_info) async with self.sshnode.get_connector() as conn: self.running = True self.p_obj = await conn.create_process(term_type='xterm-256color', term_size=(80, 24)) r1 = self.ws_2_process(ws) r2 = self.process_2_ws(ws) await asyncio.gather(r1, r2) debug(f'run_xterm() ended') ``` #### 功能详解: - 使用 `SSHServer(login_info)` 初始化连接器。 - 获取连接上下文(`get_connector()`)进行资源管理。 - 创建带有彩色终端支持的进程(`create_process`),初始尺寸为 80x24。 - 并发运行两个协程: - `ws_2_process`: 接收客户端输入并发送到 SSH 子进程 stdin。 - `process_2_ws`: 读取子进程 stdout 并推送到客户端。 - 使用 `asyncio.gather()` 并行等待两者结束。 --- ### `ws_2_process(ws)` 处理来自 WebSocket 的消息,转发至 SSH 子进程。 ```python async def ws_2_process(self, ws): async for msg in ws: if msg.type == aiohttp.WSMsgType.TEXT: data = DictObject(**json.loads(msg.data)) if data.type == 'close': debug('accept client close request, close the ws') self.running = False return if data.type == 'input': self.p_obj.stdin.write(data.data) elif data.type == 'heartbeat': await self.ws_send_heartbeat(ws) elif data.type == 'resize': try: self.p_obj._chan.change_terminal_size(data.cols, data.rows) except Exception as e: exception(f'{data=}, {e=}, {format_exc()}') elif msg.type == aiohttp.WSMsgType.ERROR: debug(f'ws connection closed with exception {ws.exception()}') return else: debug('recv from ws:{msg}+++++++++++') await asyncio.sleep(0) ``` #### 支持的消息类型: | 类型 | 描述 | |------|------| | `input` | 将 `data.data` 写入子进程标准输入 | | `resize` | 调整终端窗口大小为 `(cols, rows)` | | `heartbeat` | 回复心跳包,保持连接活跃 | | `close` | 关闭终端会话 | | 其他文本消息 | 忽略并打印日志 | > ❗ 注意:`change_terminal_size()` 可能引发异常,已捕获并记录堆栈。 --- ### `process_2_ws(ws)` 从 SSH 子进程读取输出,推送至 WebSocket。 ```python async def process_2_ws(self, ws): try: while self.running: x = await self.p_obj.stdout.read(1024) await self.ws_send_data(ws, x) await asyncio.sleep(0) finally: self.p_obj.close() ``` - 循环读取最多 1024 字节的数据。 - 调用 `ws_send_data()` 发送数据帧。 - 即使连接断开也会确保 `p_obj.close()` 被调用。 --- ### `ws_send_data(ws, d)` 发送数据内容到客户端。 ```python async def ws_send_data(self, ws, d): dic = { 'type': 'data', 'data': d } await self.ws_send(ws, dic) ``` --- ### `ws_send_heartbeat(ws)` 发送心跳响应。 ```python async def ws_send_heartbeat(self, ws): dic = { 'type': 'heartbeat' } await self.ws_send(ws, dic) ``` --- ### `ws_send(ws: WebSocketResponse, s)` 通用 WebSocket 消息发送函数。 ```python async def ws_send(self, ws: web.WebSocketResponse, s): data = { "type": 1, "data": s } await ws.send_str(json.dumps(data, indent=4, ensure_ascii=False)) ``` - **参数**: - `ws`: WebSocket 响应对象。 - `s`: 要发送的数据(通常为 dict)。 - **格式化规则**: - JSON 序列化,保留中文字符(`ensure_ascii=False`),缩进便于调试。 - **结构**: ```json { "type": 1, "data": { ... } } ``` > 🔔 提示:前端需解析 `"type":1` 表示这是一个终端数据帧。 --- ## 消息协议规范(WebSocket) ### 客户端 → 服务端 | 字段 | 类型 | 示例 | 说明 | |------|------|-------|------| | `type` | string | `"input"` | 消息类型 | | `data` | string | `"ls -l\\n"` | 输入内容(仅 `input` 类型) | | `rows`, `cols` | int | `24`, `80` | 窗口尺寸(`resize` 类型) | 示例: ```json { "type": "input", "data": "echo hello\\n" } ``` ```json { "type": "resize", "rows": 30, "cols": 120 } ``` --- ### 服务端 → 客户端 统一包装格式: ```json { "type": 1, "data": { "type": "data" | "heartbeat", "data": "output string..." } } ``` | `data.type` | 含义 | 数据格式 | |------------|------|---------| | `data` | 终端输出 | 字符串(UTF-8 编码的命令行输出) | | `heartbeat` | 心跳响应 | 空对象或仅含 type | --- ## 使用场景 适用于构建 Web 版终端应用(如运维平台、在线 IDE、云桌面等),典型架构如下: ``` [Browser] ↓ (WebSocket) [XtermProcessor] ↓ (SSH) [Remote Server] ``` 前端可配合 [xterm.js](https://xtermjs.org/) 使用,接收 `data` 类型消息写入终端,发送键盘输入作为 `input` 消息。 --- ## 日志等级说明 | 函数 | 用途 | |------|------| | `debug()` | 调试信息(连接状态、消息流转) | | `exception()` | 记录异常及完整堆栈 | | `error()/critical()` | 严重错误(当前未显式调用) | 建议开启 DEBUG 日志以便排查问题。 --- ## 注意事项与安全建议 1. **认证安全性**: - `login_info` 来源于脚本执行结果,必须严格校验来源,防止注入攻击。 - 不应在 URL 或日志中暴露密码。 2. **资源清理**: - 已在 `finally` 块中关闭 `p_obj`,但仍需确保 `SSHServer` 正确释放连接。 3. **输入验证**: - `resize` 消息未做边界检查,建议添加行列范围限制(如 10~1000)。 4. **性能优化**: - `read(1024)` 可根据负载调整缓冲区大小。 - `await asyncio.sleep(0)` 用于让出控制权,避免阻塞事件循环。 5. **CORS 配置**: - 需通过 `aiohttp_cors` 正确配置跨域策略,允许前端域名访问。 --- ## 示例配置(aiohttp 路由集成) ```python from aiohttp import web import aiohttp_cors app = web.Application() # 添加 CORS 支持 cors = aiohttp_cors.setup(app) # 注册处理器路由 app.router.add_route('GET', '/xterm/{nodeid}', xterm_processor_instance.datahandle) # 启用 CORS resource = cors.add(app.router['xterm']) resource.add_route("*", xterm_processor_instance.datahandle, web.HTTPAllowedMethodsRule(("GET",))) ``` --- ## 总结 `XtermProcessor` 是一个完整的异步 Web Terminal 后端实现,具备以下特点: ✅ 支持动态 SSH 登录配置 ✅ 双向实时通信(WebSocket ↔ SSH PTY) ✅ 终端大小调整、心跳保活、优雅关闭 ✅ 易于集成至现有 Web 框架 适合用于开发安全可控的远程终端访问系统。