This commit is contained in:
yumoqing 2026-02-12 18:06:05 +08:00
parent 5db566eec0
commit 7f0ed97e5a

View File

@ -13,42 +13,42 @@ from zmq.asyncio import Context
from zmq.utils.monitor import parse_monitor_message from zmq.utils.monitor import parse_monitor_message
async def zmq_subcriber(special_key: str, zmq_url='tcp://127.0.0.1:5555', callback=None): async def zmq_subcriber(special_key: str, zmq_url='tcp://127.0.0.1:5555', callback=None):
ctx = Context() ctx = Context()
sock = ctx.socket(zmq.SUB) sock = ctx.socket(zmq.SUB)
sock.connect(zmq_url) sock.connect(zmq_url)
# 设置过滤:只接收以 special_key 开头的消息 # 设置过滤:只接收以 special_key 开头的消息
sock.setsockopt_string(zmq.SUBSCRIBE, special_key) sock.setsockopt_string(zmq.SUBSCRIBE, special_key)
# print(f"【服务端】启动,仅等待 Key 为 '{special_key}' 的消息...") # print(f"【服务端】启动,仅等待 Key 为 '{special_key}' 的消息...")
content = '' content = ''
try: try:
while True: while True:
# 接收消息multipart 格式,第一部分是 key第二部分是内容 # 接收消息multipart 格式,第一部分是 key第二部分是内容
msg = await sock.recv_multipart() msg = await sock.recv_multipart()
key = msg[0].decode() key = msg[0].decode()
content = msg[1].decode() content = msg[1].decode()
# print(f"【服务端】匹配成功Key: {key}, 内容: {content}") # print(f"【服务端】匹配成功Key: {key}, 内容: {content}")
if callback is None: if callback is None:
break break
await callback(content) await callback(content)
except asyncio.CancelledError: except asyncio.CancelledError:
pass pass
finally: finally:
sock.close() sock.close()
ctx.term() ctx.term()
return content return content
async def zmq_publish(key, content, zmq_url): async def zmq_publish(key, content, zmq_url):
ctx = Context() ctx = Context()
sock = ctx.socket(zmq.PUB) sock = ctx.socket(zmq.PUB)
monitor = sock.get_monitor_socket() monitor = sock.get_monitor_socket()
sock.bind(zmq_url) sock.bind(zmq_url)
sock.setsockopt(zmq.LINGER, 1000) sock.setsockopt(zmq.LINGER, 1000)
await sock.send_multipart([key.encode(), content.encode()]) await sock.send_multipart([key.encode(), content.encode()])
sock.close() sock.close()
ctx.term() ctx.term()
class Publisher: class Publisher:
def __init__(self,port,coding='utf-8',msgid=1000): def __init__(self,port,coding='utf-8',msgid=1000):