From 65c2254676e6770dcc6125c29b68d741fedd14e1 Mon Sep 17 00:00:00 2001 From: flandre Date: Thu, 12 Jun 2025 14:05:19 +0800 Subject: [PATCH] refactor: change beamformer req/rep to sub and thread loop --- flandre/nodes/Beamformer.py | 121 +++++++++++++++++++---------------- flandre/nodes/Muxer.py | 124 ++++++++++++++++++++---------------- flandre/utils/RfFrame.py | 4 +- 3 files changed, 137 insertions(+), 112 deletions(-) diff --git a/flandre/nodes/Beamformer.py b/flandre/nodes/Beamformer.py index c3535e9..eaaa49f 100644 --- a/flandre/nodes/Beamformer.py +++ b/flandre/nodes/Beamformer.py @@ -1,4 +1,5 @@ import logging +import threading import time import traceback @@ -14,6 +15,7 @@ from flandre.utils.Config import DeviceConfig from flandre.utils.Msg import ( BeamformerMsg, ImageArgMsg, + KillMsg, MaxMsg, Msg, RfFrameMsg, @@ -31,13 +33,12 @@ class Beamformer(Node): def __init__(self, level=logging.INFO): super(Beamformer, self).__init__(level=level) - self.muxer_req_socket: zmq.Socket = None self.tfm = None + self.isalive = True + self.b_msg: BeamformerMsg | None = None def custom_setup(self): - self.muxer_req_socket: zmq.Socket = self.c.ctx.socket(zmq.REQ) - self.muxer_req_socket.connect(C.muxer_rep_socket) - self.c.poller.register(self.muxer_req_socket, zmq.POLLIN) + pass def process_pwi(self, data: RfMat, arg: ImageArgMsg, pwi): if data is None: @@ -90,63 +91,75 @@ class Beamformer(Node): ) self.send(RfMatMsg(d2)) - def loop(self): + def pt(self): dc = DeviceConfig() pwi, _, mm = gen_pwi(direct_dist(dc), dc) self.send(MaxMsg(mm.item())) last_v2 = 5900 last_f_rows = 0 last_blake2b = None + + last_b_msg = None + while self.isalive: + if not self.b_msg: + continue + if last_b_msg == self.b_msg: + continue + r = self.b_msg.value + id2 = r.index(Msg.magic(), 1) + arg_msg: ImageArgMsg = Msg.decode_msg(r[:id2]) + rf_frame_msg: RfFrameMsg = Msg.decode_msg(r[id2:]) + current_frame = rf_frame_msg.rf_frame + + if isinstance(current_frame, RfFrameFile): + if ( + current_frame.meta.blake2b is not None + and current_frame.meta.blake2b == last_blake2b + ): + continue + mat = RfMat.from_rf_frame(rf_frame_msg.rf_frame, "gpu") + # logger.info(mat.frame_meta.blake2b) + if mat is None: + logger.warning( + f"{rf_frame_msg.rf_frame.seq_meta.prod()} , {rf_frame_msg.rf_frame.__bytes__().__len__() // 2}" + ) + continue + + last_blake2b = mat.frame_meta.blake2b + if mat is None: + continue + if arg_msg.v2 != last_v2 or arg_msg.f_rows != last_f_rows: + last_v2 = arg_msg.v2 + last_f_rows = arg_msg.f_rows + dc = DeviceConfig(v2=arg_msg.v2, rows=arg_msg.f_rows) + pwi, _, m = gen_pwi(direct_dist(dc), dc) + self.send(MaxMsg(m.item())) + try: + if mat.seq_meta.mode == RfSequenceMeta.RfSequenceMode.PWI: + self.process_pwi(mat, arg_msg, pwi) + if mat.seq_meta.mode == RfSequenceMeta.RfSequenceMode.TFM: + if self.tfm is None: + dc = DeviceConfig(v2=arg_msg.v2, rows=arg_msg.f_rows) + self.tfm = TFM(dc) + self.tfm.load_yids(dist_mat_to_yids(direct_dist())) + self.process_tfm(mat, arg_msg, self.tfm) + except Exception as e: + logger.warning(e) + traceback.print_exc() + last_b_msg = self.b_msg + + def loop(self): + t = threading.Thread(target=self.pt) + t.start() while True: - self.muxer_req_socket.send(b"") r = dict(self.c.poller.poll()) if self.c.sub in r: msg = self.recv() - if self.muxer_req_socket in r: - msg: BeamformerMsg = Msg.decode_msg(self.muxer_req_socket.recv()) - if msg.value == b"init": - time.sleep(1) - continue - if msg.value == b"nop": - continue - r = msg.value - id2 = r.index(Msg.magic(), 1) - arg_msg: ImageArgMsg = Msg.decode_msg(r[:id2]) - rf_frame_msg: RfFrameMsg = Msg.decode_msg(r[id2:]) - current_frame = rf_frame_msg.rf_frame - - if isinstance(current_frame, RfFrameFile): - if ( - current_frame.meta.blake2b is not None - and current_frame.meta.blake2b == last_blake2b - ): - continue - mat = RfMat.from_rf_frame(rf_frame_msg.rf_frame, "gpu") - # logger.info(mat.frame_meta.blake2b) - if mat is None: - logger.warning( - f"{rf_frame_msg.rf_frame.seq_meta.prod()} , {rf_frame_msg.rf_frame.__bytes__().__len__() // 2}" - ) - continue - - last_blake2b = mat.frame_meta.blake2b - if mat is None: - continue - if arg_msg.v2 != last_v2 or arg_msg.f_rows != last_f_rows: - last_v2 = arg_msg.v2 - last_f_rows = arg_msg.f_rows - dc = DeviceConfig(v2=arg_msg.v2, rows=arg_msg.f_rows) - pwi, _, m = gen_pwi(direct_dist(dc), dc) - self.send(MaxMsg(m.item())) - try: - if mat.seq_meta.mode == RfSequenceMeta.RfSequenceMode.PWI: - self.process_pwi(mat, arg_msg, pwi) - if mat.seq_meta.mode == RfSequenceMeta.RfSequenceMode.TFM: - if self.tfm is None: - dc = DeviceConfig(v2=arg_msg.v2, rows=arg_msg.f_rows) - self.tfm = TFM(dc) - self.tfm.load_yids(dist_mat_to_yids(direct_dist())) - self.process_tfm(mat, arg_msg, self.tfm) - except Exception as e: - logger.warning(e) - traceback.print_exc() + match msg: + case KillMsg(): + self.isalive = False + return + case BeamformerMsg(): + self.b_msg = msg + case _: + pass diff --git a/flandre/nodes/Muxer.py b/flandre/nodes/Muxer.py index d3891d7..baea75a 100644 --- a/flandre/nodes/Muxer.py +++ b/flandre/nodes/Muxer.py @@ -39,78 +39,54 @@ class Muxer(Node): DeviceEnabledMsg, ] - def __init__(self, level=logging.INFO): + def __init__(self, level: int = logging.INFO): super(Muxer, self).__init__(level=level) self.seq_meta = None - self.seq_meta_live: RfSequenceMeta = None + self.seq_meta_live: RfSequenceMeta | None = None self.seq_meta_playback = None self.play_mode: str | None = None - self.rep_socket: zmq.Socket = None - self.req_driver_socket: zmq.Socket = None - self.driver_pull_socket: zmq.Socket = None + self.rep_socket: zmq.Socket[bytes] | None = None + self.req_driver_socket: zmq.Socket[bytes] | None = None + self.driver_pull_socket: zmq.Socket[bytes] | None = None self.playback_rf_msg: RfFrameMsg | None = None self.device_enabled = False self.driver_data_raw = b"" - self.run_p_thread = True + self.islive = True + self.data_msg: RfFrameMsg | None = None def custom_setup(self): - self.rep_socket: zmq.Socket = self.c.ctx.socket(zmq.REP) + self.rep_socket = self.c.ctx.socket(zmq.REP) self.rep_socket.bind(f"tcp://localhost:{C.muxer_rep_port}") - self.req_driver_socket: zmq.Socket = self.c.ctx.socket(zmq.REQ) + self.req_driver_socket = self.c.ctx.socket(zmq.REQ) # self.driver_pull_socket = self.c.ctx.socket(zmq.PULL) # self.driver_pull_socket.connect(C.live_push_socket) # self.req_driver_socket.connect(C.driver_rep_socket) self.req_driver_socket.connect(C.live_rep_socket) self.c.poller.register(self.rep_socket, zmq.POLLIN) - def p_thread(self): - while self.run_p_thread: - if self.play_mode == "live": - # ii = self.driver_pull_socket.poll(timeout=1000) - # if ii > 0: - # self.driver_data_raw = self.driver_pull_socket.recv() + def device_req_thread(self): + while self.islive: + if ( + self.play_mode == "live" + and self.req_driver_socket + and self.rep_socket + and self.seq_meta_live + ): self.req_driver_socket.send( struct.pack("i", Device.magic) + struct.pack("i", DeviceCmd.GetData.value) ) - self.driver_data_raw = self.req_driver_socket.recv() - else: - time.sleep(1) + driver_data_raw = self.req_driver_socket.recv() - def handle_rep_socket(self): - self.rep_socket.recv() - if self.play_mode is None: - self.rep_socket.send(BeamformerMsg(b"init").encode_msg()) - return - match self.play_mode: - case "playback": - # logger.warning(f'test, {self.playback_rf_msg}') - if self.playback_rf_msg is None: - self.rep_socket.send(BeamformerMsg(b"nop").encode_msg()) - return - data_msg = self.playback_rf_msg - case "live": - if not self.device_enabled: - self.rep_socket.send(BeamformerMsg(b"init").encode_msg()) - logger.warning("Device not enabled") - return - # self.req_driver_socket.send(b'') - # self.driver_data_raw = self.req_driver_socket.recv() - if self.driver_data_raw == b"": - # todo fixit driver no empty - self.rep_socket.send(BeamformerMsg(b"nop").encode_msg()) - return - # _, sequence_id, encoder = struct.unpack_from('=IQi', self.driver_data_raw) - # ts, sequence_id, encoder, driver_data_body = b2t(self.driver_data_raw) ( sequence_id, encoder, - host_ts, - device_ts_low, - device_ts_high, + _, + _, driver_data_body, - ) = b2t(self.driver_data_raw) - data_msg = RfFrameMsg( + ) = b2t(driver_data_raw) + + self.data_msg = RfFrameMsg( 0, RfFrameMemory( RfFrameMeta( @@ -133,17 +109,49 @@ class Muxer(Node): driver_data_body, ), ) + self.send_beamformer_msg() + else: + time.sleep(1) + + def send_beamformer_msg(self): + if not self.data_msg: + return + self.send(BeamformerMsg(self.arg.encode_msg() + self.data_msg.encode_msg())) + + def handle_rep_socket(self): + if not self.rep_socket: + return + self.rep_socket.recv() + if self.play_mode is None: + self.rep_socket.send(BeamformerMsg(b"init").encode_msg()) + return + match self.play_mode: + case "playback": + # logger.warning(f'test, {self.playback_rf_msg}') + if self.playback_rf_msg is None: + self.rep_socket.send(BeamformerMsg(b"nop").encode_msg()) + return + + case "live": + if not self.device_enabled: + self.rep_socket.send(BeamformerMsg(b"init").encode_msg()) + logger.warning("Device not enabled") + return + # self.req_driver_socket.send(b'') + # self.driver_data_raw = self.req_driver_socket.recv() + if self.driver_data_raw == b"": + # todo fixit driver no empty + self.rep_socket.send(BeamformerMsg(b"nop").encode_msg()) + return + # _, sequence_id, encoder = struct.unpack_from('=IQi', self.driver_data_raw) + # ts, sequence_id, encoder, driver_data_body = b2t(self.driver_data_raw) + case _: raise NotImplementedError() - # if (data_msg.data.__len__() // 2) != data_msg.rf_frame.prod(): - # self.rep_socket.send(BeamformerMsg(b'nop').encode_msg()) - # return - self.rep_socket.send( - BeamformerMsg(self.arg.encode_msg() + data_msg.encode_msg()).encode_msg() - ) def loop(self): - t = Thread(target=self.p_thread).start() + assert self.rep_socket + t = Thread(target=self.device_req_thread).start() self.rtsi = RobotRtsiMsg( pos=(0, 0, 0, 0, 0, 0), force=(0, 0, 0, 0, 0, 0), @@ -167,18 +175,22 @@ class Muxer(Node): if k == self.c.sub: msg = self.recv() if isinstance(msg, KillMsg): - self.run_p_thread = False + self.islive = False if msg.name == "": return elif isinstance(msg, RfFrameMsg): if msg.sender == 1: - self.playback_rf_msg = msg + self.data_msg = msg + self.send_beamformer_msg() elif isinstance(msg, ImageArgMsg): self.arg = msg + self.send_beamformer_msg() elif isinstance(msg, SeqMetaMsg): match msg.target: case "live": self.seq_meta_live = RfSequenceMeta.from_name(msg.name) + case _: + pass elif isinstance(msg, SetPlayMode): logger.info(f"set playmode {msg}") self.play_mode = msg.value diff --git a/flandre/utils/RfFrame.py b/flandre/utils/RfFrame.py index 59edec2..971580d 100644 --- a/flandre/utils/RfFrame.py +++ b/flandre/utils/RfFrame.py @@ -47,14 +47,14 @@ class RfFrameMemory(RfFrame): return self.data -def b2t(b: bytes): +def b2t(b: bytes) -> tuple[int, int, int, int, bytes]: # _, seq, encoder, host_ts, driver_ts = struct.unpack_from('