8.3 KiB
Topic Messaging System with ZeroMQ
这是一个基于 ZeroMQ (ZMQ) 的发布/订阅(Pub/Sub)消息系统,支持配置驱动的服务器、发布者和订阅者。该系统使用 XPUB 和 XSUB 套接字类型构建一个代理(proxy),实现多对多的消息广播机制。
📦 模块依赖
import sys
import zmq
import time
from zmq import Context
from appPublic.jsonConfig import getConfig
zmq: 使用 pyzmq 实现高性能异步消息通信。Context: ZeroMQ 上下文对象,用于管理套接字资源。getConfig(): 来自appPublic.jsonConfig的工具函数,加载 JSON 格式的配置文件。
🔧 核心组件
1. TopicServer —— 主题消息代理服务器
功能说明
TopicServer 是一个 ZeroMQ 代理服务,它通过 zmq.XSUB 接收来自多个发布者的主题消息,并通过 zmq.XPUB 将这些消息转发给所有匹配主题的订阅者。
它实现了经典的“反向代理”模式:前端接收发布者连接,后端向订阅者广播。
初始化参数
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
address |
str | '127.0.0.1' |
绑定地址(可改为公网IP) |
pub_port |
str/int | '5566' |
发布者连接的端口(XSUB 端) |
sub_port |
str/int | '5567' |
订阅者连接的端口(XPUB 端) |
构造函数行为
- 打印当前 ZeroMQ 库版本信息(libzmq 和 pyzmq)
- 创建全局上下文单例:
Context.instance() - 设置两个 TCP 地址:
pub_port:tcp://<address>:<pub_port>→ 发布者连接此地址sub_port:tcp://<address>:<sub_port>→ 订阅者连接此地址
- 调用
xpub_xsub_proxy()启动代理循环
方法:xpub_xsub_proxy()
创建并启动 ZeroMQ 内置代理:
zmq.proxy(frontend_pubs, backend_subs)
套接字角色
| 套接字 | 类型 | 角色 | 绑定地址 |
|---|---|---|---|
frontend_pubs |
XSUB |
接收发布者消息 | tcp://<address>:<pub_port> |
backend_subs |
XPUB |
向订阅者广播消息 | tcp://<address>:<sub_port> |
✅
bind()表示服务器监听连接;发布者与订阅者需调用connect()连接到对应端口。
输出日志
Init proxy
Try: Proxy... CONNECT!
CONNECT successful!
⚠️ 注意:
zmq.proxy()是阻塞调用,程序会一直运行在此处直到中断。
2. ConfiguredTopicServer —— 配置驱动的代理服务器
继承自 TopicServer,从外部 JSON 配置文件读取参数。
配置结构要求
{
"topicserver": {
"address": "11.11.1.11",
"pub_port": 1234,
"sub_port": 1235
}
}
⚠️ 错误拼写提示:原代码中
"sub_server"应为"sub_port",否则将导致属性访问失败。
异常处理
- 若配置不存在或缺少
topicserver字段,则抛出未定义异常MissTopicServerConfig(需提前定义)。
示例用法
server = ConfiguredTopicServer()
# 自动加载配置并启动代理
3. TopicPublisher —— 主题发布者客户端
允许向代理发送带主题的消息。
初始化参数
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
topic |
str | 'en' |
消息主题(如语言标识) |
address |
str | '127.0.0.1' |
代理的发布端地址 |
port |
str/int | '5566' |
代理的发布端口(对应 XSUB) |
关键行为
- 将
topic编码为 UTF-8 字节串(ZeroMQ 要求) - 创建
PUB套接字并连接到代理的pub_port - 添加
time.sleep(0.5):防止连接未建立即发送消息("slow joiner" 问题)
方法:send(message)
发送一条多部分消息:[topic_bytes, message_bytes]
publisher.send("Hello World")
# 实际发送: [b'en', b'Hello World']
✅ 支持任意字符串内容,自动编码为 UTF-8。
4. ConfiguredTopicPublisher —— 配置驱动的发布者
自动从配置文件读取代理地址和端口。
用法示例
pub = ConfiguredTopicPublisher(topic='news')
pub.send('Breaking news!')
使用配置中的
address和pub_port,无需硬编码。
5. TopicSubscriber —— 主题订阅者客户端
接收特定主题的消息。
初始化参数
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
topic |
str/list | '' |
订阅的主题(支持单个或列表) |
address |
str | '127.0.0.1' |
代理的订阅地址 |
port |
str/int | '5567' |
代理的订阅端口(对应 XPUB) |
callback |
callable | None |
收到消息时的回调函数 |
订阅逻辑
- 若
topic为字符串 → 订阅该主题 - 若
topic为列表 → 循环订阅每个主题
sub = TopicSubscriber(topic=['en', 'cn'])
ZeroMQ SUB 套接字必须显式调用
setsockopt(SUBSCRIBE, ...)才能接收消息。
方法:run()
进入无限循环,持续接收消息:
while True:
msg_received = self.sub.recv_multipart()
print("sub {}: {}".format(self.topic, msg_received))
if self.callback:
self.callback(msg_received)
输出格式示例:
sub en: [b'en', b'Hello']
6. ConfiguredTopicSubscriber —— 配置驱动的订阅者
类似其他配置类,自动加载代理连接参数。
用法示例
def on_msg(msg):
print("Received:", msg)
sub = ConfiguredTopicSubscriber(topic='en', callback=on_msg)
sub.run() # 开始监听
🌐 系统架构图(文本描述)
+-------------+ +---------------------+
| Publisher 1 |-------->| |
+-------------+ | TopicServer |
| (XPUB/XSUB Proxy) |
+-------------+ | |---------> Subscriber A (topic=en)
| Publisher 2 |-------->| |---------> Subscriber B (topic=jp)
+-------------+ +---------------------+
↑ ↑
pub_port (5566) sub_port (5567)
- 所有发布者连接到
pub_port(XSUB) - 所有订阅者连接到
sub_port(XPUB) - 代理自动完成消息路由与过滤
🛠️ 使用示例
启动代理服务器(手动指定)
server = TopicServer(address='0.0.0.0', pub_port=5566, sub_port=5567)
# 阻塞运行
或使用配置方式启动
server = ConfiguredTopicServer()
发布消息
pub = TopicPublisher(topic='en', address='localhost', port=5566)
pub.send("Hello English Users!")
订阅消息
def handle_msg(msg_parts):
topic, content = msg_parts
print(f"Topic: {topic.decode()}, Msg: {content.decode()}")
sub = TopicSubscriber(topic='en', callback=handle_msg)
sub.run()
⚠️ 注意事项
-
端口顺序不能错:
- 发布者连
pub_port(XSUB 输入) - 订阅者连
sub_port(XPUB 输出)
- 发布者连
-
延迟连接问题:
time.sleep(0.5)在发布者中是临时解决方案- 更优做法:使用同步机制(如 PULL/PUSH 协调)
-
异常类缺失:
MissTopicServerConfig未在代码中定义,应补充:class MissTopicServerConfig(Exception): pass
-
编码一致性:
- 所有主题和消息均以 UTF-8 编码传输,确保跨平台兼容性。
-
性能建议:
Context.instance()全局共享,避免频繁创建上下文- 多线程环境下注意套接字非线程安全
📁 配置文件样例 (config.json)
{
"topicserver": {
"address": "127.0.0.1",
"pub_port": 5566,
"sub_port": 5567
}
}
确保路径正确且
getConfig()可读取。
📘 总结
| 组件 | 角色 | 套接字类型 | 连接方向 |
|---|---|---|---|
TopicServer |
消息代理 | XSUB + XPUB | bind |
TopicPublisher |
消息生产者 | PUB | connect → pub_port |
TopicSubscriber |
消息消费者 | SUB | connect → sub_port |
✅ 优点:
- 解耦发布者与订阅者
- 支持动态扩展
- 高吞吐低延迟
- 支持主题过滤
🔧 适用场景:
- 日志分发
- 实时通知系统
- 微服务间事件通信
- 多语言消息广播(如 en/jp/fr)
💡 提示:结合 Docker 部署
TopicServer,可实现跨主机消息分发。