import logging
import sys
from enum import Enum, auto

from PyQt6 import QtCore
from PyQt6.QtCore import QByteArray
from PyQt6.QtWidgets import QMainWindow, QApplication, QFrame

from Main import Ui_MainWindow
from ZMQReceiver import ZMQReceiver
from nodes.Node import Node
from utils.Msg import (
    KillMsg, Msg, ImageArgMsg, SelectSeqMsg, SeqIdMinMax, MoveAxisMsg, SeqListMsg, SetBaseMsg,
    SeqMetaMsg, SetPlayMode, DeviceConnectedMsg, DeviceEnabledMsg, DeviceOnlineMsg, SetDeviceEnabledMsg,
    SetDeviceConnectedMsg, DeviceConfigListMsg, SetDeviceConfigMsg, SetRecordMsg, RobotRtsiMsg,
)
from utils.RfFile import RfSequenceMeta


class LinkStatus(Enum):
    """Three-state indicator used for the device connection/enable labels."""

    RED = auto()
    YELLOW = auto()
    GREEN = auto()


# Style sheet applied to a status label for each LinkStatus value.
_LINK_STYLES = {
    LinkStatus.RED: 'background-color: pink;',
    LinkStatus.YELLOW: 'background-color: yellow;',
    LinkStatus.GREEN: 'background-color: LightGreen;',
}

# Caption shown on the "connection" label for each LinkStatus value.
_CONNECTION_CAPTIONS = {
    LinkStatus.RED: 'Disconnected',
    LinkStatus.YELLOW: 'Waiting',
    LinkStatus.GREEN: 'Connected',
}

# Caption shown on the "enable" label for each LinkStatus value.
_ENABLE_CAPTIONS = {
    LinkStatus.RED: 'Disabled',
    LinkStatus.YELLOW: 'Waiting',
    LinkStatus.GREEN: 'Enabled',
}


