add req client

This commit is contained in:
flandre 2025-04-19 17:50:09 +08:00
parent 4dd023a350
commit 41b0e01d7d
9 changed files with 91 additions and 138 deletions

View File

@ -1,7 +1,7 @@
import zmq import zmq
from zmq import Context from zmq import Context
from flandre.utils.Msg import Msg from flandre.utils.Msg import Msg, InterruptMsg
class BusClient: class BusClient:
@ -14,7 +14,8 @@ class BusClient:
pub=True, pub=True,
sub=True, sub=True,
conflare=False, conflare=False,
poller=False poller=False,
req_socket_str: str = None,
): ):
if ctx is None: if ctx is None:
self.ctx: Context = zmq.Context() self.ctx: Context = zmq.Context()
@ -35,6 +36,13 @@ class BusClient:
if pub: if pub:
self.pub = self.ctx.socket(zmq.PUB) self.pub = self.ctx.socket(zmq.PUB)
self.pub.connect(f'tcp://127.0.0.1:{self.fp}') self.pub.connect(f'tcp://127.0.0.1:{self.fp}')
self.req_socket = None
if req_socket_str is not None:
self.req_socket_str = req_socket_str
self.poller.register(self.req_socket, zmq.POLLIN)
self.req_socket = self.ctx.socket(zmq.REQ)
self.req_socket.connect(req_socket_str)
# todo fix poller
def recv(self): def recv(self):
b = self.sub.recv() b = self.sub.recv()
@ -49,3 +57,18 @@ class BusClient:
async def send_async(self, msg: Msg): async def send_async(self, msg: Msg):
return self.pub.send(msg.encode_msg()) return self.pub.send(msg.encode_msg())
def req_interrupt(self, b: bytes, interrupt_name: str, timeout=3000):
while True:
self.req_socket.send(b)
r = dict(self.poller.poll(timeout))
if self.req_socket in r:
return self.req_socket.recv()
self.req_socket.close()
self.req_socket = self.ctx.socket(zmq.REQ)
self.req_socket.connect(self.req_socket_str)
if self.sub in r:
msg = self.recv()
if isinstance(msg, InterruptMsg):
if msg.value == interrupt_name:
return None

View File

@ -94,7 +94,6 @@ class Adv(QMainWindow, Ui_MainWindow):
# self.b_play_live.clicked.connect(self.on_play_live) # self.b_play_live.clicked.connect(self.on_play_live)
# self.b_play_playback.clicked.connect(self.on_play_playback) # self.b_play_playback.clicked.connect(self.on_play_playback)
self.b_record.clicked.connect(self.on_record)
self.record = False self.record = False
self.device_connected = False self.device_connected = False
self.device_enabled = False self.device_enabled = False
@ -178,7 +177,7 @@ class Adv(QMainWindow, Ui_MainWindow):
"Set New Imaging Config Name", "Set New Imaging Config Name",
"Config Name:", "Config Name:",
QLineEdit.EchoMode.Normal, QLineEdit.EchoMode.Normal,
"") self.c_imaging_config.currentText())
if okPressed and filename != '': if okPressed and filename != '':
(C.imaging_config_folder / f'{filename}.json').write_text(self.arg.json()) (C.imaging_config_folder / f'{filename}.json').write_text(self.arg.json())
idx = self.c_imaging_config.findText(filename) idx = self.c_imaging_config.findText(filename)
@ -324,15 +323,16 @@ class Adv(QMainWindow, Ui_MainWindow):
pass pass
self.b_device_connection.clicked.connect(self.on_device_connect) self.b_device_connection.clicked.connect(self.on_device_connect)
def on_record(self): @pyqtSlot()
def on_b_record_clicked(self):
if self.record: if self.record:
self.l_record_commit.setEnabled(True) self.l_record_commit.setEnabled(True)
self.p.send(SetRecordMsg(False)) self.p.send(SetRecordMsg(False))
self.record = False self.record = False
self.b_record.setStyleSheet('') self.b_record.setStyleSheet('')
else: else:
self.l_record_commit.setEnabled(False)
if self.l_record_commit.text() != '': if self.l_record_commit.text() != '':
self.l_record_commit.setEnabled(False)
self.record_size_cnt = 0 self.record_size_cnt = 0
self.record_frame_cnt = 0 self.record_frame_cnt = 0
self.p.send(SetRecordMsg(True, self.l_record_commit.text(), self.l_base.text())) self.p.send(SetRecordMsg(True, self.l_record_commit.text(), self.l_base.text()))

View File

