diff --git a/appPublic/zmqapi.py b/appPublic/zmqapi.py index cf41167..f765617 100755 --- a/appPublic/zmqapi.py +++ b/appPublic/zmqapi.py @@ -42,19 +42,18 @@ async def run_proxy(): debug('run_proxy stopped...........') async def zmq_subcribe(special_key: str, callback=None): - config = getConfig() - subscribe_url = config.subscribe_url or 'tcp://127.0.0.1:5556' - ctx = Context() - sock = ctx.socket(zmq.SUB) - sock.connect(subscribe_url) - - # 设置过滤:只接收以 special_key 开头的消息 - sock.setsockopt_string(zmq.SUBSCRIBE, special_key) - - debug(f"【服务端】启动,仅等待 Key 为 '{special_key}' 的消息...") - content = '' try: + config = getConfig() + subscribe_url = config.subscribe_url or 'tcp://127.0.0.1:5556' + ctx = Context() + sock = ctx.socket(zmq.SUB) + sock.connect(subscribe_url) + + # 设置过滤:只接收以 special_key 开头的消息 + sock.setsockopt_string(zmq.SUBSCRIBE, special_key) + debug(f"【服务端】启动,仅等待 Key 为 '{special_key}' 的消息...") + while True: # 接收消息(multipart 格式,第一部分是 key,第二部分是内容) if callback: @@ -75,6 +74,8 @@ async def zmq_subcribe(special_key: str, callback=None): except asyncio.CancelledError: exception(f'Cancelled .......') pass + except Exception as e: + exception(f'{e}\n{format_exc()}' finally: sock.close() ctx.term()