ailerobotbimanualdual-armaloha

Bimanual Tasks: Folding, Pouring và Assembly với 2 tay

Train bimanual policies cho các task thực tế — gấp khăn, rót nước, lắp ráp. So sánh ACT vs Diffusion Policy cho dual-arm.

Nguyễn Anh Tuấn30 tháng 3, 202610 phút đọc
Bimanual Tasks: Folding, Pouring và Assembly với 2 tay

Giới thiệu: Hai tay trong hành động

Trong bài trước, chúng ta đã setup và calibrate hệ thống dual-arm. Bây giờ là lúc đưa nó vào hoạt động với các task bimanual thực tế. Đây là những task mà single-arm không thể làm được — và cũng là lý do dual-arm robot đang thu hút sự chú ý lớn trong cộng đồng robotics.

Bài viết này sẽ hướng dẫn bạn train bimanual policies cho ba task kinh điển: gấp khăn, rót nước, và lắp ráp. Chúng ta cũng sẽ so sánh hiệu năng của ACT và Diffusion Policy trên bimanual tasks, phân tích failure modes, và đưa ra giải pháp.

Bimanual robot manipulation

Task 1: Gấp khăn (Towel Folding)

Gấp khăn là benchmark kinh điển cho bimanual manipulation vì đòi hỏi:

  • Coordination: Hai tay phải di chuyển đồng bộ
  • Deformable object handling: Khăn là vật thể mềm, khó predict
  • Precision: Mép khăn phải khớp nhau

Setup task gấp khăn

import numpy as np
from dataclasses import dataclass
from typing import List

@dataclass
class TowelFoldingTask:
    """Configuration cho task gấp khăn.
    
    Phases:
    1. Approach: Hai tay tiếp cận 2 góc khăn
    2. Grasp: Gắp chặt 2 góc
    3. Lift: Nâng khăn lên
    4. Fold: Gập đôi (tay trái gập sang phải hoặc ngược lại)
    5. Release: Thả khăn đã gấp
    """
    towel_size: tuple = (0.3, 0.3)  # 30x30 cm
    towel_position: np.ndarray = None  # Center position
    fold_type: str = "half"  # "half", "quarter", "triangle"
    
    def __post_init__(self):
        if self.towel_position is None:
            self.towel_position = np.array([0.3, 0.0, 0.01])
    
    @property
    def corner_positions(self) -> dict:
        """Vị trí 4 góc khăn."""
        cx, cy, cz = self.towel_position
        w, h = self.towel_size
        return {
            "top_left": np.array([cx - w/2, cy + h/2, cz]),
            "top_right": np.array([cx + w/2, cy + h/2, cz]),
            "bottom_left": np.array([cx - w/2, cy - h/2, cz]),
            "bottom_right": np.array([cx + w/2, cy - h/2, cz]),
        }
    
    def get_grasp_points(self) -> tuple:
        """Trả về 2 điểm gắp cho fold type."""
        corners = self.corner_positions
        if self.fold_type == "half":
            return corners["top_left"], corners["top_right"]
        elif self.fold_type == "triangle":
            return corners["top_left"], corners["bottom_right"]
        return corners["top_left"], corners["top_right"]


def collect_towel_folding_data(robot, dataset, num_episodes=100):
    """Thu thập data cho task gấp khăn.
    
    Tips:
    - Dùng khăn mỏng, vuông, không quá lớn (30x30cm)
    - Đặt khăn phẳng trước mỗi episode
    - Gấp chậm và đều để policy dễ học
    - Ghi ít nhất 100 episodes
    """
    task = TowelFoldingTask()
    
    for ep in range(num_episodes):
        print(f"\nEpisode {ep+1}/{num_episodes}")
        print("Đặt khăn phẳng, nhấn Enter để bắt đầu...")
        input()
        
        step = 0
        recording = True
        
        while recording:
            # Đọc observations
            obs = robot.get_observation()
            
            # Đọc leader positions (người điều khiển)
            left_target = robot.leader_arms["left"].read("Present_Position")
            right_target = robot.leader_arms["right"].read("Present_Position")
            
            # Follower copy leader
            robot.follower_arms["left"].write("Goal_Position", left_target)
            robot.follower_arms["right"].write("Goal_Position", right_target)
            
            # Đọc follower actual positions
            left_state = robot.follower_arms["left"].read("Present_Position")
            right_state = robot.follower_arms["right"].read("Present_Position")
            
            # Lưu frame
            dataset.add_frame({
                "observation.images.top": obs["top_camera"],
                "observation.images.left_wrist": obs["left_wrist_camera"],
                "observation.images.right_wrist": obs["right_wrist_camera"],
                "observation.state": np.concatenate([left_state, right_state]),
                "action": np.concatenate([left_target, right_target]),
            })
            
            step += 1
            if step > 400:
                recording = False
        
        dataset.save_episode()
        
        if (ep + 1) % 10 == 0:
            print(f"Đã ghi {ep+1} episodes")
    
    return dataset

