This commit is contained in:
yumoqing 2026-03-02 20:00:45 +08:00
parent d24a404b82
commit 85197704f6

View File

@ -10,7 +10,7 @@ from .worker import awaitify
from zmq.asyncio import Context from zmq.asyncio import Context
from zmq.utils.monitor import parse_monitor_message from zmq.utils.monitor import parse_monitor_message
from appPublic.jsonConfig import getConfig from appPublic.jsonConfig import getConfig
from appPublic.log import exception from appPublic.log import exception, debug
async def run_proxy(): async def run_proxy():
config = getConfig() config = getConfig()
@ -26,17 +26,18 @@ async def run_proxy():
backend = ctx.socket(zmq.XPUB) backend = ctx.socket(zmq.XPUB)
backend.bind(subscribe_url) # 订阅者往这里 connect backend.bind(subscribe_url) # 订阅者往这里 connect
print(f"【Proxy】启动Pub 连 {publish_url}Sub 连 {subscribe_url}") debug(f"【Proxy】启动Pub 连 {publish_url}Sub 连 {subscribe_url}")
# 核心:启动内置代理,它会自动把 frontend 接收的消息广播给 backend # 核心:启动内置代理,它会自动把 frontend 接收的消息广播给 backend
# 并且把 backend 的订阅信息同步给 frontend # 并且把 backend 的订阅信息同步给 frontend
try: try:
await awaitify(zmq.proxy)(frontend, backend) await awaitify(zmq.proxy)(frontend, backend)
except Exception as e: except Exception as e:
print(f"Proxy 异常退出: {e}") exception(f"Proxy 异常退出: {e}{format_exc()")
finally: finally:
frontend.close() frontend.close()
backend.close() backend.close()
ctx.term() ctx.term()
debug('run_proxy stopped...........')
async def zmq_subcribe(special_key: str, callback=None): async def zmq_subcribe(special_key: str, callback=None):
config = getConfig() config = getConfig()
@ -48,12 +49,13 @@ async def zmq_subcribe(special_key: str, callback=None):
# 设置过滤:只接收以 special_key 开头的消息 # 设置过滤:只接收以 special_key 开头的消息
sock.setsockopt_string(zmq.SUBSCRIBE, special_key) sock.setsockopt_string(zmq.SUBSCRIBE, special_key)
print(f"【服务端】启动,仅等待 Key 为 '{special_key}' 的消息...") debug(f"【服务端】启动,仅等待 Key 为 '{special_key}' 的消息...")
content = '' content = ''
try: try:
while True: while True:
# 接收消息multipart 格式,第一部分是 key第二部分是内容 # 接收消息multipart 格式,第一部分是 key第二部分是内容
debug('## RECEIVING message on {special_key}')
msg = await sock.recv_multipart() msg = await sock.recv_multipart()
key = msg[0].decode() key = msg[0].decode()
content = msg[1].decode() content = msg[1].decode()
@ -66,6 +68,7 @@ async def zmq_subcribe(special_key: str, callback=None):
exception(f'{e}\n{format_exc()}') exception(f'{e}\n{format_exc()}')
except asyncio.CancelledError: except asyncio.CancelledError:
exception(f'Cancelled .......')
pass pass
finally: finally:
sock.close() sock.close()