add MiSwitch class

This commit is contained in:
flandre 2025-05-13 17:25:26 +08:00
parent a7b4cf1d72
commit 002af887bc
3 changed files with 40 additions and 87 deletions

View File

@ -17,7 +17,7 @@ from flandre.BusClient import BusClient
from flandre.kde_pyqt6_mainui import kde_pyqt6_mainui from flandre.kde_pyqt6_mainui import kde_pyqt6_mainui
from flandre.nodes.Broker import Broker from flandre.nodes.Broker import Broker
from flandre.utils.Msg import KillMsg, NodeOnlineMsg, Msg1, Msg2 from flandre.utils.Msg import KillMsg, NodeOnlineMsg, Msg1, Msg2
from flandre.utils.mi import c1_connect, c2_connect, c1_disconnect, c2_disconnect, c1_connected, c1_power, c2_connected from flandre.utils.mi import MiSwitch
class LaunchComponent(Enum): class LaunchComponent(Enum):
@ -130,37 +130,45 @@ def gui(data_folder, generate_pyqt, path):
launch_from_file(path) launch_from_file(path)
mi1: MiSwitch = None
mi2: MiSwitch = None
@entrypoint.group() @entrypoint.group()
def mi(): def mi():
pass global mi1, mi2
mi1 = MiSwitch(C.switch1_ip, C.switch1_token)
mi2 = MiSwitch(C.switch2_ip, C.switch2_token)
@mi.command() @mi.command()
@click.argument('name') @click.argument('name')
def on(name): def on(name):
if name == 'c1': if name == 'c1':
c1_connect() mi1.on()
if name == 'c2': if name == 'c2':
c2_connect() mi2.on()
@mi.command() @mi.command()
@click.argument('name') @click.argument('name')
def off(name): def off(name):
if name == 'c1': if name == 'c1':
c1_disconnect() mi2.off()
if name == 'c2': if name == 'c2':
c2_disconnect() mi2.off()
@mi.command() @mi.command()
@click.argument('name') @click.argument('name')
def status(name): def status(name):
if name == 'c1': if name == 'c1':
print(c1_connected()) mi = MiSwitch(C.switch1_ip, C.switch1_token)
print(c1_power()) print(mi.is_on())
print(mi.power())
if name == 'c2': if name == 'c2':
print(c2_connected()) mi = MiSwitch(C.switch2_ip, C.switch2_token)
print(mi.is_on())
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -7,7 +7,7 @@ import zmq
from flandre import C from flandre import C
from flandre.nodes.Node import Node from flandre.nodes.Node import Node
from flandre.utils.Msg import KillMsg, SetDeviceSwitchMsg, DeviceSwitchMsg, RefreshDeviceMsg, InterruptMsg from flandre.utils.Msg import KillMsg, SetDeviceSwitchMsg, DeviceSwitchMsg, RefreshDeviceMsg, InterruptMsg
from flandre.utils.mi import c1_connect, c1_connected, c1_disconnect from flandre.utils.mi import MiSwitch
from flandre.utils.network import check_port, check_socket from flandre.utils.network import check_port, check_socket
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -20,6 +20,7 @@ class Mi(Node):
super(Mi, self).__init__(level=level) super(Mi, self).__init__(level=level)
self.ping1enable = False self.ping1enable = False
self.ping60enable = False self.ping60enable = False
self.mi = MiSwitch(C.switch1_ip, C.switch1_token)
def custom_setup(self): def custom_setup(self):
self.mi_rep_socket = self.context.socket(zmq.REP) self.mi_rep_socket = self.context.socket(zmq.REP)
@ -79,7 +80,7 @@ class Mi(Node):
self.device_py_req_socket.recv() self.device_py_req_socket.recv()
def loop(self): def loop(self):
if c1_connected() and check_socket(C.device_py_rep_socket): if self.mi.is_on() and check_socket(C.device_py_rep_socket):
self.device_py_req_socket.send(b'get_device_status') self.device_py_req_socket.send(b'get_device_status')
r = self.device_py_req_socket.recv() r = self.device_py_req_socket.recv()
if r == b'on': if r == b'on':
@ -112,11 +113,11 @@ class Mi(Node):
return return
if isinstance(msg, SetDeviceSwitchMsg): if isinstance(msg, SetDeviceSwitchMsg):
if msg.value: if msg.value:
c1_connect() self.mi.on()
self.ping1enable = True self.ping1enable = True
else: else:
self.ping60enable = False self.ping60enable = False
c1_disconnect() self.mi.off()
self.device_py_req_socket.send(b'kill') self.device_py_req_socket.send(b'kill')
self.device_py_req_socket.recv() self.device_py_req_socket.recv()
self.send(DeviceSwitchMsg('RED')) self.send(DeviceSwitchMsg('RED'))

View File

@ -1,85 +1,29 @@
import click
from miio.miioprotocol import MiIOProtocol from miio.miioprotocol import MiIOProtocol
from flandre import C
class MiSwitch:
def __init__(self, ip, token):
self.protocol = MiIOProtocol(ip, token)
def c1(): def set_connect(self, status: bool):
return MiIOProtocol(C.switch1_ip, C.switch1_token) self.protocol.send('set_properties', [{'did': 'MYDID', 'siid': 2, 'piid': 1, 'value': status}])
def on(self):
self.set_connect(True)
def c2(): def off(self):
return MiIOProtocol(C.switch2_ip, C.switch2_token) self.set_connect(False)
def power(self):
return self.protocol.send('get_properties', [{'did': '845778715', 'siid': 11, 'piid': 2}])[0]['value']
def c1_set_connect(status: bool): def is_on(self):
c1().send('set_properties', [{'did': 'MYDID', 'siid': 2, 'piid': 1, 'value': status}]) return self.protocol.send('get_properties', [{'did': 'MYDID', 'siid': 2, 'piid': 1}])[0]['value']
def c1_connect():
c1_set_connect(True)
def c1_disconnect():
c1_set_connect(False)
def c1_connected():
return c1().send('get_properties', [{'did': 'MYDID', 'siid': 2, 'piid': 1}])[0]['value']
def c1_power():
return c1().send('get_properties', [{'did': '845778715', 'siid': 11, 'piid': 2}])[0]['value']
def c2_set_connect(status: bool):
c2().send('set_properties', [{'did': 'MYDID', 'siid': 2, 'piid': 1, 'value': status}])
def c2_connect():
c2_set_connect(True)
def c2_disconnect():
c2_set_connect(False)
def c2_connected():
return c2().send('get_properties', [{'did': 'MYDID', 'siid': 2, 'piid': 1}])[0]['value']
@click.group()
def cli():
pass
@cli.command()
@click.argument('name')
def on(name):
if name == 'c1':
c1_connect()
if name == 'c2':
c2_connect()
@cli.command()
@click.argument('name')
def off(name):
if name == 'c1':
c1_disconnect()
if name == 'c2':
c2_disconnect()
@cli.command()
@click.argument('name')
def status(name):
if name == 'c1':
print(c1_connected())
print(c1_power())
if name == 'c2':
print(c2_connected())
if __name__ == '__main__': if __name__ == '__main__':
cli() from flandre import C
mi = MiSwitch(C.switch1_ip, C.switch1_token)
print(mi.is_on())
print(mi.power())