feat: meta optional hint

This commit is contained in:
flandre 2025-06-12 15:17:50 +08:00
parent 65c2254676
commit 1d1407880e
6 changed files with 133 additions and 43 deletions

31
draft/type_hint_none.py Normal file
View File

@ -0,0 +1,31 @@
from typing import Annotated, get_type_hints
from dataclasses import dataclass
import typing
@dataclass
class RfFrameMeta:
encoder: Annotated[int, "E"] = None
encoder2: Annotated[int, "E"] | None = None
# @classmethod
# def p2a(cls):
# gh = get_type_hints(cls, include_extras=True)
# return {k: gh[k].__metadata__[0] for k in gh}
gh = get_type_hints(RfFrameMeta, include_extras=True)
# print({k: gh[k].__metadata__[0] for k in gh})
def is_optional(field: type):
return typing.get_origin(field) is typing.Union and type(None) in typing.get_args(
field
)
for k, v in gh.items():
if is_optional(v):
print(k, typing.get_args(v)[0])
else:
print(k, v)

36
draft/type_hint_tuple.py Normal file
View File

@ -0,0 +1,36 @@
from dataclasses import dataclass
from enum import Enum
import types
from typing import Annotated, get_type_hints
import typing
a = tuple[int, ...]
@dataclass
class T:
shape: Annotated[tuple[int, ...], "S"] | None = None
def is_optional(field: type):
return typing.get_origin(field) is typing.Union and type(None) in typing.get_args(
field
)
gh = get_type_hints(T)
d: dict[str, type] = dict()
for k, v in gh.items():
if is_optional(v):
d[k] = typing.get_args(v)[0]
print(typing.get_args(v)[0])
else:
d[k] = v
# a = d["shape"]
print(type(a))
print(a)
print(issubclass(a, Enum))
print(issubclass(a, tuple))
print(typing.get_origin(a))
print(isinstance(tuple[...], types.GenericAlias))

View File

