ailerobotactdiffusion-policysingle-arm

Train Policy cho Single-Arm: ACT và Diffusion Policy

Hướng dẫn train ACT và Diffusion Policy trên LeRobot, so sánh hiệu năng, hyperparameter tuning và visualize kết quả.

Nguyễn Anh Tuấn18 tháng 3, 20269 phút đọc
Train Policy cho Single-Arm: ACT và Diffusion Policy

Giới thiệu: Từ dữ liệu đến hành động

Trong bài trước, chúng ta đã thu thập dataset demonstration cho task pick-and-place. Bây giờ là lúc biến dữ liệu đó thành policy — mô hình có khả năng tự điều khiển robot hoàn thành tác vụ mà không cần con người.

Bài viết này sẽ hướng dẫn bạn train hai policy phổ biến nhất trong LeRobot: ACT (Action Chunking with Transformers)Diffusion Policy. Chúng ta sẽ so sánh trực tiếp hiệu năng của hai phương pháp trên cùng một dataset, tìm hiểu cách tune hyperparameters, và visualize kết quả.

Training robot policies

ACT: Action Chunking with Transformers

Ý tưởng cốt lõi

ACT, được giới thiệu bởi Zhao et al. (RSS 2023), giải quyết một vấn đề quan trọng: temporal compounding errors. Thay vì predict từng action một (dẫn đến drift tích lũy), ACT predict một chunk gồm nhiều actions cùng lúc.

Kiến trúc ACT gồm:

  • CVAE Encoder: Nén sequence of actions thành latent vector z
  • Transformer Decoder: Từ observation + z, sinh ra chunk of actions
  • Action chunking: Thay vì 1 action/step, predict k actions cùng lúc

Cấu hình và training ACT

from lerobot.common.policies.act.configuration_act import ACTConfig
from lerobot.common.policies.act.modeling_act import ACTPolicy
from lerobot.common.datasets.lerobot_dataset import LeRobotDataset
import torch
from torch.utils.data import DataLoader

# Load dataset
dataset = LeRobotDataset("lerobot/pusht")

# Cấu hình ACT — các hyperparameters quan trọng
act_config = ACTConfig(
    input_shapes={
        "observation.image": [3, 96, 96],
        "observation.state": [2],
    },
    output_shapes={
        "action": [2],
    },
    input_normalization_modes={
        "observation.image": "mean_std",
        "observation.state": "min_max",
    },
    output_normalization_modes={
        "action": "min_max",
    },
    
    # === Hyperparameters quan trọng ===
    chunk_size=100,        # Số actions predict cùng lúc
    n_action_steps=100,    # Số actions thực thi trước khi re-predict
    
    # Transformer architecture
    dim_model=512,         # Hidden dimension
    n_heads=8,             # Attention heads
    n_layers=1,            # Số transformer layers (ít hơn = nhanh hơn)
    
    # CVAE settings
    latent_dim=32,         # Latent dimension cho CVAE
    use_vae=True,          # Bật CVAE (quan trọng!)
    kl_weight=10.0,        # Weight của KL loss
    
    # Vision encoder
    vision_backbone="resnet18",  # resnet18 hoặc resnet50
    pretrained_backbone=True,    # Dùng pretrained ImageNet weights
)

# Tạo policy và optimizer
policy = ACTPolicy(act_config)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
policy.to(device)

optimizer = torch.optim.AdamW(
    policy.parameters(),
    lr=1e-5,
    weight_decay=1e-4,
)

# Dataloader
dataloader = DataLoader(
    dataset,
    batch_size=8,
    shuffle=True,
    num_workers=4,
    pin_memory=True,
)

# Training loop
num_epochs = 100
policy.train()

for epoch in range(num_epochs):
    epoch_loss = 0
    num_batches = 0
    
    for batch in dataloader:
        batch = {k: v.to(device) for k, v in batch.items()}
        
        # Forward pass — ACT tự tính loss
        output = policy.forward(batch)
        loss = output["loss"]
        
        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(policy.parameters(), max_norm=10.0)
        optimizer.step()
        
        epoch_loss += loss.item()
        num_batches += 1
    
    avg_loss = epoch_loss / num_batches
    
    if (epoch + 1) % 10 == 0:
        print(f"Epoch {epoch+1}/{num_epochs} | Loss: {avg_loss:.4f}")

# Lưu checkpoint
torch.save(policy.state_dict(), "act_checkpoint.pt")

Hyperparameter tuning cho ACT

