# flandre/flandre/utils/RfMat.py
# 2025-04-16 23:31:52 +08:00
#
# 412 lines
# 11 KiB
# Python

import functools
import inspect

import cupy as cp
import cupyx
import cupyx.scipy.fft
import cupyx.scipy.ndimage
import cupyx.scipy.signal
import cv2
import matplotlib
import numpy as np
import scipy
import scipy.fft
import scipy.ndimage
import scipy.signal
from cupyx.scipy.fft import dctn, idctn
from scipy.stats import norm as norms

from flandre.utils.RfFrame import RfFrame, RfFrameFile, RfFrameMemory
from flandre.utils.RfMeta import RfFrameMeta, RfSequenceMeta
def hsv_to_rgb(hsv):
    """
    Convert HSV values to RGB.

    Parameters
    ----------
    hsv : (..., 3) array-like
        All values assumed to be in range [0, 1]

    Returns
    -------
    (..., 3) `~numpy.ndarray`
        Colors converted to RGB values in range [0, 1]

    Raises
    ------
    ValueError
        If the last dimension of *hsv* does not have length 3.
    """
    hsv = np.asarray(hsv)

    # check length of the last dimension, should be _some_ sort of rgb
    if hsv.shape[-1] != 3:
        raise ValueError("Last dimension of input array must be 3; "
                         f"shape {hsv.shape} was found.")

    in_shape = hsv.shape
    # Don't work on ints, and make sure 1D input becomes 2D.
    # ``astype(copy=False)`` copies only when a conversion is required, whereas
    # ``np.array(..., copy=False)`` raises on NumPy 2 if a copy cannot be avoided.
    float_dtype = np.promote_types(hsv.dtype, np.float32)
    hsv = np.atleast_2d(hsv.astype(float_dtype, copy=False))

    h = hsv[..., 0]
    s = hsv[..., 1]
    v = hsv[..., 2]

    h6 = h * 6.0
    i = h6.astype(int)
    f = h6 - i
    p = v * (1.0 - s)
    q = v * (1.0 - s * f)
    t = v * (1.0 - s * (1.0 - f))

    r = np.empty_like(h)
    g = np.empty_like(h)
    b = np.empty_like(h)

    # (mask, red source, green source, blue source) for each hue sector.
    # Applied in order, so the final "no saturation" row overrides the others.
    sectors = (
        (i % 6 == 0, v, t, p),
        (i == 1, q, v, p),
        (i == 2, p, v, t),
        (i == 3, p, q, v),
        (i == 4, t, p, v),
        (i == 5, v, p, q),
        (s == 0, v, v, v),
    )
    for mask, r_src, g_src, b_src in sectors:
        r[mask] = r_src[mask]
        g[mask] = g_src[mask]
        b[mask] = b_src[mask]

    rgb = np.stack([r, g, b], axis=-1)
    return rgb.reshape(in_shape)
def bypass(f):
    """Decorate a method so it can be skipped with a ``cond`` keyword.

    The wrapper consumes an optional ``cond`` keyword argument:

    * ``cond`` absent or truthy: call ``f`` (without ``cond``) and return its result.
    * ``cond`` falsy: do not call ``f``; return ``self`` unchanged.

    This lets fluent call chains include optional steps, e.g.
    ``obj.step(arg, cond=enabled).next_step()``.
    """

    @functools.wraps(f)  # keep the wrapped method's name and docstring
    def wrapper(self, *args, **kwargs):
        if kwargs.pop('cond', True):
            return f(self, *args, **kwargs)
        return self

    return wrapper
def bypassClass(original_class):
    """Class decorator that wraps the class's plain methods with ``bypass``.

    Every member that ``inspect.getmembers`` reports as a function is replaced
    by its ``bypass``-wrapped version, so it accepts the optional ``cond``
    keyword. Static methods are left untouched: read off the class they look
    like plain functions, but rebinding the wrapper with ``setattr`` would
    silently turn them into instance-style methods whose first argument is
    swallowed as ``self``.

    Returns the same class object, modified in place.
    """
    for name, func in inspect.getmembers(original_class, inspect.isfunction):
        if isinstance(inspect.getattr_static(original_class, name), staticmethod):
            continue
        setattr(original_class, name, bypass(func))
    return original_class
