From bd3958e8ff73ea8539a077c663f8c38a1f4b474d Mon Sep 17 00:00:00 2001 From: yumoqing Date: Fri, 13 Feb 2026 14:58:39 +0800 Subject: [PATCH] bugfix --- appPublic/zmqapi.py | 44 ++++++++++++++++++++++++++++++++++++++------ 1 file changed, 38 insertions(+), 6 deletions(-) diff --git a/appPublic/zmqapi.py b/appPublic/zmqapi.py index c990117..c1a7036 100755 --- a/appPublic/zmqapi.py +++ b/appPublic/zmqapi.py @@ -1,24 +1,50 @@ import asyncio from collections.abc import Coroutine -# from asyncio.coroutines import iscoroutine import zmq import zmq.asyncio import json +from .worker import awaitify from zmq.asyncio import Context from zmq.utils.monitor import parse_monitor_message -async def zmq_subcribe(special_key: str, zmq_url='tcp://127.0.0.1:5555', callback=None): +async def run_proxy(subscribe_url='tcp://127.0.0.1:5556', + publish_url='tcp://127.0.0.1:5555'): + + ctx = zmq.asyncio.Context() + + # 1. 面对 Publisher 的前端 (XSUB) + frontend = ctx.socket(zmq.XSUB) + frontend.bind(publish_url) # 发布者往这里 connect + + # 2. 面对 Subscriber 的后端 (XPUB) + backend = ctx.socket(zmq.XPUB) + backend.bind(subscribe_url) # 订阅者往这里 connect + + print("【Proxy】启动:Pub 连 5555,Sub 连 5556") + + # 核心:启动内置代理,它会自动把 frontend 接收的消息广播给 backend + # 并且把 backend 的订阅信息同步给 frontend + try: + await awaitify(zmq.proxy)(frontend, backend) + except Exception as e: + print(f"Proxy 异常退出: {e}") + finally: + frontend.close() + backend.close() + ctx.term() + +async def zmq_subcribe(special_key: str, subscribe_url='tcp://127.0.0.1:5556', callback=None): ctx = Context() sock = ctx.socket(zmq.SUB) - sock.connect(zmq_url) + sock.connect(subscribe_url) # 设置过滤:只接收以 special_key 开头的消息 sock.setsockopt_string(zmq.SUBSCRIBE, special_key) - # print(f"【服务端】启动,仅等待 Key 为 '{special_key}' 的消息...") + print(f"【服务端】启动,仅等待 Key 为 '{special_key}' 的消息...") content = '' try: @@ -38,12 +64,15 @@ async def zmq_subcribe(special_key: str, zmq_url='tcp://127.0.0.1:5555', callbac ctx.term() return content -async def zmq_publish(key, content, zmq_url='tcp://127.0.0.1:5555'): +async def zmq_publish(key, content, publish_url='tcp://127.0.0.1:5555'): ctx = Context() sock = ctx.socket(zmq.PUB) sock.setsockopt(zmq.LINGER, 1000) - sock.bind(zmq_url) + sock.connect(publish_url) + await asyncio.sleep(0.1) await sock.send_multipart([key.encode(), content.encode()]) + print(f"【PUB】消息已发送: {key}") + await asyncio.sleep(0.1) sock.close() ctx.term() @@ -214,3 +243,6 @@ class PairServer: if isinstance(ret,Coroutine): ret = await ret await socket.send(ret) + +if __name__ == '__main__': + asyncio.run(run_proxy())