Parameter Giá trị mặc định Range thử Ảnh hưởng
chunk_size 100 20-200 Lớn hơn = mượt hơn, nhưng kém responsive
n_action_steps 100 = chunk_size Thường bằng chunk_size
kl_weight 10.0 1.0-100.0 Cao = ít diverse, thấp = multimodal
dim_model 512 256-1024 Lớn hơn = expressive hơn, chậm hơn
lr 1e-5 1e-6 to 1e-4 Bắt đầu thấp, tăng dần
vision_backbone resnet18 resnet18/50 resnet50 tốt hơn nhưng chậm hơn

Diffusion Policy

Ý tưởng cốt lõi

Diffusion Policy (Chi et al., RSS 2023) áp dụng diffusion models vào robot learning. Thay vì predict action trực tiếp, nó sinh action từ noise qua quá trình iterative denoising — tương tự cách Stable Diffusion sinh ảnh.

Ưu điểm lớn nhất: khả năng xử lý multi-modal action distributions. Khi cùng một observation có thể dẫn đến nhiều hành động khác nhau (ví dụ: gắp từ bên trái hoặc bên phải), Diffusion Policy xử lý tốt hơn ACT.

Cấu hình và training Diffusion Policy

from lerobot.common.policies.diffusion.configuration_diffusion import DiffusionConfig
from lerobot.common.policies.diffusion.modeling_diffusion import DiffusionPolicy

# Cấu hình Diffusion Policy
diff_config = DiffusionConfig(
    input_shapes={
        "observation.image": [3, 96, 96],
        "observation.state": [2],
    },
    output_shapes={
        "action": [2],
    },
    input_normalization_modes={
        "observation.image": "mean_std",
        "observation.state": "min_max",
    },
    output_normalization_modes={
        "action": "min_max",
    },
    
    # === Diffusion-specific hyperparameters ===
    num_inference_steps=100,    # Denoising steps khi inference
    
    # UNet architecture
    down_dims=[256, 512, 1024],  # Channel dims cho mỗi level
    
    # Observation horizons
    n_obs_steps=2,              # Số observation frames dùng làm input
    horizon=16,                 # Planning horizon (action sequence length)
    n_action_steps=8,           # Số actions thực thi
    
    # Noise scheduler
    noise_scheduler_type="DDPM",  # DDPM hoặc DDIM
    beta_schedule="squaredcos_cap_v2",
    
    # Vision encoder
    vision_backbone="resnet18",
    crop_shape=[84, 84],        # Random crop cho augmentation
)

# Tạo policy
diff_policy = DiffusionPolicy(diff_config)
diff_policy.to(device)

# Optimizer — Diffusion thường cần lr cao hơn ACT
optimizer = torch.optim.AdamW(
    diff_policy.parameters(),
    lr=1e-4,
    weight_decay=1e-6,
    betas=(0.95, 0.999),
)

# Learning rate scheduler
lr_scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
    optimizer,
    T_max=num_epochs,
    eta_min=1e-6,
)

# Training loop
diff_policy.train()

for epoch in range(200):  # Diffusion thường cần nhiều epochs hơn
    epoch_loss = 0
    num_batches = 0
    
    for batch in dataloader:
        batch = {k: v.to(device) for k, v in batch.items()}
        
        output = diff_policy.forward(batch)
        loss = output["loss"]
        
        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(diff_policy.parameters(), max_norm=10.0)
        optimizer.step()
        
        epoch_loss += loss.item()
        num_batches += 1
    
    lr_scheduler.step()
    avg_loss = epoch_loss / num_batches
    
    if (epoch + 1) % 20 == 0:
        print(f"Epoch {epoch+1}/200 | Loss: {avg_loss:.4f} | "
              f"LR: {lr_scheduler.get_last_lr()[0]:.2e}")

torch.save(diff_policy.state_dict(), "diffusion_checkpoint.pt")

Diffusion process for action generation

Tối ưu inference speed cho Diffusion Policy

Một nhược điểm của Diffusion Policy là inference chậm do cần nhiều denoising steps. Có vài cách tối ưu:

# 1. Dùng DDIM thay DDPM — ít steps hơn, kết quả tương đương
diff_config_fast = DiffusionConfig(
    num_inference_steps=10,              # Giảm từ 100 xuống 10
    noise_scheduler_type="DDIM",         # DDIM nhanh hơn DDPM
    # ... giữ nguyên các params khác
)

# 2. Benchmark inference time
import time

policy.eval()
obs = {k: v[:1].to(device) for k, v in next(iter(dataloader)).items()
       if k.startswith("observation")}

# Warm up
for _ in range(5):
    with torch.no_grad():
        _ = policy.select_action(obs)

