ailerobotteleoperationsimulationdata-collection

Thu thập dữ liệu bằng Teleoperation trong Simulation

Hướng dẫn chi tiết thu thập dữ liệu demonstration bằng teleop trong simulation với LeRobot — từ setup đến dataset hoàn chỉnh.

Nguyễn Anh Tuấn15 tháng 3, 202611 phút đọc
Thu thập dữ liệu bằng Teleoperation trong Simulation

Giới thiệu: Dữ liệu là nhiên liệu của Robot Learning

Nếu bạn đã đọc bài trước về LeRobot Framework, bạn biết rằng LeRobot cung cấp nhiều policy mạnh mẽ như ACT và Diffusion Policy. Nhưng policy tốt đến mấy cũng vô dụng nếu không có dữ liệu chất lượng. Trong imitation learning, dữ liệu chính là demonstrations — những lần bạn "chỉ" cho robot cách thực hiện tác vụ.

Bài viết này — bài thứ 2 trong series VLA & LeRobot Mastery — sẽ hướng dẫn bạn thu thập dữ liệu bằng teleoperation trong môi trường simulation. Chúng ta sẽ đi từ setup ban đầu đến khi có dataset hoàn chỉnh sẵn sàng cho training.

Teleoperation setup

Tại sao Teleoperation?

Có ba cách chính để thu thập dữ liệu cho robot learning:

Phương pháp Ưu điểm Nhược điểm
Teleoperation Chất lượng cao, kiểm soát được Tốn thời gian, cần người điều khiển
Scripted Policy Nhanh, tự động Không tự nhiên, limited diversity
Autonomous (RL) Không cần người Cần reward design, kém ổn định

Teleoperation — nơi người điều khiển trực tiếp robot qua controller — tạo ra dữ liệu tự nhiên nhất. Con người biết cách gắp đồ vật hiệu quả, và sự đa dạng tự nhiên trong cách điều khiển giúp policy học được tốt hơn.

Setup môi trường Simulation

Cài đặt dependencies

# Cài đặt LeRobot với simulation support
pip install lerobot[simulation]

# Hoặc cài riêng MuJoCo
pip install mujoco gymnasium-robotics

# Cài đặt robosuite (tùy chọn, nhiều task environments)
pip install robosuite

# Verify MuJoCo hoạt động
python -c "import mujoco; print(f'MuJoCo version: {mujoco.__version__}')"

Chọn môi trường phù hợp

LeRobot hỗ trợ nhiều simulator, mỗi loại có ưu điểm riêng:

# MuJoCo — Simulation chính xác, phổ biến nhất
import mujoco
import mujoco.viewer

# robosuite — Nhiều task environment có sẵn
import robosuite as suite
env = suite.make(
    env_name="Lift",           # Task: nhấc khối lên
    robots="Panda",            # Robot Franka Panda
    has_renderer=True,         # Render để xem
    has_offscreen_renderer=True,  # Offscreen cho camera obs
    use_camera_obs=True,       # Dùng camera observation
    camera_names=["agentview", "robot0_eye_in_hand"],
    camera_heights=480,
    camera_widths=640,
)

# Gymnasium Robotics — Interface chuẩn
import gymnasium as gym
env = gym.make("FrankaPickAndPlace-v3", render_mode="human")

Thiết lập Teleoperation

Teleoperation bằng bàn phím (đơn giản nhất)

import numpy as np
import gymnasium as gym
import time

class KeyboardTeleop:
    """Điều khiển robot bằng bàn phím — phù hợp cho tasks 2D đơn giản."""
    
    def __init__(self):
        self.action = np.zeros(4)  # [dx, dy, dz, gripper]
        self.speed = 0.05
        self.running = True
        
    def get_action_from_key(self, key):
        """Map phím bấm sang action."""
        action = np.zeros(4)
        key_map = {
            'w': (0, self.speed),    # Forward
            's': (0, -self.speed),   # Backward
            'a': (1, -self.speed),   # Left
            'd': (1, self.speed),    # Right
            'q': (2, self.speed),    # Up
            'e': (2, -self.speed),   # Down
            'g': (3, 1.0),          # Gripper close
            'r': (3, -1.0),         # Gripper open
        }
        if key in key_map:
            idx, val = key_map[key]
            action[idx] = val
        return action