Training bimanual folding policy

from lerobot.common.policies.act.configuration_act import ACTConfig
from lerobot.common.policies.act.modeling_act import ACTPolicy
import torch

def train_folding_policy(dataset_repo_id, num_epochs=300):
    """Train ACT policy cho towel folding."""
    from lerobot.common.datasets.lerobot_dataset import LeRobotDataset
    
    dataset = LeRobotDataset(dataset_repo_id)
    
    config = ACTConfig(
        input_shapes={
            "observation.images.top": [3, 480, 640],
            "observation.images.left_wrist": [3, 480, 640],
            "observation.images.right_wrist": [3, 480, 640],
            "observation.state": [14],
        },
        output_shapes={"action": [14]},
        input_normalization_modes={
            "observation.images.top": "mean_std",
            "observation.images.left_wrist": "mean_std",
            "observation.images.right_wrist": "mean_std",
            "observation.state": "min_max",
        },
        output_normalization_modes={"action": "min_max"},
        
        # Folding cần chunk lớn cho smooth trajectories
        chunk_size=100,
        n_action_steps=100,
        dim_model=512,
        n_heads=8,
        n_layers=4,
        use_vae=True,
        latent_dim=32,
        kl_weight=10.0,
    )
    
    policy = ACTPolicy(config)
    device = torch.device("cuda")
    policy.to(device)
    
    optimizer = torch.optim.AdamW(policy.parameters(), lr=1e-5, weight_decay=1e-4)
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
        optimizer, T_max=num_epochs, eta_min=1e-6
    )
    dataloader = torch.utils.data.DataLoader(
        dataset, batch_size=8, shuffle=True, num_workers=4
    )
    
    best_loss = float('inf')
    
    for epoch in range(num_epochs):
        epoch_loss = 0
        n_batches = 0
        
        for batch in dataloader:
            batch = {k: v.to(device) for k, v in batch.items()}
            output = policy.forward(batch)
            loss = output["loss"]
            
            optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(policy.parameters(), 10.0)
            optimizer.step()
            
            epoch_loss += loss.item()
            n_batches += 1
        
        scheduler.step()
        avg_loss = epoch_loss / n_batches
        
        if avg_loss < best_loss:
            best_loss = avg_loss
            torch.save(policy.state_dict(), "best_folding_policy.pt")
        
        if (epoch + 1) % 25 == 0:
            print(f"Epoch {epoch+1}/{num_epochs} | Loss: {avg_loss:.4f} | "
                  f"Best: {best_loss:.4f} | LR: {scheduler.get_last_lr()[0]:.2e}")
    
    return policy

Task 2: Rót nước (Pouring)

Rót nước đòi hỏi temporal coordination chính xác — một tay giữ cốc, tay kia nghiêng bình:

@dataclass
class PouringTask:
    """Configuration cho task rót nước.
    
    Right arm: Giữ bình nước (pitcher)
    Left arm: Giữ cốc (cup)
    
    Phases:
    1. Grasp pitcher (right) và cup (left)
    2. Lift cả hai
    3. Align pitcher trên cup
    4. Tilt pitcher dần (30° → 60° → 90°)
    5. Return pitcher thẳng đứng
    6. Place cả hai xuống
    """
    pitcher_volume_ml: float = 500
    cup_capacity_ml: float = 250
    pour_angle_deg: float = 75  # Góc nghiêng max
    pour_speed: float = 0.5     # rad/s
    
    def get_pour_trajectory(self, n_steps=100):
        """Tạo trajectory cho phase rót.
        
        Returns:
            angles: Array góc nghiêng từ 0 → max → 0
        """
        # Tilt up
        up_steps = n_steps // 2
        tilt_up = np.linspace(0, np.deg2rad(self.pour_angle_deg), up_steps)
        
        # Tilt back down
        down_steps = n_steps - up_steps
        tilt_down = np.linspace(np.deg2rad(self.pour_angle_deg), 0, down_steps)
        
        return np.concatenate([tilt_up, tilt_down])


