Giới thiệu: Dữ liệu là nhiên liệu của Robot Learning
Nếu bạn đã đọc bài trước về LeRobot Framework, bạn biết rằng LeRobot cung cấp nhiều policy mạnh mẽ như ACT và Diffusion Policy. Nhưng policy tốt đến mấy cũng vô dụng nếu không có dữ liệu chất lượng. Trong imitation learning, dữ liệu chính là demonstrations — những lần bạn "chỉ" cho robot cách thực hiện tác vụ.
Bài viết này — bài thứ 2 trong series VLA & LeRobot Mastery — sẽ hướng dẫn bạn thu thập dữ liệu bằng teleoperation trong môi trường simulation. Chúng ta sẽ đi từ setup ban đầu đến khi có dataset hoàn chỉnh sẵn sàng cho training.
Tại sao Teleoperation?
Có ba cách chính để thu thập dữ liệu cho robot learning:
| Phương pháp | Ưu điểm | Nhược điểm |
|---|---|---|
| Teleoperation | Chất lượng cao, kiểm soát được | Tốn thời gian, cần người điều khiển |
| Scripted Policy | Nhanh, tự động | Không tự nhiên, limited diversity |
| Autonomous (RL) | Không cần người | Cần reward design, kém ổn định |
Teleoperation — nơi người điều khiển trực tiếp robot qua controller — tạo ra dữ liệu tự nhiên nhất. Con người biết cách gắp đồ vật hiệu quả, và sự đa dạng tự nhiên trong cách điều khiển giúp policy học được tốt hơn.
Setup môi trường Simulation
Cài đặt dependencies
# Cài đặt LeRobot với simulation support
pip install lerobot[simulation]
# Hoặc cài riêng MuJoCo
pip install mujoco gymnasium-robotics
# Cài đặt robosuite (tùy chọn, nhiều task environments)
pip install robosuite
# Verify MuJoCo hoạt động
python -c "import mujoco; print(f'MuJoCo version: {mujoco.__version__}')"
Chọn môi trường phù hợp
LeRobot hỗ trợ nhiều simulator, mỗi loại có ưu điểm riêng:
# MuJoCo — Simulation chính xác, phổ biến nhất
import mujoco
import mujoco.viewer
# robosuite — Nhiều task environment có sẵn
import robosuite as suite
env = suite.make(
env_name="Lift", # Task: nhấc khối lên
robots="Panda", # Robot Franka Panda
has_renderer=True, # Render để xem
has_offscreen_renderer=True, # Offscreen cho camera obs
use_camera_obs=True, # Dùng camera observation
camera_names=["agentview", "robot0_eye_in_hand"],
camera_heights=480,
camera_widths=640,
)
# Gymnasium Robotics — Interface chuẩn
import gymnasium as gym
env = gym.make("FrankaPickAndPlace-v3", render_mode="human")
Thiết lập Teleoperation
Teleoperation bằng bàn phím (đơn giản nhất)
import numpy as np
import gymnasium as gym
import time
class KeyboardTeleop:
"""Điều khiển robot bằng bàn phím — phù hợp cho tasks 2D đơn giản."""
def __init__(self):
self.action = np.zeros(4) # [dx, dy, dz, gripper]
self.speed = 0.05
self.running = True
def get_action_from_key(self, key):
"""Map phím bấm sang action."""
action = np.zeros(4)
key_map = {
'w': (0, self.speed), # Forward
's': (0, -self.speed), # Backward
'a': (1, -self.speed), # Left
'd': (1, self.speed), # Right
'q': (2, self.speed), # Up
'e': (2, -self.speed), # Down
'g': (3, 1.0), # Gripper close
'r': (3, -1.0), # Gripper open
}
if key in key_map:
idx, val = key_map[key]
action[idx] = val
return action
def record_keyboard_episode(env, teleop, max_steps=500):
"""Ghi lại một episode điều khiển bằng bàn phím."""
obs, info = env.reset()
episode_data = []
for step in range(max_steps):
# Lấy action từ keyboard input
key = get_keyboard_input() # Platform-specific function
action = teleop.get_action_from_key(key)
# Thực thi action
next_obs, reward, terminated, truncated, info = env.step(action)
# Lưu frame
episode_data.append({
"observation.image": obs.get("image", None),
"observation.state": obs.get("state", None),
"action": action,
})
obs = next_obs
if terminated or truncated:
break
return episode_data
Teleoperation bằng SpaceMouse (chuyên nghiệp)
SpaceMouse (3Dconnexion) là thiết bị input 6-DOF lý tưởng cho robot manipulation:
import numpy as np
class SpaceMouseTeleop:
"""Teleoperation bằng SpaceMouse 6-DOF.
SpaceMouse cung cấp 6 trục: tx, ty, tz, rx, ry, rz
Map trực tiếp sang end-effector delta pose.
"""
def __init__(self, pos_sensitivity=1.0, rot_sensitivity=1.0):
try:
import pyspacemouse
self.device = pyspacemouse
success = pyspacemouse.open()
if not success:
raise RuntimeError("Không tìm thấy SpaceMouse!")
except ImportError:
raise ImportError("Cài đặt: pip install pyspacemouse")
self.pos_sensitivity = pos_sensitivity
self.rot_sensitivity = rot_sensitivity
self.gripper_state = 1.0 # 1.0 = mở, -1.0 = đóng
def get_action(self):
"""Đọc SpaceMouse state và trả về action 7D."""
state = self.device.read()
# Map SpaceMouse axes sang robot action
action = np.zeros(7) # [dx, dy, dz, drx, dry, drz, gripper]
# Position deltas
action[0] = state.x * self.pos_sensitivity
action[1] = state.y * self.pos_sensitivity
action[2] = state.z * self.pos_sensitivity
# Rotation deltas
action[3] = state.roll * self.rot_sensitivity
action[4] = state.pitch * self.rot_sensitivity
action[5] = state.yaw * self.rot_sensitivity
# Gripper toggle (nút bấm trên SpaceMouse)
if state.buttons[0]:
self.gripper_state *= -1
action[6] = self.gripper_state
return action
def close(self):
self.device.close()
Teleoperation bằng Leader-Follower (ALOHA style)
Phương pháp này dùng hai robot giống hệt nhau — leader (người điều khiển) và follower (thực hiện task):
from lerobot.common.robot_devices.robots.manipulator import ManipulatorRobot
def setup_leader_follower():
"""Setup hệ thống leader-follower cho ALOHA."""
robot = ManipulatorRobot(
robot_type="aloha",
leader_arms={
"left": FeetechMotorsBus(port="/dev/ttyACM0"),
"right": FeetechMotorsBus(port="/dev/ttyACM1"),
},
follower_arms={
"left": FeetechMotorsBus(port="/dev/ttyACM2"),
"right": FeetechMotorsBus(port="/dev/ttyACM3"),
},
cameras={
"top": OpenCVCamera(0, fps=30, width=640, height=480),
"wrist_left": OpenCVCamera(1, fps=30, width=640, height=480),
"wrist_right": OpenCVCamera(2, fps=30, width=640, height=480),
},
)
return robot
Ghi dữ liệu với LeRobot
Sử dụng lệnh record có sẵn
# Record 50 episodes trong simulation
python lerobot/scripts/control_robot.py \
--robot.type=so100 \
--control.type=record \
--control.fps=30 \
--control.repo_id=my-user/my-pick-place-dataset \
--control.num_episodes=50 \
--control.single_task="Pick the red cube and place it on the green target"
Ghi dữ liệu bằng code tùy chỉnh
import numpy as np
import torch
from lerobot.common.datasets.lerobot_dataset import LeRobotDataset
def create_dataset_for_recording(repo_id, robot_type="so100"):
"""Tạo LeRobotDataset mới cho việc ghi dữ liệu."""
dataset = LeRobotDataset.create(
repo_id=repo_id,
fps=30,
robot_type=robot_type,
features={
"observation.images.top": {
"dtype": "video",
"shape": (480, 640, 3),
"names": ["height", "width", "channels"],
},
"observation.images.wrist": {
"dtype": "video",
"shape": (480, 640, 3),
"names": ["height", "width", "channels"],
},
"observation.state": {
"dtype": "float32",
"shape": (6,),
"names": ["joint_positions"],
},
"action": {
"dtype": "float32",
"shape": (6,),
"names": ["joint_positions_target"],
},
},
)
return dataset
def record_episodes(env, teleop, dataset, num_episodes=50, max_steps=500):
"""Ghi nhiều episodes teleop demonstration.
Args:
env: Gymnasium environment
teleop: Teleop controller (keyboard, SpaceMouse, etc.)
dataset: LeRobotDataset đã tạo
num_episodes: Số episodes cần ghi
max_steps: Số bước tối đa mỗi episode
"""
successful_episodes = 0
for ep in range(num_episodes):
print(f"\n{'='*50}")
print(f"Episode {ep+1}/{num_episodes}")
print(f"Nhấn Enter để bắt đầu, 'q' để thoát...")
if input().strip() == 'q':
break
obs, info = env.reset()
episode_frames = []
for step in range(max_steps):
# Lấy action từ teleop
action = teleop.get_action()
# Lưu frame hiện tại
dataset.add_frame({
"observation.images.top": obs["image_top"],
"observation.images.wrist": obs["image_wrist"],
"observation.state": obs["state"],
"action": action,
})
# Thực thi action
obs, reward, terminated, truncated, info = env.step(action)
if terminated:
success = info.get("is_success", False)
if success:
successful_episodes += 1
print(f" ✓ Thành công! ({step+1} bước)")
else:
print(f" ✗ Thất bại ({step+1} bước)")
break
if truncated:
print(f" ⏰ Hết thời gian ({step+1} bước)")
break
# Lưu episode vào dataset
dataset.save_episode()
print(f" Tỷ lệ thành công: {successful_episodes}/{ep+1} "
f"({successful_episodes/(ep+1)*100:.0f}%)")
return dataset
# Chạy thu thập dữ liệu
env = make_pick_place_env()
teleop = SpaceMouseTeleop()
dataset = create_dataset_for_recording("my-user/pick-place-50ep")
dataset = record_episodes(env, teleop, dataset, num_episodes=50)
dataset.push_to_hub() # Upload lên HuggingFace Hub
print(f"\nĐã upload {dataset.num_episodes} episodes lên Hub!")
Cấu trúc Episode trong LeRobotDataset
Hiểu cấu trúc episode rất quan trọng để debug và phân tích dữ liệu:
from lerobot.common.datasets.lerobot_dataset import LeRobotDataset
dataset = LeRobotDataset("lerobot/aloha_sim_transfer_cube_human")
# Thông tin tổng quan
print(f"Tổng frames: {dataset.num_frames}")
print(f"Tổng episodes: {dataset.num_episodes}")
print(f"FPS: {dataset.fps}")
# Phân tích từng episode
for ep_idx in range(min(5, dataset.num_episodes)):
ep_start = dataset.episode_data_index["from"][ep_idx].item()
ep_end = dataset.episode_data_index["to"][ep_idx].item()
ep_length = ep_end - ep_start
duration = ep_length / dataset.fps
print(f"\nEpisode {ep_idx}:")
print(f" Frames: {ep_start} → {ep_end} ({ep_length} frames)")
print(f" Duration: {duration:.1f}s")
# Phân tích action distribution
actions = torch.stack([
dataset[i]["action"] for i in range(ep_start, ep_end)
])
print(f" Action range: [{actions.min():.3f}, {actions.max():.3f}]")
print(f" Action std: {actions.std(dim=0).mean():.4f}")
Mẹo thu thập dữ liệu chất lượng cao
1. Camera placement tối ưu
# Camera placement ảnh hưởng lớn đến chất lượng policy
camera_configs = {
# Camera trên cao (bird's eye) — tốt cho spatial reasoning
"top": {
"position": [0.0, 0.0, 1.2],
"orientation": [0, -90, 0], # Nhìn thẳng xuống
"fov": 60,
},
# Camera góc 45° — cân bằng depth và overview
"angle": {
"position": [0.5, -0.5, 0.8],
"orientation": [-30, -45, 0],
"fov": 60,
},
# Wrist camera — chi tiết grasping
"wrist": {
"position": "attached_to_ee",
"fov": 90, # Wide angle cho wrist cam
},
}
# Rule of thumb:
# - Dùng ít nhất 2 camera (1 overview + 1 wrist)
# - Overview camera nên thấy toàn bộ workspace
# - Wrist camera giúp policy nhìn rõ vật thể khi gắp
2. Consistency trong demonstrations
def validate_episode_quality(episode_data, min_steps=50, max_idle_ratio=0.3):
"""Kiểm tra chất lượng episode trước khi lưu.
Args:
episode_data: List các frames
min_steps: Số bước tối thiểu
max_idle_ratio: Tỷ lệ idle (không di chuyển) tối đa
Returns:
(is_valid, reason): Tuple (bool, str)
"""
if len(episode_data) < min_steps:
return False, f"Quá ngắn ({len(episode_data)} < {min_steps} bước)"
# Kiểm tra tỷ lệ idle
actions = np.array([f["action"] for f in episode_data])
action_norms = np.linalg.norm(actions[:, :3], axis=1) # Chỉ xét position
idle_frames = np.sum(action_norms < 0.001)
idle_ratio = idle_frames / len(actions)
if idle_ratio > max_idle_ratio:
return False, f"Quá nhiều idle ({idle_ratio:.0%} > {max_idle_ratio:.0%})"
# Kiểm tra action range bất thường
if np.any(np.abs(actions) > 10.0):
return False, "Action values bất thường (>10.0)"
return True, "OK"
3. Filtering bad episodes
def filter_dataset_by_success(dataset, min_reward=0.5):
"""Lọc bỏ episodes không thành công."""
good_episodes = []
for ep_idx in range(dataset.num_episodes):
ep_start = dataset.episode_data_index["from"][ep_idx].item()
ep_end = dataset.episode_data_index["to"][ep_idx].item()
# Lấy reward cuối episode
final_frame = dataset[ep_end - 1]
if final_frame.get("reward", 0) >= min_reward:
good_episodes.append(ep_idx)
print(f"Giữ lại {len(good_episodes)}/{dataset.num_episodes} episodes "
f"({len(good_episodes)/dataset.num_episodes*100:.0f}%)")
return good_episodes
Ví dụ hoàn chỉnh: Thu thập 50 episodes Pick-Place
Đây là workflow đầy đủ để thu thập dataset cho task pick-and-place:
import numpy as np
import gymnasium as gym
from lerobot.common.datasets.lerobot_dataset import LeRobotDataset
def make_pick_place_env():
"""Tạo môi trường pick-place simulation."""
env = gym.make(
"FrankaPickAndPlace-v3",
render_mode="human",
max_episode_steps=500,
)
return env
def scripted_pick_place_policy(obs, phase="approach"):
"""Policy scripted đơn giản cho pick-place.
Có thể dùng thay teleop để tạo dataset nhanh cho thử nghiệm.
"""
ee_pos = obs["observation"][:3] # End-effector position
obj_pos = obs["desired_goal"][:3] # Object position
target_pos = obs["achieved_goal"][:3] # Target position
action = np.zeros(4)
if phase == "approach":
# Di chuyển đến trên vật thể
above_obj = obj_pos.copy()
above_obj[2] += 0.05
action[:3] = (above_obj - ee_pos) * 10
action[3] = 1.0 # Gripper mở
elif phase == "grasp":
# Hạ xuống và gắp
action[:3] = (obj_pos - ee_pos) * 10
action[3] = -1.0 # Gripper đóng
elif phase == "lift":
# Nâng lên
action[:3] = np.array([0, 0, 0.1])
action[3] = -1.0
elif phase == "place":
# Di chuyển đến target
action[:3] = (target_pos - ee_pos) * 10
action[3] = -1.0
return np.clip(action, -1, 1)
def collect_pick_place_dataset(num_episodes=50):
"""Thu thập dataset pick-place hoàn chỉnh."""
env = make_pick_place_env()
dataset = LeRobotDataset.create(
repo_id="my-user/pick-place-50ep",
fps=30,
robot_type="franka",
features={
"observation.image": {
"dtype": "video",
"shape": (480, 640, 3),
"names": ["height", "width", "channels"],
},
"observation.state": {
"dtype": "float32",
"shape": (10,),
"names": ["robot_state"],
},
"action": {
"dtype": "float32",
"shape": (4,),
"names": ["dx", "dy", "dz", "gripper"],
},
},
)
for ep in range(num_episodes):
obs, info = env.reset()
done = False
step = 0
while not done:
# Scripted policy (thay bằng teleop cho data thật)
if step < 30:
action = scripted_pick_place_policy(obs, "approach")
elif step < 50:
action = scripted_pick_place_policy(obs, "grasp")
elif step < 80:
action = scripted_pick_place_policy(obs, "lift")
else:
action = scripted_pick_place_policy(obs, "place")
# Thêm noise nhỏ để tăng diversity
action[:3] += np.random.normal(0, 0.01, 3)
dataset.add_frame({
"observation.image": obs.get("image", np.zeros((480,640,3))),
"observation.state": obs["observation"],
"action": action,
})
obs, reward, terminated, truncated, info = env.step(action)
done = terminated or truncated
step += 1
dataset.save_episode()
if (ep + 1) % 10 == 0:
print(f"Đã ghi {ep+1}/{num_episodes} episodes")
# Upload lên Hub
dataset.push_to_hub()
print(f"\nHoàn tất! {dataset.num_episodes} episodes, "
f"{dataset.num_frames} frames tổng cộng")
env.close()
return dataset
# Chạy thu thập dữ liệu
dataset = collect_pick_place_dataset(num_episodes=50)
Papers tham khảo
- RoboTurk: A Crowdsourcing Platform for Robotic Skill Learning — Mandlekar et al., CoRL 2018 — Platform thu thập dữ liệu robot qua web
- MimicGen: A Data Generation System for Scalable Robot Learning — Mandlekar et al., CoRL 2023 — Tăng cường dữ liệu tự động từ ít demonstrations
- ACT: Learning Fine-Grained Bimanual Manipulation — Zhao et al., RSS 2023 — Framework teleop cho ALOHA
Kết luận và bước tiếp theo
Thu thập dữ liệu chất lượng cao là bước quan trọng nhất trong imitation learning. Hãy nhớ các nguyên tắc: consistent demonstrations, đa dạng camera angles, và filtering bad episodes. Chất lượng quan trọng hơn số lượng — 50 episodes tốt thường hiệu quả hơn 200 episodes kém.
Trong bài tiếp theo — Train Policy cho Single-Arm: ACT và Diffusion Policy — chúng ta sẽ dùng dataset vừa thu thập để train và so sánh hai policy phổ biến nhất.
Nếu bạn muốn tìm hiểu thêm về imitation learning trước khi đi sâu, hãy đọc bài về Imitation Learning cơ bản.
Bài viết liên quan
- LeRobot Framework Deep Dive — Kiến trúc và API toàn diện của LeRobot
- Imitation Learning cho Manipulation — Nền tảng lý thuyết imitation learning
- ACT: Action Chunking with Transformers — Deep dive vào policy ACT