class Adv(QMainWindow, Ui_MainWindow):
    """Main window: forwards widget changes to the node and mirrors incoming messages.

    Outgoing traffic goes through ``self.p.send(...)``; incoming traffic arrives
    through the ``zmq_event`` signal of a ``ZMQReceiver`` and is dispatched in
    :meth:`on_zmq_event`.
    """

    def __init__(self, p: Node, parent=None):
        """Build the UI, initialise local state and wire up all signals.

        Args:
            p: Node used to publish messages (only ``p.send`` is called here).
            parent: Optional parent passed to ``QMainWindow``.
        """
        super().__init__(parent)
        self.p = p
        self.setupUi(self)

        # Plain state first, so every slot finds its attributes in place no
        # matter how early a signal is delivered.
        self.arg = ImageArgMsg('ui', t_start=0, t_end=1499)
        self.seq_meta: RfSequenceMeta | None = None
        self.record = False
        self.device_connected = False
        self.device_enabled = False

        self.s_t_start.valueChanged.connect(self.on_t_start)
        self.s_t_end.valueChanged.connect(self.on_t_end)
        self.comboBox.currentIndexChanged.connect(self.cbc)
        self.s_sid.valueChanged.connect(self.c_sid)
        self.b_base.clicked.connect(lambda: self.p.send(SetBaseMsg(self.l_base.text())))
        self.b_play_live.clicked.connect(self.on_play_live)
        self.b_play_playback.clicked.connect(self.on_play_playback)
        self.b_record.clicked.connect(self.on_record)
        self.c_seq_meta.currentIndexChanged.connect(self.on_m)

        # NOTE(review): the receiver is given ``self`` as constructor argument,
        # presumably as its Qt parent - confirm in ZMQReceiver.
        zmq_receiver = ZMQReceiver(self)
        zmq_receiver.zmq_event.connect(self.on_zmq_event)
        zmq_receiver.start()

    @staticmethod
    def _show_link_status(label, status, captions):
        """Set text and background of *label* for *status*; unknown values are ignored."""
        if status not in captions:
            return
        label.setText(captions[status])
        label.setStyleSheet(_LINK_STYLES[status])

    @staticmethod
    def _rebind_clicked(button, handler=None):
        """Drop every ``clicked`` connection of *button*, then connect *handler* if given."""
        try:
            button.clicked.disconnect()
        except TypeError:
            # PyQt raises TypeError when the signal has no connections; that
            # is the expected situation on the first call.
            pass
        if handler is not None:
            button.clicked.connect(handler)

    def set_device_connection(self, status: LinkStatus):
        """Update the connection label (text and colour) for *status*."""
        self._show_link_status(self.lb_device_connection, status, _CONNECTION_CAPTIONS)

    def set_device_enable(self, status: LinkStatus):
        """Update the enable label (text and colour) for *status*."""
        self._show_link_status(self.lb_device_enable, status, _ENABLE_CAPTIONS)

    def on_play_live(self):
        """Switch to live mode: highlight the button, allow recording, lock the sequence list."""
        self.b_play_live.setStyleSheet('background-color: red;')
        self.b_play_playback.setStyleSheet('')
        self.b_record.setEnabled(True)
        self.l_record_commit.setEnabled(True)
        self.p.send(SeqMetaMsg(self.c_seq_meta.itemText(self.c_seq_meta.currentIndex())))
        self.p.send(SetPlayMode('live'))
        self.comboBox.setEnabled(False)

    def on_play_playback(self):
        """Switch to playback mode: highlight the button, block recording, unlock the sequence list."""
        self.b_play_live.setStyleSheet('')
        self.b_play_playback.setStyleSheet('background-color: red;')
        self.b_record.setEnabled(False)
        self.l_record_commit.setEnabled(False)
        self.p.send(SeqMetaMsg(self.comboBox.itemText(self.comboBox.currentIndex())))
        self.p.send(SetPlayMode('playback'))
        self.comboBox.setEnabled(True)

    def on_device_disable(self):
        """Request the device to be disabled and show the pending state."""
        self.p.send(SetDeviceEnabledMsg(False))
        self.b_device_enable.setEnabled(False)
        self.set_device_enable(LinkStatus.YELLOW)

    def on_device_enable(self):
        """Request the device to be enabled and show the pending state."""
        self.p.send(SetDeviceEnabledMsg(True))
        self.b_device_enable.setEnabled(False)
        self.set_device_enable(LinkStatus.YELLOW)

    def on_device_disconnect(self):
        """Request a device disconnect and show the pending state."""
        self.p.send(SetDeviceConnectedMsg(False))
        self.b_device_connection.setEnabled(False)
        self.set_device_connection(LinkStatus.YELLOW)

    def on_device_connect(self):
        """Request a device connect and show the pending state."""
        self.p.send(SetDeviceConnectedMsg(True))
        self.b_device_connection.setEnabled(False)
        self.set_device_connection(LinkStatus.YELLOW)

    def update_device_buttons(self):
        """Re-label, enable/disable and re-wire the two device buttons.

        The outcome depends on ``self.device_connected`` and
        ``self.device_enabled``.

        Raises:
            RuntimeError: if the device is flagged enabled while not connected.
                Nothing is modified in that case.
        """
        connected = self.device_connected
        enabled = self.device_enabled

        if enabled and not connected:
            raise RuntimeError('inconsistent device state: enabled while not connected')

        if connected and enabled:
            # Must be disabled before it may be disconnected.
            self.b_device_connection.setText('Disconnect')
            self.b_device_connection.setEnabled(False)
            self.b_device_enable.setText('Disable')
            self.b_device_enable.setEnabled(True)
            self._rebind_clicked(self.b_device_connection)
            self._rebind_clicked(self.b_device_enable, self.on_device_disable)
        elif connected:
            self.b_device_connection.setText('Disconnect')
            self.b_device_connection.setEnabled(True)
            self.b_device_enable.setText('Enable')
            self.b_device_enable.setEnabled(True)
            self._rebind_clicked(self.b_device_connection, self.on_device_disconnect)
            self._rebind_clicked(self.b_device_enable, self.on_device_enable)
        else:
            self.b_device_connection.setText('Connect')
            self.b_device_connection.setEnabled(True)
            self.b_device_enable.setText('Enable')
            self.b_device_enable.setEnabled(False)
            self._rebind_clicked(self.b_device_connection, self.on_device_connect)
            self._rebind_clicked(self.b_device_enable)

    def on_record(self):
        """Toggle recording; starting requires a non-empty commit text."""
        if self.record:
            self.p.send(SetRecordMsg(False))
            self.record = False
            self.b_record.setStyleSheet('')
            return

        commit_text = self.l_record_commit.text()
        if commit_text != '':
            self.p.send(SetRecordMsg(True, commit_text))
            self.record = True
            self.b_record.setStyleSheet('background-color: red;')

    def on_zmq_event(self, msg: QByteArray):
        """Decode one received message and reflect it in the widgets.

        Args:
            msg: Raw payload; decoded with ``Msg.decode_msg``.
        """
        decoded = Msg.decode_msg(msg.data())

        if isinstance(decoded, KillMsg):
            if decoded.name == '':
                self.close()
        elif isinstance(decoded, ImageArgMsg):
            # t_start belongs to the start widget, t_end to the end widget.
            self.s_t_start.setValue(decoded.t_start)
            self.s_t_end.setValue(decoded.t_end)
        elif isinstance(decoded, MoveAxisMsg):
            if decoded.axis == 'S':
                self.s_sid.setValue(decoded.value)
        elif isinstance(decoded, SeqIdMinMax):
            self.s_sid.setMinimum(decoded.min)
            self.s_sid.setMaximum(decoded.max)
            self.s_sid.setValue(decoded.min)
        elif isinstance(decoded, SeqListMsg):
            self.comboBox.setEnabled(True)
            self.comboBox.clear()
            for name in decoded.value:
                self.comboBox.addItem(name)
            self.p.send(SelectSeqMsg(self.comboBox.itemText(self.comboBox.currentIndex())))
            self.b_play_playback.setEnabled(True)
        elif isinstance(decoded, SeqMetaMsg):
            self.seq_meta = RfSequenceMeta.from_name(decoded.s)
            upper = max(self.seq_meta.shape)
            self.s_t_start.setMaximum(upper)
            self.s_t_end.setMaximum(upper)
        elif isinstance(decoded, DeviceConnectedMsg):
            self.device_connected = bool(decoded.value)
            self.set_device_connection(
                LinkStatus.GREEN if self.device_connected else LinkStatus.RED)
            self.update_device_buttons()
        elif isinstance(decoded, DeviceEnabledMsg):
            self.device_enabled = bool(decoded.value)
            self.set_device_enable(
                LinkStatus.GREEN if self.device_enabled else LinkStatus.RED)
            self.update_device_buttons()
        elif isinstance(decoded, DeviceOnlineMsg):
            if decoded.value:
                self.l_online.setStyleSheet("")
                self.l_online.setText("Device Online")
            else:
                self.l_online.setStyleSheet("background-color: pink;")
                self.l_online.setText("Device Offline")
        elif isinstance(decoded, DeviceConfigListMsg):
            for name, txt in decoded.arr:
                self.c_seq_meta.addItem(name, txt)
        elif isinstance(decoded, RobotRtsiMsg):
            self.l_robot_pos.setText(','.join(f'{v:.3f}' for v in decoded.pos))
            self.l_robot_force.setText(','.join(f'{v:.3f}' for v in decoded.force))

    def closeEvent(self, event):
        """Broadcast a ``KillMsg`` with an empty name when the window closes."""
        self.p.send(KillMsg(''))

    @QtCore.pyqtSlot(int)
    def on_t_start(self, v):
        """Publish the new ``t_start`` when the start widget reports no sender."""
        # Query the start widget itself, as on_t_end and c_sid do for theirs.
        if self.s_t_start.sender() is None:
            self.arg.t_start = v
            self.p.send(self.arg)

    @QtCore.pyqtSlot(int)
    def on_t_end(self, v):
        """Publish the new ``t_end`` when the end widget reports no sender."""
        if self.s_t_end.sender() is None:
            self.arg.t_end = v
            self.p.send(self.arg)

    @QtCore.pyqtSlot(int)
    def cbc(self, v):
        """Publish the selected sequence name when the sender is ``None`` or a ``QFrame``."""
        origin = self.comboBox.sender()
        if origin is None or isinstance(origin, QFrame):
            self.p.send(SelectSeqMsg(self.comboBox.itemText(v)))

    @QtCore.pyqtSlot(int)
    def c_sid(self, v):
        """Publish a move on axis ``'S'`` when the widget reports no sender."""
        if self.s_sid.sender() is None:
            self.p.send(MoveAxisMsg('ui', 'S', v))

    @QtCore.pyqtSlot(int)
    def on_m(self, v):
        """Publish the chosen device configuration, skipping the ``'Empty'`` entry."""
        origin = self.c_seq_meta.sender()
        if origin is None or isinstance(origin, QFrame):
            name = self.c_seq_meta.itemText(v)
            if name != 'Empty':
                self.p.send(SetDeviceConfigMsg(self.c_seq_meta.itemData(v)))
                self.p.send(SeqMetaMsg(name))


class MainUI(Node):
    """Node that hosts the Qt application and the :class:`Adv` main window."""

    topics = [ImageArgMsg, SeqIdMinMax, MoveAxisMsg, SeqListMsg, SeqMetaMsg,
              DeviceConnectedMsg, DeviceEnabledMsg, DeviceOnlineMsg,
              DeviceConfigListMsg, RobotRtsiMsg]

    def __init__(self, level=logging.INFO):
        """Forward the logging level to ``Node``."""
        super().__init__(level=level)

    def loop(self):
        """Create the application and window, then run the Qt event loop until it exits."""
        app = QApplication(sys.argv)
        main_window = Adv(self)
        main_window.show()
        app.exec()