@ -31,7 +31,7 @@ logger = logging.getLogger(__name__)
class Beamformer(Node):
topics = [BeamformerMsg]
def __init__(self, level=logging.INFO):
def __init__(self, level: int = logging.INFO):
super(Beamformer, self).__init__(level=level)
self.tfm = None
self.isalive = True
@ -40,7 +40,7 @@ class Beamformer(Node):
def custom_setup(self):
pass
def process_pwi(self, data: RfMat, arg: ImageArgMsg, pwi):
def process_pwi(self, data: RfMat | None, arg: ImageArgMsg, pwi):
if data is None:
return
d2 = (
@ -120,14 +120,9 @@ class Beamformer(Node):
mat = RfMat.from_rf_frame(rf_frame_msg.rf_frame, "gpu")
# logger.info(mat.frame_meta.blake2b)
if mat is None:
logger.warning(
f"{rf_frame_msg.rf_frame.seq_meta.prod()} , {rf_frame_msg.rf_frame.__bytes__().__len__() // 2}"
)
logger.warning("mat is None")
continue
last_blake2b = mat.frame_meta.blake2b
if mat is None:
continue
if arg_msg.v2 != last_v2 or arg_msg.f_rows != last_f_rows:
last_v2 = arg_msg.v2
last_f_rows = arg_msg.f_rows

View File

@ -417,7 +417,6 @@ class RGB888Msg(HeaderByteMsg):
class RfMatMsg(HeaderByteMsg):
def __init__(self, rfmat: "RfMat"):
self.rfmat = rfmat
super().__init__(
dict(
frame_meta=rfmat.frame_meta.name,

View File

@ -126,6 +126,7 @@ class RfMat:
seq_meta = frame.seq_meta
else:
raise NotImplementedError()
assert seq_meta.shape
if seq_meta.prod() != frame.__bytes__().__len__() // 2:
return None
m = (

View File

@ -1,31 +1,52 @@
from dataclasses import dataclass
from enum import Enum, auto
from pathlib import Path
import types
from typing import Annotated, get_type_hints
import typing
COMMIT_KEY = "COMMIT"
def is_optional(field: type):
return typing.get_origin(field) is typing.Union and type(None) in typing.get_args(
field
)
class RfMeta:
@classmethod
def p2a(clz):
gh = get_type_hints(clz, include_extras=True)
return {k: gh[k].__metadata__[0] for k in gh}
def p2a(cls):
gh = get_type_hints(cls, include_extras=True)
d: dict[str, type] = dict()
for k, v in gh.items():
if is_optional(v):
d[k] = typing.get_args(v)[0].__metadata__[0]
else:
d[k] = v.__metadata__[0]
return d
@classmethod
def p2t(clz):
return get_type_hints(clz)
def p2t(cls):
gh = get_type_hints(cls)
d: dict[str, type] = dict()
for k, v in gh.items():
if is_optional(v):
d[k] = typing.get_args(v)[0]
else:
d[k] = v
return d
@classmethod
def a2p(clz):
return {v: k for k, v in clz.p2a().items()}
def a2p(cls):
return {v: k for k, v in cls.p2a().items()}
@property
def name(self):
p2a = self.p2a()
p2t = self.p2t()
a2p = self.a2p()
arr = []
arr: list[str] = []
if COMMIT_KEY in a2p:
cp = a2p[COMMIT_KEY]
del p2a[cp]
@ -33,6 +54,8 @@ class RfMeta:
for p in p2a:
t = p2t[p]
v = self.__getattribute__(p)
if isinstance(t, types.GenericAlias):
t = typing.get_origin(t)
if issubclass(t, Enum):
vs = v.name
elif issubclass(t, tuple):
@ -43,28 +66,28 @@ class RfMeta:
arr.append(f"{p2a[p]}={vs}")
return ",".join(arr)
def filename(self, ts=0):
def filename(self, ts: int = 0):
return self.name + f".{ts}bin"
@classmethod
def from_path(clz, path: Path | str):
def from_path(cls, path: Path | str):
path = Path(path)
if not path.exists():
raise FileNotFoundError(path)
if path.is_file():
return clz.from_name(path.stem)
return cls.from_name(path.stem)
elif path.is_dir():
return clz.from_name(path.name)
return cls.from_name(path.name)
else:
raise FileNotFoundError
@classmethod
def from_name(clz, name: str):
def from_name(cls, name: str):
if Path(name).suffix == ".zip":
name = Path(name).stem
p2t = clz.p2t()
a2p = clz.a2p()
c = clz()
p2t = cls.p2t()
a2p = cls.a2p()
c = cls()
sp = name.split(",")
if COMMIT_KEY in a2p:
c.__setattr__(a2p[COMMIT_KEY], sp.pop(0))
@ -72,6 +95,9 @@ class RfMeta:
a, v = pv.split("=")
p = a2p[a]
t = p2t[p]
if isinstance(t, types.GenericAlias):
t = typing.get_origin(t)
# print(t, issubclass(t, tuple))
if issubclass(t, Enum):
c.__setattr__(p, t[v])
elif issubclass(t, tuple):
@ -84,25 +110,25 @@ class RfMeta:
@dataclass
class RfFrameMeta(RfMeta):
encoder: Annotated[int, "E"] = None
sequence_id: Annotated[int, "S"] = None # test3
robot_x: Annotated[int, "X"] = None
robot_y: Annotated[int, "Y"] = None
robot_z: Annotated[int, "Z"] = None
encoder: Annotated[int, "E"] | None = None
sequence_id: Annotated[int, "S"] | None = None
robot_x: Annotated[int, "X"] | None = None
robot_y: Annotated[int, "Y"] | None = None
robot_z: Annotated[int, "Z"] | None = None
robot_rx: Annotated[int, "RX"] = None
robot_ry: Annotated[int, "RY"] = None
robot_rz: Annotated[int, "RZ"] = None
robot_rx: Annotated[int, "RX"] | None = None
robot_ry: Annotated[int, "RY"] | None = None
robot_rz: Annotated[int, "RZ"] | None = None
robot_force_x: Annotated[int, "FX"] = None
robot_force_y: Annotated[int, "FY"] = None
robot_force_z: Annotated[int, "FZ"] = None
robot_force_x: Annotated[int, "FX"] | None = None
robot_force_y: Annotated[int, "FY"] | None = None
robot_force_z: Annotated[int, "FZ"] | None = None
robot_force_rx: Annotated[int, "FRX"] = None
robot_force_ry: Annotated[int, "FRY"] = None
robot_force_rz: Annotated[int, "FRZ"] = None
robot_force_rx: Annotated[int, "FRX"] | None = None
robot_force_ry: Annotated[int, "FRY"] | None = None
robot_force_rz: Annotated[int, "FRZ"] | None = None
blake2b: Annotated[str, "B2B"] = None
blake2b: Annotated[str, "B2B"] | None = None
@dataclass
@ -111,18 +137,20 @@ class RfSequenceMeta(RfMeta):
PWI = auto()
TFM = auto()
commit: Annotated[str, COMMIT_KEY] = None
shape: Annotated[tuple, "S"] = None
commit: Annotated[str, COMMIT_KEY] | None = None
shape: Annotated[tuple[int, ...], "S"] | None = None
mode: Annotated[RfSequenceMode, "M"] = RfSequenceMode.PWI
us: Annotated[int, "U"] = None
us: Annotated[int, "U"] | None = None
def prod(self):
assert self.shape
res = 1
for i in self.shape:
res *= int(i)
return res
def d(self):
assert self.shape
return self.shape.__len__()