flandre/src/nodes/MainUI.py
2025-02-18 23:31:44 +08:00

389 lines
16 KiB
Python

import logging
import platform
import sys
import traceback
from enum import Enum, auto
from pathlib import Path
from PyQt6 import QtCore
from PyQt6.QtCore import QByteArray
from PyQt6.QtWidgets import QMainWindow, QApplication, QFrame, QMessageBox, QFileDialog
from Main import Ui_MainWindow
from ZMQReceiver import ZMQReceiver
from config import DS, SOFTWARE_CONFIG
from nodes.Node import Node
from utils.Msg import KillMsg, Msg, ImageArgMsg, SelectSeqMsg, SeqIdMinMax, MoveAxisMsg, SeqListMsg, SetBaseMsg, \
SetSeqMetaMsg, SetPlayMode, DeviceConnectedMsg, DeviceEnabledMsg, DeviceOnlineMsg, SetDeviceEnabledMsg, \
SetDeviceConnectedMsg, DeviceConfigListMsg, SetDeviceConfigMsg, SetRecordMsg, RobotRtsiMsg, RecordFrameMsg, \
SeqIdList, SetWindowVisibleMsg, SetSidMsg
from utils.RfMeta import RfSequenceMeta
logger = logging.getLogger(__name__)
class LinkStatus(Enum):
    """Three-state indicator used for the device connection / enable labels."""

    RED = auto()     # link is down (disconnected / disabled)
    YELLOW = auto()  # a request was sent and the UI is waiting for the answer
    GREEN = auto()   # link is up (connected / enabled)
def humanbytes(B):
    """Return the given byte count as a human friendly Bytes/KB/MB/GB/TB string.

    Args:
        B: Number of bytes; anything accepted by ``float()``.

    Returns:
        ``'<value> Byte'`` / ``'<value> Bytes'`` for values below 1024 (the
        value is printed as a float, e.g. ``'512.0 Bytes'``), otherwise the
        value scaled to the largest fitting binary unit with two decimals,
        e.g. ``'1.50 KB'``.
    """
    B = float(B)
    KB = float(1024)
    # Largest unit first so the first match is the right one.
    units = (
        ('TB', KB ** 4),  # 1,099,511,627,776
        ('GB', KB ** 3),  # 1,073,741,824
        ('MB', KB ** 2),  # 1,048,576
        ('KB', KB),
    )
    for unit, scale in units:
        if B >= scale:
            return '{0:.2f} {1}'.format(B / scale, unit)
    # Only exactly one byte is singular; 0 and every other count are plural.
    return '{0} {1}'.format(B, 'Byte' if B == 1 else 'Bytes')