def record_keyboard_episode(env, teleop, max_steps=500):
    """Ghi lại một episode điều khiển bằng bàn phím."""
    obs, info = env.reset()
    episode_data = []
    
    for step in range(max_steps):
        # Lấy action từ keyboard input
        key = get_keyboard_input()  # Platform-specific function
        action = teleop.get_action_from_key(key)
        
        # Thực thi action
        next_obs, reward, terminated, truncated, info = env.step(action)
        
        # Lưu frame
        episode_data.append({
            "observation.image": obs.get("image", None),
            "observation.state": obs.get("state", None),
            "action": action,
        })
        
        obs = next_obs
        if terminated or truncated:
            break
    
    return episode_data

Teleoperation bằng SpaceMouse (chuyên nghiệp)

SpaceMouse (3Dconnexion) là thiết bị input 6-DOF lý tưởng cho robot manipulation:

import numpy as np

class SpaceMouseTeleop:
    """Teleoperation bằng SpaceMouse 6-DOF.
    
    SpaceMouse cung cấp 6 trục: tx, ty, tz, rx, ry, rz
    Map trực tiếp sang end-effector delta pose.
    """
    
    def __init__(self, pos_sensitivity=1.0, rot_sensitivity=1.0):
        try:
            import pyspacemouse
            self.device = pyspacemouse
            success = pyspacemouse.open()
            if not success:
                raise RuntimeError("Không tìm thấy SpaceMouse!")
        except ImportError:
            raise ImportError("Cài đặt: pip install pyspacemouse")
        
        self.pos_sensitivity = pos_sensitivity
        self.rot_sensitivity = rot_sensitivity
        self.gripper_state = 1.0  # 1.0 = mở, -1.0 = đóng
    
    def get_action(self):
        """Đọc SpaceMouse state và trả về action 7D."""
        state = self.device.read()
        
        # Map SpaceMouse axes sang robot action
        action = np.zeros(7)  # [dx, dy, dz, drx, dry, drz, gripper]
        
        # Position deltas
        action[0] = state.x * self.pos_sensitivity
        action[1] = state.y * self.pos_sensitivity  
        action[2] = state.z * self.pos_sensitivity
        
        # Rotation deltas
        action[3] = state.roll * self.rot_sensitivity
        action[4] = state.pitch * self.rot_sensitivity
        action[5] = state.yaw * self.rot_sensitivity
        
        # Gripper toggle (nút bấm trên SpaceMouse)
        if state.buttons[0]:
            self.gripper_state *= -1
        action[6] = self.gripper_state
        
        return action
    
    def close(self):
        self.device.close()

SpaceMouse controller for robot teleoperation

Teleoperation bằng Leader-Follower (ALOHA style)

Phương pháp này dùng hai robot giống hệt nhau — leader (người điều khiển) và follower (thực hiện task):

from lerobot.common.robot_devices.robots.manipulator import ManipulatorRobot

def setup_leader_follower():
    """Setup hệ thống leader-follower cho ALOHA."""
    robot = ManipulatorRobot(
        robot_type="aloha",
        leader_arms={
            "left": FeetechMotorsBus(port="/dev/ttyACM0"),
            "right": FeetechMotorsBus(port="/dev/ttyACM1"),
        },
        follower_arms={
            "left": FeetechMotorsBus(port="/dev/ttyACM2"),
            "right": FeetechMotorsBus(port="/dev/ttyACM3"),
        },
        cameras={
            "top": OpenCVCamera(0, fps=30, width=640, height=480),
            "wrist_left": OpenCVCamera(1, fps=30, width=640, height=480),
            "wrist_right": OpenCVCamera(2, fps=30, width=640, height=480),
        },
    )
    return robot

Ghi dữ liệu với LeRobot

Sử dụng lệnh record có sẵn

# Record 50 episodes trong simulation
python lerobot/scripts/control_robot.py \
    --robot.type=so100 \
    --control.type=record \
    --control.fps=30 \
    --control.repo_id=my-user/my-pick-place-dataset \
    --control.num_episodes=50 \
    --control.single_task="Pick the red cube and place it on the green target"

Ghi dữ liệu bằng code tùy chỉnh

import numpy as np
import torch
from lerobot.common.datasets.lerobot_dataset import LeRobotDataset

