← Quay lại Blog
aiai-perceptionimitation-learningprogramming

Imitation Learning: BC, DAgger và DAPG cho robot

Tại sao imitation learning quan trọng hơn RL cho nhiều bài toán manipulation — cách thu thập data và train policy.

Nguyễn Anh Tuấn8 tháng 3, 202611 phút đọc
Imitation Learning: BC, DAgger và DAPG cho robot

Imitation Learning — Dạy robot bằng cách làm mẫu

Phần 1, mình đã giới thiệu RL algorithms cho robotics. Nhưng thực tế, nhiều bài toán manipulation không cần RL — chỉ cần cho robot xem người làm, rồi bắt chước. Đó chính là Imitation Learning (IL).

Tại sao IL quan trọng? Vì RL cho manipulation cực kỳ khó:

Imitation learning giải quyết tất cả bằng cách bỏ qua reward hoàn toàn — thay vào đó, học trực tiếp từ demonstrations của expert (người điều khiển).

Người điều khiển robot thông qua teleoperation để thu thập demonstration data

Behavioral Cloning (BC) — Đơn giản nhất, nhanh nhất

Behavioral Cloning là phương pháp IL đơn giản nhất: supervised learning thuần túy trên cặp (observation, action) từ expert demonstrations.

Cách hoạt động

Thu thập data:
  Expert điều khiển robot → ghi lại (state, action) mỗi timestep
  Dataset: D = {(s_1, a_1), (s_2, a_2), ..., (s_N, a_N)}

Training:
  Minimize L = Σ ||π_θ(s_i) - a_i||²
  → Supervised regression: cho state, predict action

Deployment:
  Robot tự chạy policy π_θ đã train

Nói đơn giản: BC = regression. Bạn cho model cặp (input, output) và train nó predict output từ input. Không cần environment, không cần reward, không cần simulation.

Ưu điểm

Nhược điểm — Distribution Shift

BC có một vấn đề cốt lõi: distribution shift (còn gọi là covariate shift).

Training:  Robot thấy states từ expert trajectory
           s_expert → a_expert (đúng đường)

Deployment: Robot mắc lỗi nhỏ → đi chệch → thấy state mới
            s_new (chưa bao giờ thấy) → a_??? (không biết)
            → lỗi tích lũy → thất bại hoàn toàn

Expert luôn đi đúng đường, nên training data chỉ chứa states "tốt". Khi robot tự chạy và mắc lỗi nhỏ, nó rơi vào states chưa thấy bao giờ — và không biết quay lại. Lỗi nhỏ tích lũy thành lỗi lớn theo thời gian.

Giải pháp cho distribution shift

  1. Thu thập nhiều data hơn: Đa dạng demonstrations — không chỉ 1 cách làm
  2. Data augmentation: Thêm noise vào observations khi training
  3. Action chunking: Predict nhiều actions cùng lúc thay vì từng step (xem Phần 3)
  4. DAgger: Phương pháp tiếp theo mình sẽ giới thiệu

DAgger — Iterative, Sửa Distribution Shift

DAgger (Dataset Aggregation) (Ross et al., 2011) giải quyết distribution shift bằng cách iterative: cho robot chạy policy hiện tại, expert sửa lỗi, thêm data mới vào dataset, train lại.

Algorithm

Algorithm DAgger:
  1. Thu thập dataset D₀ từ expert demonstrations
  2. Train policy π₁ trên D₀ (BC)
  3. For i = 1, 2, ..., N:
     a. Cho robot chạy π_i → thu thập states {s₁, s₂, ...}
     b. Expert label actions cho các states đó: {(s₁, a*₁), (s₂, a*₂), ...}
     c. Gộp data: D_i = D_{i-1} ∪ {new labeled data}
     d. Train π_{i+1} trên D_i
  4. Return π_N

Tại sao hiệu quả?

Vòng lặp DAgger đảm bảo training data bao phủ cả states mà robot thực sự gặp, không chỉ states từ expert. Sau vài iterations, distribution shift giảm đáng kể.

Ưu điểm

Nhược điểm

Biến thể thực tế

Trong thực tế, DAgger thuần rất ít khi được dùng vì cần expert luôn sẵn sàng. Các biến thể phổ biến hơn:

DAPG — Kết hợp Demos với RL Fine-tuning

DAPG (Demonstration Augmented Policy Gradient) (Rajeswaran et al., 2018) là best of both worlds: bắt đầu từ demonstrations, rồi fine-tune bằng RL.

Hai giai đoạn

Giai đoạn 1: Pre-training (BC)
  → Train policy ban đầu từ demonstrations
  → Policy biết "đại khái" cách làm, nhưng chưa tối ưu

Giai đoạn 2: RL Fine-tuning với demo augmented reward
  → Dùng RL (PPO/NPG) để tối ưu policy
  → Thêm auxiliary reward: ||π(s) - π_demo(s)||
    → Giữ policy gần demonstrations, không drift quá xa
  → Giảm dần auxiliary reward theo thời gian

