From f51d244466975bc9aeca25e8f5656ec2d5fc372d Mon Sep 17 00:00:00 2001 From: remilia Date: Wed, 26 Feb 2025 21:41:15 +0800 Subject: [PATCH] add request --- src/nodes/Beamformer.py | 56 +++++++-------- src/nodes/Device.py | 17 ++++- src/nodes/ImageFFMPEG.py | 23 ++++-- src/nodes/Loader.py | 11 +-- src/nodes/Muxer.py | 152 ++++++++++++++++----------------------- src/utils/Msg.py | 58 ++++++++++++++- test/kdemain.py | 10 +-- test/test.py | 2 + test/testmiio.py | 13 ++-- 9 files changed, 199 insertions(+), 143 deletions(-) diff --git a/src/nodes/Beamformer.py b/src/nodes/Beamformer.py index 028cf30..8607ab8 100644 --- a/src/nodes/Beamformer.py +++ b/src/nodes/Beamformer.py @@ -1,26 +1,20 @@ import logging -import struct import time -from pathlib import Path import cupy as cp import cv2 -import numpy as np -import zmq -from config import PLAYBACK_SOCKET, VIDEO_WIDTH, VIDEO_HEIGHT, LIVE_SOCKET, IMAGING_CONFIG +from config import VIDEO_WIDTH, VIDEO_HEIGHT from nodes.Node import Node -from utils.Msg import BMMsg, ImageArgMsg, KillMsg, SetSeqMetaMsg, SetPlayMode, SetDeviceConfigMsg, SetRecordMsg, \ - RecordFrameMsg, ImagingConfigNameListMsg, Msg, RfFrameMsg -from utils.RfFile import RfFrame, RfSequenceMeta +from utils.Msg import BMMsg, ImageArgMsg, SetSeqMetaMsg, Msg, RfFrameWithMetaMsg, BeamformerMsg, RequestRfFrameMsg +from utils.RfFile import RfSequenceMeta from utils.RfMat import RfMat -from utils.RfMeta import RfFrameMeta logger = logging.getLogger(__name__) class Beamformer(Node): - topics = [RfFrameMsg] + topics = [BeamformerMsg] def __init__(self, level=logging.INFO): super(Beamformer, self).__init__(level=level) @@ -40,24 +34,28 @@ class Beamformer(Node): self.send(BMMsg(0, d2.__bytes__())) def loop(self): - req_socket = self.context.socket(zmq.REQ) - req_socket.connect('tcp://127.0.0.1:5559') time.sleep(1) while True: - req_socket.send(b'') - r = req_socket.recv() - if r.__len__() > 0: - id2 = r.index(Msg.magic(), 1) - id3 = r.index(Msg.magic(), id2 + 1) - print(id2, id3) - seq_msg: SetSeqMetaMsg = Msg.decode_msg(r[:id2]) - arg_msg: ImageArgMsg = Msg.decode_msg(r[id2:id3]) - b_msg: RfFrameMsg = Msg.decode_msg(r[id3:]) - s = b_msg.data - fb1 = cp.frombuffer(s, dtype=cp.int16) - seq_meta = RfSequenceMeta.from_name(seq_msg.name) - mat = RfMat(fb1.reshape(seq_meta.shape), RfFrameMeta( - encoder=b_msg.encoder, - sequence_id=b_msg.sequence_id, - ), seq_meta) - self.process(mat, arg_msg) + self.send(RequestRfFrameMsg()) + r = dict(self.c.poller.poll(1000)) + if self.c.sub in r: + msg = self.recv() + if isinstance(msg, BeamformerMsg): + if msg.value == b'init': + time.sleep(1) + continue + if msg.value == b'nop': + continue + r = msg.value + id2 = r.index(Msg.magic(), 1) + id3 = r.index(Msg.magic(), id2 + 1) + seq_msg: SetSeqMetaMsg = Msg.decode_msg(r[:id2]) + arg_msg: ImageArgMsg = Msg.decode_msg(r[id2:id3]) + b_msg: RfFrameWithMetaMsg = Msg.decode_msg(r[id3:]) + s = b_msg.data + fb1 = cp.frombuffer(s, dtype=cp.int16) + seq_meta = RfSequenceMeta.from_name(seq_msg.name) + mat = RfMat(fb1.reshape(seq_meta.shape), b_msg.meta, seq_meta) + self.process(mat, arg_msg) + else: + logger.debug('package lose') diff --git a/src/nodes/Device.py b/src/nodes/Device.py index dd1f8ba..746ea21 100644 --- a/src/nodes/Device.py +++ b/src/nodes/Device.py @@ -1,4 +1,5 @@ import logging +import struct import subprocess import time @@ -7,13 +8,15 @@ import zmq from config import LIVE_REP_SOCKET, CONFIG, DEVICE_CONFIG from nodes.Node import Node from utils.Msg import ImageArgMsg, KillMsg, SetDeviceConnectedMsg, SetDeviceEnabledMsg, DeviceEnabledMsg, \ - DeviceConnectedMsg, SetDeviceConfigMsg, DeviceOnlineMsg, DeviceConfigListMsg + DeviceConnectedMsg, SetDeviceConfigMsg, DeviceOnlineMsg, DeviceConfigListMsg, RequestRfFrameMsg, RfFrameMsg, \ + RfFrameWithMetaMsg +from utils.RfMeta import RfFrameMeta logger = logging.getLogger(__name__) class Device(Node): - topics = [SetDeviceConnectedMsg, SetDeviceEnabledMsg, SetDeviceConfigMsg] + topics = [SetDeviceConnectedMsg, SetDeviceEnabledMsg, SetDeviceConfigMsg, RequestRfFrameMsg] def __init__(self, level=logging.INFO): super(Device, self).__init__(level=level) @@ -137,7 +140,6 @@ class Device(Node): logger.debug(f'device start loop') while True: msg = self.recv() - logger.debug(f'{msg}') if isinstance(msg, KillMsg): if msg.name == '': return @@ -153,3 +155,12 @@ class Device(Node): self.disconnect() elif isinstance(msg, SetDeviceConfigMsg): self.setfile(msg.value) + elif isinstance(msg, RequestRfFrameMsg): + braw = self.data() + if braw == b'': + continue + _, sequence_id, encoder = struct.unpack_from('=iqi', braw) + buffer = braw[4 + 8 + 4:] + self.send(RfFrameWithMetaMsg(0, RfFrameMeta( + encoder=encoder, sequence_id=sequence_id + ), buffer)) diff --git a/src/nodes/ImageFFMPEG.py b/src/nodes/ImageFFMPEG.py index 3a79c21..3be7012 100644 --- a/src/nodes/ImageFFMPEG.py +++ b/src/nodes/ImageFFMPEG.py @@ -27,31 +27,42 @@ class ImageFFMPEG(Node): '-f', 'rawvideo', '-pixel_format', 'rgb24', '-video_size', '1080x1920', - '-framerate', '60', + '-framerate', '24', '-hwaccel', 'nvdec', '-i', '-', '-vcodec', 'h264_nvenc', - '-preset', 'faster', + '-preset', 'fast', + '-gpu', '1', + + # '-profile:v', 'high', + '-zerolatency', '1', + # '-tune', 'ull', + # '-level', '42', + '-pix_fmt', 'yuv420p', # '-vcodec', 'libx264', - '-b:v', '40M', + # '-b:v', '40M', '-f', 'flv', - 'rtmp://localhost/live/livestream' + 'rtmp://q1hyb.as/live/bscan' # '-f', 'mpegts', # 'srt://localhost:10080?streamid=#!::r=live/livestream,m=publish' ], stdin=subprocess.PIPE, ) + lasttime = time.time() while True: # socks = dict(self.c.poller.poll(1 / 30)) - events = self.c.poller.poll(1) + events = self.c.poller.poll(1000 / 24) if events: msg: BMMsg = Msg.decode_msg(events[0][0].recv()) b = np.frombuffer(msg.data, dtype=np.uint8) b = np.reshape(b, (VIDEO_HEIGHT, VIDEO_WIDTH, 4))[:, :, :3] self.buffer = b.tobytes() p.stdin.write(self.buffer) - time.sleep(1 / 60) + # time.sleep(1 / 60) + currenttime = time.time() + logger.debug(f'{currenttime - lasttime}') + lasttime = currenttime if __name__ == '__main__': diff --git a/src/nodes/Loader.py b/src/nodes/Loader.py index 76ce17f..62a9281 100644 --- a/src/nodes/Loader.py +++ b/src/nodes/Loader.py @@ -8,7 +8,7 @@ import zmq from config import PLAYBACK_SOCKET_PORT, SOFTWARE_CONFIG from nodes.Node import Node from utils.Msg import MoveAxisMsg, KillMsg, SetSeqMetaMsg, SeqIdMinMax, SetBaseMsg, SeqListMsg, SeqIdList, \ - SetSidMsg + SetSidMsg, RfFrameWithMetaMsg from utils.RfFile import RfSequence logger = logging.getLogger(__name__) @@ -33,10 +33,11 @@ class Loader(Node): pass elif isinstance(msg, SetSidMsg): frame = rff.frames[msg.value] - buffer = io.BytesIO() - buffer.write(struct.pack('=iqi', 114514, frame.meta.sequence_id, frame.meta.encoder)) - buffer.write(frame.bytes) - playback_socket.send(buffer.getvalue()) + # buffer = io.BytesIO() + # buffer.write(struct.pack('=iqi', 114514, frame.meta.sequence_id, frame.meta.encoder)) + # buffer.write(frame.bytes) + # playback_socket.send(buffer.getvalue()) + self.send(RfFrameWithMetaMsg(1, frame.meta, frame.bytes)) elif isinstance(msg, SetSeqMetaMsg): if base is None: continue diff --git a/src/nodes/Muxer.py b/src/nodes/Muxer.py index a51b3c9..2d88bc4 100644 --- a/src/nodes/Muxer.py +++ b/src/nodes/Muxer.py @@ -1,26 +1,22 @@ import logging -import struct import time from pathlib import Path -import cupy as cp -import cv2 import numpy as np import zmq -from config import PLAYBACK_SOCKET, VIDEO_WIDTH, VIDEO_HEIGHT, LIVE_SOCKET, IMAGING_CONFIG +from config import IMAGING_CONFIG from nodes.Node import Node -from utils.Msg import BMMsg, ImageArgMsg, KillMsg, SetSeqMetaMsg, SetPlayMode, SetDeviceConfigMsg, SetRecordMsg, \ - RecordFrameMsg, ImagingConfigNameListMsg, RfFrameMsg -from utils.RfFile import RfFrame, RfSequenceMeta -from utils.RfMat import RfMat -from utils.RfMeta import RfFrameMeta +from utils.Msg import ImageArgMsg, KillMsg, SetSeqMetaMsg, SetPlayMode, SetDeviceConfigMsg, SetRecordMsg, \ + ImagingConfigNameListMsg, RfFrameWithMetaMsg, BeamformerMsg, RequestRfFrameMsg +from utils.RfFile import RfSequenceMeta logger = logging.getLogger(__name__) class Muxer(Node): - topics = [SetSeqMetaMsg, SetPlayMode, SetDeviceConfigMsg, SetRecordMsg] + topics = [SetSeqMetaMsg, SetPlayMode, SetDeviceConfigMsg, SetRecordMsg, RfFrameWithMetaMsg, RequestRfFrameMsg, + ImageArgMsg] def __init__(self, level=logging.INFO): super(Muxer, self).__init__(level=level) @@ -33,94 +29,70 @@ class Muxer(Node): self.play_mode = None @property - def current_seq_meta(self): - seq_meta = None - match self.play_mode: - case 'live': - seq_meta = self.seq_meta_live - case 'playback': - seq_meta = self.seq_meta_playback + def current_meta(self): + seq_meta: RfSequenceMeta | None = { + "live": self.seq_meta_live, + "playback": self.seq_meta_playback, + }.get(self.play_mode, None) return seq_meta def loop(self): device_socket = self.context.socket(zmq.PULL) - device_socket.setsockopt(zmq.CONFLATE, 1) - rep_socket = self.context.socket(zmq.REP) - rep_socket.bind('tcp://*:5559') + self.sf = None self.arg = ImageArgMsg('', t_start=0, t_end=1499) self.c.poller.register(device_socket, zmq.POLLIN) - self.c.poller.register(rep_socket, zmq.POLLIN) - rfframemsg: RfFrameMsg | None = None time.sleep(1) self.send(ImagingConfigNameListMsg([path.stem for path in IMAGING_CONFIG.glob('*.json')])) while True: socks = dict(self.c.poller.poll()) - if device_socket in socks and socks[device_socket] == zmq.POLLIN: - buffer = device_socket.recv() - device_socket.disconnect(f"tcp://{LIVE_SOCKET}") - device_socket.connect(f"tcp://{LIVE_SOCKET}") - logger.debug(f'device receive {buffer.__len__()}') - _, sequence_id, encoder = struct.unpack_from('=iqi', buffer) - buf = buffer[4 + 8 + 4:] - logger.debug(f'live meta {self.seq_meta_live}, playback meta {self.seq_meta_playback}') - seq_meta: RfSequenceMeta | None = { - "live": self.seq_meta_live, - "playback": self.seq_meta_playback, - }.get(self.play_mode, None) - if seq_meta is not None: - if (buf.__len__() // 2) == np.prod(seq_meta.shape): - self.seq_meta = seq_meta - rfframemsg = RfFrameMsg(sequence_id, encoder, buf) - fm = RfFrameMeta( - encoder=encoder, - sequence_id=sequence_id, - ) - if self.record_enable: - (self.record_path / fm.filename).write_bytes(buf) - self.send(RecordFrameMsg(buf.__len__(), sequence_id)) - if rep_socket in socks and socks[rep_socket] == zmq.POLLIN: - rep_socket.recv() - if rfframemsg is None or self.seq_meta is None: - rep_socket.send(b'') - else: - rep_socket.send(SetSeqMetaMsg('any', self.seq_meta.name).encode_msg() + + for k in socks: + if k == device_socket: + pass + if k == self.c.sub: + msg = self.recv() + if isinstance(msg, RfFrameWithMetaMsg): + if msg.sender == 0 and self.play_mode == 'live': + self.send(BeamformerMsg( + SetSeqMetaMsg('any', self.seq_meta_live.name).encode_msg() + + self.arg.encode_msg() + + msg.encode_msg() + )) + elif msg.sender == 1: + self.sf = msg + elif isinstance(msg, RequestRfFrameMsg): + if self.play_mode is None: + self.send(BeamformerMsg(b'init')) + elif self.play_mode == 'playback': + if self.sf is None: + self.send(BeamformerMsg(b'nop')) + elif (self.sf.data.__len__() // 2) != np.prod(self.seq_meta_playback.shape): + self.send(BeamformerMsg(b'nop')) + else: + self.send(BeamformerMsg( + SetSeqMetaMsg('any', self.seq_meta_playback.name).encode_msg() + self.arg.encode_msg() + - rfframemsg.encode_msg()) - if self.c.sub in socks and socks[self.c.sub] == zmq.POLLIN: - msg = self.recv() - if isinstance(msg, KillMsg): - if msg.name == '': - return - if isinstance(msg, ImageArgMsg): - self.arg = msg - if isinstance(msg, SetSeqMetaMsg): - match msg.target: - case 'live': - self.seq_meta_live = RfSequenceMeta.from_name(msg.name) - case 'playback': - self.seq_meta_playback = RfSequenceMeta.from_name(msg.name) - if isinstance(msg, SetPlayMode): - logger.debug(f'set playmode {msg}') - self.play_mode = msg.value - if msg.value == 'live': - try: - device_socket.disconnect(f"tcp://{PLAYBACK_SOCKET}") - except: - pass - device_socket.connect(f"tcp://{LIVE_SOCKET}") - elif msg.value == 'playback': - try: - device_socket.disconnect(f"tcp://{LIVE_SOCKET}") - except: - pass - device_socket.connect(f"tcp://{PLAYBACK_SOCKET}") - logger.debug('connect to playback') - if isinstance(msg, SetRecordMsg): - self.record_enable = msg.enable - if msg.enable: - seq_meta = self.current_seq_meta - self.record_commit = msg.commit - seq_meta.commit = msg.commit - p = Path(msg.base) / seq_meta.name - p.mkdir(parents=True, exist_ok=True) - self.record_path = p + self.sf.encode_msg() + )) + elif isinstance(msg, KillMsg): + if msg.name == '': + return + elif isinstance(msg, ImageArgMsg): + self.arg = msg + elif isinstance(msg, SetSeqMetaMsg): + match msg.target: + case 'live': + self.seq_meta_live = RfSequenceMeta.from_name(msg.name) + case 'playback': + self.seq_meta_playback = RfSequenceMeta.from_name(msg.name) + elif isinstance(msg, SetPlayMode): + logger.debug(f'set playmode {msg}') + self.play_mode = msg.value + elif isinstance(msg, SetRecordMsg): + self.record_enable = msg.enable + if msg.enable: + seq_meta = self.current_seq_meta + self.record_commit = msg.commit + seq_meta.commit = msg.commit + p = Path(msg.base) / seq_meta.name + p.mkdir(parents=True, exist_ok=True) + self.record_path = p diff --git a/src/utils/Msg.py b/src/utils/Msg.py index f5d9808..ad38a73 100644 --- a/src/utils/Msg.py +++ b/src/utils/Msg.py @@ -4,6 +4,8 @@ import struct from enum import auto, Enum from pathlib import Path +from utils.RfMeta import RfFrameMeta + class BG(Enum): Msg1 = auto() @@ -34,6 +36,10 @@ class BG(Enum): SetSidMsg = auto() ImagingConfigNameListMsg = auto() RfFrameMsg = auto() + RfFrameWithMetaMsg = auto() + BytesMsg = auto() + BeamformerMsg = auto() + RequestRfFrameMsg = auto() class Msg: @@ -249,19 +255,69 @@ class RfFrameMsg(Msg): ) + self.data @classmethod - def decode(cls, data: bytes) -> 'Msg': + def decode(cls, data: bytes) -> 'RfFrameMsg': return cls( *struct.unpack('II', data[:8]), data[8:] ) +class RfFrameWithMetaMsg(Msg): + SPLIT = struct.pack('I', 114514) + + def __init__(self, sender: int, meta: RfFrameMeta, data: bytes): + self.sender = sender + self.meta = meta + self.data = data + + def encode(self) -> bytes: + return struct.pack('I', self.sender) + self.meta.name.encode() + self.SPLIT + self.data + + @classmethod + def decode(cls, data: bytes) -> 'RfFrameWithMetaMsg': + id2 = data.index(cls.SPLIT) + return cls( + sender=struct.unpack('I', data[:4])[0], + meta=RfFrameMeta.from_name(data[4:id2].decode()), + data=data[id2 + 4:], + ) + + +@dataclasses.dataclass +class BytesMsg(Msg): + value: bytes + + def encode(self) -> bytes: + return self.value + + @classmethod + def decode(cls, data: bytes) -> 'Msg': + return cls(data) + + def split(self): + pass + # id2 = r.index(Msg.magic(), 1) + # id3 = r.index(Msg.magic(), id2 + 1) + # print(id2, id3) + # seq_msg: SetSeqMetaMsg = Msg.decode_msg(r[0:id2]) + # arg_msg: ImageArgMsg = Msg.decode_msg(r[id2:id3]) + # b_msg: RfFrameMsg = Msg.decode_msg(r[id3:-1]) + + +class BeamformerMsg(BytesMsg): + pass + + @dataclasses.dataclass class RobotRtsiMsg(Msg): pos: tuple[float, float, float, float, float, float] force: tuple[float, float, float, float, float, float] +class RequestRfFrameMsg(Msg): + pass + + def test(): values = set(item.name for item in BG) for k in globals().keys(): diff --git a/test/kdemain.py b/test/kdemain.py index 3dada84..3887655 100644 --- a/test/kdemain.py +++ b/test/kdemain.py @@ -6,8 +6,9 @@ from nodes.Beamformer import Beamformer from nodes.Broker import Broker from nodes.Device import Device from nodes.ImageCV import ImageCV +from nodes.ImageFFMPEG import ImageFFMPEG from nodes.Loader import Loader -from nodes.Muxer import Receiver +from nodes.Muxer import Muxer from nodes.Robot import Robot from nodes.WebRTC import WebRTC from qtonly import kde_pyqt6_mainui @@ -20,13 +21,14 @@ if __name__ == '__main__': pps = [] ps = [ Broker(), - # WebRTC(), + WebRTC(), kde_pyqt6_mainui, Device(level=logging.DEBUG), + ImageFFMPEG(), ImageCV(level=logging.DEBUG), - Beamformer(), + Beamformer(level=logging.DEBUG), Loader(), - Receiver(), + Muxer(level=logging.DEBUG), Robot(), ] for p in ps: diff --git a/test/test.py b/test/test.py index 926443f..245fdf2 100644 --- a/test/test.py +++ b/test/test.py @@ -1,2 +1,4 @@ if __name__ == '__main__': print(tuple(str((1, 2, 3)))) + print([1,2,3][-1:1]) + print([1,2,3][1:]) diff --git a/test/testmiio.py b/test/testmiio.py index 5f8599f..d807c9e 100644 --- a/test/testmiio.py +++ b/test/testmiio.py @@ -2,9 +2,12 @@ from miio.miioprotocol import MiIOProtocol from config import SWITCH1_IP, SWITCH1_TOKEN, SWITCH2_IP, SWITCH2_TOKEN from miio import Device +from utils.mi import c1_disconnect, c1_connect + if __name__ == '__main__': - m = MiIOProtocol( - SWITCH1_IP, SWITCH1_TOKEN, - ) - r = m.send('get_properties', [{'did': 'MYDID', 'siid': 2, 'piid': 1}]) - print(r[0]['value']) \ No newline at end of file + # m = MiIOProtocol( + # SWITCH1_IP, SWITCH1_TOKEN, + # ) + # r = m.send('get_properties', [{'did': 'MYDID', 'siid': 2, 'piid': 1}]) + # print(r[0]['value']) + c1_connect() \ No newline at end of file