From 24892ea0eacc47e48f7d7e6ea08e687732cd61a4 Mon Sep 17 00:00:00 2001 From: flandre Date: Thu, 16 Jan 2025 15:14:53 +0800 Subject: [PATCH] add rfmat --- src/utils/RfFile.py | 68 +++++++++++++++++----- src/utils/RfMat.py | 133 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 188 insertions(+), 13 deletions(-) create mode 100644 src/utils/RfMat.py diff --git a/src/utils/RfFile.py b/src/utils/RfFile.py index 1800649..08b8261 100644 --- a/src/utils/RfFile.py +++ b/src/utils/RfFile.py @@ -1,8 +1,9 @@ from enum import Enum, auto from pathlib import Path -import inspect from typing import Annotated, get_type_hints +import cupy as cp +import numpy as np from attr import dataclass @@ -112,15 +113,32 @@ class SM: vs = f'({' '.join([str(vv) for vv in v])})' else: vs = str(v) - arr.append(f'{p2a[p]}={vs}') - return ",".join(arr) + '.bin' + if v is not None: + arr.append(f'{p2a[p]}={vs}') + return ",".join(arr) + + @property + def filename(self): + return self.name + '.bin' + + @classmethod + def from_path(clz, path: Path | str): + path = Path(path) + if not path.exists(): + raise FileNotFoundError(path) + if path.is_file(): + return clz.from_name(path.stem) + elif path.is_dir(): + return clz.from_name(path.name) + else: + raise FileNotFoundError @classmethod def from_name(clz, name: str): p2t = clz.p2t a2p = clz.a2p c = clz() - sp = Path(name).stem.split(',') + sp = name.split(',') if COMMIT_KEY in a2p: c.__setattr__(a2p[COMMIT_KEY], sp.pop(0)) for pv in sp: @@ -146,13 +164,20 @@ class RfFrame: robot_y: Annotated[int, 'Y'] = None robot_z: Annotated[int, 'Z'] = None - def __init__(self, data: bytes | Path, meta: RfFrameMeta): + def __init__(self, data: bytes | Path, meta: RfFrameMeta, seq: 'RfSequence' = None): self.data = data self.meta = meta + self._seq = seq def save(self, folder: Path): (folder / self.meta.name).write_bytes(self.bytes) + @property + def seq(self): + if self._seq is None and isinstance(self.data, Path): + return RfSequence.from_folder(self.data.parent) + return self._seq + @property def bytes(self): if isinstance(self.data, bytes): @@ -160,6 +185,15 @@ class RfFrame: if isinstance(self.data, Path): return self.data.read_bytes() + def mat(self, device='gpu'): + from utils.RfMat import RfMat + + if device == 'gpu': + arr = cp.frombuffer(self.bytes, dtype=cp.int16) + else: + arr = np.frombuffer(self.bytes, dtype=np.int16) + return RfMat(arr.reshape(self.seq.meta.shape), frame_meta=self.meta, seq_meta=self.seq.meta) + @dataclass class RfSequenceMeta(SM): @@ -181,11 +215,13 @@ class RfSequence: @classmethod def from_folder(cls, folder: Path | str) -> 'RfSequence': folder = Path(folder) - meta = RfSequenceMeta.from_name(folder.name) - arr = [] + if not folder.exists(): + raise FileNotFoundError + meta = RfSequenceMeta.from_path(folder) + rs = RfSequence([], meta) for f in folder.glob('*.bin'): - arr.append(RfFrame(f, RfFrame.RfFrameMeta.from_name(f.name))) - return RfSequence(arr, meta) + rs.frames.append(RfFrame(f, RfFrame.RfFrameMeta.from_path(f), seq=rs)) + return rs @property def all(self): @@ -196,11 +232,17 @@ class RfSequence: if __name__ == '__main__': - t = (1, 2) - f = RfSequenceMeta.from_name('123123,U=321,S=(1 2 3),M=PWI') + # t = (1, 2) + # f = RfSequenceMeta.from_name('123123,U=321,S=(1 2 3),M=PWI') # print(f.commit) - print(f.name) + # print(f.name) # print(RfSequence.RfSequenceMeta.p2t) # f = RfFrame.RfFrameMeta(123, 345) # print(f.name) - rs = RfSequence([], RfSequenceMeta()) + # rs = RfSequence([], RfSequenceMeta()) + rs = RfSequence.from_folder( + '/run/media/lambda/b86dccdc-f134-464b-a310-6575ee9ae85c/cap4/trim/R1.1,U=30,M=PWI,S=(256 1502)') + # print(rs.meta) + for frame in rs.frames: + if frame.mat().rotate90().show((1080, 1920)) == ord('q'): + break diff --git a/src/utils/RfMat.py b/src/utils/RfMat.py new file mode 100644 index 0000000..d3ff84d --- /dev/null +++ b/src/utils/RfMat.py @@ -0,0 +1,133 @@ +import cv2 +import numpy as np +import cupy as cp + +from utils.RfFile import RfFrame, RfSequenceMeta + + +def bypass(f): + def wrapper(self, *args, **kwargs): + if 'bypass' not in kwargs: + return f(self, *args, **kwargs) + if kwargs['bypass']: + return self + else: + del kwargs['bypass'] + return f(self, *args, **kwargs) + + return wrapper + + +class RfMat: + def __init__(self, + data: cp.ndarray, + frame_meta: RfFrame.RfFrameMeta = None, + seq_meta: RfSequenceMeta = None, + ): + self.m = data + self.cv = False + self.frame_meta = frame_meta + self.seq_meta = seq_meta + if isinstance(data, np.ndarray): + self.device = 'cpu' + elif isinstance(data, cp.ndarray): + self.device = 'gpu' + else: + raise NotImplementedError + + def call(self, f, *args, **kwargs): + return self.copy(f(self.m, *args, **kwargs)) + + def apply(self, f, *args, **kwargs): + f(self.m, *args, **kwargs) + return self + + def copy(self, data=None): + if data is None: + return RfMat(self.m.copy(), self.frame_meta, self.seq_meta) + return RfMat(data, self.frame_meta, self.seq_meta) + + @property + def p(self): + if self.device == 'cpu': + return np + return cp + + def init_cv(self): + cv2.namedWindow('image') + self.cv = True + + def norm(self): + m = self.m.astype(self.p.float32) + m -= m.min() + mmax = m.max() + if mmax == 0: + return self.copy(self.p.zeros_like(m)) + m /= mmax + return self.copy(m) + + def grey(self): + m = self.norm().m + return self.copy((m * 255).astype(self.p.uint8)) + + def cpu(self): + if self.device == 'cpu': + return self + return self.copy(self.m.get()) + + def watermark(self, watermark=None): + assert self.m.dtype == np.uint8 + canvas = np.zeros(self.m.shape, dtype=np.uint8) + ccp = self.copy() + + line1 = '' + line2 = '' + if watermark is not None: + line1 = watermark + else: + if self.frame_meta is not None: + line1 = self.frame_meta.name + if self.seq_meta is not None: + line2 = self.seq_meta.name + + cv2.putText(canvas, line1, (0, 60), cv2.FONT_HERSHEY_PLAIN, 4, (255,), 8) + cv2.putText(canvas, line1, (0, 60), cv2.FONT_HERSHEY_PLAIN, 4, (128,), 4) + + cv2.putText(canvas, line2, (0, 120), cv2.FONT_HERSHEY_PLAIN, 4, (255,), 8) + cv2.putText(canvas, line2, (0, 120), cv2.FONT_HERSHEY_PLAIN, 4, (128,), 4) + + if canvas.shape.__len__() == 2: + ccp.m[canvas == 255] = 255 + ccp.m[canvas == 128] = 0 + elif canvas.shape[2] == 3: + ccp.m[canvas[:, :] == 255] = 255 + ccp.m[canvas[:, :] == 128] = 0 + else: + raise NotImplementedError() + return ccp + + def show(self, shape=None, watermark=None): + if not self.cv: + self.init_cv() + cv2.imshow('image', self + .grey() + .cpu() + .resize(shape, bypass=shape is None) + .watermark(watermark).m + ) + return cv2.waitKey(0) + + def info(self): + print(f'shape: {self.m.shape},device: {self.device}') + print(self.frame_meta) + print(self.seq_meta) + return self + + @bypass + def resize(self, shape): + if self.device == 'cpu': + return self.copy(cv2.resize(self.m, shape)) + raise NotImplementedError() + + def rotate90(self): + return self.copy(self.p.rot90(self.m, k=3))