Tại sao tốt hơn BC thuần và RL thuần?

Phương pháp Vấn đề DAPG giải quyết
BC thuần Distribution shift, không tối ưu RL fine-tuning cải thiện performance
RL thuần Sample inefficient, sparse reward Demo pre-training cho starting point tốt
BC → RL (naive) RL phá hỏng BC policy Auxiliary reward giữ gần demos

Kết quả ấn tượng

DAPG đặc biệt mạnh cho dexterous manipulation — bài toán cực khó với bàn tay robot 24 DoF:

Task RL thuần BC thuần DAPG
Door opening 0% (1M steps) 45% 95%
Tool use 0% (1M steps) 30% 88%
Object relocation 12% (1M steps) 52% 92%

RL thuần thậm chí không solve được trong 1 triệu steps, nhưng DAPG với chỉ 25 demonstrations + 200K RL steps đạt >90% success rate.

Robot hand thực hiện dexterous manipulation task

Thu thập Demonstration Data

Data collection là phần quan trọng nhất — policy chỉ tốt bằng data. Có 3 phương pháp chính:

1. Teleoperation

Người điều khiển robot từ xa qua controller, VR headset, hoặc master device.

Phương pháp Setup cost Quality Speed
Keyboard/Joystick Thấp Thấp (6-DoF khó) Chậm
VR Controllers Trung bình Cao Trung bình
Master-slave (ALOHA) Cao Rất cao Nhanh
SpaceMouse Thấp Trung bình Trung bình

ALOHA (hệ thống từ paper ACT) dùng master robot — người điều khiển robot master, slave robot copy chính xác movements. Cho quality tốt nhất cho manipulation.

2. Kinesthetic Teaching

Người cầm trực tiếp robot và dẫn qua trajectory mong muốn. Robot ở chế độ gravity compensation — tay người di chuyển, robot ghi lại joint positions.

3. VR-based Collection

Dùng VR headset + controllers để điều khiển robot trong simulation hoặc real-time.

Bảng so sánh tổng hợp

Tiêu chí BC DAgger DAPG
Cần expert online? Không (offline) Có (mỗi iteration) Không (offline demos)
Cần reward function? Không Không Có (cho RL phase)
Cần simulation? Không Tùy (có thể real) Thường cần (RL phase)
Distribution shift Có (vấn đề lớn) Giải quyết Giải quyết (qua RL)
Sample efficiency Rất cao Cao Trung bình
Performance ceiling Trung bình Cao Rất cao
Long-horizon tasks Kém Tốt Tốt
Implementation Đơn giản Trung bình Phức tạp
Demos cần 50-200 20-50 + iterations 20-50 + RL
Best for Quick start Safety-critical Best performance

Thực hành: Behavioral Cloning với PyTorch

Đây là implementation BC đơn giản nhưng đầy đủ — bạn có thể dùng trực tiếp cho robot project:

"""
Behavioral Cloning cho robot manipulation
Yêu cầu: pip install torch numpy
"""
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np


class RobotDemoDataset(Dataset):
    """Dataset từ demonstration files."""

    def __init__(self, demo_dir: str, transform=None):
        """
        demo_dir: thư mục chứa .npz files, mỗi file = 1 demonstration
        Mỗi .npz chứa: observations (T, obs_dim), actions (T, act_dim)
        """
        import glob
        self.observations = []
        self.actions = []

        for f in sorted(glob.glob(f"{demo_dir}/*.npz")):
            data = np.load(f)
            self.observations.append(data["observations"])
            self.actions.append(data["actions"])

        self.observations = np.concatenate(self.observations, axis=0)
        self.actions = np.concatenate(self.actions, axis=0)

        # Normalize observations — rất quan trọng cho training stability
        self.obs_mean = self.observations.mean(axis=0)
        self.obs_std = self.observations.std(axis=0) + 1e-8
        self.observations = (self.observations - self.obs_mean) / self.obs_std

        # Normalize actions về [-1, 1]
        self.act_mean = self.actions.mean(axis=0)
        self.act_std = self.actions.std(axis=0) + 1e-8
        self.actions = (self.actions - self.act_mean) / self.act_std

        print(f"Loaded {len(self)} samples from {demo_dir}")
        print(f"  Obs dim: {self.observations.shape[1]}")
        print(f"  Act dim: {self.actions.shape[1]}")

    def __len__(self):
        return len(self.observations)

    def __getitem__(self, idx):
        return (
            torch.FloatTensor(self.observations[idx]),
            torch.FloatTensor(self.actions[idx]),
        )


class BCPolicy(nn.Module):
    """MLP policy cho Behavioral Cloning."""

    def __init__(self, obs_dim: int, act_dim: int, hidden_dim: int = 256):
        super().__init__()
        self.network = nn.Sequential(
            nn.Linear(obs_dim, hidden_dim),
            nn.ReLU(),
            nn.LayerNorm(hidden_dim),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.LayerNorm(hidden_dim),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, act_dim),
            nn.Tanh(),  # Output trong [-1, 1] → denormalize khi deploy
        )

    def forward(self, obs):
        return self.network(obs)