def create_dataset_for_recording(repo_id, robot_type="so100"):
    """Tạo LeRobotDataset mới cho việc ghi dữ liệu."""
    dataset = LeRobotDataset.create(
        repo_id=repo_id,
        fps=30,
        robot_type=robot_type,
        features={
            "observation.images.top": {
                "dtype": "video",
                "shape": (480, 640, 3),
                "names": ["height", "width", "channels"],
            },
            "observation.images.wrist": {
                "dtype": "video",
                "shape": (480, 640, 3),
                "names": ["height", "width", "channels"],
            },
            "observation.state": {
                "dtype": "float32",
                "shape": (6,),
                "names": ["joint_positions"],
            },
            "action": {
                "dtype": "float32",
                "shape": (6,),
                "names": ["joint_positions_target"],
            },
        },
    )
    return dataset


def record_episodes(env, teleop, dataset, num_episodes=50, max_steps=500):
    """Ghi nhiều episodes teleop demonstration.
    
    Args:
        env: Gymnasium environment
        teleop: Teleop controller (keyboard, SpaceMouse, etc.)
        dataset: LeRobotDataset đã tạo
        num_episodes: Số episodes cần ghi
        max_steps: Số bước tối đa mỗi episode
    """
    successful_episodes = 0
    
    for ep in range(num_episodes):
        print(f"\n{'='*50}")
        print(f"Episode {ep+1}/{num_episodes}")
        print(f"Nhấn Enter để bắt đầu, 'q' để thoát...")
        
        if input().strip() == 'q':
            break
        
        obs, info = env.reset()
        episode_frames = []
        
        for step in range(max_steps):
            # Lấy action từ teleop
            action = teleop.get_action()
            
            # Lưu frame hiện tại
            dataset.add_frame({
                "observation.images.top": obs["image_top"],
                "observation.images.wrist": obs["image_wrist"],
                "observation.state": obs["state"],
                "action": action,
            })
            
            # Thực thi action
            obs, reward, terminated, truncated, info = env.step(action)
            
            if terminated:
                success = info.get("is_success", False)
                if success:
                    successful_episodes += 1
                    print(f"  ✓ Thành công! ({step+1} bước)")
                else:
                    print(f"  ✗ Thất bại ({step+1} bước)")
                break
            
            if truncated:
                print(f"  ⏰ Hết thời gian ({step+1} bước)")
                break
        
        # Lưu episode vào dataset
        dataset.save_episode()
        print(f"  Tỷ lệ thành công: {successful_episodes}/{ep+1} "
              f"({successful_episodes/(ep+1)*100:.0f}%)")
    
    return dataset


# Chạy thu thập dữ liệu
env = make_pick_place_env()
teleop = SpaceMouseTeleop()
dataset = create_dataset_for_recording("my-user/pick-place-50ep")

dataset = record_episodes(env, teleop, dataset, num_episodes=50)
dataset.push_to_hub()  # Upload lên HuggingFace Hub
print(f"\nĐã upload {dataset.num_episodes} episodes lên Hub!")

Cấu trúc Episode trong LeRobotDataset

Hiểu cấu trúc episode rất quan trọng để debug và phân tích dữ liệu:

from lerobot.common.datasets.lerobot_dataset import LeRobotDataset

dataset = LeRobotDataset("lerobot/aloha_sim_transfer_cube_human")

# Thông tin tổng quan
print(f"Tổng frames: {dataset.num_frames}")
print(f"Tổng episodes: {dataset.num_episodes}")
print(f"FPS: {dataset.fps}")

# Phân tích từng episode
for ep_idx in range(min(5, dataset.num_episodes)):
    ep_start = dataset.episode_data_index["from"][ep_idx].item()
    ep_end = dataset.episode_data_index["to"][ep_idx].item()
    ep_length = ep_end - ep_start
    duration = ep_length / dataset.fps
    
    print(f"\nEpisode {ep_idx}:")
    print(f"  Frames: {ep_start} → {ep_end} ({ep_length} frames)")
    print(f"  Duration: {duration:.1f}s")
    
    # Phân tích action distribution
    actions = torch.stack([
        dataset[i]["action"] for i in range(ep_start, ep_end)
    ])
    print(f"  Action range: [{actions.min():.3f}, {actions.max():.3f}]")
    print(f"  Action std: {actions.std(dim=0).mean():.4f}")

Mẹo thu thập dữ liệu chất lượng cao

1. Camera placement tối ưu

# Camera placement ảnh hưởng lớn đến chất lượng policy
camera_configs = {
    # Camera trên cao (bird's eye) — tốt cho spatial reasoning
    "top": {
        "position": [0.0, 0.0, 1.2],
        "orientation": [0, -90, 0],  # Nhìn thẳng xuống
        "fov": 60,
    },
    # Camera góc 45° — cân bằng depth và overview  
    "angle": {
        "position": [0.5, -0.5, 0.8],
        "orientation": [-30, -45, 0],
        "fov": 60,
    },
    # Wrist camera — chi tiết grasping
    "wrist": {
        "position": "attached_to_ee",
        "fov": 90,  # Wide angle cho wrist cam
    },
}

