A walking policy that only works on flat ground is not enough for the real world. Robots need to walk over uneven paving, climb stairs, descend slopes, and get past obstacles. In this post, we extend the policy from the G1 basic training post to complex terrain using three key techniques: terrain curriculum, domain randomization, and teacher-student training.
Terrain Curriculum: From Flat to Complex
Why do we need a curriculum?
If training starts directly on hard terrain (stairs, rough rocks), the policy never learns: the robot falls immediately and produces no useful gradient. Curriculum learning solves this by increasing the difficulty gradually.
Designing the terrain levels
"""
Terrain curriculum configuration cho G1.
File: g1_terrain_cfg.py
"""
from omni.isaac.lab.terrains import TerrainImporterCfg, TerrainGeneratorCfg
from omni.isaac.lab.terrains.height_field import HfTerrainBaseCfg
class G1TerrainCurriculumCfg:
"""4 cấp độ terrain, tăng dần."""
terrain = TerrainImporterCfg(
prim_path="/World/ground",
terrain_type="generator",
terrain_generator=TerrainGeneratorCfg(
size=(8.0, 8.0),
border_width=20.0,
num_rows=10, # 10 hàng terrain
num_cols=20, # 20 cột (mỗi cột = 1 terrain instance)
horizontal_scale=0.1,
vertical_scale=0.005,
slope_threshold=0.75,
sub_terrains={
# Level 0: Flat (25%)
"flat": HfTerrainBaseCfg(
function="flat_terrain",
proportion=0.25,
),
# Level 1: Rough ground (25%)
"rough": HfTerrainBaseCfg(
function="random_uniform_terrain",
proportion=0.25,
noise_range=(0.01, 0.06), # 1-6cm bumps
noise_step=0.005,
),
# Level 2: Slopes (25%)
"slopes": HfTerrainBaseCfg(
function="pyramid_sloped_terrain",
proportion=0.25,
slope_range=(0.0, 0.3), # 0-17 degrees
),
# Level 3: Stairs (25%)
"stairs": HfTerrainBaseCfg(
function="pyramid_stairs_terrain",
proportion=0.25,
step_height_range=(0.05, 0.18), # 5-18cm steps
step_width=0.31,
),
},
),
)
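Isaac Lab's TerrainGeneratorCfg also exposes a built-in curriculum flag: when enabled, the generator scales each sub-terrain's difficulty parameters with the row index, so rows double as difficulty levels. This pairs naturally with the level manager below. A minimal sketch:

from omni.isaac.lab.terrains import TerrainGeneratorCfg
from omni.isaac.lab.terrains.height_field import HfRandomUniformTerrainCfg

# With curriculum=True, row 0 samples the easy end of each range
# (here ~1 cm bumps) and the last row samples the hard end (~6 cm).
CURRICULUM_ROUGH_CFG = TerrainGeneratorCfg(
    size=(8.0, 8.0),
    num_rows=10,  # rows encode difficulty
    num_cols=20,  # columns hold terrain variations
    curriculum=True,
    sub_terrains={
        "rough": HfRandomUniformTerrainCfg(
            proportion=1.0,
            noise_range=(0.01, 0.06),
            noise_step=0.005,
        ),
    },
)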
Curriculum progression logic
import torch


class TerrainCurriculumManager:
    """
    Manages the terrain curriculum based on performance.
    Strong robots → harder terrain. Struggling robots → easier terrain.
    """
def __init__(self, num_envs, num_levels=4, num_terrains_per_level=5):
self.num_envs = num_envs
self.num_levels = num_levels
self.num_terrains_per_level = num_terrains_per_level
        # Each env starts at level 0
self.terrain_levels = torch.zeros(num_envs, dtype=torch.long)
self.terrain_types = torch.randint(
0, num_terrains_per_level, (num_envs,)
)
# Performance tracking
self.success_threshold = 0.8 # 80% success → level up
self.failure_threshold = 0.3 # <30% success → level down
self.episode_successes = torch.zeros(num_envs)
self.episode_counts = torch.zeros(num_envs)
    def update(self, env_ids, episode_length, max_episode_length):
        """
        Update terrain levels after each episode reset.
        episode_length: how long each robot survived (steps)
        max_episode_length: maximum episode length (steps)
        """
        # Success = surviving more than 80% of the episode
        success = (episode_length[env_ids] / max_episode_length) > 0.8
self.episode_successes[env_ids] += success.float()
self.episode_counts[env_ids] += 1
        # Check for upgrade/downgrade every 50 episodes
ready = self.episode_counts[env_ids] >= 50
if ready.any():
ready_ids = env_ids[ready]
success_rate = (
self.episode_successes[ready_ids]
/ self.episode_counts[ready_ids]
)
# Level up
upgrade = success_rate > self.success_threshold
self.terrain_levels[ready_ids[upgrade]] = torch.clamp(
self.terrain_levels[ready_ids[upgrade]] + 1,
max=self.num_levels - 1,
)
# Level down
downgrade = success_rate < self.failure_threshold
self.terrain_levels[ready_ids[downgrade]] = torch.clamp(
self.terrain_levels[ready_ids[downgrade]] - 1,
min=0,
)
# Reset counters
self.episode_successes[ready_ids] = 0
self.episode_counts[ready_ids] = 0
# Log
avg_level = self.terrain_levels.float().mean().item()
print(f"Terrain curriculum: avg level = {avg_level:.2f}")
return self.terrain_levels[env_ids], self.terrain_types[env_ids]
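A sketch of how the manager could be wired into a vectorized env's reset path. The env attributes used here (episode_length_buf, max_episode_length, terrain_origins, root_pos) are illustrative placeholders, not a specific Isaac Lab API:

import torch

curriculum = TerrainCurriculumManager(num_envs=4096)

def reset_idx(env, env_ids: torch.Tensor):
    """Reset the given envs and respawn them on their assigned terrains."""
    levels, types = curriculum.update(
        env_ids,
        episode_length=env.episode_length_buf,
        max_episode_length=env.max_episode_length,
    )
    # terrain_origins: assumed (num_levels, num_types, 3) tensor of
    # per-terrain spawn positions provided by the terrain importer.
    env.root_pos[env_ids, :2] = env.terrain_origins[levels, types, :2]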
Domain Randomization: Making the Policy Robust
Domain randomization randomizes the physical parameters of the simulation so the policy learns to handle uncertainty. This is critical for sim-to-real transfer.
Parameters to randomize
class G1DomainRandomizationCfg:
"""Domain randomization cho G1 terrain walking."""
# === Physics randomization ===
friction = {
"range": [0.3, 2.0], # Friction coefficient
"operation": "scale", # Nhân với giá trị gốc
"distribution": "uniform",
}
restitution = {
"range": [0.0, 0.5],
"operation": "abs",
"distribution": "uniform",
}
# === Mass randomization ===
added_mass = {
"range": [-2.0, 5.0], # kg thêm/bớt vào torso
"body_names": ["torso"],
"operation": "add",
}
# === Center of mass shift ===
com_displacement = {
"range": [-0.05, 0.05], # ±5cm
"body_names": ["torso"],
}
# === Motor strength ===
motor_strength = {
"range": [0.8, 1.2], # ±20% motor strength
"operation": "scale",
}
# === External perturbations ===
push_robots = {
"toggle": True,
"interval_s": 8.0, # Push mỗi 8 giây
"max_vel": 0.5, # Max push velocity (m/s)
"max_ang_vel": 0.3, # Max angular push (rad/s)
}
# === Sensor noise ===
joint_pos_noise = {
"range": [-0.01, 0.01], # ±0.01 rad
"operation": "add",
}
joint_vel_noise = {
"range": [-0.1, 0.1], # ±0.1 rad/s
"operation": "add",
}
imu_noise = {
"ang_vel_noise": 0.05, # rad/s
"gravity_noise": 0.02, # normalized
}
# === Latency simulation ===
action_delay = {
"range": [0, 2], # 0-2 timestep delay
"distribution": "uniform_int",
}
Implementation in Isaac Lab
import torch
class DomainRandomizer:
"""Apply domain randomization tại mỗi episode reset."""
def __init__(self, cfg, num_envs, device="cuda"):
self.cfg = cfg
self.num_envs = num_envs
self.device = device
# Pre-allocate buffers
self.friction_coeffs = torch.ones(num_envs, device=device)
self.mass_offsets = torch.zeros(num_envs, device=device)
self.motor_strengths = torch.ones(num_envs, device=device)
self.action_delays = torch.zeros(num_envs, dtype=torch.long, device=device)
# Action delay buffer (circular)
self.max_delay = 3
self.action_buffer = torch.zeros(
num_envs, self.max_delay, 23, # 23 DOF for G1
device=device
)
def randomize(self, env_ids):
"""Randomize parameters cho các envs được reset."""
n = len(env_ids)
# Friction
self.friction_coeffs[env_ids] = torch.empty(n, device=self.device).uniform_(
self.cfg.friction["range"][0],
self.cfg.friction["range"][1],
)
# Mass
self.mass_offsets[env_ids] = torch.empty(n, device=self.device).uniform_(
self.cfg.added_mass["range"][0],
self.cfg.added_mass["range"][1],
)
# Motor strength
self.motor_strengths[env_ids] = torch.empty(n, device=self.device).uniform_(
self.cfg.motor_strength["range"][0],
self.cfg.motor_strength["range"][1],
)
# Action delay
self.action_delays[env_ids] = torch.randint(
self.cfg.action_delay["range"][0],
self.cfg.action_delay["range"][1] + 1,
(n,), device=self.device,
)
def apply_action_delay(self, actions, step):
"""Áp dụng action delay bằng circular buffer."""
# Store current action
idx = step % self.max_delay
self.action_buffer[:, idx, :] = actions
# Retrieve delayed action
delayed_idx = (step - self.action_delays) % self.max_delay
delayed_actions = self.action_buffer[
torch.arange(self.num_envs, device=self.device),
delayed_idx,
]
return delayed_actions
def apply_push(self, base_velocities, step, dt):
"""Áp dụng random push perturbation."""
push_interval = int(self.cfg.push_robots["interval_s"] / dt)
if step % push_interval == 0:
push_vel = torch.empty_like(base_velocities[:, :3]).uniform_(
-self.cfg.push_robots["max_vel"],
self.cfg.push_robots["max_vel"],
)
base_velocities[:, :3] += push_vel
return base_velocities
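One gap in the randomizer above: the sensor-noise entries in the config are never applied. A minimal sketch of a per-step method that could be added to DomainRandomizer, assuming observations are laid out as [joint_pos (23) | joint_vel (23) | rest] (the layout is an assumption):

    def apply_sensor_noise(self, obs):
        """Add uniform noise to the joint readings in the observation.

        Assumes obs layout [joint_pos (23) | joint_vel (23) | rest].
        """
        noisy = obs.clone()
        lo, hi = self.cfg.joint_pos_noise["range"]
        noisy[:, 0:23] += torch.empty_like(noisy[:, 0:23]).uniform_(lo, hi)
        lo, hi = self.cfg.joint_vel_noise["range"]
        noisy[:, 23:46] += torch.empty_like(noisy[:, 23:46]).uniform_(lo, hi)
        return noisy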
Teacher-Student Training
Why teacher-student?
On complex terrain, the robot needs to know the terrain ahead (a height map) to place its feet correctly. On the real robot, however, this information is not readily available (it requires a depth camera plus processing). The solution: a teacher-student framework.
| | Teacher | Student |
|---|---|---|
| Observations | Proprioception + terrain height map | Proprioception only |
| Training | Trained first with privileged info | Distilled from the teacher |
| Deployment | Not deployed | Deployed on the real robot |
import torch


class TeacherStudentTrainer:
    """
    Two-phase training: teacher (privileged) → student (deployable).
    """
def __init__(self, env, teacher_cfg, student_cfg):
self.env = env
self.teacher = self._build_network(teacher_cfg)
self.student = self._build_network(student_cfg)
def _build_network(self, cfg):
"""Build actor-critic network."""
import torch.nn as nn
class PolicyNetwork(nn.Module):
def __init__(self, obs_dim, act_dim, hidden_dims):
super().__init__()
layers = []
prev_dim = obs_dim
for h_dim in hidden_dims:
layers.extend([
nn.Linear(prev_dim, h_dim),
nn.ELU(),
])
prev_dim = h_dim
layers.append(nn.Linear(prev_dim, act_dim))
self.network = nn.Sequential(*layers)
def forward(self, obs):
return self.network(obs)
return PolicyNetwork(
cfg["obs_dim"], cfg["act_dim"], cfg["hidden_dims"]
)
    # Phase 1: train the teacher with privileged observations
    def train_teacher(self, num_iterations=5000):
        """
        Teacher input: proprioception (78) + height map (187) = 265 dims.
        Height map: 11x17 grid, 10 cm resolution, centered on the robot.
        """
print("=== Phase 1: Training Teacher ===")
for iteration in range(num_iterations):
obs = self.env.get_observations()
privileged_obs = self.env.get_privileged_observations()
# Teacher observation = [proprioception, height_samples]
teacher_obs = torch.cat([obs, privileged_obs], dim=1)
actions = self.teacher(teacher_obs)
# PPO update...
# (standard PPO training loop)
if iteration % 100 == 0:
print(f"Teacher iter {iteration}: "
f"reward={self.env.episode_reward_mean:.2f}")
    # Phase 2: distill teacher → student
    def distill_to_student(self, num_iterations=2000):
        """
        The student receives proprioception only (78 dims).
        Objective: minimize ||student_action - teacher_action||^2
        """
print("=== Phase 2: Distilling to Student ===")
optimizer = torch.optim.Adam(self.student.parameters(), lr=3e-4)
for iteration in range(num_iterations):
obs = self.env.get_observations()
privileged_obs = self.env.get_privileged_observations()
# Teacher action (frozen, no grad)
teacher_obs = torch.cat([obs, privileged_obs], dim=1)
with torch.no_grad():
teacher_actions = self.teacher(teacher_obs)
            # The student tries to match the teacher
            student_actions = self.student(obs)
            loss = torch.nn.functional.mse_loss(
                student_actions, teacher_actions
            )
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            # Step the simulation with the student's actions so it is
            # supervised on its own state distribution (DAgger-style)
            self.env.step(student_actions.detach())
            if iteration % 100 == 0:
                print(f"Student iter {iteration}: "
                      f"distill_loss={loss.item():.4f}")
print("Distillation complete!")
Height map sampling
import torch


def sample_height_map(terrain, robot_pos, robot_yaw):
    """
    Sample terrain heights around the robot.
    Returns: (num_envs, num_points) height samples
    """
    # Grid pattern: 11 points longitudinally, 17 points laterally
    # Resolution: 0.1 m
    x_range = torch.arange(-0.3, 0.8, 0.1)  # 0.3 m behind → 0.7 m ahead
    y_range = torch.arange(-0.8, 0.9, 0.1)  # ±0.8 m lateral
# Create grid
xx, yy = torch.meshgrid(x_range, y_range, indexing='ij')
points = torch.stack([xx.flatten(), yy.flatten()], dim=1)
# → (187, 2) grid points
# Rotate by robot yaw
cos_yaw = torch.cos(robot_yaw).unsqueeze(1)
sin_yaw = torch.sin(robot_yaw).unsqueeze(1)
rotated_x = points[:, 0] * cos_yaw - points[:, 1] * sin_yaw
rotated_y = points[:, 0] * sin_yaw + points[:, 1] * cos_yaw
# Translate to robot position
sample_x = robot_pos[:, 0:1] + rotated_x
sample_y = robot_pos[:, 1:2] + rotated_y
# Query terrain height
heights = terrain.sample_heights(sample_x, sample_y)
# Return relative to robot base height
return heights - robot_pos[:, 2:3]
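To build the 265-dim teacher observation from these samples, the heights are typically clipped and concatenated onto the proprioceptive vector. A small sketch (the ±1 m clipping band is an assumption, not a fixed convention):

def build_teacher_obs(proprio, terrain, robot_pos, robot_yaw):
    """Concatenate proprioception (78 dims) with the 187 height samples."""
    heights = sample_height_map(terrain, robot_pos, robot_yaw)
    # Clip so outliers (e.g. near terrain borders) don't dominate.
    heights = torch.clamp(heights, min=-1.0, max=1.0)
    return torch.cat([proprio, heights], dim=1)  # (num_envs, 265)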
Evaluation Across Terrain Types
def evaluate_terrain_robustness(env, policy, num_episodes=100):
    """Evaluate the policy on each terrain type separately."""
terrain_types = ["flat", "rough_easy", "rough_hard", "slopes_10deg",
"slopes_20deg", "stairs_10cm", "stairs_15cm"]
results = {}
for terrain in terrain_types:
env.set_terrain(terrain)
successes = 0
total_distance = 0
for ep in range(num_episodes):
obs = env.reset()
ep_distance = 0
done = False
while not done:
action = policy(obs)
obs, reward, done, info = env.step(action)
ep_distance += info["forward_distance"]
if info["episode_length"] >= 0.9 * env.max_episode_length:
successes += 1
total_distance += ep_distance
results[terrain] = {
"success_rate": successes / num_episodes,
"avg_distance": total_distance / num_episodes,
}
# Print table
print(f"{'Terrain':<20} {'Success%':>10} {'Avg Dist (m)':>12}")
print("-" * 44)
for terrain, r in results.items():
print(f"{terrain:<20} {r['success_rate']*100:>9.1f}% {r['avg_distance']:>11.2f}")
return results
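For example, to run this evaluation on the distilled student (assuming the trainer from the previous section):

policy = trainer.student
policy.eval()
with torch.no_grad():
    results = evaluate_terrain_robustness(env, policy, num_episodes=100)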
Typical results after training:
| Terrain | Success Rate | Avg Distance |
|---|---|---|
| Flat | 98% | 18.5 m |
| Rough (easy) | 95% | 17.2 m |
| Rough (hard) | 82% | 14.8 m |
| Slopes 10° | 90% | 16.1 m |
| Slopes 20° | 72% | 12.3 m |
| Stairs 10cm | 78% | 13.5 m |
| Stairs 15cm | 55% | 9.8 m |
For a more detailed treatment of domain randomization, see the post Domain Randomization: Strategies for Sim-to-Real. For curriculum learning in locomotion, see Locomotion Basics.
Summary
In this post, we extended the G1 walking policy with:
- Terrain curriculum: four levels (flat → rough → slopes → stairs) with automatic difficulty adjustment
- Domain randomization: friction, mass, motor strength, push perturbations, sensor noise, action delay
- Teacher-student: the teacher trains with a privileged height map; the student is distilled down to proprioception only
- Evaluation: across 7 terrain types, with success rates of 55-98%
The next post, Unitree H1: Full-size Humanoid Locomotion, moves up to a larger robot: the 1.8 m tall H1 and the challenges that come with its size.
References
- Learning to Walk in Minutes Using Massively Parallel Deep Reinforcement Learning — Rudin et al., CoRL 2021
- Extreme Parkour with Legged Robots — Cheng et al., ICRA 2024
- Rapid Locomotion via Reinforcement Learning — Margolis et al., RSS 2022
- Humanoid-Gym: Zero-Shot Sim-to-Real Transfer — Gu et al., 2024