Teleoperating Robot with MagiClaw

MagiClaw provides two types of the gripper, including in-hand type and on-robot type. It allows users to teleoperate their robots when the on-robot type is mounted at the end of the robots, and the in-hand type is handled by human operators. On this page, we will briefly introduce how to read the data streamed from MagiClaw, and demonstrate teleoperating UR7e robot using MagiClaw.

How to Read MagiClaw Data

When the MagiClaw is running, the control box keeps sending data via Transmission Control Protocol (TCP) so that external applications can easily subscribe and process the information in real time. Any device on the same network can subscribe to the MagiClaw data stream and decode the messages consistently.

  • Transport Layer: MagiClaw uses TCP sockets to ensure reliable delivery and ordered transmission.

  • Messaging Layer: Data is sent via ZeroMQ (ZMQ), which provides efficient asynchronous publish/subscribe messaging.

  • Serialization: All messages are encoded with Protocol Buffers (Protobuf), ensuring compact binary format, cross-language compatibility, and well-defined schemas.

To receive MagiClaw data, an external program needs to:

  1. Connect to the TCP port where MagiClaw publishes messages (e.g., tcp://<magiclaw-ip>:<port>). The default ports for sending data of magiclaw_0 and magiclaw_1 are 6300 and 6400.

  2. Use ZeroMQ’s SUB socket to subscribe to the stream.

  3. Deserialize the received binary data using the corresponding Protobuf message definition (magiclaw_msg.proto).

Protocol Buffers Definition

Here is the definition of the magiclaw_msg.proto, and you can also find it in the repository:

syntax = "proto3";

package MagiClaw;

message Motor
{
    float angle = 1;
    float speed = 2;
    float iq = 3;
    int32 temperature = 4;
}

message Claw
{
    float angle = 1;
    Motor motor = 2;
}

message Finger
{
    bytes img = 1;
    repeated float pose = 2;
    repeated float force = 3;
    repeated float node = 4;
}

message Phone
{
    bytes color_img = 1;
    bytes depth_img = 2;
    int32 depth_width = 3;
    int32 depth_height = 4;
    repeated float local_pose = 5;
    repeated float global_pose = 6;
}

message MagiClaw
{
    float timestamp = 1;
    Claw claw = 2;
    Finger finger_0 = 3;
    Finger finger_1 = 4;
    Phone phone = 5;
    repeated float pose = 6;
}

Next step is to generate the class you’ll need to read MagiClaw message. To do this, you need to run the protocol buffer compiler protoc on the magiclaw_msg.proto:

1

If you haven’t installed the compiler, download the package and follow the instructions in the README.

2

Now run the compiler, specifying the source directory (where your application’s source code lives – the current directory is used if you don’t provide a value), the destination directory (where you want the generated code to go; often the same as $SRC_DIR), and the path to your .proto:

protoc --proto_path=$SRC_DIR --python_out=$DST_DIR $SRC_DIR/magiclaw_msg.proto

The --python_out option is used since the example below is in Python – similar options are provided for other supported languages.

This generates magiclaw_msg_pb2.py in your specified destination directory.

Python Example

Here is a Python example to read the MagiClaw data using the generated magiclaw_msg_pb2.py, and you can also find it in the repository:

import zmq
import numpy as np
from typing import Tuple
from datetime import datetime
import magiclaw_msg_pb2    # generated from magiclaw_msg.proto

class MagiClawSubscriber:
    def __init__(
        self,
        host: str,
        port: int,
        hwm: int = 1,
        conflate: bool = True,
    ) -> None:
        """
        Subscriber initialization.
        """

        print("{:-^80}".format(" Claw Subscriber Initialization "))
        print(f"Address: tcp://{host}:{port}")

        # Create a ZMQ context
        self.context = zmq.Context()
        # Create a ZMQ subscriber
        self.subscriber = self.context.socket(zmq.SUB)
        # Set high water mark
        self.subscriber.set_hwm(hwm)
        # Set conflate
        self.subscriber.setsockopt(zmq.CONFLATE, conflate)
        # Connect the address
        self.subscriber.connect(f"tcp://{host}:{port}")
        # Subscribe all messages
        self.subscriber.setsockopt_string(zmq.SUBSCRIBE, "")

        print("Claw Subscriber Initialization Done.")
        print("{:-^80}".format(""))

    def subscribeMessage(self):
        """
        Subscribe the message.
        """

        # Receive the message
        msg = self.subscriber.recv()

        # Parse the message
        magiclaw = magiclaw_msg_pb2.MagiClaw()
        magiclaw.ParseFromString(msg)

        # Unpack the message
        claw_angle = magiclaw.claw.angle
        motor_angle = magiclaw.claw.motor.angle
        motor_speed = magiclaw.claw.motor.speed
        motor_iq = magiclaw.claw.motor.iq
        motor_temperature = magiclaw.claw.motor.temperature
        finger_0_img = magiclaw.finger_0.img
        finger_0_pose = np.array(magiclaw.finger_0.pose)
        finger_0_force = np.array(magiclaw.finger_0.force)
        finger_0_node = np.array(magiclaw.finger_0.node).reshape(-1, 3)
        finger_1_img = magiclaw.finger_1.img
        finger_1_pose = np.array(magiclaw.finger_1.pose)
        finger_1_force = np.array(magiclaw.finger_1.force)
        finger_1_node = np.array(magiclaw.finger_1.node).reshape(-1, 3)
        phone_color_img = magiclaw.phone.color_img
        phone_depth_img = magiclaw.phone.depth_img
        phone_local_pose = np.array(magiclaw.phone.local_pose)
        phone_global_pose = np.array(magiclaw.phone.global_pose)
        magiclaw_pose = np.array(magiclaw.pose)

        return (
            claw_angle,
            motor_angle,
            motor_speed,
            motor_iq,
            motor_temperature,
            finger_0_img,
            finger_0_pose,
            finger_0_force,
            finger_0_node,
            finger_1_img,
            finger_1_pose,
            finger_1_force,
            finger_1_node,
            phone_color_img,
            phone_depth_img,
            phone_local_pose,
            phone_global_pose,
            magiclaw_pose,
        )

    def close(self):
        """
        Close ZMQ socket and context to prevent memory leaks.
        """

        if hasattr(self, "subscriber") and self.subscriber:
            self.subscriber.close()
        if hasattr(self, "context") and self.context:
            self.context.term()

To use this MagiClawSubscriber class, make sure install dependencies:

pip install pyzmq protobuf

Then you can use the following code:

magiclaw_subscriber = MagiClawSubscriber(
    host=<magiclaw_host>,
    port=6300,
)

while True:
    (
        claw_angle,
        motor_angle,
        motor_speed,
        motor_iq,
        motor_temperature,
        finger_0_img,
        finger_0_pose,
        finger_0_force,
        finger_0_node,
        finger_1_img,
        finger_1_pose,
        finger_1_force,
        finger_1_node,
        phone_color_img,
        phone_depth_img,
        phone_local_pose,
        phone_global_pose,
        magiclaw_pose,
    ) = magiclaw_subscriber.subscribeMessage()
    
    print("MagiClaw pose:", magiclaw_pose)    # print data as you want

Now you can get multi-modal data from the MagiClaw (ID: 0), including:

  • claw_angle (float): The angle of the claw.

  • motor_angle (float): The angle of the motor.

  • motor_speed (float): The speed of the motor.

  • motor_iq (float): The current of the motor in IQ format.

  • motor_temperature (int): The temperature of the motor in Celsius.

  • finger_0_img_bytes (bytes): The image captured by finger 0.

  • finger_0_pose (list): The pose of finger 0. Default is a zero vector.

  • finger_0_force (list): The force on the bottom surface of finger 0.

  • finger_0_node (list): The node displacement of finger 0.

  • finger_1_img_bytes (bytes): The image captured by finger 1.

  • finger_1_pose (list): The pose of finger 1. Default is a zero vector.

  • finger_1_force (list): The force on the bottom surface of finger 1.

  • finger_1_node (list): The node displacement of finger 1.

  • phone_color_img_bytes (bytes): The color image captured by the phone.

  • phone_depth_img_bytes (bytes): The depth image captured by the phone.

  • phone_depth_width (int): The width of the phone depth image.

  • phone_depth_height (int): The height of the phone depth image.

  • phone_local_pose (list): The local pose of the phone.

  • phone_global_pose (list): The global pose of the phone.

  • magiclaw_pose (list): The pose of MagiClaw.

If you want to read data from the MagiClaw (ID: 1), just change the value of port to 6400.

Teleoperate UR7e using MagiClaw

MagiClaw can be used as an input device for teleoperation of a UR7e robotic arm. By reading the pose (position + orientation) from the MagiClaw data stream, we can map it into incremental Cartesian motions and send them to the UR7e in real time.

First, you need to mount the on-robot MagiClaw at the end of UR7e, and connect it to a control box with an XT30 2+2 cable. The in-hand MagiClaw is connected to another control box with an XT30 2+2 cable. Both of the control boxes are connected to a local network via Ethernet, where your PC and UR7e are also connected.

Then you need to modify the bilateral control mode for each control box. You can launch the control box via either SSH or VNC, and change the value of mode in $HOME/magiclaw/configs/bilateral.yaml to leader for the in-hand MagiClaw and follower for the on-robot one.

After that, you can copy the following script teleop.py to your PC:

import argparse
import time
import rtde_control
import rtde_receive
import numpy as np
import multiprocessing as mp
from multiprocessing import Queue, Event
from scipy.spatial.transform import Slerp, Rotation as R
from magiclaw_subscriber import MagiClawSubscriber


def magiclaw_process(host, data_queue, stop_event, loop_rate=50) -> None:
    magiclaw_subscriber = MagiClawSubscriber(host, 6300)
    _, _, _, _, magiclaw_pose = magiclaw_subscriber.subscribeMessage()
    init_trans = magiclaw_pose[:3]
    init_rot = R.from_quat(magiclaw_pose[3:])
    
    loop_time = 1.0 / loop_rate
    try:
        while not stop_event.is_set():
            start_time = time.time()
            _, _, _, _, magiclaw_pose = magiclaw_subscriber.subscribeMessage()
            trans = magiclaw_pose[:3]
            rot = R.from_quat(magiclaw_pose[3:])
            delta_trans = trans - init_trans
            delta_rot = rot * init_rot.inv()
            data_queue.put((delta_pos.tolist(), delta_rot.as_quat()))

            elapsed_time = time.time() - start_time
            if elapsed_time < loop_time:
                time.sleep(loop_time - elapsed_time)
        magiclaw_subscriber.close()
    except KeyboardInterrupt:
        if not stop_event.is_set():
            stop_event.set()
        magiclaw_subscriber.close()

def ur_control_process(host, data_queue, stop_event, loop_rate=100) -> None:
    rtde_c = rtde_control.RTDEControlInterface(host)
    rtde_r = rtde_receive.RTDEReceiveInterface(host)
    
    initial_pose = np.array(rtde_r.getActualTCPPose())
    initial_trans = initial_pose[:3]
    initial_rot = R.from_rotvec(initial_pose[3:6])

    last_delta_pos = np.zeros(3)
    last_delta_rot = R.from_quat([0, 0, 0, 1])

    loop_time = 1.0 / loop_rate

    try:
        while not stop_event.is_set():
            start_time = time.time()
            while not data_queue.empty():
                delta_pos, delta_rot_quat = data_queue.get()
                last_delta_pos = np.array(delta_pos)
                last_delta_rot = R.from_quat(delta_rot_quat)
            
            new_trans = initial_trans + last_delta_trans
            new_rot = last_delta_rot * initial_rot
            terget_pose = np.concatenate([new_trans, new_rot.as_rotvec()])
            rtde_c.servoL(
                target_pose.tolist(),
                0.2,
                1.0,
                0.02,
                0.1,
                300.0
            )
            
            elapsed_time = time.time() - start_time
            if elapsed_time < loop_time:
                time.sleep(loop_time - elapsed_time)
        rtde_c.servoStop()
        rtde_c.stopScript()
    except KeyboardInterrupt:
        if not stop_event.is_set():
            stop_event.set()
        rtde_c.servoStop()
        rtde_c.stopScript()

def teleoperation(robot_host: str, magiclaw_host: str) -> None:
    mp.set_start_method("spawn", force=True)
    data_queue = Queue()
    stop_event = Event()
    
    ur_control_proc = mp.Process(
        target=ur_control_process, args=(robot_host, data_queue, stop_event)
    )
    ur_control_proc.start()
    time.sleep(1)
    
    magiclaw_proc = mp.Process(
        target=magiclaw_process, args=(magiclaw_host, data_queue, stop_event)
    )
    magiclaw_proc.start()

    try:
        while True:
            time.sleep(0.1)
    except KeyboardInterrupt:
        stop_event.set()
    finally:
        magiclaw_proc.join()
        ur_control_proc.join()

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--robot_host",
        type=str,
        help="IP address of the robot",
    )
    parser.add_argument(
        "--magiclaw_host",
        type=str,
        help="IP address of the MagiClaw",
    )
    args = parser.parse_args()
    
    teleoperation(args.robot_host, args.magiclaw_host)

Before running the script, install the dependencies:

pip install ur_rtde pyzmq protobuf

INFO

If you cannot install ur_rtde successfully by pip, please follow the official installation guide.

To start teleoperation, start the two MagiClaws in teleop mode via the app, CLI or dashboard. Then on your PC's terminal, run:

python teleop.py --robot_host <robot_host> --magiclaw_host <magiclaw_host>

where <robot_host> refers to the IP address of the UR7e, and <magiclaw_host> refers to the IP address of the MagiClaw control box.

DANGER

The delta value of magiclaw_pose received from the in-hand MagiClaw relative to the initial state will be converted into the delta value of the robot's Tool Center Point (TCP) to calculate the target of the robot's TCP, achieving continuous teleoperation.

Last updated