flandre/src/utils/RfFile.py

249 lines
6.4 KiB
Python
Raw Normal View History

2025-01-13 14:21:01 +08:00
from enum import Enum, auto
2025-01-06 11:21:04 +08:00
from pathlib import Path
2025-01-13 14:21:01 +08:00
from typing import Annotated, get_type_hints
2025-01-06 11:21:04 +08:00
2025-01-16 15:14:53 +08:00
import cupy as cp
import numpy as np
2025-01-06 11:21:04 +08:00
from attr import dataclass
@dataclass
class RfFolder:
parent: Path
D: str
L: int = 30
C: str = 'PAR'
2025-01-13 14:21:01 +08:00
SHAPE = (1500, 256)
2025-01-06 11:21:04 +08:00
@staticmethod
def from_path(p: Path | str) -> 'RfFolder':
p = Path(p)
D, L, C = p.name.split(',')
L = int(L.replace('L=', ''))
C = C.replace('C=', '')
return RfFolder(p.parent, D, L, C)
@property
def name(self):
return f'{self.D},L={self.L},C={self.C}'
@property
def path(self):
return self.parent / self.name
@property
def all(self):
return [RfFile.from_path(f) for f in self.path.glob('*.bin')]
@dataclass
class RfFile:
folder: RfFolder
X: int = None
Y: int = None
Z: int = None
S: int = None
E: int = None
@staticmethod
def from_path(p: Path | str) -> 'RfFile':
p = Path(p)
folder = RfFolder.from_path(p.parent)
return RfFile(folder, **{(c := pair.split('='))[0]: int(c[1]) for pair in p.stem.split(',')})
@property
def path(self):
arr = []
d = self.__dict__.copy()
# print(d)
d.__delitem__('folder')
for k in d:
if d[k] is not None:
arr.append(f'{k}={d[k]}')
filename = ','.join(arr) + '.bin'
# print(filename)
return self.folder.path / filename
2025-01-13 14:21:01 +08:00
@property
def bin(self):
return self.path.read_bytes()
2025-01-06 11:21:04 +08:00
def test2():
r = RfFile.from_path('/run/media/lambda/b86dccdc-f134-464b-a310-6575ee9ae85c/cap4/TEST1,L=30,C=PAR/S=925,E=4.bin')
print(r)
2025-01-13 14:21:01 +08:00
COMMIT_KEY = 'COMMIT'
class SM:
@classmethod
@property
def p2a(clz):
gh = get_type_hints(clz, include_extras=True)
return {k: gh[k].__metadata__[0] for k in gh}
@classmethod
@property
def p2t(clz):
return get_type_hints(clz)
@classmethod
@property
def a2p(clz):
return {v: k for k, v in clz.p2a.items()}
@property
def name(self):
p2a = self.p2a
p2t = self.p2t
a2p = self.a2p
arr = []
if COMMIT_KEY in a2p:
cp = a2p[COMMIT_KEY]
del p2a[cp]
arr.append(f'{self.__getattribute__(cp)}')
for p in p2a:
t = p2t[p]
v = self.__getattribute__(p)
if issubclass(t, Enum):
vs = v.name
elif issubclass(t, tuple):
vs = f'({' '.join([str(vv) for vv in v])})'
else:
vs = str(v)
2025-01-16 15:14:53 +08:00
if v is not None:
arr.append(f'{p2a[p]}={vs}')
return ",".join(arr)
@property
def filename(self):
return self.name + '.bin'
@classmethod
def from_path(clz, path: Path | str):
path = Path(path)
if not path.exists():
raise FileNotFoundError(path)
if path.is_file():
return clz.from_name(path.stem)
elif path.is_dir():
return clz.from_name(path.name)
else:
raise FileNotFoundError
2025-01-13 14:21:01 +08:00
@classmethod
def from_name(clz, name: str):
p2t = clz.p2t
a2p = clz.a2p
c = clz()
2025-01-16 15:14:53 +08:00
sp = name.split(',')
2025-01-13 14:21:01 +08:00
if COMMIT_KEY in a2p:
c.__setattr__(a2p[COMMIT_KEY], sp.pop(0))
for pv in sp:
a, v = pv.split('=')
p = a2p[a]
t = p2t[p]
if issubclass(t, Enum):
c.__setattr__(p, t[v])
elif issubclass(t, tuple):
c.__setattr__(p, tuple(int(i) for i in v[1:-1].split(' ')))
else:
c.__setattr__(p, t(v))
return c
class RfFrame:
@dataclass
class RfFrameMeta(SM):
encoder: Annotated[int, 'E'] = None
sequence_id: Annotated[int, 'S'] = None # test3
robot_x: Annotated[int, 'X'] = None
robot_y: Annotated[int, 'Y'] = None
robot_z: Annotated[int, 'Z'] = None
2025-01-16 15:14:53 +08:00
def __init__(self, data: bytes | Path, meta: RfFrameMeta, seq: 'RfSequence' = None):
2025-01-13 14:21:01 +08:00
self.data = data
self.meta = meta
2025-01-16 15:14:53 +08:00
self._seq = seq
2025-01-13 14:21:01 +08:00
def save(self, folder: Path):
(folder / self.meta.name).write_bytes(self.bytes)
2025-01-16 15:14:53 +08:00
@property
def seq(self):
if self._seq is None and isinstance(self.data, Path):
return RfSequence.from_folder(self.data.parent)
return self._seq
2025-01-13 14:21:01 +08:00
@property
def bytes(self):
if isinstance(self.data, bytes):
return self.data
if isinstance(self.data, Path):
return self.data.read_bytes()
2025-01-16 15:14:53 +08:00
def mat(self, device='gpu'):
from utils.RfMat import RfMat
if device == 'gpu':
arr = cp.frombuffer(self.bytes, dtype=cp.int16)
else:
arr = np.frombuffer(self.bytes, dtype=np.int16)
return RfMat(arr.reshape(self.seq.meta.shape), frame_meta=self.meta, seq_meta=self.seq.meta)
2025-01-13 14:21:01 +08:00
@dataclass
class RfSequenceMeta(SM):
class RfSequenceMode(Enum):
PWI = auto()
TFM = auto()
commit: Annotated[int, COMMIT_KEY] = None
shape: Annotated[tuple, 'S'] = None
mode: Annotated[RfSequenceMode, 'M'] = RfSequenceMode.PWI
us: Annotated[int, 'U'] = None
class RfSequence:
def __init__(self, frames: list[RfFrame], meta: RfSequenceMeta):
self.frames = frames
self.meta = meta
@classmethod
def from_folder(cls, folder: Path | str) -> 'RfSequence':
folder = Path(folder)
2025-01-16 15:14:53 +08:00
if not folder.exists():
raise FileNotFoundError
meta = RfSequenceMeta.from_path(folder)
rs = RfSequence([], meta)
2025-01-13 14:21:01 +08:00
for f in folder.glob('*.bin'):
2025-01-16 15:14:53 +08:00
rs.frames.append(RfFrame(f, RfFrame.RfFrameMeta.from_path(f), seq=rs))
return rs
2025-01-13 14:21:01 +08:00
@property
def all(self):
pass
def query(self):
pass
2025-01-06 11:21:04 +08:00
if __name__ == '__main__':
2025-01-16 15:14:53 +08:00
# t = (1, 2)
# f = RfSequenceMeta.from_name('123123,U=321,S=(1 2 3),M=PWI')
2025-01-13 14:21:01 +08:00
# print(f.commit)
2025-01-16 15:14:53 +08:00
# print(f.name)
2025-01-13 14:21:01 +08:00
# print(RfSequence.RfSequenceMeta.p2t)
# f = RfFrame.RfFrameMeta(123, 345)
# print(f.name)
2025-01-16 15:14:53 +08:00
# rs = RfSequence([], RfSequenceMeta())
rs = RfSequence.from_folder(
'/run/media/lambda/b86dccdc-f134-464b-a310-6575ee9ae85c/cap4/trim/R1.1,U=30,M=PWI,S=(256 1502)')
# print(rs.meta)
for frame in rs.frames:
if frame.mat().rotate90().show((1080, 1920)) == ord('q'):
break