This commit is contained in:
yumoqing 2026-02-12 18:05:44 +08:00
parent 4921a5c62c
commit 5db566eec0

View File

@ -7,6 +7,49 @@ import zmq
import zmq.asyncio
import json
import asyncio
import zmq
from zmq.asyncio import Context
from zmq.utils.monitor import parse_monitor_message
async def zmq_subcriber(special_key: str, zmq_url='tcp://127.0.0.1:5555', callback=None):
ctx = Context()
sock = ctx.socket(zmq.SUB)
sock.connect(zmq_url)
# 设置过滤:只接收以 special_key 开头的消息
sock.setsockopt_string(zmq.SUBSCRIBE, special_key)
# print(f"【服务端】启动,仅等待 Key 为 '{special_key}' 的消息...")
content = ''
try:
while True:
# 接收消息multipart 格式,第一部分是 key第二部分是内容
msg = await sock.recv_multipart()
key = msg[0].decode()
content = msg[1].decode()
# print(f"【服务端】匹配成功Key: {key}, 内容: {content}")
if callback is None:
break
await callback(content)
except asyncio.CancelledError:
pass
finally:
sock.close()
ctx.term()
return content
async def zmq_publish(key, content, zmq_url):
ctx = Context()
sock = ctx.socket(zmq.PUB)
monitor = sock.get_monitor_socket()
sock.bind(zmq_url)
sock.setsockopt(zmq.LINGER, 1000)
await sock.send_multipart([key.encode(), content.encode()])
sock.close()
ctx.term()
class Publisher:
def __init__(self,port,coding='utf-8',msgid=1000):
self.port = port