# Benchmark
times = []
for _ in range(50):
    start = time.perf_counter()
    with torch.no_grad():
        action = policy.select_action(obs)
    torch.cuda.synchronize()
    times.append(time.perf_counter() - start)

print(f"Inference time: {np.mean(times)*1000:.1f} +/- {np.std(times)*1000:.1f} ms")
# ACT: ~5-10ms | Diffusion (100 steps): ~50-100ms | Diffusion DDIM (10 steps): ~10-20ms

So sánh ACT vs Diffusion Policy

Evaluation trên cùng dataset

import gymnasium as gym
import numpy as np
import torch

def evaluate_policy(policy, env_name, n_episodes=50, max_steps=500):
    """Đánh giá policy trên environment."""
    env = gym.make(env_name)
    policy.eval()
    
    results = {
        "success_rate": 0,
        "avg_steps": 0,
        "avg_reward": 0,
        "path_lengths": [],
    }
    
    successes = 0
    total_steps = 0
    total_reward = 0
    
    for ep in range(n_episodes):
        obs, info = env.reset()
        ep_reward = 0
        ep_steps = 0
        positions = []
        
        for step in range(max_steps):
            obs_tensor = {
                k: torch.tensor(v).unsqueeze(0).to(device) 
                for k, v in obs.items()
            }
            
            with torch.no_grad():
                action = policy.select_action(obs_tensor)
            
            obs, reward, terminated, truncated, info = env.step(
                action.squeeze(0).cpu().numpy()
            )
            
            ep_reward += reward
            ep_steps += 1
            positions.append(obs.get("achieved_goal", np.zeros(3)))
            
            if terminated or truncated:
                break
        
        if info.get("is_success", False):
            successes += 1
        total_steps += ep_steps
        total_reward += ep_reward
        
        # Tính path length
        positions = np.array(positions)
        path_length = np.sum(np.linalg.norm(np.diff(positions, axis=0), axis=1))
        results["path_lengths"].append(path_length)
    
    results["success_rate"] = successes / n_episodes
    results["avg_steps"] = total_steps / n_episodes
    results["avg_reward"] = total_reward / n_episodes
    results["path_efficiency"] = np.mean(results["path_lengths"])
    
    env.close()
    return results


# Đánh giá cả hai policy
act_results = evaluate_policy(act_policy, "lerobot/pusht", n_episodes=50)
diff_results = evaluate_policy(diff_policy, "lerobot/pusht", n_episodes=50)

# In kết quả so sánh
print(f"\n{'Metric':<20} {'ACT':>10} {'Diffusion':>10}")
print(f"{'='*42}")
print(f"{'Success Rate':<20} {act_results['success_rate']:>9.1%} {diff_results['success_rate']:>9.1%}")
print(f"{'Avg Steps':<20} {act_results['avg_steps']:>10.1f} {diff_results['avg_steps']:>10.1f}")
print(f"{'Avg Reward':<20} {act_results['avg_reward']:>10.2f} {diff_results['avg_reward']:>10.2f}")
print(f"{'Path Efficiency':<20} {act_results['path_efficiency']:>10.3f} {diff_results['path_efficiency']:>10.3f}")

Bảng so sánh tổng hợp

Tiêu chí ACT Diffusion Policy
Inference speed ~5-10ms ~50-100ms (DDPM), ~10-20ms (DDIM)
Training epochs 100-200 200-500
Success rate Cao nếu demo đồng nhất Cao hơn với demo đa dạng
Multi-modal support Hạn chế (CVAE giúp phần nào) Tốt — xử lý nhiều modes
Memory Nhẹ hơn Nặng hơn (UNet + scheduler)
Dễ tune Dễ — ít hyperparams Khó hơn — nhiều hyperparams
Best for Tasks rõ ràng, real-time Tasks phức tạp, multi-modal

Visualize learned policies

import matplotlib.pyplot as plt
import numpy as np

