from enum import Enum, auto from pathlib import Path from typing import Annotated, get_type_hints import cupy as cp import numpy as np from attr import dataclass @dataclass class RfFolder: parent: Path D: str L: int = 30 C: str = 'PAR' SHAPE = (1500, 256) @staticmethod def from_path(p: Path | str) -> 'RfFolder': p = Path(p) D, L, C = p.name.split(',') L = int(L.replace('L=', '')) C = C.replace('C=', '') return RfFolder(p.parent, D, L, C) @property def name(self): return f'{self.D},L={self.L},C={self.C}' @property def path(self): return self.parent / self.name @property def all(self): return [RfFile.from_path(f) for f in self.path.glob('*.bin')] @dataclass class RfFile: folder: RfFolder X: int = None Y: int = None Z: int = None S: int = None E: int = None @staticmethod def from_path(p: Path | str) -> 'RfFile': p = Path(p) folder = RfFolder.from_path(p.parent) return RfFile(folder, **{(c := pair.split('='))[0]: int(c[1]) for pair in p.stem.split(',')}) @property def path(self): arr = [] d = self.__dict__.copy() # print(d) d.__delitem__('folder') for k in d: if d[k] is not None: arr.append(f'{k}={d[k]}') filename = ','.join(arr) + '.bin' # print(filename) return self.folder.path / filename @property def bin(self): return self.path.read_bytes() def test2(): r = RfFile.from_path('/run/media/lambda/b86dccdc-f134-464b-a310-6575ee9ae85c/cap4/TEST1,L=30,C=PAR/S=925,E=4.bin') print(r) COMMIT_KEY = 'COMMIT' class SM: @classmethod @property def p2a(clz): gh = get_type_hints(clz, include_extras=True) return {k: gh[k].__metadata__[0] for k in gh} @classmethod @property def p2t(clz): return get_type_hints(clz) @classmethod @property def a2p(clz): return {v: k for k, v in clz.p2a.items()} @property def name(self): p2a = self.p2a p2t = self.p2t a2p = self.a2p arr = [] if COMMIT_KEY in a2p: cp = a2p[COMMIT_KEY] del p2a[cp] arr.append(f'{self.__getattribute__(cp)}') for p in p2a: t = p2t[p] v = self.__getattribute__(p) if issubclass(t, Enum): vs = v.name elif issubclass(t, tuple): vs = f'({' '.join([str(vv) for vv in v])})' else: vs = str(v) if v is not None: arr.append(f'{p2a[p]}={vs}') return ",".join(arr) @property def filename(self): return self.name + '.bin' @classmethod def from_path(clz, path: Path | str): path = Path(path) if not path.exists(): raise FileNotFoundError(path) if path.is_file(): return clz.from_name(path.stem) elif path.is_dir(): return clz.from_name(path.name) else: raise FileNotFoundError @classmethod def from_name(clz, name: str): p2t = clz.p2t a2p = clz.a2p c = clz() sp = name.split(',') if COMMIT_KEY in a2p: c.__setattr__(a2p[COMMIT_KEY], sp.pop(0)) for pv in sp: a, v = pv.split('=') p = a2p[a] t = p2t[p] if issubclass(t, Enum): c.__setattr__(p, t[v]) elif issubclass(t, tuple): c.__setattr__(p, tuple(int(i) for i in v[1:-1].split(' '))) else: c.__setattr__(p, t(v)) return c class RfFrame: @dataclass class RfFrameMeta(SM): encoder: Annotated[int, 'E'] = None sequence_id: Annotated[int, 'S'] = None # test3 robot_x: Annotated[int, 'X'] = None robot_y: Annotated[int, 'Y'] = None robot_z: Annotated[int, 'Z'] = None def __init__(self, data: bytes | Path, meta: RfFrameMeta, seq: 'RfSequence' = None): self.data = data self.meta = meta self._seq = seq def save(self, folder: Path): (folder / self.meta.name).write_bytes(self.bytes) @property def seq(self): if self._seq is None and isinstance(self.data, Path): return RfSequence.from_folder(self.data.parent) return self._seq @property def bytes(self): if isinstance(self.data, bytes): return self.data if isinstance(self.data, Path): return self.data.read_bytes() def mat(self, device='gpu'): from utils.RfMat import RfMat if device == 'gpu': arr = cp.frombuffer(self.bytes, dtype=cp.int16) else: arr = np.frombuffer(self.bytes, dtype=np.int16) return RfMat(arr.reshape(self.seq.meta.shape), frame_meta=self.meta, seq_meta=self.seq.meta) @dataclass class RfSequenceMeta(SM): class RfSequenceMode(Enum): PWI = auto() TFM = auto() commit: Annotated[str, COMMIT_KEY] = None shape: Annotated[tuple, 'S'] = None mode: Annotated[RfSequenceMode, 'M'] = RfSequenceMode.PWI us: Annotated[int, 'U'] = None class RfSequence: def __init__(self, frames: list[RfFrame], meta: RfSequenceMeta): self.frames = frames self.meta = meta @classmethod def from_folder(cls, folder: Path | str) -> 'RfSequence': folder = Path(folder) if not folder.exists(): raise FileNotFoundError meta = RfSequenceMeta.from_path(folder) rs = RfSequence([], meta) for f in folder.glob('*.bin'): rs.frames.append(RfFrame(f, RfFrame.RfFrameMeta.from_path(f), seq=rs)) return rs @property def all(self): pass def query(self): pass @property def seq_id_minmax(self): mmin = 1000000 mmax = 0 for f in self.frames: mmin = min(mmin, f.meta.sequence_id) mmax = max(mmax, f.meta.sequence_id) return mmin, mmax if __name__ == '__main__': # t = (1, 2) # f = RfSequenceMeta.from_name('123123,U=321,S=(1 2 3),M=PWI') # print(f.commit) # print(f.name) # print(RfSequence.RfSequenceMeta.p2t) # f = RfFrame.RfFrameMeta(123, 345) # print(f.name) # rs = RfSequence([], RfSequenceMeta()) rs = RfSequence.from_folder( '/run/media/lambda/b86dccdc-f134-464b-a310-6575ee9ae85c/cap4/trim/R1.1,U=30,M=PWI,S=(256 1502)') # print(rs.meta) for frame in rs.frames: if frame.mat().rotate90().show((1080, 1920)) == ord('q'): break