def evaluate_pouring(policy, env, n_episodes=30):
    """Đánh giá pouring policy.
    
    Metrics:
    - Pour accuracy: Lượng nước rót vào cốc / tổng lượng rót
    - Spill rate: Lượng nước bị đổ ra ngoài
    - Cup stability: Cốc có bị đổ không
    """
    results = {
        "pour_accuracy": [],
        "spill_count": 0,
        "cup_dropped": 0,
        "success": 0,
    }
    
    for ep in range(n_episodes):
        obs, info = env.reset()
        done = False
        
        while not done:
            action = policy.predict(obs)
            obs, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated
        
        water_in_cup = info.get("water_in_cup", 0)
        water_spilled = info.get("water_spilled", 0)
        total_water = water_in_cup + water_spilled
        
        if total_water > 0:
            accuracy = water_in_cup / total_water
            results["pour_accuracy"].append(accuracy)
        
        if info.get("cup_dropped", False):
            results["cup_dropped"] += 1
        
        if water_spilled > 10:  # > 10ml
            results["spill_count"] += 1
        
        if accuracy > 0.9 and not info.get("cup_dropped", False):
            results["success"] += 1
    
    avg_accuracy = np.mean(results["pour_accuracy"]) if results["pour_accuracy"] else 0
    print(f"Success rate: {results['success']/n_episodes:.1%}")
    print(f"Pour accuracy: {avg_accuracy:.1%}")
    print(f"Spill rate: {results['spill_count']/n_episodes:.1%}")
    print(f"Cup drop rate: {results['cup_dropped']/n_episodes:.1%}")
    
    return results

Bimanual pouring task

Task 3: Lắp ráp (Assembly)

Assembly tasks đòi hỏi contact coordination — một tay giữ, tay kia thao tác:

@dataclass
class AssemblyTask:
    """Configuration cho task lắp ráp.
    
    Ví dụ: Vặn nắp chai
    Left arm: Giữ chai
    Right arm: Vặn nắp
    
    Challenges:
    - Contact force control
    - Precise alignment
    - Rotation coordination
    """
    task_type: str = "screw_cap"  # "screw_cap", "peg_in_hole", "snap_fit"
    
    def get_subtasks(self) -> List[str]:
        if self.task_type == "screw_cap":
            return [
                "grasp_bottle_left",    # Left giữ chai
                "grasp_cap_right",      # Right gắp nắp
                "align_cap_to_bottle",  # Căn chỉnh nắp
                "screw_clockwise",      # Vặn nắp (xoay right arm)
                "verify_tight",         # Kiểm tra đã chặt chưa
                "release_both",         # Thả cả hai
            ]
        elif self.task_type == "peg_in_hole":
            return [
                "grasp_base_left",      # Left giữ base
                "grasp_peg_right",      # Right gắp peg
                "align_peg_to_hole",    # Căn chỉnh
                "insert_peg",           # Đẩy peg vào lỗ
                "verify_inserted",      # Kiểm tra
                "release_both",
            ]
        return []


def train_assembly_policy(dataset_repo_id):
    """Train policy cho assembly task.
    
    Assembly cần:
    - Smaller chunk_size (precision cao)
    - Wrist cameras quan trọng (close-up contact)
    - Force feedback nếu có
    """
    from lerobot.common.policies.diffusion.configuration_diffusion import DiffusionConfig
    from lerobot.common.policies.diffusion.modeling_diffusion import DiffusionPolicy
    
    # Diffusion Policy tốt hơn cho assembly vì multi-modal
    config = DiffusionConfig(
        input_shapes={
            "observation.images.top": [3, 480, 640],
            "observation.images.left_wrist": [3, 480, 640],
            "observation.images.right_wrist": [3, 480, 640],
            "observation.state": [14],
        },
        output_shapes={"action": [14]},
        input_normalization_modes={
            "observation.images.top": "mean_std",
            "observation.images.left_wrist": "mean_std",
            "observation.images.right_wrist": "mean_std",
            "observation.state": "min_max",
        },
        output_normalization_modes={"action": "min_max"},
        
        num_inference_steps=50,
        down_dims=[256, 512, 1024],
        n_obs_steps=2,
        horizon=16,
        n_action_steps=8,
        noise_scheduler_type="DDIM",  # Faster inference
        vision_backbone="resnet18",
        crop_shape=[84, 84],
    )
    
    policy = DiffusionPolicy(config)
    return policy