def visualize_action_predictions(policy, dataset, n_samples=5):
    """Visualize action predictions vs ground truth."""
    policy.eval()
    
    fig, axes = plt.subplots(n_samples, 2, figsize=(12, 3*n_samples))
    
    for i in range(n_samples):
        # Lấy sample từ dataset
        idx = np.random.randint(len(dataset))
        sample = {k: v.unsqueeze(0).to(device) for k, v in dataset[idx].items()}
        
        # Ground truth action
        gt_action = sample["action"].cpu().numpy().flatten()
        
        # Predicted action
        with torch.no_grad():
            pred_action = policy.select_action(
                {k: v for k, v in sample.items() if k.startswith("observation")}
            ).cpu().numpy().flatten()
        
        # Plot
        axes[i, 0].bar(range(len(gt_action)), gt_action, alpha=0.7, label="Ground Truth")
        axes[i, 0].bar(range(len(pred_action)), pred_action, alpha=0.5, label="Predicted")
        axes[i, 0].set_title(f"Sample {i+1}: Action Comparison")
        axes[i, 0].legend()
        
        # Error
        error = np.abs(gt_action - pred_action[:len(gt_action)])
        axes[i, 1].bar(range(len(error)), error, color='red', alpha=0.7)
        axes[i, 1].set_title(f"Sample {i+1}: Absolute Error")
    
    plt.tight_layout()
    plt.savefig("policy_comparison.png", dpi=150)
    plt.show()

visualize_action_predictions(act_policy, dataset)
visualize_action_predictions(diff_policy, dataset)

Training bằng CLI (nhanh nhất)

Nếu bạn muốn bắt đầu nhanh mà không viết code, LeRobot cung cấp CLI:

# Train ACT trên PushT
python lerobot/scripts/train.py \
    --policy.type=act \
    --dataset.repo_id=lerobot/pusht \
    --training.num_epochs=100 \
    --training.batch_size=8 \
    --training.lr=1e-5 \
    --policy.chunk_size=100 \
    --policy.use_vae=true \
    --output_dir=outputs/act_pusht

# Train Diffusion Policy trên cùng dataset
python lerobot/scripts/train.py \
    --policy.type=diffusion \
    --dataset.repo_id=lerobot/pusht \
    --training.num_epochs=200 \
    --training.batch_size=64 \
    --training.lr=1e-4 \
    --policy.num_inference_steps=100 \
    --output_dir=outputs/diffusion_pusht

# Evaluate
python lerobot/scripts/eval.py \
    --policy.path=outputs/act_pusht/checkpoints/last \
    --env.type=pusht \
    --eval.n_episodes=50

Papers tham khảo

  1. ACT: Learning Fine-Grained Bimanual Manipulation with Low-Cost HardwareZhao et al., RSS 2023 — Paper gốc của ACT
  2. Diffusion Policy: Visuomotor Policy Learning via Action DiffusionChi et al., RSS 2023 — Paper gốc của Diffusion Policy
  3. Consistency PolicyPrasad et al., 2024 — Tăng tốc Diffusion Policy bằng consistency distillation

Kết luận và bước tiếp theo

ACT và Diffusion Policy đại diện cho hai triết lý khác nhau trong robot learning. ACT phù hợp khi bạn cần inference nhanh và demonstrations đồng nhất. Diffusion Policy tốt hơn khi task phức tạp và cần xử lý multi-modal behaviors.

Trong thực tế, hãy bắt đầu với ACT (đơn giản, nhanh), và chuyển sang Diffusion Policy nếu thấy ACT không capture được hành vi đa dạng.

Bài tiếp theo — Multi-Object Manipulation — sẽ mở rộng từ single object sang nhiều vật thể, đòi hỏi policy phải hiểu ngữ cảnh và thứ tự tác vụ.

Bài viết liên quan

NT

Nguyễn Anh Tuấn

Robotics & AI Engineer. Building VnRobo — sharing knowledge about robot learning, VLA models, and automation.

Bài viết liên quan

NEWTutorial
SimpleVLA-RL (7): Collect Data cho OpenArm
openarmlerobotdata-collectionteleoperationPhần 7

SimpleVLA-RL (7): Collect Data cho OpenArm

Hướng dẫn từng bước setup OpenArm, calibrate, teleoperate và thu thập 50 episodes gắp hộp carton với LeRobot.

11/4/202616 phút đọc
NEWTutorial
SimpleVLA-RL (6): OpenArm — Phân tích Lộ trình
openarmvlareinforcement-learninglerobotpi0Phần 6

SimpleVLA-RL (6): OpenArm — Phân tích Lộ trình

Phân tích chi tiết cách tiếp cận training robot OpenArm 7-DoF gắp hộp carton — so sánh 2 lộ trình: LeRobot native vs SimpleVLA-RL.

11/4/202613 phút đọc
NEWTutorial
PEFT/LoRA Fine-tune & Deploy VLA
lerobotpeftloradeploymentvlaPhần 15

PEFT/LoRA Fine-tune & Deploy VLA

Fine-tune VLA lớn bằng LoRA trên GPU nhỏ, deploy lên robot thật với Real-Time Chunking — production-ready workflow.

11/4/202612 phút đọc