This commit is contained in:
yumoqing 2026-02-13 14:58:39 +08:00
parent 20e994c7f3
commit bd3958e8ff

View File

@ -1,24 +1,50 @@
import asyncio
from collections.abc import Coroutine
# from asyncio.coroutines import iscoroutine
import zmq
import zmq.asyncio
import json
from .worker import awaitify
from zmq.asyncio import Context
from zmq.utils.monitor import parse_monitor_message
async def zmq_subcribe(special_key: str, zmq_url='tcp://127.0.0.1:5555', callback=None):
async def run_proxy(subscribe_url='tcp://127.0.0.1:5556',
publish_url='tcp://127.0.0.1:5555'):
ctx = zmq.asyncio.Context()
# 1. 面对 Publisher 的前端 (XSUB)
frontend = ctx.socket(zmq.XSUB)
frontend.bind(publish_url) # 发布者往这里 connect
# 2. 面对 Subscriber 的后端 (XPUB)
backend = ctx.socket(zmq.XPUB)
backend.bind(subscribe_url) # 订阅者往这里 connect
print("【Proxy】启动Pub 连 5555Sub 连 5556")
# 核心:启动内置代理,它会自动把 frontend 接收的消息广播给 backend
# 并且把 backend 的订阅信息同步给 frontend
try:
await awaitify(zmq.proxy)(frontend, backend)
except Exception as e:
print(f"Proxy 异常退出: {e}")
finally:
frontend.close()
backend.close()
ctx.term()
async def zmq_subcribe(special_key: str, subscribe_url='tcp://127.0.0.1:5556', callback=None):
ctx = Context()
sock = ctx.socket(zmq.SUB)
sock.connect(zmq_url)
sock.connect(subscribe_url)
# 设置过滤:只接收以 special_key 开头的消息
sock.setsockopt_string(zmq.SUBSCRIBE, special_key)
# print(f"【服务端】启动,仅等待 Key 为 '{special_key}' 的消息...")
print(f"【服务端】启动,仅等待 Key 为 '{special_key}' 的消息...")
content = ''
try:
@ -38,12 +64,15 @@ async def zmq_subcribe(special_key: str, zmq_url='tcp://127.0.0.1:5555', callbac
ctx.term()
return content
async def zmq_publish(key, content, zmq_url='tcp://127.0.0.1:5555'):
async def zmq_publish(key, content, publish_url='tcp://127.0.0.1:5555'):
ctx = Context()
sock = ctx.socket(zmq.PUB)
sock.setsockopt(zmq.LINGER, 1000)
sock.bind(zmq_url)
sock.connect(publish_url)
await asyncio.sleep(0.1)
await sock.send_multipart([key.encode(), content.encode()])
print(f"【PUB】消息已发送: {key}")
await asyncio.sleep(0.1)
sock.close()
ctx.term()
@ -214,3 +243,6 @@ class PairServer:
if isinstance(ret,Coroutine):
ret = await ret
await socket.send(ret)
if __name__ == '__main__':
asyncio.run(run_proxy())