# Rule of thumb: 
# - Dùng ít nhất 2 camera (1 overview + 1 wrist)
# - Overview camera nên thấy toàn bộ workspace
# - Wrist camera giúp policy nhìn rõ vật thể khi gắp

2. Consistency trong demonstrations

def validate_episode_quality(episode_data, min_steps=50, max_idle_ratio=0.3):
    """Kiểm tra chất lượng episode trước khi lưu.
    
    Args:
        episode_data: List các frames
        min_steps: Số bước tối thiểu
        max_idle_ratio: Tỷ lệ idle (không di chuyển) tối đa
    
    Returns:
        (is_valid, reason): Tuple (bool, str)
    """
    if len(episode_data) < min_steps:
        return False, f"Quá ngắn ({len(episode_data)} < {min_steps} bước)"
    
    # Kiểm tra tỷ lệ idle
    actions = np.array([f["action"] for f in episode_data])
    action_norms = np.linalg.norm(actions[:, :3], axis=1)  # Chỉ xét position
    idle_frames = np.sum(action_norms < 0.001)
    idle_ratio = idle_frames / len(actions)
    
    if idle_ratio > max_idle_ratio:
        return False, f"Quá nhiều idle ({idle_ratio:.0%} > {max_idle_ratio:.0%})"
    
    # Kiểm tra action range bất thường
    if np.any(np.abs(actions) > 10.0):
        return False, "Action values bất thường (>10.0)"
    
    return True, "OK"

3. Filtering bad episodes

def filter_dataset_by_success(dataset, min_reward=0.5):
    """Lọc bỏ episodes không thành công."""
    good_episodes = []
    
    for ep_idx in range(dataset.num_episodes):
        ep_start = dataset.episode_data_index["from"][ep_idx].item()
        ep_end = dataset.episode_data_index["to"][ep_idx].item()
        
        # Lấy reward cuối episode
        final_frame = dataset[ep_end - 1]
        
        if final_frame.get("reward", 0) >= min_reward:
            good_episodes.append(ep_idx)
    
    print(f"Giữ lại {len(good_episodes)}/{dataset.num_episodes} episodes "
          f"({len(good_episodes)/dataset.num_episodes*100:.0f}%)")
    
    return good_episodes

Data quality analysis

Ví dụ hoàn chỉnh: Thu thập 50 episodes Pick-Place

Đây là workflow đầy đủ để thu thập dataset cho task pick-and-place:

import numpy as np
import gymnasium as gym
from lerobot.common.datasets.lerobot_dataset import LeRobotDataset

def make_pick_place_env():
    """Tạo môi trường pick-place simulation."""
    env = gym.make(
        "FrankaPickAndPlace-v3",
        render_mode="human",
        max_episode_steps=500,
    )
    return env

def scripted_pick_place_policy(obs, phase="approach"):
    """Policy scripted đơn giản cho pick-place.
    
    Có thể dùng thay teleop để tạo dataset nhanh cho thử nghiệm.
    """
    ee_pos = obs["observation"][:3]      # End-effector position
    obj_pos = obs["desired_goal"][:3]    # Object position  
    target_pos = obs["achieved_goal"][:3] # Target position
    
    action = np.zeros(4)
    
    if phase == "approach":
        # Di chuyển đến trên vật thể
        above_obj = obj_pos.copy()
        above_obj[2] += 0.05
        action[:3] = (above_obj - ee_pos) * 10
        action[3] = 1.0  # Gripper mở
    elif phase == "grasp":
        # Hạ xuống và gắp
        action[:3] = (obj_pos - ee_pos) * 10
        action[3] = -1.0  # Gripper đóng
    elif phase == "lift":
        # Nâng lên
        action[:3] = np.array([0, 0, 0.1])
        action[3] = -1.0
    elif phase == "place":
        # Di chuyển đến target
        action[:3] = (target_pos - ee_pos) * 10
        action[3] = -1.0
    
    return np.clip(action, -1, 1)


