Giới thiệu: Từ dữ liệu đến hành động
Trong bài trước, chúng ta đã thu thập dataset demonstration cho task pick-and-place. Bây giờ là lúc biến dữ liệu đó thành policy — mô hình có khả năng tự điều khiển robot hoàn thành tác vụ mà không cần con người.
Bài viết này sẽ hướng dẫn bạn train hai policy phổ biến nhất trong LeRobot: ACT (Action Chunking with Transformers) và Diffusion Policy. Chúng ta sẽ so sánh trực tiếp hiệu năng của hai phương pháp trên cùng một dataset, tìm hiểu cách tune hyperparameters, và visualize kết quả.
ACT: Action Chunking with Transformers
Ý tưởng cốt lõi
ACT, được giới thiệu bởi Zhao et al. (RSS 2023), giải quyết một vấn đề quan trọng: temporal compounding errors. Thay vì predict từng action một (dẫn đến drift tích lũy), ACT predict một chunk gồm nhiều actions cùng lúc.
Kiến trúc ACT gồm:
- CVAE Encoder: Nén sequence of actions thành latent vector z
- Transformer Decoder: Từ observation + z, sinh ra chunk of actions
- Action chunking: Thay vì 1 action/step, predict k actions cùng lúc
Cấu hình và training ACT
from lerobot.common.policies.act.configuration_act import ACTConfig
from lerobot.common.policies.act.modeling_act import ACTPolicy
from lerobot.common.datasets.lerobot_dataset import LeRobotDataset
import torch
from torch.utils.data import DataLoader
# Load dataset
dataset = LeRobotDataset("lerobot/pusht")
# Cấu hình ACT — các hyperparameters quan trọng
act_config = ACTConfig(
input_shapes={
"observation.image": [3, 96, 96],
"observation.state": [2],
},
output_shapes={
"action": [2],
},
input_normalization_modes={
"observation.image": "mean_std",
"observation.state": "min_max",
},
output_normalization_modes={
"action": "min_max",
},
# === Hyperparameters quan trọng ===
chunk_size=100, # Số actions predict cùng lúc
n_action_steps=100, # Số actions thực thi trước khi re-predict
# Transformer architecture
dim_model=512, # Hidden dimension
n_heads=8, # Attention heads
n_layers=1, # Số transformer layers (ít hơn = nhanh hơn)
# CVAE settings
latent_dim=32, # Latent dimension cho CVAE
use_vae=True, # Bật CVAE (quan trọng!)
kl_weight=10.0, # Weight của KL loss
# Vision encoder
vision_backbone="resnet18", # resnet18 hoặc resnet50
pretrained_backbone=True, # Dùng pretrained ImageNet weights
)
# Tạo policy và optimizer
policy = ACTPolicy(act_config)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
policy.to(device)
optimizer = torch.optim.AdamW(
policy.parameters(),
lr=1e-5,
weight_decay=1e-4,
)
# Dataloader
dataloader = DataLoader(
dataset,
batch_size=8,
shuffle=True,
num_workers=4,
pin_memory=True,
)
# Training loop
num_epochs = 100
policy.train()
for epoch in range(num_epochs):
epoch_loss = 0
num_batches = 0
for batch in dataloader:
batch = {k: v.to(device) for k, v in batch.items()}
# Forward pass — ACT tự tính loss
output = policy.forward(batch)
loss = output["loss"]
# Backward pass
optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(policy.parameters(), max_norm=10.0)
optimizer.step()
epoch_loss += loss.item()
num_batches += 1
avg_loss = epoch_loss / num_batches
if (epoch + 1) % 10 == 0:
print(f"Epoch {epoch+1}/{num_epochs} | Loss: {avg_loss:.4f}")
# Lưu checkpoint
torch.save(policy.state_dict(), "act_checkpoint.pt")
Hyperparameter tuning cho ACT
| Parameter | Giá trị mặc định | Range thử | Ảnh hưởng |
|---|---|---|---|
chunk_size |
100 | 20-200 | Lớn hơn = mượt hơn, nhưng kém responsive |
n_action_steps |
100 | = chunk_size | Thường bằng chunk_size |
kl_weight |
10.0 | 1.0-100.0 | Cao = ít diverse, thấp = multimodal |
dim_model |
512 | 256-1024 | Lớn hơn = expressive hơn, chậm hơn |
lr |
1e-5 | 1e-6 to 1e-4 | Bắt đầu thấp, tăng dần |
vision_backbone |
resnet18 | resnet18/50 | resnet50 tốt hơn nhưng chậm hơn |
Diffusion Policy
Ý tưởng cốt lõi
Diffusion Policy (Chi et al., RSS 2023) áp dụng diffusion models vào robot learning. Thay vì predict action trực tiếp, nó sinh action từ noise qua quá trình iterative denoising — tương tự cách Stable Diffusion sinh ảnh.
Ưu điểm lớn nhất: khả năng xử lý multi-modal action distributions. Khi cùng một observation có thể dẫn đến nhiều hành động khác nhau (ví dụ: gắp từ bên trái hoặc bên phải), Diffusion Policy xử lý tốt hơn ACT.
Cấu hình và training Diffusion Policy
from lerobot.common.policies.diffusion.configuration_diffusion import DiffusionConfig
from lerobot.common.policies.diffusion.modeling_diffusion import DiffusionPolicy
# Cấu hình Diffusion Policy
diff_config = DiffusionConfig(
input_shapes={
"observation.image": [3, 96, 96],
"observation.state": [2],
},
output_shapes={
"action": [2],
},
input_normalization_modes={
"observation.image": "mean_std",
"observation.state": "min_max",
},
output_normalization_modes={
"action": "min_max",
},
# === Diffusion-specific hyperparameters ===
num_inference_steps=100, # Denoising steps khi inference
# UNet architecture
down_dims=[256, 512, 1024], # Channel dims cho mỗi level
# Observation horizons
n_obs_steps=2, # Số observation frames dùng làm input
horizon=16, # Planning horizon (action sequence length)
n_action_steps=8, # Số actions thực thi
# Noise scheduler
noise_scheduler_type="DDPM", # DDPM hoặc DDIM
beta_schedule="squaredcos_cap_v2",
# Vision encoder
vision_backbone="resnet18",
crop_shape=[84, 84], # Random crop cho augmentation
)
# Tạo policy
diff_policy = DiffusionPolicy(diff_config)
diff_policy.to(device)
# Optimizer — Diffusion thường cần lr cao hơn ACT
optimizer = torch.optim.AdamW(
diff_policy.parameters(),
lr=1e-4,
weight_decay=1e-6,
betas=(0.95, 0.999),
)
# Learning rate scheduler
lr_scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
optimizer,
T_max=num_epochs,
eta_min=1e-6,
)
# Training loop
diff_policy.train()
for epoch in range(200): # Diffusion thường cần nhiều epochs hơn
epoch_loss = 0
num_batches = 0
for batch in dataloader:
batch = {k: v.to(device) for k, v in batch.items()}
output = diff_policy.forward(batch)
loss = output["loss"]
optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(diff_policy.parameters(), max_norm=10.0)
optimizer.step()
epoch_loss += loss.item()
num_batches += 1
lr_scheduler.step()
avg_loss = epoch_loss / num_batches
if (epoch + 1) % 20 == 0:
print(f"Epoch {epoch+1}/200 | Loss: {avg_loss:.4f} | "
f"LR: {lr_scheduler.get_last_lr()[0]:.2e}")
torch.save(diff_policy.state_dict(), "diffusion_checkpoint.pt")
Tối ưu inference speed cho Diffusion Policy
Một nhược điểm của Diffusion Policy là inference chậm do cần nhiều denoising steps. Có vài cách tối ưu:
# 1. Dùng DDIM thay DDPM — ít steps hơn, kết quả tương đương
diff_config_fast = DiffusionConfig(
num_inference_steps=10, # Giảm từ 100 xuống 10
noise_scheduler_type="DDIM", # DDIM nhanh hơn DDPM
# ... giữ nguyên các params khác
)
# 2. Benchmark inference time
import time
policy.eval()
obs = {k: v[:1].to(device) for k, v in next(iter(dataloader)).items()
if k.startswith("observation")}
# Warm up
for _ in range(5):
with torch.no_grad():
_ = policy.select_action(obs)
# Benchmark
times = []
for _ in range(50):
start = time.perf_counter()
with torch.no_grad():
action = policy.select_action(obs)
torch.cuda.synchronize()
times.append(time.perf_counter() - start)
print(f"Inference time: {np.mean(times)*1000:.1f} +/- {np.std(times)*1000:.1f} ms")
# ACT: ~5-10ms | Diffusion (100 steps): ~50-100ms | Diffusion DDIM (10 steps): ~10-20ms
So sánh ACT vs Diffusion Policy
Evaluation trên cùng dataset
import gymnasium as gym
import numpy as np
import torch
def evaluate_policy(policy, env_name, n_episodes=50, max_steps=500):
"""Đánh giá policy trên environment."""
env = gym.make(env_name)
policy.eval()
results = {
"success_rate": 0,
"avg_steps": 0,
"avg_reward": 0,
"path_lengths": [],
}
successes = 0
total_steps = 0
total_reward = 0
for ep in range(n_episodes):
obs, info = env.reset()
ep_reward = 0
ep_steps = 0
positions = []
for step in range(max_steps):
obs_tensor = {
k: torch.tensor(v).unsqueeze(0).to(device)
for k, v in obs.items()
}
with torch.no_grad():
action = policy.select_action(obs_tensor)
obs, reward, terminated, truncated, info = env.step(
action.squeeze(0).cpu().numpy()
)
ep_reward += reward
ep_steps += 1
positions.append(obs.get("achieved_goal", np.zeros(3)))
if terminated or truncated:
break
if info.get("is_success", False):
successes += 1
total_steps += ep_steps
total_reward += ep_reward
# Tính path length
positions = np.array(positions)
path_length = np.sum(np.linalg.norm(np.diff(positions, axis=0), axis=1))
results["path_lengths"].append(path_length)
results["success_rate"] = successes / n_episodes
results["avg_steps"] = total_steps / n_episodes
results["avg_reward"] = total_reward / n_episodes
results["path_efficiency"] = np.mean(results["path_lengths"])
env.close()
return results
# Đánh giá cả hai policy
act_results = evaluate_policy(act_policy, "lerobot/pusht", n_episodes=50)
diff_results = evaluate_policy(diff_policy, "lerobot/pusht", n_episodes=50)
# In kết quả so sánh
print(f"\n{'Metric':<20} {'ACT':>10} {'Diffusion':>10}")
print(f"{'='*42}")
print(f"{'Success Rate':<20} {act_results['success_rate']:>9.1%} {diff_results['success_rate']:>9.1%}")
print(f"{'Avg Steps':<20} {act_results['avg_steps']:>10.1f} {diff_results['avg_steps']:>10.1f}")
print(f"{'Avg Reward':<20} {act_results['avg_reward']:>10.2f} {diff_results['avg_reward']:>10.2f}")
print(f"{'Path Efficiency':<20} {act_results['path_efficiency']:>10.3f} {diff_results['path_efficiency']:>10.3f}")
Bảng so sánh tổng hợp
| Tiêu chí | ACT | Diffusion Policy |
|---|---|---|
| Inference speed | ~5-10ms | ~50-100ms (DDPM), ~10-20ms (DDIM) |
| Training epochs | 100-200 | 200-500 |
| Success rate | Cao nếu demo đồng nhất | Cao hơn với demo đa dạng |
| Multi-modal support | Hạn chế (CVAE giúp phần nào) | Tốt — xử lý nhiều modes |
| Memory | Nhẹ hơn | Nặng hơn (UNet + scheduler) |
| Dễ tune | Dễ — ít hyperparams | Khó hơn — nhiều hyperparams |
| Best for | Tasks rõ ràng, real-time | Tasks phức tạp, multi-modal |
Visualize learned policies
import matplotlib.pyplot as plt
import numpy as np
def visualize_action_predictions(policy, dataset, n_samples=5):
"""Visualize action predictions vs ground truth."""
policy.eval()
fig, axes = plt.subplots(n_samples, 2, figsize=(12, 3*n_samples))
for i in range(n_samples):
# Lấy sample từ dataset
idx = np.random.randint(len(dataset))
sample = {k: v.unsqueeze(0).to(device) for k, v in dataset[idx].items()}
# Ground truth action
gt_action = sample["action"].cpu().numpy().flatten()
# Predicted action
with torch.no_grad():
pred_action = policy.select_action(
{k: v for k, v in sample.items() if k.startswith("observation")}
).cpu().numpy().flatten()
# Plot
axes[i, 0].bar(range(len(gt_action)), gt_action, alpha=0.7, label="Ground Truth")
axes[i, 0].bar(range(len(pred_action)), pred_action, alpha=0.5, label="Predicted")
axes[i, 0].set_title(f"Sample {i+1}: Action Comparison")
axes[i, 0].legend()
# Error
error = np.abs(gt_action - pred_action[:len(gt_action)])
axes[i, 1].bar(range(len(error)), error, color='red', alpha=0.7)
axes[i, 1].set_title(f"Sample {i+1}: Absolute Error")
plt.tight_layout()
plt.savefig("policy_comparison.png", dpi=150)
plt.show()
visualize_action_predictions(act_policy, dataset)
visualize_action_predictions(diff_policy, dataset)
Training bằng CLI (nhanh nhất)
Nếu bạn muốn bắt đầu nhanh mà không viết code, LeRobot cung cấp CLI:
# Train ACT trên PushT
python lerobot/scripts/train.py \
--policy.type=act \
--dataset.repo_id=lerobot/pusht \
--training.num_epochs=100 \
--training.batch_size=8 \
--training.lr=1e-5 \
--policy.chunk_size=100 \
--policy.use_vae=true \
--output_dir=outputs/act_pusht
# Train Diffusion Policy trên cùng dataset
python lerobot/scripts/train.py \
--policy.type=diffusion \
--dataset.repo_id=lerobot/pusht \
--training.num_epochs=200 \
--training.batch_size=64 \
--training.lr=1e-4 \
--policy.num_inference_steps=100 \
--output_dir=outputs/diffusion_pusht
# Evaluate
python lerobot/scripts/eval.py \
--policy.path=outputs/act_pusht/checkpoints/last \
--env.type=pusht \
--eval.n_episodes=50
Papers tham khảo
- ACT: Learning Fine-Grained Bimanual Manipulation with Low-Cost Hardware — Zhao et al., RSS 2023 — Paper gốc của ACT
- Diffusion Policy: Visuomotor Policy Learning via Action Diffusion — Chi et al., RSS 2023 — Paper gốc của Diffusion Policy
- Consistency Policy — Prasad et al., 2024 — Tăng tốc Diffusion Policy bằng consistency distillation
Kết luận và bước tiếp theo
ACT và Diffusion Policy đại diện cho hai triết lý khác nhau trong robot learning. ACT phù hợp khi bạn cần inference nhanh và demonstrations đồng nhất. Diffusion Policy tốt hơn khi task phức tạp và cần xử lý multi-modal behaviors.
Trong thực tế, hãy bắt đầu với ACT (đơn giản, nhanh), và chuyển sang Diffusion Policy nếu thấy ACT không capture được hành vi đa dạng.
Bài tiếp theo — Multi-Object Manipulation — sẽ mở rộng từ single object sang nhiều vật thể, đòi hỏi policy phải hiểu ngữ cảnh và thứ tự tác vụ.
Bài viết liên quan
- Thu thập dữ liệu bằng Teleoperation — Cách thu thập dataset chất lượng cao
- ACT: Action Chunking with Transformers — Deep dive vào kiến trúc ACT
- Diffusion Policy: Từ lý thuyết đến thực hành — Giải thích chi tiết Diffusion Policy