apppublic/appPublic/zmqapi.py
2026-02-13 11:13:37 +08:00

217 lines
5.2 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
from collections.abc import Coroutine
# from asyncio.coroutines import iscoroutine
import zmq
import zmq.asyncio
import json
from zmq.asyncio import Context
from zmq.utils.monitor import parse_monitor_message
async def zmq_subcribe(special_key: str, zmq_url='tcp://127.0.0.1:5555', callback=None):
    """Receive messages published under ``special_key`` on a SUB socket.

    A SUB socket is connected to ``zmq_url`` and subscribed with
    ``special_key`` as its filter.  Every message is read as multipart:
    frame 0 is the key and frame 1 is the content, which is decoded to ``str``.

    Args:
        special_key: Subscription filter passed to ``zmq.SUBSCRIBE``.
        zmq_url: Endpoint of the publisher to connect to.
        callback: Optional callable invoked with each decoded content string.
            It may be a plain function or return an awaitable (for example a
            coroutine function).  When it is ``None`` the function stops
            after the first message.

    Returns:
        The content of the last message received, or ``''`` when the task was
        cancelled before any message arrived.
    """
    import inspect  # local import: keeps the module's import block untouched

    ctx = Context()
    sock = ctx.socket(zmq.SUB)
    content = ''
    try:
        sock.connect(zmq_url)
        # Filter: only receive messages that start with special_key.
        sock.setsockopt_string(zmq.SUBSCRIBE, special_key)
        while True:
            # Multipart message: first frame is the key, second is the content.
            frames = await sock.recv_multipart()
            content = frames[1].decode()
            if callback is None:
                break
            outcome = callback(content)
            # Await only when the callback actually produced an awaitable, so
            # both sync and async callbacks are accepted.
            if inspect.isawaitable(outcome):
                await outcome
    except asyncio.CancelledError:
        # Cancellation is treated as a normal stop; the last content received
        # so far is still returned to the caller.
        pass
    finally:
        sock.close()
        ctx.term()
    return content
async def zmq_publish(key, content, zmq_url='tcp://127.0.0.1:5555'):
    """Publish one two-frame message (key, content) on a temporary PUB socket.

    A PUB socket is bound to ``zmq_url``, a single multipart message is sent
    and the socket and context are torn down again.

    Args:
        key: Message key (``str``); encoded and sent as the first frame.
        content: Message body (``str``); encoded and sent as the second frame.
        zmq_url: Endpoint to bind the publisher to.

    NOTE(review): the message is sent immediately after ``bind``.  A PUB
    socket drops messages for subscribers that have not finished connecting
    yet, so delivery presumably depends on timing -- confirm with callers.
    """
    ctx = Context()
    sock = ctx.socket(zmq.PUB)
    try:
        # Give the queued message up to 1000 ms to flush when closing.
        sock.setsockopt(zmq.LINGER, 1000)
        sock.bind(zmq_url)
        await sock.send_multipart([key.encode(), content.encode()])
    finally:
        # Always release the socket and context, even when bind/send fails
        # (e.g. the address is already in use).
        sock.close()
        ctx.term()
class Publisher:
    """Publisher side of the publish/subscribe pattern.

    Messages go out as a single frame with the layout
    ``"<msgid> <msgtype> <body>"`` encoded with ``coding``.
    """

    def __init__(self, port, coding='utf-8', msgid=1000):
        """Bind a PUB socket on ``tcp://*:<port>``.

        Args:
            port: TCP port number (int) to bind on all interfaces.
            coding: Text encoding used for outgoing messages.
            msgid: Default message id used when ``publish`` gets none.
        """
        self.port = port
        # Set first so __del__ can tell whether a socket was ever created.
        self.socket = None
        self.coding = coding
        self.msgid = msgid
        context = zmq.asyncio.Context()
        self.socket = context.socket(zmq.PUB)
        self.socket.bind('tcp://*:%d' % self.port)

    async def publish(self, msg, msgtype='text', msgid=-1):
        """Send ``msg`` to all subscribers.

        Args:
            msg: Payload.  Sent as-is when ``msgtype`` is ``'text'``;
                otherwise serialised with ``json.dumps``.
            msgtype: ``'text'`` for plain text; any other value is treated
                as JSON and transmitted with the type tag ``'json'``.
            msgid: Message id to prefix; ``-1`` selects the instance default.
        """
        if msgid == -1:
            msgid = self.msgid
        if msgtype != 'text':
            msg = json.dumps(msg)
            msgtype = 'json'
        payload = '%d %s %s' % (msgid, msgtype, msg)
        await self.socket.send(payload.encode(self.coding))

    def __del__(self):
        # The socket may be None (or missing) when construction failed
        # part-way; closing must not raise from the finalizer in that case.
        sock = getattr(self, 'socket', None)
        if sock is not None:
            sock.close()
