diff --git a/flandre/nodes/Loader.py b/flandre/nodes/Loader.py index 141ed23..9d80f0a 100644 --- a/flandre/nodes/Loader.py +++ b/flandre/nodes/Loader.py @@ -7,7 +7,8 @@ import zmq from flandre.config import C, ISDEV from flandre.nodes.Node import Node -from flandre.utils.Msg import MoveAxisMsg, KillMsg, SetSeqMetaMsg, SeqIdMinMax, SetBaseMsg, PlaybackSeqListMsg, SeqIdList, \ +from flandre.utils.Msg import MoveAxisMsg, KillMsg, SetSeqMetaMsg, SeqIdMinMax, SetBaseMsg, PlaybackSeqListMsg, \ + SeqIdList, \ SetSidMsg, RfFrameWithMetaMsg from flandre.utils.RfFile import RfSequence @@ -33,12 +34,17 @@ class Loader(Node): pass elif isinstance(msg, SetSidMsg): frame = rff.frames[msg.value] - self.send(RfFrameWithMetaMsg(1, frame.meta, frame.bytes)) + self.send(RfFrameWithMetaMsg(1, frame.meta, frame.bytes, frame.is_zip)) + elif isinstance(msg, SetSeqMetaMsg): if base is None: continue if msg.target == 'playback': - rff = RfSequence.from_folder(base / msg.name) + logger.info(f'load {msg.name}') + if msg.name.endswith('.zip'): + rff = RfSequence.from_zip(base / msg.name) + else: + rff = RfSequence.from_folder(base / msg.name) self.send(SeqIdMinMax(*rff.seq_id_minmax)) self.send(SeqIdList([f.meta.sequence_id for f in rff.frames])) self.send(SetSidMsg(0)) diff --git a/flandre/nodes/Muxer.py b/flandre/nodes/Muxer.py index 21574f2..9701092 100644 --- a/flandre/nodes/Muxer.py +++ b/flandre/nodes/Muxer.py @@ -1,6 +1,7 @@ import logging import struct import time +from pathlib import Path import zmq @@ -10,13 +11,14 @@ from flandre.utils.Msg import ImageArgMsg, KillMsg, SetSeqMetaMsg, SetPlayMode, ImagingConfigNameListMsg, RfFrameWithMetaMsg, BeamformerMsg, RobotRtsiMsg, SeqMetaMsg, DeviceEnabledMsg from flandre.utils.RfFile import RfSequenceMeta from flandre.utils.RfMeta import RfFrameMeta +from flandre.utils.archive import zip_to_bytes logger = logging.getLogger(__name__) class Muxer(Node): topics = [SetSeqMetaMsg, SetPlayMode, SetDeviceConfigMsg, RfFrameWithMetaMsg, - ImageArgMsg, RobotRtsiMsg, SeqMetaMsg,DeviceEnabledMsg] + ImageArgMsg, RobotRtsiMsg, SeqMetaMsg, DeviceEnabledMsg] def __init__(self, level=logging.INFO): super(Muxer, self).__init__(level=level) @@ -55,6 +57,18 @@ class Muxer(Node): self.rep_socket.send(BeamformerMsg(b'nop').encode_msg()) return data_msg = self.playback_rf_msg + # logger.info(f'bb {data_msg._header.is_zip}') + if data_msg._header.is_zip: + p = Path(data_msg.data.decode()) + if p.suffix == '.zst': + data_msg = RfFrameWithMetaMsg( + data_msg.sender, + data_msg.meta, + zip_to_bytes(p.parent, int(p.stem)) + ) + else: + raise NotImplementedError() + case 'live': if not self.device_enabled: self.rep_socket.send(BeamformerMsg(b'init').encode_msg()) diff --git a/flandre/utils/Msg.py b/flandre/utils/Msg.py index 72a09b1..3dca708 100644 --- a/flandre/utils/Msg.py +++ b/flandre/utils/Msg.py @@ -307,9 +307,10 @@ class RfFrameWithMetaMsg(HeaderByteMsg): class Header: sender: int meta: str + is_zip: bool - def __init__(self, sender: int, meta: RfFrameMeta, data: bytes): - self._header = self.Header(sender, meta.name) + def __init__(self, sender: int, meta: RfFrameMeta, data: bytes, is_zip: bool = False): + self._header = self.Header(sender, meta.name, is_zip) super().__init__(self._header.__dict__, data) @property @@ -324,8 +325,7 @@ class RfFrameWithMetaMsg(HeaderByteMsg): def decode(cls, data) -> 'RfFrameWithMetaMsg': msg = super(RfFrameWithMetaMsg, cls).decode(data) header = cls.Header(**msg.header) - return RfFrameWithMetaMsg(header.sender, RfFrameMeta.from_name(header.meta), msg.data) - + return RfFrameWithMetaMsg(header.sender, RfFrameMeta.from_name(header.meta), msg.data, header.is_zip) @dataclasses.dataclass class BytesMsg(Msg): @@ -398,10 +398,11 @@ def test(): test() if __name__ == '__main__': - c = HeaderByteMsg(dict(a=1, b='s'), b'asdasd') - c2 = c.decode(c.encode()) - print(c2.header, c2.data) + # c = HeaderByteMsg(dict(a=1, b='s'), b'asdasd') + # c2 = c.decode(c.encode()) + # print(c2.header, c2.data) - c = RfFrameWithMetaMsg(1, RfFrameMeta(1, 11, 111), b'asdasdasdads') + c = RfFrameWithMetaMsg(1, RfFrameMeta(1, 11, 111), b'asdasdasdads',True) + print(c.sender, c.meta, c.data, c._header) c2 = c.decode(c.encode()) - print(c2.sender, c2.meta, c2.data) + print(c2.sender, c2.meta, c2.data, c2._header) diff --git a/flandre/utils/RfFile.py b/flandre/utils/RfFile.py index f3e6a85..b206a5b 100644 --- a/flandre/utils/RfFile.py +++ b/flandre/utils/RfFile.py @@ -1,3 +1,5 @@ +import json +import zipfile from pathlib import Path import cupy as cp @@ -11,6 +13,10 @@ class RfFrame: self.data = data self.meta = meta self._seq = seq + self.is_zip = False + if isinstance(self.data, Path): + if self.data.parent.suffix == '.zip': + self.is_zip = True def save(self, folder: Path): (folder / self.meta.name).write_bytes(self.bytes) @@ -23,11 +29,12 @@ class RfFrame: @property def bytes(self): + if self.is_zip: + return self.data.__str__().encode() if isinstance(self.data, bytes): return self.data if isinstance(self.data, Path): return self.data.read_bytes() - def mat(self, device='gpu'): from flandre.utils.RfMat import RfMat @@ -54,6 +61,20 @@ class RfSequence: rs.frames.append(RfFrame(f, RfFrameMeta.from_path(f), seq=rs)) return rs + @classmethod + def from_zip(cls, z: Path | str) -> 'RfSequence': + z = Path(z) + if not z.exists(): + raise FileNotFoundError + meta = RfSequenceMeta.from_name(z.stem) + rs = RfSequence([], meta) + zz = zipfile.ZipFile(z) + max_idx = max([int(Path(item.filename).stem) for item in zz.infolist()]) + for i in range(max_idx + 1): + j = json.loads(zz.open(f'{i}.json').read()) + rs.frames.append(RfFrame(z / f'{i}.zst', RfFrameMeta(**j), seq=rs)) + return rs + @property def all(self): pass diff --git a/flandre/utils/RfMeta.py b/flandre/utils/RfMeta.py index 5da82f8..9477297 100644 --- a/flandre/utils/RfMeta.py +++ b/flandre/utils/RfMeta.py @@ -1,3 +1,4 @@ +import json from enum import Enum, auto from pathlib import Path from typing import Annotated, get_type_hints @@ -62,6 +63,8 @@ class RfMeta: @classmethod def from_name(clz, name: str): + if Path(name).suffix == '.zip': + name = Path(name).stem p2t = clz.p2t() a2p = clz.a2p() c = clz() @@ -81,6 +84,10 @@ class RfMeta: return c + @property + def json_str(self): + return json.dumps(self.__dict__) + @dataclass class RfFrameMeta(RfMeta): diff --git a/flandre/utils/archive.py b/flandre/utils/archive.py new file mode 100644 index 0000000..dbdbc01 --- /dev/null +++ b/flandre/utils/archive.py @@ -0,0 +1,46 @@ +import json +import subprocess +from pathlib import Path +import zipfile +import zstd + + +TEMP_FOLDER = Path('/mnt/16T/private_dataset/New Folder/temp') + + +def folder_to_zip(folder: Path): + from flandre.utils.RfMeta import RfFrameMeta + for i, file in enumerate(folder.glob('*')): + file = Path(file) + print(i, file, RfFrameMeta.from_path(file).json_str) + + src = file + dst = TEMP_FOLDER / f'{i}.zst' + dstj = TEMP_FOLDER / f'{i}.json' + + dstj.write_text(RfFrameMeta.from_path(file).json_str) + + subprocess.run(['zstd', '-f', src, '-o', dst]) + subprocess.run(['zip', '-0', '-j', '-r', TEMP_FOLDER.parent / f'{folder.name}.zip', TEMP_FOLDER]) + + +def zip_to_bytes(file: Path, name: int): + return zstd.loads(zipfile.ZipFile(file).open(f'{name}.zst').read()) + + +if __name__ == '__main__': + # folder_to_zip(Path('/mnt/16T/private_dataset/us/R1,U=30,M=PWI,S=(256 1502)')) + # r = get(Path('/mnt/16T/private_dataset/New Folder/ST,U=60,M=FMC,S=(256 256 3002).zip'), 3) + # print(r.__len__()) + # r = Path('/mnt/16T/private_dataset/New Folder/ST,U=60,M=FMC,S=(256 256 3002)/X=-204.bin').read_bytes() + # print(r.__len__()) + p = Path('/mnt/16T/private_dataset/us/R1,U=30,M=PWI,S=(256 1502).zip') + # # + archive = zipfile.ZipFile(p) + for item in archive.infolist(): + print(int(Path(item.filename).stem)) + print() + # # imgfile = archive.open('img_01.png') + # print(archive.filelist) + # p2 = archive.open('X=20,Y=395.bin') + # print(p2.read().__len__()) diff --git a/pyproject.toml b/pyproject.toml index 6b22ded..233051f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,6 +17,7 @@ dependencies = [ "mido[ports-rtmidi]>=1.3.3", "pyjoystick>=1.2.4", "platformdirs>=4.3.6", + "zstd>=1.5.6.7", ] [tool.setuptools.packages.find] diff --git a/test/rtc.py b/test/rtc.py index 93262f1..248af81 100644 --- a/test/rtc.py +++ b/test/rtc.py @@ -17,7 +17,7 @@ pcs = set() async def offer(request): - params = await request.json() + params = await request.json_str() offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"]) pc = RTCPeerConnection(RTCConfiguration([])) pcs.add(pc) diff --git a/uv.lock b/uv.lock index ba950b5..23aeeb7 100644 --- a/uv.lock +++ b/uv.lock @@ -391,6 +391,7 @@ dependencies = [ { name = "pyzmq" }, { name = "scipy" }, { name = "tqdm" }, + { name = "zstd" }, ] [package.metadata] @@ -407,6 +408,7 @@ requires-dist = [ { name = "pyzmq", specifier = ">=26.2.0" }, { name = "scipy", specifier = ">=1.14.1" }, { name = "tqdm", specifier = ">=4.67.1" }, + { name = "zstd", specifier = ">=1.5.6.7" }, ] [[package]] @@ -1715,3 +1717,16 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/c7/82/7eb0669759fdf10a3abf8370defcb8d0d85ec3db7ace40f08fc628efc4f0/zeroconf-0.145.1-cp312-cp312-win32.whl", hash = "sha256:38620c18817026a03afb92ef65642d51a2f4d4a9761f67a6520914709e5cea74", size = 1431638 }, { url = "https://files.pythonhosted.org/packages/6f/2a/2e83d29902a33887b8b8434ca13eb6ba6bca7f66ac29704647c9c4f43bbf/zeroconf-0.145.1-cp312-cp312-win_amd64.whl", hash = "sha256:31ad6428646899206097c05b83dc1f15c04b7292c0c7080459c637e1c8c16329", size = 1661171 }, ] + +[[package]] +name = "zstd" +version = "1.5.6.7" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/92/a7/5f4c6c2a62dd88f58133e333af1c251dd08f98d48445f650889d85e43b7b/zstd-1.5.6.7.tar.gz", hash = "sha256:b3fa8f6bfb5e116b950ad633edccae4f65055a213ab4c00c1d4397f4b257b697", size = 649577 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/bd/74/709140155d6e225782f539185b078329a620c9ff47cebff8c83c26678d0b/zstd-1.5.6.7-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:63be0e292a0b09177595674492387bfde0df7dadad72a8d98d09fc71f40899a1", size = 258010 }, + { url = "https://files.pythonhosted.org/packages/33/b1/a07f2c7197d0cdc5e4902b369e313c1859bca942a276003c484b6c480cb5/zstd-1.5.6.7-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:b40aa76745b5fbf2217a94369dd1bb017467acd5f1af802ab7d61ee2c3671e59", size = 209113 }, + { url = "https://files.pythonhosted.org/packages/8f/5c/22af5445f65c684e86a44a166d057bf5ce042ab809998fde52d7a1a12417/zstd-1.5.6.7-cp312-cp312-manylinux_2_4_i686.whl", hash = "sha256:f23f19592f1602932f284d82f9c8e4ff28e0ef384077ab4c2a8d80f888f729d0", size = 259532 }, + { url = "https://files.pythonhosted.org/packages/65/24/46598527f79eff47fcd860d743fd7a517f1a0db1e017b40fdaf805fee34c/zstd-1.5.6.7-cp312-cp312-win32.whl", hash = "sha256:284df0a2bba3a26eb49388a251822043afc82369c7116b11204826258963a675", size = 144843 }, + { url = "https://files.pythonhosted.org/packages/5a/6a/9b92db285ef15b51a1e62662672b1ce3c30dbc20aa75c233ca60b1b4d1f4/zstd-1.5.6.7-cp312-cp312-win_amd64.whl", hash = "sha256:ef6d543208ec11891a2655e38b39cecee0f20e85d34cb3a94b98444908e7e844", size = 160084 }, +]