apppublic/aidocs/zmqapi.md
2025-10-05 11:23:33 +08:00

9.3 KiB
Raw Permalink Blame History

ZeroMQ 异步通信模块技术文档

本模块基于 zmq.asyncio 提供了一组用于构建异步消息通信系统的 Python 类,支持多种 ZeroMQ 模式:发布/订阅 (Pub/Sub)请求/响应 (Req/Rep)推/拉 (Push/Pull)对等连接 (Pair)。所有类均为异步设计,适用于 asyncio 协程环境。


📦 依赖

  • Python 3.7+
  • pyzmqpip install pyzmq
  • asyncio

注意:使用前请确保已安装 pyzmq 并支持 asyncio 集成。

pip install pyzmq

🧩 核心功能概览

类名 功能描述 ZeroMQ 模式
Publisher 发布消息到多个订阅者 PUB
Subscriber 订阅特定消息 ID 的消息 SUB
RRServer 请求-响应模式的服务端 REP
RRClient 请求-响应模式的客户端 REQ
PPPusher 推送任务或数据 PUSH
PPPuller 拉取并处理任务或数据 PULL
PairClient 点对点双向通信客户端 PAIR
PairServer 点对点双向通信服务端(可带处理器) PAIR

🔧 使用说明与 API 文档

1. Publisher - 发布者PUB

用于向多个订阅者广播消息。

初始化

pub = Publisher(port, coding='utf-8', msgid=1000)
  • port: int绑定的 TCP 端口。
  • coding: str默认 'utf-8',编码格式。
  • msgid: int默认 1000,默认消息 ID 前缀。

📡 方法:publish(msg, msgtype='text', msgid=-1)

发送一条消息。

  • msg: 要发送的内容(字符串或可序列化对象)。
  • msgtype: 消息类型,支持 'text' 或其他如 'json'
  • msgid: 自定义消息 ID若为 -1 则使用实例默认值。

msgtype != 'text',会自动将 msg 通过 json.dumps() 序列化,并强制设为 'json' 类型。

示例:
await pub.publish("Hello World")
await pub.publish({"data": 42}, msgtype="json")
消息格式(在传输中):
<msgid> <msgtype> <body>

例如:

1000 json {"data": 42}

💡 注意事项

  • 多个订阅者可通过不同端口连接同一发布者。
  • 不保证消息送达ZMQ PUB/SUB 特性)。

2. Subscriber - 订阅者SUB

接收来自一个或多个发布者的特定消息。

初始化

sub = Subscriber(host, ports, msgid, coding='utf-8')
  • host: str发布者主机地址'localhost')。
  • ports: list of int要连接的一个或多个端口。
  • msgid: int只订阅以该 ID 开头的消息。
  • coding: str解码方式默认 'utf-8'

内部设置 SUBSCRIBE 过滤器为 b'<msgid>',仅接收匹配前缀的消息。

方法:addPort(port)

动态添加一个新的发布者端口进行连接。

sub.addPort(5556)

📥 方法:subscribe() → str or dict

异步接收并解析下一条消息。

  • 返回值:
    • msgtype == 'json':返回反序列化的 dict
    • 否则返回原始字符串。
示例:
data = await sub.subscribe()
print(data)  # 可能是字符串或字典

💡 注意事项

  • 必须确保发布的 msgid 与订阅者设置的一致才能接收到消息。
  • 支持多播场景,可同时连接多个发布者端口。

3. RRServer - 请求/响应服务端REP

实现同步式的请求-应答服务器。

初始化

server = RRServer(port, handler=None)
  • port: int监听端口。
  • handler: callable可选回调函数处理请求并返回响应。
    • 支持普通函数和 async def 协程。

▶️ 方法:run()

启动服务循环,持续监听请求。

处理逻辑:
  1. 接收请求消息 rmsgbytes
  2. 若有 handler,调用它生成响应 wmsg
  3. 如果 handler 返回的是协程,则等待其完成。
  4. 将响应发回客户端。
示例 Handler
def my_handler(msg):
    return b"Echo: " + msg

# 或异步版本
async def async_handler(msg):
    await asyncio.sleep(0.1)
    return b"Processed"
启动服务:
await server.run()

⚠️ 此方法为无限循环,需运行在事件循环中。


4. RRClient - 请求/响应客户端REQ

RRServer 发起请求并等待响应。

初始化

