fix launch sleep
fix device connect
This commit is contained in:
parent
41b0e01d7d
commit
339194200c
@ -1,5 +1,7 @@
|
||||
import logging
|
||||
|
||||
import zmq
|
||||
from zmq import Context
|
||||
from zmq import Context, Socket
|
||||
|
||||
from flandre.utils.Msg import Msg, InterruptMsg
|
||||
|
||||
@ -17,6 +19,7 @@ class BusClient:
|
||||
poller=False,
|
||||
req_socket_str: str = None,
|
||||
):
|
||||
self.sub: Socket = None
|
||||
if ctx is None:
|
||||
self.ctx: Context = zmq.Context()
|
||||
else:
|
||||
@ -38,9 +41,14 @@ class BusClient:
|
||||
self.pub.connect(f'tcp://127.0.0.1:{self.fp}')
|
||||
self.req_socket = None
|
||||
if req_socket_str is not None:
|
||||
self.poller2 = zmq.Poller()
|
||||
self.sub2 = self.ctx.socket(zmq.SUB)
|
||||
self.sub2.setsockopt(zmq.SUBSCRIBE, InterruptMsg.magic() + InterruptMsg.eid())
|
||||
# self.sub2.connect(f'tcp://127.0.0.1:{self.bp}')
|
||||
self.req_socket_str = req_socket_str
|
||||
self.poller.register(self.req_socket, zmq.POLLIN)
|
||||
self.req_socket = self.ctx.socket(zmq.REQ)
|
||||
self.poller2.register(self.req_socket, zmq.POLLIN)
|
||||
self.poller2.register(self.sub2, zmq.POLLIN)
|
||||
self.req_socket.connect(req_socket_str)
|
||||
# todo fix poller
|
||||
|
||||
@ -48,6 +56,11 @@ class BusClient:
|
||||
b = self.sub.recv()
|
||||
return Msg.decode_msg(b)
|
||||
|
||||
def poll(self, timeout):
|
||||
b = self.sub.poll(timeout)
|
||||
if b != 0:
|
||||
return self.recv()
|
||||
|
||||
def send(self, msg: Msg):
|
||||
return self.pub.send(msg.encode_msg())
|
||||
|
||||
@ -58,17 +71,34 @@ class BusClient:
|
||||
async def send_async(self, msg: Msg):
|
||||
return self.pub.send(msg.encode_msg())
|
||||
|
||||
def req_interrupt(self, b: bytes, interrupt_name: str, timeout=3000):
|
||||
while True:
|
||||
self.req_socket.send(b)
|
||||
r = dict(self.poller.poll(timeout))
|
||||
def req_interrupt(self,
|
||||
data: bytes,
|
||||
interrupt_name: str,
|
||||
timeout=3000,
|
||||
retry_times=114514,
|
||||
cb_retry=None,
|
||||
):
|
||||
self.sub2.connect(f'tcp://127.0.0.1:{self.bp}')
|
||||
|
||||
for _ in range(retry_times):
|
||||
self.req_socket.send(data)
|
||||
r = dict(self.poller2.poll(timeout))
|
||||
if self.req_socket in r:
|
||||
self.sub2.disconnect(f'tcp://127.0.0.1:{self.bp}')
|
||||
return self.req_socket.recv()
|
||||
if cb_retry is not None:
|
||||
cb_retry()
|
||||
self.poller2.unregister(self.req_socket)
|
||||
self.req_socket.close()
|
||||
self.req_socket = self.ctx.socket(zmq.REQ)
|
||||
self.poller2.register(self.req_socket, zmq.POLLIN)
|
||||
self.req_socket.connect(self.req_socket_str)
|
||||
if self.sub in r:
|
||||
msg = self.recv()
|
||||
if self.sub2 in r:
|
||||
msg = Msg.decode_msg(self.sub2.recv())
|
||||
if isinstance(msg, InterruptMsg):
|
||||
if msg.value == interrupt_name:
|
||||
self.sub2.disconnect(f'tcp://127.0.0.1:{self.bp}')
|
||||
return None
|
||||
logging.warning('timeout')
|
||||
self.sub2.disconnect(f'tcp://127.0.0.1:{self.bp}')
|
||||
return 'timeout'
|
||||
|
||||
@ -20,7 +20,7 @@ from flandre.nodes.ImageFFMPEG import ImageFFMPEG
|
||||
from flandre.nodes.ImageQt import ImageQt
|
||||
from flandre.nodes.Midi import Midi
|
||||
from flandre.nodes.Mi import Mi
|
||||
from flandre.utils.Msg import KillMsg, NodeOnlineMsg, Msg1
|
||||
from flandre.utils.Msg import KillMsg, NodeOnlineMsg, Msg1, Msg2
|
||||
from flandre.config import CONFIG_FOLDER
|
||||
|
||||
|
||||
@ -58,18 +58,28 @@ def launch(arg: dict[LaunchComponent, dict]):
|
||||
|
||||
c = BusClient(KillMsg, NodeOnlineMsg)
|
||||
cnt = 0
|
||||
ready = False
|
||||
dd = dict()
|
||||
while True:
|
||||
msg = c.recv()
|
||||
if ready:
|
||||
msg = c.recv()
|
||||
else:
|
||||
c.send(Msg2())
|
||||
msg = c.poll(100)
|
||||
if msg is None:
|
||||
continue
|
||||
if isinstance(msg, KillMsg):
|
||||
if msg.name == '':
|
||||
break
|
||||
if isinstance(msg, NodeOnlineMsg):
|
||||
cnt += 1
|
||||
logging.info(msg)
|
||||
if cnt == len(ps):
|
||||
logging.info(f'launcher stand by ready')
|
||||
c.send(Msg1())
|
||||
|
||||
if not ready:
|
||||
if isinstance(msg, NodeOnlineMsg):
|
||||
if msg.name not in dd:
|
||||
logging.info(msg)
|
||||
dd[msg.name] = 1
|
||||
if dd.keys().__len__() == len(ps):
|
||||
logging.info(f'launcher stand by ready')
|
||||
c.send(Msg1())
|
||||
ready = True
|
||||
for p in pps:
|
||||
p.kill()
|
||||
bp.kill()
|
||||
|
||||
@ -7,12 +7,11 @@ from threading import Thread
|
||||
|
||||
import zmq
|
||||
|
||||
from flandre.BusClient import BusClient
|
||||
from flandre.config import C
|
||||
from flandre.nodes.Node import Node
|
||||
from flandre.utils.Msg import ImageArgMsg, KillMsg, SetDeviceConnectedMsg, SetDeviceEnabledMsg, DeviceEnabledMsg, \
|
||||
DeviceConnectedMsg, SetDeviceConfigMsg, DeviceOnlineMsg, DeviceConfigListMsg, RequestRfFrameMsg, \
|
||||
DeviceZero, SetDeviceSwitchMsg, DeviceSwitchMsg, SeqMetaMsg, RefreshDeviceMsg, RfFrameMsg
|
||||
DeviceZero, DeviceSwitchMsg, SeqMetaMsg, RfFrameMsg
|
||||
from flandre.utils.RfFrame import RfFrameMemory
|
||||
from flandre.utils.RfMeta import RfFrameMeta, RfSequenceMeta
|
||||
|
||||
@ -42,10 +41,9 @@ class Device(Node):
|
||||
]
|
||||
|
||||
def __init__(self, level=logging.INFO):
|
||||
super(Device, self).__init__(level=level)
|
||||
super(Device, self).__init__(level=level, req=C.live_rep_socket)
|
||||
self.arg = ImageArgMsg('', t_start=0, t_end=1499)
|
||||
self.seq_meta: RfSequenceMeta | None = None
|
||||
self.req_driver_socket: zmq.Socket = None
|
||||
self.rep_socket: zmq.Socket = None
|
||||
self.ok = b'ok\x00'
|
||||
self.loop2_t = None
|
||||
@ -56,53 +54,42 @@ class Device(Node):
|
||||
self.online()
|
||||
time.sleep(1)
|
||||
|
||||
def d2b(self, cmd: DeviceCmd, v: bytes = b''):
|
||||
return struct.pack('i', self.magic) + struct.pack('i', cmd.value) + v
|
||||
|
||||
def device_cmd(self, cmd: DeviceCmd, v: bytes = b''):
|
||||
self.req_driver_socket.send(struct.pack('i', self.magic) + struct.pack('i', cmd.value) + v)
|
||||
return self.c.req_interrupt(
|
||||
self.d2b(cmd, v),
|
||||
interrupt_name='device123',
|
||||
retry_times=5,
|
||||
cb_retry=lambda : logger.warning(f'retry {cmd}'),
|
||||
)
|
||||
|
||||
def connect(self):
|
||||
temp_client = BusClient(SetDeviceSwitchMsg, RefreshDeviceMsg, poller=True)
|
||||
temp_client.poller.register(self.req_driver_socket, zmq.POLLIN)
|
||||
self.device_cmd(DeviceCmd.SetConnectionOn)
|
||||
logger.warning('onn')
|
||||
d = dict(temp_client.poller.poll())
|
||||
|
||||
if self.req_driver_socket in d:
|
||||
rb = self.req_driver_socket.recv()
|
||||
if rb == self.ok:
|
||||
self.send(DeviceConnectedMsg(True))
|
||||
# r = self.c.req_interrupt(
|
||||
# self.d2b(DeviceCmd.SetConnectionOn),
|
||||
# retry_times=3,
|
||||
# interrupt_name='device123',
|
||||
# cb_retry=lambda: logger.warning('retry'))
|
||||
r = self.device_cmd(DeviceCmd.SetConnectionOn)
|
||||
if r == self.ok:
|
||||
self.send(DeviceConnectedMsg(True))
|
||||
else:
|
||||
if r is None:
|
||||
logger.error('interrupt')
|
||||
else:
|
||||
logger.error(f"Device msg: {rb}")
|
||||
if temp_client.sub in d:
|
||||
msg = temp_client.recv()
|
||||
if isinstance(msg, SetDeviceSwitchMsg):
|
||||
if not msg.value:
|
||||
self.switch = False
|
||||
logger.warning(f"interrupt connecting")
|
||||
self.req_driver_socket.close()
|
||||
self.context.term()
|
||||
self.context = zmq.Context()
|
||||
self.req_driver_socket = self.context.socket(zmq.REQ)
|
||||
self.req_driver_socket.connect(C.live_rep_socket)
|
||||
if isinstance(msg, RefreshDeviceMsg):
|
||||
self.switch = False
|
||||
logger.warning(f"interrupt connecting")
|
||||
self.req_driver_socket.close()
|
||||
self.context.term()
|
||||
self.context = zmq.Context()
|
||||
self.req_driver_socket = self.context.socket(zmq.REQ)
|
||||
self.req_driver_socket.connect(C.live_rep_socket)
|
||||
logger.error(f"Device msg: {r}")
|
||||
self.send(DeviceConnectedMsg(False))
|
||||
|
||||
def disconnect(self):
|
||||
self.device_cmd(DeviceCmd.SetConnectionOff)
|
||||
rb = self.req_driver_socket.recv()
|
||||
rb = self.device_cmd(DeviceCmd.SetConnectionOff)
|
||||
if rb == self.ok:
|
||||
self.send(DeviceConnectedMsg(False))
|
||||
else:
|
||||
logger.error(f"Device msg: {rb}")
|
||||
|
||||
def enable(self):
|
||||
self.device_cmd(DeviceCmd.SetEnableOn)
|
||||
rb = self.req_driver_socket.recv()
|
||||
rb = self.device_cmd(DeviceCmd.SetEnableOn)
|
||||
if rb == self.ok:
|
||||
self.send(DeviceEnabledMsg(True))
|
||||
return True
|
||||
@ -111,8 +98,7 @@ class Device(Node):
|
||||
return False
|
||||
|
||||
def disable(self):
|
||||
self.device_cmd(DeviceCmd.SetEnableOff)
|
||||
rb = self.req_driver_socket.recv()
|
||||
rb = self.device_cmd(DeviceCmd.SetEnableOff)
|
||||
if rb == self.ok:
|
||||
self.send(DeviceEnabledMsg(False))
|
||||
return True
|
||||
@ -133,8 +119,7 @@ class Device(Node):
|
||||
return True
|
||||
|
||||
def get_enable(self):
|
||||
self.device_cmd(DeviceCmd.GetEnable)
|
||||
rb = self.req_driver_socket.recv()
|
||||
rb = self.device_cmd(DeviceCmd.GetEnable)
|
||||
match rb:
|
||||
case b'true':
|
||||
self.send(DeviceEnabledMsg(True))
|
||||
@ -146,16 +131,14 @@ class Device(Node):
|
||||
logger.error(f"Device msg: {rb}")
|
||||
|
||||
def get_seq_meta_name(self):
|
||||
self.device_cmd(DeviceCmd.GetName)
|
||||
rb = self.req_driver_socket.recv()
|
||||
rb = self.device_cmd(DeviceCmd.GetName)
|
||||
name = rb.decode()
|
||||
self.seq_meta = RfSequenceMeta.from_name(name)
|
||||
if rb != b'':
|
||||
self.send(SeqMetaMsg('live', name))
|
||||
|
||||
def get_connection(self):
|
||||
self.device_cmd(DeviceCmd.GetConnection)
|
||||
rb = self.req_driver_socket.recv()
|
||||
rb = self.device_cmd(DeviceCmd.GetConnection)
|
||||
match rb:
|
||||
case b'true':
|
||||
self.send(DeviceConnectedMsg(True))
|
||||
@ -178,9 +161,8 @@ class Device(Node):
|
||||
|
||||
def set_name_and_file_only(self, name: str, txt: str):
|
||||
name_encoded = name.encode()
|
||||
self.device_cmd(DeviceCmd.SetNameAndFileOnly,
|
||||
struct.pack('I', name_encoded.__len__()) + name.encode() + txt.encode())
|
||||
rb = self.req_driver_socket.recv()
|
||||
rb = self.device_cmd(DeviceCmd.SetNameAndFileOnly,
|
||||
struct.pack('I', name_encoded.__len__()) + name.encode() + txt.encode())
|
||||
if rb == self.ok:
|
||||
return True
|
||||
else:
|
||||
@ -188,21 +170,17 @@ class Device(Node):
|
||||
return False
|
||||
|
||||
def get_data(self):
|
||||
self.device_cmd(DeviceCmd.GetData)
|
||||
return self.req_driver_socket.recv()
|
||||
|
||||
return self.device_cmd(DeviceCmd.GetData)
|
||||
|
||||
def set_zero(self):
|
||||
self.device_cmd(DeviceCmd.SetZero)
|
||||
return self.req_driver_socket.recv()
|
||||
return self.device_cmd(DeviceCmd.SetZero)
|
||||
|
||||
def custom_setup(self):
|
||||
self.rep_socket = self.context.socket(zmq.REP)
|
||||
self.rep_socket.bind(f"tcp://localhost:{C.driver_rep_port}")
|
||||
self.c.poller.register(self.rep_socket, zmq.POLLIN)
|
||||
|
||||
self.req_driver_socket = self.context.socket(zmq.REQ)
|
||||
self.req_driver_socket.connect(C.live_rep_socket)
|
||||
|
||||
def loop(self):
|
||||
self.loop2_t = Thread(target=self.loop2)
|
||||
self.loop2_t.start()
|
||||
|
||||
@ -120,13 +120,12 @@ class Adv(QMainWindow, Ui_MainWindow):
|
||||
self.b_probe_stop.clicked.connect(self.on_probe('stop'))
|
||||
|
||||
self.b_device_zero.clicked.connect(lambda: self.p.send(DeviceZero()))
|
||||
self.b_us_switch.clicked.connect(self.on_deivce_switch)
|
||||
self.b_us_refresh.clicked.connect(self.on_deivce_refresh)
|
||||
|
||||
self.mi_req_socket = zmq.Context().socket(zmq.REQ)
|
||||
self.mi_req_socket.connect(C.mi_rep_socket)
|
||||
|
||||
def on_deivce_switch(self):
|
||||
@pyqtSlot()
|
||||
def on_b_us_switch_clicked(self):
|
||||
match self.device_switch_state:
|
||||
case LinkStatus.RED:
|
||||
self.p.send(SetDeviceSwitchMsg(True))
|
||||
@ -134,7 +133,8 @@ class Adv(QMainWindow, Ui_MainWindow):
|
||||
case LinkStatus.GREEN | LinkStatus.YELLOW | LinkStatus.ORANGE:
|
||||
self.p.send(SetDeviceSwitchMsg(False))
|
||||
|
||||
def on_deivce_refresh(self):
|
||||
@pyqtSlot()
|
||||
def on_b_us_refresh_clicked(self):
|
||||
match self.device_switch_state:
|
||||
case LinkStatus.GREEN | LinkStatus.YELLOW | LinkStatus.ORANGE:
|
||||
self.p.send(RefreshDeviceMsg())
|
||||
|
||||
@ -6,7 +6,7 @@ import zmq
|
||||
|
||||
from flandre.config import C
|
||||
from flandre.nodes.Node import Node
|
||||
from flandre.utils.Msg import KillMsg, SetDeviceSwitchMsg, DeviceSwitchMsg, RefreshDeviceMsg
|
||||
from flandre.utils.Msg import KillMsg, SetDeviceSwitchMsg, DeviceSwitchMsg, RefreshDeviceMsg, InterruptMsg
|
||||
from flandre.utils.mi import c1_connect, c1_connected, c1_disconnect
|
||||
from flandre.utils.network import check_port, check_socket
|
||||
|
||||
@ -106,7 +106,7 @@ class Mi(Node):
|
||||
# self.mi_rep_socket.send(b'ok')
|
||||
if self.c.sub in r:
|
||||
msg = self.recv()
|
||||
logger.info(f'{msg}')
|
||||
# logger.info(f'{msg}')
|
||||
if isinstance(msg, KillMsg):
|
||||
if msg.name == '':
|
||||
return
|
||||
@ -121,6 +121,7 @@ class Mi(Node):
|
||||
self.device_py_req_socket.recv()
|
||||
self.send(DeviceSwitchMsg('RED'))
|
||||
elif isinstance(msg, RefreshDeviceMsg):
|
||||
self.send(InterruptMsg('device123'))
|
||||
self.device_py_req_socket.send(b'kill')
|
||||
self.device_py_req_socket.recv()
|
||||
self.device_py_req_socket.send(b'start')
|
||||
|
||||
@ -1,11 +1,12 @@
|
||||
import logging
|
||||
import sys
|
||||
import time
|
||||
from abc import abstractmethod
|
||||
|
||||
import zmq
|
||||
|
||||
from flandre.BusClient import BusClient
|
||||
from flandre.utils.Msg import Msg, KillMsg, NodeOnlineMsg, Msg1
|
||||
from flandre.utils.Msg import Msg, KillMsg, NodeOnlineMsg, Msg1, Msg2, InterruptMsg
|
||||
|
||||
|
||||
class Node:
|
||||
@ -13,7 +14,7 @@ class Node:
|
||||
bp = BusClient.bp
|
||||
topics = []
|
||||
|
||||
def __init__(self, enable_init=True, level=logging.INFO, conflare=False, broker=False, req=False):
|
||||
def __init__(self, enable_init=True, level=logging.INFO, conflare=False, broker=False, req=None):
|
||||
self.enable_init = enable_init
|
||||
self.isalive = True
|
||||
self.level = level
|
||||
@ -24,6 +25,9 @@ class Node:
|
||||
def recv(self):
|
||||
return self.c.recv()
|
||||
|
||||
def poll(self, timeout: int):
|
||||
return self.c.poll(timeout)
|
||||
|
||||
def send(self, msg: Msg):
|
||||
return self.c.send(msg)
|
||||
|
||||
@ -56,16 +60,20 @@ class Node:
|
||||
|
||||
self.context = zmq.Context()
|
||||
if self.enable_init:
|
||||
self.c = BusClient(*([KillMsg, Msg1] + self.topics),
|
||||
self.c = BusClient(*([KillMsg,InterruptMsg, Msg1, Msg2] + self.topics),
|
||||
poller=True, conflare=self.conflare, req_socket_str=self.req)
|
||||
|
||||
def __call__(self, *args, **kwargs):
|
||||
self.setup()
|
||||
if not self.broker:
|
||||
time.sleep(2)
|
||||
self.send(NodeOnlineMsg(self.__class__.__name__))
|
||||
# input('asdasd')
|
||||
msg = self.recv()
|
||||
# logging.info(f'{self.__class__.__name__},{msg}')
|
||||
while True:
|
||||
msg = self.poll(100)
|
||||
if msg is not None:
|
||||
break
|
||||
while True:
|
||||
self.send(NodeOnlineMsg(self.__class__.__name__))
|
||||
msg = self.recv()
|
||||
if isinstance(msg, Msg1):
|
||||
break
|
||||
self.loop()
|
||||
print(self.__class__.__name__, 'exiting')
|
||||
|
||||
Loading…
Reference in New Issue
Block a user