merge mux req rep

This commit is contained in:
remilia 2025-03-24 11:50:13 +08:00
parent 7b82f31cf5
commit 00f9e24199
5 changed files with 166 additions and 112 deletions

View File

@ -3,10 +3,12 @@ import time
import cupy as cp import cupy as cp
import cv2 import cv2
import zmq
from config import VIDEO_WIDTH, VIDEO_HEIGHT from config import VIDEO_WIDTH, VIDEO_HEIGHT
from nodes.Node import Node from nodes.Node import Node
from utils.Msg import BMMsg, ImageArgMsg, SetSeqMetaMsg, Msg, RfFrameWithMetaMsg, BeamformerMsg, RequestRfFrameMsg from utils.Msg import BMMsg, ImageArgMsg, SetSeqMetaMsg, Msg, RfFrameWithMetaMsg, BeamformerMsg, RequestRfFrameMsg, \
SeqMetaMsg
from utils.RfFile import RfSequenceMeta from utils.RfFile import RfSequenceMeta
from utils.RfMat import RfMat from utils.RfMat import RfMat
@ -18,6 +20,12 @@ class Beamformer(Node):
def __init__(self, level=logging.INFO): def __init__(self, level=logging.INFO):
super(Beamformer, self).__init__(level=level) super(Beamformer, self).__init__(level=level)
self.req_muxer_socket: zmq.Socket = None
def custom_setup(self):
self.req_muxer_socket: zmq.Socket = self.c.ctx.socket(zmq.REQ)
self.req_muxer_socket.connect('tcp://localhost:5560')
self.c.poller.register(self.req_muxer_socket, zmq.POLLIN)
def process(self, data: RfMat, arg: ImageArgMsg): def process(self, data: RfMat, arg: ImageArgMsg):
if data is None: if data is None:
@ -36,26 +44,25 @@ class Beamformer(Node):
def loop(self): def loop(self):
time.sleep(1) time.sleep(1)
while True: while True:
self.send(RequestRfFrameMsg()) self.req_muxer_socket.send(b'')
r = dict(self.c.poller.poll(10)) r = dict(self.c.poller.poll())
if self.c.sub in r: if self.c.sub in r:
msg = self.recv() msg = self.recv()
if isinstance(msg, BeamformerMsg): if self.req_muxer_socket in r:
if msg.value == b'init': msg:BeamformerMsg = Msg.decode_msg(self.req_muxer_socket.recv())
time.sleep(1) if msg.value == b'init':
continue time.sleep(1)
if msg.value == b'nop': continue
continue if msg.value == b'nop':
r = msg.value continue
id2 = r.index(Msg.magic(), 1) r = msg.value
id3 = r.index(Msg.magic(), id2 + 1) id2 = r.index(Msg.magic(), 1)
seq_msg: SetSeqMetaMsg = Msg.decode_msg(r[:id2]) id3 = r.index(Msg.magic(), id2 + 1)
arg_msg: ImageArgMsg = Msg.decode_msg(r[id2:id3]) seq_msg: SeqMetaMsg = Msg.decode_msg(r[:id2])
b_msg: RfFrameWithMetaMsg = Msg.decode_msg(r[id3:]) arg_msg: ImageArgMsg = Msg.decode_msg(r[id2:id3])
s = b_msg.data b_msg: RfFrameWithMetaMsg = Msg.decode_msg(r[id3:])
fb1 = cp.frombuffer(s, dtype=cp.int16) s = b_msg.data
seq_meta = RfSequenceMeta.from_name(seq_msg.name) fb1 = cp.frombuffer(s, dtype=cp.int16)
mat = RfMat(fb1.reshape(seq_meta.shape), b_msg.meta, seq_meta) seq_meta = RfSequenceMeta.from_name(seq_msg.name)
self.process(mat, arg_msg) mat = RfMat(fb1.reshape(seq_meta.shape), b_msg.meta, seq_meta)
else: self.process(mat, arg_msg)
logger.debug('package lose')

View File

