Giới thiệu: Hai tay trong hành động
Trong bài trước, chúng ta đã setup và calibrate hệ thống dual-arm. Bây giờ là lúc đưa nó vào hoạt động với các task bimanual thực tế. Đây là những task mà single-arm không thể làm được — và cũng là lý do dual-arm robot đang thu hút sự chú ý lớn trong cộng đồng robotics.
Bài viết này sẽ hướng dẫn bạn train bimanual policies cho ba task kinh điển: gấp khăn, rót nước, và lắp ráp. Chúng ta cũng sẽ so sánh hiệu năng của ACT và Diffusion Policy trên bimanual tasks, phân tích failure modes, và đưa ra giải pháp.
Task 1: Gấp khăn (Towel Folding)
Gấp khăn là benchmark kinh điển cho bimanual manipulation vì đòi hỏi:
- Coordination: Hai tay phải di chuyển đồng bộ
- Deformable object handling: Khăn là vật thể mềm, khó predict
- Precision: Mép khăn phải khớp nhau
Setup task gấp khăn
import numpy as np
from dataclasses import dataclass
from typing import List
@dataclass
class TowelFoldingTask:
"""Configuration cho task gấp khăn.
Phases:
1. Approach: Hai tay tiếp cận 2 góc khăn
2. Grasp: Gắp chặt 2 góc
3. Lift: Nâng khăn lên
4. Fold: Gập đôi (tay trái gập sang phải hoặc ngược lại)
5. Release: Thả khăn đã gấp
"""
towel_size: tuple = (0.3, 0.3) # 30x30 cm
towel_position: np.ndarray = None # Center position
fold_type: str = "half" # "half", "quarter", "triangle"
def __post_init__(self):
if self.towel_position is None:
self.towel_position = np.array([0.3, 0.0, 0.01])
@property
def corner_positions(self) -> dict:
"""Vị trí 4 góc khăn."""
cx, cy, cz = self.towel_position
w, h = self.towel_size
return {
"top_left": np.array([cx - w/2, cy + h/2, cz]),
"top_right": np.array([cx + w/2, cy + h/2, cz]),
"bottom_left": np.array([cx - w/2, cy - h/2, cz]),
"bottom_right": np.array([cx + w/2, cy - h/2, cz]),
}
def get_grasp_points(self) -> tuple:
"""Trả về 2 điểm gắp cho fold type."""
corners = self.corner_positions
if self.fold_type == "half":
return corners["top_left"], corners["top_right"]
elif self.fold_type == "triangle":
return corners["top_left"], corners["bottom_right"]
return corners["top_left"], corners["top_right"]
def collect_towel_folding_data(robot, dataset, num_episodes=100):
"""Thu thập data cho task gấp khăn.
Tips:
- Dùng khăn mỏng, vuông, không quá lớn (30x30cm)
- Đặt khăn phẳng trước mỗi episode
- Gấp chậm và đều để policy dễ học
- Ghi ít nhất 100 episodes
"""
task = TowelFoldingTask()
for ep in range(num_episodes):
print(f"\nEpisode {ep+1}/{num_episodes}")
print("Đặt khăn phẳng, nhấn Enter để bắt đầu...")
input()
step = 0
recording = True
while recording:
# Đọc observations
obs = robot.get_observation()
# Đọc leader positions (người điều khiển)
left_target = robot.leader_arms["left"].read("Present_Position")
right_target = robot.leader_arms["right"].read("Present_Position")
# Follower copy leader
robot.follower_arms["left"].write("Goal_Position", left_target)
robot.follower_arms["right"].write("Goal_Position", right_target)
# Đọc follower actual positions
left_state = robot.follower_arms["left"].read("Present_Position")
right_state = robot.follower_arms["right"].read("Present_Position")
# Lưu frame
dataset.add_frame({
"observation.images.top": obs["top_camera"],
"observation.images.left_wrist": obs["left_wrist_camera"],
"observation.images.right_wrist": obs["right_wrist_camera"],
"observation.state": np.concatenate([left_state, right_state]),
"action": np.concatenate([left_target, right_target]),
})
step += 1
if step > 400:
recording = False
dataset.save_episode()
if (ep + 1) % 10 == 0:
print(f"Đã ghi {ep+1} episodes")
return dataset
Training bimanual folding policy
from lerobot.common.policies.act.configuration_act import ACTConfig
from lerobot.common.policies.act.modeling_act import ACTPolicy
import torch
def train_folding_policy(dataset_repo_id, num_epochs=300):
"""Train ACT policy cho towel folding."""
from lerobot.common.datasets.lerobot_dataset import LeRobotDataset
dataset = LeRobotDataset(dataset_repo_id)
config = ACTConfig(
input_shapes={
"observation.images.top": [3, 480, 640],
"observation.images.left_wrist": [3, 480, 640],
"observation.images.right_wrist": [3, 480, 640],
"observation.state": [14],
},
output_shapes={"action": [14]},
input_normalization_modes={
"observation.images.top": "mean_std",
"observation.images.left_wrist": "mean_std",
"observation.images.right_wrist": "mean_std",
"observation.state": "min_max",
},
output_normalization_modes={"action": "min_max"},
# Folding cần chunk lớn cho smooth trajectories
chunk_size=100,
n_action_steps=100,
dim_model=512,
n_heads=8,
n_layers=4,
use_vae=True,
latent_dim=32,
kl_weight=10.0,
)
policy = ACTPolicy(config)
device = torch.device("cuda")
policy.to(device)
optimizer = torch.optim.AdamW(policy.parameters(), lr=1e-5, weight_decay=1e-4)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
optimizer, T_max=num_epochs, eta_min=1e-6
)
dataloader = torch.utils.data.DataLoader(
dataset, batch_size=8, shuffle=True, num_workers=4
)
best_loss = float('inf')
for epoch in range(num_epochs):
epoch_loss = 0
n_batches = 0
for batch in dataloader:
batch = {k: v.to(device) for k, v in batch.items()}
output = policy.forward(batch)
loss = output["loss"]
optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(policy.parameters(), 10.0)
optimizer.step()
epoch_loss += loss.item()
n_batches += 1
scheduler.step()
avg_loss = epoch_loss / n_batches
if avg_loss < best_loss:
best_loss = avg_loss
torch.save(policy.state_dict(), "best_folding_policy.pt")
if (epoch + 1) % 25 == 0:
print(f"Epoch {epoch+1}/{num_epochs} | Loss: {avg_loss:.4f} | "
f"Best: {best_loss:.4f} | LR: {scheduler.get_last_lr()[0]:.2e}")
return policy
Task 2: Rót nước (Pouring)
Rót nước đòi hỏi temporal coordination chính xác — một tay giữ cốc, tay kia nghiêng bình:
@dataclass
class PouringTask:
"""Configuration cho task rót nước.
Right arm: Giữ bình nước (pitcher)
Left arm: Giữ cốc (cup)
Phases:
1. Grasp pitcher (right) và cup (left)
2. Lift cả hai
3. Align pitcher trên cup
4. Tilt pitcher dần (30° → 60° → 90°)
5. Return pitcher thẳng đứng
6. Place cả hai xuống
"""
pitcher_volume_ml: float = 500
cup_capacity_ml: float = 250
pour_angle_deg: float = 75 # Góc nghiêng max
pour_speed: float = 0.5 # rad/s
def get_pour_trajectory(self, n_steps=100):
"""Tạo trajectory cho phase rót.
Returns:
angles: Array góc nghiêng từ 0 → max → 0
"""
# Tilt up
up_steps = n_steps // 2
tilt_up = np.linspace(0, np.deg2rad(self.pour_angle_deg), up_steps)
# Tilt back down
down_steps = n_steps - up_steps
tilt_down = np.linspace(np.deg2rad(self.pour_angle_deg), 0, down_steps)
return np.concatenate([tilt_up, tilt_down])
def evaluate_pouring(policy, env, n_episodes=30):
"""Đánh giá pouring policy.
Metrics:
- Pour accuracy: Lượng nước rót vào cốc / tổng lượng rót
- Spill rate: Lượng nước bị đổ ra ngoài
- Cup stability: Cốc có bị đổ không
"""
results = {
"pour_accuracy": [],
"spill_count": 0,
"cup_dropped": 0,
"success": 0,
}
for ep in range(n_episodes):
obs, info = env.reset()
done = False
while not done:
action = policy.predict(obs)
obs, reward, terminated, truncated, info = env.step(action)
done = terminated or truncated
water_in_cup = info.get("water_in_cup", 0)
water_spilled = info.get("water_spilled", 0)
total_water = water_in_cup + water_spilled
if total_water > 0:
accuracy = water_in_cup / total_water
results["pour_accuracy"].append(accuracy)
if info.get("cup_dropped", False):
results["cup_dropped"] += 1
if water_spilled > 10: # > 10ml
results["spill_count"] += 1
if accuracy > 0.9 and not info.get("cup_dropped", False):
results["success"] += 1
avg_accuracy = np.mean(results["pour_accuracy"]) if results["pour_accuracy"] else 0
print(f"Success rate: {results['success']/n_episodes:.1%}")
print(f"Pour accuracy: {avg_accuracy:.1%}")
print(f"Spill rate: {results['spill_count']/n_episodes:.1%}")
print(f"Cup drop rate: {results['cup_dropped']/n_episodes:.1%}")
return results
Task 3: Lắp ráp (Assembly)
Assembly tasks đòi hỏi contact coordination — một tay giữ, tay kia thao tác:
@dataclass
class AssemblyTask:
"""Configuration cho task lắp ráp.
Ví dụ: Vặn nắp chai
Left arm: Giữ chai
Right arm: Vặn nắp
Challenges:
- Contact force control
- Precise alignment
- Rotation coordination
"""
task_type: str = "screw_cap" # "screw_cap", "peg_in_hole", "snap_fit"
def get_subtasks(self) -> List[str]:
if self.task_type == "screw_cap":
return [
"grasp_bottle_left", # Left giữ chai
"grasp_cap_right", # Right gắp nắp
"align_cap_to_bottle", # Căn chỉnh nắp
"screw_clockwise", # Vặn nắp (xoay right arm)
"verify_tight", # Kiểm tra đã chặt chưa
"release_both", # Thả cả hai
]
elif self.task_type == "peg_in_hole":
return [
"grasp_base_left", # Left giữ base
"grasp_peg_right", # Right gắp peg
"align_peg_to_hole", # Căn chỉnh
"insert_peg", # Đẩy peg vào lỗ
"verify_inserted", # Kiểm tra
"release_both",
]
return []
def train_assembly_policy(dataset_repo_id):
"""Train policy cho assembly task.
Assembly cần:
- Smaller chunk_size (precision cao)
- Wrist cameras quan trọng (close-up contact)
- Force feedback nếu có
"""
from lerobot.common.policies.diffusion.configuration_diffusion import DiffusionConfig
from lerobot.common.policies.diffusion.modeling_diffusion import DiffusionPolicy
# Diffusion Policy tốt hơn cho assembly vì multi-modal
config = DiffusionConfig(
input_shapes={
"observation.images.top": [3, 480, 640],
"observation.images.left_wrist": [3, 480, 640],
"observation.images.right_wrist": [3, 480, 640],
"observation.state": [14],
},
output_shapes={"action": [14]},
input_normalization_modes={
"observation.images.top": "mean_std",
"observation.images.left_wrist": "mean_std",
"observation.images.right_wrist": "mean_std",
"observation.state": "min_max",
},
output_normalization_modes={"action": "min_max"},
num_inference_steps=50,
down_dims=[256, 512, 1024],
n_obs_steps=2,
horizon=16,
n_action_steps=8,
noise_scheduler_type="DDIM", # Faster inference
vision_backbone="resnet18",
crop_shape=[84, 84],
)
policy = DiffusionPolicy(config)
return policy
So sánh ACT vs Diffusion Policy cho Bimanual
def compare_bimanual_policies(act_results, diff_results, tasks):
"""So sánh kết quả ACT vs Diffusion Policy trên bimanual tasks."""
print(f"\n{'Task':<20} {'ACT':>15} {'Diffusion':>15}")
print(f"{'='*52}")
for task in tasks:
act_sr = act_results[task]["success_rate"]
diff_sr = diff_results[task]["success_rate"]
winner = "ACT" if act_sr > diff_sr else "Diff" if diff_sr > act_sr else "Tie"
print(f"{task:<20} {act_sr:>14.1%} {diff_sr:>14.1%} ({winner})")
print(f"\n{'Inference (ms)':<20} {act_results['inference_ms']:>15.1f} "
f"{diff_results['inference_ms']:>15.1f}")
Kết quả benchmark (typical)
| Task | ACT | Diffusion Policy | Ghi chú |
|---|---|---|---|
| Towel Folding | 65% | 72% | Diffusion tốt hơn với deformable |
| Pouring | 78% | 75% | ACT tốt hơn nhờ smooth trajectories |
| Assembly (cap) | 55% | 68% | Diffusion tốt hơn với contact-rich |
| Cube Transfer | 85% | 82% | ACT tốt hơn với task đơn giản |
| Inference | 8ms | 25ms (DDIM) | ACT nhanh hơn 3x |
Failure Modes và Fixes
Failure modes phổ biến
BIMANUAL_FAILURE_MODES = {
"temporal_desync": {
"description": "Hai tay không đồng bộ — một tay nhanh hơn",
"frequency": "30% of failures",
"fix": [
"Tăng chunk_size để cả 2 tay plan dài hơn",
"Thêm temporal loss penalty cho desync",
"Thu thập data với tốc độ đều hơn",
],
},
"grasp_slip": {
"description": "Một tay buông object giữa chừng",
"frequency": "25% of failures",
"fix": [
"Thêm force observation nếu có sensor",
"Train separate grasp maintenance policy",
"Tăng gripper action frequency",
],
},
"contact_collision": {
"description": "Hai tay va vào nhau",
"frequency": "20% of failures",
"fix": [
"Thêm self-collision penalty trong training",
"Dùng workspace separation constraints",
"Thu thập data cẩn thận hơn, tránh va chạm",
],
},
"wrong_sequence": {
"description": "Thực hiện sai thứ tự sub-tasks",
"frequency": "15% of failures",
"fix": [
"Dùng hierarchical policy (bài 5)",
"Thêm sub-task indicator trong observation",
"Curriculum learning từ đơn giản đến phức tạp",
],
},
"overshoot": {
"description": "Di chuyển quá mức — pour quá nhiều, fold quá mạnh",
"frequency": "10% of failures",
"fix": [
"Giảm action magnitude trong training",
"Dùng action smoothing (EMA)",
"Thêm boundary constraints",
],
},
}
def analyze_failures(episodes, success_threshold=0.8):
"""Phân tích failure modes từ evaluation episodes."""
failures = {mode: 0 for mode in BIMANUAL_FAILURE_MODES}
total_failures = 0
for ep in episodes:
if ep["success_rate"] < success_threshold:
total_failures += 1
# Phân loại failure
if ep.get("arm_desync", 0) > 0.1:
failures["temporal_desync"] += 1
if ep.get("grasp_lost", False):
failures["grasp_slip"] += 1
if ep.get("self_collision", False):
failures["contact_collision"] += 1
if ep.get("wrong_order", False):
failures["wrong_sequence"] += 1
if ep.get("overshoot", False):
failures["overshoot"] += 1
print(f"Total failures: {total_failures}/{len(episodes)}")
for mode, count in sorted(failures.items(), key=lambda x: -x[1]):
if count > 0:
pct = count / total_failures * 100
print(f" {mode}: {count} ({pct:.0f}%)")
for fix in BIMANUAL_FAILURE_MODES[mode]["fix"]:
print(f" Fix: {fix}")
Temporal Synchronization
class TemporalSyncModule:
"""Module đồng bộ hóa actions giữa 2 arms.
Đảm bảo left và right arm di chuyển đồng bộ,
đặc biệt quan trọng cho contact-rich tasks.
"""
def __init__(self, sync_weight=0.1):
self.sync_weight = sync_weight
def compute_sync_loss(self, left_actions, right_actions):
"""Tính sync loss giữa 2 arms.
Penalize khi velocity difference quá lớn.
"""
# Velocity = difference between consecutive actions
left_vel = left_actions[:, 1:] - left_actions[:, :-1]
right_vel = right_actions[:, 1:] - right_actions[:, :-1]
# Magnitude ratio — 2 arms should move at similar speed
left_speed = torch.norm(left_vel, dim=-1)
right_speed = torch.norm(right_vel, dim=-1)
# Avoid division by zero
speed_ratio = (left_speed + 1e-6) / (right_speed + 1e-6)
# Penalty khi ratio quá xa 1.0
sync_loss = torch.mean((speed_ratio - 1.0) ** 2)
return self.sync_weight * sync_loss
Papers tham khảo
- ALOHA 2: An Enhanced Low-Cost Hardware for Bimanual Teleoperation — Aldaco et al., 2024 — Upgrade hardware và results
- Bi-KVIL: Keypoints-based Visual Imitation Learning for Bimanual Manipulation — Grotz et al., CoRL 2024 — Keypoint-based approach cho bimanual
- ACT: Learning Fine-Grained Bimanual Manipulation — Zhao et al., RSS 2023 — Foundation paper cho bimanual ACT
Kết luận và bước tiếp theo
Bimanual manipulation mở ra khả năng thực hiện các task mà single-arm không thể. Key insights từ bài này:
- Towel folding: Deformable → cần nhiều data, Diffusion Policy thường tốt hơn
- Pouring: Temporal precision → ACT tốt hơn nhờ smooth chunks
- Assembly: Contact-rich → Diffusion Policy + wrist cameras
- Failure analysis quan trọng hơn chỉ nhìn success rate
Bài tiếp theo — Mobile Manipulation — sẽ thêm chiều mới: di chuyển. Robot không chỉ đứng một chỗ mà còn phải navigate + manipulate.
Bài viết liên quan
- Dual-Arm Setup và Calibration — Setup trước khi chạy tasks
- Bimanual Manipulation Overview — Tổng quan lý thuyết bimanual
- Long-Horizon Tasks — Planning cho multi-step tasks