add rfmat
This commit is contained in:
parent
5727df26c8
commit
24892ea0ea
@ -1,8 +1,9 @@
|
|||||||
from enum import Enum, auto
|
from enum import Enum, auto
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
import inspect
|
|
||||||
from typing import Annotated, get_type_hints
|
from typing import Annotated, get_type_hints
|
||||||
|
|
||||||
|
import cupy as cp
|
||||||
|
import numpy as np
|
||||||
from attr import dataclass
|
from attr import dataclass
|
||||||
|
|
||||||
|
|
||||||
@ -112,15 +113,32 @@ class SM:
|
|||||||
vs = f'({' '.join([str(vv) for vv in v])})'
|
vs = f'({' '.join([str(vv) for vv in v])})'
|
||||||
else:
|
else:
|
||||||
vs = str(v)
|
vs = str(v)
|
||||||
|
if v is not None:
|
||||||
arr.append(f'{p2a[p]}={vs}')
|
arr.append(f'{p2a[p]}={vs}')
|
||||||
return ",".join(arr) + '.bin'
|
return ",".join(arr)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def filename(self):
|
||||||
|
return self.name + '.bin'
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_path(clz, path: Path | str):
|
||||||
|
path = Path(path)
|
||||||
|
if not path.exists():
|
||||||
|
raise FileNotFoundError(path)
|
||||||
|
if path.is_file():
|
||||||
|
return clz.from_name(path.stem)
|
||||||
|
elif path.is_dir():
|
||||||
|
return clz.from_name(path.name)
|
||||||
|
else:
|
||||||
|
raise FileNotFoundError
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_name(clz, name: str):
|
def from_name(clz, name: str):
|
||||||
p2t = clz.p2t
|
p2t = clz.p2t
|
||||||
a2p = clz.a2p
|
a2p = clz.a2p
|
||||||
c = clz()
|
c = clz()
|
||||||
sp = Path(name).stem.split(',')
|
sp = name.split(',')
|
||||||
if COMMIT_KEY in a2p:
|
if COMMIT_KEY in a2p:
|
||||||
c.__setattr__(a2p[COMMIT_KEY], sp.pop(0))
|
c.__setattr__(a2p[COMMIT_KEY], sp.pop(0))
|
||||||
for pv in sp:
|
for pv in sp:
|
||||||
@ -146,13 +164,20 @@ class RfFrame:
|
|||||||
robot_y: Annotated[int, 'Y'] = None
|
robot_y: Annotated[int, 'Y'] = None
|
||||||
robot_z: Annotated[int, 'Z'] = None
|
robot_z: Annotated[int, 'Z'] = None
|
||||||
|
|
||||||
def __init__(self, data: bytes | Path, meta: RfFrameMeta):
|
def __init__(self, data: bytes | Path, meta: RfFrameMeta, seq: 'RfSequence' = None):
|
||||||
self.data = data
|
self.data = data
|
||||||
self.meta = meta
|
self.meta = meta
|
||||||
|
self._seq = seq
|
||||||
|
|
||||||
def save(self, folder: Path):
|
def save(self, folder: Path):
|
||||||
(folder / self.meta.name).write_bytes(self.bytes)
|
(folder / self.meta.name).write_bytes(self.bytes)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def seq(self):
|
||||||
|
if self._seq is None and isinstance(self.data, Path):
|
||||||
|
return RfSequence.from_folder(self.data.parent)
|
||||||
|
return self._seq
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def bytes(self):
|
def bytes(self):
|
||||||
if isinstance(self.data, bytes):
|
if isinstance(self.data, bytes):
|
||||||
@ -160,6 +185,15 @@ class RfFrame:
|
|||||||
if isinstance(self.data, Path):
|
if isinstance(self.data, Path):
|
||||||
return self.data.read_bytes()
|
return self.data.read_bytes()
|
||||||
|
|
||||||
|
def mat(self, device='gpu'):
|
||||||
|
from utils.RfMat import RfMat
|
||||||
|
|
||||||
|
if device == 'gpu':
|
||||||
|
arr = cp.frombuffer(self.bytes, dtype=cp.int16)
|
||||||
|
else:
|
||||||
|
arr = np.frombuffer(self.bytes, dtype=np.int16)
|
||||||
|
return RfMat(arr.reshape(self.seq.meta.shape), frame_meta=self.meta, seq_meta=self.seq.meta)
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class RfSequenceMeta(SM):
|
class RfSequenceMeta(SM):
|
||||||
@ -181,11 +215,13 @@ class RfSequence:
|
|||||||
@classmethod
|
@classmethod
|
||||||
def from_folder(cls, folder: Path | str) -> 'RfSequence':
|
def from_folder(cls, folder: Path | str) -> 'RfSequence':
|
||||||
folder = Path(folder)
|
folder = Path(folder)
|
||||||
meta = RfSequenceMeta.from_name(folder.name)
|
if not folder.exists():
|
||||||
arr = []
|
raise FileNotFoundError
|
||||||
|
meta = RfSequenceMeta.from_path(folder)
|
||||||
|
rs = RfSequence([], meta)
|
||||||
for f in folder.glob('*.bin'):
|
for f in folder.glob('*.bin'):
|
||||||
arr.append(RfFrame(f, RfFrame.RfFrameMeta.from_name(f.name)))
|
rs.frames.append(RfFrame(f, RfFrame.RfFrameMeta.from_path(f), seq=rs))
|
||||||
return RfSequence(arr, meta)
|
return rs
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def all(self):
|
def all(self):
|
||||||
@ -196,11 +232,17 @@ class RfSequence:
|
|||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
t = (1, 2)
|
# t = (1, 2)
|
||||||
f = RfSequenceMeta.from_name('123123,U=321,S=(1 2 3),M=PWI')
|
# f = RfSequenceMeta.from_name('123123,U=321,S=(1 2 3),M=PWI')
|
||||||
# print(f.commit)
|
# print(f.commit)
|
||||||
print(f.name)
|
# print(f.name)
|
||||||
# print(RfSequence.RfSequenceMeta.p2t)
|
# print(RfSequence.RfSequenceMeta.p2t)
|
||||||
# f = RfFrame.RfFrameMeta(123, 345)
|
# f = RfFrame.RfFrameMeta(123, 345)
|
||||||
# print(f.name)
|
# print(f.name)
|
||||||
rs = RfSequence([], RfSequenceMeta())
|
# rs = RfSequence([], RfSequenceMeta())
|
||||||
|
rs = RfSequence.from_folder(
|
||||||
|
'/run/media/lambda/b86dccdc-f134-464b-a310-6575ee9ae85c/cap4/trim/R1.1,U=30,M=PWI,S=(256 1502)')
|
||||||
|
# print(rs.meta)
|
||||||
|
for frame in rs.frames:
|
||||||
|
if frame.mat().rotate90().show((1080, 1920)) == ord('q'):
|
||||||
|
break
|
||||||
|
|||||||
133
src/utils/RfMat.py
Normal file
133
src/utils/RfMat.py
Normal file
@ -0,0 +1,133 @@
|
|||||||
|
import cv2
|
||||||
|
import numpy as np
|
||||||
|
import cupy as cp
|
||||||
|
|
||||||
|
from utils.RfFile import RfFrame, RfSequenceMeta
|
||||||
|
|
||||||
|
|
||||||
|
def bypass(f):
|
||||||
|
def wrapper(self, *args, **kwargs):
|
||||||
|
if 'bypass' not in kwargs:
|
||||||
|
return f(self, *args, **kwargs)
|
||||||
|
if kwargs['bypass']:
|
||||||
|
return self
|
||||||
|
else:
|
||||||
|
del kwargs['bypass']
|
||||||
|
return f(self, *args, **kwargs)
|
||||||
|
|
||||||
|
return wrapper
|
||||||
|
|
||||||
|
|
||||||
|
class RfMat:
|
||||||
|
def __init__(self,
|
||||||
|
data: cp.ndarray,
|
||||||
|
frame_meta: RfFrame.RfFrameMeta = None,
|
||||||
|
seq_meta: RfSequenceMeta = None,
|
||||||
|
):
|
||||||
|
self.m = data
|
||||||
|
self.cv = False
|
||||||
|
self.frame_meta = frame_meta
|
||||||
|
self.seq_meta = seq_meta
|
||||||
|
if isinstance(data, np.ndarray):
|
||||||
|
self.device = 'cpu'
|
||||||
|
elif isinstance(data, cp.ndarray):
|
||||||
|
self.device = 'gpu'
|
||||||
|
else:
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
def call(self, f, *args, **kwargs):
|
||||||
|
return self.copy(f(self.m, *args, **kwargs))
|
||||||
|
|
||||||
|
def apply(self, f, *args, **kwargs):
|
||||||
|
f(self.m, *args, **kwargs)
|
||||||
|
return self
|
||||||
|
|
||||||
|
def copy(self, data=None):
|
||||||
|
if data is None:
|
||||||
|
return RfMat(self.m.copy(), self.frame_meta, self.seq_meta)
|
||||||
|
return RfMat(data, self.frame_meta, self.seq_meta)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def p(self):
|
||||||
|
if self.device == 'cpu':
|
||||||
|
return np
|
||||||
|
return cp
|
||||||
|
|
||||||
|
def init_cv(self):
|
||||||
|
cv2.namedWindow('image')
|
||||||
|
self.cv = True
|
||||||
|
|
||||||
|
def norm(self):
|
||||||
|
m = self.m.astype(self.p.float32)
|
||||||
|
m -= m.min()
|
||||||
|
mmax = m.max()
|
||||||
|
if mmax == 0:
|
||||||
|
return self.copy(self.p.zeros_like(m))
|
||||||
|
m /= mmax
|
||||||
|
return self.copy(m)
|
||||||
|
|
||||||
|
def grey(self):
|
||||||
|
m = self.norm().m
|
||||||
|
return self.copy((m * 255).astype(self.p.uint8))
|
||||||
|
|
||||||
|
def cpu(self):
|
||||||
|
if self.device == 'cpu':
|
||||||
|
return self
|
||||||
|
return self.copy(self.m.get())
|
||||||
|
|
||||||
|
def watermark(self, watermark=None):
|
||||||
|
assert self.m.dtype == np.uint8
|
||||||
|
canvas = np.zeros(self.m.shape, dtype=np.uint8)
|
||||||
|
ccp = self.copy()
|
||||||
|
|
||||||
|
line1 = ''
|
||||||
|
line2 = ''
|
||||||
|
if watermark is not None:
|
||||||
|
line1 = watermark
|
||||||
|
else:
|
||||||
|
if self.frame_meta is not None:
|
||||||
|
line1 = self.frame_meta.name
|
||||||
|
if self.seq_meta is not None:
|
||||||
|
line2 = self.seq_meta.name
|
||||||
|
|
||||||
|
cv2.putText(canvas, line1, (0, 60), cv2.FONT_HERSHEY_PLAIN, 4, (255,), 8)
|
||||||
|
cv2.putText(canvas, line1, (0, 60), cv2.FONT_HERSHEY_PLAIN, 4, (128,), 4)
|
||||||
|
|
||||||
|
cv2.putText(canvas, line2, (0, 120), cv2.FONT_HERSHEY_PLAIN, 4, (255,), 8)
|
||||||
|
cv2.putText(canvas, line2, (0, 120), cv2.FONT_HERSHEY_PLAIN, 4, (128,), 4)
|
||||||
|
|
||||||
|
if canvas.shape.__len__() == 2:
|
||||||
|
ccp.m[canvas == 255] = 255
|
||||||
|
ccp.m[canvas == 128] = 0
|
||||||
|
elif canvas.shape[2] == 3:
|
||||||
|
ccp.m[canvas[:, :] == 255] = 255
|
||||||
|
ccp.m[canvas[:, :] == 128] = 0
|
||||||
|
else:
|
||||||
|
raise NotImplementedError()
|
||||||
|
return ccp
|
||||||
|
|
||||||
|
def show(self, shape=None, watermark=None):
|
||||||
|
if not self.cv:
|
||||||
|
self.init_cv()
|
||||||
|
cv2.imshow('image', self
|
||||||
|
.grey()
|
||||||
|
.cpu()
|
||||||
|
.resize(shape, bypass=shape is None)
|
||||||
|
.watermark(watermark).m
|
||||||
|
)
|
||||||
|
return cv2.waitKey(0)
|
||||||
|
|
||||||
|
def info(self):
|
||||||
|
print(f'shape: {self.m.shape},device: {self.device}')
|
||||||
|
print(self.frame_meta)
|
||||||
|
print(self.seq_meta)
|
||||||
|
return self
|
||||||
|
|
||||||
|
@bypass
|
||||||
|
def resize(self, shape):
|
||||||
|
if self.device == 'cpu':
|
||||||
|
return self.copy(cv2.resize(self.m, shape))
|
||||||
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
def rotate90(self):
|
||||||
|
return self.copy(self.p.rot90(self.m, k=3))
|
||||||
Loading…
Reference in New Issue
Block a user