flandre/src/nodes/Robot.py

64 lines
2.1 KiB
Python
Raw Normal View History

2025-01-20 15:23:27 +08:00
import logging
import threading
import zmq
from nodes.Node import Node
from utils.Msg import KillMsg, RobotRtsiMsg, Msg
from utils.rtsi import rtsi
from utils.rtsi.serialize import DataObject
logger = logging.getLogger(__name__)
class Robot(Node):
topics = []
def __init__(self):
super(Robot, self).__init__()
self.rtsi_thread_stop = False
self.rt = rtsi('11.6.1.53') # 创建rtsi类
self.rt.connect() # socket链接远程rtsi
self.rt.version_check() # rtsi版本协议检查
version = self.rt.controller_version() # 获取控制器协议版本
def rtsi_thread(self):
rtsi_push_socket = self.context.socket(zmq.PUSH)
rtsi_push_socket.bind('inproc://rtsi')
output1 = self.rt.output_subscribe('actual_joint_positions,actual_TCP_force', 125) # 输出订阅,配方1
self.rt.start() # rtsi 开始
while not self.rtsi_thread_stop:
recv_out: DataObject = self.rt.get_output_data()
if recv_out is None:
continue
if recv_out.recipe_id == output1.id:
rtsi_push_socket.send(RobotRtsiMsg(
pos=recv_out.actual_joint_positions,
force=recv_out.actual_TCP_force,
).encode_msg())
self.rt.disconnect()
def loop(self):
t = threading.Thread(target=self.rtsi_thread)
t.start()
rtsi_pull_socket = self.context.socket(zmq.PULL)
rtsi_pull_socket.connect('inproc://rtsi')
self.c.poller.register(rtsi_pull_socket, zmq.POLLIN)
while True:
socks = dict(self.c.poller.poll())
if rtsi_pull_socket in socks and socks[rtsi_pull_socket] == zmq.POLLIN:
msg = Msg.decode_msg(rtsi_pull_socket.recv())
self.send(msg)
if self.c.sub in socks and socks[self.c.sub] == zmq.POLLIN:
msg = self.recv()
if isinstance(msg, KillMsg):
if msg.name == '':
self.rtsi_thread_stop = True
return
if __name__ == '__main__':
r = Robot()
r()