flandre/flandre/utils/RfMat.py
2025-04-15 21:18:04 +08:00

340 lines
8.7 KiB
Python

import inspect
import cupyx
import cv2
import matplotlib
import numpy as np
import cupy as cp
import cupyx
import cupyx.scipy.fft
import cupyx.scipy.ndimage
import cupyx.scipy.signal
import cv2
import scipy
import scipy.signal
from cupyx.scipy.fft import dctn, idctn
from scipy.stats import norm as norms
from flandre.utils.RfFile import RfFrame, RfSequenceMeta
from flandre.utils.RfMeta import RfFrameMeta
def hsv_to_rgb(hsv):
"""
Convert HSV values to RGB.
Parameters
----------
hsv : (..., 3) array-like
All values assumed to be in range [0, 1]
Returns
-------
(..., 3) `~numpy.ndarray`
Colors converted to RGB values in range [0, 1]
"""
hsv = np.asarray(hsv)
# check length of the last dimension, should be _some_ sort of rgb
if hsv.shape[-1] != 3:
raise ValueError("Last dimension of input array must be 3; "
f"shape {hsv.shape} was found.")
in_shape = hsv.shape
hsv = np.array(
hsv, copy=False,
dtype=np.promote_types(hsv.dtype, np.float32), # Don't work on ints.
ndmin=2, # In case input was 1D.
)
h = hsv[..., 0]
s = hsv[..., 1]
v = hsv[..., 2]
r = np.empty_like(h)
g = np.empty_like(h)
b = np.empty_like(h)
i = (h * 6.0).astype(int)
f = (h * 6.0) - i
p = v * (1.0 - s)
q = v * (1.0 - s * f)
t = v * (1.0 - s * (1.0 - f))
idx = i % 6 == 0
r[idx] = v[idx]
g[idx] = t[idx]
b[idx] = p[idx]
# #
idx = i == 1
r[idx] = q[idx]
g[idx] = v[idx]
b[idx] = p[idx]
#
idx = i == 2
r[idx] = p[idx]
g[idx] = v[idx]
b[idx] = t[idx]
idx = i == 3
r[idx] = p[idx]
g[idx] = q[idx]
b[idx] = v[idx]
idx = i == 4
r[idx] = t[idx]
g[idx] = p[idx]
b[idx] = v[idx]
idx = i == 5
r[idx] = v[idx]
g[idx] = p[idx]
b[idx] = q[idx]
idx = s == 0
r[idx] = v[idx]
g[idx] = v[idx]
b[idx] = v[idx]
rgb = np.stack([r, g, b], axis=-1)
return rgb.reshape(in_shape)
def bypass(f):
def wrapper(self, *args, **kwargs):
if 'cond' not in kwargs:
return f(self, *args, **kwargs)
if kwargs['cond']:
del kwargs['cond']
return f(self, *args, **kwargs)
else:
return self
return wrapper
def bypassClass(original_class):
for name, f in inspect.getmembers(original_class, inspect.isfunction):
setattr(original_class, name, bypass(f))
return original_class
@bypassClass
class RfMat:
def __init__(self,
data: cp.ndarray,
frame_meta: RfFrameMeta = None,
seq_meta: RfSequenceMeta = None,
):
self.m = data
self.cv = False
self.frame_meta = frame_meta
self.seq_meta = seq_meta
if isinstance(data, np.ndarray):
self.device = 'cpu'
elif isinstance(data, cp.ndarray):
self.device = 'gpu'
else:
raise NotImplementedError
def call(self, f, *args, **kwargs):
return self.copy(f(self.m, *args, **kwargs))
def apply(self, f, *args, **kwargs):
f(self.m, *args, **kwargs)
return self
def copy(self, data=None):
if data is None:
return RfMat(self.m.copy(), self.frame_meta, self.seq_meta)
return RfMat(data, self.frame_meta, self.seq_meta)
@property
def duration(self):
return self.m.shape[1]
@property
def w(self):
return self.m.shape[1]
@property
def h(self):
return self.m.shape[0]
@property
def p(self):
if self.device == 'cpu':
return np
return cp
def __bytes__(self):
return self.m.tobytes()
def init_cv(self):
cv2.namedWindow('image')
self.cv = True
def norm(self):
m = self.m.astype(self.p.float32)
m -= m.min()
mmax = m.max()
if mmax == 0:
return self.copy(self.p.zeros_like(m))
m /= mmax
return self.copy(m)
def grey(self):
m = self.norm().m
return self.copy((m * 255).astype(self.p.uint8))
def pseudo_color(self):
m = self.norm().m * 0.7
p = self.p
h = m
s = p.zeros_like(h) + 1
v = p.zeros_like(h) + 1
hsv = p.stack((h, s, v), axis=2)
if p == cp:
rgb = hsv_to_rgb(hsv.get())
else:
rgb = hsv_to_rgb(hsv)
return self.copy((rgb * 255).astype(np.uint8))
def cpu(self):
if self.device == 'cpu':
return self
return self.copy(self.m.get())
def crop(self, t_start: int, t_end: int):
return self.copy(self.m[:, t_start:t_end])
def crop_center(self, center: float, width: float):
mmin = max(0, int(center - width / 2))
mmax = min(self.duration, int(center + width / 2))
return self.crop(mmin, mmax)
def watermark(self, watermark=None):
assert self.m.dtype == np.uint8
canvas = np.zeros(self.m.shape, dtype=np.uint8)
ccp = self.copy()
line1 = ''
line2 = ''
if watermark is not None:
line1 = watermark
else:
if self.frame_meta is not None:
line1 = self.frame_meta.name
if self.seq_meta is not None:
line2 = self.seq_meta.name
if canvas.shape.__len__() == 2:
color1 = (255,)
color2 = (128,)
elif canvas.shape[2] == 3:
color1 = (255, 255, 255)
color2 = (128, 128, 128)
else:
raise NotImplementedError()
fontsize = 2
cv2.putText(canvas, line1, (0, 60), cv2.FONT_HERSHEY_PLAIN, fontsize, color1, 8)
cv2.putText(canvas, line1, (0, 60), cv2.FONT_HERSHEY_PLAIN, fontsize, color2, 4)
cv2.putText(canvas, line2, (0, 120), cv2.FONT_HERSHEY_PLAIN, fontsize, color1, 8)
cv2.putText(canvas, line2, (0, 120), cv2.FONT_HERSHEY_PLAIN, fontsize, color2, 4)
if canvas.shape.__len__() == 2:
ccp.m[canvas == 255] = 255
ccp.m[canvas == 128] = 0
elif canvas.shape[2] == 3:
ccp.m[canvas[:, :] == 255] = 255
ccp.m[canvas[:, :] == 128] = 0
else:
raise NotImplementedError()
return ccp
def show(self, shape=None, watermark=None):
if not self.cv:
self.init_cv()
cv2.imshow('image', self
.grey()
.cpu()
.resize(shape, bypass=shape is None)
.watermark(watermark).m
)
return cv2.waitKey(0)
def info(self):
print(f'shape: {self.m.shape},device: {self.device}')
print(self.frame_meta)
print(self.seq_meta)
return self
def resize(self, shape):
if self.device == 'cpu':
return self.copy(cv2.resize(self.m, shape))
raise NotImplementedError()
def rotate90(self):
return self.copy(self.p.rot90(self.m, k=3))
def dct(self, mmin, mmax):
dct_ = scipy.fft.dct
idct = scipy.fft.idct
if self.p == cp:
dct_ = cupyx.scipy.fft.dct
idct = cupyx.scipy.fft.idct
m_dct = dct_(self.m)
if self.seq_meta.d() == 2:
m_dct[:, mmax:] = 0
m_dct[:, :mmin] = 0
elif self.seq_meta.d() == 3:
m_dct[:, :, mmax:] = 0
m_dct[:, :, :mmin] = 0
else:
raise NotImplementedError()
return self.copy(idct(m_dct))
def dct_center(self, center, bandwidth):
mmin = max(0, int(center - bandwidth / 2))
mmax = min(self.duration, int(center + bandwidth / 2))
return self.dct(mmin, mmax)
def argrelextrema(self, axis=1):
arg = scipy.signal.argrelextrema
m = self.m
p = self.p
if p == cp:
arg = cupyx.scipy.signal.argrelextrema
rm = p.zeros_like(m)
indies1 = arg(m, p.greater, axis=axis)
bl = p.zeros_like(m, dtype=p.bool_)
indies2 = arg(m, p.less, axis=axis)
bl2 = p.zeros_like(m, dtype=p.bool_)
bl[indies1] = True
bl2[indies2] = True
i1 = bl & (m > 0)
i2 = bl2 & (m < 0)
idx = i1 | i2
rm[idx] = m[idx].__abs__()
return self.copy(rm)
def conv_guass(self, b=0.01, axis=1):
cv = scipy.ndimage.convolve1d
m = self.m
p = self.p
if p == cp:
cv = cupyx.scipy.ndimage.convolve1d
rv = norms(loc=0, scale=b)
x2 = np.arange(-1, 1.1, 0.1)
w = rv.pdf(x2)
if p == cp:
w = cp.asarray(w)
rm = cv(m, w, axis=axis)
return self.copy(rm)
if __name__ == '__main__':
cp.zeros((1, 2, 3)) + 1