def train_bc(
    demo_dir: str,
    obs_dim: int = 12,
    act_dim: int = 7,
    epochs: int = 200,
    batch_size: int = 256,
    lr: float = 1e-3,
    save_path: str = "bc_policy.pt",
):
    """Train BC policy từ demonstrations."""

    # 1. Load data
    dataset = RobotDemoDataset(demo_dir)
    train_size = int(0.9 * len(dataset))
    val_size = len(dataset) - train_size
    train_set, val_set = torch.utils.data.random_split(dataset, [train_size, val_size])

    train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_set, batch_size=batch_size)

    # 2. Model + optimizer
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    policy = BCPolicy(obs_dim, act_dim).to(device)
    optimizer = optim.AdamW(policy.parameters(), lr=lr, weight_decay=1e-4)
    scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs)

    # 3. Training loop
    best_val_loss = float("inf")

    for epoch in range(epochs):
        # Train
        policy.train()
        train_loss = 0
        for obs, actions in train_loader:
            obs, actions = obs.to(device), actions.to(device)
            pred = policy(obs)
            loss = nn.functional.mse_loss(pred, actions)
            optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(policy.parameters(), 1.0)
            optimizer.step()
            train_loss += loss.item()

        # Validate
        policy.eval()
        val_loss = 0
        with torch.no_grad():
            for obs, actions in val_loader:
                obs, actions = obs.to(device), actions.to(device)
                pred = policy(obs)
                val_loss += nn.functional.mse_loss(pred, actions).item()

        train_loss /= len(train_loader)
        val_loss /= len(val_loader)
        scheduler.step()

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save({
                "model": policy.state_dict(),
                "obs_mean": dataset.obs_mean,
                "obs_std": dataset.obs_std,
                "act_mean": dataset.act_mean,
                "act_std": dataset.act_std,
            }, save_path)

        if epoch % 20 == 0:
            print(f"Epoch {epoch}: train_loss={train_loss:.6f}, val_loss={val_loss:.6f}")

    print(f"Training complete! Best val_loss: {best_val_loss:.6f}")
    print(f"Model saved to {save_path}")


# Chạy training
if __name__ == "__main__":
    train_bc(
        demo_dir="./demos/pick_and_place/",
        obs_dim=12,   # 6 joints + 3 gripper pos + 3 object pos
        act_dim=7,    # 6 joint velocities + 1 gripper
        epochs=200,
        batch_size=256,
    )

Deploy policy trên robot

def deploy_bc_policy(policy_path: str, robot):
    """Load và chạy BC policy trên robot thực."""
    checkpoint = torch.load(policy_path, map_location="cpu")
    policy = BCPolicy(obs_dim=12, act_dim=7)
    policy.load_state_dict(checkpoint["model"])
    policy.eval()

    obs_mean = checkpoint["obs_mean"]
    obs_std = checkpoint["obs_std"]
    act_mean = checkpoint["act_mean"]
    act_std = checkpoint["act_std"]

    while True:
        # Đọc sensor data từ robot
        obs = robot.get_observation()  # np.array shape (12,)

        # Normalize observation (giống lúc training)
        obs_norm = (obs - obs_mean) / obs_std
        obs_tensor = torch.FloatTensor(obs_norm).unsqueeze(0)

        # Predict action
        with torch.no_grad():
            action_norm = policy(obs_tensor).squeeze(0).numpy()

        # Denormalize action
        action = action_norm * act_std + act_mean

        # Gửi action đến robot
        robot.execute_action(action)

Khi nào dùng phương pháp nào?

Bước tiếp theo

BC và DAgger là foundation tốt, nhưng single-step prediction có vấn đề nghiêm trọng với temporal correlation trong robot actions. Trong bài tiếp theo, mình sẽ phân tích Action Chunking Transformers (ACT) — kiến trúc predict nhiều actions cùng lúc, đạt kết quả state-of-the-art cho bimanual manipulation.


Bài viết liên quan

Bài viết liên quan

Sim-to-Real Transfer: Train simulation, chạy thực tế
ai-perceptionresearchrobotics

Sim-to-Real Transfer: Train simulation, chạy thực tế

Kỹ thuật chuyển đổi mô hình từ simulation sang robot thật — domain randomization, system identification và best practices.

1/4/202612 phút đọc
Nghiên cứuEmbodied AI 2026: Toàn cảnh và xu hướng
ai-perceptionresearchrobotics

Embodied AI 2026: Toàn cảnh và xu hướng

Tổng quan embodied AI -- từ foundation models, sim-to-real đến robot learning tại scale với open-source tools.

25/3/202612 phút đọc
TutorialHands-on: Fine-tune OpenVLA với LeRobot
ai-perceptionvlatutorialPhần 7

Hands-on: Fine-tune OpenVLA với LeRobot

Tutorial thực hành — fine-tune OpenVLA trên custom data, LoRA, quantization và deploy trên robot thật.

23/3/202613 phút đọc