# ZMQ Request-Reply 模块技术文档 `zmq_reqresp.py` 是一个基于 ZeroMQ 的请求-响应(Request-Response)通信模式的 Python 封装模块,支持同步与异步两种运行模式。该模块提供了 `ZmqRequester` 和 `ZmqReplier` 两个核心类,分别用于客户端发送请求和服务器端接收并回复请求。 --- ## 目录 - [概述](#概述) - [依赖](#依赖) - [类说明](#类说明) - [`ZmqRequester`](#zmqrqester) - [`ZmqReplier`](#zmqreplier) - [使用示例](#使用示例) - [同步模式示例](#同步模式示例) - [异步模式示例](#异步模式示例) - [异常处理](#异常处理) - [注意事项](#注意事项) --- ## 概述 本模块封装了 ZeroMQ 的 `REQ/REP` 套接字模式,实现了: - 支持字符串和字节流的消息传输; - 同步与异步双模式支持; - 可配置超时重连机制; - 简洁易用的 API 接口。 适用于构建轻量级远程调用、微服务间通信等场景。 --- ## 依赖 ```txt pyzmq >= 22.0 Python >= 3.7 ``` > 注:异步功能依赖于 `asyncio` 和 `zmq.asyncio`。 --- ## 类说明 ### `ZmqRequester` 代表一个 ZeroMQ 客户端(REQ 套接字),用于向服务端发送请求并接收响应。 #### 初始化参数 | 参数 | 类型 | 描述 | |------|------|------| | `url` | str | 连接的服务端地址,如 `"tcp://127.0.0.1:5555"` | | `async_mode` | bool | 是否启用异步模式,默认为 `False` | | `timeout` | int | 接收响应的超时时间(单位:秒),设为 0 表示阻塞等待 | > ⚠️ 若 `timeout > 0`,内部会使用 `Poller` 实现非阻塞轮询,并在超时时自动重连。 #### 方法 ##### `send(msg: str) -> str or None` - **描述**:同步发送字符串消息,返回响应字符串。 - **参数**: - `msg`: 要发送的字符串(UTF-8 编码) - **返回值**:服务端返回的字符串,或 `None`(超时) - **异常**:若处于异步模式则抛出异常 ##### `send_b(b: bytes) -> bytes or None` - **描述**:同步发送字节数据,返回字节响应。 - **参数**: - `b`: 要发送的字节对象 - **返回值**:响应字节数据,或 `None`(超时) ##### `async asend(msg: str) -> str or None` - **描述**:异步发送字符串消息。 - **参数**: - `msg`: 字符串消息 - **返回值**:响应字符串或 `None` - **异常**:若未启用异步模式则抛出异常 ##### `async asend_b(b: bytes) -> bytes or None` - **描述**:异步发送字节数据。 - **参数**: - `b`: 字节数据 - **返回值**:响应字节数据或 `None` ##### `_connect()` - 内部方法,用于创建上下文并连接到服务端。 ##### `_close()` - 内部方法,关闭套接字和上下文资源。 > ✅ 自动在析构函数中调用 `_close()` 释放资源。 --- ### `ZmqReplier` 代表一个 ZeroMQ 服务端(REP 套接字),监听请求并调用处理器函数进行响应。 #### 初始化参数 | 参数 | 类型 | 描述 | |------|------|------| | `url` | str | 绑定地址,如 `"tcp://*:5555"` | | `handler` | callable 或 coroutine function | 处理请求的回调函数 | | `async_mode` | bool | 是否以异步模式运行 | > ⚠️ 若 `async_mode=False`,但 `handler` 是协程函数,则会抛出错误。 #### 回调函数要求 - 输入:`bytes` 类型的请求数据 - 输出:可为 `bytes` 或 `str`(自动编码为 UTF-8) #### 方法 ##### `run()` - **描述**:启动服务端(同步模式) - **行为**:在后台线程中运行 `_run()`,不阻塞主线程 - 使用 `Background` 类实现多线程执行 ##### `async_run()` - **描述**:异步运行服务端主循环(需在事件循环中 await) - **注意**:此方法不会自动启动任务,需手动调度至 `asyncio` 事件循环 ##### `_run()` - 同步主循环逻辑:接收请求 → 调用处理器 → 发送响应 ##### `stop()` - **描述**:停止服务端运行 - 设置 `keep_running = False` 并尝试 `join()`(注:当前 `join()` 未定义,见[注意事项](#注意事项)) --- ## 使用示例 ### 同步模式示例 #### 服务端(Replier) ```python def echo_handler(data: bytes) -> bytes: return b"Echo: " + data replier = ZmqReplier("tcp://*:5555", echo_handler, async_mode=False) replier.run() # 主程序保持运行 import time try: while True: time.sleep(1) except KeyboardInterrupt: replier.stop() ``` #### 客户端(Requester) ```python req = ZmqRequester("tcp://127.0.0.1:5555", async_mode=False, timeout=5) response = req.send("Hello") print(response) # 输出: Echo: Hello ``` --- ### 异步模式示例 #### 异步服务端 ```python import asyncio async def async_handler(data: bytes) -> bytes: await asyncio.sleep(0.1) # 模拟异步操作 return b"Async reply to " + data replier = ZmqReplier("tcp://*:5555", async_handler, async_mode=True) async def main(): await replier.async_run() asyncio.run(main()) ``` #### 异步客户端 ```python import asyncio from zmq_reqresp import ZmqRequester async def client(): req = ZmqRequester("tcp://127.0.0.1:5555", async_mode=True, timeout=5) response = await req.asend("Test message") print(response) asyncio.run(client()) ``` --- ## 异常处理 | 场景 | 抛出异常 | |------|----------| | 在异步模式下调用同步 `send` 方法 | `Exception('ZMQ_Requester: in async mode, use asend instead')` | | 在同步模式下调用异步方法 | `Exception('ZMQ_Requester: not in async mode, use send instead')` | | 非异步模式下传入协程处理器 | `TypeError`(原代码有误,应为 `raise TypeError(...)`) | > ❗ 原始代码中的 `raise('not in async mode...')` 存在语法错误,应改为: ```python raise TypeError('not in async mode, handler can not be a coroutine') ``` --- ## 注意事项 1. **资源管理** - `ZmqRequester.__del__` 中调用了 `_close()`,但在某些情况下 `__del__` 不一定被及时调用。建议显式管理生命周期。 2. **超时重连机制** - 当设置 `timeout > 0` 时,若接收超时,会自动关闭并重新连接。这可能导致短暂中断,适合容忍短暂失败的场景。 3. **ZmqReplier.stop() 中的 `join()` 错误** - 当前代码中 `self.join()` 未定义,因为 `Background` 对象未暴露或继承自 `Thread`。 - 应确保 `Background` 类支持 `.join()` 方法,否则需移除或修复此行。 4. **异步模式下的 Poller 使用问题** - `asend_b` 中混合使用了 `poller.poll()`(同步)与 `await self.sock.recv_multipart()`(异步),这会导致阻塞风险。 - 在异步模式下应避免使用 `Poller().poll()`,推荐使用 `asyncio.wait_for()` 包裹 `recv_multipart()`。 ✅ 推荐改进方式: ```python try: r = await asyncio.wait_for(self.sock.recv_multipart(), timeout=self.timeout) return r[0] except asyncio.TimeoutError: self._close() self._connect() return None ``` 5. **REP 套接字状态一致性** - 必须保证每次 `recv` 后都有对应的 `send`,否则套接字将进入非法状态。本模块通过结构化流程保障这一点。 6. **线程安全** - `ZmqRequester` 不是线程安全的,每个线程应使用独立实例。 --- ## 版本建议改进 | 问题 | 建议修复 | |------|---------| | `raise('string')` 语法错误 | 改为 `raise Exception(...)` | | `stop()` 调用未定义 `join()` | 移除或确保 `Background` 支持 `.join()` | | 异步超时检测使用同步 Poller | 改用 `asyncio.wait_for()` | | `rb =self.self.handler(b)` 拼写错误 | 应为 `rb = self.handler(b)` | ✅ 已发现拼写错误: ```python rb =self.self.handler(b) # 错误! ``` 应修正为: ```python rb = self.handler(b) ``` --- ## 总结 `zmq_reqresp.py` 提供了一个简洁实用的 ZeroMQ 请求-响应通信封装,支持同步与异步双模式,在微服务、RPC 场景中有良好应用潜力。尽管存在少量 bug 和设计瑕疵,但整体架构清晰,易于扩展和维护。 > 建议结合 `contextlib` 或 `async context manager` 进一步增强资源管理能力。 --- 📌 **作者**:未知 📅 **最后更新**:2025年4月5日