import logging import platform import sys from enum import Enum, auto from pathlib import Path from PyQt6 import QtCore from PyQt6.QtCore import QByteArray from PyQt6.QtWidgets import QMainWindow, QApplication, QFrame, QMessageBox, QFileDialog from Main import Ui_MainWindow from ZMQReceiver import ZMQReceiver from config import DS from nodes.Node import Node from utils.Msg import KillMsg, Msg, ImageArgMsg, SelectSeqMsg, SeqIdMinMax, MoveAxisMsg, SeqListMsg, SetBaseMsg, \ SeqMetaMsg, SetPlayMode, DeviceConnectedMsg, DeviceEnabledMsg, DeviceOnlineMsg, SetDeviceEnabledMsg, \ SetDeviceConnectedMsg, DeviceConfigListMsg, SetDeviceConfigMsg, SetRecordMsg, RobotRtsiMsg, RecordFrameMsg, \ SeqIdList from utils.RfMeta import RfSequenceMeta logger = logging.getLogger(__name__) class LinkStatus(Enum): RED = auto() YELLOW = auto() GREEN = auto() def humanbytes(B): """Return the given bytes as a human friendly KB, MB, GB, or TB string.""" B = float(B) KB = float(1024) MB = float(KB ** 2) # 1,048,576 GB = float(KB ** 3) # 1,073,741,824 TB = float(KB ** 4) # 1,099,511,627,776 if B < KB: return '{0} {1}'.format(B, 'Bytes' if 0 == B > 1 else 'Byte') elif KB <= B < MB: return '{0:.2f} KB'.format(B / KB) elif MB <= B < GB: return '{0:.2f} MB'.format(B / MB) elif GB <= B < TB: return '{0:.2f} GB'.format(B / GB) elif TB <= B: return '{0:.2f} TB'.format(B / TB) class Adv(QMainWindow, Ui_MainWindow): def __init__(self, p: Node, parent=None): super(Adv, self).__init__(parent) self.p = p self.setupUi(self) zmq_receiver = ZMQReceiver(self) zmq_receiver.zmq_event.connect(self.on_zmq_event) zmq_receiver.start() self.s_t_start.valueChanged.connect(self.on_t_start) self.s_t_end.valueChanged.connect(self.on_t_end) self.c_playback_seq_name.currentIndexChanged.connect(self.on_select_plyayback_seq_name) self.s_sid.valueChanged.connect(self.c_sid) self.arg = ImageArgMsg('ui', t_start=0, t_end=1499) self.b_base.clicked.connect(self.on_base) self.seq_meta: RfSequenceMeta | None = None self.b_play_live.clicked.connect(self.on_play_live) self.b_play_playback.clicked.connect(self.on_play_playback) self.b_record.clicked.connect(self.on_record) self.record = False self.device_connected = False self.device_enabled = False # self.b_device_enabled.clicked.connect(lambda: self.p.send(SetDeviceEnabledMsg(not self.device_enabled))) # self.b_device_connected.clicked.connect(lambda: self.p.send(SetDeviceConnectedMsg(not self.device_connected))) self.c_seq_meta.currentIndexChanged.connect(self.on_seq_meta) self.l_base.setText(DS.__str__()) self.b1, self.b2 = False, False self.b3, self.b4 = False, False self.record_size_cnt = 0 self.record_frame_cnt = 0 self.update_device_buttons() self.seq_id_list = [] self.b_select_base.clicked.connect(self.on_select_base) def on_select_base(self): base = QFileDialog.getExistingDirectory(self, 'Select Base Folder', DS.__str__()) self.l_base.setText(Path(base).__str__()) def update_b_play_live_enabled(self): self.b_play_live.setEnabled(all([self.b1, self.b2])) def on_base(self): if Path(self.l_base.text()).exists(): self.p.send(SetBaseMsg(self.l_base.text())) self.b1 = True self.update_b_play_live_enabled() def set_device_connection(self, status: LinkStatus): match status: case LinkStatus.RED: self.device_connected = False self.c_seq_meta.setEnabled(False) self.lb_device_connection.setText('Disconnected') self.lb_device_connection.setStyleSheet('background-color: pink;') case LinkStatus.YELLOW: self.lb_device_connection.setText('Waiting') self.lb_device_connection.setStyleSheet('background-color: yellow;') case LinkStatus.GREEN: self.device_connected = True self.c_seq_meta.setEnabled(True) self.lb_device_connection.setText('Connected') self.lb_device_connection.setStyleSheet('background-color: LightGreen;') def set_device_enable(self, status: LinkStatus): match status: case LinkStatus.RED: self.device_enabled = False self.lb_device_enable.setText('Disabled') self.lb_device_enable.setStyleSheet('background-color: pink;') case LinkStatus.YELLOW: self.lb_device_enable.setText('Waiting') self.lb_device_enable.setStyleSheet('background-color: yellow;') case LinkStatus.GREEN: self.device_enabled = True self.lb_device_enable.setText('Enabled') self.lb_device_enable.setStyleSheet('background-color: LightGreen;') def update_c_playback_seq_name_enable(self): self.c_playback_seq_name.setEnabled(all([self.b3, self.b4])) def on_play_live(self): self.b_play_live.setStyleSheet('background-color: red;') self.b_play_playback.setStyleSheet('') self.b_record.setEnabled(True) self.l_record_commit.setEnabled(True) self.p.send(SeqMetaMsg('live', self.c_seq_meta.itemText(self.c_seq_meta.currentIndex()))) self.p.send(SetPlayMode('live')) self.b3 = False self.update_c_playback_seq_name_enable() def on_play_playback(self): self.b_play_live.setStyleSheet('') self.b_play_playback.setStyleSheet('background-color: red;') self.b_record.setEnabled(False) self.l_record_commit.setEnabled(False) self.p.send(SeqMetaMsg('playback', self.c_playback_seq_name.itemText(self.c_playback_seq_name.currentIndex()))) self.p.send(SetPlayMode('playback')) logger.debug(f'set playmode playback') self.b3 = True self.update_c_playback_seq_name_enable() def on_device_disable(self): self.p.send(SetDeviceEnabledMsg(False)) self.b_device_enable.setEnabled(False) self.set_device_enable(LinkStatus.YELLOW) def on_device_enable(self): self.p.send(SetDeviceEnabledMsg(True)) self.b_device_enable.setEnabled(False) self.set_device_enable(LinkStatus.YELLOW) def on_device_disconnect(self): self.p.send(SetDeviceConnectedMsg(False)) self.b_device_connection.setEnabled(False) self.set_device_connection(LinkStatus.YELLOW) def on_device_connect(self): self.p.send(SetDeviceConnectedMsg(True)) self.b_device_connection.setEnabled(False) self.set_device_connection(LinkStatus.YELLOW) def update_device_buttons(self): if self.device_connected and self.device_enabled: self.b_device_connection.setText('Disconnect') self.b_device_connection.setEnabled(False) self.b_device_enable.setText('Disable') self.b_device_enable.setEnabled(True) try: self.b_device_connection.clicked.disconnect() except: pass try: self.b_device_enable.clicked.disconnect() except: pass self.b_device_enable.clicked.connect(self.on_device_disable) if self.device_connected and not self.device_enabled: self.b_device_connection.setText('Disconnect') self.b_device_connection.setEnabled(True) self.b_device_enable.setText('Enable') self.b_device_enable.setEnabled(True) try: self.b_device_connection.clicked.disconnect() except: pass try: self.b_device_enable.clicked.disconnect() except: pass self.b_device_enable.clicked.connect(self.on_device_enable) self.b_device_connection.clicked.connect(self.on_device_disconnect) if not self.device_connected and self.device_enabled: raise Exception("wtf?") if not self.device_connected and not self.device_enabled: self.b_device_connection.setText('Connect') self.b_device_connection.setEnabled(True) self.b_device_enable.setText('Enable') self.b_device_enable.setEnabled(False) try: self.b_device_connection.clicked.disconnect() except: pass try: self.b_device_enable.clicked.disconnect() except: pass self.b_device_connection.clicked.connect(self.on_device_connect) def on_record(self): if self.record: self.l_record_commit.setEnabled(True) self.p.send(SetRecordMsg(False)) self.record = False self.b_record.setStyleSheet('') else: self.l_record_commit.setEnabled(False) if self.l_record_commit.text() != '': self.record_size_cnt = 0 self.record_frame_cnt = 0 self.p.send(SetRecordMsg(True, self.l_record_commit.text(), self.l_base.text())) self.record = True self.b_record.setStyleSheet('background-color: red;') else: QMessageBox.warning(None, 'hint', 'Commit is empty!!') def on_zmq_event(self, msg: QByteArray): msg = Msg.decode_msg(msg.data()) if isinstance(msg, KillMsg): if msg.name == '': self.close() elif isinstance(msg, ImageArgMsg): self.s_t_end.setValue(msg.t_start) self.s_t_end.setValue(msg.t_end) elif isinstance(msg, MoveAxisMsg): match msg.axis: case 'S': self.s_sid.setValue(msg.value) self.l_seq_current.setText(str(self.seq_id_list[msg.value])) elif isinstance(msg, SeqIdList): self.seq_id_list = msg.li self.s_sid.setMaximum(msg.li.__len__() - 1) self.sp_sid.setMaximum(msg.li.__len__() - 1) self.l_seq_min.setText(str(min(msg.li))) self.l_seq_max.setText(str(max(msg.li))) elif isinstance(msg, SeqListMsg): self.b4 = True self.update_c_playback_seq_name_enable() self.c_playback_seq_name.clear() for name in msg.value: self.c_playback_seq_name.addItem(name) if msg.value.__len__() > 0: self.p.send(SelectSeqMsg(self.c_playback_seq_name.itemText(self.c_playback_seq_name.currentIndex()))) self.b_play_playback.setEnabled(True) elif isinstance(msg, SeqMetaMsg): self.seq_meta = RfSequenceMeta.from_name(msg.name) self.s_t_start.setMaximum(max(self.seq_meta.shape)) self.s_t_end.setMaximum(max(self.seq_meta.shape)) elif isinstance(msg, DeviceConnectedMsg): if msg.value: self.set_device_connection(LinkStatus.GREEN) else: self.set_device_connection(LinkStatus.RED) self.update_device_buttons() elif isinstance(msg, DeviceEnabledMsg): if msg.value: self.set_device_enable(LinkStatus.GREEN) else: self.set_device_enable(LinkStatus.RED) self.update_device_buttons() elif isinstance(msg, DeviceOnlineMsg): if msg.value: self.l_online.setStyleSheet("") self.l_online.setText("Device Online") else: self.l_online.setStyleSheet("background-color: pink;") self.l_online.setText("Device Offline") elif isinstance(msg, DeviceConfigListMsg): for name, txt in msg.arr: self.c_seq_meta.addItem(name, txt) elif isinstance(msg, RobotRtsiMsg): self.l_robot_pos.setText(','.join([f'{v:.3f}' for v in msg.pos])) self.l_robot_force.setText(','.join([f'{v:.3f}' for v in msg.force])) elif isinstance(msg, RecordFrameMsg): self.record_frame_cnt += 1 self.record_size_cnt += msg.size self.l_record_size.setText(humanbytes(self.record_size_cnt)) self.l_record_frames.setText(str(self.record_frame_cnt)) self.l_record_max_sid.setText(str(msg.current_sid)) def closeEvent(self, event): self.p.send(KillMsg('')) # event.accept() # event.ignore() @QtCore.pyqtSlot(int) def on_t_start(self, v): if self.s_t_end.sender() is None: self.arg.t_start = v self.p.send(self.arg) @QtCore.pyqtSlot(int) def on_t_end(self, v): if self.s_t_end.sender() is None: self.arg.t_end = v self.p.send(self.arg) @QtCore.pyqtSlot(int) def on_select_plyayback_seq_name(self, v): if self.c_playback_seq_name.sender() is None or isinstance(self.c_playback_seq_name.sender(), QFrame): self.p.send(SelectSeqMsg(self.c_playback_seq_name.itemText(v))) @QtCore.pyqtSlot(int) def c_sid(self, v): if self.s_sid.sender() is None: self.sp_sid.setValue(v) self.p.send(MoveAxisMsg('ui', 'S', v)) @QtCore.pyqtSlot(int) def on_seq_meta(self, v): if self.c_seq_meta.sender() is None or isinstance(self.c_seq_meta.sender(), QFrame): if self.c_seq_meta.itemText(v) != 'Empty': self.b2 = True self.update_b_play_live_enabled() self.p.send(SetDeviceConfigMsg(self.c_seq_meta.itemData(v))) self.p.send(SeqMetaMsg('live', self.c_seq_meta.itemText(v))) class MainUI(Node): topics = [ImageArgMsg, SeqIdMinMax, MoveAxisMsg, SeqListMsg, SeqMetaMsg, SeqIdList, DeviceConnectedMsg, DeviceEnabledMsg, DeviceOnlineMsg, DeviceConfigListMsg, RobotRtsiMsg, RecordFrameMsg] def __init__(self, level=logging.INFO): super().__init__(level=level) def loop(self): app = QApplication(sys.argv) if platform.system() == 'Windows': app.setStyle('windowsvista') MainWindow = Adv(self) # MainWindow.move(int(px), int(py)) # MainWindow.resize(int(sx), int(sy)) MainWindow.show() app.exec()