@bypassClass
class RfMat:
    """Array wrapper bundling sample data with frame and sequence metadata.

    The data lives in ``self.m`` as a NumPy array (``device == 'cpu'``) or a
    CuPy array (``device == 'gpu'``). Most methods return a new ``RfMat`` that
    shares the same metadata objects; the ``time_gain_compensation*`` methods
    other than ``time_gain_compensation_linear_float`` replace ``self.m`` and
    return ``self``.

    Because the class is decorated with ``bypassClass``, its plain methods
    accept an optional ``cond`` keyword: when it is falsy the method is skipped
    and the instance is returned unchanged.
    """

    @staticmethod
    def from_rf_frame(frame: RfFrame, device='cpu'):
        """Build an ``RfMat`` from a frame's raw bytes.

        The bytes are interpreted as int16 and reshaped to the sequence
        metadata's ``shape``. With ``device='gpu'`` the array is moved to CuPy.

        Raises:
            NotImplementedError: if *frame* is neither an ``RfFrameFile`` nor
                an ``RfFrameMemory``.
        """
        if isinstance(frame, RfFrameFile):
            seq_meta = frame.seq.meta
        elif isinstance(frame, RfFrameMemory):
            seq_meta = frame.seq_meta
        else:
            raise NotImplementedError()
        # np.frombuffer yields a read-only view; copy to get a writable array.
        m = np.frombuffer(frame.__bytes__(), dtype=np.int16).reshape(seq_meta.shape).copy()
        if device == 'gpu':
            m = cp.asarray(m)
        return RfMat(m, frame.meta, seq_meta)

    def __init__(self,
                 data: 'np.ndarray | cp.ndarray',
                 frame_meta: 'RfFrameMeta | None' = None,
                 seq_meta: 'RfSequenceMeta | None' = None,
                 ):
        """Wrap *data* and record whether it lives on the CPU or the GPU.

        Raises:
            NotImplementedError: if *data* is neither a NumPy nor a CuPy array.
        """
        self.m = data
        self.cv = False  # set once the OpenCV window has been created
        self.frame_meta = frame_meta
        self.seq_meta = seq_meta
        if isinstance(data, np.ndarray):
            self.device = 'cpu'
        elif isinstance(data, cp.ndarray):
            self.device = 'gpu'
        else:
            raise NotImplementedError

    def call(self, f, *args, **kwargs):
        """Return a new ``RfMat`` wrapping ``f(self.m, *args, **kwargs)``."""
        return self.copy(f(self.m, *args, **kwargs))

    def apply(self, f, *args, **kwargs):
        """Call ``f(self.m, *args, **kwargs)`` for its side effects; return ``self``."""
        f(self.m, *args, **kwargs)
        return self

    def copy(self, data=None):
        """Return a new ``RfMat`` with the same metadata.

        With no argument the array is copied; otherwise *data* is wrapped as is.
        """
        if data is None:
            data = self.m.copy()
        return RfMat(data, self.frame_meta, self.seq_meta)

    @property
    def duration(self):
        """Length of axis 1 of the data."""
        return self.m.shape[1]

    @property
    def w(self):
        """Length of axis 1 of the data."""
        return self.m.shape[1]

    @property
    def h(self):
        """Length of axis 0 of the data."""
        return self.m.shape[0]

    @property
    def p(self):
        """Array module matching the data's device: ``numpy`` or ``cupy``."""
        return np if self.device == 'cpu' else cp

    def __bytes__(self):
        return self.m.tobytes()

    def init_cv(self):
        """Create the OpenCV window used by ``show``."""
        cv2.namedWindow('image')
        self.cv = True

    def norm(self):
        """Return the data rescaled to float32 in [0, 1] (all zeros if constant)."""
        m = self.m.astype(self.p.float32)
        m -= m.min()
        mmax = m.max()
        if mmax == 0:
            # Constant input: avoid dividing by zero.
            return self.copy(self.p.zeros_like(m))
        m /= mmax
        return self.copy(m)

    def grey(self):
        """Return the normalised data scaled to 0..255 as uint8."""
        m = self.norm().m
        return self.copy((m * 255).astype(self.p.uint8))

    def pseudo_color(self):
        """Map normalised values to colours via HSV (hue = value * 0.7, s = v = 1).

        The result is always a NumPy uint8 array, even for GPU data, because
        the HSV conversion runs on the host.
        """
        p = self.p
        hue = self.norm().m * 0.7
        full = p.ones_like(hue)
        hsv = p.stack((hue, full, full), axis=2)
        if p is cp:
            hsv = hsv.get()
        rgb = hsv_to_rgb(hsv)
        return self.copy((rgb * 255).astype(np.uint8))

    def cpu(self):
        """Return ``self`` if already on the CPU, else a host copy."""
        if self.device == 'cpu':
            return self
        return self.copy(self.m.get())

    def crop(self, t_start: int, t_end: int):
        """Return the slice ``[:, t_start:t_end]`` (a view of the same data)."""
        return self.copy(self.m[:, t_start:t_end])

    def crop_center(self, center: float, width: float):
        """Crop a window of about *width* samples around *center* along axis 1.

        Near either edge the window is shifted inwards so that it keeps its
        width where possible. Bounds are converted to ints and clamped to
        ``[0, duration]`` so that float or oversized widths still give a valid
        slice.
        """
        duration = self.duration
        half = width / 2
        t_start = max(0, int(center - half))
        t_end = min(duration, int(center + half))
        # Shift the window inwards when it was clipped by an edge.
        t_start = max(0, min(t_start, int(duration - width)))
        t_end = min(duration, max(t_end, int(width)))
        return self.crop(t_start, t_end)

    def watermark(self, watermark=None):
        """Return a copy with up to two text lines burnt into the image.

        If *watermark* is given it is used as the first line; otherwise the
        frame and sequence metadata names are used when available. The data
        must be uint8 with shape ``(H, W)`` or ``(H, W, 3)``.

        Raises:
            NotImplementedError: for other channel layouts.
        """
        assert self.m.dtype == np.uint8
        canvas = np.zeros(self.m.shape, dtype=np.uint8)
        result = self.copy()

        line1 = ''
        line2 = ''
        if watermark is not None:
            line1 = watermark
        else:
            if self.frame_meta is not None:
                line1 = self.frame_meta.name
            if self.seq_meta is not None:
                line2 = self.seq_meta.name

        # Marker values drawn on the scratch canvas: 255 for the wide halo,
        # 128 for the thinner stroke painted on top of it.
        if canvas.ndim == 2:
            halo, stroke = (255,), (128,)
        elif canvas.shape[2] == 3:
            halo, stroke = (255, 255, 255), (128, 128, 128)
        else:
            raise NotImplementedError()

        fontsize = 2
        for text, baseline in ((line1, 60), (line2, 120)):
            cv2.putText(canvas, text, (0, baseline), cv2.FONT_HERSHEY_PLAIN, fontsize, halo, 8)
            cv2.putText(canvas, text, (0, baseline), cv2.FONT_HERSHEY_PLAIN, fontsize, stroke, 4)

        # Halo pixels become white and stroke pixels black in the output.
        result.m[canvas == 255] = 255
        result.m[canvas == 128] = 0
        return result

    def show(self, shape=None, watermark=None):
        """Display the data as a greyscale image and wait for a key press.

        Args:
            shape: optional size passed to ``resize``; skipped when ``None``.
            watermark: optional text forwarded to ``watermark``.

        Returns:
            The key code returned by ``cv2.waitKey(0)``.
        """
        if not self.cv:
            self.init_cv()
        image = (
            self.grey()
            .cpu()
            # The bypass wrapper understands ``cond``: only resize when a
            # shape was actually supplied.
            .resize(shape, cond=shape is not None)
            .watermark(watermark)
        )
        cv2.imshow('image', image.m)
        return cv2.waitKey(0)

    def info(self):
        """Print shape, device and metadata; return ``self``."""
        print(f'shape: {self.m.shape},device: {self.device}')
        print(self.frame_meta)
        print(self.seq_meta)
        return self

    def resize(self, shape):
        """Resize with ``cv2.resize`` (CPU only).

        Raises:
            NotImplementedError: when the data is on the GPU.
        """
        if self.device == 'cpu':
            return self.copy(cv2.resize(self.m, shape))
        raise NotImplementedError()

    def rotate90(self):
        """Return the data rotated with ``rot90(k=3)``."""
        return self.copy(self.p.rot90(self.m, k=3))

    def dct(self, mmin, mmax):
        """Band-limit along the last axis in the DCT domain.

        Coefficients with index ``< mmin`` or ``>= mmax`` are zeroed before the
        inverse transform.

        Raises:
            NotImplementedError: if ``seq_meta.d()`` is neither 2 nor 3.
        """
        if self.p is cp:
            forward, inverse = cupyx.scipy.fft.dct, cupyx.scipy.fft.idct
        else:
            forward, inverse = scipy.fft.dct, scipy.fft.idct
        coeffs = forward(self.m)
        dims = self.seq_meta.d()
        if dims == 2:
            coeffs[:, mmax:] = 0
            coeffs[:, :mmin] = 0
        elif dims == 3:
            coeffs[:, :, mmax:] = 0
            coeffs[:, :, :mmin] = 0
        else:
            raise NotImplementedError()
        return self.copy(inverse(coeffs))

    def dct_center(self, center, bandwidth):
        """Call ``dct`` with a band of *bandwidth* coefficients around *center*."""
        mmin = max(0, int(center - bandwidth / 2))
        mmax = min(self.duration, int(center + bandwidth / 2))
        return self.dct(mmin, mmax)

    def argrelextrema(self, axis=1):
        """Keep only positive local maxima and negative local minima.

        The result holds the absolute value at those positions and zero
        everywhere else.
        """
        p = self.p
        m = self.m
        find = cupyx.scipy.signal.argrelextrema if p is cp else scipy.signal.argrelextrema

        is_max = p.zeros_like(m, dtype=p.bool_)
        is_min = p.zeros_like(m, dtype=p.bool_)
        is_max[find(m, p.greater, axis=axis)] = True
        is_min[find(m, p.less, axis=axis)] = True

        keep = (is_max & (m > 0)) | (is_min & (m < 0))
        out = p.zeros_like(m)
        out[keep] = abs(m[keep])
        return self.copy(out)

    def conv_guass(self, b=0.01, axis=1):
        """Convolve along *axis* with a Gaussian-shaped kernel.

        The kernel is the pdf of a zero-mean normal distribution with standard
        deviation *b*, sampled on ``np.arange(-1, 1.1, 0.1)``.
        """
        p = self.p
        convolve = cupyx.scipy.ndimage.convolve1d if p is cp else scipy.ndimage.convolve1d
        kernel = norms(loc=0, scale=b).pdf(np.arange(-1, 1.1, 0.1))
        if p is cp:
            kernel = cp.asarray(kernel)
        return self.copy(convolve(self.m, kernel, axis=axis))

    def time_gain_compensation_linear_float(self, scale: float, start: int = 0):
        """Return the data multiplied by a linear gain ramp along the last axis.

        From index *start* on the gain is ``1 + k * scale`` (k = 0, 1, ...).
        """
        h = self.m.shape[-1]
        gain = self.p.zeros((1, h), dtype=self.p.float32)
        # NOTE(review): samples before `start` get gain 0 (i.e. are zeroed),
        # not 1 -- confirm this is intended.
        gain[:, start:] = (self.p.arange(h - start) * scale) + 1
        return self.copy(self.m * gain)

    def time_gain_compensation_linear(self, scale: float, start: int = 0):
        """Apply an integer linear gain ramp in place and limit the tail.

        ``self.m`` is converted to float32, columns from *start* on are
        multiplied by ``int(k * scale)`` (k = 0, 1, ...), and the last 500
        columns are clipped to ``max * scale`` and then divided by *scale*.
        Returns ``self``.
        """
        self.m = self.m.astype(np.float32)
        h = self.m.shape[-1]
        # NOTE(review): the ramp is truncated to integers, as the original
        # int64 buffer did -- confirm that truncation is intended.
        gain = (self.p.arange(h - start) * scale).astype(np.int64)
        # The gain has exactly the width of the slice it scales, so a
        # non-zero `start` broadcasts correctly.
        self.m[:, start:] *= gain
        tail = self.m[:, h - 500:]
        limit = tail.max() * scale
        tail[tail > limit] = limit
        self.m[:, h - 500:] = tail * (1 / scale)
        return self

    def time_gain_compensation(self, scales: list[float]):
        """Clip and rescale equal-width blocks along the last axis, in place.

        The last axis is split into ``len(scales)`` blocks; block *k* is
        clipped to ``max * scales[k]`` and multiplied by
        ``max / (max * scales[k])``, where ``max`` is the global maximum.
        Returns ``self``.
        """
        self.m = self.m.astype(np.float32)
        h = self.m.shape[-1]
        block = h // len(scales)
        mmax = self.m.max()
        for k, scale in enumerate(scales):
            lo = k * block
            segment = self.m[:, lo:lo + block]
            new_max = mmax * scale
            segment[segment > new_max] = new_max
            self.m[:, lo:lo + block] = segment * (mmax / new_max)
        return self

    def time_gain_compensation_linear_max(self, scale: float, mmax: int | None = None, start: int = 0):
        """Clip against a linearly falling ceiling and rescale, in place.

        The ceiling is *mmax* up to *start* and decreases by *scale* per sample
        afterwards. Each column is clipped to its ceiling, multiplied by
        ``mmax / ceiling`` and finally clipped to *mmax*. The result is stored
        as int64. Returns ``self``; a *scale* of 0 is a no-op.
        """
        if scale == 0:
            return self
        if mmax is None:
            mmax = self.m.max()
        p = self.p
        h = self.m.shape[-1]
        ceiling = p.zeros(h) + mmax
        ceiling[start:] -= p.arange(h - start) * scale
        # Vectorised over columns: the ceiling broadcasts along the last axis.
        m = p.minimum(self.m.astype(np.float64), ceiling)
        m *= mmax / ceiling
        m = p.minimum(m, mmax)
        self.m = m.astype(np.int64)
        return self

    def jupyter(self):
        """Plot the data with matplotlib (for notebook use)."""
        from matplotlib import pyplot as plt
        plt.figure(figsize=(40, 20))
        # matplotlib needs a host array; 'gray' is the colormap name accepted
        # by all matplotlib versions.
        plt.imshow(self.cpu().m, cmap='gray')
if __name__ == '__main__':
    # Tiny smoke check when run as a script: build a (1, 2, 3) zero array with
    # CuPy and add one to it; the result is discarded.
    cp.add(cp.zeros((1, 2, 3)), 1)