flandre/flandre/nodes/Midi.py
flandre 3a7700df1e move flandre to module
change SOFTWARE_CONFIG
2025-04-12 00:03:28 +08:00

67 lines
2.3 KiB
Python

import logging
from threading import Thread
from unittest import case
import mido
import zmq
from mido import Message
from mido.backends.rtmidi import Input, Output
from flandre.nodes.Node import Node
from flandre.utils.Msg import KillMsg, MidiMsg, Msg, ImageArgMsg
logger = logging.getLogger(__name__)
class Midi(Node):
def __init__(self, level=logging.INFO):
super(Midi, self).__init__(level=level)
self.m_input: Input = None
self.m_output: Output = None
self.do_loop = True
self.t_midi_event_loop: Thread = None
self.isa: zmq.Socket = None
def custom_setup(self):
self.isa = self.c.ctx.socket(zmq.PUSH)
self.isa.bind("inproc://midi")
self.m_input: Input = mido.open_input('SMC-Mixer:SMC-Mixer Bluetooth 128:0')
self.m_output: Output = mido.open_output('SMC-Mixer:SMC-Mixer Bluetooth 128:0')
self.t_midi_event_loop = Thread(target=self.midi_event_loop)
self.t_midi_event_loop.start()
def midi_event_loop(self):
while self.do_loop:
midi_msg: Message = self.m_input.receive()
# print(midi_msg)
d = midi_msg.dict()
match d['type']:
case 'pitchwheel':
channel = d['channel']
# 0-127
pitch = int(d['pitch'] / 128) + 64
# pitch_p = int(pitch / 127)
self.isa.send(MidiMsg(type='pitchwheel', channel=channel, pitch=pitch).encode_msg())
def loop(self):
isb = self.c.ctx.socket(zmq.PULL)
isb.connect("inproc://midi")
self.c.poller.register(isb, zmq.POLLIN)
while True:
p = dict(self.c.poller.poll())
if isb in p:
msg = Msg.decode_msg(isb.recv())
match msg.type:
case 'pitchwheel':
match msg.channel:
case 0:
# print(msg.pitch)
self.send(ImageArgMsg('midi', 0, msg.pitch * 3))
if self.c.sub in p:
msg = self.recv()
if isinstance(msg, KillMsg):
if msg.name == '':
self.do_loop = False
return