52 lines
1.4 KiB
Python
52 lines
1.4 KiB
Python
import zmq
|
|
from zmq import Context
|
|
|
|
from flandre.utils.Msg import Msg
|
|
|
|
|
|
class BusClient:
|
|
fp = 5001
|
|
bp = 5002
|
|
|
|
def __init__(self,
|
|
*msgs: type(Msg),
|
|
ctx=None,
|
|
pub=True,
|
|
sub=True,
|
|
conflare=False,
|
|
poller=False
|
|
):
|
|
if ctx is None:
|
|
self.ctx: Context = zmq.Context()
|
|
else:
|
|
self.ctx = ctx
|
|
if sub:
|
|
self.sub = self.ctx.socket(zmq.SUB)
|
|
for msg in msgs:
|
|
self.sub.setsockopt(zmq.SUBSCRIBE, msg.magic() + msg.eid())
|
|
if msgs.__len__() == 0:
|
|
self.sub.setsockopt(zmq.SUBSCRIBE, b'')
|
|
if conflare:
|
|
self.sub.setsockopt(zmq.CONFLATE, 1)
|
|
self.sub.connect(f'tcp://127.0.0.1:{self.bp}')
|
|
if poller:
|
|
self.poller = zmq.Poller()
|
|
self.poller.register(self.sub, zmq.POLLIN)
|
|
if pub:
|
|
self.pub = self.ctx.socket(zmq.PUB)
|
|
self.pub.connect(f'tcp://127.0.0.1:{self.fp}')
|
|
|
|
def recv(self):
|
|
b = self.sub.recv()
|
|
return Msg.decode_msg(b)
|
|
|
|
def send(self, msg: Msg):
|
|
return self.pub.send(msg.encode_msg())
|
|
|
|
async def recv_async(self):
|
|
b = await self.sub.recv()
|
|
return Msg.decode_msg(b)
|
|
|
|
async def send_async(self, msg: Msg):
|
|
return self.pub.send(msg.encode_msg())
|