From Simulation to the Real Robot: RL's Hardest Problem
Sim-to-real transfer has always been the bottleneck of robot learning. You can train a flawless policy in simulation (99% success rate, smooth motion, grasps everything) and still watch it fail miserably on the real robot. Why? Because simulation never matches the real world exactly: different friction, different inertia, different latency, different sensor noise.
The paper Sim-to-Real Reinforcement Learning for Humanoid Dexterous Manipulation (Feb 2025) lays out a practical recipe for sim-to-real humanoid manipulation, tackling three concrete tasks: grasp-and-reach, box lift, and bimanual handover.
What sets it apart: the paper does not stop at "standard" domain randomization; it adds an automated real-to-sim tuning module that adjusts the simulation to match the physical robot.
Pipeline Overview
The pipeline has four main stages:
- Real-to-Sim Tuning: automatically tune simulation parameters to match the real robot
- Specialist Training: train a dedicated policy per task (grasp, lift, handover)
- Policy Distillation: merge the specialists into a single generalist policy
- Sim-to-Real Deployment: domain randomization + transfer to hardware
Why Divide-and-Conquer?
Training one policy directly on all tasks is hard because:
- Reward conflict: the grasp reward can conflict with the handover reward
- Exploration difficulty: the policy must discover strategies for all three tasks at once
- Training instability: gradients from different tasks can cancel each other out
The solution: train a specialist per task, then distill them into one generalist. Specialists are easier to train, and the generalist inherits knowledge from all of them.
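The cancellation issue is easy to see in a toy example (my own illustration, not from the paper): one shared weight and two hypothetical task losses with opposing optima. The summed multi-task gradient vanishes even though neither task is solved:

```python
import numpy as np

w = np.array([1.0])  # one shared policy weight

# Hypothetical per-task losses with opposing optima:
# task A wants w -> 2, task B wants w -> 0
grad_a = lambda v: 2 * (v - 2.0)  # d/dw of (w - 2)^2
grad_b = lambda v: 2 * (v - 0.0)  # d/dw of w^2

g_joint = grad_a(w) + grad_b(w)   # multi-task gradient at w = 1
print(g_joint)  # [0.] — the task gradients cancel exactly
```

Neither loss is at its minimum at w = 1, yet the joint gradient is zero, so joint training stalls; per-task specialists sidestep this entirely.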
1. Real-to-Sim Parameter Identification
The automated tuning module compares real trajectories against simulated ones and optimizes the physics parameters:
import torch
import numpy as np
from scipy.optimize import minimize

class RealToSimTuner:
    """
    Automated real-to-sim parameter identification.
    Collect trajectories from the real robot, then optimize the simulation
    parameters until the sim reproduces those trajectories.
    """

    # Parameters to tune
    PARAM_NAMES = [
        "joint_friction",       # Joint friction (Nm)
        "joint_damping",        # Damping (Nm·s/rad)
        "joint_armature",       # Reflected rotor inertia (kg·m²)
        "contact_friction",     # Object contact friction
        "contact_restitution",  # Coefficient of restitution
        "actuator_kp",          # PD gains
        "actuator_kd",
        "com_offset_x",         # Center-of-mass offset (m)
        "com_offset_y",
        "com_offset_z",
        "mass_scale",           # Real-to-model mass ratio
    ]

    def __init__(
        self,
        sim_env,
        real_trajectories: list[dict],
        num_iterations: int = 200,
    ):
        """
        Args:
            sim_env: Isaac Lab environment instance
            real_trajectories: list of dicts with keys:
                - joint_pos: (T, num_joints) — measured positions
                - joint_vel: (T, num_joints) — measured velocities
                - joint_torque: (T, num_joints) — commanded torques
                - timestamps: (T,) — time stamps
        """
        self.sim_env = sim_env
        self.real_trajs = real_trajectories
        self.num_iterations = num_iterations
        # Initial parameter guess (from URDF defaults)
        self.initial_params = self._get_default_params()

    def _get_default_params(self) -> np.ndarray:
        """Load default parameters from URDF/simulation."""
        return np.array([
            0.05,   # joint_friction (Nm)
            0.5,    # joint_damping (Nm·s/rad)
            0.01,   # joint_armature (kg·m²)
            0.8,    # contact_friction
            0.0,    # contact_restitution
            200.0,  # actuator_kp
            20.0,   # actuator_kd
            0.0,    # com_offset_x
            0.0,    # com_offset_y
            0.0,    # com_offset_z
            1.0,    # mass_scale
        ])

    def _simulate_trajectory(
        self, params: np.ndarray, real_traj: dict
    ) -> dict:
        """
        Replay the recorded torque commands in simulation with the given
        params. Returns the resulting sim trajectory.
        """
        # Apply parameters to the simulation
        self.sim_env.set_physics_params({
            "joint_friction": params[0],
            "joint_damping": params[1],
            "joint_armature": params[2],
            "contact_friction": params[3],
            "contact_restitution": params[4],
            "actuator_kp": params[5],
            "actuator_kd": params[6],
            "com_offset": params[7:10],
            "mass_scale": params[10],
        })
        # Reset the simulation to the initial state of the real trajectory
        self.sim_env.reset()
        self.sim_env.set_joint_state(
            real_traj["joint_pos"][0],
            real_traj["joint_vel"][0],
        )
        # Replay torque commands
        sim_positions = []
        sim_velocities = []
        for t in range(len(real_traj["joint_torque"])):
            self.sim_env.apply_torque(real_traj["joint_torque"][t])
            self.sim_env.step()
            sim_positions.append(self.sim_env.get_joint_pos())
            sim_velocities.append(self.sim_env.get_joint_vel())
        return {
            "joint_pos": np.stack(sim_positions),
            "joint_vel": np.stack(sim_velocities),
        }

    def _objective(self, params: np.ndarray) -> float:
        """
        Objective function: total MSE between sim and real trajectories.
        """
        total_error = 0.0
        for real_traj in self.real_trajs:
            sim_traj = self._simulate_trajectory(params, real_traj)
            # Position error (weighted higher)
            pos_error = np.mean(
                (sim_traj["joint_pos"] - real_traj["joint_pos"]) ** 2
            )
            # Velocity error
            vel_error = np.mean(
                (sim_traj["joint_vel"] - real_traj["joint_vel"]) ** 2
            )
            total_error += 10.0 * pos_error + 1.0 * vel_error
        return total_error / len(self.real_trajs)

    def tune(self) -> dict:
        """
        Run the optimization to find the best-fitting sim parameters.
        Returns: dict of tuned parameters.
        """
        # Parameter bounds (physical constraints)
        bounds = [
            (0.001, 1.0),   # joint_friction
            (0.01, 5.0),    # joint_damping
            (0.001, 0.1),   # joint_armature
            (0.1, 2.0),     # contact_friction
            (0.0, 0.5),     # contact_restitution
            (50.0, 500.0),  # actuator_kp
            (5.0, 100.0),   # actuator_kd
            (-0.05, 0.05),  # com_offset_x
            (-0.05, 0.05),  # com_offset_y
            (-0.05, 0.05),  # com_offset_z
            (0.8, 1.2),     # mass_scale
        ]
        result = minimize(
            self._objective,
            self.initial_params,
            method="L-BFGS-B",
            bounds=bounds,
            options={"maxiter": self.num_iterations, "disp": True},
        )
        tuned_params = result.x
        print(f"[Real-to-Sim] Final error: {result.fun:.6f}")
        print("[Real-to-Sim] Tuned params:")
        for name, val in zip(self.PARAM_NAMES, tuned_params):
            print(f"  {name}: {val:.4f}")
        return dict(zip(self.PARAM_NAMES, tuned_params))
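The identification loop above needs an Isaac Lab environment to run, but the same idea can be demonstrated end-to-end on a toy 1-DOF joint. In this sketch (the `rollout` model, inertia value, and torque profile are all made up for illustration), a "real" trajectory is generated with a hidden damping value, which is then recovered by minimizing the replay error with L-BFGS-B:

```python
import numpy as np
from scipy.optimize import minimize

def rollout(damping, torques, dt=0.01, inertia=0.05):
    """Euler-integrate a 1-DOF joint under commanded torques."""
    pos, vel = 0.0, 0.0
    positions = []
    for u in torques:
        acc = (u - damping * vel) / inertia
        vel += acc * dt
        pos += vel * dt
        positions.append(pos)
    return np.array(positions)

# "Real" trajectory generated with a hidden ground-truth damping of 0.8
torques = np.sin(np.linspace(0, 4 * np.pi, 400))
real_pos = rollout(0.8, torques)

# Identify damping by minimizing replay error, as the tuner does
obj = lambda p: np.mean((rollout(p[0], torques) - real_pos) ** 2)
res = minimize(obj, x0=[0.3], method="L-BFGS-B", bounds=[(0.01, 5.0)])
print(f"identified damping: {res.x[0]:.3f}")
```

The real module does the same thing, just with an 11-dimensional parameter vector and a physics engine in place of `rollout`.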
2. Reward Function Design for the 3 Tasks
Each task has its own reward function, but all follow a generalized formulation based on contact goals and object goals:
import torch

class ManipulationRewards:
    """
    Generalized reward formulation for dexterous manipulation,
    based on contact goals and object goals.
    """

    @staticmethod
    def grasp_and_reach(
        hand_pos: torch.Tensor,
        object_pos: torch.Tensor,
        target_pos: torch.Tensor,
        contact_force: torch.Tensor,
        finger_positions: torch.Tensor,
        object_in_hand: torch.Tensor,
    ) -> dict[str, torch.Tensor]:
        """
        Reward for the grasp-and-reach task.
        Phase 1: approach the object → Phase 2: grasp → Phase 3: move to target.
        """
        # Phase 1: Approach — move the hand toward the object
        hand_to_obj = torch.norm(hand_pos - object_pos, dim=-1)
        r_approach = torch.exp(-5.0 * hand_to_obj)
        # Phase 2: Grasp — enough contact force + fingers wrapping the object
        contact_magnitude = torch.norm(contact_force, dim=-1)
        r_contact = torch.clamp(contact_magnitude / 10.0, 0, 1)  # Normalize to [0, 1]
        # Finger closure metric: fingers closer together = tighter grasp
        finger_spread = torch.std(finger_positions, dim=-1)
        r_closure = torch.exp(-3.0 * finger_spread)
        # Phase 3: Reach — bring the object to the target
        obj_to_target = torch.norm(object_pos - target_pos, dim=-1)
        r_reach = torch.exp(-3.0 * obj_to_target) * object_in_hand
        # Task-completion bonus (object at the target)
        success = (obj_to_target < 0.05).float() * object_in_hand
        r_success = success * 10.0
        # Penalties
        r_energy = -0.001 * contact_magnitude  # Discourage squeezing too hard
        total = (
            0.2 * r_approach
            + 0.2 * r_contact
            + 0.1 * r_closure
            + 0.3 * r_reach
            + r_success
            + r_energy
        )
        return {
            "total": total,
            "approach": r_approach,
            "contact": r_contact,
            "closure": r_closure,
            "reach": r_reach,
            "success": success,
        }

    @staticmethod
    def box_lift(
        left_hand_pos: torch.Tensor,
        right_hand_pos: torch.Tensor,
        box_pos: torch.Tensor,
        box_quat: torch.Tensor,
        target_height: float,
        left_contact: torch.Tensor,
        right_contact: torch.Tensor,
    ) -> dict[str, torch.Tensor]:
        """
        Reward for bimanual box lift.
        Both hands must cooperate to raise the box to the target height.
        """
        # Approach: both hands near the box
        left_dist = torch.norm(left_hand_pos - box_pos, dim=-1)
        right_dist = torch.norm(right_hand_pos - box_pos, dim=-1)
        r_approach = torch.exp(-3.0 * (left_dist + right_dist))
        # Contact: both hands touching the box
        left_contact_mag = torch.norm(left_contact, dim=-1)
        right_contact_mag = torch.norm(right_contact, dim=-1)
        r_bimanual_contact = (
            torch.clamp(left_contact_mag / 5.0, 0, 1)
            * torch.clamp(right_contact_mag / 5.0, 0, 1)
        )
        # Lift: box height approaches the target
        height_error = torch.abs(box_pos[:, 2] - target_height)
        r_lift = torch.exp(-5.0 * height_error)
        # Orientation: keep the box level (no tilt) —
        # quaternion w close to 1 means close to the upright pose
        r_orient = torch.abs(box_quat[:, 0])
        # Symmetry: balanced contact forces between the two hands
        force_diff = torch.abs(left_contact_mag - right_contact_mag)
        r_symmetry = torch.exp(-2.0 * force_diff)
        # Success
        success = (height_error < 0.03).float() * (r_bimanual_contact > 0.5).float()
        r_success = success * 10.0
        total = (
            0.15 * r_approach
            + 0.2 * r_bimanual_contact
            + 0.3 * r_lift
            + 0.1 * r_orient
            + 0.1 * r_symmetry
            + r_success
        )
        return {
            "total": total,
            "approach": r_approach,
            "contact": r_bimanual_contact,
            "lift": r_lift,
            "orient": r_orient,
            "symmetry": r_symmetry,
            "success": success,
        }

    @staticmethod
    def bimanual_handover(
        giver_hand_pos: torch.Tensor,
        receiver_hand_pos: torch.Tensor,
        object_pos: torch.Tensor,
        giver_contact: torch.Tensor,
        receiver_contact: torch.Tensor,
        phase: torch.Tensor,  # 0=giver_grasp, 1=transfer, 2=receiver_grasp
    ) -> dict[str, torch.Tensor]:
        """
        Reward for bimanual handover.
        Hand A grasps the object → passes it to hand B → hand B receives it.
        """
        # Phase 0: giver grasps the object
        giver_to_obj = torch.norm(giver_hand_pos - object_pos, dim=-1)
        giver_contact_mag = torch.norm(giver_contact, dim=-1)
        r_giver_grasp = (
            torch.exp(-5.0 * giver_to_obj)
            * torch.clamp(giver_contact_mag / 5.0, 0, 1)
        )
        # Phase 1: move the object to the transfer zone (midpoint of the hands)
        midpoint = (giver_hand_pos + receiver_hand_pos) / 2
        obj_to_mid = torch.norm(object_pos - midpoint, dim=-1)
        r_transfer = torch.exp(-3.0 * obj_to_mid)
        # Phase 2: receiver grasps (contact) while the giver releases (no contact)
        receiver_contact_mag = torch.norm(receiver_contact, dim=-1)
        r_receiver_grasp = torch.clamp(receiver_contact_mag / 5.0, 0, 1)
        r_giver_release = torch.exp(-2.0 * giver_contact_mag)  # Reward releasing
        # Phase-weighted rewards
        is_phase0 = (phase == 0).float()
        is_phase1 = (phase == 1).float()
        is_phase2 = (phase == 2).float()
        total = (
            is_phase0 * r_giver_grasp
            + is_phase1 * (0.5 * r_transfer + 0.3 * r_giver_grasp)
            + is_phase2 * (0.5 * r_receiver_grasp + 0.3 * r_giver_release)
        )
        # Success: receiver holds the object and the giver has released it
        success = (
            (receiver_contact_mag > 3.0).float()
            * (giver_contact_mag < 0.5).float()
        )
        total = total + success * 10.0
        return {
            "total": total,
            "giver_grasp": r_giver_grasp,
            "transfer": r_transfer,
            "receiver_grasp": r_receiver_grasp,
            "success": success,
        }
3. Divide-and-Conquer: Specialist Training → Distillation
Specialist Training
Each specialist is a separate policy trained with its task's reward function:
import torch
import torch.nn as nn

class DexterousManipulationPolicy(nn.Module):
    """
    Policy for dexterous manipulation.
    Observation: proprioception + object state + tactile readings.
    """

    def __init__(
        self,
        proprio_dim: int = 60,   # joint pos (30) + joint vel (30)
        object_dim: int = 13,    # pos (3) + quat (4) + vel (3) + angular_vel (3)
        tactile_dim: int = 24,   # 12 fingertip sensors × 2 hands
        action_dim: int = 30,    # all upper-body + hand joints
        hidden_dim: int = 512,
    ):
        super().__init__()
        total_obs = proprio_dim + object_dim + tactile_dim  # 97
        # Actor
        self.actor = nn.Sequential(
            nn.Linear(total_obs, hidden_dim),
            nn.ELU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ELU(),
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.ELU(),
            nn.Linear(hidden_dim // 2, action_dim),
            nn.Tanh(),  # Actions in [-1, 1]
        )
        # Critic
        self.critic = nn.Sequential(
            nn.Linear(total_obs, hidden_dim),
            nn.ELU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ELU(),
            nn.Linear(hidden_dim, 1),
        )

    def forward(self, obs: torch.Tensor):
        return self.actor(obs), self.critic(obs)

    def get_action(self, obs: torch.Tensor, deterministic: bool = False):
        mean = self.actor(obs)
        if deterministic:
            return mean
        # Add exploration noise
        noise = torch.randn_like(mean) * 0.1
        return (mean + noise).clamp(-1, 1)

def train_specialist(
    task_name: str,
    env_config: dict,
    reward_fn,
    num_timesteps: int = 50_000_000,
):
    """
    Train a specialist policy for one task.
    """
    device = torch.device("cuda")
    # Create the task-specific environment
    # (create_manipulation_env is assumed to be provided by the training stack)
    env = create_manipulation_env(task_name, env_config)
    policy = DexterousManipulationPolicy(
        proprio_dim=env.proprio_dim,
        object_dim=env.object_dim,
        tactile_dim=env.tactile_dim,
        action_dim=env.action_dim,
    ).to(device)
    optimizer = torch.optim.Adam(policy.parameters(), lr=3e-4)
    total_steps = 0
    obs = env.reset()
    while total_steps < num_timesteps:
        # Collect rollout
        actions = policy.get_action(obs)
        next_obs, info = env.step(actions)
        # Compute reward
        reward_dict = reward_fn(**info)
        reward = reward_dict["total"]
        # Policy-gradient update (simplified — the full implementation
        # uses PPO with clipped ratios and GAE)
        mean, values = policy(obs)
        _, next_values = policy(next_obs)
        advantage = reward + 0.99 * next_values.squeeze() - values.squeeze()
        # Gaussian log-prob of the executed actions (fixed std = 0.1)
        log_prob = -((actions.detach() - mean) ** 2).sum(dim=-1) / (2 * 0.1 ** 2)
        policy_loss = -(advantage.detach() * log_prob).mean()
        value_loss = advantage.pow(2).mean()
        loss = policy_loss + 0.5 * value_loss
        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(policy.parameters(), 1.0)
        optimizer.step()
        obs = next_obs
        total_steps += env.num_envs
        if total_steps % 1_000_000 < env.num_envs:  # log roughly every 1M steps
            success_rate = reward_dict["success"].mean().item()
            print(
                f"[{task_name}] Steps: {total_steps:>10d} | "
                f"Success: {success_rate:.1%} | "
                f"Reward: {reward.mean().item():.3f}"
            )
    return policy

# Train 3 specialists
grasp_specialist = train_specialist(
    "grasp_and_reach",
    env_config={"object_type": "diverse", "randomize": True},
    reward_fn=ManipulationRewards.grasp_and_reach,
)
lift_specialist = train_specialist(
    "box_lift",
    env_config={"box_sizes": [(0.1, 0.1, 0.1), (0.15, 0.15, 0.15)], "randomize": True},
    reward_fn=ManipulationRewards.box_lift,
)
handover_specialist = train_specialist(
    "bimanual_handover",
    env_config={"object_type": "diverse", "randomize": True},
    reward_fn=ManipulationRewards.bimanual_handover,
)
Policy Distillation: Specialists → Generalist
import torch
import torch.nn as nn
import torch.nn.functional as F

class GeneralistPolicy(nn.Module):
    """
    Generalist policy — distilled from multiple specialists.
    A task embedding tells the tasks apart.
    """

    def __init__(
        self,
        obs_dim: int = 97,
        action_dim: int = 30,
        num_tasks: int = 3,
        task_embed_dim: int = 16,
        hidden_dim: int = 512,
    ):
        super().__init__()
        self.task_embedding = nn.Embedding(num_tasks, task_embed_dim)
        total_input = obs_dim + task_embed_dim
        self.actor = nn.Sequential(
            nn.Linear(total_input, hidden_dim),
            nn.ELU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ELU(),
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.ELU(),
            nn.Linear(hidden_dim // 2, action_dim),
            nn.Tanh(),
        )

    def forward(self, obs: torch.Tensor, task_id: torch.Tensor):
        task_embed = self.task_embedding(task_id)
        x = torch.cat([obs, task_embed], dim=-1)
        return self.actor(x)

def distill_specialists(
    specialists: list[nn.Module],
    task_envs: list,
    num_distill_steps: int = 10_000_000,
    temperature: float = 1.0,
):
    """
    Distill multiple specialist policies into one generalist.
    Args:
        specialists: list of trained specialist policies
        task_envs: list of environments (1 per task)
        temperature: distillation temperature (unused with the MSE loss)
    """
    device = torch.device("cuda")
    num_tasks = len(specialists)
    # Freeze specialists
    for spec in specialists:
        spec.eval()
        for p in spec.parameters():
            p.requires_grad = False
    # Create generalist
    generalist = GeneralistPolicy(
        obs_dim=task_envs[0].obs_dim,
        action_dim=task_envs[0].action_dim,
        num_tasks=num_tasks,
    ).to(device)
    optimizer = torch.optim.Adam(generalist.parameters(), lr=1e-4)
    total_steps = 0
    envs_obs = [env.reset() for env in task_envs]
    while total_steps < num_distill_steps:
        total_loss = 0.0
        for task_id, (specialist, env, obs) in enumerate(
            zip(specialists, task_envs, envs_obs)
        ):
            # Get specialist action (teacher)
            with torch.no_grad():
                teacher_action = specialist.get_action(obs, deterministic=True)
            # Get generalist action (student)
            task_ids = torch.full(
                (obs.shape[0],), task_id, dtype=torch.long, device=device
            )
            student_action = generalist(obs, task_ids)
            # Distillation loss: MSE between teacher and student actions
            distill_loss = F.mse_loss(student_action, teacher_action)
            # Also step the environment for fresh data
            next_obs, _ = env.step(teacher_action)
            envs_obs[task_id] = next_obs
            total_loss += distill_loss
        # Update generalist
        optimizer.zero_grad()
        total_loss.backward()
        torch.nn.utils.clip_grad_norm_(generalist.parameters(), 1.0)
        optimizer.step()
        steps_per_iter = sum(env.num_envs for env in task_envs)
        total_steps += steps_per_iter
        if total_steps % 500_000 < steps_per_iter:  # log roughly every 500k steps
            print(
                f"[Distill] Steps: {total_steps:>10d} | "
                f"Loss: {total_loss.item():.6f}"
            )
    return generalist
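Stripped of the robotics, the distillation step is plain regression onto the teacher's outputs. A self-contained toy version (hypothetical linear teacher and student, not the paper's networks) shows the student converging to the teacher under the same MSE objective:

```python
import numpy as np

rng = np.random.default_rng(0)
# Hypothetical frozen "teacher": a fixed linear policy
W_teacher = rng.normal(size=(4, 2))
obs = rng.normal(size=(256, 4))
target = obs @ W_teacher            # teacher actions (the supervision signal)

# Student: same shape, trained by plain MSE regression
# (the distillation loss above) via gradient descent
W = np.zeros((4, 2))
for _ in range(500):
    pred = obs @ W
    grad = obs.T @ (pred - target) / len(obs)  # gradient of the MSE loss
    W -= 0.1 * grad

print(np.abs(W - W_teacher).max())  # converges toward 0
```

With a nonlinear student (the MLP above), the student can only approximate each teacher, which is why distillation also streams fresh on-policy observations from each task environment.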
4. Domain Randomization Config
DOMAIN_RANDOMIZATION = {
    # Physics parameters
    "physics": {
        "friction": {
            "range": [0.3, 2.0],
            "operation": "scaling",  # Multiply the default value
        },
        "restitution": {
            "range": [0.0, 0.3],
            "operation": "absolute",
        },
        "mass": {
            "range": [0.85, 1.15],
            "operation": "scaling",
        },
        "com_displacement": {
            "range": [-0.03, 0.03],  # meters
            "operation": "additive",
            "per_axis": True,
        },
    },
    # Actuator parameters
    "actuator": {
        "kp": {"range": [0.7, 1.3], "operation": "scaling"},
        "kd": {"range": [0.7, 1.3], "operation": "scaling"},
        "torque_limit": {"range": [0.85, 1.0], "operation": "scaling"},
        "action_delay": {"range": [0, 3], "operation": "absolute", "unit": "steps"},
    },
    # Sensor noise
    "sensor": {
        "joint_pos_noise": {"std": 0.01, "unit": "rad"},
        "joint_vel_noise": {"std": 0.1, "unit": "rad/s"},
        "imu_quat_noise": {"std": 0.005},
        "imu_gyro_noise": {"std": 0.02, "unit": "rad/s"},
    },
    # Object properties (for manipulation)
    "object": {
        "mass": {"range": [0.1, 2.0], "unit": "kg"},
        "size_scale": {"range": [0.8, 1.2]},
        "friction": {"range": [0.3, 1.5]},
        "initial_pos_noise": {"std": 0.02, "unit": "m"},
        "initial_rot_noise": {"std": 0.1, "unit": "rad"},
    },
    # Visual randomization (for camera-based policies)
    "visual": {
        "light_intensity": {"range": [0.5, 2.0]},
        "light_direction": {"range": [-30, 30], "unit": "deg"},
        "texture_randomize": True,
        "camera_fov_noise": {"std": 2.0, "unit": "deg"},
    },
}
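A minimal sampler for a config in this format might look like the sketch below (my own helper, not from the paper; it handles the three `operation` modes but ignores the `per_axis` flag and the `std`-based noise entries for brevity):

```python
import random

def sample_randomization(spec: dict, defaults: dict) -> dict:
    """Draw one randomized value per parameter from a DR spec.

    Supports the three operations used in the config above:
    scaling (multiply the default), additive (offset the default),
    absolute (use the sampled value directly).
    """
    out = {}
    for name, cfg in spec.items():
        lo, hi = cfg["range"]
        draw = random.uniform(lo, hi)
        op = cfg["operation"]
        if op == "scaling":
            out[name] = defaults[name] * draw
        elif op == "additive":
            out[name] = defaults[name] + draw
        else:  # "absolute"
            out[name] = draw
    return out

# Example: randomize the physics group around nominal values
physics_spec = {
    "friction": {"range": [0.3, 2.0], "operation": "scaling"},
    "restitution": {"range": [0.0, 0.3], "operation": "absolute"},
    "mass": {"range": [0.85, 1.15], "operation": "scaling"},
}
defaults = {"friction": 0.8, "restitution": 0.0, "mass": 1.0}
params = sample_randomization(physics_spec, defaults)
```

In practice a fresh draw like this is applied per environment at every episode reset, so each parallel sim instance lives in a slightly different world.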
5. Hybrid Object Representation
The paper uses a hybrid representation that combines several modalities for the object state:
import torch
import torch.nn as nn

class HybridObjectEncoder(nn.Module):
    """
    Hybrid object representation with modality-specific augmentation.
    Combines proprioceptive + tactile + visual features.
    """

    def __init__(
        self,
        proprio_dim: int = 13,  # object pos (3) + quat (4) + vel (6)
        tactile_dim: int = 24,  # fingertip sensors
        visual_dim: int = 64,   # CNN features (if a camera is used)
        output_dim: int = 32,
        use_visual: bool = False,
    ):
        super().__init__()
        self.use_visual = use_visual
        # Proprioceptive encoder
        self.proprio_encoder = nn.Sequential(
            nn.Linear(proprio_dim, 64),
            nn.LayerNorm(64),
            nn.ELU(),
            nn.Linear(64, 32),
        )
        # Tactile encoder
        self.tactile_encoder = nn.Sequential(
            nn.Linear(tactile_dim, 64),
            nn.LayerNorm(64),
            nn.ELU(),
            nn.Linear(64, 32),
        )
        # Visual encoder (optional)
        if use_visual:
            self.visual_encoder = nn.Sequential(
                nn.Linear(visual_dim, 64),
                nn.LayerNorm(64),
                nn.ELU(),
                nn.Linear(64, 32),
            )
            fusion_dim = 32 * 3
        else:
            fusion_dim = 32 * 2
        # Fusion layer
        self.fusion = nn.Sequential(
            nn.Linear(fusion_dim, 64),
            nn.ELU(),
            nn.Linear(64, output_dim),
        )
        # Augmentation parameters
        self.proprio_noise_std = 0.01
        self.tactile_noise_std = 0.05
        self.dropout = nn.Dropout(0.1)

    def forward(
        self,
        proprio: torch.Tensor,
        tactile: torch.Tensor,
        visual: torch.Tensor | None = None,
        augment: bool = False,
    ) -> torch.Tensor:
        """
        Encode the object state from multiple modalities.
        Args:
            proprio: (batch, proprio_dim) — object pose + velocity
            tactile: (batch, tactile_dim) — fingertip sensor readings
            visual: (batch, visual_dim) — CNN features (optional)
            augment: whether to apply modality-specific augmentation
        """
        # Modality-specific augmentation
        if augment and self.training:
            proprio = proprio + torch.randn_like(proprio) * self.proprio_noise_std
            tactile = tactile + torch.randn_like(tactile) * self.tactile_noise_std
            # Random modality dropout: occasionally zero out one modality
            if torch.rand(1).item() < 0.1:
                tactile = torch.zeros_like(tactile)
        # Encode each modality
        proprio_feat = self.proprio_encoder(proprio)
        tactile_feat = self.tactile_encoder(tactile)
        if self.use_visual and visual is not None:
            visual_feat = self.visual_encoder(visual)
            fused = torch.cat([proprio_feat, tactile_feat, visual_feat], dim=-1)
        else:
            fused = torch.cat([proprio_feat, tactile_feat], dim=-1)
        # Fusion
        fused = self.dropout(fused) if augment else fused
        output = self.fusion(fused)
        return output
6. Complete Deployment Script
import torch
import time
import numpy as np

class Sim2RealDeployer:
    """
    Deploy the trained policy on the real robot.
    Handles: observation processing, action filtering, safety checks.
    """

    def __init__(
        self,
        policy_path: str,
        robot_interface,
        control_freq: float = 30.0,
        action_smoothing: float = 0.3,  # EMA smoothing factor
    ):
        self.device = torch.device("cpu")
        self.dt = 1.0 / control_freq
        self.alpha = action_smoothing
        # Load policy
        ckpt = torch.load(policy_path, map_location=self.device)
        self.policy = GeneralistPolicy(
            obs_dim=ckpt["obs_dim"],
            action_dim=ckpt["action_dim"],
        ).to(self.device)
        self.policy.load_state_dict(ckpt["policy"])
        self.policy.eval()
        # Robot interface
        self.robot = robot_interface
        self.prev_action = None
        # Safety limits
        self.max_joint_delta = 0.1  # rad per step
        self.torque_limit = 50.0    # Nm
        # Observation normalization (from training)
        self.obs_mean = ckpt.get("obs_mean", torch.zeros(ckpt["obs_dim"]))
        self.obs_std = ckpt.get("obs_std", torch.ones(ckpt["obs_dim"]))

    def _normalize_obs(self, obs: torch.Tensor) -> torch.Tensor:
        """Normalize observation using training statistics."""
        return (obs - self.obs_mean) / (self.obs_std + 1e-8)

    def _smooth_action(self, action: torch.Tensor) -> torch.Tensor:
        """EMA smoothing to prevent jerky motion."""
        if self.prev_action is None:
            self.prev_action = action
            return action
        smoothed = self.alpha * action + (1 - self.alpha) * self.prev_action
        self.prev_action = smoothed
        return smoothed

    def _safety_check(self, action: torch.Tensor, current_pos: torch.Tensor) -> torch.Tensor:
        """Clamp action to safety limits."""
        delta = action - current_pos
        delta = delta.clamp(-self.max_joint_delta, self.max_joint_delta)
        return current_pos + delta

    @torch.no_grad()
    def run(self, task_id: int, duration: float = 30.0):
        """
        Run the policy on the real robot.
        Args:
            task_id: 0=grasp, 1=lift, 2=handover
            duration: seconds to run
        """
        print(f"[Deploy] Starting task {task_id} for {duration}s")
        print("[Deploy] Press Ctrl+C to stop")
        task_tensor = torch.tensor([task_id], dtype=torch.long)
        start_time = time.time()
        step_count = 0
        try:
            while time.time() - start_time < duration:
                loop_start = time.time()
                # Read robot state
                state = self.robot.get_state()
                obs = self._build_observation(state)
                obs = self._normalize_obs(obs)
                # Get action
                action = self.policy(obs.unsqueeze(0), task_tensor).squeeze(0)
                # Post-process
                action = self._smooth_action(action)
                action = self._safety_check(action, state["joint_pos"])
                # Send to robot
                self.robot.set_joint_targets(action.numpy())
                step_count += 1
                if step_count % 100 == 0:
                    elapsed = time.time() - start_time
                    print(
                        f"[Deploy] Step {step_count} | "
                        f"Time: {elapsed:.1f}s | "
                        f"Action norm: {action.norm():.3f}"
                    )
                # Maintain the control frequency
                elapsed_step = time.time() - loop_start
                if elapsed_step < self.dt:
                    time.sleep(self.dt - elapsed_step)
        except KeyboardInterrupt:
            print("[Deploy] Stopped by user")
        finally:
            # Safe shutdown: move slowly to the default pose
            print("[Deploy] Moving to safe pose...")
            self.robot.move_to_default(duration=3.0)
            print("[Deploy] Done")

    def _build_observation(self, state: dict) -> torch.Tensor:
        """Build the observation tensor from the robot state dict."""
        return torch.cat([
            state["joint_pos"],
            state["joint_vel"],
            state["object_pos"],
            state["object_quat"],
            state["object_vel"],
            state["object_angular_vel"],
            state["tactile_left"],
            state["tactile_right"],
        ])
Experimental Results
| Task | Sim Success (%) | Real Success (%) | Sim-to-Real Gap (pp) |
|---|---|---|---|
| Grasp-and-Reach | 92.3 | 78.5 | 13.8 |
| Box Lift | 87.1 | 71.2 | 15.9 |
| Bimanual Handover | 83.6 | 64.8 | 18.8 |
Observations:
- The sim-to-real gap of ~14-19 points is far smaller than for methods without real-to-sim tuning, where gaps of 30-50% are common
- Bimanual handover is the hardest task because it demands precisely timed coordination between the two hands
- Real-to-sim tuning cuts the gap by ~10 points compared to domain randomization alone
Key Lessons
1. Real-to-Sim > Domain Randomization
Domain randomization is necessary but not sufficient. If the simulation is too far from reality, no amount of randomization will make the policy transfer. Real-to-sim tuning first pulls the simulation close to reality, then randomizes around that point.
2. Divide-and-Conquer cuts training time 3-5x
Training a single generalist directly takes 200M+ steps. Training 3 specialists (50M each) plus distillation (10M) totals 160M steps, and the result is better because each specialist optimizes one clear objective.
3. Action smoothing is mandatory on hardware
In sim, actions can change abruptly at every step. On hardware, that produces jerk, which damages motors and shortens their lifespan. EMA smoothing (alpha = 0.3) is a simple trick, but critical for deployment.
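A quick standalone check of that claim, with made-up random "actions": EMA with alpha = 0.3 bounds every step-to-step change to alpha times the worst-case raw jump:

```python
import random

random.seed(0)
alpha = 0.3  # same EMA factor as in the deployment code
raw = [random.uniform(-1, 1) for _ in range(200)]  # jerky raw actions

smoothed, prev = [], raw[0]
for a in raw:
    prev = alpha * a + (1 - alpha) * prev  # EMA filter
    smoothed.append(prev)

max_delta = lambda xs: max(abs(b - a) for a, b in zip(xs, xs[1:]))
print(max_delta(raw), max_delta(smoothed))
```

Since each smoothed step equals alpha times the distance from the current filter state to the new action, the per-step change can never exceed 2·alpha = 0.6 for actions in [-1, 1], while raw steps can jump by nearly 2.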
Read More
- RL for Robotics: PPO, SAC, and how to choose an algorithm — RL fundamentals
- FAME: Force-Adaptive RL for Humanoid Manipulation — force-adaptive locomotion
- Imitation Learning for Robots — an alternative approach
- VLA Models for Robots — vision-language-action models
References
- Sim-to-Real Reinforcement Learning for Humanoid Dexterous Manipulation — arXiv 2502.20396, Feb 2025
- Sim-to-Real: Learning Agile Locomotion For Quadruped Robots — Tan, J., Zhang, T., Coumans, E., et al., RSS 2018
- Learning Dexterous In-Hand Manipulation — OpenAI, Andrychowicz, M., et al., IJRR 2020
- DexMV: Imitation Learning for Dexterous Manipulation from Human Videos — Qin, Y., et al., ECCV 2022