
Sim-to-Real RL for Dexterous Manipulation on a Humanoid

A complete pipeline from simulation to the real robot — grasp, box lift, and bimanual handover, with automated real-to-sim tuning

Nguyễn Anh Tuấn · April 4, 2026 · 17 min read

From Simulation to the Real Robot: RL's Hardest Problem

Sim-to-real transfer has always been the bottleneck of robot learning. You can train a policy that looks perfect in simulation — 99% success rate, smooth motion, grasps everything — yet when you put it on the real robot it fails miserably. Why? Because simulation is never a 100% faithful copy of the real world: friction is different, inertia is different, latency is different, sensor noise is different.

The paper Sim-to-Real Reinforcement Learning for Humanoid Dexterous Manipulation (Feb 2025) lays out a practical recipe for sim-to-real humanoid manipulation, tackling three concrete tasks: grasp-and-reach, box lift, and bimanual handover.

What sets it apart: the paper does not stop at "standard" domain randomization — it adds an automated real-to-sim tuning module that automatically adjusts the simulation to match the real robot.

(Figure: simulation environment for robot manipulation training)

Pipeline Overview

The pipeline consists of four main stages (a minimal orchestration sketch follows the list):

  1. Real-to-Sim Tuning: automatically adjust the simulation parameters to match the real robot
  2. Specialist Training: train a dedicated policy per task (grasp, lift, handover)
  3. Policy Distillation: merge the specialists into a single generalist policy
  4. Sim-to-Real Deployment: domain randomization + transfer to hardware
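
At a high level the four stages chain together as in the sketch below. This is only an orchestration outline using helpers defined later in this post (RealToSimTuner, ManipulationRewards, train_specialist, distill_specialists, Sim2RealDeployer); sim_env, task_envs, collect_real_trajectories and robot_interface are hypothetical placeholders, not code from the paper.

import torch

# Orchestration sketch — sim_env, task_envs, collect_real_trajectories and
# robot_interface are placeholders; everything else is defined later in this post.

# 1. Real-to-Sim: fit simulation physics to trajectories logged on the real robot
tuner = RealToSimTuner(sim_env, real_trajectories=collect_real_trajectories())
tuned_params = tuner.tune()

# 2. Specialist training: one policy per task in the tuned simulation
specialists = []
for name, reward_fn in [
    ("grasp_and_reach", ManipulationRewards.grasp_and_reach),
    ("box_lift", ManipulationRewards.box_lift),
    ("bimanual_handover", ManipulationRewards.bimanual_handover),
]:
    specialists.append(
        train_specialist(name, env_config={"randomize": True}, reward_fn=reward_fn)
    )

# 3. Distillation: merge the three specialists into one generalist policy
generalist = distill_specialists(specialists, task_envs)
torch.save({"policy": generalist.state_dict(), "obs_dim": 97, "action_dim": 30},
           "generalist_policy.pt")

# 4. Deployment: run the generalist on hardware with smoothing and safety checks
deployer = Sim2RealDeployer("generalist_policy.pt", robot_interface)
deployer.run(task_id=0, duration=30.0)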

Why Divide-and-Conquer?

Training a single policy on all tasks directly is hard: each task has its own reward structure and contact patterns, so exploration and credit assignment become much more difficult.

The solution: train a separate specialist for each task, then distill them into a single generalist. Specialists are easier to train, and the generalist inherits the knowledge of all the specialists.

1. Real-to-Sim Parameter Identification

The automated tuning module compares real trajectories against simulated ones and optimizes the physics parameters:

import torch
import numpy as np
from scipy.optimize import minimize


class RealToSimTuner:
    """
    Automated real-to-sim parameter identification.
    Thu thập trajectory từ robot thật → tối ưu simulation parameters
    để simulation reproduce trajectory đó.
    """

    # Parameters cần tune
    PARAM_NAMES = [
        "joint_friction",      # Ma sát khớp (Nm)
        "joint_damping",       # Damping (Nm·s/rad)
        "joint_armature",      # Rotor inertia reflected (kg·m²)
        "contact_friction",    # Ma sát tiếp xúc vật thể
        "contact_restitution", # Hệ số nảy
        "actuator_kp",         # PD gains
        "actuator_kd",
        "com_offset_x",        # Sai lệch trọng tâm (m)
        "com_offset_y",
        "com_offset_z",
        "mass_scale",          # Tỷ lệ khối lượng thực/mô hình
    ]

    def __init__(
        self,
        sim_env,
        real_trajectories: list[dict],
        num_iterations: int = 200,
    ):
        """
        Args:
            sim_env: Isaac Lab environment instance
            real_trajectories: list of dicts with keys:
                - joint_pos: (T, num_joints) — measured positions
                - joint_vel: (T, num_joints) — measured velocities
                - joint_torque: (T, num_joints) — commanded torques
                - timestamps: (T,) — time stamps
        """
        self.sim_env = sim_env
        self.real_trajs = real_trajectories
        self.num_iterations = num_iterations

        # Initial parameter guess (from URDF defaults)
        self.initial_params = self._get_default_params()

    def _get_default_params(self) -> np.ndarray:
        """Load default parameters from URDF/simulation."""
        return np.array([
            0.05,   # joint_friction (Nm)
            0.5,    # joint_damping (Nm·s/rad)
            0.01,   # joint_armature (kg·m²)
            0.8,    # contact_friction
            0.0,    # contact_restitution
            200.0,  # actuator_kp
            20.0,   # actuator_kd
            0.0,    # com_offset_x
            0.0,    # com_offset_y
            0.0,    # com_offset_z
            1.0,    # mass_scale
        ])

    def _simulate_trajectory(
        self, params: np.ndarray, real_traj: dict
    ) -> dict:
        """
        Replay torque commands trong simulation với params cho trước.
        Returns sim trajectory.
        """
        # Apply parameters to simulation
        self.sim_env.set_physics_params({
            "joint_friction": params[0],
            "joint_damping": params[1],
            "joint_armature": params[2],
            "contact_friction": params[3],
            "contact_restitution": params[4],
            "actuator_kp": params[5],
            "actuator_kd": params[6],
            "com_offset": params[7:10],
            "mass_scale": params[10],
        })

        # Reset simulation to initial state from real trajectory
        self.sim_env.reset()
        self.sim_env.set_joint_state(
            real_traj["joint_pos"][0],
            real_traj["joint_vel"][0],
        )

        # Replay torque commands
        sim_positions = []
        sim_velocities = []

        for t in range(len(real_traj["joint_torque"])):
            self.sim_env.apply_torque(real_traj["joint_torque"][t])
            self.sim_env.step()
            sim_positions.append(self.sim_env.get_joint_pos())
            sim_velocities.append(self.sim_env.get_joint_vel())

        return {
            "joint_pos": np.stack(sim_positions),
            "joint_vel": np.stack(sim_velocities),
        }

    def _objective(self, params: np.ndarray) -> float:
        """
        Objective function: tổng MSE giữa sim và real trajectories.
        """
        total_error = 0.0

        for real_traj in self.real_trajs:
            sim_traj = self._simulate_trajectory(params, real_traj)

            # Position error (weighted higher)
            pos_error = np.mean(
                (sim_traj["joint_pos"] - real_traj["joint_pos"]) ** 2
            )

            # Velocity error
            vel_error = np.mean(
                (sim_traj["joint_vel"] - real_traj["joint_vel"]) ** 2
            )

            total_error += 10.0 * pos_error + 1.0 * vel_error

        return total_error / len(self.real_trajs)

    def tune(self) -> dict:
        """
        Chạy optimization để tìm sim parameters tốt nhất.
        Returns: dict of tuned parameters.
        """
        # Parameter bounds (physical constraints)
        bounds = [
            (0.001, 1.0),    # joint_friction
            (0.01, 5.0),     # joint_damping
            (0.001, 0.1),    # joint_armature
            (0.1, 2.0),      # contact_friction
            (0.0, 0.5),      # contact_restitution
            (50.0, 500.0),   # actuator_kp
            (5.0, 100.0),    # actuator_kd
            (-0.05, 0.05),   # com_offset_x
            (-0.05, 0.05),   # com_offset_y
            (-0.05, 0.05),   # com_offset_z
            (0.8, 1.2),      # mass_scale
        ]

        result = minimize(
            self._objective,
            self.initial_params,
            method="L-BFGS-B",
            bounds=bounds,
            options={"maxiter": self.num_iterations, "disp": True},
        )

        tuned_params = result.x
        print(f"[Real-to-Sim] Final error: {result.fun:.6f}")
        print(f"[Real-to-Sim] Tuned params:")
        for name, val in zip(self.PARAM_NAMES, tuned_params):
            print(f"  {name}: {val:.4f}")

        return dict(zip(self.PARAM_NAMES, tuned_params))
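
A minimal usage sketch, assuming sim_env is an Isaac Lab style environment exposing the interface called above (set_physics_params, set_joint_state, apply_torque, step, ...) and that a few torque-replay trajectories logged on the real robot have been saved as .npy files (hypothetical file names):

import numpy as np

# Hypothetical logged data — each array has shape (T, num_joints), timestamps (T,)
logged_trajs = [
    {
        "joint_pos": np.load("real_traj0_pos.npy"),
        "joint_vel": np.load("real_traj0_vel.npy"),
        "joint_torque": np.load("real_traj0_torque.npy"),
        "timestamps": np.load("real_traj0_time.npy"),
    },
]

tuner = RealToSimTuner(sim_env, real_trajectories=logged_trajs, num_iterations=200)
tuned = tuner.tune()
# `tuned` is a dict keyed by RealToSimTuner.PARAM_NAMES, e.g. tuned["joint_friction"]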

2. Reward Function Design for the 3 Tasks

Each task has its own reward function, but all of them follow a generalized formulation based on contact and object goals:

import torch


class ManipulationRewards:
    """
    Generalized reward formulation cho dexterous manipulation.
    Dựa trên contact goals và object goals.
    """

    @staticmethod
    def grasp_and_reach(
        hand_pos: torch.Tensor,
        object_pos: torch.Tensor,
        target_pos: torch.Tensor,
        contact_force: torch.Tensor,
        finger_positions: torch.Tensor,
        object_in_hand: torch.Tensor,
    ) -> dict[str, torch.Tensor]:
        """
        Reward cho task grasp-and-reach.
        Phase 1: Tiến gần vật → Phase 2: Gắp → Phase 3: Di chuyển đến target.
        """
        batch = hand_pos.shape[0]

        # Phase 1: Approach — hand moves toward the object
        hand_to_obj = torch.norm(hand_pos - object_pos, dim=-1)
        r_approach = torch.exp(-5.0 * hand_to_obj)

        # Phase 2: Grasp — contact force large enough + fingers wrapping the object
        contact_magnitude = torch.norm(contact_force, dim=-1)
        r_contact = torch.clamp(contact_magnitude / 10.0, 0, 1)  # Normalize to [0,1]

        # Finger closure metric: fingers closer together = tighter grasp
        finger_spread = torch.std(finger_positions, dim=-1)
        r_closure = torch.exp(-3.0 * finger_spread)

        # Phase 3: Reach — move the object to the target
        obj_to_target = torch.norm(object_pos - target_pos, dim=-1)
        r_reach = torch.exp(-3.0 * obj_to_target) * object_in_hand

        # Bonus for completing the task (object at the target)
        success = (obj_to_target < 0.05).float() * object_in_hand
        r_success = success * 10.0

        # Penalties
        r_energy = -0.001 * contact_magnitude  # Avoid squeezing too hard

        total = (
            0.2 * r_approach
            + 0.2 * r_contact
            + 0.1 * r_closure
            + 0.3 * r_reach
            + r_success
            + r_energy
        )

        return {
            "total": total,
            "approach": r_approach,
            "contact": r_contact,
            "closure": r_closure,
            "reach": r_reach,
            "success": success,
        }

    @staticmethod
    def box_lift(
        left_hand_pos: torch.Tensor,
        right_hand_pos: torch.Tensor,
        box_pos: torch.Tensor,
        box_quat: torch.Tensor,
        target_height: float,
        left_contact: torch.Tensor,
        right_contact: torch.Tensor,
    ) -> dict[str, torch.Tensor]:
        """
        Reward cho bimanual box lift.
        Hai tay phải phối hợp nâng hộp lên target height.
        """
        # Approach: both hands near the box
        left_dist = torch.norm(left_hand_pos - box_pos, dim=-1)
        right_dist = torch.norm(right_hand_pos - box_pos, dim=-1)
        r_approach = torch.exp(-3.0 * (left_dist + right_dist))

        # Contact: both hands touching the box
        left_contact_mag = torch.norm(left_contact, dim=-1)
        right_contact_mag = torch.norm(right_contact, dim=-1)
        r_bimanual_contact = (
            torch.clamp(left_contact_mag / 5.0, 0, 1)
            * torch.clamp(right_contact_mag / 5.0, 0, 1)
        )

        # Lift: box height approaches the target
        height_error = torch.abs(box_pos[:, 2] - target_height)
        r_lift = torch.exp(-5.0 * height_error)

        # Orientation: keep the box upright (not tilted)
        # w component of the quaternion — close to 1 = upright
        r_orient = torch.abs(box_quat[:, 0])

        # Symmetry: both hands symmetric (balanced forces)
        force_diff = torch.abs(left_contact_mag - right_contact_mag)
        r_symmetry = torch.exp(-2.0 * force_diff)

        # Success
        success = (height_error < 0.03).float() * (r_bimanual_contact > 0.5).float()
        r_success = success * 10.0

        total = (
            0.15 * r_approach
            + 0.2 * r_bimanual_contact
            + 0.3 * r_lift
            + 0.1 * r_orient
            + 0.1 * r_symmetry
            + r_success
        )

        return {
            "total": total,
            "approach": r_approach,
            "contact": r_bimanual_contact,
            "lift": r_lift,
            "orient": r_orient,
            "symmetry": r_symmetry,
            "success": success,
        }

    @staticmethod
    def bimanual_handover(
        giver_hand_pos: torch.Tensor,
        receiver_hand_pos: torch.Tensor,
        object_pos: torch.Tensor,
        giver_contact: torch.Tensor,
        receiver_contact: torch.Tensor,
        phase: torch.Tensor,  # 0=giver_grasp, 1=transfer, 2=receiver_grasp
    ) -> dict[str, torch.Tensor]:
        """
        Reward cho bimanual handover.
        Tay A gắp vật → đưa cho tay B → tay B nhận.
        """
        batch = object_pos.shape[0]

        # Phase 0: Giver grasps the object
        giver_to_obj = torch.norm(giver_hand_pos - object_pos, dim=-1)
        giver_contact_mag = torch.norm(giver_contact, dim=-1)
        r_giver_grasp = (
            torch.exp(-5.0 * giver_to_obj)
            * torch.clamp(giver_contact_mag / 5.0, 0, 1)
        )

        # Phase 1: Move the object to the transfer zone (midway between the two hands)
        midpoint = (giver_hand_pos + receiver_hand_pos) / 2
        obj_to_mid = torch.norm(object_pos - midpoint, dim=-1)
        r_transfer = torch.exp(-3.0 * obj_to_mid)

        # Phase 2: Receiver takes the object (contact) + giver releases it (no contact)
        receiver_contact_mag = torch.norm(receiver_contact, dim=-1)
        r_receiver_grasp = torch.clamp(receiver_contact_mag / 5.0, 0, 1)
        r_giver_release = torch.exp(-2.0 * giver_contact_mag)  # Reward for releasing

        # Phase-weighted rewards
        is_phase0 = (phase == 0).float()
        is_phase1 = (phase == 1).float()
        is_phase2 = (phase == 2).float()

        total = (
            is_phase0 * r_giver_grasp
            + is_phase1 * (0.5 * r_transfer + 0.3 * r_giver_grasp)
            + is_phase2 * (0.5 * r_receiver_grasp + 0.3 * r_giver_release)
        )

        # Success: receiver has object and giver released
        success = (
            (receiver_contact_mag > 3.0).float()
            * (giver_contact_mag < 0.5).float()
        )
        total = total + success * 10.0

        return {
            "total": total,
            "giver_grasp": r_giver_grasp,
            "transfer": r_transfer,
            "receiver_grasp": r_receiver_grasp,
            "success": success,
        }
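
A quick shape check with dummy batched tensors (batch of 4 environments; the 10-element finger_positions vector is just an assumption for illustration) shows how the grasp-and-reach terms combine:

import torch

batch = 4
rewards = ManipulationRewards.grasp_and_reach(
    hand_pos=torch.rand(batch, 3),
    object_pos=torch.rand(batch, 3),
    target_pos=torch.rand(batch, 3),
    contact_force=torch.rand(batch, 3),
    finger_positions=torch.rand(batch, 10),  # assumed 10 finger joints
    object_in_hand=torch.ones(batch),
)
print(rewards["total"].shape)   # torch.Size([4]) — one scalar reward per environment
print(rewards["success"])       # 0/1 flag per environment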

3. Divide-and-Conquer: Specialist Training → Distillation

Specialist Training

Each specialist is a separate policy trained with the corresponding reward function:

import torch
import torch.nn as nn


class DexterousManipulationPolicy(nn.Module):
    """
    Policy cho dexterous manipulation.
    Observation space bao gồm: proprio + object state + tactile.
    """

    def __init__(
        self,
        proprio_dim: int = 60,     # joint pos (30) + joint vel (30)
        object_dim: int = 13,      # pos (3) + quat (4) + vel (3) + angular_vel (3)
        tactile_dim: int = 24,     # 12 fingertip sensors × 2 hands
        action_dim: int = 30,      # all upper-body + hand joints
        hidden_dim: int = 512,
    ):
        super().__init__()
        total_obs = proprio_dim + object_dim + tactile_dim  # 97

        # Actor
        self.actor = nn.Sequential(
            nn.Linear(total_obs, hidden_dim),
            nn.ELU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ELU(),
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.ELU(),
            nn.Linear(hidden_dim // 2, action_dim),
            nn.Tanh(),  # Actions in [-1, 1]
        )

        # Critic
        self.critic = nn.Sequential(
            nn.Linear(total_obs, hidden_dim),
            nn.ELU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ELU(),
            nn.Linear(hidden_dim, 1),
        )

    def forward(self, obs: torch.Tensor):
        return self.actor(obs), self.critic(obs)

    def get_action(self, obs: torch.Tensor, deterministic: bool = False):
        mean = self.actor(obs)
        if deterministic:
            return mean
        # Add exploration noise
        noise = torch.randn_like(mean) * 0.1
        return (mean + noise).clamp(-1, 1)


def train_specialist(
    task_name: str,
    env_config: dict,
    reward_fn,
    num_timesteps: int = 50_000_000,
):
    """
    Train specialist policy cho 1 task.
    """
    device = torch.device("cuda")

    # Create the environment for this specific task
    env = create_manipulation_env(task_name, env_config)

    policy = DexterousManipulationPolicy(
        proprio_dim=env.proprio_dim,
        object_dim=env.object_dim,
        tactile_dim=env.tactile_dim,
        action_dim=env.action_dim,
    ).to(device)

    optimizer = torch.optim.Adam(policy.parameters(), lr=3e-4)

    total_steps = 0
    obs = env.reset()

    while total_steps < num_timesteps:
        # Collect rollout
        actions = policy.get_action(obs)
        next_obs, info = env.step(actions)

        # Compute reward
        reward_dict = reward_fn(**info)
        reward = reward_dict["total"]

        # Actor-critic update (simplified — the actual implementation uses PPO with GAE)
        mean, values = policy(obs)
        with torch.no_grad():
            _, next_values = policy(next_obs)

        # One-step advantage estimate
        advantage = reward + 0.99 * next_values.squeeze(-1) - values.squeeze(-1)

        # Policy-gradient surrogate: log-probability of the executed actions under a
        # Gaussian with fixed std 0.1 (matching the exploration noise in get_action)
        log_prob = -((actions.detach() - mean) ** 2).sum(dim=-1) / (2 * 0.1**2)
        policy_loss = -(advantage.detach() * log_prob).mean()
        value_loss = advantage.pow(2).mean()

        loss = policy_loss + 0.5 * value_loss

        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(policy.parameters(), 1.0)
        optimizer.step()

        obs = next_obs
        total_steps += env.num_envs

        if total_steps % 1_000_000 == 0:
            success_rate = reward_dict["success"].mean().item()
            print(
                f"[{task_name}] Steps: {total_steps:>10d} | "
                f"Success: {success_rate:.1%} | "
                f"Reward: {reward.mean().item():.3f}"
            )

    return policy


# Train 3 specialists
grasp_specialist = train_specialist(
    "grasp_and_reach",
    env_config={"object_type": "diverse", "randomize": True},
    reward_fn=ManipulationRewards.grasp_and_reach,
)

lift_specialist = train_specialist(
    "box_lift",
    env_config={"box_sizes": [(0.1, 0.1, 0.1), (0.15, 0.15, 0.15)], "randomize": True},
    reward_fn=ManipulationRewards.box_lift,
)

handover_specialist = train_specialist(
    "bimanual_handover",
    env_config={"object_type": "diverse", "randomize": True},
    reward_fn=ManipulationRewards.bimanual_handover,
)

(Figure: robot manipulation training in simulation)

Policy Distillation: Specialists → Generalist

import torch
import torch.nn as nn
import torch.nn.functional as F


class GeneralistPolicy(nn.Module):
    """
    Generalist policy — distilled từ multiple specialists.
    Thêm task embedding để phân biệt tasks.
    """

    def __init__(
        self,
        obs_dim: int = 97,
        action_dim: int = 30,
        num_tasks: int = 3,
        task_embed_dim: int = 16,
        hidden_dim: int = 512,
    ):
        super().__init__()
        self.task_embedding = nn.Embedding(num_tasks, task_embed_dim)
        total_input = obs_dim + task_embed_dim

        self.actor = nn.Sequential(
            nn.Linear(total_input, hidden_dim),
            nn.ELU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ELU(),
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.ELU(),
            nn.Linear(hidden_dim // 2, action_dim),
            nn.Tanh(),
        )

    def forward(self, obs: torch.Tensor, task_id: torch.Tensor):
        task_embed = self.task_embedding(task_id)
        x = torch.cat([obs, task_embed], dim=-1)
        return self.actor(x)


def distill_specialists(
    specialists: list[nn.Module],
    task_envs: list,
    num_distill_steps: int = 10_000_000,
    temperature: float = 1.0,
):
    """
    Distill multiple specialist policies into one generalist.

    Args:
        specialists: list of trained specialist policies
        task_envs: list of environments (1 per task)
        temperature: distillation temperature
    """
    device = torch.device("cuda")
    num_tasks = len(specialists)

    # Freeze specialists
    for spec in specialists:
        spec.eval()
        for p in spec.parameters():
            p.requires_grad = False

    # Create generalist
    generalist = GeneralistPolicy(
        obs_dim=task_envs[0].obs_dim,
        action_dim=task_envs[0].action_dim,
        num_tasks=num_tasks,
    ).to(device)

    optimizer = torch.optim.Adam(generalist.parameters(), lr=1e-4)

    total_steps = 0
    envs_obs = [env.reset() for env in task_envs]

    while total_steps < num_distill_steps:
        total_loss = 0.0

        for task_id, (specialist, env, obs) in enumerate(
            zip(specialists, task_envs, envs_obs)
        ):
            # Get specialist action (teacher)
            with torch.no_grad():
                teacher_action = specialist.get_action(obs, deterministic=True)

            # Get generalist action (student)
            task_ids = torch.full(
                (obs.shape[0],), task_id, dtype=torch.long, device=device
            )
            student_action = generalist(obs, task_ids)

            # Distillation loss: MSE between teacher and student actions
            distill_loss = F.mse_loss(student_action, teacher_action)

            # Also step environment for fresh data
            next_obs, _ = env.step(teacher_action)
            envs_obs[task_id] = next_obs

            total_loss += distill_loss

        # Update generalist
        optimizer.zero_grad()
        total_loss.backward()
        torch.nn.utils.clip_grad_norm_(generalist.parameters(), 1.0)
        optimizer.step()

        total_steps += sum(env.num_envs for env in task_envs)

        if total_steps % 500_000 == 0:
            print(
                f"[Distill] Steps: {total_steps:>10d} | "
                f"Loss: {total_loss.item():.6f}"
            )

    return generalist
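
A usage sketch tying this to the three specialists trained above; create_manipulation_env is the same (assumed) factory used in train_specialist, and the checkpoint keys match what the deployment script below expects:

task_envs = [
    create_manipulation_env("grasp_and_reach", {"randomize": True}),
    create_manipulation_env("box_lift", {"randomize": True}),
    create_manipulation_env("bimanual_handover", {"randomize": True}),
]

generalist = distill_specialists(
    specialists=[grasp_specialist, lift_specialist, handover_specialist],
    task_envs=task_envs,
)

# Save a checkpoint in the format expected by the deployment script below
torch.save(
    {
        "policy": generalist.state_dict(),
        "obs_dim": task_envs[0].obs_dim,
        "action_dim": task_envs[0].action_dim,
        "obs_mean": torch.zeros(task_envs[0].obs_dim),
        "obs_std": torch.ones(task_envs[0].obs_dim),
    },
    "generalist_policy.pt",
)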

4. Domain Randomization Config

DOMAIN_RANDOMIZATION = {
    # Physics parameters
    "physics": {
        "friction": {
            "range": [0.3, 2.0],
            "operation": "scaling",  # Nhân với giá trị default
        },
        "restitution": {
            "range": [0.0, 0.3],
            "operation": "absolute",
        },
        "mass": {
            "range": [0.85, 1.15],
            "operation": "scaling",
        },
        "com_displacement": {
            "range": [-0.03, 0.03],  # meters
            "operation": "additive",
            "per_axis": True,
        },
    },

    # Actuator parameters
    "actuator": {
        "kp": {"range": [0.7, 1.3], "operation": "scaling"},
        "kd": {"range": [0.7, 1.3], "operation": "scaling"},
        "torque_limit": {"range": [0.85, 1.0], "operation": "scaling"},
        "action_delay": {"range": [0, 3], "operation": "absolute", "unit": "steps"},
    },

    # Sensor noise
    "sensor": {
        "joint_pos_noise": {"std": 0.01, "unit": "rad"},
        "joint_vel_noise": {"std": 0.1, "unit": "rad/s"},
        "imu_quat_noise": {"std": 0.005},
        "imu_gyro_noise": {"std": 0.02, "unit": "rad/s"},
    },

    # Object properties (for manipulation)
    "object": {
        "mass": {"range": [0.1, 2.0], "unit": "kg"},
        "size_scale": {"range": [0.8, 1.2]},
        "friction": {"range": [0.3, 1.5]},
        "initial_pos_noise": {"std": 0.02, "unit": "m"},
        "initial_rot_noise": {"std": 0.1, "unit": "rad"},
    },

    # Visual randomization (for camera-based policies)
    "visual": {
        "light_intensity": {"range": [0.5, 2.0]},
        "light_direction": {"range": [-30, 30], "unit": "deg"},
        "texture_randomize": True,
        "camera_fov_noise": {"std": 2.0, "unit": "deg"},
    },
}
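
The config above is just a table of ranges; how it gets applied depends on the environment. A minimal sketch of sampling one value per episode according to the operation field (scaling, additive, absolute) could look like this:

import numpy as np


def sample_randomized_value(nominal: float, spec: dict) -> float:
    """Sample one randomized value from a DOMAIN_RANDOMIZATION entry (sketch)."""
    low, high = spec["range"]
    draw = np.random.uniform(low, high)
    if spec.get("operation") == "scaling":   # multiply the nominal value
        return nominal * draw
    if spec.get("operation") == "additive":  # add an offset to the nominal value
        return nominal + draw
    return draw                              # "absolute": use the drawn value directly


# Example: resample friction and CoM offset at every episode reset
friction = sample_randomized_value(0.8, DOMAIN_RANDOMIZATION["physics"]["friction"])
com_dx = sample_randomized_value(0.0, DOMAIN_RANDOMIZATION["physics"]["com_displacement"])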

5. Hybrid Object Representation

The paper uses a hybrid representation that combines multiple modalities for the object state:

import torch
import torch.nn as nn


class HybridObjectEncoder(nn.Module):
    """
    Hybrid object representation với modality-specific augmentation.
    Kết hợp: proprioceptive + tactile + visual features.
    """

    def __init__(
        self,
        proprio_dim: int = 13,   # object pos (3) + quat (4) + vel (6)
        tactile_dim: int = 24,   # fingertip sensors
        visual_dim: int = 64,    # CNN features (if a camera is used)
        output_dim: int = 32,
        use_visual: bool = False,
    ):
        super().__init__()
        self.use_visual = use_visual

        # Proprioceptive encoder
        self.proprio_encoder = nn.Sequential(
            nn.Linear(proprio_dim, 64),
            nn.LayerNorm(64),
            nn.ELU(),
            nn.Linear(64, 32),
        )

        # Tactile encoder
        self.tactile_encoder = nn.Sequential(
            nn.Linear(tactile_dim, 64),
            nn.LayerNorm(64),
            nn.ELU(),
            nn.Linear(64, 32),
        )

        # Visual encoder (optional)
        if use_visual:
            self.visual_encoder = nn.Sequential(
                nn.Linear(visual_dim, 64),
                nn.LayerNorm(64),
                nn.ELU(),
                nn.Linear(64, 32),
            )
            fusion_dim = 32 * 3
        else:
            fusion_dim = 32 * 2

        # Fusion layer
        self.fusion = nn.Sequential(
            nn.Linear(fusion_dim, 64),
            nn.ELU(),
            nn.Linear(64, output_dim),
        )

        # Augmentation parameters
        self.proprio_noise_std = 0.01
        self.tactile_noise_std = 0.05
        self.dropout = nn.Dropout(0.1)

    def forward(
        self,
        proprio: torch.Tensor,
        tactile: torch.Tensor,
        visual: torch.Tensor | None = None,
        augment: bool = False,
    ) -> torch.Tensor:
        """
        Encode the object state from multiple modalities.

        Args:
            proprio: (batch, proprio_dim) — object pose + velocity
            tactile: (batch, tactile_dim) — fingertip sensor readings
            visual: (batch, visual_dim) — CNN features (optional)
            augment: whether to apply modality-specific augmentation
        """
        # Modality-specific augmentation
        if augment and self.training:
            proprio = proprio + torch.randn_like(proprio) * self.proprio_noise_std
            tactile = tactile + torch.randn_like(tactile) * self.tactile_noise_std
            # Random modality dropout: sometimes zero out one modality
            if torch.rand(1).item() < 0.1:
                tactile = torch.zeros_like(tactile)

        # Encode each modality
        proprio_feat = self.proprio_encoder(proprio)
        tactile_feat = self.tactile_encoder(tactile)

        if self.use_visual and visual is not None:
            visual_feat = self.visual_encoder(visual)
            fused = torch.cat([proprio_feat, tactile_feat, visual_feat], dim=-1)
        else:
            fused = torch.cat([proprio_feat, tactile_feat], dim=-1)

        # Fusion
        fused = self.dropout(fused) if augment else fused
        output = self.fusion(fused)

        return output
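
A quick smoke test with dummy tensors (batch of 8 environments, proprio-plus-tactile only) to confirm the output shape:

import torch

encoder = HybridObjectEncoder(use_visual=False)
object_state = torch.randn(8, 13)   # pos (3) + quat (4) + vel (6)
tactile = torch.randn(8, 24)        # fingertip readings for both hands
features = encoder(object_state, tactile, augment=True)
print(features.shape)               # torch.Size([8, 32])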

6. Complete Deployment Script

import torch
import time
import numpy as np


class Sim2RealDeployer:
    """
    Deploy trained policy lên robot thật.
    Handles: observation processing, action filtering, safety checks.
    """

    def __init__(
        self,
        policy_path: str,
        robot_interface,
        control_freq: float = 30.0,
        action_smoothing: float = 0.3,  # EMA smoothing factor
    ):
        self.device = torch.device("cpu")
        self.dt = 1.0 / control_freq
        self.alpha = action_smoothing

        # Load policy
        ckpt = torch.load(policy_path, map_location=self.device)
        self.policy = GeneralistPolicy(
            obs_dim=ckpt["obs_dim"],
            action_dim=ckpt["action_dim"],
        ).to(self.device)
        self.policy.load_state_dict(ckpt["policy"])
        self.policy.eval()

        # Robot interface
        self.robot = robot_interface
        self.prev_action = None

        # Safety limits
        self.max_joint_delta = 0.1  # rad per step
        self.torque_limit = 50.0    # Nm

        # Observation normalization (from training)
        self.obs_mean = ckpt.get("obs_mean", torch.zeros(ckpt["obs_dim"]))
        self.obs_std = ckpt.get("obs_std", torch.ones(ckpt["obs_dim"]))

    def _normalize_obs(self, obs: torch.Tensor) -> torch.Tensor:
        """Normalize observation using training statistics."""
        return (obs - self.obs_mean) / (self.obs_std + 1e-8)

    def _smooth_action(self, action: torch.Tensor) -> torch.Tensor:
        """EMA smoothing to prevent jerky motion."""
        if self.prev_action is None:
            self.prev_action = action
            return action
        smoothed = self.alpha * action + (1 - self.alpha) * self.prev_action
        self.prev_action = smoothed
        return smoothed

    def _safety_check(self, action: torch.Tensor, current_pos: torch.Tensor) -> torch.Tensor:
        """Clamp action to safety limits."""
        delta = action - current_pos
        delta = delta.clamp(-self.max_joint_delta, self.max_joint_delta)
        return current_pos + delta

    @torch.no_grad()
    def run(self, task_id: int, duration: float = 30.0):
        """
        Run the policy on the real robot.

        Args:
            task_id: 0=grasp, 1=lift, 2=handover
            duration: seconds to run
        """
        print(f"[Deploy] Starting task {task_id} for {duration}s")
        print("[Deploy] Press Ctrl+C to stop")

        task_tensor = torch.tensor([task_id], dtype=torch.long)
        start_time = time.time()
        step_count = 0

        try:
            while time.time() - start_time < duration:
                loop_start = time.time()

                # Read robot state
                state = self.robot.get_state()
                obs = self._build_observation(state)
                obs = self._normalize_obs(obs)

                # Get action
                action = self.policy(obs.unsqueeze(0), task_tensor).squeeze(0)

                # Post-process
                action = self._smooth_action(action)
                action = self._safety_check(action, state["joint_pos"])

                # Send to robot
                self.robot.set_joint_targets(action.numpy())

                step_count += 1
                if step_count % 100 == 0:
                    elapsed = time.time() - start_time
                    print(
                        f"[Deploy] Step {step_count} | "
                        f"Time: {elapsed:.1f}s | "
                        f"Action norm: {action.norm():.3f}"
                    )

                # Maintain control frequency
                elapsed_step = time.time() - loop_start
                if elapsed_step < self.dt:
                    time.sleep(self.dt - elapsed_step)

        except KeyboardInterrupt:
            print("[Deploy] Stopped by user")
        finally:
            # Safe shutdown: move to default pose slowly
            print("[Deploy] Moving to safe pose...")
            self.robot.move_to_default(duration=3.0)
            print("[Deploy] Done")

    def _build_observation(self, state: dict) -> torch.Tensor:
        """Build observation tensor from robot state dict."""
        return torch.cat([
            state["joint_pos"],
            state["joint_vel"],
            state["object_pos"],
            state["object_quat"],
            state["object_vel"],
            state["object_angular_vel"],
            state["tactile_left"],
            state["tactile_right"],
        ])
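
A usage sketch; UnitreeG1Interface is a hypothetical stand-in for whatever hardware driver exposes get_state(), set_joint_targets() and move_to_default() as called above:

robot = UnitreeG1Interface()  # hypothetical driver, not provided here

deployer = Sim2RealDeployer(
    policy_path="generalist_policy.pt",
    robot_interface=robot,
    control_freq=30.0,
    action_smoothing=0.3,
)
deployer.run(task_id=0, duration=30.0)  # task 0 = grasp-and-reach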

Experimental Results

Task              | Sim Success (%) | Real Success (%) | Sim-to-Real Gap (pp)
Grasp-and-Reach   | 92.3            | 78.5             | 13.8
Box Lift          | 87.1            | 71.2             | 15.9
Bimanual Handover | 83.6            | 64.8             | 18.8

Observations: the sim-to-real gap grows with task complexity, from 13.8 percentage points on grasp-and-reach to 18.8 points on bimanual handover.

Key Lessons

1. Real-to-Sim > Domain Randomization

Domain randomization is necessary but not sufficient. If the simulation is too far from reality, no amount of randomization will make the policy transfer well. Real-to-sim tuning first brings the simulation close to reality, and then you randomize around that point, as sketched below.
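
One concrete way to combine the two ideas (a sketch, not necessarily the paper's exact recipe) is to center the randomization band on the value identified by real-to-sim tuning instead of on the URDF default:

def centered_range(nominal: float, rel_width: float = 0.2) -> list[float]:
    """Randomization band of ±rel_width around a nominal value (sketch)."""
    return [nominal * (1 - rel_width), nominal * (1 + rel_width)]


# Hypothetical: if tuning identified the object contact friction as roughly 0.65,
# randomize in a tight band around it instead of the wide generic [0.3, 1.5] band.
DOMAIN_RANDOMIZATION["object"]["friction"]["range"] = centered_range(0.65)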

2. Divide-and-Conquer cuts training time by 3-5x

Training a single generalist directly takes 200M+ steps. Training three specialists (50M each) plus distillation (10M) comes to 160M steps total, and the result is better because each specialist optimizes a single, well-defined objective.

3. Action smoothing is mandatory on hardware

In simulation, actions can change abruptly from one step to the next. On hardware, this produces jerk that damages the motors and shortens their lifetime. EMA smoothing (alpha = 0.3) is a simple trick, but it is critical for deployment.
