simple record robot

This commit is contained in:
scarlet 2025-06-08 20:35:42 -07:00
parent 2809f45604
commit 464925a621

View File

@ -336,59 +336,46 @@ def robot_record(folder):
p = Path(folder) p = Path(folder)
p.mkdir(parents=True, exist_ok=True) p.mkdir(parents=True, exist_ok=True)
robot.setup() robot.setup()
q = queue.Queue()
def rtsi_thread():
output1 = robot.rt.output_subscribe(
'actual_TCP_pose,actual_TCP_force,timestamp', 250) # 输出订阅,配方1
robot.rt.start() # rtsi 开始
arr = []
last_device_ts = 0
while True:
recv_out: DataObject = robot.rt.get_output_data()
if recv_out is None:
continue
if recv_out.recipe_id == output1.id:
x, y, z, rx, ry, rz = recv_out.actual_TCP_pose
fx, fy, fz, frx, fry, frz = recv_out.actual_TCP_force
host_ts = time.time_ns()
device_ts = recv_out.timestamp
# print(device_ts - last_device_ts)
last_device_ts = device_ts
d = dict(
x=x,
y=y,
z=z,
fx=fx,
fy=fy,
fz=fz,
rx=rx,
ry=ry,
rz=rz,
frx=frx,
fry=fry,
frz=frz,
host_ts=host_ts,
device_ts=device_ts,
)
arr.append(d)
if arr.__len__() == 100:
q.put(arr)
arr = []
def write_thread():
while True:
ns = time.time_ns()
# print(q.get()[0]['ns'])
(p / f'{ns}.json').write_text(json.dumps(q.get()))
tr = threading.Thread(target=rtsi_thread)
tw = threading.Thread(target=write_thread)
tr.start()
tw.start()
tr.join()
tw.join()
output1 = robot.rt.output_subscribe(
'actual_TCP_pose,actual_TCP_force,timestamp', 250) # 输出订阅,配方1
robot.rt.start() # rtsi 开始
arr = []
last_device_ts = 0
while True:
recv_out: DataObject = robot.rt.get_output_data()
if recv_out is None:
continue
if recv_out.recipe_id == output1.id:
x, y, z, rx, ry, rz = recv_out.actual_TCP_pose
fx, fy, fz, frx, fry, frz = recv_out.actual_TCP_force
host_ts = time.time_ns()
device_ts = recv_out.timestamp
# print(device_ts - last_device_ts)
last_device_ts = device_ts
d = dict(
x=x,
y=y,
z=z,
fx=fx,
fy=fy,
fz=fz,
rx=rx,
ry=ry,
rz=rz,
frx=frx,
fry=fry,
frz=frz,
host_ts=host_ts,
device_ts=device_ts,
)
arr.append(d)
if arr.__len__() == 100:
ns = time.time_ns()
# print(q.get()[0]['ns'])
# print('write')
(p / f'{ns}.json').write_text(json.dumps(arr))
arr = []
if __name__ == '__main__': if __name__ == '__main__':
entrypoint() entrypoint()