flandre/src/nodes/JoyStick.py
2025-03-03 22:22:06 +08:00

124 lines
3.6 KiB
Python

import logging
from threading import Thread
import mido
import pyjoystick
import zmq
from mido.backends.rtmidi import Input, Output
from pyjoystick.sdl2 import Key, Joystick, run_event_loop
from nodes.Node import Node
from utils.Msg import KillMsg, Msg, ImageArgMsg, JoystickMsg
logger = logging.getLogger(__name__)
def clamp(x, t):
if -t < x < t:
return 0.0
return x
class Joystick(Node):
def __init__(self, level=logging.INFO):
super(Joystick, self).__init__(level=level)
self.m_input: Input = None
self.m_output: Output = None
self.do_loop = True
self.t_joystick_event_loop: Thread = None
self.isa: zmq.Socket = None
def sendj(self, t=0.1):
d2 = self.d.copy()
self.s2.send_json(dict(
x=clamp(d2['x'], t),
y=clamp(d2['y'], t),
z=clamp(d2['z'], t),
rx=clamp(d2['rx'], t),
ry=clamp(d2['ry'], t),
rz=clamp(d2['rz'], t),
))
def custom_setup(self):
self.isa = self.c.ctx.socket(zmq.PUSH)
self.isa.bind("inproc://midi")
self.s2 = self.c.ctx.socket(zmq.PUSH)
self.s2.connect("tcp://11.6.1.63:5555")
self.t_joystick_event_loop = Thread(target=self.joystick_event_loop)
self.t_joystick_event_loop.start()
self.arg = ImageArgMsg('joystick', 0, 0)
self.arg1 = 0
self.d = dict(
x=0.0,
y=0.0,
z=0.0,
rx=0.0,
ry=0.0,
rz=0.0,
)
self.rz1 = 0
self.rz2 = 0
self.z1 = 0
self.z2 = 0
def joystick_event_loop(self):
def key_received(key: pyjoystick.interface.Key):
msg = JoystickMsg(str(key).replace('-', ''), key.value)
# print(msg)
match msg.key:
case 'Axis 0':
self.d['x'] = -msg.value
case 'Axis 1':
self.d['y'] = msg.value
case 'Axis 4':
self.d['rx'] = -msg.value
case 'Axis 3':
self.d['ry'] = -msg.value
case 'Axis 2':
self.z1 = msg.value
case 'Axis 5':
self.z2 = - msg.value
case 'Button 4':
self.rz1 = - msg.value
case 'Button 5':
self.rz2 = msg.value
self.d['rz'] = float(self.rz1 + self.rz2)
self.d['z'] = float(self.z1 + self.z2)
self.sendj()
# if msg.key == 'Axis 0':
# self.arg1 += msg.value
# self.arg.t_end = int(self.arg1 * 2)
# self.send(self.arg)
run_event_loop(None, None, key_received)
def loop(self):
isb = self.c.ctx.socket(zmq.PULL)
isb.connect("inproc://midi")
self.c.poller.register(isb, zmq.POLLIN)
while True:
p = dict(self.c.poller.poll())
if isb in p:
msg = Msg.decode_msg(isb.recv())
match msg.type:
case 'pitchwheel':
match msg.channel:
case 0:
# print(msg.pitch)
self.send(ImageArgMsg('midi', 0, msg.pitch * 3))
if self.c.sub in p:
msg = self.recv()
if isinstance(msg, KillMsg):
if msg.name == '':
self.do_loop = False
return
if __name__ == '__main__':
Joystick()()