client = RRClient(host, port)
  • host: str服务器地址。
  • port: int服务器端口。

📤 方法:request(msg) → bytes

发送请求并返回响应。

response = await client.request(b"Hello")
print(response)  # b'Echo: Hello'

REQ/REP 是严格的一问一答模式,不能连续两次发送。


5. PPPusher - 推送器PUSH

用于向一个或多个 PPPuller 推送任务或消息。

初始化

pusher = PPPusher(host, port)
  • host: 绑定的 IP 地址(通常 '*''localhost')。
  • port: 绑定端口。

使用 PUSH 模式,适合负载均衡分发任务。

🚀 方法:push(msg)

推送一条消息bytes 或字符串需编码)。

await pusher.push(b"Task 1")

6. PPPuller - 拉取器PULL

PPPusher 接收消息并处理。

初始化

puller = PPPuller(host, port, handler=None)
  • host: 监听地址。
  • port: 监听端口。
  • handler: 接收到消息后的处理函数(可为协程)。

▶️ 方法:run()

启动拉取循环,持续接收消息并交由 handler 处理。

async def handle_task(msg):
    print("Processing:", msg)

puller = PPPuller('localhost', 5558, handle_task)
await puller.run()

支持异步处理器,自动 await 协程结果。


7. PairClient - 对等客户端PAIR

实现简单的点对点双向通信中的“客户端”角色。

初始化

client = PairClient(host, port)
  • host: 连接目标 IP。
  • port: 连接端口。

实际上是主动 bind 的一方,在 PAIR 模式中双方只能一对一。

📤 方法:request(msg) → bytes

发送消息并等待对方回复。

resp = await client.request(b"Ping")

8. PairServer - 对等服务端PAIR

接收来自 PairClient 的消息并响应。

初始化

server = PairServer(port, handler=None)
  • port: 监听端口。
  • handler: 处理函数,决定返回内容(支持协程)。

▶️ 方法:run()

开启监听循环:

async def echo_handler():
    return b"OK"

server = PairServer(5560, echo_handler)
await server.run()

PAIR 模式不支持多客户端,仅用于两个节点间的直接通信。


🔄 通信模式对比

模式 典型用途 特点
PUB/SUB 广播通知、事件流 多播,无确认,可能丢包
REQ/REP 同步远程调用 严格同步,一问一答
PUSH/PULL 工作队列、任务分发 流水线架构,支持扇出
PAIR 点对点控制信道 简单双向通信,限一对一双向

🧪 示例:简单发布/订阅

# publisher.py
import asyncio
from your_module import Publisher

async def main():
    pub = Publisher(5555)
    while True:
        await pub.publish("Hi there!", msgtype="text")
        await asyncio.sleep(1)

asyncio.run(main())
# subscriber.py
import asyncio
from your_module import Subscriber

async def main():
    sub = Subscriber('localhost', [5555], msgid=1000)
    while True:
        msg = await sub.subscribe()
        print("Received:", msg)

asyncio.run(main())

⚠️ 注意事项与最佳实践

  1. 资源释放

    • 所有类在析构时自动关闭 socket但仍建议显式管理生命周期。
    • 避免未关闭上下文导致文件描述符泄漏。
  2. 消息过滤SUB

    • ZeroMQ SUB 的订阅是基于前缀匹配的字节流。
    • 当前实现使用 setsockopt(zmq.SUBSCRIBE, b'1000'),只会收到以 1000 开头的消息。
  3. 编码一致性

    • 发送与接收方必须使用相同编码(默认 UTF-8
  4. 异步处理器兼容性

    • RRServerPPPullerPairServer 均支持同步和异步处理器。
    • 使用 isinstance(x, Coroutine) 判断是否需要 await
  5. 错误处理建议

    • 生产环境中应在 try-except 中包裹 recv()send() 操作。
    • 添加超时机制防止阻塞。
  6. 性能提示

    • 使用 context = zmq.asyncio.Context.instance() 可共享上下文实例提升效率。

📚 扩展建议

  • 添加日志输出替代 print()
  • 增加心跳、重连机制。
  • 支持 TLS 加密传输。
  • 提供更高级的消息封装协议(如带时间戳、来源标识等)。

📎 版本信息

  • 编写日期2025年4月5日
  • 作者AI Assistant
  • 许可MIT假设代码开源

📌 提示:将此模块保存为 zmq_async.py,即可导入使用:

from zmq_async import Publisher, Subscriber, ...