From 00f9e241990f8c61a410d4a4b931a5ce59954782 Mon Sep 17 00:00:00 2001 From: remilia Date: Mon, 24 Mar 2025 11:50:13 +0800 Subject: [PATCH] merge mux req rep --- src/nodes/Beamformer.py | 51 +++++++------- src/nodes/Device.py | 143 +++++++++++++++++++++------------------- src/nodes/Muxer.py | 72 ++++++++++++++------ src/utils/RfMeta.py | 6 ++ test/testDevice.py | 6 +- 5 files changed, 166 insertions(+), 112 deletions(-) diff --git a/src/nodes/Beamformer.py b/src/nodes/Beamformer.py index 08f29fd..5f51187 100644 --- a/src/nodes/Beamformer.py +++ b/src/nodes/Beamformer.py @@ -3,10 +3,12 @@ import time import cupy as cp import cv2 +import zmq from config import VIDEO_WIDTH, VIDEO_HEIGHT from nodes.Node import Node -from utils.Msg import BMMsg, ImageArgMsg, SetSeqMetaMsg, Msg, RfFrameWithMetaMsg, BeamformerMsg, RequestRfFrameMsg +from utils.Msg import BMMsg, ImageArgMsg, SetSeqMetaMsg, Msg, RfFrameWithMetaMsg, BeamformerMsg, RequestRfFrameMsg, \ + SeqMetaMsg from utils.RfFile import RfSequenceMeta from utils.RfMat import RfMat @@ -18,6 +20,12 @@ class Beamformer(Node): def __init__(self, level=logging.INFO): super(Beamformer, self).__init__(level=level) + self.req_muxer_socket: zmq.Socket = None + + def custom_setup(self): + self.req_muxer_socket: zmq.Socket = self.c.ctx.socket(zmq.REQ) + self.req_muxer_socket.connect('tcp://localhost:5560') + self.c.poller.register(self.req_muxer_socket, zmq.POLLIN) def process(self, data: RfMat, arg: ImageArgMsg): if data is None: @@ -36,26 +44,25 @@ class Beamformer(Node): def loop(self): time.sleep(1) while True: - self.send(RequestRfFrameMsg()) - r = dict(self.c.poller.poll(10)) + self.req_muxer_socket.send(b'') + r = dict(self.c.poller.poll()) if self.c.sub in r: msg = self.recv() - if isinstance(msg, BeamformerMsg): - if msg.value == b'init': - time.sleep(1) - continue - if msg.value == b'nop': - continue - r = msg.value - id2 = r.index(Msg.magic(), 1) - id3 = r.index(Msg.magic(), id2 + 1) - seq_msg: SetSeqMetaMsg = Msg.decode_msg(r[:id2]) - arg_msg: ImageArgMsg = Msg.decode_msg(r[id2:id3]) - b_msg: RfFrameWithMetaMsg = Msg.decode_msg(r[id3:]) - s = b_msg.data - fb1 = cp.frombuffer(s, dtype=cp.int16) - seq_meta = RfSequenceMeta.from_name(seq_msg.name) - mat = RfMat(fb1.reshape(seq_meta.shape), b_msg.meta, seq_meta) - self.process(mat, arg_msg) - else: - logger.debug('package lose') + if self.req_muxer_socket in r: + msg:BeamformerMsg = Msg.decode_msg(self.req_muxer_socket.recv()) + if msg.value == b'init': + time.sleep(1) + continue + if msg.value == b'nop': + continue + r = msg.value + id2 = r.index(Msg.magic(), 1) + id3 = r.index(Msg.magic(), id2 + 1) + seq_msg: SeqMetaMsg = Msg.decode_msg(r[:id2]) + arg_msg: ImageArgMsg = Msg.decode_msg(r[id2:id3]) + b_msg: RfFrameWithMetaMsg = Msg.decode_msg(r[id3:]) + s = b_msg.data + fb1 = cp.frombuffer(s, dtype=cp.int16) + seq_meta = RfSequenceMeta.from_name(seq_msg.name) + mat = RfMat(fb1.reshape(seq_meta.shape), b_msg.meta, seq_meta) + self.process(mat, arg_msg) diff --git a/src/nodes/Device.py b/src/nodes/Device.py index e0ee51b..f858020 100644 --- a/src/nodes/Device.py +++ b/src/nodes/Device.py @@ -33,6 +33,7 @@ class DeviceCmd(Enum): SetEnableOff = auto() SetZero = auto() + class Device(Node): magic = 7355608 topics = [SetDeviceConnectedMsg, SetDeviceEnabledMsg, SetDeviceConfigMsg, RequestRfFrameMsg, DeviceZero, @@ -43,7 +44,8 @@ class Device(Node): super(Device, self).__init__(level=level) self.arg = ImageArgMsg('', t_start=0, t_end=1499) self.seq_meta = None - self.device_rep_socket: zmq.Socket = None + self.req_driver_socket: zmq.Socket = None + self.rep_socket: zmq.Socket = None self.ok = b'ok\x00' self.loop2_t = None self.switch = False @@ -54,17 +56,17 @@ class Device(Node): time.sleep(1) def device_cmd(self, cmd: DeviceCmd, v: bytes = b''): - self.device_rep_socket.send(struct.pack('i', self.magic) + struct.pack('i', cmd.value) + v) + self.req_driver_socket.send(struct.pack('i', self.magic) + struct.pack('i', cmd.value) + v) def connect(self): - temp_client = BusClient(SetDeviceSwitchMsg,RefreshDeviceMsg, poller=True) - temp_client.poller.register(self.device_rep_socket, zmq.POLLIN) + temp_client = BusClient(SetDeviceSwitchMsg, RefreshDeviceMsg, poller=True) + temp_client.poller.register(self.req_driver_socket, zmq.POLLIN) self.device_cmd(DeviceCmd.SetConnectionOn) logger.warning('onn') d = dict(temp_client.poller.poll()) - if self.device_rep_socket in d: - rb = self.device_rep_socket.recv() + if self.req_driver_socket in d: + rb = self.req_driver_socket.recv() if rb == self.ok: self.send(DeviceConnectedMsg(True)) else: @@ -75,23 +77,23 @@ class Device(Node): if not msg.value: self.switch = False logger.warning(f"interrupt connecting") - self.device_rep_socket.close() + self.req_driver_socket.close() self.context.term() self.context = zmq.Context() - self.device_rep_socket = self.context.socket(zmq.REQ) - self.device_rep_socket.connect(f"tcp://{LIVE_REP_SOCKET}") - if isinstance(msg,RefreshDeviceMsg): + self.req_driver_socket = self.context.socket(zmq.REQ) + self.req_driver_socket.connect(f"tcp://{LIVE_REP_SOCKET}") + if isinstance(msg, RefreshDeviceMsg): self.switch = False logger.warning(f"interrupt connecting") - self.device_rep_socket.close() + self.req_driver_socket.close() self.context.term() self.context = zmq.Context() - self.device_rep_socket = self.context.socket(zmq.REQ) - self.device_rep_socket.connect(f"tcp://{LIVE_REP_SOCKET}") + self.req_driver_socket = self.context.socket(zmq.REQ) + self.req_driver_socket.connect(f"tcp://{LIVE_REP_SOCKET}") def disconnect(self): self.device_cmd(DeviceCmd.SetConnectionOff) - rb = self.device_rep_socket.recv() + rb = self.req_driver_socket.recv() if rb == self.ok: self.send(DeviceConnectedMsg(False)) else: @@ -99,7 +101,7 @@ class Device(Node): def enable(self): self.device_cmd(DeviceCmd.SetEnableOn) - rb = self.device_rep_socket.recv() + rb = self.req_driver_socket.recv() if rb == self.ok: self.send(DeviceEnabledMsg(True)) return True @@ -109,7 +111,7 @@ class Device(Node): def disable(self): self.device_cmd(DeviceCmd.SetEnableOff) - rb = self.device_rep_socket.recv() + rb = self.req_driver_socket.recv() if rb == self.ok: self.send(DeviceEnabledMsg(False)) return True @@ -131,7 +133,7 @@ class Device(Node): def get_enable(self): self.device_cmd(DeviceCmd.GetEnable) - rb = self.device_rep_socket.recv() + rb = self.req_driver_socket.recv() match rb: case b'true': self.send(DeviceEnabledMsg(True)) @@ -144,13 +146,13 @@ class Device(Node): def get_seq_meta_name(self): self.device_cmd(DeviceCmd.GetName) - rb = self.device_rep_socket.recv() + rb = self.req_driver_socket.recv() if rb != b'': self.send(SeqMetaMsg('live', rb.decode())) def get_connection(self): self.device_cmd(DeviceCmd.GetConnection) - rb = self.device_rep_socket.recv() + rb = self.req_driver_socket.recv() match rb: case b'true': self.send(DeviceConnectedMsg(True)) @@ -175,7 +177,7 @@ class Device(Node): name_encoded = name.encode() self.device_cmd(DeviceCmd.SetNameAndFileOnly, struct.pack('I', name_encoded.__len__()) + name.encode() + txt.encode()) - rb = self.device_rep_socket.recv() + rb = self.req_driver_socket.recv() if rb == self.ok: return True else: @@ -184,15 +186,19 @@ class Device(Node): def get_data(self): self.device_cmd(DeviceCmd.GetData) - return self.device_rep_socket.recv() + return self.req_driver_socket.recv() def set_zero(self): self.device_cmd(DeviceCmd.SetZero) - return self.device_rep_socket.recv() + return self.req_driver_socket.recv() def custom_setup(self): - self.device_rep_socket = self.context.socket(zmq.REQ) - self.device_rep_socket.connect(f"tcp://{LIVE_REP_SOCKET}") + self.rep_socket = self.context.socket(zmq.REP) + self.rep_socket.bind(f"tcp://localhost:5561") + self.c.poller.register(self.rep_socket, zmq.POLLIN) + + self.req_driver_socket = self.context.socket(zmq.REQ) + self.req_driver_socket.connect(f"tcp://{LIVE_REP_SOCKET}") self.loop2_t = Thread(target=self.loop2) self.loop2_t.start() @@ -210,45 +216,50 @@ class Device(Node): # self.setfile(arr[0][1]) logger.debug(f'device start loop') while True: - msg = self.recv() - if isinstance(msg, KillMsg): - if msg.name == '': - return - elif isinstance(msg, SetDeviceEnabledMsg): - if msg.value: - self.enable() - else: - self.disable() - elif isinstance(msg, SetDeviceConnectedMsg): - logger.info('connecting to device') - if msg.value: - self.connect() - else: - self.disconnect() - elif isinstance(msg, SetDeviceConfigMsg): - self.set_name_and_file(msg.name, msg.txt) - self.send(SeqMetaMsg('live', msg.name)) - elif isinstance(msg, RequestRfFrameMsg): - if self.switch: - braw = self.get_data() - if braw == b'': - logger.warning('empty msg!') - continue - _, sequence_id, encoder = struct.unpack_from('=iqi', braw) - buffer = braw[4 + 8 + 4:] - # logger.debug('send') - self.send(RfFrameWithMetaMsg(0, RfFrameMeta( - encoder=encoder, sequence_id=sequence_id - ), buffer)) - else: - logger.warning('device not online') - elif isinstance(msg, DeviceZero): - self.set_zero() - elif isinstance(msg, DeviceSwitchMsg): - self.switch = msg.value == 'GREEN' - # elif isinstance(msg, SetDeviceSwitchMsg): - # logger.info(f'{msg.value}') - # if msg.value: - # self.device_rep_socket.connect(f"tcp://{LIVE_REP_SOCKET}") - # else: - # self.device_rep_socket.close() + d = dict(self.c.poller.poll()) + if self.c.sub in d: + msg = self.recv() + if isinstance(msg, KillMsg): + if msg.name == '': + return + elif isinstance(msg, SetDeviceEnabledMsg): + if msg.value: + self.enable() + else: + self.disable() + elif isinstance(msg, SetDeviceConnectedMsg): + logger.info('connecting to device') + if msg.value: + self.connect() + else: + self.disconnect() + elif isinstance(msg, SetDeviceConfigMsg): + self.set_name_and_file(msg.name, msg.txt) + self.send(SeqMetaMsg('live', msg.name)) + elif isinstance(msg, RequestRfFrameMsg): + if self.switch: + braw = self.get_data() + if braw == b'': + logger.warning('empty msg!') + continue + _, sequence_id, encoder = struct.unpack_from('=iqi', braw) + buffer = braw[4 + 8 + 4:] + # logger.debug('send') + self.send(RfFrameWithMetaMsg(0, RfFrameMeta( + encoder=encoder, sequence_id=sequence_id + ), buffer)) + else: + logger.warning('device not online') + elif isinstance(msg, DeviceZero): + self.set_zero() + elif isinstance(msg, DeviceSwitchMsg): + self.switch = msg.value == 'GREEN' + # elif isinstance(msg, SetDeviceSwitchMsg): + # logger.info(f'{msg.value}') + # if msg.value: + # self.device_rep_socket.connect(f"tcp://{LIVE_REP_SOCKET}") + # else: + # self.device_rep_socket.close() + if self.rep_socket in d: + self.rep_socket.recv() + self.rep_socket.send(self.get_data()) diff --git a/src/nodes/Muxer.py b/src/nodes/Muxer.py index 2d087e6..5530c36 100644 --- a/src/nodes/Muxer.py +++ b/src/nodes/Muxer.py @@ -1,21 +1,21 @@ import logging +import struct import time -from pathlib import Path -import numpy as np import zmq from config import IMAGING_CONFIG from nodes.Node import Node from utils.Msg import ImageArgMsg, KillMsg, SetSeqMetaMsg, SetPlayMode, SetDeviceConfigMsg, \ - ImagingConfigNameListMsg, RfFrameWithMetaMsg, BeamformerMsg, RequestRfFrameMsg, RobotRtsiMsg, SeqMetaMsg + ImagingConfigNameListMsg, RfFrameWithMetaMsg, BeamformerMsg, RobotRtsiMsg, SeqMetaMsg from utils.RfFile import RfSequenceMeta +from utils.RfMeta import RfFrameMeta logger = logging.getLogger(__name__) class Muxer(Node): - topics = [SetSeqMetaMsg, SetPlayMode, SetDeviceConfigMsg, RfFrameWithMetaMsg, RequestRfFrameMsg, + topics = [SetSeqMetaMsg, SetPlayMode, SetDeviceConfigMsg, RfFrameWithMetaMsg, ImageArgMsg, RobotRtsiMsg, SeqMetaMsg] def __init__(self, level=logging.INFO): @@ -24,6 +24,16 @@ class Muxer(Node): self.seq_meta_live = None self.seq_meta_playback = None self.play_mode = None + self.rep_socket: zmq.Socket = None + self.req_driver_socket: zmq.Socket = None + self.record_rf_msg: RfFrameWithMetaMsg | None = None + + def custom_setup(self): + self.rep_socket: zmq.Socket = self.c.ctx.socket(zmq.REP) + self.rep_socket.bind('tcp://localhost:5560') + self.req_driver_socket: zmq.Socket = self.c.ctx.socket(zmq.REQ) + self.req_driver_socket.connect('tcp://localhost:5561') + self.c.poller.register(self.rep_socket, zmq.POLLIN) @property def current_meta(self): @@ -33,13 +43,46 @@ class Muxer(Node): }.get(self.play_mode, None) return seq_meta + def process_request(self): + self.rep_socket.recv() + if self.play_mode is None: + self.rep_socket.send(BeamformerMsg(b'init').encode_msg()) + return + match self.play_mode: + case 'playback': + if self.record_rf_msg is None: + self.rep_socket.send(BeamformerMsg(b'nop').encode_msg()) + return + data_msg = self.record_rf_msg + case 'live': + self.req_driver_socket.send(b'') + driver_data_raw = self.req_driver_socket.recv() + if driver_data_raw == b'': + # todo fixit driver no empty + self.rep_socket.send(BeamformerMsg(b'nop').encode_msg()) + return + _, sequence_id, encoder = struct.unpack_from('=iqi', driver_data_raw) + driver_data_body = driver_data_raw[4 + 8 + 4:] + data_msg = RfFrameWithMetaMsg(0, RfFrameMeta( + encoder=encoder, sequence_id=sequence_id + ), driver_data_body) + case _: + raise NotImplementedError() + if (data_msg.data.__len__() // 2) != self.current_meta.prod(): + self.rep_socket.send(BeamformerMsg(b'nop').encode_msg()) + self.rep_socket.send(BeamformerMsg( + SeqMetaMsg('any', self.current_meta.name).encode_msg() + + self.arg.encode_msg() + + data_msg.encode_msg() + ).encode_msg()) + def loop(self): self.rtsi = RobotRtsiMsg( pos=(0, 0, 0, 0, 0, 0), force=(0, 0, 0, 0, 0, 0), ) device_socket = self.context.socket(zmq.PULL) - self.sf = None + self.arg = ImageArgMsg('', t_start=0, t_end=1499) self.c.poller.register(device_socket, zmq.POLLIN) time.sleep(1) @@ -49,6 +92,8 @@ class Muxer(Node): for k in socks: if k == device_socket: pass + if k == self.rep_socket: + self.process_request() if k == self.c.sub: msg = self.recv() if isinstance(msg, RfFrameWithMetaMsg): @@ -71,21 +116,8 @@ class Muxer(Node): msg.encode_msg() )) elif msg.sender == 1: - self.sf = msg - elif isinstance(msg, RequestRfFrameMsg): - if self.play_mode is None: - self.send(BeamformerMsg(b'init')) - elif self.play_mode == 'playback': - if self.sf is None: - self.send(BeamformerMsg(b'nop')) - elif (self.sf.data.__len__() // 2) != np.prod(self.seq_meta_playback.shape): - self.send(BeamformerMsg(b'nop')) - else: - self.send(BeamformerMsg( - SetSeqMetaMsg('any', self.seq_meta_playback.name).encode_msg() + - self.arg.encode_msg() + - self.sf.encode_msg() - )) + self.record_rf_msg = msg + elif isinstance(msg, KillMsg): if msg.name == '': return diff --git a/src/utils/RfMeta.py b/src/utils/RfMeta.py index be6d9dc..5da82f8 100644 --- a/src/utils/RfMeta.py +++ b/src/utils/RfMeta.py @@ -114,6 +114,12 @@ class RfSequenceMeta(RfMeta): mode: Annotated[RfSequenceMode, 'M'] = RfSequenceMode.PWI us: Annotated[int, 'U'] = None + def prod(self): + res = 1 + for i in self.shape: + res *= int(i) + return res + if __name__ == '__main__': r = RfSequenceMeta.from_name('asdasd1,S=(64 1501),M=PWI,U=30') diff --git a/test/testDevice.py b/test/testDevice.py index 70ef186..b6cc20a 100644 --- a/test/testDevice.py +++ b/test/testDevice.py @@ -22,10 +22,8 @@ def test2(): # d.enable() # d.setfile(Path('/home/lambda/source/scarlet/flandre/config/device/AA256,U=30,M=PWI,S=(256 1502).txt').read_text()) # time.sleep(2) - while True: - r = d.get_data() - print(r.__len__()) - break + r = d.get_data() + print(r.__len__()) # d.disable() # d.disconnect()