Hãy tưởng tượng bạn yêu cầu robot pha một cốc cà phê: mở tủ → lấy cốc → đặt lên bàn → mở máy pha → lấy viên cà phê → bỏ vào máy → nhấn nút → đợi → lấy cốc ra → đưa cho bạn. Đó là 10+ bước tuần tự, mỗi bước phải thành công thì bước sau mới có ý nghĩa. Đây là bài toán long-horizon manipulation — và nó là thách thức lớn nhất của RL trong robotics.
Đây là bài cuối cùng trong series RL for Manipulation. Sau khi đã đi qua fundamentals, grasping, force control, pick-and-place, carrying, contact-rich assembly, và tool use, chúng ta kết hợp tất cả để giải quyết bài toán tổng hợp nhất.
Tại sao Long-Horizon khó?
| Thách thức | Ảnh hưởng |
|---|---|
| Credit assignment | Reward ở bước 10, nhưng lỗi ở bước 3 — policy không biết sửa gì |
| Exploration | Xác suất hoàn thành 10 bước random = gần 0 |
| Compounding error | Sai 5% mỗi bước → 60% fail sau 10 bước |
| State space | State tăng exponential (vị trí tất cả objects) |
| Reward sparsity | Chỉ +1 khi hoàn thành toàn bộ chuỗi |
Curriculum Learning
Manual Curriculum
Cách đơn giản nhất: thiết kế curriculum thủ công, bắt đầu từ bước cuối và dần thêm bước:
class ManualCurriculum:
"""Backward curriculum — bắt đầu từ bước cuối."""
def __init__(self, subtasks):
"""
subtasks: danh sách subtasks theo thứ tự
Ví dụ: ['open_cabinet', 'grasp_cup', 'place_cup',
'pour_water', 'serve']
"""
self.subtasks = subtasks
self.n_tasks = len(subtasks)
self.current_start = self.n_tasks - 1 # Bắt đầu từ task cuối
self.success_history = []
self.success_threshold = 0.8
self.window = 50
def get_start_state(self):
"""Trả về state bắt đầu cho curriculum hiện tại."""
# Tua nhanh đến bước current_start
# Bỏ qua các bước trước đó bằng scripted policy
return self.current_start
def update(self, success):
self.success_history.append(success)
if len(self.success_history) > self.window:
self.success_history.pop(0)
if len(self.success_history) >= self.window:
rate = sum(self.success_history) / len(self.success_history)
if rate >= self.success_threshold and self.current_start > 0:
self.current_start -= 1
self.success_history.clear()
print(f"Curriculum: now starting from step "
f"{self.current_start}: "
f"{self.subtasks[self.current_start]}")
def is_complete(self):
"""Toàn bộ chuỗi đã master chưa?"""
return (self.current_start == 0 and
sum(self.success_history) / max(len(self.success_history), 1)
>= self.success_threshold)
Automatic Curriculum: ALP-GMM
Thay vì thiết kế curriculum thủ công, ALP-GMM (Absolute Learning Progress - Gaussian Mixture Model) tự động chọn task difficulty dựa trên learning progress:
from sklearn.mixture import GaussianMixture
import numpy as np
class ALPGMM:
"""Automatic curriculum via Absolute Learning Progress."""
def __init__(self, task_param_bounds, n_components=10):
"""
task_param_bounds: [(min, max)] cho mỗi task parameter
Ví dụ: [(0, 1)] cho difficulty,
[(1, 10)] cho number of subtasks
"""
self.bounds = task_param_bounds
self.n_dims = len(task_param_bounds)
self.n_components = n_components
# History: (task_params, competence_before, competence_after)
self.history = []
self.gmm = None
self.min_samples = 50
def sample_task(self):
"""Chọn task tiếp theo dựa trên learning progress."""
if len(self.history) < self.min_samples:
# Random sampling ban đầu
params = []
for low, high in self.bounds:
params.append(np.random.uniform(low, high))
return params
# Fit GMM trên (task_params, learning_progress)
X = np.array([h['params'] for h in self.history])
LP = np.array([h['lp'] for h in self.history])
# Absolute learning progress
abs_lp = np.abs(LP)
# Fit GMM
XLP = np.column_stack([X, abs_lp])
self.gmm = GaussianMixture(
n_components=min(self.n_components, len(X) // 5),
covariance_type='full'
)
self.gmm.fit(XLP)
# Sample from regions with high learning progress
# Weight components by their average LP
component_lps = []
for k in range(self.gmm.n_components):
mask = self.gmm.predict(XLP) == k
if mask.sum() > 0:
component_lps.append(abs_lp[mask].mean())
else:
component_lps.append(0)
# Sample from highest LP component
best_k = np.argmax(component_lps)
sample = self.gmm._estimate_weighted_log_prob(
XLP[self.gmm.predict(XLP) == best_k]
)
# Simplified: sample near high-LP regions
high_lp_idx = np.argsort(abs_lp)[-10:]
center = X[np.random.choice(high_lp_idx)]
noise = np.random.randn(self.n_dims) * 0.1
params = np.clip(center + noise,
[b[0] for b in self.bounds],
[b[1] for b in self.bounds])
return params.tolist()
def update(self, params, reward_before, reward_after):
"""Cập nhật sau mỗi training episode."""
lp = reward_after - reward_before # Learning Progress
self.history.append({
'params': params,
'lp': lp,
'reward': reward_after,
})
Hierarchical RL: High-Level Planner + Low-Level Skills
Chia bài toán thành 2 cấp: high-level policy chọn skill nào thực hiện, low-level policies thực hiện từng skill.
class HierarchicalManipulation:
"""Two-level hierarchical RL cho multi-step manipulation."""
def __init__(self):
# Low-level skills (đã trained riêng)
self.skills = {
'reach': self.load_skill('reach_policy.zip'),
'grasp': self.load_skill('grasp_policy.zip'),
'lift': self.load_skill('lift_policy.zip'),
'carry': self.load_skill('carry_policy.zip'),
'place': self.load_skill('place_policy.zip'),
'pour': self.load_skill('pour_policy.zip'),
'open': self.load_skill('open_policy.zip'),
'push': self.load_skill('push_policy.zip'),
}
self.skill_names = list(self.skills.keys())
self.n_skills = len(self.skill_names)
def load_skill(self, path):
"""Load pretrained skill policy."""
from stable_baselines3 import SAC
try:
return SAC.load(path)
except:
return None # Placeholder
def get_skill_obs(self, env_obs, skill_name):
"""Trích xuất observation phù hợp cho skill."""
# Mỗi skill cần observation khác nhau
if skill_name in ['reach', 'grasp']:
return env_obs[:20] # Robot + target object
elif skill_name in ['carry', 'pour']:
return env_obs[:25] # + object in hand
else:
return env_obs[:30] # Full observation
class HighLevelPolicy:
"""High-level policy chọn skill và subgoal."""
def __init__(self, n_skills, obs_dim, goal_dim):
self.n_skills = n_skills
# Observation: scene state + task progress
# Action: skill_id (discrete) + subgoal (continuous)
self.obs_space = gym.spaces.Box(-np.inf, np.inf, (obs_dim,))
self.action_space = gym.spaces.Dict({
'skill': gym.spaces.Discrete(n_skills),
'subgoal': gym.spaces.Box(-1, 1, (goal_dim,)),
})
def select_skill(self, obs, task_plan=None):
"""Chọn skill tiếp theo."""
if task_plan:
# Nếu có plan, follow plan
return task_plan.pop(0)
# Learned selection
# ... neural network forward pass ...
skill_probs = self._compute_skill_probs(obs)
skill_id = np.random.choice(self.n_skills, p=skill_probs)
subgoal = self._compute_subgoal(obs, skill_id)
return self.skill_names[skill_id], subgoal
Training Hierarchical System
from stable_baselines3 import SAC, PPO
def train_hierarchical(env, skills, n_iterations=1000):
"""Train hierarchical system end-to-end."""
# Phase 1: Train low-level skills independently
skill_envs = {
'reach': ReachEnv(),
'grasp': GraspEnv(),
'lift': LiftEnv(),
'carry': CarryEnv(),
'place': PlaceEnv(),
}
trained_skills = {}
for name, skill_env in skill_envs.items():
model = SAC("MlpPolicy", skill_env, verbose=0)
model.learn(total_timesteps=500_000)
trained_skills[name] = model
print(f"Skill '{name}' trained: "
f"success rate = {evaluate(model, skill_env):.1%}")
# Phase 2: Train high-level policy
# High-level env: mỗi step = 1 skill execution
high_level_env = HighLevelEnv(
base_env=env,
skills=trained_skills,
max_skills_per_episode=15,
)
# PPO tốt cho discrete action (skill selection)
high_level_model = PPO(
"MlpPolicy",
high_level_env,
learning_rate=3e-4,
n_steps=256,
batch_size=64,
n_epochs=10,
gamma=0.99,
verbose=1,
)
high_level_model.learn(total_timesteps=500_000)
# Phase 3: Fine-tune end-to-end (optional)
# Unfreeze low-level skills, train everything together
# with lower learning rate
return high_level_model, trained_skills
Goal-Conditioned RL cho Subtask Chaining
Thay vì hierarchical RL, một approach khác là goal-conditioned policies — mỗi policy biết đến bất kỳ goal nào, và chúng ta chain goals lại:
class GoalConditionedChaining:
"""Chain goal-conditioned policies cho long-horizon tasks."""
def __init__(self, policy, subtask_goals):
"""
policy: single goal-conditioned policy
subtask_goals: list of intermediate goals
"""
self.policy = policy
self.subtask_goals = subtask_goals
self.current_goal_idx = 0
def get_action(self, obs, achieved_goal):
"""Chọn action dựa trên goal hiện tại."""
current_goal = self.subtask_goals[self.current_goal_idx]
# Kiểm tra đã đạt goal hiện tại chưa
if self._goal_reached(achieved_goal, current_goal):
self.current_goal_idx += 1
if self.current_goal_idx >= len(self.subtask_goals):
return None, True # Hoàn thành!
current_goal = self.subtask_goals[self.current_goal_idx]
# Concat goal vào observation
goal_obs = np.concatenate([obs, current_goal])
action, _ = self.policy.predict(goal_obs, deterministic=True)
return action, False
def _goal_reached(self, achieved, desired, threshold=0.05):
return np.linalg.norm(achieved - desired) < threshold
Kitchen Task: Ví dụ End-to-End
Bài toán kinh điển: robot thực hiện chuỗi tác vụ trong bếp.
class KitchenTaskEnv:
"""Multi-step kitchen manipulation task."""
SUBTASKS = [
'open_cabinet', # Mở tủ
'grasp_cup', # Lấy cốc
'place_on_counter', # Đặt lên bàn
'open_faucet', # Mở vòi nước
'fill_cup', # Đổ nước
'close_faucet', # Đóng vòi
'carry_to_table', # Bê ra bàn
]
def __init__(self):
self.n_subtasks = len(self.SUBTASKS)
self.current_subtask = 0
self.subtask_completed = [False] * self.n_subtasks
def compute_reward(self, state):
"""Reward dựa trên subtask progress."""
# Check subtask completion
subtask = self.SUBTASKS[self.current_subtask]
completion_checks = {
'open_cabinet': lambda s: s['cabinet_angle'] > 1.2,
'grasp_cup': lambda s: s['cup_grasped'],
'place_on_counter': lambda s: (s['cup_on_counter'] and
not s['cup_grasped']),
'open_faucet': lambda s: s['faucet_on'],
'fill_cup': lambda s: s['cup_fill'] > 0.8,
'close_faucet': lambda s: not s['faucet_on'],
'carry_to_table': lambda s: s['cup_on_table'],
}
if completion_checks[subtask](state):
self.subtask_completed[self.current_subtask] = True
self.current_subtask += 1
if self.current_subtask >= self.n_subtasks:
return 100.0, True # All done!
return 10.0, False # Subtask bonus
# Dense reward cho subtask hiện tại
subtask_rewards = {
'open_cabinet': lambda s: -np.tanh(3 * abs(1.2 - s['cabinet_angle'])),
'grasp_cup': lambda s: 1 - np.tanh(5 * s['gripper_to_cup_dist']),
'place_on_counter': lambda s: 1 - np.tanh(5 * s['cup_to_counter_dist']),
'open_faucet': lambda s: 1 - np.tanh(5 * s['gripper_to_faucet_dist']),
'fill_cup': lambda s: s['cup_fill'],
'close_faucet': lambda s: 1 - np.tanh(5 * s['gripper_to_faucet_dist']),
'carry_to_table': lambda s: 1 - np.tanh(3 * s['cup_to_table_dist']),
}
return subtask_rewards[subtask](state), False
Benchmark: IKEA Furniture Assembly
IKEA Furniture Assembly là benchmark tiêu chuẩn cho long-horizon manipulation:
# IKEA Furniture Assembly benchmark results
benchmark_results = {
'table_lack': { # 4 legs + 1 top
'flat_rl': {'success': 0.05, 'steps': 'N/A'},
'curriculum': {'success': 0.32, 'steps': '~800'},
'hierarchical': {'success': 0.58, 'steps': '~500'},
'hierarchical_curriculum': {'success': 0.71, 'steps': '~400'},
},
'chair_simple': { # 4 legs + seat + back
'flat_rl': {'success': 0.01, 'steps': 'N/A'},
'curriculum': {'success': 0.18, 'steps': '~1200'},
'hierarchical': {'success': 0.42, 'steps': '~800'},
'hierarchical_curriculum': {'success': 0.55, 'steps': '~650'},
},
}
| Method | Table LACK | Chair Simple | Shelf |
|---|---|---|---|
| Flat RL (SAC) | 5% | 1% | 0% |
| SAC + Curriculum | 32% | 18% | 8% |
| Hierarchical RL | 58% | 42% | 25% |
| Hier + Curriculum | 71% | 55% | 38% |
| Hier + Curr + HER | 78% | 62% | 45% |
Reset-Free RL
Trong thực tế, robot không thể "reset" môi trường sau mỗi episode. Reset-free RL giải quyết bằng cách train policy "undo" (đưa về trạng thái ban đầu):
class ResetFreeTraining:
"""Training mà không cần reset environment."""
def __init__(self, forward_env, max_episode_steps=500):
self.env = forward_env
self.max_steps = max_episode_steps
# Train 2 policies:
# 1. Forward policy: thực hiện task
# 2. Reset policy: đưa về trạng thái ban đầu
self.forward_policy = None
self.reset_policy = None
def train(self):
"""Alternating training giữa forward và reset."""
for iteration in range(10000):
# Forward episode
obs = self.env.get_obs()
for step in range(self.max_steps):
action = self.forward_policy.predict(obs)
obs, reward, done, info = self.env.step(action)
self.forward_policy.store(obs, action, reward)
if done:
break
# Reset episode — đưa về initial state
obs = self.env.get_obs()
init_state = self.env.get_initial_state()
for step in range(self.max_steps):
# Goal = initial state
goal_obs = np.concatenate([obs, init_state])
action = self.reset_policy.predict(goal_obs)
obs, _, _, _ = self.env.step(action)
# Reward = closeness to initial state
reset_reward = -np.linalg.norm(obs - init_state)
self.reset_policy.store(goal_obs, action, reset_reward)
if np.linalg.norm(obs - init_state) < 0.1:
break
# Update both policies
self.forward_policy.update()
self.reset_policy.update()
Tổng kết Series
Qua 8 bài viết, chúng ta đã đi từ nền tảng đến frontier của RL cho manipulation:
| Bài | Chủ đề | Key Takeaway |
|---|---|---|
| 1 | MDP & Reward | State/action design quyết định 50% thành công |
| 2 | Grasping | Curriculum + SAC = grasping state-of-the-art |
| 3 | Force Control | RL + impedance control = best of both worlds |
| 4 | Pick-and-Place | HER là chìa khóa cho sparse reward |
| 5 | Carrying | Multi-objective reward = Pareto trade-offs |
| 6 | Contact-Rich | Tactile + Domain Randomization cho sim-to-real |
| 7 | Tool Use | Affordance + two-phase learning |
| 8 | Multi-Step | Hierarchical + Curriculum cho long-horizon |
Tài liệu tham khảo
- RELAY: Reinforcement Learning with Action Hierarchies — Gupta et al., 2020
- Composing Task-Agnostic Policies with Deep Reinforcement Learning — Lee et al., ICLR 2019
- Automatic Curriculum Learning for Deep RL: A Short Survey — Portelas et al., IJCAI 2020