flandre/test/legacy/test_zmq_req_clientr.py

35 lines
811 B
Python
Raw Normal View History

2025-04-19 17:50:09 +08:00
import zmq
import time
class MyReqClient:
def __init__(self, s, context=None, timeout=4000):
if context is None:
context = zmq.Context()
self.context = context
self.timeout = timeout
self.socket = context.socket(zmq.REQ)
self.socket.connect(s)
self.s = s
def recv(self):
r = self.socket.poll(self.timeout)
print(r)
if r == 1:
return self.socket.recv()
self.socket.close()
self.socket = self.context.socket(zmq.REQ)
self.socket.connect(self.s)
def send(self, data):
self.socket.send(data)
def request(self, data: bytes):
self.socket.send(data)
return self.recv()
c = MyReqClient(f"tcp://127.0.0.1:5555")
while True:
print(c.request(b'asd'))