refactor: change beamformer req/rep to sub and thread loop

This commit is contained in:
flandre 2025-06-12 14:05:19 +08:00
parent 2111cb3f52
commit 65c2254676
3 changed files with 137 additions and 112 deletions

View File

@ -1,4 +1,5 @@
import logging import logging
import threading
import time import time
import traceback import traceback
@ -14,6 +15,7 @@ from flandre.utils.Config import DeviceConfig
from flandre.utils.Msg import ( from flandre.utils.Msg import (
BeamformerMsg, BeamformerMsg,
ImageArgMsg, ImageArgMsg,
KillMsg,
MaxMsg, MaxMsg,
Msg, Msg,
RfFrameMsg, RfFrameMsg,
@ -31,13 +33,12 @@ class Beamformer(Node):
def __init__(self, level=logging.INFO): def __init__(self, level=logging.INFO):
super(Beamformer, self).__init__(level=level) super(Beamformer, self).__init__(level=level)
self.muxer_req_socket: zmq.Socket = None
self.tfm = None self.tfm = None
self.isalive = True
self.b_msg: BeamformerMsg | None = None
def custom_setup(self): def custom_setup(self):
self.muxer_req_socket: zmq.Socket = self.c.ctx.socket(zmq.REQ) pass
self.muxer_req_socket.connect(C.muxer_rep_socket)
self.c.poller.register(self.muxer_req_socket, zmq.POLLIN)
def process_pwi(self, data: RfMat, arg: ImageArgMsg, pwi): def process_pwi(self, data: RfMat, arg: ImageArgMsg, pwi):
if data is None: if data is None:
@ -90,26 +91,21 @@ class Beamformer(Node):
) )
self.send(RfMatMsg(d2)) self.send(RfMatMsg(d2))
def loop(self): def pt(self):
dc = DeviceConfig() dc = DeviceConfig()
pwi, _, mm = gen_pwi(direct_dist(dc), dc) pwi, _, mm = gen_pwi(direct_dist(dc), dc)
self.send(MaxMsg(mm.item())) self.send(MaxMsg(mm.item()))
last_v2 = 5900 last_v2 = 5900
last_f_rows = 0 last_f_rows = 0
last_blake2b = None last_blake2b = None
while True:
self.muxer_req_socket.send(b"") last_b_msg = None
r = dict(self.c.poller.poll()) while self.isalive:
if self.c.sub in r: if not self.b_msg:
msg = self.recv()
if self.muxer_req_socket in r:
msg: BeamformerMsg = Msg.decode_msg(self.muxer_req_socket.recv())
if msg.value == b"init":
time.sleep(1)
continue continue
if msg.value == b"nop": if last_b_msg == self.b_msg:
continue continue
r = msg.value r = self.b_msg.value
id2 = r.index(Msg.magic(), 1) id2 = r.index(Msg.magic(), 1)
arg_msg: ImageArgMsg = Msg.decode_msg(r[:id2]) arg_msg: ImageArgMsg = Msg.decode_msg(r[:id2])
rf_frame_msg: RfFrameMsg = Msg.decode_msg(r[id2:]) rf_frame_msg: RfFrameMsg = Msg.decode_msg(r[id2:])
@ -150,3 +146,20 @@ class Beamformer(Node):
except Exception as e: except Exception as e:
logger.warning(e) logger.warning(e)
traceback.print_exc() traceback.print_exc()
last_b_msg = self.b_msg
def loop(self):
t = threading.Thread(target=self.pt)
t.start()
while True:
r = dict(self.c.poller.poll())
if self.c.sub in r:
msg = self.recv()
match msg:
case KillMsg():
self.isalive = False
return
case BeamformerMsg():
self.b_msg = msg
case _:
pass

View File

