diff --git a/flandre/BusClient.py b/flandre/BusClient.py index 40cb1ca..fcd150b 100644 --- a/flandre/BusClient.py +++ b/flandre/BusClient.py @@ -1,7 +1,7 @@ import zmq from zmq import Context -from flandre.utils.Msg import Msg +from flandre.utils.Msg import Msg, InterruptMsg class BusClient: @@ -14,7 +14,8 @@ class BusClient: pub=True, sub=True, conflare=False, - poller=False + poller=False, + req_socket_str: str = None, ): if ctx is None: self.ctx: Context = zmq.Context() @@ -35,6 +36,13 @@ class BusClient: if pub: self.pub = self.ctx.socket(zmq.PUB) self.pub.connect(f'tcp://127.0.0.1:{self.fp}') + self.req_socket = None + if req_socket_str is not None: + self.req_socket_str = req_socket_str + self.poller.register(self.req_socket, zmq.POLLIN) + self.req_socket = self.ctx.socket(zmq.REQ) + self.req_socket.connect(req_socket_str) + # todo fix poller def recv(self): b = self.sub.recv() @@ -49,3 +57,18 @@ class BusClient: async def send_async(self, msg: Msg): return self.pub.send(msg.encode_msg()) + + def req_interrupt(self, b: bytes, interrupt_name: str, timeout=3000): + while True: + self.req_socket.send(b) + r = dict(self.poller.poll(timeout)) + if self.req_socket in r: + return self.req_socket.recv() + self.req_socket.close() + self.req_socket = self.ctx.socket(zmq.REQ) + self.req_socket.connect(self.req_socket_str) + if self.sub in r: + msg = self.recv() + if isinstance(msg, InterruptMsg): + if msg.value == interrupt_name: + return None diff --git a/flandre/nodes/MainUI.py b/flandre/nodes/MainUI.py index a964df7..b010aa4 100644 --- a/flandre/nodes/MainUI.py +++ b/flandre/nodes/MainUI.py @@ -94,7 +94,6 @@ class Adv(QMainWindow, Ui_MainWindow): # self.b_play_live.clicked.connect(self.on_play_live) # self.b_play_playback.clicked.connect(self.on_play_playback) - self.b_record.clicked.connect(self.on_record) self.record = False self.device_connected = False self.device_enabled = False @@ -178,7 +177,7 @@ class Adv(QMainWindow, Ui_MainWindow): "Set New Imaging Config Name", "Config Name:", QLineEdit.EchoMode.Normal, - "") + self.c_imaging_config.currentText()) if okPressed and filename != '': (C.imaging_config_folder / f'{filename}.json').write_text(self.arg.json()) idx = self.c_imaging_config.findText(filename) @@ -324,15 +323,16 @@ class Adv(QMainWindow, Ui_MainWindow): pass self.b_device_connection.clicked.connect(self.on_device_connect) - def on_record(self): + @pyqtSlot() + def on_b_record_clicked(self): if self.record: self.l_record_commit.setEnabled(True) self.p.send(SetRecordMsg(False)) self.record = False self.b_record.setStyleSheet('') else: - self.l_record_commit.setEnabled(False) if self.l_record_commit.text() != '': + self.l_record_commit.setEnabled(False) self.record_size_cnt = 0 self.record_frame_cnt = 0 self.p.send(SetRecordMsg(True, self.l_record_commit.text(), self.l_base.text())) diff --git a/flandre/nodes/Node.py b/flandre/nodes/Node.py index a57c057..55d3c2d 100644 --- a/flandre/nodes/Node.py +++ b/flandre/nodes/Node.py @@ -13,12 +13,13 @@ class Node: bp = BusClient.bp topics = [] - def __init__(self, enable_init=True, level=logging.INFO, conflare=False, broker=False): + def __init__(self, enable_init=True, level=logging.INFO, conflare=False, broker=False, req=False): self.enable_init = enable_init self.isalive = True self.level = level self.conflare = conflare self.broker = broker + self.req = req def recv(self): return self.c.recv() @@ -55,7 +56,8 @@ class Node: self.context = zmq.Context() if self.enable_init: - self.c = BusClient(*([KillMsg, Msg1] + self.topics), poller=True, conflare=self.conflare) + self.c = BusClient(*([KillMsg, Msg1] + self.topics), + poller=True, conflare=self.conflare, req_socket_str=self.req) def __call__(self, *args, **kwargs): self.setup() diff --git a/flandre/pyqt/Main.py b/flandre/pyqt/Main.py index 692f3e8..d18fc6a 100644 --- a/flandre/pyqt/Main.py +++ b/flandre/pyqt/Main.py @@ -29,7 +29,7 @@ class Ui_MainWindow(object): self.s_beta = QtWidgets.QSlider(parent=self.centralwidget) self.s_beta.setMinimum(1) self.s_beta.setMaximum(60) - self.s_beta.setProperty("value", 10) + self.s_beta.setProperty("value", 30) self.s_beta.setOrientation(QtCore.Qt.Orientation.Horizontal) self.s_beta.setObjectName("s_beta") self.gridLayout_5.addWidget(self.s_beta, 9, 1, 1, 1) @@ -177,11 +177,9 @@ class Ui_MainWindow(object): self.label_17.setObjectName("label_17") self.horizontalLayout_12.addWidget(self.label_17) self.l_record_commit = QtWidgets.QLineEdit(parent=self.g_live) - self.l_record_commit.setEnabled(False) self.l_record_commit.setObjectName("l_record_commit") self.horizontalLayout_12.addWidget(self.l_record_commit) self.b_record = QtWidgets.QPushButton(parent=self.g_live) - self.b_record.setEnabled(False) self.b_record.setObjectName("b_record") self.horizontalLayout_12.addWidget(self.b_record) self.verticalLayout_6.addLayout(self.horizontalLayout_12) diff --git a/flandre/pyqt/Main.ui b/flandre/pyqt/Main.ui index 9b9c8cc..9d53f82 100644 --- a/flandre/pyqt/Main.ui +++ b/flandre/pyqt/Main.ui @@ -40,7 +40,7 @@ 60 - 10 + 30 Qt::Orientation::Horizontal @@ -341,17 +341,10 @@ - - - false - - + - - false - Record diff --git a/flandre/utils/Msg.py b/flandre/utils/Msg.py index 387d515..2d25954 100644 --- a/flandre/utils/Msg.py +++ b/flandre/utils/Msg.py @@ -21,6 +21,7 @@ class BG(Enum): StrMsg = auto() IntMsg = auto() MaxMsg = auto() + InterruptMsg = auto() MoveAxisMsg = auto() ImageArgMsg = auto() SetSeqMetaMsg = auto() @@ -155,6 +156,9 @@ class IntMsg(Msg): class KeyPressMsg(StrMsg): pass +@dataclasses.dataclass +class InterruptMsg(StrMsg): + pass @dataclasses.dataclass class BoolMsg(Msg): diff --git a/flandre/utils/RfFile.py b/flandre/utils/RfFile.py deleted file mode 100644 index 5201643..0000000 --- a/flandre/utils/RfFile.py +++ /dev/null @@ -1,118 +0,0 @@ -# import json -# import zipfile -# from pathlib import Path -# -# import cupy as cp -# import numpy as np -# -# from flandre.utils.RfMeta import RfFrameMeta, RfSequenceMeta -# -# -# class RfFrame: -# def __init__(self, data: bytes | Path, meta: RfFrameMeta, seq: 'RfSequence' = None): -# self.data = data -# self.meta = meta -# self._seq = seq -# self.is_zip = False -# if isinstance(self.data, Path): -# if self.data.parent.suffix == '.zip': -# self.is_zip = True -# -# def save(self, folder: Path): -# (folder / self.meta.name).write_bytes(self.bytes) -# -# @property -# def seq(self): -# if self._seq is None and isinstance(self.data, Path): -# return RfSequence.from_folder(self.data.parent) -# return self._seq -# -# @property -# def bytes(self): -# if self.is_zip: -# return self.data.__str__().encode() -# if isinstance(self.data, bytes): -# return self.data -# if isinstance(self.data, Path): -# return self.data.read_bytes() -# -# def mat(self, device='gpu'): -# from flandre.utils.RfMat import RfMat -# -# if device == 'gpu': -# arr = cp.frombuffer(self.bytes, dtype=cp.int16) -# else: -# arr = np.frombuffer(self.bytes, dtype=np.int16) -# return RfMat(arr.reshape(self.seq.meta.shape), frame_meta=self.meta, seq_meta=self.seq.meta) -# -# -# class RfSequence: -# def __init__(self, frames: list[RfFrame], meta: RfSequenceMeta): -# self.frames = frames -# self.meta = meta -# -# @classmethod -# def from_folder(cls, folder: Path | str) -> 'RfSequence': -# folder = Path(folder) -# if not folder.exists(): -# raise FileNotFoundError -# meta = RfSequenceMeta.from_path(folder) -# rs = RfSequence([], meta) -# for f in folder.glob('*.bin'): -# rs.frames.append(RfFrame(f, RfFrameMeta.from_path(f), seq=rs)) -# return rs -# -# @classmethod -# def from_zip(cls, z: Path | str) -> 'RfSequence': -# z = Path(z) -# if not z.exists(): -# raise FileNotFoundError -# meta = RfSequenceMeta.from_name(z.stem) -# rs = RfSequence([], meta) -# zz = zipfile.ZipFile(z) -# max_idx = max([int(Path(item.filename).stem) for item in zz.infolist()]) -# for i in range(max_idx + 1): -# j = json.loads(zz.open(f'{i}.json').read()) -# rs.frames.append(RfFrame(z / f'{i}.zst', RfFrameMeta(**j), seq=rs)) -# return rs -# -# @property -# def all(self): -# pass -# -# def query(self): -# pass -# -# @property -# def seq_id_minmax(self): -# mmin = 2 ** 32 -# mmax = 0 -# for f in self.frames: -# mmin = min(mmin, f.meta.sequence_id) -# mmax = max(mmax, f.meta.sequence_id) -# return mmin, mmax -# -# -# -# -# -# if __name__ == '__main__': -# f = RfSequence2('/mnt/16T/private_dataset/us/steel-top,U=30,M=PWI,S=(256 1502).zip').frames -# for f2 in f: -# print(f2.__bytes__().__len__()) -# # print(Path('xxx/asdasdasd_asd=a1,ds1.123').with_suffix('')) -# # print(Path('xxx/asdasdasd_asd=a1,ds1.123').stem) -# # t = (1, 2) -# # f = RfSequenceMeta.from_name('123123,U=321,S=(1 2 3),M=PWI') -# # print(f.commit) -# # print(f.name) -# # print(RfSequence.RfSequenceMeta.p2t) -# # f = RfFrame.RfFrameMeta(123, 345) -# # print(f.name) -# # rs = RfSequence([], RfSequenceMeta()) -# # rs = RfSequence.from_folder( -# # '/run/media/lambda/b86dccdc-f134-464b-a310-6575ee9ae85c/cap4/trim/R1.1,U=30,M=PWI,S=(256 1502)') -# # # print(rs.meta) -# # for frame in rs.frames: -# # if frame.mat().rotate90().show((1080, 1920)) == ord('q'): -# # break diff --git a/test/test_zmq_req_clientr.py b/test/test_zmq_req_clientr.py new file mode 100644 index 0000000..b4fcc8e --- /dev/null +++ b/test/test_zmq_req_clientr.py @@ -0,0 +1,34 @@ +import zmq +import time + + +class MyReqClient: + def __init__(self, s, context=None, timeout=4000): + if context is None: + context = zmq.Context() + self.context = context + self.timeout = timeout + self.socket = context.socket(zmq.REQ) + self.socket.connect(s) + self.s = s + + def recv(self): + r = self.socket.poll(self.timeout) + print(r) + if r == 1: + return self.socket.recv() + self.socket.close() + self.socket = self.context.socket(zmq.REQ) + self.socket.connect(self.s) + + def send(self, data): + self.socket.send(data) + + def request(self, data: bytes): + self.socket.send(data) + return self.recv() + + +c = MyReqClient(f"tcp://127.0.0.1:5555") +while True: + print(c.request(b'asd')) diff --git a/test/test_zmq_req_server.py b/test/test_zmq_req_server.py new file mode 100644 index 0000000..bcfa7ea --- /dev/null +++ b/test/test_zmq_req_server.py @@ -0,0 +1,17 @@ +import time + +import zmq + +c = zmq.Context() +s = c.socket(zmq.REP) +# s.setsockopt(zmq.HEARTBEAT_IVL, 1000) +# s.setsockopt(zmq.HEARTBEAT_TTL, 1000) +# s.setsockopt(zmq.HEARTBEAT_TIMEOUT, 1000) +s.bind('tcp://127.0.0.1:5555') +while True: + s.recv() + print('recv') + time.sleep(2) + print('send') + s.send(b'hello') + # break