class Adv(QMainWindow, Ui_MainWindow):
def __init__(self, p: Node, parent=None):
super(Adv, self).__init__(parent)
self.p = p
self.setupUi(self)
zmq_receiver = ZMQReceiver(self)
zmq_receiver.zmq_event.connect(self.on_zmq_event)
zmq_receiver.start()
self.s_t_start.valueChanged.connect(self.on_t_start)
self.s_t_end.valueChanged.connect(self.on_t_end)
self.c_playback_seq_name.currentIndexChanged.connect(self.on_select_plyayback_seq_name)
self.s_sid.valueChanged.connect(self.c_sid)
self.arg = ImageArgMsg('ui', t_start=0, t_end=1499)
self.b_base.clicked.connect(self.on_base_open)
self.seq_meta: RfSequenceMeta | None = None
self.b_play_live.clicked.connect(self.on_play_live)
self.b_play_playback.clicked.connect(self.on_play_playback)
self.b_record.clicked.connect(self.on_record)
self.record = False
self.device_connected = False
self.device_enabled = False
# self.b_device_enabled.clicked.connect(lambda: self.p.send(SetDeviceEnabledMsg(not self.device_enabled)))
# self.b_device_connected.clicked.connect(lambda: self.p.send(SetDeviceConnectedMsg(not self.device_connected)))
self.c_live_seq_name.currentIndexChanged.connect(self.on_seq_meta)
self.l_base.setText(SOFTWARE_CONFIG.base_dir.__str__())
self.b1, self.b2 = False, False
self.b3, self.b4 = False, False
self.record_size_cnt = 0
self.record_frame_cnt = 0
self.update_device_buttons()
self.seq_id_list = []
self.b_select_base.clicked.connect(self.on_select_base)
self.cb_bscan.stateChanged.connect(self.on_cb_bscan)
# self.cb_bscan.checkStateChanged.connect(lambda e: print(e == 2, flush=True))
# self.cb_bscan.checkStateChanged.connect(lambda e: print(e.name, flush=True))
def on_select_base(self):
base = QFileDialog.getExistingDirectory(self, 'Select Base Folder', DS.__str__())
self.l_base.setText(Path(base).__str__())
def update_b_play_live_enabled(self):
self.b_play_live.setEnabled(all([self.b1, self.b2]))
def on_base_open(self):
if Path(self.l_base.text()):
self.p.send(SetBaseMsg(self.l_base.text()))
self.b1 = True
self.update_b_play_live_enabled()
def set_device_connection(self, status: LinkStatus):
match status:
case LinkStatus.RED:
self.device_connected = False
self.c_live_seq_name.setEnabled(False)
self.lb_device_connection.setText('Disconnected')
self.lb_device_connection.setStyleSheet('background-color: pink;')
case LinkStatus.YELLOW:
self.lb_device_connection.setText('Waiting')
self.lb_device_connection.setStyleSheet('background-color: yellow;')
case LinkStatus.GREEN:
self.device_connected = True
self.c_live_seq_name.setEnabled(True)
self.lb_device_connection.setText('Connected')
self.lb_device_connection.setStyleSheet('background-color: LightGreen;')
def set_device_enable(self, status: LinkStatus):
match status:
case LinkStatus.RED:
self.device_enabled = False
self.lb_device_enable.setText('Disabled')
self.lb_device_enable.setStyleSheet('background-color: pink;')
case LinkStatus.YELLOW:
self.lb_device_enable.setText('Waiting')
self.lb_device_enable.setStyleSheet('background-color: yellow;')
case LinkStatus.GREEN:
self.device_enabled = True
self.lb_device_enable.setText('Enabled')
self.lb_device_enable.setStyleSheet('background-color: LightGreen;')
def update_c_playback_seq_name_enable(self):
self.c_playback_seq_name.setEnabled(all([self.b3, self.b4]))
def on_play_live(self):
self.s_sid.setEnabled(False)
self.b_play_live.setStyleSheet('background-color: red;')
self.b_play_playback.setStyleSheet('')
self.b_record.setEnabled(True)
self.l_record_commit.setEnabled(True)
self.p.send(SetSeqMetaMsg('live', self.c_live_seq_name.itemText(self.c_live_seq_name.currentIndex())))
self.p.send(SetPlayMode('live'))
self.b3 = False
self.update_c_playback_seq_name_enable()
def on_play_playback(self):
self.s_sid.setEnabled(True)
self.b_play_live.setStyleSheet('')
self.b_play_playback.setStyleSheet('background-color: red;')
self.b_record.setEnabled(False)
self.l_record_commit.setEnabled(False)
self.p.send(
SetSeqMetaMsg('playback', self.c_playback_seq_name.itemText(self.c_playback_seq_name.currentIndex())))
self.p.send(SetPlayMode('playback'))
logger.debug(f'set playmode playback')
self.b3 = True
self.update_c_playback_seq_name_enable()
def on_device_disable(self):
self.p.send(SetDeviceEnabledMsg(False))
self.b_device_enable.setEnabled(False)
self.set_device_enable(LinkStatus.YELLOW)
def on_device_enable(self):
self.p.send(SetDeviceEnabledMsg(True))
self.b_device_enable.setEnabled(False)
self.set_device_enable(LinkStatus.YELLOW)
def on_device_disconnect(self):
self.p.send(SetDeviceConnectedMsg(False))
self.b_device_connection.setEnabled(False)
self.set_device_connection(LinkStatus.YELLOW)
def on_device_connect(self):
self.p.send(SetDeviceConnectedMsg(True))
self.b_device_connection.setEnabled(False)
self.set_device_connection(LinkStatus.YELLOW)
def update_device_buttons(self):
if self.device_connected and self.device_enabled:
self.b_device_connection.setText('Disconnect')
self.b_device_connection.setEnabled(False)
self.b_device_enable.setText('Disable')
self.b_device_enable.setEnabled(True)
try:
self.b_device_connection.clicked.disconnect()
except:
pass
try:
self.b_device_enable.clicked.disconnect()
except:
pass
self.b_device_enable.clicked.connect(self.on_device_disable)
if self.device_connected and not self.device_enabled:
self.b_device_connection.setText('Disconnect')
self.b_device_connection.setEnabled(True)
self.b_device_enable.setText('Enable')
self.b_device_enable.setEnabled(True)
try:
self.b_device_connection.clicked.disconnect()
except:
pass
try:
self.b_device_enable.clicked.disconnect()
except:
pass
self.b_device_enable.clicked.connect(self.on_device_enable)
self.b_device_connection.clicked.connect(self.on_device_disconnect)
if not self.device_connected and self.device_enabled:
raise Exception("wtf?")
if not self.device_connected and not self.device_enabled:
self.b_device_connection.setText('Connect')
self.b_device_connection.setEnabled(True)
self.b_device_enable.setText('Enable')
self.b_device_enable.setEnabled(False)
try:
self.b_device_connection.clicked.disconnect()
except:
pass
try:
self.b_device_enable.clicked.disconnect()
except:
pass
self.b_device_connection.clicked.connect(self.on_device_connect)
def on_record(self):
if self.record:
self.l_record_commit.setEnabled(True)
self.p.send(SetRecordMsg(False))
self.record = False
self.b_record.setStyleSheet('')
else:
self.l_record_commit.setEnabled(False)
if self.l_record_commit.text() != '':
self.record_size_cnt = 0
self.record_frame_cnt = 0
self.p.send(SetRecordMsg(True, self.l_record_commit.text(), self.l_base.text()))
self.record = True
self.b_record.setStyleSheet('background-color: red;')
else:
QMessageBox.warning(None, 'hint', 'Commit is empty!!')
def on_zmq_event(self, msg: QByteArray):
try:
msg = Msg.decode_msg(msg.data())
if isinstance(msg, KillMsg):
if msg.name == '':
self.close()
elif isinstance(msg, ImageArgMsg):
self.s_t_end.setValue(msg.t_start)
self.s_t_end.setValue(msg.t_end)
elif isinstance(msg, MoveAxisMsg):
match msg.axis:
case 'S':
pass
# self.s_sid.setValue(msg.value)
# self.l_seq_current.setText(str(self.seq_id_list[msg.value]))
elif isinstance(msg, SetSidMsg):
self.s_sid.setValue(msg.value)
self.l_seq_current.setText(str(self.seq_id_list[msg.value]))
elif isinstance(msg, SeqIdList):
self.seq_id_list = msg.li
self.s_sid.setMaximum(msg.li.__len__() - 1)
self.sp_sid.setMaximum(msg.li.__len__() - 1)
self.l_seq_min.setText(str(min(msg.li)))
self.l_seq_max.setText(str(max(msg.li)))
elif isinstance(msg, SeqListMsg):
self.b4 = True
self.update_c_playback_seq_name_enable()
self.c_playback_seq_name.clear()
for name in msg.value:
self.c_playback_seq_name.addItem(name)
if msg.value.__len__() > 0:
self.p.send(
SelectSeqMsg(self.c_playback_seq_name.itemText(self.c_playback_seq_name.currentIndex())))
self.b_play_playback.setEnabled(True)
elif isinstance(msg, SetSeqMetaMsg):
self.seq_meta = RfSequenceMeta.from_name(msg.name)
self.s_t_start.setMaximum(max(self.seq_meta.shape))
self.s_t_end.setMaximum(max(self.seq_meta.shape))
elif isinstance(msg, DeviceConnectedMsg):
if msg.value:
self.set_device_connection(LinkStatus.GREEN)
else:
self.set_device_connection(LinkStatus.RED)
self.update_device_buttons()
elif isinstance(msg, DeviceEnabledMsg):
if msg.value:
self.set_device_enable(LinkStatus.GREEN)
else:
self.set_device_enable(LinkStatus.RED)
self.update_device_buttons()
elif isinstance(msg, DeviceOnlineMsg):
if msg.value:
self.l_online.setStyleSheet("")
self.l_online.setText("Device Online")
else:
self.l_online.setStyleSheet("background-color: pink;")
self.l_online.setText("Device Offline")
elif isinstance(msg, DeviceConfigListMsg):
for name, txt in msg.arr:
self.c_live_seq_name.addItem(name, txt)
elif isinstance(msg, RobotRtsiMsg):
self.l_robot_pos.setText(','.join([f'{v:.3f}' for v in msg.pos]))
self.l_robot_force.setText(','.join([f'{v:.3f}' for v in msg.force]))
elif isinstance(msg, RecordFrameMsg):
self.record_frame_cnt += 1
self.record_size_cnt += msg.size
self.l_record_size.setText(humanbytes(self.record_size_cnt))
self.l_record_frames.setText(str(self.record_frame_cnt))
self.l_record_max_sid.setText(str(msg.current_sid))
elif isinstance(msg, SetWindowVisibleMsg):
if msg.name == 'bscan' and msg.sender != 'ui':
self.cb_bscan.setChecked(msg.value)
except Exception as e:
logger.error(e)
traceback.print_exception(e)
def closeEvent(self, event):
self.p.send(KillMsg(''))
# event.accept()
# event.ignore()
@QtCore.pyqtSlot(int)
def on_cb_bscan(self, v):
if self.cb_bscan.sender() is None:
self.p.send(SetWindowVisibleMsg('ui', 'bscan', v == 2))
@QtCore.pyqtSlot(int)
def on_t_start(self, v):
if self.s_t_end.sender() is None:
self.arg.t_start = v
self.p.send(self.arg)
@QtCore.pyqtSlot(int)
def on_t_end(self, v):
if self.s_t_end.sender() is None:
self.arg.t_end = v
self.p.send(self.arg)
@QtCore.pyqtSlot(int)
def on_select_plyayback_seq_name(self, v):
if self.c_playback_seq_name.sender() is None or isinstance(self.c_playback_seq_name.sender(), QFrame):
self.p.send(SelectSeqMsg(self.c_playback_seq_name.itemText(v)))
@QtCore.pyqtSlot(int)
def c_sid(self, v):
if self.s_sid.sender() is None:
self.sp_sid.setValue(v)
self.p.send(SetSidMsg(v))
# self.p.send(MoveAxisMsg('ui', 'S', v))
@QtCore.pyqtSlot(int)
def on_seq_meta(self, v):
if self.c_live_seq_name.sender() is None or isinstance(self.c_live_seq_name.sender(), QFrame):
if self.c_live_seq_name.itemText(v) != 'Empty':
self.b2 = True
self.update_b_play_live_enabled()
self.p.send(SetDeviceConfigMsg(self.c_live_seq_name.itemData(v)))
self.p.send(SetSeqMetaMsg('live', self.c_live_seq_name.itemText(v)))
class MainUI(Node):
    """Node that runs the Qt application and shows the :class:`Adv` window."""

    # Message types listed for this node.
    topics = [
        ImageArgMsg, SeqIdMinMax, MoveAxisMsg,
        SeqListMsg, SetSeqMetaMsg, SeqIdList, SetWindowVisibleMsg, SetSidMsg,
        DeviceConnectedMsg, DeviceEnabledMsg, DeviceOnlineMsg, DeviceConfigListMsg,
        RobotRtsiMsg,
        RecordFrameMsg,
    ]

    def __init__(self, level=logging.INFO):
        """Forward *level* to the ``Node`` constructor."""
        super().__init__(level=level)

    def loop(self):
        """Create the application, show the main window and run the event loop.

        Any exception raised along the way is printed instead of propagated.
        """
        try:
            qt_app = QApplication(sys.argv)
            running_on_windows = platform.system() == 'Windows'
            if running_on_windows:
                qt_app.setStyle('windowsvista')
            window = Adv(self)
            window.show()
            qt_app.exec()
        except Exception as err:
            print(err)