From 1d1407880e562cd4d5b19e8cf22aee93dd662e00 Mon Sep 17 00:00:00 2001 From: flandre Date: Thu, 12 Jun 2025 15:17:50 +0800 Subject: [PATCH] feat: meta optional hint --- draft/type_hint_none.py | 31 ++++++++++++ draft/type_hint_tuple.py | 36 ++++++++++++++ flandre/nodes/Beamformer.py | 11 ++--- flandre/utils/Msg.py | 1 - flandre/utils/RfMat.py | 1 + flandre/utils/RfMeta.py | 96 ++++++++++++++++++++++++------------- 6 files changed, 133 insertions(+), 43 deletions(-) create mode 100644 draft/type_hint_none.py create mode 100644 draft/type_hint_tuple.py diff --git a/draft/type_hint_none.py b/draft/type_hint_none.py new file mode 100644 index 0000000..f2273bf --- /dev/null +++ b/draft/type_hint_none.py @@ -0,0 +1,31 @@ +from typing import Annotated, get_type_hints + +from dataclasses import dataclass +import typing + + +@dataclass +class RfFrameMeta: + encoder: Annotated[int, "E"] = None + encoder2: Annotated[int, "E"] | None = None + + +# @classmethod +# def p2a(cls): +# gh = get_type_hints(cls, include_extras=True) +# return {k: gh[k].__metadata__[0] for k in gh} +gh = get_type_hints(RfFrameMeta, include_extras=True) +# print({k: gh[k].__metadata__[0] for k in gh}) + + +def is_optional(field: type): + return typing.get_origin(field) is typing.Union and type(None) in typing.get_args( + field + ) + + +for k, v in gh.items(): + if is_optional(v): + print(k, typing.get_args(v)[0]) + else: + print(k, v) diff --git a/draft/type_hint_tuple.py b/draft/type_hint_tuple.py new file mode 100644 index 0000000..9a59ee9 --- /dev/null +++ b/draft/type_hint_tuple.py @@ -0,0 +1,36 @@ +from dataclasses import dataclass +from enum import Enum +import types +from typing import Annotated, get_type_hints +import typing + +a = tuple[int, ...] + + +@dataclass +class T: + shape: Annotated[tuple[int, ...], "S"] | None = None + + +def is_optional(field: type): + return typing.get_origin(field) is typing.Union and type(None) in typing.get_args( + field + ) + + +gh = get_type_hints(T) +d: dict[str, type] = dict() +for k, v in gh.items(): + if is_optional(v): + d[k] = typing.get_args(v)[0] + print(typing.get_args(v)[0]) + else: + d[k] = v + +# a = d["shape"] +print(type(a)) +print(a) +print(issubclass(a, Enum)) +print(issubclass(a, tuple)) +print(typing.get_origin(a)) +print(isinstance(tuple[...], types.GenericAlias)) diff --git a/flandre/nodes/Beamformer.py b/flandre/nodes/Beamformer.py index eaaa49f..4794a99 100644 --- a/flandre/nodes/Beamformer.py +++ b/flandre/nodes/Beamformer.py @@ -31,7 +31,7 @@ logger = logging.getLogger(__name__) class Beamformer(Node): topics = [BeamformerMsg] - def __init__(self, level=logging.INFO): + def __init__(self, level: int = logging.INFO): super(Beamformer, self).__init__(level=level) self.tfm = None self.isalive = True @@ -40,7 +40,7 @@ class Beamformer(Node): def custom_setup(self): pass - def process_pwi(self, data: RfMat, arg: ImageArgMsg, pwi): + def process_pwi(self, data: RfMat | None, arg: ImageArgMsg, pwi): if data is None: return d2 = ( @@ -120,14 +120,9 @@ class Beamformer(Node): mat = RfMat.from_rf_frame(rf_frame_msg.rf_frame, "gpu") # logger.info(mat.frame_meta.blake2b) if mat is None: - logger.warning( - f"{rf_frame_msg.rf_frame.seq_meta.prod()} , {rf_frame_msg.rf_frame.__bytes__().__len__() // 2}" - ) + logger.warning("mat is None") continue - last_blake2b = mat.frame_meta.blake2b - if mat is None: - continue if arg_msg.v2 != last_v2 or arg_msg.f_rows != last_f_rows: last_v2 = arg_msg.v2 last_f_rows = arg_msg.f_rows diff --git a/flandre/utils/Msg.py b/flandre/utils/Msg.py index f3decac..8a07ea6 100644 --- a/flandre/utils/Msg.py +++ b/flandre/utils/Msg.py @@ -417,7 +417,6 @@ class RGB888Msg(HeaderByteMsg): class RfMatMsg(HeaderByteMsg): def __init__(self, rfmat: "RfMat"): self.rfmat = rfmat - super().__init__( dict( frame_meta=rfmat.frame_meta.name, diff --git a/flandre/utils/RfMat.py b/flandre/utils/RfMat.py index c02c2fd..86d1b78 100644 --- a/flandre/utils/RfMat.py +++ b/flandre/utils/RfMat.py @@ -126,6 +126,7 @@ class RfMat: seq_meta = frame.seq_meta else: raise NotImplementedError() + assert seq_meta.shape if seq_meta.prod() != frame.__bytes__().__len__() // 2: return None m = ( diff --git a/flandre/utils/RfMeta.py b/flandre/utils/RfMeta.py index 2449529..73c658b 100644 --- a/flandre/utils/RfMeta.py +++ b/flandre/utils/RfMeta.py @@ -1,31 +1,52 @@ from dataclasses import dataclass from enum import Enum, auto from pathlib import Path +import types from typing import Annotated, get_type_hints +import typing COMMIT_KEY = "COMMIT" +def is_optional(field: type): + return typing.get_origin(field) is typing.Union and type(None) in typing.get_args( + field + ) + + class RfMeta: @classmethod - def p2a(clz): - gh = get_type_hints(clz, include_extras=True) - return {k: gh[k].__metadata__[0] for k in gh} + def p2a(cls): + gh = get_type_hints(cls, include_extras=True) + d: dict[str, type] = dict() + for k, v in gh.items(): + if is_optional(v): + d[k] = typing.get_args(v)[0].__metadata__[0] + else: + d[k] = v.__metadata__[0] + return d @classmethod - def p2t(clz): - return get_type_hints(clz) + def p2t(cls): + gh = get_type_hints(cls) + d: dict[str, type] = dict() + for k, v in gh.items(): + if is_optional(v): + d[k] = typing.get_args(v)[0] + else: + d[k] = v + return d @classmethod - def a2p(clz): - return {v: k for k, v in clz.p2a().items()} + def a2p(cls): + return {v: k for k, v in cls.p2a().items()} @property def name(self): p2a = self.p2a() p2t = self.p2t() a2p = self.a2p() - arr = [] + arr: list[str] = [] if COMMIT_KEY in a2p: cp = a2p[COMMIT_KEY] del p2a[cp] @@ -33,6 +54,8 @@ class RfMeta: for p in p2a: t = p2t[p] v = self.__getattribute__(p) + if isinstance(t, types.GenericAlias): + t = typing.get_origin(t) if issubclass(t, Enum): vs = v.name elif issubclass(t, tuple): @@ -43,28 +66,28 @@ class RfMeta: arr.append(f"{p2a[p]}={vs}") return ",".join(arr) - def filename(self, ts=0): + def filename(self, ts: int = 0): return self.name + f".{ts}bin" @classmethod - def from_path(clz, path: Path | str): + def from_path(cls, path: Path | str): path = Path(path) if not path.exists(): raise FileNotFoundError(path) if path.is_file(): - return clz.from_name(path.stem) + return cls.from_name(path.stem) elif path.is_dir(): - return clz.from_name(path.name) + return cls.from_name(path.name) else: raise FileNotFoundError @classmethod - def from_name(clz, name: str): + def from_name(cls, name: str): if Path(name).suffix == ".zip": name = Path(name).stem - p2t = clz.p2t() - a2p = clz.a2p() - c = clz() + p2t = cls.p2t() + a2p = cls.a2p() + c = cls() sp = name.split(",") if COMMIT_KEY in a2p: c.__setattr__(a2p[COMMIT_KEY], sp.pop(0)) @@ -72,6 +95,9 @@ class RfMeta: a, v = pv.split("=") p = a2p[a] t = p2t[p] + if isinstance(t, types.GenericAlias): + t = typing.get_origin(t) + # print(t, issubclass(t, tuple)) if issubclass(t, Enum): c.__setattr__(p, t[v]) elif issubclass(t, tuple): @@ -84,25 +110,25 @@ class RfMeta: @dataclass class RfFrameMeta(RfMeta): - encoder: Annotated[int, "E"] = None - sequence_id: Annotated[int, "S"] = None # test3 - robot_x: Annotated[int, "X"] = None - robot_y: Annotated[int, "Y"] = None - robot_z: Annotated[int, "Z"] = None + encoder: Annotated[int, "E"] | None = None + sequence_id: Annotated[int, "S"] | None = None + robot_x: Annotated[int, "X"] | None = None + robot_y: Annotated[int, "Y"] | None = None + robot_z: Annotated[int, "Z"] | None = None - robot_rx: Annotated[int, "RX"] = None - robot_ry: Annotated[int, "RY"] = None - robot_rz: Annotated[int, "RZ"] = None + robot_rx: Annotated[int, "RX"] | None = None + robot_ry: Annotated[int, "RY"] | None = None + robot_rz: Annotated[int, "RZ"] | None = None - robot_force_x: Annotated[int, "FX"] = None - robot_force_y: Annotated[int, "FY"] = None - robot_force_z: Annotated[int, "FZ"] = None + robot_force_x: Annotated[int, "FX"] | None = None + robot_force_y: Annotated[int, "FY"] | None = None + robot_force_z: Annotated[int, "FZ"] | None = None - robot_force_rx: Annotated[int, "FRX"] = None - robot_force_ry: Annotated[int, "FRY"] = None - robot_force_rz: Annotated[int, "FRZ"] = None + robot_force_rx: Annotated[int, "FRX"] | None = None + robot_force_ry: Annotated[int, "FRY"] | None = None + robot_force_rz: Annotated[int, "FRZ"] | None = None - blake2b: Annotated[str, "B2B"] = None + blake2b: Annotated[str, "B2B"] | None = None @dataclass @@ -111,18 +137,20 @@ class RfSequenceMeta(RfMeta): PWI = auto() TFM = auto() - commit: Annotated[str, COMMIT_KEY] = None - shape: Annotated[tuple, "S"] = None + commit: Annotated[str, COMMIT_KEY] | None = None + shape: Annotated[tuple[int, ...], "S"] | None = None mode: Annotated[RfSequenceMode, "M"] = RfSequenceMode.PWI - us: Annotated[int, "U"] = None + us: Annotated[int, "U"] | None = None def prod(self): + assert self.shape res = 1 for i in self.shape: res *= int(i) return res def d(self): + assert self.shape return self.shape.__len__()