@ -13,12 +13,13 @@ class Node:
bp = BusClient.bp bp = BusClient.bp
topics = [] topics = []
def __init__(self, enable_init=True, level=logging.INFO, conflare=False, broker=False): def __init__(self, enable_init=True, level=logging.INFO, conflare=False, broker=False, req=False):
self.enable_init = enable_init self.enable_init = enable_init
self.isalive = True self.isalive = True
self.level = level self.level = level
self.conflare = conflare self.conflare = conflare
self.broker = broker self.broker = broker
self.req = req
def recv(self): def recv(self):
return self.c.recv() return self.c.recv()
@ -55,7 +56,8 @@ class Node:
self.context = zmq.Context() self.context = zmq.Context()
if self.enable_init: if self.enable_init:
self.c = BusClient(*([KillMsg, Msg1] + self.topics), poller=True, conflare=self.conflare) self.c = BusClient(*([KillMsg, Msg1] + self.topics),
poller=True, conflare=self.conflare, req_socket_str=self.req)
def __call__(self, *args, **kwargs): def __call__(self, *args, **kwargs):
self.setup() self.setup()

View File

@ -29,7 +29,7 @@ class Ui_MainWindow(object):
self.s_beta = QtWidgets.QSlider(parent=self.centralwidget) self.s_beta = QtWidgets.QSlider(parent=self.centralwidget)
self.s_beta.setMinimum(1) self.s_beta.setMinimum(1)
self.s_beta.setMaximum(60) self.s_beta.setMaximum(60)
self.s_beta.setProperty("value", 10) self.s_beta.setProperty("value", 30)
self.s_beta.setOrientation(QtCore.Qt.Orientation.Horizontal) self.s_beta.setOrientation(QtCore.Qt.Orientation.Horizontal)
self.s_beta.setObjectName("s_beta") self.s_beta.setObjectName("s_beta")
self.gridLayout_5.addWidget(self.s_beta, 9, 1, 1, 1) self.gridLayout_5.addWidget(self.s_beta, 9, 1, 1, 1)
@ -177,11 +177,9 @@ class Ui_MainWindow(object):
self.label_17.setObjectName("label_17") self.label_17.setObjectName("label_17")
self.horizontalLayout_12.addWidget(self.label_17) self.horizontalLayout_12.addWidget(self.label_17)
self.l_record_commit = QtWidgets.QLineEdit(parent=self.g_live) self.l_record_commit = QtWidgets.QLineEdit(parent=self.g_live)
self.l_record_commit.setEnabled(False)
self.l_record_commit.setObjectName("l_record_commit") self.l_record_commit.setObjectName("l_record_commit")
self.horizontalLayout_12.addWidget(self.l_record_commit) self.horizontalLayout_12.addWidget(self.l_record_commit)
self.b_record = QtWidgets.QPushButton(parent=self.g_live) self.b_record = QtWidgets.QPushButton(parent=self.g_live)
self.b_record.setEnabled(False)
self.b_record.setObjectName("b_record") self.b_record.setObjectName("b_record")
self.horizontalLayout_12.addWidget(self.b_record) self.horizontalLayout_12.addWidget(self.b_record)
self.verticalLayout_6.addLayout(self.horizontalLayout_12) self.verticalLayout_6.addLayout(self.horizontalLayout_12)

View File

@ -40,7 +40,7 @@
<number>60</number> <number>60</number>
</property> </property>
<property name="value"> <property name="value">
<number>10</number> <number>30</number>
</property> </property>
<property name="orientation"> <property name="orientation">
<enum>Qt::Orientation::Horizontal</enum> <enum>Qt::Orientation::Horizontal</enum>
@ -341,17 +341,10 @@
</widget> </widget>
</item> </item>
<item> <item>
<widget class="QLineEdit" name="l_record_commit"> <widget class="QLineEdit" name="l_record_commit"/>
<property name="enabled">
<bool>false</bool>
</property>
</widget>
</item> </item>
<item> <item>
<widget class="QPushButton" name="b_record"> <widget class="QPushButton" name="b_record">
<property name="enabled">
<bool>false</bool>
</property>
<property name="text"> <property name="text">
<string>Record</string> <string>Record</string>
</property> </property>

View File

@ -21,6 +21,7 @@ class BG(Enum):
StrMsg = auto() StrMsg = auto()
IntMsg = auto() IntMsg = auto()
MaxMsg = auto() MaxMsg = auto()
InterruptMsg = auto()
MoveAxisMsg = auto() MoveAxisMsg = auto()
ImageArgMsg = auto() ImageArgMsg = auto()
SetSeqMetaMsg = auto() SetSeqMetaMsg = auto()
@ -155,6 +156,9 @@ class IntMsg(Msg):
class KeyPressMsg(StrMsg): class KeyPressMsg(StrMsg):
pass pass
@dataclasses.dataclass
class InterruptMsg(StrMsg):
pass
@dataclasses.dataclass @dataclasses.dataclass
class BoolMsg(Msg): class BoolMsg(Msg):

View File