So sánh ACT vs Diffusion Policy cho Bimanual

def compare_bimanual_policies(act_results, diff_results, tasks):
    """So sánh kết quả ACT vs Diffusion Policy trên bimanual tasks."""
    
    print(f"\n{'Task':<20} {'ACT':>15} {'Diffusion':>15}")
    print(f"{'='*52}")
    
    for task in tasks:
        act_sr = act_results[task]["success_rate"]
        diff_sr = diff_results[task]["success_rate"]
        winner = "ACT" if act_sr > diff_sr else "Diff" if diff_sr > act_sr else "Tie"
        
        print(f"{task:<20} {act_sr:>14.1%} {diff_sr:>14.1%}  ({winner})")
    
    print(f"\n{'Inference (ms)':<20} {act_results['inference_ms']:>15.1f} "
          f"{diff_results['inference_ms']:>15.1f}")

Kết quả benchmark (typical)

Task ACT Diffusion Policy Ghi chú
Towel Folding 65% 72% Diffusion tốt hơn với deformable
Pouring 78% 75% ACT tốt hơn nhờ smooth trajectories
Assembly (cap) 55% 68% Diffusion tốt hơn với contact-rich
Cube Transfer 85% 82% ACT tốt hơn với task đơn giản
Inference 8ms 25ms (DDIM) ACT nhanh hơn 3x

Failure Modes và Fixes

Failure modes phổ biến

BIMANUAL_FAILURE_MODES = {
    "temporal_desync": {
        "description": "Hai tay không đồng bộ — một tay nhanh hơn",
        "frequency": "30% of failures",
        "fix": [
            "Tăng chunk_size để cả 2 tay plan dài hơn",
            "Thêm temporal loss penalty cho desync",
            "Thu thập data với tốc độ đều hơn",
        ],
    },
    "grasp_slip": {
        "description": "Một tay buông object giữa chừng",
        "frequency": "25% of failures",
        "fix": [
            "Thêm force observation nếu có sensor",
            "Train separate grasp maintenance policy",
            "Tăng gripper action frequency",
        ],
    },
    "contact_collision": {
        "description": "Hai tay va vào nhau",
        "frequency": "20% of failures",
        "fix": [
            "Thêm self-collision penalty trong training",
            "Dùng workspace separation constraints",
            "Thu thập data cẩn thận hơn, tránh va chạm",
        ],
    },
    "wrong_sequence": {
        "description": "Thực hiện sai thứ tự sub-tasks",
        "frequency": "15% of failures",
        "fix": [
            "Dùng hierarchical policy (bài 5)",
            "Thêm sub-task indicator trong observation",
            "Curriculum learning từ đơn giản đến phức tạp",
        ],
    },
    "overshoot": {
        "description": "Di chuyển quá mức — pour quá nhiều, fold quá mạnh",
        "frequency": "10% of failures",
        "fix": [
            "Giảm action magnitude trong training",
            "Dùng action smoothing (EMA)",
            "Thêm boundary constraints",
        ],
    },
}


def analyze_failures(episodes, success_threshold=0.8):
    """Phân tích failure modes từ evaluation episodes."""
    failures = {mode: 0 for mode in BIMANUAL_FAILURE_MODES}
    total_failures = 0
    
    for ep in episodes:
        if ep["success_rate"] < success_threshold:
            total_failures += 1
            
            # Phân loại failure
            if ep.get("arm_desync", 0) > 0.1:
                failures["temporal_desync"] += 1
            if ep.get("grasp_lost", False):
                failures["grasp_slip"] += 1
            if ep.get("self_collision", False):
                failures["contact_collision"] += 1
            if ep.get("wrong_order", False):
                failures["wrong_sequence"] += 1
            if ep.get("overshoot", False):
                failures["overshoot"] += 1
    
    print(f"Total failures: {total_failures}/{len(episodes)}")
    for mode, count in sorted(failures.items(), key=lambda x: -x[1]):
        if count > 0:
            pct = count / total_failures * 100
            print(f"  {mode}: {count} ({pct:.0f}%)")
            for fix in BIMANUAL_FAILURE_MODES[mode]["fix"]:
                print(f"    Fix: {fix}")