def collect_pick_place_dataset(num_episodes=50):
    """Thu thập dataset pick-place hoàn chỉnh."""
    env = make_pick_place_env()
    
    dataset = LeRobotDataset.create(
        repo_id="my-user/pick-place-50ep",
        fps=30,
        robot_type="franka",
        features={
            "observation.image": {
                "dtype": "video",
                "shape": (480, 640, 3),
                "names": ["height", "width", "channels"],
            },
            "observation.state": {
                "dtype": "float32",
                "shape": (10,),
                "names": ["robot_state"],
            },
            "action": {
                "dtype": "float32",
                "shape": (4,),
                "names": ["dx", "dy", "dz", "gripper"],
            },
        },
    )
    
    for ep in range(num_episodes):
        obs, info = env.reset()
        done = False
        step = 0
        
        while not done:
            # Scripted policy (thay bằng teleop cho data thật)
            if step < 30:
                action = scripted_pick_place_policy(obs, "approach")
            elif step < 50:
                action = scripted_pick_place_policy(obs, "grasp")
            elif step < 80:
                action = scripted_pick_place_policy(obs, "lift")
            else:
                action = scripted_pick_place_policy(obs, "place")
            
            # Thêm noise nhỏ để tăng diversity
            action[:3] += np.random.normal(0, 0.01, 3)
            
            dataset.add_frame({
                "observation.image": obs.get("image", np.zeros((480,640,3))),
                "observation.state": obs["observation"],
                "action": action,
            })
            
            obs, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated
            step += 1
        
        dataset.save_episode()
        
        if (ep + 1) % 10 == 0:
            print(f"Đã ghi {ep+1}/{num_episodes} episodes")
    
    # Upload lên Hub
    dataset.push_to_hub()
    print(f"\nHoàn tất! {dataset.num_episodes} episodes, "
          f"{dataset.num_frames} frames tổng cộng")
    
    env.close()
    return dataset


# Chạy thu thập dữ liệu
dataset = collect_pick_place_dataset(num_episodes=50)

Papers tham khảo

  1. RoboTurk: A Crowdsourcing Platform for Robotic Skill LearningMandlekar et al., CoRL 2018 — Platform thu thập dữ liệu robot qua web
  2. MimicGen: A Data Generation System for Scalable Robot LearningMandlekar et al., CoRL 2023 — Tăng cường dữ liệu tự động từ ít demonstrations
  3. ACT: Learning Fine-Grained Bimanual ManipulationZhao et al., RSS 2023 — Framework teleop cho ALOHA

Kết luận và bước tiếp theo

Thu thập dữ liệu chất lượng cao là bước quan trọng nhất trong imitation learning. Hãy nhớ các nguyên tắc: consistent demonstrations, đa dạng camera angles, và filtering bad episodes. Chất lượng quan trọng hơn số lượng — 50 episodes tốt thường hiệu quả hơn 200 episodes kém.

Trong bài tiếp theo — Train Policy cho Single-Arm: ACT và Diffusion Policy — chúng ta sẽ dùng dataset vừa thu thập để train và so sánh hai policy phổ biến nhất.

Nếu bạn muốn tìm hiểu thêm về imitation learning trước khi đi sâu, hãy đọc bài về Imitation Learning cơ bản.

Bài viết liên quan

NT

Nguyễn Anh Tuấn

Robotics & AI Engineer. Building VnRobo — sharing knowledge about robot learning, VLA models, and automation.

Bài viết liên quan

NEWTutorial
SimpleVLA-RL (9): OpenArm Simulation & Data
openarmisaac-labsimulationdata-collectionsimplevla-rlPhần 9

SimpleVLA-RL (9): OpenArm Simulation & Data

Setup OpenArm trong Isaac Lab, collect demonstration data trong simulation, và convert sang format cho SimpleVLA-RL training.

11/4/202618 phút đọc
NEWTutorial
SimpleVLA-RL (7): Collect Data cho OpenArm
openarmlerobotdata-collectionteleoperationPhần 7

SimpleVLA-RL (7): Collect Data cho OpenArm

Hướng dẫn từng bước setup OpenArm, calibrate, teleoperate và thu thập 50 episodes gắp hộp carton với LeRobot.

11/4/202616 phút đọc
NEWTutorial
SimpleVLA-RL (6): OpenArm — Phân tích Lộ trình
openarmvlareinforcement-learninglerobotpi0Phần 6

SimpleVLA-RL (6): OpenArm — Phân tích Lộ trình

Phân tích chi tiết cách tiếp cận training robot OpenArm 7-DoF gắp hộp carton — so sánh 2 lộ trình: LeRobot native vs SimpleVLA-RL.

11/4/202613 phút đọc