@ -39,78 +39,54 @@ class Muxer(Node):
DeviceEnabledMsg, DeviceEnabledMsg,
] ]
def __init__(self, level=logging.INFO): def __init__(self, level: int = logging.INFO):
super(Muxer, self).__init__(level=level) super(Muxer, self).__init__(level=level)
self.seq_meta = None self.seq_meta = None
self.seq_meta_live: RfSequenceMeta = None self.seq_meta_live: RfSequenceMeta | None = None
self.seq_meta_playback = None self.seq_meta_playback = None
self.play_mode: str | None = None self.play_mode: str | None = None
self.rep_socket: zmq.Socket = None self.rep_socket: zmq.Socket[bytes] | None = None
self.req_driver_socket: zmq.Socket = None self.req_driver_socket: zmq.Socket[bytes] | None = None
self.driver_pull_socket: zmq.Socket = None self.driver_pull_socket: zmq.Socket[bytes] | None = None
self.playback_rf_msg: RfFrameMsg | None = None self.playback_rf_msg: RfFrameMsg | None = None
self.device_enabled = False self.device_enabled = False
self.driver_data_raw = b"" self.driver_data_raw = b""
self.run_p_thread = True self.islive = True
self.data_msg: RfFrameMsg | None = None
def custom_setup(self): def custom_setup(self):
self.rep_socket: zmq.Socket = self.c.ctx.socket(zmq.REP) self.rep_socket = self.c.ctx.socket(zmq.REP)
self.rep_socket.bind(f"tcp://localhost:{C.muxer_rep_port}") self.rep_socket.bind(f"tcp://localhost:{C.muxer_rep_port}")
self.req_driver_socket: zmq.Socket = self.c.ctx.socket(zmq.REQ) self.req_driver_socket = self.c.ctx.socket(zmq.REQ)
# self.driver_pull_socket = self.c.ctx.socket(zmq.PULL) # self.driver_pull_socket = self.c.ctx.socket(zmq.PULL)
# self.driver_pull_socket.connect(C.live_push_socket) # self.driver_pull_socket.connect(C.live_push_socket)
# self.req_driver_socket.connect(C.driver_rep_socket) # self.req_driver_socket.connect(C.driver_rep_socket)
self.req_driver_socket.connect(C.live_rep_socket) self.req_driver_socket.connect(C.live_rep_socket)
self.c.poller.register(self.rep_socket, zmq.POLLIN) self.c.poller.register(self.rep_socket, zmq.POLLIN)
def p_thread(self): def device_req_thread(self):
while self.run_p_thread: while self.islive:
if self.play_mode == "live": if (
# ii = self.driver_pull_socket.poll(timeout=1000) self.play_mode == "live"
# if ii > 0: and self.req_driver_socket
# self.driver_data_raw = self.driver_pull_socket.recv() and self.rep_socket
and self.seq_meta_live
):
self.req_driver_socket.send( self.req_driver_socket.send(
struct.pack("i", Device.magic) struct.pack("i", Device.magic)
+ struct.pack("i", DeviceCmd.GetData.value) + struct.pack("i", DeviceCmd.GetData.value)
) )
self.driver_data_raw = self.req_driver_socket.recv() driver_data_raw = self.req_driver_socket.recv()
else:
time.sleep(1)
def handle_rep_socket(self):
self.rep_socket.recv()
if self.play_mode is None:
self.rep_socket.send(BeamformerMsg(b"init").encode_msg())
return
match self.play_mode:
case "playback":
# logger.warning(f'test, {self.playback_rf_msg}')
if self.playback_rf_msg is None:
self.rep_socket.send(BeamformerMsg(b"nop").encode_msg())
return
data_msg = self.playback_rf_msg
case "live":
if not self.device_enabled:
self.rep_socket.send(BeamformerMsg(b"init").encode_msg())
logger.warning("Device not enabled")
return
# self.req_driver_socket.send(b'')
# self.driver_data_raw = self.req_driver_socket.recv()
if self.driver_data_raw == b"":
# todo fixit driver no empty
self.rep_socket.send(BeamformerMsg(b"nop").encode_msg())
return
# _, sequence_id, encoder = struct.unpack_from('=IQi', self.driver_data_raw)
# ts, sequence_id, encoder, driver_data_body = b2t(self.driver_data_raw)
( (
sequence_id, sequence_id,
encoder, encoder,
host_ts, _,
device_ts_low, _,
device_ts_high,
driver_data_body, driver_data_body,
) = b2t(self.driver_data_raw) ) = b2t(driver_data_raw)
data_msg = RfFrameMsg(
self.data_msg = RfFrameMsg(
0, 0,
RfFrameMemory( RfFrameMemory(
RfFrameMeta( RfFrameMeta(
@ -133,17 +109,49 @@ class Muxer(Node):
driver_data_body, driver_data_body,
), ),
) )
self.send_beamformer_msg()
else:
time.sleep(1)
def send_beamformer_msg(self):
if not self.data_msg:
return
self.send(BeamformerMsg(self.arg.encode_msg() + self.data_msg.encode_msg()))
def handle_rep_socket(self):
if not self.rep_socket:
return
self.rep_socket.recv()
if self.play_mode is None:
self.rep_socket.send(BeamformerMsg(b"init").encode_msg())
return
match self.play_mode:
case "playback":
# logger.warning(f'test, {self.playback_rf_msg}')
if self.playback_rf_msg is None:
self.rep_socket.send(BeamformerMsg(b"nop").encode_msg())
return
case "live":
if not self.device_enabled:
self.rep_socket.send(BeamformerMsg(b"init").encode_msg())
logger.warning("Device not enabled")
return
# self.req_driver_socket.send(b'')
# self.driver_data_raw = self.req_driver_socket.recv()
if self.driver_data_raw == b"":
# todo fixit driver no empty
self.rep_socket.send(BeamformerMsg(b"nop").encode_msg())
return
# _, sequence_id, encoder = struct.unpack_from('=IQi', self.driver_data_raw)
# ts, sequence_id, encoder, driver_data_body = b2t(self.driver_data_raw)
case _: case _:
raise NotImplementedError() raise NotImplementedError()
# if (data_msg.data.__len__() // 2) != data_msg.rf_frame.prod():
# self.rep_socket.send(BeamformerMsg(b'nop').encode_msg())
# return
self.rep_socket.send(
BeamformerMsg(self.arg.encode_msg() + data_msg.encode_msg()).encode_msg()
)
def loop(self): def loop(self):
t = Thread(target=self.p_thread).start() assert self.rep_socket
t = Thread(target=self.device_req_thread).start()
self.rtsi = RobotRtsiMsg( self.rtsi = RobotRtsiMsg(
pos=(0, 0, 0, 0, 0, 0), pos=(0, 0, 0, 0, 0, 0),
force=(0, 0, 0, 0, 0, 0), force=(0, 0, 0, 0, 0, 0),
@ -167,18 +175,22 @@ class Muxer(Node):
if k == self.c.sub: if k == self.c.sub:
msg = self.recv() msg = self.recv()
if isinstance(msg, KillMsg): if isinstance(msg, KillMsg):
self.run_p_thread = False self.islive = False
if msg.name == "": if msg.name == "":
return return
elif isinstance(msg, RfFrameMsg): elif isinstance(msg, RfFrameMsg):
if msg.sender == 1: if msg.sender == 1:
self.playback_rf_msg = msg self.data_msg = msg
self.send_beamformer_msg()
elif isinstance(msg, ImageArgMsg): elif isinstance(msg, ImageArgMsg):
self.arg = msg self.arg = msg
self.send_beamformer_msg()
elif isinstance(msg, SeqMetaMsg): elif isinstance(msg, SeqMetaMsg):
match msg.target: match msg.target:
case "live": case "live":
self.seq_meta_live = RfSequenceMeta.from_name(msg.name) self.seq_meta_live = RfSequenceMeta.from_name(msg.name)
case _:
pass
elif isinstance(msg, SetPlayMode): elif isinstance(msg, SetPlayMode):
logger.info(f"set playmode {msg}") logger.info(f"set playmode {msg}")
self.play_mode = msg.value self.play_mode = msg.value

View File

@ -47,14 +47,14 @@ class RfFrameMemory(RfFrame):
return self.data return self.data
def b2t(b: bytes): def b2t(b: bytes) -> tuple[int, int, int, int, bytes]:
# _, seq, encoder, host_ts, driver_ts = struct.unpack_from('<iQiQQ', b) # _, seq, encoder, host_ts, driver_ts = struct.unpack_from('<iQiQQ', b)
# _, seq, encoder, host_ts, driver_ts = struct.unpack_from('=QQiQi', b) # _, seq, encoder, host_ts, driver_ts = struct.unpack_from('=QQiQi', b)
# driver_ts, host_ts, encoder, seq = struct.unpack_from('QQiQ', b) # driver_ts, host_ts, encoder, seq = struct.unpack_from('QQiQ', b)
# buffer = b[4 + 8 + 8 + 8 + 4:] # buffer = b[4 + 8 + 8 + 8 + 4:]
# return seq, encoder, host_ts, driver_ts, buffer # return seq, encoder, host_ts, driver_ts, buffer
magic, seq, encoder, host_ts, device_ts = struct.unpack_from("iQiQQ", b) _, seq, encoder, host_ts, device_ts = struct.unpack_from("iQiQQ", b)
# bb = b[4 + 8 + 4 + 8:4 + 8 + 4 + 8 + 4] # bb = b[4 + 8 + 4 + 8:4 + 8 + 4 + 8 + 4]
# device_ts_low = struct.unpack('<I', bb) # device_ts_low = struct.unpack('<I', bb)
# device_ts_low = b[4 + 8 + 4 + 8:4 + 8 + 4 + 8 + 4] # device_ts_low = b[4 + 8 + 4 + 8:4 + 8 + 4 + 8 + 4]