class Subscriber:
    """Subscriber side of the publish/subscribe pattern.

    Expects single-frame messages laid out as ``"<msgid> <msgtype> <body>"``
    (the format produced by ``Publisher.publish``).
    """

    def __init__(self, host, ports, msgid, coding='utf-8'):
        """Create a SUB socket filtered on ``msgid`` and connect it.

        Args:
            host: Host name or address of the publisher(s).
            ports: Iterable of TCP port numbers (int) to connect to.
            msgid: Integer message id to subscribe to.
            coding: Text encoding used to decode incoming messages.
        """
        self.host = host
        self.ports = ports
        self.msgid = msgid
        self.coding = coding
        context = zmq.asyncio.Context()
        self.socket = context.socket(zmq.SUB)
        # ZMQ subscriptions match by prefix.  Including the separator that
        # follows the id keeps e.g. msgid 1000 from also matching 10001.
        topic = b'%d ' % self.msgid
        self.socket.setsockopt(zmq.SUBSCRIBE, topic)
        for p in self.ports:
            self.socket.connect("tcp://%s:%d" % (self.host, p))

    def addPort(self, port):
        """Connect the existing socket to one more publisher port."""
        # The subscription filter set in __init__ belongs to the socket, so
        # it does not need to be applied again for the new connection.
        self.socket.connect("tcp://%s:%d" % (self.host, port))

    async def subscribe(self):
        """Wait for the next message and return its body.

        Returns:
            The decoded JSON value when the message type is ``'json'``,
            otherwise the body as ``str``.
        """
        raw = await self.socket.recv()
        text = raw.decode(self.coding)
        # Split only twice so spaces inside the body are preserved.
        msgid, msgtype, body = text.split(' ', 2)
        if msgtype == 'json':
            return json.loads(body)
        return body

    def __del__(self):
        # The socket attribute is missing when __init__ failed before
        # creating it; the finalizer must not raise in that case.
        sock = getattr(self, 'socket', None)
        if sock is not None:
            sock.close()
class RRServer:
    """
    a request / response mode server

    Binds a REP socket and answers every request with the handler's result,
    or echoes the request back when no handler is set.
    """

    def __init__(self, port, handler=None):
        """
        Args:
            port: TCP port to bind on all interfaces.
            handler: Optional callable taking the raw request and returning
                the reply, either directly or as a coroutine.
        """
        self.port = port
        self.handler = handler
        # Loop flag for run(); set it to False to stop after the request
        # currently being served.
        self.running = True

    async def run(self):
        """Serve requests until ``self.running`` is cleared or an error occurs."""
        self.running = True
        context = zmq.asyncio.Context()
        socket = context.socket(zmq.REP)
        try:
            socket.bind('tcp://*:%s' % self.port)
            while self.running:
                request = await socket.recv()
                reply = request
                if self.handler is not None:
                    reply = self.handler(request)
                    # Handlers may be sync or async; await only coroutines.
                    if isinstance(reply, Coroutine):
                        reply = await reply
                await socket.send(reply)
        finally:
            # Release the socket on normal exit, cancellation and errors.
            socket.close()
class RRClient:
    """
    a request / response mode client

    Owns a REQ socket connected to ``tcp://<host>:<port>``.
    """

    def __init__(self, host, port):
        """Create the REQ socket and connect it to the given host and port."""
        self.host, self.port = host, port
        ctx = zmq.asyncio.Context()
        self.socket = ctx.socket(zmq.REQ)
        endpoint = 'tcp://%s:%d' % (self.host, self.port)
        self.socket.connect(endpoint)

    async def request(self, msg):
        """Send ``msg`` and return the reply received on the same socket."""
        await self.socket.send(msg)
        reply = await self.socket.recv()
        return reply