@ -1,118 +0,0 @@
# import json
# import zipfile
# from pathlib import Path
#
# import cupy as cp
# import numpy as np
#
# from flandre.utils.RfMeta import RfFrameMeta, RfSequenceMeta
#
#
# class RfFrame:
# def __init__(self, data: bytes | Path, meta: RfFrameMeta, seq: 'RfSequence' = None):
# self.data = data
# self.meta = meta
# self._seq = seq
# self.is_zip = False
# if isinstance(self.data, Path):
# if self.data.parent.suffix == '.zip':
# self.is_zip = True
#
# def save(self, folder: Path):
# (folder / self.meta.name).write_bytes(self.bytes)
#
# @property
# def seq(self):
# if self._seq is None and isinstance(self.data, Path):
# return RfSequence.from_folder(self.data.parent)
# return self._seq
#
# @property
# def bytes(self):
# if self.is_zip:
# return self.data.__str__().encode()
# if isinstance(self.data, bytes):
# return self.data
# if isinstance(self.data, Path):
# return self.data.read_bytes()
#
# def mat(self, device='gpu'):
# from flandre.utils.RfMat import RfMat
#
# if device == 'gpu':
# arr = cp.frombuffer(self.bytes, dtype=cp.int16)
# else:
# arr = np.frombuffer(self.bytes, dtype=np.int16)
# return RfMat(arr.reshape(self.seq.meta.shape), frame_meta=self.meta, seq_meta=self.seq.meta)
#
#
# class RfSequence:
# def __init__(self, frames: list[RfFrame], meta: RfSequenceMeta):
# self.frames = frames
# self.meta = meta
#
# @classmethod
# def from_folder(cls, folder: Path | str) -> 'RfSequence':
# folder = Path(folder)
# if not folder.exists():
# raise FileNotFoundError
# meta = RfSequenceMeta.from_path(folder)
# rs = RfSequence([], meta)
# for f in folder.glob('*.bin'):
# rs.frames.append(RfFrame(f, RfFrameMeta.from_path(f), seq=rs))
# return rs
#
# @classmethod
# def from_zip(cls, z: Path | str) -> 'RfSequence':
# z = Path(z)
# if not z.exists():
# raise FileNotFoundError
# meta = RfSequenceMeta.from_name(z.stem)
# rs = RfSequence([], meta)
# zz = zipfile.ZipFile(z)
# max_idx = max([int(Path(item.filename).stem) for item in zz.infolist()])
# for i in range(max_idx + 1):
# j = json.loads(zz.open(f'{i}.json').read())
# rs.frames.append(RfFrame(z / f'{i}.zst', RfFrameMeta(**j), seq=rs))
# return rs
#
# @property
# def all(self):
# pass
#
# def query(self):
# pass
#
# @property
# def seq_id_minmax(self):
# mmin = 2 ** 32
# mmax = 0
# for f in self.frames:
# mmin = min(mmin, f.meta.sequence_id)
# mmax = max(mmax, f.meta.sequence_id)
# return mmin, mmax
#
#
#
#
#
# if __name__ == '__main__':
# f = RfSequence2('/mnt/16T/private_dataset/us/steel-top,U=30,M=PWI,S=(256 1502).zip').frames
# for f2 in f:
# print(f2.__bytes__().__len__())
# # print(Path('xxx/asdasdasd_asd=a1,ds1.123').with_suffix(''))
# # print(Path('xxx/asdasdasd_asd=a1,ds1.123').stem)
# # t = (1, 2)
# # f = RfSequenceMeta.from_name('123123,U=321,S=(1 2 3),M=PWI')
# # print(f.commit)
# # print(f.name)
# # print(RfSequence.RfSequenceMeta.p2t)
# # f = RfFrame.RfFrameMeta(123, 345)
# # print(f.name)
# # rs = RfSequence([], RfSequenceMeta())
# # rs = RfSequence.from_folder(
# # '/run/media/lambda/b86dccdc-f134-464b-a310-6575ee9ae85c/cap4/trim/R1.1,U=30,M=PWI,S=(256 1502)')
# # # print(rs.meta)
# # for frame in rs.frames:
# # if frame.mat().rotate90().show((1080, 1920)) == ord('q'):
# # break

View File

@ -0,0 +1,34 @@
import zmq
import time
class MyReqClient:
def __init__(self, s, context=None, timeout=4000):
if context is None:
context = zmq.Context()
self.context = context
self.timeout = timeout
self.socket = context.socket(zmq.REQ)
self.socket.connect(s)
self.s = s
def recv(self):
r = self.socket.poll(self.timeout)
print(r)
if r == 1:
return self.socket.recv()
self.socket.close()
self.socket = self.context.socket(zmq.REQ)
self.socket.connect(self.s)
def send(self, data):
self.socket.send(data)
def request(self, data: bytes):
self.socket.send(data)
return self.recv()
c = MyReqClient(f"tcp://127.0.0.1:5555")
while True:
print(c.request(b'asd'))

View File

@ -0,0 +1,17 @@
import time
import zmq
c = zmq.Context()
s = c.socket(zmq.REP)
# s.setsockopt(zmq.HEARTBEAT_IVL, 1000)
# s.setsockopt(zmq.HEARTBEAT_TTL, 1000)
# s.setsockopt(zmq.HEARTBEAT_TIMEOUT, 1000)
s.bind('tcp://127.0.0.1:5555')
while True:
s.recv()
print('recv')
time.sleep(2)
print('send')
s.send(b'hello')
# break