Pipeline Overview
Sim-to-real transfer is not a matter of "train once, then deploy": it is a multi-step pipeline with feedback loops. This article walks through it end to end, from the first step (defining the task) to the last (iterating on the real robot).
┌─────────────────────────────────────────────────────┐
│ SIM-TO-REAL PIPELINE │
│ │
│ 1. Define Task ──► 2. Obs/Action Space Design │
│ │ │ │
│ ▼ ▼ │
│ 3. Train Base Policy (PPO) ──► 4. Add Domain Rand │
│ │ │ │
│ ▼ ▼ │
│ 5. Evaluate (diverse sim) ──► 6. Deploy (ROS 2) │
│ │ │ │
│ ▼ ▼ │
│ 7. Collect Real Data ──► 8. Fine-tune & Iterate │
│ │ │ │
│ └──────────── feedback ────────┘ │
└─────────────────────────────────────────────────────┘
Step 1: Define the Task in Simulation
The first step is to specify exactly what the robot needs to do. Example: a robot arm picks an object up at position A and places it at position B.
"""
Bước 1: Define task specification.
Chạy được standalone — không cần hardware.
"""
from dataclasses import dataclass
import numpy as np
@dataclass
class TaskSpec:
"""Specification cho pick-and-place task."""
# Workspace bounds (meters)
workspace_min: np.ndarray = None
workspace_max: np.ndarray = None
# Object properties
object_size: float = 0.04 # 4cm cube
object_mass_range: tuple = (0.05, 0.5) # kg
# Success criteria
position_tolerance: float = 0.02 # 2cm
orientation_tolerance: float = 0.1 # radians
max_episode_steps: int = 200 # ~10 seconds at 20Hz
# Safety constraints
max_joint_velocity: float = 1.5 # rad/s
max_end_effector_force: float = 20.0 # Newton
def __post_init__(self):
if self.workspace_min is None:
self.workspace_min = np.array([0.3, -0.3, 0.0])
if self.workspace_max is None:
self.workspace_max = np.array([0.7, 0.3, 0.3])
def is_success(
self,
object_pos: np.ndarray,
goal_pos: np.ndarray,
) -> bool:
"""Kiểm tra task hoàn thành."""
distance = np.linalg.norm(object_pos - goal_pos)
return distance < self.position_tolerance
def is_safe(
self,
joint_velocities: np.ndarray,
ee_force: float,
) -> bool:
"""Kiểm tra safety constraints."""
vel_ok = np.all(np.abs(joint_velocities) < self.max_joint_velocity)
force_ok = ee_force < self.max_end_effector_force
return vel_ok and force_ok
# Tạo task spec
task = TaskSpec()
print(f"Workspace: {task.workspace_min} -> {task.workspace_max}")
print(f"Success tolerance: {task.position_tolerance}m")
print(f"Max episode: {task.max_episode_steps} steps")
Step 2: Designing the Observation and Action Space
This is the most important step, and also the one where mistakes are most common. The observation space must contain enough information to solve the task, but it also has to match the sensors available on the real robot.
"""
Bước 2: Observation và Action Space design.
Nguyên tắc: chỉ dùng observations mà robot thật có thể đo được.
"""
import gymnasium as gym
import numpy as np
class ObservationSpaceDesign:
"""
Design observation space cho sim-to-real transfer.
Quy tắc vàng:
1. Chỉ dùng sensors có trên robot thật
2. Thêm noise matching sensor thật
3. Normalize tất cả values về [-1, 1]
4. Dùng observation history (không chỉ frame hiện tại)
"""
def __init__(self, history_length: int = 3):
self.history_length = history_length
# Proprioceptive observations (luôn available)
self.proprio_dim = 7 + 7 + 1 # joint_pos + joint_vel + gripper
# Exteroceptive observations (cần sensor)
self.extero_dim = 3 + 4 + 3 # obj_pos + obj_quat + goal_pos
# Total per frame
self.frame_dim = self.proprio_dim + self.extero_dim
# Observation space với history
self.obs_dim = self.frame_dim * self.history_length
def get_observation_space(self) -> gym.spaces.Box:
"""Return gym observation space."""
return gym.spaces.Box(
low=-1.0,
high=1.0,
shape=(self.obs_dim,),
dtype=np.float32,
)
def normalize_observation(self, raw_obs: dict) -> np.ndarray:
"""Normalize raw sensor data về [-1, 1]."""
# Joint positions: [-pi, pi] -> [-1, 1]
joint_pos = raw_obs["joint_pos"] / np.pi
# Joint velocities: [-max_vel, max_vel] -> [-1, 1]
max_vel = 2.0 # rad/s
joint_vel = np.clip(raw_obs["joint_vel"] / max_vel, -1, 1)
# Gripper: [0, 0.04] -> [-1, 1]
gripper = raw_obs["gripper_width"] / 0.04 * 2 - 1
# Object position: workspace bounds -> [-1, 1]
workspace_center = np.array([0.5, 0.0, 0.15])
workspace_scale = np.array([0.2, 0.3, 0.15])
obj_pos = (raw_obs["object_pos"] - workspace_center) / workspace_scale
# Object orientation: quaternion already in [-1, 1]
obj_quat = raw_obs["object_quat"]
# Goal position
goal_pos = (raw_obs["goal_pos"] - workspace_center) / workspace_scale
return np.concatenate([
joint_pos, joint_vel, [gripper],
obj_pos, obj_quat, goal_pos,
]).astype(np.float32)
class ActionSpaceDesign:
"""
Design action space.
Dùng delta position control — robust hơn torque control
cho sim-to-real transfer.
"""
def __init__(self):
# 7 joint delta positions + 1 gripper command
self.action_dim = 8
self.max_delta_pos = 0.05 # rad per step
self.max_gripper_speed = 0.04 # m per step
def get_action_space(self) -> gym.spaces.Box:
return gym.spaces.Box(
low=-1.0,
high=1.0,
shape=(self.action_dim,),
dtype=np.float32,
)
def denormalize_action(self, action: np.ndarray) -> dict:
"""Chuyển normalized action [-1,1] về physical commands."""
joint_delta = action[:7] * self.max_delta_pos
gripper_cmd = (action[7] + 1) / 2 * self.max_gripper_speed
return {
"joint_position_delta": joint_delta,
"gripper_width_target": gripper_cmd,
}
# Test
obs_design = ObservationSpaceDesign(history_length=3)
act_design = ActionSpaceDesign()
print(f"Observation dim: {obs_design.obs_dim}")
print(f"Action dim: {act_design.action_dim}")
print(f"Obs space: {obs_design.get_observation_space()}")
print(f"Act space: {act_design.get_action_space()}")
Step 3: Train the Base Policy with PPO
Start training without domain randomization so the policy learns the basic task first.
"""
Bước 3: Train base policy với PPO.
Sử dụng RSL-RL (NVIDIA Isaac Lab's default RL library).
"""
from dataclasses import dataclass
@dataclass
class PPOConfig:
"""PPO hyperparameters cho robotics tasks."""
# Training
num_envs: int = 4096
num_steps_per_update: int = 24
max_iterations: int = 3000
learning_rate: float = 3e-4
lr_schedule: str = "adaptive" # Giảm LR khi KL divergence cao
# PPO specific
clip_param: float = 0.2
entropy_coef: float = 0.01
value_loss_coef: float = 1.0
max_grad_norm: float = 1.0
gamma: float = 0.99
lam: float = 0.95 # GAE lambda
num_mini_batches: int = 4
num_epochs: int = 5
# Network architecture
actor_hidden_dims: list = None
critic_hidden_dims: list = None
activation: str = "elu"
def __post_init__(self):
if self.actor_hidden_dims is None:
self.actor_hidden_dims = [256, 256, 128]
if self.critic_hidden_dims is None:
self.critic_hidden_dims = [256, 256, 128]
@dataclass
class RewardConfig:
"""Reward shaping cho pick-and-place."""
# Reaching reward: khoảng cách end-effector đến object
reaching_weight: float = 1.0
reaching_temperature: float = 0.1
# Grasping reward: object lifted
grasping_weight: float = 5.0
lift_threshold: float = 0.05 # meters
# Placing reward: object tại goal
placing_weight: float = 10.0
# Penalties
action_penalty_weight: float = 0.01
velocity_penalty_weight: float = 0.001
def compute_reward(
ee_pos: "np.ndarray",
obj_pos: "np.ndarray",
goal_pos: "np.ndarray",
action: "np.ndarray",
config: RewardConfig,
) -> float:
"""
Compute shaped reward cho pick-and-place.
"""
import numpy as np
# Reaching: exponential distance reward
reach_dist = np.linalg.norm(ee_pos - obj_pos)
reach_reward = np.exp(-reach_dist / config.reaching_temperature)
# Grasping: bonus khi object lifted
lift_height = obj_pos[2] - 0.0 # height above table
grasp_reward = float(lift_height > config.lift_threshold)
# Placing: distance object đến goal
place_dist = np.linalg.norm(obj_pos - goal_pos)
place_reward = np.exp(-place_dist / config.reaching_temperature)
# Penalties
action_penalty = np.sum(action ** 2)
reward = (
config.reaching_weight * reach_reward
+ config.grasping_weight * grasp_reward
+ config.placing_weight * place_reward
- config.action_penalty_weight * action_penalty
)
return reward
Training command:
```bash
# Train the base policy: no DR, 4096 envs, ~30 minutes on an RTX 4090
python train.py \
    --task PickPlace \
    --num_envs 4096 \
    --max_iterations 3000 \
    --headless \
    --experiment base_policy

# Evaluate
python play.py \
    --task PickPlace \
    --checkpoint logs/base_policy/model_3000.pt \
    --num_eval_episodes 100
```
Step 4: Add Domain Randomization
Once the base policy reaches >90% success rate in sim (without DR), introduce DR gradually via a curriculum.
DR itself is covered in detail in Domain Randomization: Chìa khóa Sim-to-Real Transfer. Here the focus is on integrating DR into the training loop.
"""
Bước 4: Curriculum-based Domain Randomization.
Tăng dần DR range theo training progress.
"""
import numpy as np
from dataclasses import dataclass
@dataclass
class DRCurriculumConfig:
"""Curriculum cho domain randomization."""
# Khi nào bắt đầu DR (iteration)
dr_start_iteration: int = 500
# Ramp-up period (iterations)
dr_ramp_iterations: int = 1500
# Final DR ranges
final_mass_scale: tuple = (0.5, 2.0)
final_friction: tuple = (0.3, 1.5)
final_actuator_scale: tuple = (0.8, 1.2)
final_action_delay: int = 3
final_obs_noise: float = 0.02
def get_dr_params(config: DRCurriculumConfig, iteration: int) -> dict:
"""Tính DR params theo curriculum."""
if iteration < config.dr_start_iteration:
return {
"mass_scale": (1.0, 1.0),
"friction": (0.8, 0.8),
"actuator_scale": (1.0, 1.0),
"action_delay": 0,
"obs_noise": 0.0,
}
# Tính progress 0->1
progress = min(
1.0,
(iteration - config.dr_start_iteration)
/ config.dr_ramp_iterations,
)
def lerp_range(nominal, final_range, p):
low = nominal - (nominal - final_range[0]) * p
high = nominal + (final_range[1] - nominal) * p
return (low, high)
return {
"mass_scale": lerp_range(1.0, config.final_mass_scale, progress),
"friction": lerp_range(0.8, config.final_friction, progress),
"actuator_scale": lerp_range(
1.0, config.final_actuator_scale, progress,
),
"action_delay": int(config.final_action_delay * progress),
"obs_noise": config.final_obs_noise * progress,
}
# Ví dụ: xem DR schedule
config = DRCurriculumConfig()
for it in [0, 500, 1000, 1500, 2000]:
params = get_dr_params(config, it)
print(f"Iteration {it}: {params}")
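The curriculum above only yields parameter ranges; each training iteration still has to draw concrete per-env values from those ranges. A minimal sketch of that sampling step (`sample_env_params` is my name for illustration, not an Isaac Lab API):

```python
import numpy as np


def sample_env_params(dr_ranges: dict, num_envs: int, seed: int = None) -> dict:
    """Draw one concrete DR sample per parallel env from the current ranges."""
    rng = np.random.default_rng(seed)
    return {
        "mass_scale": rng.uniform(*dr_ranges["mass_scale"], size=num_envs),
        "friction": rng.uniform(*dr_ranges["friction"], size=num_envs),
        "actuator_scale": rng.uniform(*dr_ranges["actuator_scale"], size=num_envs),
        # Per-env integer delay in [0, max_delay]
        "action_delay": rng.integers(0, dr_ranges["action_delay"] + 1, size=num_envs),
    }


# Example: sample for 4 envs at full curriculum progress
ranges = {
    "mass_scale": (0.5, 2.0),
    "friction": (0.3, 1.5),
    "actuator_scale": (0.8, 1.2),
    "action_delay": 3,
}
params = sample_env_params(ranges, num_envs=4, seed=0)
```

At each update you would feed `get_dr_params(config, iteration)` into this function and push the sampled values into the simulator; how exactly they are applied depends on the sim framework.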
Step 5: Evaluate under Diverse Sim Conditions
Before deploying, evaluate the policy across many different configurations to make sure it is robust.
"""
Bước 5: Systematic evaluation trong simulation.
Test policy trên grid of parameters.
"""
import numpy as np
from itertools import product
def systematic_evaluation(
policy,
env_factory,
num_episodes_per_config: int = 50,
) -> dict:
"""
Evaluate policy trên grid of DR parameters.
Returns success rate per configuration.
"""
# Parameter grid
frictions = [0.3, 0.5, 0.8, 1.0, 1.5]
masses = [0.5, 0.75, 1.0, 1.5, 2.0]
delays = [0, 1, 2, 3]
results = {}
for friction, mass_scale, delay in product(frictions, masses, delays):
config_name = f"f{friction}_m{mass_scale}_d{delay}"
env = env_factory(
friction=friction,
mass_scale=mass_scale,
action_delay=delay,
)
successes = 0
for ep in range(num_episodes_per_config):
obs = env.reset()
done = False
while not done:
action = policy.predict(obs)
obs, reward, done, info = env.step(action)
if info.get("is_success", False):
successes += 1
success_rate = successes / num_episodes_per_config
results[config_name] = success_rate
env.close()
# Tổng hợp
rates = list(results.values())
summary = {
"mean_success_rate": np.mean(rates),
"min_success_rate": np.min(rates),
"max_success_rate": np.max(rates),
"std_success_rate": np.std(rates),
"worst_config": min(results, key=results.get),
"best_config": max(results, key=results.get),
"per_config": results,
}
print(f"Mean success: {summary['mean_success_rate']:.1%}")
print(f"Min success: {summary['min_success_rate']:.1%}")
print(f"Worst config: {summary['worst_config']}")
return summary
Target: a mean success rate above 85% across the full DR range. If it is not met, go back to Steps 3-4.
Step 6: Deploy to the Real Robot via ROS 2
This is the most exciting step: running the policy on real hardware.
"""
Bước 6: Deploy policy lên robot thật via ROS 2.
Node nhận sensor data, chạy inference, publish commands.
"""
import rclpy
from rclpy.node import Node
import torch
import numpy as np
from sensor_msgs.msg import JointState
from geometry_msgs.msg import PoseStamped
from std_msgs.msg import Float64MultiArray
class SimToRealDeployNode(Node):
"""ROS 2 node để deploy trained policy."""
def __init__(self):
super().__init__("sim2real_deploy")
# Load policy
model_path = self.declare_parameter(
"model_path", "models/policy.pt"
).value
self.policy = torch.jit.load(model_path)
self.policy.eval()
self.get_logger().info(f"Loaded policy from {model_path}")
# Observation buffer (history_length=3)
self.obs_history = []
self.history_length = 3
# Current sensor readings
self.joint_pos = np.zeros(7)
self.joint_vel = np.zeros(7)
self.gripper_width = 0.0
self.object_pos = np.zeros(3)
self.object_quat = np.array([1.0, 0.0, 0.0, 0.0])
self.goal_pos = np.array([0.5, 0.0, 0.15])
# Normalization params (phải match training!)
self.joint_pos_scale = np.pi
self.joint_vel_scale = 2.0
self.workspace_center = np.array([0.5, 0.0, 0.15])
self.workspace_scale = np.array([0.2, 0.3, 0.15])
# Safety limits
self.max_delta = 0.05 # rad per step
self.joint_limits_low = np.array(
[-2.89, -1.76, -2.89, -3.07, -2.89, -0.02, -2.89]
)
self.joint_limits_high = np.array(
[2.89, 1.76, 2.89, -0.07, 2.89, 3.75, 2.89]
)
# Subscribers
self.create_subscription(
JointState, "/joint_states", self.joint_cb, 10
)
self.create_subscription(
PoseStamped, "/object_pose", self.object_cb, 10
)
# Publisher
self.cmd_pub = self.create_publisher(
Float64MultiArray, "/joint_commands", 10
)
# Control loop at 20Hz
self.create_timer(0.05, self.control_loop)
self.get_logger().info("Deploy node ready, running at 20Hz")
def joint_cb(self, msg: JointState):
self.joint_pos = np.array(msg.position[:7])
self.joint_vel = np.array(msg.velocity[:7])
if len(msg.position) > 7:
self.gripper_width = msg.position[7]
def object_cb(self, msg: PoseStamped):
p = msg.pose.position
q = msg.pose.orientation
self.object_pos = np.array([p.x, p.y, p.z])
self.object_quat = np.array([q.w, q.x, q.y, q.z])
def build_observation(self) -> np.ndarray:
"""Build normalized observation matching training format."""
# Normalize
jp = self.joint_pos / self.joint_pos_scale
jv = np.clip(self.joint_vel / self.joint_vel_scale, -1, 1)
gw = self.gripper_width / 0.04 * 2 - 1
op = (self.object_pos - self.workspace_center) / self.workspace_scale
oq = self.object_quat
gp = (self.goal_pos - self.workspace_center) / self.workspace_scale
frame = np.concatenate([jp, jv, [gw], op, oq, gp]).astype(np.float32)
# Update history
self.obs_history.append(frame)
if len(self.obs_history) > self.history_length:
self.obs_history.pop(0)
# Pad nếu chưa đủ history
while len(self.obs_history) < self.history_length:
self.obs_history.insert(0, frame.copy())
return np.concatenate(self.obs_history)
def control_loop(self):
"""Main control loop — inference + safety check + publish."""
obs = self.build_observation()
# Inference
with torch.no_grad():
obs_tensor = torch.FloatTensor(obs).unsqueeze(0)
action = self.policy(obs_tensor).squeeze().numpy()
# Denormalize action
joint_delta = action[:7] * self.max_delta
# Safety: clamp to joint limits
target_pos = self.joint_pos + joint_delta
target_pos = np.clip(
target_pos, self.joint_limits_low, self.joint_limits_high
)
# Safety: smooth (low-pass filter)
alpha = 0.3 # Smoothing factor
if hasattr(self, "_prev_target"):
target_pos = alpha * target_pos + (1 - alpha) * self._prev_target
self._prev_target = target_pos.copy()
# Publish
cmd = Float64MultiArray()
cmd.data = target_pos.tolist()
self.cmd_pub.publish(cmd)
def main(args=None):
rclpy.init(args=args)
node = SimToRealDeployNode()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()
Step 7: Collect Real Data and Fine-tune
After deployment, collect data from the real robot to fine-tune the policy:
"""
Bước 7: Collect real-world data và fine-tune.
Record trajectories từ robot thật, dùng để improve policy.
"""
import numpy as np
from pathlib import Path
from datetime import datetime
class RealWorldDataCollector:
"""Thu thập và lưu trajectories từ robot thật."""
def __init__(self, save_dir: str = "real_data"):
self.save_dir = Path(save_dir)
self.save_dir.mkdir(parents=True, exist_ok=True)
self.current_episode = []
def record_step(
self,
observation: np.ndarray,
action: np.ndarray,
reward: float,
is_success: bool,
):
"""Ghi lại mỗi step."""
self.current_episode.append({
"obs": observation.copy(),
"action": action.copy(),
"reward": reward,
"success": is_success,
})
def end_episode(self, metadata: dict = None):
"""Kết thúc episode, lưu file."""
if not self.current_episode:
return
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
episode_data = {
"steps": self.current_episode,
"num_steps": len(self.current_episode),
"total_reward": sum(s["reward"] for s in self.current_episode),
"success": self.current_episode[-1]["success"],
"metadata": metadata or {},
}
filepath = self.save_dir / f"episode_{timestamp}.npz"
np.savez_compressed(filepath, **episode_data)
print(
f"Saved episode: {len(self.current_episode)} steps, "
f"success={episode_data['success']}"
)
self.current_episode = []
return filepath
def fine_tune_with_real_data(
policy,
real_data_dir: str,
sim_replay_env,
num_fine_tune_iterations: int = 500,
):
"""
Fine-tune policy dùng real data.
Strategy: replay real trajectories trong sim để identify gap,
rồi adjust DR parameters.
"""
real_episodes = list(Path(real_data_dir).glob("episode_*.npz"))
print(f"Loaded {len(real_episodes)} real episodes")
# Phân tích failure modes
failures = []
successes = []
for ep_path in real_episodes:
data = np.load(ep_path, allow_pickle=True)
if data["success"]:
successes.append(data)
else:
failures.append(data)
success_rate = len(successes) / max(len(real_episodes), 1)
print(f"Real-world success rate: {success_rate:.1%}")
print(f"Failures: {len(failures)}, Successes: {len(successes)}")
# Nếu success rate < 80%, cần adjust
if success_rate < 0.8:
print("Success rate thấp — cần điều chỉnh DR hoặc retrain")
# Analyze failure patterns...
return success_rate
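The "replay real trajectories in sim" part of the strategy is left open above. One way to sketch it: feed the recorded real actions into the simulator from the same start state and measure where the states diverge. `_IntegratorEnv` below is only a toy stand-in, and resetting a sim to an arbitrary recorded state is an assumed capability:

```python
import numpy as np


def replay_gap(sim_env, real_obs, real_actions):
    """Replay recorded real actions in sim; return the per-step state gap."""
    sim_obs = sim_env.reset(state=real_obs[0])  # assumed: sim supports state reset
    gaps = []
    for t, action in enumerate(real_actions):
        sim_obs, _, _, _ = sim_env.step(action)
        gaps.append(float(np.linalg.norm(sim_obs - real_obs[t + 1])))
    return np.array(gaps)


class _IntegratorEnv:
    """Toy stand-in environment for demonstration: state += action."""

    def reset(self, state=None):
        self.state = np.array(state, dtype=float)
        return self.state

    def step(self, action):
        self.state = self.state + action
        return self.state.copy(), 0.0, False, {}


# If sim matched reality perfectly, the gap would be zero everywhere
real = [np.zeros(3), np.ones(3), 2 * np.ones(3)]
acts = [np.ones(3), np.ones(3)]
gaps = replay_gap(_IntegratorEnv(), real, acts)
```

Large gaps concentrated at particular steps (e.g. around contact) point to which DR parameter or physics term to revisit.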
Step 8: Iterate
Sim-to-real is an iterative process. Each loop:
- Analyze failures from the real deployment
- Adjust the simulation: add DR covering the observed failure modes, or improve the physics model
- Retrain the policy with the updated sim
- Re-deploy and collect data again
| Loop | Typical focus |
|---|---|
| 1st | Observation mismatch, action scaling |
| 2nd | Dynamics gap (friction, mass) |
| 3rd | Sensor noise, communication delay |
| 4th+ | Edge cases, long-horizon robustness |
Common Pitfalls
1. Observation Space Mismatch
Problem: Observations in sim and on the real robot differ in format, scale, or coordinate frame.
```python
# WRONG: sim uses the world frame, real uses the robot base frame
# Sim:  object_pos = [1.5, 0.3, 0.8]  (world frame)
# Real: object_pos = [0.5, 0.3, 0.1]  (robot base frame)

# RIGHT: always use the robot base frame in both sim and real
object_pos_base_frame = transform_to_base_frame(
    object_pos_world, robot_base_pose
)
```
2. Action Delay
Problem: Sim usually has zero delay, but a real robot has 1-5 frames of delay from communication and motor response.
Solution: Add a random action delay during sim training (covered in Step 4).
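In sim, that delay can be reproduced with a small FIFO buffer in front of the actuators. A minimal sketch (randomize `delay` per episode, as in Step 4):

```python
from collections import deque

import numpy as np


class ActionDelayBuffer:
    """Hold each action for `delay` control steps before the sim applies it,
    mimicking real communication and motor latency."""

    def __init__(self, delay: int, action_dim: int):
        # Pre-fill with zeros so the first `delay` steps apply a null action
        self.buffer = deque(np.zeros(action_dim) for _ in range(delay))

    def step(self, action: np.ndarray) -> np.ndarray:
        self.buffer.append(action)
        return self.buffer.popleft()


# With delay=2, the first two applied actions are zeros
buf = ActionDelayBuffer(delay=2, action_dim=3)
out0 = buf.step(np.ones(3))  # zeros
out1 = buf.step(np.ones(3))  # zeros
out2 = buf.step(np.ones(3))  # the action sent two steps ago
```

The policy then learns to act on slightly stale commands, which is exactly the condition it will face on hardware.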
3. Sensor Calibration
Problem: Camera intrinsics/extrinsics and joint encoder offsets differ between sim and real.
Solution: Calibrate the sensors on the real robot before deploying: camera calibration for vision, joint homing for the encoders.
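For the joint encoders, the calibration result is typically a per-joint zero offset (and sometimes a sign flip) applied to every raw reading. A minimal sketch, with made-up numbers:

```python
import numpy as np


def apply_encoder_calibration(raw_joint_pos, offsets, directions=None):
    """Map raw encoder readings to the joint convention used in sim.

    offsets: per-joint zero offsets found by a homing procedure.
    directions: optional +1/-1 per joint when the encoder sign
    differs from the sim convention.
    """
    q = np.asarray(raw_joint_pos, dtype=float) - np.asarray(offsets, dtype=float)
    if directions is not None:
        q = q * np.asarray(directions, dtype=float)
    return q


# Illustrative values from a hypothetical homing run
q = apply_encoder_calibration(
    raw_joint_pos=[0.12, -0.50, 1.00],
    offsets=[0.02, 0.00, -0.10],
    directions=[1, -1, 1],
)
```

This mapping belongs in the sensor callback (e.g. `joint_cb` in Step 6), before normalization, so the policy never sees uncalibrated values.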
Tool Comparison: Isaac Lab vs MuJoCo Playground
| Feature | Isaac Lab | MuJoCo Playground |
|---|---|---|
| GPU parallel envs | 10,000+ (PhysX 5) | 1,000+ (MJX/JAX) |
| Rendering | RTX ray-tracing | Basic OpenGL |
| DR support | Built-in YAML config | Manual Python code |
| ROS 2 bridge | Isaac ROS | ros2_mujoco (community) |
| Learning curve | Steep (Omniverse stack) | Medium (standalone) |
| Best for | Visual sim-to-real, large-scale RL | Contact-rich manipulation, research |
| Hardware req | NVIDIA GPU (RTX 3070+) | CPU or GPU (flexible) |
| Community | NVIDIA forums | Google DeepMind + open source |
Recommendation: use Isaac Lab if you have an NVIDIA GPU and need visual sim-to-real; use MuJoCo if you focus on manipulation research or need more flexibility.
Pre-deployment Checklist
Before running the policy on the real robot, verify all of the following:
- Observation space matches between sim and real (units, frames, ordering)
- Action space matches (delta position? absolute? torque?)
- Normalization params consistent
- Safety limits configured (joint limits, velocity limits, force limits)
- Emergency stop tested
- Low-pass filter on actions
- Sim success rate > 85% across the full DR range
- Communication latency measured and accounted for
- Sensor calibration done
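Several of these items can be checked mechanically before the first run. A sketch of such a pre-flight script (the dimensions and thresholds here are example values, not fixed requirements):

```python
import numpy as np


def preflight_checks(obs_dim_sim, obs_dim_real,
                     act_dim_sim, act_dim_real,
                     joint_limits_low, joint_limits_high,
                     sim_success_rate, min_success=0.85):
    """Fail fast before deployment on basic sim/real mismatches."""
    assert obs_dim_sim == obs_dim_real, "observation dim mismatch sim vs real"
    assert act_dim_sim == act_dim_real, "action dim mismatch sim vs real"
    low = np.asarray(joint_limits_low)
    high = np.asarray(joint_limits_high)
    assert low.shape == high.shape and np.all(low < high), "bad joint limits"
    assert sim_success_rate > min_success, "sim success rate below threshold"
    return True


# Example: dims matching the spaces from Step 2 (25 dims x 3-frame history)
ok = preflight_checks(
    obs_dim_sim=75, obs_dim_real=75,
    act_dim_sim=8, act_dim_real=8,
    joint_limits_low=[-2.89, -1.76, -2.89],
    joint_limits_high=[2.89, 1.76, 2.89],
    sim_success_rate=0.9,
)
```

Items like the e-stop and latency measurement still need a human in the loop; the script only automates the mismatches that are cheap to catch.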
Summary
The sim-to-real pipeline is not a one-shot effort; it is an iterative process that demands careful engineering at every step. Observation/action space design often matters more than the choice of algorithm. Domain randomization is necessary but not sufficient: it has to be combined with careful system identification and real-world iteration.
Related Posts
- Domain Randomization: Chìa khóa Sim-to-Real Transfer — a deep dive into domain randomization techniques
- Sim-to-Real Transfer: Train simulation, chạy thực tế — an overview of sim-to-real
- Digital Twins và ROS 2: Simulation trong sản xuất — applications in industry
- RL cho Robot đi 2 chân: Từ MuJoCo đến thực tế — a real-world case study of the sim-to-real pipeline