class PPPusher:
    """
    pusher of Push / Pull mode

    Owns a PUSH socket bound to ``tcp://<host>:<port>``.
    """

    def __init__(self, host, port):
        """Create the PUSH socket and bind it to the given host and port."""
        self.host, self.port = host, port
        ctx = zmq.asyncio.Context()
        self.socket = ctx.socket(zmq.PUSH)
        endpoint = 'tcp://%s:%d' % (self.host, self.port)
        self.socket.bind(endpoint)

    async def push(self, msg):
        """Send ``msg`` on the PUSH socket."""
        outgoing = self.socket
        await outgoing.send(msg)
class PPPuller:
    """
    puller of Push / Pull mode

    Binds a PULL socket and feeds every received message to the handler.

    NOTE(review): PPPusher also binds its socket; two peers that both bind
    the same endpoint cannot reach each other, so one side presumably should
    connect instead -- confirm the intended topology.
    """

    def __init__(self, host, port, handler=None):
        """
        Args:
            host: Host name or address to bind to.
            port: TCP port number (int) to bind to.
            handler: Optional callable invoked with each raw message; it may
                return a coroutine, which is then awaited.
        """
        self.host = host
        self.port = port
        self.handler = handler

    async def run(self):
        """Receive messages until ``self.running`` is cleared or an error occurs."""
        self.running = True
        context = zmq.asyncio.Context()
        # Store the socket on the instance: the receive loop reads
        # self.socket, which previously was never assigned.
        self.socket = context.socket(zmq.PULL)
        try:
            self.socket.bind('tcp://%s:%d' % (self.host, self.port))
            while self.running:
                msg = await self.socket.recv()
                if self.handler is not None:
                    result = self.handler(msg)
                    # Handlers may be sync or async; await only coroutines.
                    if isinstance(result, Coroutine):
                        await result
        finally:
            # Release the socket on normal exit, cancellation and errors.
            self.socket.close()
class PairClient:
    """
    client of Pair mode

    Owns a PAIR socket connected to ``tcp://<host>:<port>``.
    """

    def __init__(self, host, port):
        """
        Args:
            host: Host name or address of the pair server.
            port: TCP port number (int) of the pair server.
        """
        self.host = host
        self.port = port
        context = zmq.asyncio.Context()
        self.socket = context.socket(zmq.PAIR)
        # PairServer binds the endpoint, so the client has to connect to it;
        # binding here as well would leave the two sockets unable to pair.
        self.socket.connect('tcp://%s:%d' % (self.host, self.port))

    async def request(self, msg):
        """Send ``msg`` to the peer and return the next message it sends back."""
        await self.socket.send(msg)
        return await self.socket.recv()
class PairServer:
    """
    server of Pair mode

    Binds a PAIR socket and answers every message with the handler's result,
    or echoes the message back when no handler is set.
    """

    def __init__(self, port, handler=None):
        """
        Args:
            port: TCP port number (int) to bind on all interfaces.
            handler: Optional callable taking the raw message and returning
                the reply, either directly or as a coroutine.
        """
        self.port = port
        self.handler = handler
        self.running = True

    async def run(self):
        """Serve messages until ``self.running`` is cleared or an error occurs."""
        self.running = True
        context = zmq.asyncio.Context()
        socket = context.socket(zmq.PAIR)
        try:
            socket.bind('tcp://*:%d' % self.port)
            while self.running:
                msg = await socket.recv()
                ret = msg
                if self.handler is not None:
                    # Pass the received message to the handler, as RRServer
                    # and PPPuller do; it used to be called with no arguments.
                    ret = self.handler(msg)
                    # Handlers may be sync or async; await only coroutines.
                    if isinstance(ret, Coroutine):
                        ret = await ret
                await socket.send(ret)
        finally:
            # Release the socket on normal exit, cancellation and errors.
            socket.close()