Pipeline Overview
Sim-to-real transfer is not a one-shot "train and deploy" process; it is a multi-step pipeline with feedback loops. This guide walks through the pipeline end to end, from defining the task to iterating on real hardware.
┌──────────────────────────────────────────────────────────────┐
│                     SIM-TO-REAL PIPELINE                     │
│                                                              │
│  1. Define Task ───────────────► 2. Obs/Action Space Design  │
│        │                               │                     │
│        ▼                               ▼                     │
│  3. Train Base Policy (PPO) ───► 4. Add Domain Rand          │
│        │                               │                     │
│        ▼                               ▼                     │
│  5. Evaluate (diverse sim) ────► 6. Deploy (ROS 2)           │
│        │                               │                     │
│        ▼                               ▼                     │
│  7. Collect Real Data ─────────► 8. Fine-tune & Iterate      │
│        │                               │                     │
│        └─────────── feedback ──────────┘                     │
└──────────────────────────────────────────────────────────────┘
Step 1: Define Task in Simulation
The first step is to clearly define the task the robot needs to perform. Example: a robot arm picks and places an object from position A to position B.
"""
Step 1: Define task specification.
Runs standalone — no hardware needed.
"""
from dataclasses import dataclass
import numpy as np
@dataclass
class TaskSpec:
"""Specification for pick-and-place task."""
# Workspace bounds (meters)
workspace_min: np.ndarray = None
workspace_max: np.ndarray = None
# Object properties
object_size: float = 0.04 # 4cm cube
object_mass_range: tuple = (0.05, 0.5) # kg
# Success criteria
position_tolerance: float = 0.02 # 2cm
orientation_tolerance: float = 0.1 # radians
max_episode_steps: int = 200 # ~10 seconds at 20Hz
# Safety constraints
max_joint_velocity: float = 1.5 # rad/s
max_end_effector_force: float = 20.0 # Newton
def __post_init__(self):
if self.workspace_min is None:
self.workspace_min = np.array([0.3, -0.3, 0.0])
if self.workspace_max is None:
self.workspace_max = np.array([0.7, 0.3, 0.3])
def is_success(
self,
object_pos: np.ndarray,
goal_pos: np.ndarray,
) -> bool:
"""Check if task is complete."""
distance = np.linalg.norm(object_pos - goal_pos)
return distance < self.position_tolerance
def is_safe(
self,
joint_velocities: np.ndarray,
ee_force: float,
) -> bool:
"""Check safety constraints."""
vel_ok = np.all(np.abs(joint_velocities) < self.max_joint_velocity)
force_ok = ee_force < self.max_end_effector_force
return vel_ok and force_ok
# Create task spec
task = TaskSpec()
print(f"Workspace: {task.workspace_min} -> {task.workspace_max}")
print(f"Success tolerance: {task.position_tolerance}m")
print(f"Max episode: {task.max_episode_steps} steps")
Step 2: Design Observation and Action Space
This is the most important and most error-prone step. The observation space must contain enough information to solve the task, while including only quantities the real robot's sensors can actually measure.
"""
Step 2: Observation and Action Space design.
Principle: only use observations the real robot can measure.
"""
import gymnasium as gym
import numpy as np
class ObservationSpaceDesign:
"""
Design observation space for sim-to-real transfer.
Golden rules:
1. Only use sensors available on real robot
2. Add noise matching real sensor noise
3. Normalize all values to [-1, 1]
4. Use observation history (not just current frame)
"""
def __init__(self, history_length: int = 3):
self.history_length = history_length
# Proprioceptive observations (always available)
self.proprio_dim = 7 + 7 + 1 # joint_pos + joint_vel + gripper
# Exteroceptive observations (need sensor)
self.extero_dim = 3 + 4 + 3 # obj_pos + obj_quat + goal_pos
# Total per frame
self.frame_dim = self.proprio_dim + self.extero_dim
# Observation space with history
self.obs_dim = self.frame_dim * self.history_length
def get_observation_space(self) -> gym.spaces.Box:
"""Return gym observation space."""
return gym.spaces.Box(
low=-1.0,
high=1.0,
shape=(self.obs_dim,),
dtype=np.float32,
)
def normalize_observation(self, raw_obs: dict) -> np.ndarray:
"""Normalize raw sensor data to [-1, 1]."""
# Joint positions: [-pi, pi] -> [-1, 1]
joint_pos = raw_obs["joint_pos"] / np.pi
# Joint velocities: [-max_vel, max_vel] -> [-1, 1]
max_vel = 2.0 # rad/s
joint_vel = np.clip(raw_obs["joint_vel"] / max_vel, -1, 1)
# Gripper: [0, 0.04] -> [-1, 1]
gripper = raw_obs["gripper_width"] / 0.04 * 2 - 1
# Object position: workspace bounds -> [-1, 1]
workspace_center = np.array([0.5, 0.0, 0.15])
workspace_scale = np.array([0.2, 0.3, 0.15])
obj_pos = (raw_obs["object_pos"] - workspace_center) / workspace_scale
# Object orientation: quaternion already in [-1, 1]
obj_quat = raw_obs["object_quat"]
# Goal position
goal_pos = (raw_obs["goal_pos"] - workspace_center) / workspace_scale
return np.concatenate([
joint_pos, joint_vel, [gripper],
obj_pos, obj_quat, goal_pos,
]).astype(np.float32)
class ActionSpaceDesign:
"""
Design action space.
Use delta position control — more robust than torque control
for sim-to-real transfer.
"""
def __init__(self):
# 7 joint delta positions + 1 gripper command
self.action_dim = 8
self.max_delta_pos = 0.05 # rad per step
self.max_gripper_speed = 0.04 # m per step
def get_action_space(self) -> gym.spaces.Box:
return gym.spaces.Box(
low=-1.0,
high=1.0,
shape=(self.action_dim,),
dtype=np.float32,
)
def denormalize_action(self, action: np.ndarray) -> dict:
"""Convert normalized action [-1,1] to physical commands."""
joint_delta = action[:7] * self.max_delta_pos
gripper_cmd = (action[7] + 1) / 2 * self.max_gripper_speed
return {
"joint_position_delta": joint_delta,
"gripper_width_target": gripper_cmd,
}
# Test
obs_design = ObservationSpaceDesign(history_length=3)
act_design = ActionSpaceDesign()
print(f"Observation dim: {obs_design.obs_dim}")
print(f"Action dim: {act_design.action_dim}")
print(f"Obs space: {obs_design.get_observation_space()}")
print(f"Act space: {act_design.get_action_space()}")
Step 3: Train Base Policy with PPO
Start training without domain randomization so the policy learns the base task first.
"""
Step 3: Train base policy with PPO.
Using RSL-RL (NVIDIA Isaac Lab's default RL library).
"""
from dataclasses import dataclass
@dataclass
class PPOConfig:
"""PPO hyperparameters for robotics tasks."""
# Training
num_envs: int = 4096
num_steps_per_update: int = 24
max_iterations: int = 3000
learning_rate: float = 3e-4
lr_schedule: str = "adaptive" # Reduce LR when KL divergence high
# PPO specific
clip_param: float = 0.2
entropy_coef: float = 0.01
value_loss_coef: float = 1.0
max_grad_norm: float = 1.0
gamma: float = 0.99
lam: float = 0.95 # GAE lambda
num_mini_batches: int = 4
num_epochs: int = 5
# Network architecture
actor_hidden_dims: list = None
critic_hidden_dims: list = None
activation: str = "elu"
def __post_init__(self):
if self.actor_hidden_dims is None:
self.actor_hidden_dims = [256, 256, 128]
if self.critic_hidden_dims is None:
self.critic_hidden_dims = [256, 256, 128]
@dataclass
class RewardConfig:
"""Reward shaping for pick-and-place."""
# Reaching reward: distance of end-effector to object
reaching_weight: float = 1.0
reaching_temperature: float = 0.1
# Grasping reward: object lifted
grasping_weight: float = 5.0
lift_threshold: float = 0.05 # meters
# Placing reward: object at goal
placing_weight: float = 10.0
# Penalties
action_penalty_weight: float = 0.01
velocity_penalty_weight: float = 0.001
def compute_reward(
ee_pos: "np.ndarray",
obj_pos: "np.ndarray",
goal_pos: "np.ndarray",
action: "np.ndarray",
config: RewardConfig,
) -> float:
"""
Compute shaped reward for pick-and-place.
"""
import numpy as np
# Reaching: exponential distance reward
reach_dist = np.linalg.norm(ee_pos - obj_pos)
reach_reward = np.exp(-reach_dist / config.reaching_temperature)
# Grasping: bonus when object lifted
lift_height = obj_pos[2] - 0.0 # height above table
grasp_reward = float(lift_height > config.lift_threshold)
# Placing: distance of object to goal
place_dist = np.linalg.norm(obj_pos - goal_pos)
place_reward = np.exp(-place_dist / config.reaching_temperature)
# Penalties
action_penalty = np.sum(action ** 2)
reward = (
config.reaching_weight * reach_reward
+ config.grasping_weight * grasp_reward
+ config.placing_weight * place_reward
- config.action_penalty_weight * action_penalty
)
return reward
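A quick standalone check of the shaping terms helps confirm the reward gradient points the right way (the config and reward function are re-stated locally so the snippet runs on its own; the sample poses are illustrative):

```python
import numpy as np
from dataclasses import dataclass

# Re-stated locally so this snippet is self-contained.
@dataclass
class RewardConfig:
    reaching_weight: float = 1.0
    reaching_temperature: float = 0.1
    grasping_weight: float = 5.0
    lift_threshold: float = 0.05
    placing_weight: float = 10.0
    action_penalty_weight: float = 0.01

def compute_reward(ee_pos, obj_pos, goal_pos, action, config):
    reach = np.exp(-np.linalg.norm(ee_pos - obj_pos) / config.reaching_temperature)
    grasp = float(obj_pos[2] > config.lift_threshold)  # table at z = 0
    place = np.exp(-np.linalg.norm(obj_pos - goal_pos) / config.reaching_temperature)
    penalty = config.action_penalty_weight * np.sum(action ** 2)
    return (config.reaching_weight * reach
            + config.grasping_weight * grasp
            + config.placing_weight * place
            - penalty)

cfg = RewardConfig()
goal = np.array([0.6, 0.1, 0.15])
action = np.zeros(8)

# Start of episode: arm away from object, object on the table
r_start = compute_reward(np.array([0.4, 0.0, 0.3]),
                         np.array([0.5, -0.1, 0.02]), goal, action, cfg)
# Near success: object grasped, lifted, and at the goal
r_near_goal = compute_reward(np.array([0.6, 0.1, 0.16]),
                             np.array([0.6, 0.1, 0.15]), goal, action, cfg)
print(f"start: {r_start:.3f}, near goal: {r_near_goal:.3f}")
assert r_near_goal > r_start  # reward must increase toward success
```

If this ordering ever fails for plausible states, the shaping weights need rebalancing before any training run.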
Training command:
# Train base policy — no DR, 4096 envs, ~30 min on RTX 4090
python train.py \
--task PickPlace \
--num_envs 4096 \
--max_iterations 3000 \
--headless \
--experiment base_policy
# Evaluate
python play.py \
--task PickPlace \
--checkpoint logs/base_policy/model_3000.pt \
--num_eval_episodes 100
Step 4: Add Domain Randomization
Once the base policy reaches a >90% success rate in simulation without DR, gradually introduce DR with a curriculum.
See Domain Randomization: Key to Sim-to-Real Transfer for detailed DR techniques. Here we focus on integrating DR into the training loop.
"""
Step 4: Curriculum-based Domain Randomization.
Gradually increase DR range with training progress.
"""
import numpy as np
from dataclasses import dataclass
@dataclass
class DRCurriculumConfig:
"""Curriculum for domain randomization."""
# When to start DR (iteration)
dr_start_iteration: int = 500
# Ramp-up period (iterations)
dr_ramp_iterations: int = 1500
# Final DR ranges
final_mass_scale: tuple = (0.5, 2.0)
final_friction: tuple = (0.3, 1.5)
final_actuator_scale: tuple = (0.8, 1.2)
final_action_delay: int = 3
final_obs_noise: float = 0.02
def get_dr_params(config: DRCurriculumConfig, iteration: int) -> dict:
"""Compute DR params according to curriculum."""
if iteration < config.dr_start_iteration:
return {
"mass_scale": (1.0, 1.0),
"friction": (0.8, 0.8),
"actuator_scale": (1.0, 1.0),
"action_delay": 0,
"obs_noise": 0.0,
}
# Compute progress 0->1
progress = min(
1.0,
(iteration - config.dr_start_iteration)
/ config.dr_ramp_iterations,
)
def lerp_range(nominal, final_range, p):
low = nominal - (nominal - final_range[0]) * p
high = nominal + (final_range[1] - nominal) * p
return (low, high)
return {
"mass_scale": lerp_range(1.0, config.final_mass_scale, progress),
"friction": lerp_range(0.8, config.final_friction, progress),
"actuator_scale": lerp_range(
1.0, config.final_actuator_scale, progress,
),
"action_delay": int(config.final_action_delay * progress),
"obs_noise": config.final_obs_noise * progress,
}
# Example: view DR schedule
config = DRCurriculumConfig()
for it in [0, 500, 1000, 1500, 2000]:
params = get_dr_params(config, it)
print(f"Iteration {it}: {params}")
Step 5: Evaluate in Diverse Sim Conditions
Before deploying, evaluate the policy on many different configurations to ensure robustness.
"""
Step 5: Systematic evaluation in simulation.
Test policy on grid of parameters.
"""
import numpy as np
from itertools import product
def systematic_evaluation(
policy,
env_factory,
num_episodes_per_config: int = 50,
) -> dict:
"""
Evaluate policy on grid of DR parameters.
Returns success rate per configuration.
"""
# Parameter grid
frictions = [0.3, 0.5, 0.8, 1.0, 1.5]
masses = [0.5, 0.75, 1.0, 1.5, 2.0]
delays = [0, 1, 2, 3]
results = {}
for friction, mass_scale, delay in product(frictions, masses, delays):
config_name = f"f{friction}_m{mass_scale}_d{delay}"
env = env_factory(
friction=friction,
mass_scale=mass_scale,
action_delay=delay,
)
successes = 0
for ep in range(num_episodes_per_config):
obs = env.reset()
done = False
while not done:
action = policy.predict(obs)
obs, reward, done, info = env.step(action)
if info.get("is_success", False):
successes += 1
success_rate = successes / num_episodes_per_config
results[config_name] = success_rate
env.close()
# Summary
rates = list(results.values())
summary = {
"mean_success_rate": np.mean(rates),
"min_success_rate": np.min(rates),
"max_success_rate": np.max(rates),
"std_success_rate": np.std(rates),
"worst_config": min(results, key=results.get),
"best_config": max(results, key=results.get),
"per_config": results,
}
print(f"Mean success: {summary['mean_success_rate']:.1%}")
print(f"Min success: {summary['min_success_rate']:.1%}")
print(f"Worst config: {summary['worst_config']}")
return summary
Target: Mean success rate > 85% across full DR range. If not achieved, go back to Steps 3-4.
Step 6: Deploy to Real Robot via ROS 2
This is the exciting step — running the policy on real hardware.
"""
Step 6: Deploy policy to real robot via ROS 2.
Node receives sensor data, runs inference, publishes commands.
"""
import rclpy
from rclpy.node import Node
import torch
import numpy as np
from sensor_msgs.msg import JointState
from geometry_msgs.msg import PoseStamped
from std_msgs.msg import Float64MultiArray
class SimToRealDeployNode(Node):
"""ROS 2 node for deploying trained policy."""
def __init__(self):
super().__init__("sim2real_deploy")
# Load policy
model_path = self.declare_parameter(
"model_path", "models/policy.pt"
).value
self.policy = torch.jit.load(model_path)
self.policy.eval()
self.get_logger().info(f"Loaded policy from {model_path}")
# Observation buffer (history_length=3)
self.obs_history = []
self.history_length = 3
# Current sensor readings
self.joint_pos = np.zeros(7)
self.joint_vel = np.zeros(7)
self.gripper_width = 0.0
self.object_pos = np.zeros(3)
self.object_quat = np.array([1.0, 0.0, 0.0, 0.0])
self.goal_pos = np.array([0.5, 0.0, 0.15])
# Normalization params (must match training!)
self.joint_pos_scale = np.pi
self.joint_vel_scale = 2.0
self.workspace_center = np.array([0.5, 0.0, 0.15])
self.workspace_scale = np.array([0.2, 0.3, 0.15])
# Safety limits
self.max_delta = 0.05 # rad per step
self.joint_limits_low = np.array(
[-2.89, -1.76, -2.89, -3.07, -2.89, -0.02, -2.89]
)
self.joint_limits_high = np.array(
[2.89, 1.76, 2.89, -0.07, 2.89, 3.75, 2.89]
)
# Subscribers
self.create_subscription(
JointState, "/joint_states", self.joint_cb, 10
)
self.create_subscription(
PoseStamped, "/object_pose", self.object_cb, 10
)
# Publisher
self.cmd_pub = self.create_publisher(
Float64MultiArray, "/joint_commands", 10
)
# Control loop at 20Hz
self.create_timer(0.05, self.control_loop)
self.get_logger().info("Deploy node ready, running at 20Hz")
def joint_cb(self, msg: JointState):
self.joint_pos = np.array(msg.position[:7])
self.joint_vel = np.array(msg.velocity[:7])
if len(msg.position) > 7:
self.gripper_width = msg.position[7]
def object_cb(self, msg: PoseStamped):
p = msg.pose.position
q = msg.pose.orientation
self.object_pos = np.array([p.x, p.y, p.z])
self.object_quat = np.array([q.w, q.x, q.y, q.z])
def build_observation(self) -> np.ndarray:
"""Build normalized observation matching training format."""
# Normalize
jp = self.joint_pos / self.joint_pos_scale
jv = np.clip(self.joint_vel / self.joint_vel_scale, -1, 1)
gw = self.gripper_width / 0.04 * 2 - 1
op = (self.object_pos - self.workspace_center) / self.workspace_scale
oq = self.object_quat
gp = (self.goal_pos - self.workspace_center) / self.workspace_scale
frame = np.concatenate([jp, jv, [gw], op, oq, gp]).astype(np.float32)
# Update history
self.obs_history.append(frame)
if len(self.obs_history) > self.history_length:
self.obs_history.pop(0)
# Pad if not enough history
while len(self.obs_history) < self.history_length:
self.obs_history.insert(0, frame.copy())
return np.concatenate(self.obs_history)
def control_loop(self):
"""Main control loop — inference + safety check + publish."""
obs = self.build_observation()
# Inference
with torch.no_grad():
obs_tensor = torch.FloatTensor(obs).unsqueeze(0)
action = self.policy(obs_tensor).squeeze().numpy()
# Denormalize action
joint_delta = action[:7] * self.max_delta
# Safety: clamp to joint limits
target_pos = self.joint_pos + joint_delta
target_pos = np.clip(
target_pos, self.joint_limits_low, self.joint_limits_high
)
# Safety: smooth (low-pass filter)
alpha = 0.3 # Smoothing factor
if hasattr(self, "_prev_target"):
target_pos = alpha * target_pos + (1 - alpha) * self._prev_target
self._prev_target = target_pos.copy()
# Publish
cmd = Float64MultiArray()
cmd.data = target_pos.tolist()
self.cmd_pub.publish(cmd)
def main(args=None):
rclpy.init(args=args)
node = SimToRealDeployNode()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()
Step 7: Collect Real Data and Fine-tune
After deployment, collect data from the real robot to fine-tune the policy:
"""
Step 7: Collect real-world data and fine-tune.
Record trajectories from real robot, use to improve policy.
"""
import numpy as np
from pathlib import Path
from datetime import datetime
class RealWorldDataCollector:
"""Collect and save trajectories from real robot."""
def __init__(self, save_dir: str = "real_data"):
self.save_dir = Path(save_dir)
self.save_dir.mkdir(parents=True, exist_ok=True)
self.current_episode = []
def record_step(
self,
observation: np.ndarray,
action: np.ndarray,
reward: float,
is_success: bool,
):
"""Record each step."""
self.current_episode.append({
"obs": observation.copy(),
"action": action.copy(),
"reward": reward,
"success": is_success,
})
def end_episode(self, metadata: dict = None):
"""End episode, save file."""
if not self.current_episode:
return
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
episode_data = {
"steps": self.current_episode,
"num_steps": len(self.current_episode),
"total_reward": sum(s["reward"] for s in self.current_episode),
"success": self.current_episode[-1]["success"],
"metadata": metadata or {},
}
filepath = self.save_dir / f"episode_{timestamp}.npz"
np.savez_compressed(filepath, **episode_data)
print(
f"Saved episode: {len(self.current_episode)} steps, "
f"success={episode_data['success']}"
)
self.current_episode = []
return filepath
def fine_tune_with_real_data(
policy,
real_data_dir: str,
sim_replay_env,
num_fine_tune_iterations: int = 500,
):
"""
Fine-tune policy using real data.
Strategy: replay real trajectories in sim to identify gap,
then adjust DR parameters.
"""
real_episodes = list(Path(real_data_dir).glob("episode_*.npz"))
print(f"Loaded {len(real_episodes)} real episodes")
# Analyze failure modes
failures = []
successes = []
for ep_path in real_episodes:
data = np.load(ep_path, allow_pickle=True)
if data["success"]:
successes.append(data)
else:
failures.append(data)
success_rate = len(successes) / max(len(real_episodes), 1)
print(f"Real-world success rate: {success_rate:.1%}")
print(f"Failures: {len(failures)}, Successes: {len(successes)}")
# If success rate < 80%, need adjustment
if success_rate < 0.8:
print("Low success rate — need to adjust DR or retrain")
# Analyze failure patterns...
return success_rate
Step 8: Iterate
Sim-to-real is an iterative process. Each loop:
- Analyze failures from real deployment
- Adjust simulation — add DR for failure modes, or improve physics model
- Retrain policy with updated sim
- Re-deploy and collect data again
| Loop | Focus typically on |
|---|---|
| 1st | Observation mismatch, action scaling |
| 2nd | Dynamics gap (friction, mass) |
| 3rd | Sensor noise, communication delay |
| 4th+ | Edge cases, long-horizon robustness |
Common Pitfalls
1. Observation Space Mismatch
Problem: Observations in sim and real have different format, scale, or coordinate frame.
# WRONG: sim uses the world frame, real uses the robot base frame
#   Sim:  object_pos = [1.5, 0.3, 0.8]  (world frame)
#   Real: object_pos = [0.5, 0.3, 0.1]  (robot base frame)

# CORRECT: always use the robot base frame in both sim and real
object_pos_base_frame = transform_to_base_frame(
    object_pos_world, robot_base_pose
)
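One possible implementation of a `transform_to_base_frame` helper is sketched below, assuming the base pose is given as a position vector plus a 3x3 rotation matrix (conventions vary; some stacks pass a single 4x4 homogeneous transform instead):

```python
import numpy as np

# Sketch: express a world-frame point in the robot base frame.
# Assumes the base pose in the world frame is (position, rotation matrix).
def transform_to_base_frame(
    point_world: np.ndarray,
    base_pos_world: np.ndarray,
    base_rot_world: np.ndarray,
) -> np.ndarray:
    """p_base = R^T @ (p_world - t), the inverse of the base pose."""
    return base_rot_world.T @ (point_world - base_pos_world)

# Example matching the numbers above: base at (1.0, 0.0, 0.7), no rotation.
obj_world = np.array([1.5, 0.3, 0.8])
obj_base = transform_to_base_frame(
    obj_world, np.array([1.0, 0.0, 0.7]), np.eye(3)
)
print(obj_base)  # [0.5 0.3 0.1]
```

The key point is applying the *inverse* of the base pose; applying the forward transform is a common sign error that shifts every observation by twice the base offset.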
2. Action Delay
Problem: Sim typically has no delay, but real robots see 1-5 frames of delay from communication latency and motor response.
Solution: Add random action delay in sim training (covered in Step 4).
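A minimal way to simulate the delay is a FIFO buffer: the environment queues the newest action and executes the oldest one. The class below is an illustrative sketch (names are hypothetical); real integration would live inside the env's step function, with `delay_steps` resampled per episode by the DR curriculum.

```python
from collections import deque
import numpy as np

# Sketch of an action-delay buffer for sim training. The environment
# executes the action pushed delay_steps frames ago.
class DelayedActionBuffer:
    def __init__(self, delay_steps: int, action_dim: int):
        # Pre-fill with zero actions so the first delay_steps frames are idle
        self.buffer = deque([np.zeros(action_dim)] * delay_steps)

    def push(self, action: np.ndarray) -> np.ndarray:
        """Queue the newest action, return the delayed one to execute."""
        self.buffer.append(action)
        return self.buffer.popleft()

# With a 2-step delay, the action sent at step t executes at step t + 2
buf = DelayedActionBuffer(delay_steps=2, action_dim=8)
executed = [buf.push(np.full(8, t))[0] for t in range(1, 5)]
print(executed)  # [0.0, 0.0, 1.0, 2.0]
```

Training against a randomized `delay_steps` (0-3 in the Step 4 curriculum) forces the policy to tolerate whatever latency the real control stack exhibits.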
3. Sensor Calibration
Problem: Camera intrinsics/extrinsics, joint encoder offsets differ between sim and real.
Solution: Calibrate sensors on real robot before deploying. Use camera calibration for vision, joint homing for encoders.
Tools Comparison: Isaac Lab vs MuJoCo Playground
| Feature | Isaac Lab | MuJoCo Playground |
|---|---|---|
| GPU parallel envs | 10,000+ (PhysX 5) | 1,000+ (MJX/JAX) |
| Rendering | RTX ray-tracing | Basic OpenGL |
| DR support | Built-in YAML config | Manual Python code |
| ROS 2 bridge | Isaac ROS | ros2_mujoco (community) |
| Learning curve | Steep (Omniverse stack) | Medium (standalone) |
| Best for | Visual sim-to-real, large-scale RL | Contact-rich manipulation, research |
| Hardware req | NVIDIA GPU (RTX 3070+) | CPU or GPU (flexible) |
| Community | NVIDIA forums | Google DeepMind + open source |
Recommendation: Use Isaac Lab if you have an NVIDIA GPU and need visual sim-to-real. Use MuJoCo if you are focused on manipulation research or need more flexibility.
Deployment Checklist
Before running the policy on real hardware, verify all items:
- Observation space matches between sim and real (units, frames, ordering)
- Action space matches (delta position? absolute? torque?)
- Normalization params consistent
- Safety limits configured (joint limits, velocity limits, force limits)
- Emergency stop tested
- Low-pass filter on actions
- Sim success rate > 85% across full DR range
- Communication latency measured and accounted for
- Sensor calibration done
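The "normalization params consistent" item is easy to automate, assuming both the training config and the deploy node can dump their constants to plain dicts (the keys below are illustrative):

```python
import numpy as np

# Compare normalization constants between training and deployment.
# Returns the list of mismatched or missing keys; empty means consistent.
def check_norm_params(sim_params: dict, real_params: dict, atol=1e-6) -> list:
    mismatches = []
    for key in sorted(set(sim_params) | set(real_params)):
        if key not in sim_params or key not in real_params:
            mismatches.append(key)  # present on one side only
        elif not np.allclose(sim_params[key], real_params[key], atol=atol):
            mismatches.append(key)  # values drifted apart
    return mismatches

sim = {"joint_pos_scale": np.pi, "joint_vel_scale": 2.0,
       "workspace_center": [0.5, 0.0, 0.15]}
real = {"joint_pos_scale": np.pi, "joint_vel_scale": 2.5,  # drifted!
        "workspace_center": [0.5, 0.0, 0.15]}
print(check_norm_params(sim, real))  # ['joint_vel_scale']
```

Running a check like this at deploy-node startup, and refusing to command the robot on any mismatch, turns a silent sim-to-real bug into a loud configuration error.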
Conclusion
The sim-to-real pipeline is not one-shot; it is an iterative process requiring careful engineering at each step. Observation/action space design often matters more than the choice of algorithm. Domain randomization is necessary but not sufficient: combine it with careful system identification and real-world iteration.
Related Articles
- Domain Randomization: Key to Sim-to-Real Transfer — Deep dive into domain randomization technique
- Sim-to-Real Transfer: Train simulation, run reality — Sim-to-real overview
- Digital Twins and ROS 2: Simulation in Manufacturing — Applications in industry
- RL for Bipedal Walking: From MuJoCo to Reality — Real-world case study of sim-to-real pipeline