@ -33,6 +33,7 @@ class DeviceCmd(Enum):
SetEnableOff = auto() SetEnableOff = auto()
SetZero = auto() SetZero = auto()
class Device(Node): class Device(Node):
magic = 7355608 magic = 7355608
topics = [SetDeviceConnectedMsg, SetDeviceEnabledMsg, SetDeviceConfigMsg, RequestRfFrameMsg, DeviceZero, topics = [SetDeviceConnectedMsg, SetDeviceEnabledMsg, SetDeviceConfigMsg, RequestRfFrameMsg, DeviceZero,
@ -43,7 +44,8 @@ class Device(Node):
super(Device, self).__init__(level=level) super(Device, self).__init__(level=level)
self.arg = ImageArgMsg('', t_start=0, t_end=1499) self.arg = ImageArgMsg('', t_start=0, t_end=1499)
self.seq_meta = None self.seq_meta = None
self.device_rep_socket: zmq.Socket = None self.req_driver_socket: zmq.Socket = None
self.rep_socket: zmq.Socket = None
self.ok = b'ok\x00' self.ok = b'ok\x00'
self.loop2_t = None self.loop2_t = None
self.switch = False self.switch = False
@ -54,17 +56,17 @@ class Device(Node):
time.sleep(1) time.sleep(1)
def device_cmd(self, cmd: DeviceCmd, v: bytes = b''): def device_cmd(self, cmd: DeviceCmd, v: bytes = b''):
self.device_rep_socket.send(struct.pack('i', self.magic) + struct.pack('i', cmd.value) + v) self.req_driver_socket.send(struct.pack('i', self.magic) + struct.pack('i', cmd.value) + v)
def connect(self): def connect(self):
temp_client = BusClient(SetDeviceSwitchMsg,RefreshDeviceMsg, poller=True) temp_client = BusClient(SetDeviceSwitchMsg, RefreshDeviceMsg, poller=True)
temp_client.poller.register(self.device_rep_socket, zmq.POLLIN) temp_client.poller.register(self.req_driver_socket, zmq.POLLIN)
self.device_cmd(DeviceCmd.SetConnectionOn) self.device_cmd(DeviceCmd.SetConnectionOn)
logger.warning('onn') logger.warning('onn')
d = dict(temp_client.poller.poll()) d = dict(temp_client.poller.poll())
if self.device_rep_socket in d: if self.req_driver_socket in d:
rb = self.device_rep_socket.recv() rb = self.req_driver_socket.recv()
if rb == self.ok: if rb == self.ok:
self.send(DeviceConnectedMsg(True)) self.send(DeviceConnectedMsg(True))
else: else:
@ -75,23 +77,23 @@ class Device(Node):
if not msg.value: if not msg.value:
self.switch = False self.switch = False
logger.warning(f"interrupt connecting") logger.warning(f"interrupt connecting")
self.device_rep_socket.close() self.req_driver_socket.close()
self.context.term() self.context.term()
self.context = zmq.Context() self.context = zmq.Context()
self.device_rep_socket = self.context.socket(zmq.REQ) self.req_driver_socket = self.context.socket(zmq.REQ)
self.device_rep_socket.connect(f"tcp://{LIVE_REP_SOCKET}") self.req_driver_socket.connect(f"tcp://{LIVE_REP_SOCKET}")
if isinstance(msg,RefreshDeviceMsg): if isinstance(msg, RefreshDeviceMsg):
self.switch = False self.switch = False
logger.warning(f"interrupt connecting") logger.warning(f"interrupt connecting")
self.device_rep_socket.close() self.req_driver_socket.close()
self.context.term() self.context.term()
self.context = zmq.Context() self.context = zmq.Context()
self.device_rep_socket = self.context.socket(zmq.REQ) self.req_driver_socket = self.context.socket(zmq.REQ)
self.device_rep_socket.connect(f"tcp://{LIVE_REP_SOCKET}") self.req_driver_socket.connect(f"tcp://{LIVE_REP_SOCKET}")
def disconnect(self): def disconnect(self):
self.device_cmd(DeviceCmd.SetConnectionOff) self.device_cmd(DeviceCmd.SetConnectionOff)
rb = self.device_rep_socket.recv() rb = self.req_driver_socket.recv()
if rb == self.ok: if rb == self.ok:
self.send(DeviceConnectedMsg(False)) self.send(DeviceConnectedMsg(False))
else: else:
@ -99,7 +101,7 @@ class Device(Node):
def enable(self): def enable(self):
self.device_cmd(DeviceCmd.SetEnableOn) self.device_cmd(DeviceCmd.SetEnableOn)
rb = self.device_rep_socket.recv() rb = self.req_driver_socket.recv()
if rb == self.ok: if rb == self.ok:
self.send(DeviceEnabledMsg(True)) self.send(DeviceEnabledMsg(True))
return True return True
@ -109,7 +111,7 @@ class Device(Node):
def disable(self): def disable(self):
self.device_cmd(DeviceCmd.SetEnableOff) self.device_cmd(DeviceCmd.SetEnableOff)
rb = self.device_rep_socket.recv() rb = self.req_driver_socket.recv()
if rb == self.ok: if rb == self.ok:
self.send(DeviceEnabledMsg(False)) self.send(DeviceEnabledMsg(False))
return True return True
@ -131,7 +133,7 @@ class Device(Node):
def get_enable(self): def get_enable(self):
self.device_cmd(DeviceCmd.GetEnable) self.device_cmd(DeviceCmd.GetEnable)
rb = self.device_rep_socket.recv() rb = self.req_driver_socket.recv()
match rb: match rb:
case b'true': case b'true':
self.send(DeviceEnabledMsg(True)) self.send(DeviceEnabledMsg(True))
@ -144,13 +146,13 @@ class Device(Node):
def get_seq_meta_name(self): def get_seq_meta_name(self):
self.device_cmd(DeviceCmd.GetName) self.device_cmd(DeviceCmd.GetName)
rb = self.device_rep_socket.recv() rb = self.req_driver_socket.recv()
if rb != b'': if rb != b'':
self.send(SeqMetaMsg('live', rb.decode())) self.send(SeqMetaMsg('live', rb.decode()))
def get_connection(self): def get_connection(self):
self.device_cmd(DeviceCmd.GetConnection) self.device_cmd(DeviceCmd.GetConnection)
rb = self.device_rep_socket.recv() rb = self.req_driver_socket.recv()
match rb: match rb:
case b'true': case b'true':
self.send(DeviceConnectedMsg(True)) self.send(DeviceConnectedMsg(True))
@ -175,7 +177,7 @@ class Device(Node):
name_encoded = name.encode() name_encoded = name.encode()
self.device_cmd(DeviceCmd.SetNameAndFileOnly, self.device_cmd(DeviceCmd.SetNameAndFileOnly,
struct.pack('I', name_encoded.__len__()) + name.encode() + txt.encode()) struct.pack('I', name_encoded.__len__()) + name.encode() + txt.encode())
rb = self.device_rep_socket.recv() rb = self.req_driver_socket.recv()
if rb == self.ok: if rb == self.ok:
return True return True
else: else:
@ -184,15 +186,19 @@ class Device(Node):
def get_data(self): def get_data(self):
self.device_cmd(DeviceCmd.GetData) self.device_cmd(DeviceCmd.GetData)
return self.device_rep_socket.recv() return self.req_driver_socket.recv()
def set_zero(self): def set_zero(self):
self.device_cmd(DeviceCmd.SetZero) self.device_cmd(DeviceCmd.SetZero)
return self.device_rep_socket.recv() return self.req_driver_socket.recv()
def custom_setup(self): def custom_setup(self):
self.device_rep_socket = self.context.socket(zmq.REQ) self.rep_socket = self.context.socket(zmq.REP)
self.device_rep_socket.connect(f"tcp://{LIVE_REP_SOCKET}") self.rep_socket.bind(f"tcp://localhost:5561")
self.c.poller.register(self.rep_socket, zmq.POLLIN)
self.req_driver_socket = self.context.socket(zmq.REQ)
self.req_driver_socket.connect(f"tcp://{LIVE_REP_SOCKET}")
self.loop2_t = Thread(target=self.loop2) self.loop2_t = Thread(target=self.loop2)
self.loop2_t.start() self.loop2_t.start()
@ -210,45 +216,50 @@ class Device(Node):
# self.setfile(arr[0][1]) # self.setfile(arr[0][1])
logger.debug(f'device start loop') logger.debug(f'device start loop')
while True: while True:
msg = self.recv() d = dict(self.c.poller.poll())
if isinstance(msg, KillMsg): if self.c.sub in d:
if msg.name == '': msg = self.recv()
return if isinstance(msg, KillMsg):
elif isinstance(msg, SetDeviceEnabledMsg): if msg.name == '':
if msg.value: return
self.enable() elif isinstance(msg, SetDeviceEnabledMsg):
else: if msg.value:
self.disable() self.enable()
elif isinstance(msg, SetDeviceConnectedMsg): else:
logger.info('connecting to device') self.disable()
if msg.value: elif isinstance(msg, SetDeviceConnectedMsg):
self.connect() logger.info('connecting to device')
else: if msg.value:
self.disconnect() self.connect()
elif isinstance(msg, SetDeviceConfigMsg): else:
self.set_name_and_file(msg.name, msg.txt) self.disconnect()
self.send(SeqMetaMsg('live', msg.name)) elif isinstance(msg, SetDeviceConfigMsg):
elif isinstance(msg, RequestRfFrameMsg): self.set_name_and_file(msg.name, msg.txt)
if self.switch: self.send(SeqMetaMsg('live', msg.name))
braw = self.get_data() elif isinstance(msg, RequestRfFrameMsg):
if braw == b'': if self.switch:
logger.warning('empty msg!') braw = self.get_data()
continue if braw == b'':
_, sequence_id, encoder = struct.unpack_from('=iqi', braw) logger.warning('empty msg!')
buffer = braw[4 + 8 + 4:] continue
# logger.debug('send') _, sequence_id, encoder = struct.unpack_from('=iqi', braw)
self.send(RfFrameWithMetaMsg(0, RfFrameMeta( buffer = braw[4 + 8 + 4:]
encoder=encoder, sequence_id=sequence_id # logger.debug('send')
), buffer)) self.send(RfFrameWithMetaMsg(0, RfFrameMeta(
else: encoder=encoder, sequence_id=sequence_id
logger.warning('device not online') ), buffer))
elif isinstance(msg, DeviceZero): else:
self.set_zero() logger.warning('device not online')
elif isinstance(msg, DeviceSwitchMsg): elif isinstance(msg, DeviceZero):
self.switch = msg.value == 'GREEN' self.set_zero()
# elif isinstance(msg, SetDeviceSwitchMsg): elif isinstance(msg, DeviceSwitchMsg):
# logger.info(f'{msg.value}') self.switch = msg.value == 'GREEN'
# if msg.value: # elif isinstance(msg, SetDeviceSwitchMsg):
# self.device_rep_socket.connect(f"tcp://{LIVE_REP_SOCKET}") # logger.info(f'{msg.value}')
# else: # if msg.value:
# self.device_rep_socket.close() # self.device_rep_socket.connect(f"tcp://{LIVE_REP_SOCKET}")
# else:
# self.device_rep_socket.close()
if self.rep_socket in d:
self.rep_socket.recv()
self.rep_socket.send(self.get_data())

View File

@ -1,21 +1,21 @@
import logging import logging
import struct
import time import time
from pathlib import Path
import numpy as np
import zmq import zmq
from config import IMAGING_CONFIG from config import IMAGING_CONFIG
from nodes.Node import Node from nodes.Node import Node
from utils.Msg import ImageArgMsg, KillMsg, SetSeqMetaMsg, SetPlayMode, SetDeviceConfigMsg, \ from utils.Msg import ImageArgMsg, KillMsg, SetSeqMetaMsg, SetPlayMode, SetDeviceConfigMsg, \
ImagingConfigNameListMsg, RfFrameWithMetaMsg, BeamformerMsg, RequestRfFrameMsg, RobotRtsiMsg, SeqMetaMsg ImagingConfigNameListMsg, RfFrameWithMetaMsg, BeamformerMsg, RobotRtsiMsg, SeqMetaMsg
from utils.RfFile import RfSequenceMeta from utils.RfFile import RfSequenceMeta
from utils.RfMeta import RfFrameMeta
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class Muxer(Node): class Muxer(Node):
topics = [SetSeqMetaMsg, SetPlayMode, SetDeviceConfigMsg, RfFrameWithMetaMsg, RequestRfFrameMsg, topics = [SetSeqMetaMsg, SetPlayMode, SetDeviceConfigMsg, RfFrameWithMetaMsg,
ImageArgMsg, RobotRtsiMsg, SeqMetaMsg] ImageArgMsg, RobotRtsiMsg, SeqMetaMsg]
def __init__(self, level=logging.INFO): def __init__(self, level=logging.INFO):
@ -24,6 +24,16 @@ class Muxer(Node):
self.seq_meta_live = None self.seq_meta_live = None
self.seq_meta_playback = None self.seq_meta_playback = None
self.play_mode = None self.play_mode = None
self.rep_socket: zmq.Socket = None
self.req_driver_socket: zmq.Socket = None
self.record_rf_msg: RfFrameWithMetaMsg | None = None
def custom_setup(self):
self.rep_socket: zmq.Socket = self.c.ctx.socket(zmq.REP)
self.rep_socket.bind('tcp://localhost:5560')
self.req_driver_socket: zmq.Socket = self.c.ctx.socket(zmq.REQ)
self.req_driver_socket.connect('tcp://localhost:5561')
self.c.poller.register(self.rep_socket, zmq.POLLIN)
@property @property
def current_meta(self): def current_meta(self):
@ -33,13 +43,46 @@ class Muxer(Node):
}.get(self.play_mode, None) }.get(self.play_mode, None)
return seq_meta return seq_meta
def process_request(self):
self.rep_socket.recv()
if self.play_mode is None:
self.rep_socket.send(BeamformerMsg(b'init').encode_msg())
return
match self.play_mode:
case 'playback':
if self.record_rf_msg is None:
self.rep_socket.send(BeamformerMsg(b'nop').encode_msg())
return
data_msg = self.record_rf_msg
case 'live':
self.req_driver_socket.send(b'')
driver_data_raw = self.req_driver_socket.recv()
if driver_data_raw == b'':
# todo fixit driver no empty
self.rep_socket.send(BeamformerMsg(b'nop').encode_msg())
return
_, sequence_id, encoder = struct.unpack_from('=iqi', driver_data_raw)
driver_data_body = driver_data_raw[4 + 8 + 4:]
data_msg = RfFrameWithMetaMsg(0, RfFrameMeta(
encoder=encoder, sequence_id=sequence_id
), driver_data_body)
case _:
raise NotImplementedError()
if (data_msg.data.__len__() // 2) != self.current_meta.prod():
self.rep_socket.send(BeamformerMsg(b'nop').encode_msg())
self.rep_socket.send(BeamformerMsg(
SeqMetaMsg('any', self.current_meta.name).encode_msg() +
self.arg.encode_msg() +
data_msg.encode_msg()
).encode_msg())
def loop(self): def loop(self):
self.rtsi = RobotRtsiMsg( self.rtsi = RobotRtsiMsg(
pos=(0, 0, 0, 0, 0, 0), pos=(0, 0, 0, 0, 0, 0),
force=(0, 0, 0, 0, 0, 0), force=(0, 0, 0, 0, 0, 0),
) )
device_socket = self.context.socket(zmq.PULL) device_socket = self.context.socket(zmq.PULL)
self.sf = None
self.arg = ImageArgMsg('', t_start=0, t_end=1499) self.arg = ImageArgMsg('', t_start=0, t_end=1499)
self.c.poller.register(device_socket, zmq.POLLIN) self.c.poller.register(device_socket, zmq.POLLIN)
time.sleep(1) time.sleep(1)
@ -49,6 +92,8 @@ class Muxer(Node):
for k in socks: for k in socks:
if k == device_socket: if k == device_socket:
pass pass
if k == self.rep_socket:
self.process_request()
if k == self.c.sub: if k == self.c.sub:
msg = self.recv() msg = self.recv()
if isinstance(msg, RfFrameWithMetaMsg): if isinstance(msg, RfFrameWithMetaMsg):
@ -71,21 +116,8 @@ class Muxer(Node):
msg.encode_msg() msg.encode_msg()
)) ))
elif msg.sender == 1: elif msg.sender == 1:
self.sf = msg self.record_rf_msg = msg
elif isinstance(msg, RequestRfFrameMsg):
if self.play_mode is None:
self.send(BeamformerMsg(b'init'))
elif self.play_mode == 'playback':
if self.sf is None:
self.send(BeamformerMsg(b'nop'))
elif (self.sf.data.__len__() // 2) != np.prod(self.seq_meta_playback.shape):
self.send(BeamformerMsg(b'nop'))
else:
self.send(BeamformerMsg(
SetSeqMetaMsg('any', self.seq_meta_playback.name).encode_msg() +
self.arg.encode_msg() +
self.sf.encode_msg()
))
elif isinstance(msg, KillMsg): elif isinstance(msg, KillMsg):
if msg.name == '': if msg.name == '':
return return

View File

@ -114,6 +114,12 @@ class RfSequenceMeta(RfMeta):
mode: Annotated[RfSequenceMode, 'M'] = RfSequenceMode.PWI mode: Annotated[RfSequenceMode, 'M'] = RfSequenceMode.PWI
us: Annotated[int, 'U'] = None us: Annotated[int, 'U'] = None
def prod(self):
res = 1
for i in self.shape:
res *= int(i)
return res
if __name__ == '__main__': if __name__ == '__main__':
r = RfSequenceMeta.from_name('asdasd1,S=(64 1501),M=PWI,U=30') r = RfSequenceMeta.from_name('asdasd1,S=(64 1501),M=PWI,U=30')

View File

@ -22,10 +22,8 @@ def test2():
# d.enable() # d.enable()
# d.setfile(Path('/home/lambda/source/scarlet/flandre/config/device/AA256,U=30,M=PWI,S=(256 1502).txt').read_text()) # d.setfile(Path('/home/lambda/source/scarlet/flandre/config/device/AA256,U=30,M=PWI,S=(256 1502).txt').read_text())
# time.sleep(2) # time.sleep(2)
while True: r = d.get_data()
r = d.get_data() print(r.__len__())
print(r.__len__())
break
# d.disable() # d.disable()
# d.disconnect() # d.disconnect()