flandre/flandre/launcher.py
2025-05-11 17:46:45 +08:00

142 lines
4.0 KiB
Python

import logging
import multiprocessing
import os
import shutil
import subprocess
import tomllib
from enum import Enum
from pathlib import Path
import click
import platformdirs
from flandre import C
from flandre import P
from flandre.BusClient import BusClient
from flandre.kde_pyqt6_mainui import kde_pyqt6_mainui
from flandre.nodes.Beamformer import Beamformer
from flandre.nodes.Broker import Broker
from flandre.nodes.Device import Device
from flandre.nodes.ImageCV import ImageCV
from flandre.nodes.ImageFFMPEG import ImageFFMPEG
from flandre.nodes.ImageQt import ImageQt
from flandre.nodes.Loader import Loader
from flandre.nodes.MainUI import MainUI
from flandre.nodes.Mi import Mi
from flandre.nodes.Midi import Midi
from flandre.nodes.Muxer import Muxer
from flandre.nodes.Recorder import Recorder
from flandre.nodes.Robot import Robot
from flandre.nodes.VideoQt import VideoQt
from flandre.nodes.Web import Web
from flandre.utils.Msg import KillMsg, NodeOnlineMsg, Msg1, Msg2
class LaunchComponent(Enum):
MainUI = MainUI
ImageCV = ImageCV
Muxer = Muxer
Robot = Robot
Loader = Loader
Device = Device
Recorder = Recorder
Beamformer = Beamformer
ImageFFMPEG = ImageFFMPEG
ImageQt = ImageQt
Midi = Midi
Mi = Mi
Web = Web
VideoQt = VideoQt
def launch(arg: dict[LaunchComponent, dict]):
logging.basicConfig(level=logging.INFO)
multiprocessing.set_start_method('spawn')
bp = multiprocessing.Process(target=Broker(broker=True), kwargs=dict(software_config=C))
bp.start()
ps = []
for k, v in arg.items():
if k == LaunchComponent.MainUI and os.environ.get('XDG_CURRENT_DESKTOP', None) == 'KDE':
ps.append(kde_pyqt6_mainui)
continue
ps.append(k.value(**v))
pps = []
for p in ps:
pps.append(multiprocessing.Process(target=p, kwargs=dict(software_config=C)))
for p in pps:
p.start()
c = BusClient(KillMsg, NodeOnlineMsg)
cnt = 0
ready = False
dd = dict()
while True:
if ready:
msg = c.recv()
else:
c.send(Msg2())
msg = c.poll(100)
if msg is None:
continue
if isinstance(msg, KillMsg):
if msg.name == '':
break
if not ready:
if isinstance(msg, NodeOnlineMsg):
if msg.name not in dd:
logging.info(msg)
dd[msg.name] = 1
if dd.keys().__len__() == len(ps):
logging.info(f'launcher stand by ready')
c.send(Msg1())
ready = True
for p in pps:
p.kill()
bp.kill()
def launch_from_file(file: Path):
assert file.exists()
r = tomllib.loads(file.read_text())
arg_d = dict()
for k, v in r.items():
arg_d[LaunchComponent[k]] = v
launch(arg_d)
@click.command()
@click.option('--data_folder', default=None)
@click.option('--generate_pyqt', default=True)
@click.option('--dev/--no-dev', default=True)
@click.option('-p', '--path', type=str,
default=platformdirs.user_config_path('Flandre', 'Scarlet') / 'launch.toml',
help='Path to launch.toml'
)
def entrypoint(data_folder, generate_pyqt, dev, path):
if dev:
C.config_folder = P.DEV_PROJECT_FOLDER / 'config'
C.log_folder = P.DEV_PROJECT_FOLDER / 'log'
else:
C.read_config(C.software_config_file)
if (pyuic6 := shutil.which('pyuic6')) is None:
print('pyuic6 is not installed')
return
if generate_pyqt:
subprocess.run([pyuic6, '-o', P.PYQT / 'Main.py', P.PYQT / 'Main.ui'])
subprocess.run([pyuic6, '-o', P.PYQT / 'Image.py', P.PYQT / 'Image.ui'])
if data_folder is not None:
C.record_folder = Path(data_folder)
path = Path(path)
if not path.exists():
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text('[MainUI]\n')
print('Use launch config', path)
launch_from_file(path)
if __name__ == '__main__':
entrypoint()