flandre/src/utils/RfFile.py
2025-02-11 23:24:49 +08:00

89 lines
2.5 KiB
Python

from pathlib import Path
import cupy as cp
import numpy as np
from utils.RfMeta import RfFrameMeta, RfSequenceMeta
class RfFrame:
    """One RF frame: its raw payload (or a path to it) plus frame metadata.

    ``data`` is either the payload itself (``bytes``) or a ``Path`` to a file
    holding it. A path-backed frame reads the file on every access to
    :attr:`bytes`; nothing is cached in memory.
    """

    # NOTE: ``__init__`` must stay above the ``bytes`` property. Its annotation
    # uses the builtin ``bytes``, which the property shadows in class scope
    # once it has been defined.
    def __init__(self, data: bytes | Path, meta: RfFrameMeta, seq: 'RfSequence' = None):
        """Store the payload (or its path), the frame metadata and, optionally,
        the sequence this frame belongs to."""
        self.data = data
        self.meta = meta
        self._seq = seq

    def save(self, folder: Path):
        """Write the frame payload to ``folder / self.meta.name``."""
        (folder / self.meta.name).write_bytes(self.bytes)

    @property
    def seq(self):
        """Return the sequence this frame belongs to, or ``None`` if unknown.

        When no sequence was supplied and the frame is path-backed, the
        sequence is loaded from the file's parent folder on first access and
        then remembered, so repeated accesses do not rescan the folder.
        """
        if self._seq is None and isinstance(self.data, Path):
            self._seq = RfSequence.from_folder(self.data.parent)
        return self._seq

    @property
    def bytes(self):
        """Return the raw frame payload as ``bytes``.

        Raises:
            TypeError: if ``data`` is neither ``bytes`` nor a ``Path``.
        """
        if isinstance(self.data, bytes):
            return self.data
        if isinstance(self.data, Path):
            return self.data.read_bytes()
        # Previously this fell through and returned None, which only surfaced
        # later as a confusing error inside write_bytes()/frombuffer().
        raise TypeError(
            f'RfFrame.data must be bytes or Path, got {type(self.data).__name__}'
        )

    def mat(self, device='gpu'):
        """Interpret the payload as int16 samples and wrap it in an ``RfMat``.

        The flat buffer is reshaped to ``self.seq.meta.shape``.

        Args:
            device: ``'gpu'`` builds the array with CuPy; any other value
                builds it with NumPy.

        Raises:
            ValueError: if the owning sequence (and thus the shape) is unknown.
        """
        # Resolve the sequence once and fail fast, before reading or uploading
        # any data.
        seq = self.seq
        if seq is None:
            raise ValueError(
                'RfFrame.mat() needs the owning sequence to know the frame '
                'shape; pass seq= or construct the frame from a Path'
            )
        # Imported here rather than at module level, presumably to avoid a
        # circular import with utils.RfMat (TODO confirm).
        from utils.RfMat import RfMat
        xp = cp if device == 'gpu' else np
        arr = xp.frombuffer(self.bytes, dtype=xp.int16)
        return RfMat(arr.reshape(seq.meta.shape), frame_meta=self.meta, seq_meta=seq.meta)
class RfSequence:
    """A list of :class:`RfFrame` objects plus the sequence-level metadata."""

    def __init__(self, frames: list[RfFrame], meta: RfSequenceMeta):
        """Store the frame list (kept by reference, not copied) and metadata."""
        self.frames = frames
        self.meta = meta

    @classmethod
    def from_folder(cls, folder: Path | str) -> 'RfSequence':
        """Build a sequence from a folder of ``*.bin`` frame files.

        Sequence metadata comes from ``RfSequenceMeta.from_path(folder)`` and
        each frame's metadata from ``RfFrameMeta.from_path(file)``. Frames are
        path-backed, so no payload is read here.

        Raises:
            FileNotFoundError: if ``folder`` does not exist.
        """
        folder = Path(folder)
        if not folder.exists():
            raise FileNotFoundError(f'RF sequence folder does not exist: {folder}')
        meta = RfSequenceMeta.from_path(folder)
        # Use cls so subclasses get an instance of their own type.
        rs = cls([], meta)
        # glob() yields entries in arbitrary order; sort by path so the frame
        # order is reproducible. This is lexicographic on the file name, not
        # numeric on the sequence id.
        for f in sorted(folder.glob('*.bin')):
            rs.frames.append(RfFrame(f, RfFrameMeta.from_path(f), seq=rs))
        return rs

    @property
    def all(self):
        """Not implemented yet; currently returns ``None``."""
        pass

    def query(self):
        """Not implemented yet; currently returns ``None``."""
        pass

    @property
    def seq_id_minmax(self):
        """Return ``(min, max)`` of ``meta.sequence_id`` over all frames.

        An empty sequence returns the legacy sentinel pair ``(2 ** 32, 0)``
        so existing callers see the same value as before.
        """
        ids = [f.meta.sequence_id for f in self.frames]
        if not ids:
            return 2 ** 32, 0
        # Use the true min/max. The old running comparison was seeded with
        # 2 ** 32 and 0, which clamped ids outside that range.
        return min(ids), max(ids)
if __name__ == '__main__':
    # Manual smoke test: load one captured sequence and display its frames
    # one after another.
    capture_dir = '/run/media/lambda/b86dccdc-f134-464b-a310-6575ee9ae85c/cap4/trim/R1.1,U=30,M=PWI,S=(256 1502)'
    sequence = RfSequence.from_folder(capture_dir)
    quit_code = ord('q')
    for rf_frame in sequence.frames:
        # show() presumably returns the pressed key code (TODO confirm);
        # stop the playback as soon as it equals 'q'.
        shown = rf_frame.mat().rotate90().show((1080, 1920))
        if shown == quit_code:
            break