Temporal Synchronization

class TemporalSyncModule:
    """Module đồng bộ hóa actions giữa 2 arms.
    
    Đảm bảo left và right arm di chuyển đồng bộ,
    đặc biệt quan trọng cho contact-rich tasks.
    """
    
    def __init__(self, sync_weight=0.1):
        self.sync_weight = sync_weight
    
    def compute_sync_loss(self, left_actions, right_actions):
        """Tính sync loss giữa 2 arms.
        
        Penalize khi velocity difference quá lớn.
        """
        # Velocity = difference between consecutive actions
        left_vel = left_actions[:, 1:] - left_actions[:, :-1]
        right_vel = right_actions[:, 1:] - right_actions[:, :-1]
        
        # Magnitude ratio — 2 arms should move at similar speed
        left_speed = torch.norm(left_vel, dim=-1)
        right_speed = torch.norm(right_vel, dim=-1)
        
        # Avoid division by zero
        speed_ratio = (left_speed + 1e-6) / (right_speed + 1e-6)
        
        # Penalty khi ratio quá xa 1.0
        sync_loss = torch.mean((speed_ratio - 1.0) ** 2)
        
        return self.sync_weight * sync_loss

Papers tham khảo

  1. ALOHA 2: An Enhanced Low-Cost Hardware for Bimanual TeleoperationAldaco et al., 2024 — Upgrade hardware và results
  2. Bi-KVIL: Keypoints-based Visual Imitation Learning for Bimanual ManipulationGrotz et al., CoRL 2024 — Keypoint-based approach cho bimanual
  3. ACT: Learning Fine-Grained Bimanual ManipulationZhao et al., RSS 2023 — Foundation paper cho bimanual ACT

Kết luận và bước tiếp theo

Bimanual manipulation mở ra khả năng thực hiện các task mà single-arm không thể. Key insights từ bài này:

  • Towel folding: Deformable → cần nhiều data, Diffusion Policy thường tốt hơn
  • Pouring: Temporal precision → ACT tốt hơn nhờ smooth chunks
  • Assembly: Contact-rich → Diffusion Policy + wrist cameras
  • Failure analysis quan trọng hơn chỉ nhìn success rate

Bài tiếp theo — Mobile Manipulation — sẽ thêm chiều mới: di chuyển. Robot không chỉ đứng một chỗ mà còn phải navigate + manipulate.

Bài viết liên quan

NT

Nguyễn Anh Tuấn

Robotics & AI Engineer. Building VnRobo — sharing knowledge about robot learning, VLA models, and automation.

Bài viết liên quan

NEWTutorial
SimpleVLA-RL (7): Collect Data cho OpenArm
openarmlerobotdata-collectionteleoperationPhần 7

SimpleVLA-RL (7): Collect Data cho OpenArm

Hướng dẫn từng bước setup OpenArm, calibrate, teleoperate và thu thập 50 episodes gắp hộp carton với LeRobot.

11/4/202616 phút đọc
NEWTutorial
SimpleVLA-RL (6): OpenArm — Phân tích Lộ trình
openarmvlareinforcement-learninglerobotpi0Phần 6

SimpleVLA-RL (6): OpenArm — Phân tích Lộ trình

Phân tích chi tiết cách tiếp cận training robot OpenArm 7-DoF gắp hộp carton — so sánh 2 lộ trình: LeRobot native vs SimpleVLA-RL.

11/4/202613 phút đọc
NEWTutorial
PEFT/LoRA Fine-tune & Deploy VLA
lerobotpeftloradeploymentvlaPhần 15

PEFT/LoRA Fine-tune & Deploy VLA

Fine-tune VLA lớn bằng LoRA trên GPU nhỏ, deploy lên robot thật với Real-Time Chunking — production-ready workflow.

11/4/202612 phút đọc