flandre/flandre/nodes/MainUI.py
2025-06-10 20:35:01 +08:00

723 lines
25 KiB
Python

import logging
import platform
import sys
import time
import traceback
from enum import Enum, auto
from pathlib import Path
import zmq
from PyQt6 import QtGui, QtWidgets
from PyQt6.QtCore import QByteArray, pyqtSlot
from PyQt6.QtWidgets import (
QApplication,
QFileDialog,
QFrame,
QLineEdit,
QMainWindow,
QMessageBox,
)
from flandre import C, P
from flandre.nodes.Node import Node
from flandre.pyqt.Main import Ui_MainWindow
from flandre.pyqt.ZMQReceiver import ZMQReceiver
from flandre.utils.Msg import (
DeviceConfigListMsg,
DeviceConnectedMsg,
DeviceEnabledMsg,
DeviceOnlineMsg,
DeviceSwitchMsg,
DeviceZero,
ImageArgMsg,
ImagingConfigNameListMsg,
KillMsg,
MaxMsg,
MoveAxisMsg,
Msg,
PlaybackSeqListMsg,
RecordFrameMsg,
RefreshDeviceMsg,
RobotRtsiMsg,
SeqIdList,
SeqIdMinMax,
SeqMetaMsg,
SetBaseMsg,
SetDeviceConfigMsg,
SetDeviceConnectedMsg,
SetDeviceEnabledMsg,
SetDeviceSwitchMsg,
SetPlayMode,
SetRecordMsg,
SetSeqMetaMsg,
SetSidMsg,
SetWindowVisibleMsg,
)
from flandre.utils.RfMeta import RfSequenceMeta
logger = logging.getLogger(__name__)
class LinkStatus(Enum):
RED = auto()
YELLOW = auto()
GREEN = auto()
ORANGE = auto()
def get_style_sheet(name, color):
return f"""
#{name}{{
background: {color};
border-radius: 7px;
}}
""".strip()
def humanbytes(B):
"""Return the given bytes as a human friendly KB, MB, GB, or TB string."""
B = float(B)
KB = float(1024)
MB = float(KB**2) # 1,048,576
GB = float(KB**3) # 1,073,741,824
TB = float(KB**4) # 1,099,511,627,776
if B < KB:
return "{0} {1}".format(B, "Bytes" if 0 == B > 1 else "Byte")
elif KB <= B < MB:
return "{0:.2f} KB".format(B / KB)
elif MB <= B < GB:
return "{0:.2f} MB".format(B / MB)
elif GB <= B < TB:
return "{0:.2f} GB".format(B / GB)
elif TB <= B:
return "{0:.2f} TB".format(B / TB)
class Adv(QMainWindow, Ui_MainWindow):
def __init__(self, p: Node, parent=None):
super(Adv, self).__init__(parent)
self.p = p
self.setupUi(self)
# set default folder
self.l_base.setText(C.record_folder.__str__())
self.device_switch_state: LinkStatus = LinkStatus.RED
icon = QtGui.QIcon()
icon.addPixmap(QtGui.QPixmap(str(P.ASSETS / "switch_button.png")))
self.b_probe_head_switch.setIcon(icon)
self.b_us_switch.setIcon(icon)
self.b_cobot_switch.setIcon(icon)
icon = QtGui.QIcon()
icon.addPixmap(QtGui.QPixmap(str(P.ASSETS / "refresh_button.png")))
self.b_us_refresh.setIcon(icon)
zmq_receiver = ZMQReceiver(self)
zmq_receiver.zmq_event.connect(self.on_zmq_event)
zmq_receiver.start()
self.s_t_start.valueChanged.connect(self.on_t_start)
self.s_t_end.valueChanged.connect(self.on_t_end)
self.arg = ImageArgMsg("ui", t_start=0, t_end=1499)
self.playback_seq_meta: RfSequenceMeta | None = None
self.live_seq_meta: RfSequenceMeta | None = None
self.record = False
self.device_connected = False
self.device_enabled = False
self.l_base.textChanged.connect(
lambda e: self.l_base.setStyleSheet("")
if Path(e).exists()
else self.l_base.setStyleSheet("background-color: pink;")
)
self.record_size_cnt = 0
self.record_frame_cnt = 0
self.update_device_buttons()
self.seq_id_list = []
self.b_select_base.clicked.connect(self.on_select_base)
self.cb_bscan.stateChanged.connect(self.on_cb_bscan)
self.b_probe_single.clicked.connect(self.on_probe("single"))
self.b_probe_orig.clicked.connect(self.on_probe("orig"))
self.b_probe_start.clicked.connect(self.on_probe("start"))
self.b_probe_stop.clicked.connect(self.on_probe("stop"))
self.b_device_zero.clicked.connect(lambda: self.p.send(DeviceZero()))
self.mi_req_socket = zmq.Context().socket(zmq.REQ)
self.mi_req_socket.connect(C.mi_rep_socket)
@pyqtSlot()
def on_b_us_switch_clicked(self):
match self.device_switch_state:
case LinkStatus.RED:
self.p.send(SetDeviceSwitchMsg(True))
self.p.send(DeviceSwitchMsg("YELLOW"))
case LinkStatus.GREEN | LinkStatus.YELLOW | LinkStatus.ORANGE:
self.p.send(SetDeviceSwitchMsg(False))
@pyqtSlot()
def on_b_us_refresh_clicked(self):
match self.device_switch_state:
case LinkStatus.GREEN | LinkStatus.YELLOW | LinkStatus.ORANGE:
self.p.send(RefreshDeviceMsg())
self.set_device_enable(LinkStatus.RED)
self.set_device_connection(LinkStatus.RED)
self.update_device_buttons()
def on_device_switch_state(self):
match self.device_switch_state:
case LinkStatus.RED:
self.g_device.setEnabled(False)
self.g_us.setStyleSheet(get_style_sheet("g_us", "pink"))
self.set_device_enable(LinkStatus.RED)
self.set_device_connection(LinkStatus.RED)
self.update_device_buttons()
case LinkStatus.YELLOW:
self.g_device.setEnabled(False)
self.g_us.setStyleSheet(get_style_sheet("g_us", "yellow"))
case LinkStatus.GREEN:
self.g_device.setEnabled(True)
self.g_us.setStyleSheet(get_style_sheet("g_us", "LightGreen"))
case LinkStatus.ORANGE:
self.g_device.setEnabled(False)
self.g_us.setStyleSheet(get_style_sheet("g_us", "orange"))
def on_probe(self, arg):
def f():
ctx = zmq.Context()
p = ctx.socket(zmq.PUSH)
p.connect("tcp://q1hyb.as:23456")
time.sleep(0.1)
p.send_string(arg)
p.disconnect("tcp://q1hyb.as:23456")
return f
@pyqtSlot()
def on_b_new_imaging_config_clicked(self):
filename, okPressed = QtWidgets.QInputDialog.getText(
None,
"Set New Imaging Config Name",
"Config Name:",
QLineEdit.EchoMode.Normal,
self.c_imaging_config.currentText(),
)
if okPressed and filename != "":
(C.imaging_config_folder / f"{filename}.json").write_text(self.arg.json())
idx = self.c_imaging_config.findText(filename)
if idx == -1:
self.c_imaging_config.addItem(filename)
idx = self.c_imaging_config.findText(filename)
self.c_imaging_config.setCurrentIndex(idx)
else:
self.c_imaging_config.setCurrentIndex(idx)
@pyqtSlot(int)
def on_c_imaging_config_currentIndexChanged(self, i):
name = self.c_imaging_config.itemText(i)
self.p.send(ImageArgMsg.from_path(C.imaging_config_folder / f"{name}.json"))
def on_select_base(self):
base = QFileDialog.getExistingDirectory(
self, "Select Base Folder", DS.__str__()
)
self.l_base.setText(Path(base).__str__())
@pyqtSlot()
def on_b_test1_clicked(self):
logger.info(f"test1 {self.arg}")
@pyqtSlot()
def on_b_base_clicked(self):
if Path(self.l_base.text()):
self.p.send(SetBaseMsg(self.l_base.text()))
self.g_cap.setEnabled(True)
def set_device_connection(self, status: LinkStatus):
match status:
case LinkStatus.RED:
self.device_connected = False
self.c_live_seq_name.setEnabled(False)
self.lb_device_connection.setText("Disconnected")
self.lb_device_connection.setStyleSheet("background-color: pink;")
case LinkStatus.YELLOW:
self.lb_device_connection.setText("Waiting")
self.lb_device_connection.setStyleSheet("background-color: yellow;")
case LinkStatus.GREEN:
self.device_connected = True
self.c_live_seq_name.setEnabled(True)
self.lb_device_connection.setText("Connected")
self.lb_device_connection.setStyleSheet("background-color: LightGreen;")
def set_device_enable(self, status: LinkStatus):
match status:
case LinkStatus.RED:
self.device_enabled = False
self.lb_device_enable.setText("Disabled")
self.lb_device_enable.setStyleSheet("background-color: pink;")
case LinkStatus.YELLOW:
self.lb_device_enable.setText("Waiting")
self.lb_device_enable.setStyleSheet("background-color: yellow;")
case LinkStatus.GREEN:
self.device_enabled = True
self.lb_device_enable.setText("Enabled")
self.lb_device_enable.setStyleSheet("background-color: LightGreen;")
@pyqtSlot(bool)
def on_g_live_clicked(self, b):
if b:
self.g_playback.setChecked(False)
self.p.send(SetPlayMode("live"))
if self.live_seq_meta is not None:
self.update_max(max(self.live_seq_meta.shape))
logger.info("set playmode live")
@pyqtSlot(bool)
def on_g_playback_clicked(self, b):
if b:
self.g_live.setChecked(False)
self.p.send(SetPlayMode("playback"))
if self.playback_seq_meta is not None:
self.update_max(max(self.playback_seq_meta.shape))
logger.info("set playmode playback")
def on_device_disable(self):
self.p.send(SetDeviceEnabledMsg(False))
self.b_device_enable.setEnabled(False)
self.set_device_enable(LinkStatus.YELLOW)
def on_device_enable(self):
self.p.send(SetDeviceEnabledMsg(True))
self.b_device_enable.setEnabled(False)
self.set_device_enable(LinkStatus.YELLOW)
def on_device_disconnect(self):
self.p.send(SetDeviceConnectedMsg(False))
self.b_device_connection.setEnabled(False)
self.set_device_connection(LinkStatus.YELLOW)
def on_device_connect(self):
logger.info("btn pre")
self.p.send(SetDeviceConnectedMsg(True))
self.b_device_connection.setEnabled(False)
self.set_device_connection(LinkStatus.YELLOW)
def update_device_buttons(self):
if self.device_connected and self.device_enabled:
self.b_device_connection.setText("Disconnect")
self.b_device_connection.setEnabled(False)
self.b_device_enable.setText("Disable")
self.b_device_enable.setEnabled(True)
try:
self.b_device_connection.clicked.disconnect()
except:
pass
try:
self.b_device_enable.clicked.disconnect()
except:
pass
self.b_device_enable.clicked.connect(self.on_device_disable)
if self.device_connected and not self.device_enabled:
self.b_device_connection.setText("Disconnect")
self.b_device_connection.setEnabled(True)
self.b_device_enable.setText("Enable")
self.b_device_enable.setEnabled(True)
try:
self.b_device_connection.clicked.disconnect()
except:
pass
try:
self.b_device_enable.clicked.disconnect()
except:
pass
self.b_device_enable.clicked.connect(self.on_device_enable)
self.b_device_connection.clicked.connect(self.on_device_disconnect)
if not self.device_connected and self.device_enabled:
raise Exception("wtf?")
if not self.device_connected and not self.device_enabled:
self.b_device_connection.setText("Connect")
self.b_device_connection.setEnabled(True)
self.b_device_enable.setText("Enable")
self.b_device_enable.setEnabled(False)
try:
self.b_device_connection.clicked.disconnect()
except:
pass
try:
self.b_device_enable.clicked.disconnect()
except:
pass
self.b_device_connection.clicked.connect(self.on_device_connect)
@pyqtSlot()
def on_b_record_clicked(self):
if self.record:
self.l_record_commit.setEnabled(True)
self.p.send(SetRecordMsg(False))
self.record = False
self.b_record.setStyleSheet("")
else:
if self.l_record_commit.text() != "":
self.l_record_commit.setEnabled(False)
self.record_size_cnt = 0
self.record_frame_cnt = 0
self.p.send(
SetRecordMsg(True, self.l_record_commit.text(), self.l_base.text())
)
self.record = True
self.b_record.setStyleSheet("background-color: red;")
else:
QMessageBox.warning(None, "hint", "Commit is empty!!")
def update_max_2(self, m):
self.s_t_start.setMaximum(m)
self.s_t_end.setMaximum(m)
self.sp_crop_center.setMaximum(m)
self.sp_crop_width.setMaximum(m)
def update_max(self, m):
self.s_f_rows.setMaximum(m)
self.sp_f_rows.setMaximum(m)
self.s_dct_center.setMaximum(m)
self.s_dct_bandwidth.setMaximum(m)
self.sp_dct_center.setMaximum(m)
self.sp_dct_bandwidth.setMaximum(m)
def on_zmq_event(self, msg: QByteArray):
try:
msg = Msg.decode_msg(msg.data())
if isinstance(msg, KillMsg):
if msg.name == "":
self.close()
elif isinstance(msg, ImageArgMsg):
self.arg = msg
self.arg.sender = "ui"
self.s_t_start.setValue(msg.t_start)
self.s_t_end.setValue(msg.t_end)
self.s_f_rows.setValue(msg.f_rows)
self.s_v2.setValue(msg.v2)
self.s_dct_center.setValue(msg.dct_center)
self.s_dct_bandwidth.setValue(msg.dct_bandwidth)
self.s_beta.setValue(msg.beta)
self.sp_crop_center.setValue(msg.t_start)
self.sp_crop_width.setValue(msg.t_end)
self.sp_f_rows.setValue(msg.f_rows)
self.sp_v2.setValue(msg.v2)
self.sp_dct_center.setValue(msg.dct_center)
self.sp_dct_bandwidth.setValue(msg.dct_bandwidth)
self.sp_beta.setValue(msg.beta)
self.s_g1.setValue(msg.g1)
self.s_g2.setValue(msg.g2)
self.s_g3.setValue(msg.g3)
self.s_g4.setValue(msg.g4)
self.s_g5.setValue(msg.g5)
self.s_g6.setValue(msg.g6)
self.s_g7.setValue(msg.g7)
self.s_g8.setValue(msg.g8)
self.sp_g1.setValue(msg.g1)
self.sp_g2.setValue(msg.g2)
self.sp_g3.setValue(msg.g3)
self.sp_g4.setValue(msg.g4)
self.sp_g5.setValue(msg.g5)
self.sp_g6.setValue(msg.g6)
self.sp_g7.setValue(msg.g7)
self.sp_g8.setValue(msg.g8)
elif isinstance(msg, MoveAxisMsg):
match msg.axis:
case "S":
pass
# self.s_sid.setValue(msg.value)
# self.l_seq_current.setText(str(self.seq_id_list[msg.value]))
elif isinstance(msg, SetSidMsg):
self.s_sid.setValue(msg.value)
self.sp_sid.setValue(msg.value)
self.l_seq_current.setText(str(self.seq_id_list[msg.value]))
elif isinstance(msg, SeqIdList):
self.seq_id_list = msg.li
self.s_sid.setMaximum(msg.li.__len__() - 1)
self.sp_sid.setMaximum(msg.li.__len__() - 1)
self.l_seq_min.setText(str(min(msg.li)))
self.l_seq_max.setText(str(max(msg.li)))
elif isinstance(msg, PlaybackSeqListMsg):
self.c_playback_seq_name.clear()
for name in msg.value:
self.c_playback_seq_name.addItem(name)
if msg.value.__len__() > 0:
self.p.send(
SetSeqMetaMsg(
"playback", self.c_playback_seq_name.currentText()
)
)
elif isinstance(msg, SetSeqMetaMsg):
if msg.target == "playback":
self.playback_seq_meta = RfSequenceMeta.from_name(msg.name)
self.update_max(max(self.playback_seq_meta.shape))
elif isinstance(msg, SeqMetaMsg):
if msg.target == "live":
self.l_live_seq_name.setText(msg.name)
self.b_live_seq_apply.setEnabled(True)
self.live_seq_meta = RfSequenceMeta.from_name(msg.name)
self.update_max(max(self.live_seq_meta.shape))
elif isinstance(msg, DeviceConnectedMsg):
if msg.value:
self.set_device_connection(LinkStatus.GREEN)
else:
self.set_device_connection(LinkStatus.RED)
self.update_device_buttons()
elif isinstance(msg, DeviceEnabledMsg):
if msg.value:
self.set_device_enable(LinkStatus.GREEN)
else:
self.set_device_enable(LinkStatus.RED)
self.update_device_buttons()
elif isinstance(msg, DeviceConfigListMsg):
for name, txt in msg.arr:
self.c_live_seq_name.addItem(name, txt)
elif isinstance(msg, RobotRtsiMsg):
self.ln_x.display(f"{msg.pos[0] / 100:.2f}")
self.ln_y.display(f"{msg.pos[1] / 100:.2f}")
self.ln_z.display(f"{msg.pos[2] / 100:.2f}")
self.ln_rx.display(f"{msg.pos[3] / 1000:.3f}")
self.ln_ry.display(f"{msg.pos[4] / 1000:.3f}")
self.ln_rz.display(f"{msg.pos[5] / 1000:.3f}")
self.ln_fx.display(f"{msg.force[0] / 10:.1f}")
self.ln_fy.display(f"{msg.force[1] / 10:.1f}")
self.ln_fz.display(f"{msg.force[2] / 10:.1f}")
self.ln_frx.display(f"{msg.force[3] / 100:.2f}")
self.ln_fry.display(f"{msg.force[4] / 100:.2f}")
self.ln_frz.display(f"{msg.force[5] / 100:.2f}")
elif isinstance(msg, MaxMsg):
self.update_max_2(msg.value)
elif isinstance(msg, RecordFrameMsg):
self.record_frame_cnt += 1
self.record_size_cnt += msg.size
self.l_record_size.setText(humanbytes(self.record_size_cnt))
self.l_record_frames.setText(str(self.record_frame_cnt))
self.l_record_max_sid.setText(str(msg.current_sid))
elif isinstance(msg, SetWindowVisibleMsg):
if msg.name == "bscan" and msg.sender != "ui":
self.cb_bscan.setChecked(msg.value)
elif isinstance(msg, ImagingConfigNameListMsg):
# print(msg, flush=True) todo fix
self.c_imaging_config.clear()
for name in msg.value:
self.c_imaging_config.addItem(name)
elif isinstance(msg, DeviceSwitchMsg):
self.device_switch_state = LinkStatus[msg.value]
self.on_device_switch_state()
except Exception as e:
logger.error(e)
traceback.print_exception(e)
def closeEvent(self, event):
self.p.send(KillMsg(""))
# event.accept()
# event.ignore()
@pyqtSlot(int)
def on_cb_bscan(self, v):
if self.cb_bscan.sender() is None:
self.p.send(SetWindowVisibleMsg("ui", "bscan", v == 2))
@pyqtSlot(int)
def on_sp_crop_center_valueChanged(self, v):
if self.sp_crop_center.sender() is None:
self.arg.t_start = v
self.p.send(self.arg)
@pyqtSlot(int)
def on_sp_crop_width_valueChanged(self, v):
if self.sp_crop_width.sender() is None:
self.arg.t_end = v
self.p.send(self.arg)
@pyqtSlot(int)
def on_sp_v2_valueChanged(self, v):
if self.sp_v2.sender() is None:
self.arg.v2 = v
self.p.send(self.arg)
@pyqtSlot(int)
def on_sp_f_rows_valueChanged(self, v):
if self.sp_f_rows.sender() is None:
self.arg.f_rows = v
self.p.send(self.arg)
@pyqtSlot(int)
def on_s_beta_valueChanged(self, v):
if self.s_beta.sender() is None:
self.arg.beta = v
self.p.send(self.arg)
@pyqtSlot(int)
def on_s_dct_center_valueChanged(self, v):
if self.s_dct_center.sender() is None:
self.arg.dct_center = v
self.p.send(self.arg)
@pyqtSlot(int)
def on_s_dct_bandwidth_valueChanged(self, v):
if self.s_dct_bandwidth.sender() is None:
self.arg.dct_bandwidth = v
self.p.send(self.arg)
@pyqtSlot(int)
def on_s_v2_valueChanged(self, v):
if self.s_v2.sender() is None:
self.arg.v2 = v
self.p.send(self.arg)
@pyqtSlot(int)
def on_s_tgc_valueChanged(self, v):
if self.s_tgc.sender() is None:
self.arg.tgc = v
self.p.send(self.arg)
@pyqtSlot(int)
def on_s_g1_valueChanged(self, v):
if self.s_g1.sender() is None:
self.arg.g1 = v
self.p.send(self.arg)
@pyqtSlot(int)
def on_s_g2_valueChanged(self, v):
if self.s_g2.sender() is None:
self.arg.g2 = v
self.p.send(self.arg)
@pyqtSlot(int)
def on_s_g3_valueChanged(self, v):
if self.s_g3.sender() is None:
self.arg.g3 = v
self.p.send(self.arg)
@pyqtSlot(int)
def on_s_g4_valueChanged(self, v):
if self.s_g4.sender() is None:
self.arg.g4 = v
self.p.send(self.arg)
@pyqtSlot(int)
def on_s_g5_valueChanged(self, v):
if self.s_g5.sender() is None:
self.arg.g5 = v
self.p.send(self.arg)
@pyqtSlot(int)
def on_s_g6_valueChanged(self, v):
if self.s_g6.sender() is None:
self.arg.g6 = v
self.p.send(self.arg)
@pyqtSlot(int)
def on_s_g7_valueChanged(self, v):
if self.s_g7.sender() is None:
self.arg.g7 = v
self.p.send(self.arg)
@pyqtSlot(int)
def on_s_g8_valueChanged(self, v):
if self.s_g8.sender() is None:
self.arg.g8 = v
self.p.send(self.arg)
@pyqtSlot(int)
def on_t_start(self, v):
if self.s_t_end.sender() is None:
self.arg.t_start = v
self.p.send(self.arg)
@pyqtSlot(int)
def on_t_end(self, v):
if self.s_t_end.sender() is None:
self.arg.t_end = v
self.p.send(self.arg)
@pyqtSlot(int)
def on_s_f_rows_valueChanged(self, v):
if self.s_f_rows.sender() is None:
self.arg.f_rows = v
self.p.send(self.arg)
@pyqtSlot(int)
def on_c_playback_seq_name_currentIndexChanged(self, v):
if self.c_playback_seq_name.sender() is None or isinstance(
self.c_playback_seq_name.sender(), QFrame
):
self.p.send(SetSeqMetaMsg("playback", self.c_playback_seq_name.itemText(v)))
@pyqtSlot(int)
def on_s_sid_valueChanged(self, v):
if self.s_sid.sender() is None:
self.p.send(SetSidMsg(v))
@pyqtSlot(int)
def on_sp_sid_valueChanged(self, v):
if self.sp_sid.sender() is None:
self.p.send(SetSidMsg(v))
# @pyqtSlot(int)
@pyqtSlot()
def on_b_live_seq_apply_clicked(self):
v = self.c_live_seq_name.currentIndex()
name = self.c_live_seq_name.currentText()
if name != "Empty":
self.b_live_seq_apply.setEnabled(False)
self.p.send(SetDeviceConfigMsg(name, self.c_live_seq_name.itemData(v)))
class MainUI(Node):
topics = [
ImageArgMsg,
SeqIdMinMax,
MoveAxisMsg,
ImagingConfigNameListMsg,
PlaybackSeqListMsg,
SetSeqMetaMsg,
SeqIdList,
SetWindowVisibleMsg,
SetSidMsg,
DeviceConnectedMsg,
DeviceEnabledMsg,
DeviceOnlineMsg,
DeviceConfigListMsg,
RobotRtsiMsg,
DeviceSwitchMsg,
RecordFrameMsg,
SeqMetaMsg,
MaxMsg,
]
def __init__(self, level=logging.INFO):
super().__init__(level=level)
def loop(self):
try:
app = QApplication(sys.argv)
app.setDesktopFileName("TestTest")
if platform.system() == "Windows":
app.setStyle("windowsvista")
MainWindow = Adv(self)
# icon = QtGui.QIcon()
# icon.addPixmap(QtGui.QPixmap("remilia3.png"),
# QtGui.QIcon.Mode.Normal, QtGui.QIcon.State.Off)
# MainWindow.setApp(icon)
# MainWindow.move(int(px), int(py))
# MainWindow.resize(int(sx), int(sy))
MainWindow.show()
app.exec()
except Exception as e:
traceback.print_exception(e)