From 5db566eec0bcfe8cdc0e0886467a07c23ac4bfc2 Mon Sep 17 00:00:00 2001 From: yumoqing Date: Thu, 12 Feb 2026 18:05:44 +0800 Subject: [PATCH] bugfix --- appPublic/zmqapi.py | 43 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/appPublic/zmqapi.py b/appPublic/zmqapi.py index bc643a9..2313c73 100755 --- a/appPublic/zmqapi.py +++ b/appPublic/zmqapi.py @@ -7,6 +7,49 @@ import zmq import zmq.asyncio import json +import asyncio +import zmq +from zmq.asyncio import Context +from zmq.utils.monitor import parse_monitor_message + +async def zmq_subcriber(special_key: str, zmq_url='tcp://127.0.0.1:5555', callback=None): + ctx = Context() + sock = ctx.socket(zmq.SUB) + sock.connect(zmq_url) + + # 设置过滤:只接收以 special_key 开头的消息 + sock.setsockopt_string(zmq.SUBSCRIBE, special_key) + + # print(f"【服务端】启动,仅等待 Key 为 '{special_key}' 的消息...") + + content = '' + try: + while True: + # 接收消息(multipart 格式,第一部分是 key,第二部分是内容) + msg = await sock.recv_multipart() + key = msg[0].decode() + content = msg[1].decode() + # print(f"【服务端】匹配成功!Key: {key}, 内容: {content}") + if callback is None: + break + await callback(content) + except asyncio.CancelledError: + pass + finally: + sock.close() + ctx.term() + return content + +async def zmq_publish(key, content, zmq_url): + ctx = Context() + sock = ctx.socket(zmq.PUB) + monitor = sock.get_monitor_socket() + sock.bind(zmq_url) + sock.setsockopt(zmq.LINGER, 1000) + await sock.send_multipart([key.encode(), content.encode()]) + sock.close() + ctx.term() + class Publisher: def __init__(self,port,coding='utf-8',msgid=1000): self.port = port