flandre/flandre/launcher.py
2025-04-19 23:29:19 +08:00

103 lines
2.8 KiB
Python

import logging
import multiprocessing
import os
import time
import tomllib
from enum import Enum
from pathlib import Path
from flandre.BusClient import BusClient
from flandre.kde_pyqt6_mainui import kde_pyqt6_mainui
from flandre.nodes.Beamformer import Beamformer
from flandre.nodes.Broker import Broker
from flandre.nodes.Device import Device
from flandre.nodes.ImageCV import ImageCV
from flandre.nodes.Loader import Loader
from flandre.nodes.MainUI import MainUI
from flandre.nodes.Muxer import Muxer
from flandre.nodes.Robot import Robot
from flandre.nodes.ImageFFMPEG import ImageFFMPEG
from flandre.nodes.ImageQt import ImageQt
from flandre.nodes.Midi import Midi
from flandre.nodes.Mi import Mi
from flandre.nodes.Recorder import Recorder
from flandre.utils.Msg import KillMsg, NodeOnlineMsg, Msg1, Msg2
from flandre.config import CONFIG_FOLDER
class LaunchComponent(Enum):
MainUI = MainUI
ImageCV = ImageCV
Muxer = Muxer
Robot = Robot
Loader = Loader
Device = Device
Recorder = Recorder
Beamformer = Beamformer
ImageFFMPEG = ImageFFMPEG
ImageQt = ImageQt
Midi = Midi
Mi = Mi
def launch(arg: dict[LaunchComponent, dict]):
logging.basicConfig(level=logging.INFO)
multiprocessing.set_start_method('spawn')
bp = multiprocessing.Process(target=Broker(broker=True))
bp.start()
ps = []
for k, v in arg.items():
if k == LaunchComponent.MainUI and os.environ.get('XDG_CURRENT_DESKTOP', None) == 'KDE':
ps.append(kde_pyqt6_mainui)
continue
ps.append(k.value(**v))
pps = []
for p in ps:
pps.append(multiprocessing.Process(target=p))
for p in pps:
p.start()
c = BusClient(KillMsg, NodeOnlineMsg)
cnt = 0
ready = False
dd = dict()
while True:
if ready:
msg = c.recv()
else:
c.send(Msg2())
msg = c.poll(100)
if msg is None:
continue
if isinstance(msg, KillMsg):
if msg.name == '':
break
if not ready:
if isinstance(msg, NodeOnlineMsg):
if msg.name not in dd:
logging.info(msg)
dd[msg.name] = 1
if dd.keys().__len__() == len(ps):
logging.info(f'launcher stand by ready')
c.send(Msg1())
ready = True
for p in pps:
p.kill()
bp.kill()
def launch_from_file(file: Path):
if not file.exists():
file.parent.mkdir(parents=True, exist_ok=True)
file.write_text('[MainUI]\n')
r = tomllib.loads(file.read_text())
arg_d = dict()
for k, v in r.items():
arg_d[LaunchComponent[k]] = v
launch(arg_d)
if __name__ == '__main__':
launch_from_file(CONFIG_FOLDER / 'gui.toml')