Imagine asking a robot to make a cup of coffee: open cabinet → grab cup → place on counter → open coffee machine → insert pod → press button → wait → retrieve cup → hand to you. That's 10+ sequential steps, each requiring success before the next is meaningful. This is the long-horizon manipulation problem — and it's the greatest challenge for RL in robotics.
This is the final post in the RL for Manipulation series. After covering fundamentals, grasping, force control, pick-and-place, carrying, contact-rich assembly, and tool use, we combine everything to tackle the most integrated problem.
Why Is Long-Horizon Manipulation Hard?
| Challenge | Impact |
|---|---|
| Credit assignment | Reward at step 10, but the error was at step 3 — policy doesn't know what to fix |
| Exploration | Probability of completing 10 random steps = near zero |
| Compounding error | 5% error per step → only ~60% success (≈40% failure) after 10 steps |
| State space | Exponential growth (all object positions) |
| Reward sparsity | Only +1 when the entire chain completes |
Curriculum Learning
Manual Curriculum
The simplest approach: design curriculum manually, starting from the last step and gradually adding earlier steps:
class ManualCurriculum:
    """Backward curriculum — start training from the last subtask.

    The agent first masters the final step of the chain (where credit
    assignment is easiest), then the starting point is moved one subtask
    earlier each time the recent success rate clears a threshold.
    """

    def __init__(self, subtasks):
        """
        subtasks: list of subtasks in order
        Example: ['open_cabinet', 'grasp_cup', 'place_cup',
                  'pour_water', 'serve']
        """
        self.subtasks = subtasks
        self.n_tasks = len(subtasks)
        self.current_start = self.n_tasks - 1  # Start from last task
        self.success_threshold = 0.8
        self.window = 50
        # deque(maxlen=...) keeps a sliding window in O(1) per update;
        # a plain list trimmed with pop(0) is O(n) per episode.
        self.success_history = deque(maxlen=self.window)

    def get_start_state(self):
        """Return starting state for current curriculum level."""
        # Fast-forward to current_start step
        # Skip earlier steps via scripted policy
        return self.current_start

    def update(self, success):
        """Record one episode outcome; advance the curriculum if mastered."""
        self.success_history.append(success)
        if len(self.success_history) >= self.window:
            rate = sum(self.success_history) / len(self.success_history)
            if rate >= self.success_threshold and self.current_start > 0:
                self.current_start -= 1
                # Reset the window so the harder level is judged on
                # fresh episodes only.
                self.success_history.clear()
                print(f"Curriculum: now starting from step "
                      f"{self.current_start}: "
                      f"{self.subtasks[self.current_start]}")

    def is_complete(self):
        """Has the full chain been mastered?"""
        # max(..., 1) guards the empty-history case right after an advance.
        return (self.current_start == 0 and
                sum(self.success_history) / max(len(self.success_history), 1)
                >= self.success_threshold)
Automatic Curriculum: ALP-GMM
Instead of manual curriculum design, ALP-GMM (Absolute Learning Progress — Gaussian Mixture Model) automatically selects task difficulty based on learning progress:
from collections import deque

import numpy as np
from sklearn.mixture import GaussianMixture
class ALPGMM:
    """Automatic curriculum via Absolute Learning Progress.

    Tasks whose competence changed the most recently (high |LP|) are
    sampled more often, with Gaussian noise added for local exploration.

    NOTE(review): despite the name, this simplified version never fits
    the GaussianMixture — `self.gmm` stays None and sampling is a
    top-|LP| heuristic. Fit a GMM on (params, |LP|) and sample from the
    highest-mean-LP component to recover the full ALP-GMM algorithm.
    """

    def __init__(self, task_param_bounds, n_components=10):
        """
        task_param_bounds: [(min, max)] for each task parameter
        Example: [(0, 1)] for difficulty,
                 [(1, 10)] for number of subtasks
        n_components: GMM size (reserved for the full algorithm; unused
            in this simplified version)
        """
        self.bounds = task_param_bounds
        self.n_dims = len(task_param_bounds)
        self.n_components = n_components
        # History entries: {'params', 'lp', 'reward'} — see update().
        self.history = []
        self.gmm = None  # Placeholder: never fitted here (see class note)
        self.min_samples = 50  # Pure random exploration before exploiting LP

    def sample_task(self):
        """Select next task parameters based on learning progress."""
        if len(self.history) < self.min_samples:
            # Not enough data yet: sample uniformly within the bounds.
            return [np.random.uniform(low, high) for low, high in self.bounds]
        X = np.array([h['params'] for h in self.history])
        LP = np.array([h['lp'] for h in self.history])
        # Absolute learning progress: both improvement and forgetting
        # mark a region as worth practicing.
        abs_lp = np.abs(LP)
        # Pick one of the 10 highest-|LP| tasks and perturb it locally,
        # clipping back into the valid parameter box.
        high_lp_idx = np.argsort(abs_lp)[-10:]
        center = X[np.random.choice(high_lp_idx)]
        noise = np.random.randn(self.n_dims) * 0.1
        params = np.clip(center + noise,
                         [b[0] for b in self.bounds],
                         [b[1] for b in self.bounds])
        return params.tolist()

    def update(self, params, reward_before, reward_after):
        """Record an episode; learning progress = competence change."""
        lp = reward_after - reward_before  # Learning Progress
        self.history.append({
            'params': params,
            'lp': lp,
            'reward': reward_after,
        })
Hierarchical RL: High-Level Planner + Low-Level Skills
Split the problem into two levels: a high-level policy selects which skill to execute, and low-level policies execute each skill.
class HierarchicalManipulation:
    """Two-level hierarchical RL for multi-step manipulation.

    Holds a library of pretrained low-level skill policies; a separately
    trained high-level policy chooses which skill to execute each step.
    """

    def __init__(self):
        # Low-level skills (trained independently); any checkpoint that
        # fails to load becomes a None placeholder.
        self.skills = {
            'reach': self.load_skill('reach_policy.zip'),
            'grasp': self.load_skill('grasp_policy.zip'),
            'lift': self.load_skill('lift_policy.zip'),
            'carry': self.load_skill('carry_policy.zip'),
            'place': self.load_skill('place_policy.zip'),
            'pour': self.load_skill('pour_policy.zip'),
            'open': self.load_skill('open_policy.zip'),
            'push': self.load_skill('push_policy.zip'),
        }
        self.skill_names = list(self.skills.keys())
        self.n_skills = len(self.skill_names)

    def load_skill(self, path):
        """Load a pretrained skill policy, or None if unavailable.

        The import sits inside the try so a missing stable_baselines3
        install degrades to a placeholder instead of crashing; the bare
        `except:` is narrowed to Exception so KeyboardInterrupt and
        SystemExit still propagate.
        """
        try:
            from stable_baselines3 import SAC
            return SAC.load(path)
        except Exception:
            return None  # Placeholder
Training the Hierarchical System
from stable_baselines3 import SAC, PPO
def train_hierarchical(env, skills, n_iterations=1000):
    """Train hierarchical system end-to-end.

    Phase 1 trains each low-level skill with SAC in a dedicated
    environment; phase 2 trains a PPO high-level policy whose discrete
    action is "which skill to execute next".
    """
    # --- Phase 1: low-level skills, trained independently ---
    skill_envs = {
        'reach': ReachEnv(),
        'grasp': GraspEnv(),
        'lift': LiftEnv(),
        'carry': CarryEnv(),
        'place': PlaceEnv(),
    }
    trained_skills = {}
    for name, skill_env in skill_envs.items():
        model = SAC("MlpPolicy", skill_env, verbose=0)
        model.learn(total_timesteps=500_000)
        trained_skills[name] = model
        rate = evaluate(model, skill_env)
        print(f"Skill '{name}' trained: success rate = {rate:.1%}")

    # --- Phase 2: high-level policy over the skill library ---
    # In the high-level env, one step = one full skill execution.
    high_level_env = HighLevelEnv(
        base_env=env,
        skills=trained_skills,
        max_skills_per_episode=15,
    )
    # PPO handles the discrete skill-selection action space well.
    ppo_config = dict(
        learning_rate=3e-4,
        n_steps=256,
        batch_size=64,
        n_epochs=10,
        gamma=0.99,
        verbose=1,
    )
    high_level_model = PPO("MlpPolicy", high_level_env, **ppo_config)
    high_level_model.learn(total_timesteps=500_000)
    return high_level_model, trained_skills
Goal-Conditioned RL for Subtask Chaining
Instead of hierarchical RL, another approach is goal-conditioned policies — a single policy that can reach any goal, chaining goals together:
class GoalConditionedChaining:
    """Chain goal-conditioned policies for long-horizon tasks.

    A single goal-conditioned policy is driven through a fixed sequence
    of intermediate goals; whenever the current goal is reached, the
    chain advances to the next one.
    """

    def __init__(self, policy, subtask_goals):
        """
        policy: single goal-conditioned policy
        subtask_goals: list of intermediate goals (arrays in the same
            space as `achieved_goal`)
        """
        self.policy = policy
        self.subtask_goals = subtask_goals
        self.current_goal_idx = 0  # Index of the goal being pursued

    def get_action(self, obs, achieved_goal):
        """Return (action, done) for the current goal.

        Returns (None, True) once every goal has been reached; safe to
        keep calling afterwards.
        """
        # Guard: chain already finished — without this, a caller that
        # keeps stepping after completion hits an IndexError below.
        if self.current_goal_idx >= len(self.subtask_goals):
            return None, True
        current_goal = self.subtask_goals[self.current_goal_idx]
        # Check if current goal is reached; if so, advance the chain.
        if self._goal_reached(achieved_goal, current_goal):
            self.current_goal_idx += 1
            if self.current_goal_idx >= len(self.subtask_goals):
                return None, True  # All done!
            current_goal = self.subtask_goals[self.current_goal_idx]
        # Condition the policy by concatenating the goal onto the obs.
        goal_obs = np.concatenate([obs, current_goal])
        action, _ = self.policy.predict(goal_obs, deterministic=True)
        return action, False

    def _goal_reached(self, achieved, desired, threshold=0.05):
        # Euclidean proximity test in goal space.
        return np.linalg.norm(achieved - desired) < threshold
Kitchen Task: End-to-End Example
The classic benchmark: a robot performs a sequence of kitchen tasks.
class KitchenTaskEnv:
    """Multi-step kitchen manipulation task.

    compute_reward yields a dense shaping term for the current subtask,
    a +10 bonus on subtask completion, and +100 with done=True when the
    whole chain finishes.
    """

    SUBTASKS = [
        'open_cabinet',       # Open cabinet
        'grasp_cup',          # Grab cup
        'place_on_counter',   # Place on counter
        'open_faucet',        # Turn on faucet
        'fill_cup',           # Fill with water
        'close_faucet',       # Turn off faucet
        'carry_to_table',     # Carry to table
    ]

    def __init__(self):
        self.n_subtasks = len(self.SUBTASKS)
        self.current_subtask = 0  # Index into SUBTASKS
        self.subtask_completed = [False] * self.n_subtasks

    def compute_reward(self, state):
        """Return (reward, done) based on subtask progress.

        state: dict of simulator readings (angles, distances, booleans)
            keyed as in the lambdas below.
        """
        # Guard: all subtasks already done — without this, stepping past
        # the terminal transition raises IndexError on SUBTASKS below.
        if self.current_subtask >= self.n_subtasks:
            return 0.0, True
        subtask = self.SUBTASKS[self.current_subtask]
        # Binary predicates deciding whether each subtask is finished.
        completion_checks = {
            'open_cabinet': lambda s: s['cabinet_angle'] > 1.2,
            'grasp_cup': lambda s: s['cup_grasped'],
            'place_on_counter': lambda s: (s['cup_on_counter'] and
                                           not s['cup_grasped']),
            'open_faucet': lambda s: s['faucet_on'],
            'fill_cup': lambda s: s['cup_fill'] > 0.8,
            'close_faucet': lambda s: not s['faucet_on'],
            'carry_to_table': lambda s: s['cup_on_table'],
        }
        if completion_checks[subtask](state):
            self.subtask_completed[self.current_subtask] = True
            self.current_subtask += 1
            if self.current_subtask >= self.n_subtasks:
                return 100.0, True  # All done!
            return 10.0, False  # Subtask bonus
        # Dense shaping reward for the current subtask (tanh saturates
        # distance-based terms into [0, 1)).
        subtask_rewards = {
            'open_cabinet': lambda s: -np.tanh(3 * abs(1.2 - s['cabinet_angle'])),
            'grasp_cup': lambda s: 1 - np.tanh(5 * s['gripper_to_cup_dist']),
            'place_on_counter': lambda s: 1 - np.tanh(5 * s['cup_to_counter_dist']),
            'open_faucet': lambda s: 1 - np.tanh(5 * s['gripper_to_faucet_dist']),
            'fill_cup': lambda s: s['cup_fill'],
            'close_faucet': lambda s: 1 - np.tanh(5 * s['gripper_to_faucet_dist']),
            'carry_to_table': lambda s: 1 - np.tanh(3 * s['cup_to_table_dist']),
        }
        return subtask_rewards[subtask](state), False
Benchmark: IKEA Furniture Assembly
IKEA Furniture Assembly is the standard benchmark for long-horizon manipulation:
| Method | Table LACK | Chair Simple | Shelf |
|---|---|---|---|
| Flat RL (SAC) | 5% | 1% | 0% |
| SAC + Curriculum | 32% | 18% | 8% |
| Hierarchical RL | 58% | 42% | 25% |
| Hier + Curriculum | 71% | 55% | 38% |
| Hier + Curr + HER | 78% | 62% | 45% |
The results are clear: hierarchical structure + curriculum learning + HER is the winning combination for long-horizon tasks. Flat RL essentially fails beyond 3-4 steps.
Reset-Free RL
In practice, robots can't "reset" the environment after each episode. Reset-free RL solves this by training a paired "undo" policy that returns the environment to its initial state:
class ResetFreeTraining:
    """Training without environment resets.

    Two policies alternate: a forward policy attempts the task, then a
    reset policy drives the environment back toward its initial state so
    the next attempt starts from scratch — no manual reset required.
    """

    def __init__(self, forward_env, max_episode_steps=500):
        self.env = forward_env
        self.max_steps = max_episode_steps
        # Two policies are trained in alternation:
        #   1. forward policy — performs the task
        #   2. reset policy   — returns the env to its initial state
        # NOTE(review): both start as None — assign real policies before
        # calling train().
        self.forward_policy = None
        self.reset_policy = None

    def train(self):
        """Alternating training between forward and reset."""
        for _cycle in range(10000):
            self._forward_episode()
            self._reset_episode()
            # Update both policies after each forward/reset pair.
            self.forward_policy.update()
            self.reset_policy.update()

    def _forward_episode(self):
        """One task-attempt episode driven by the forward policy."""
        obs = self.env.get_obs()
        for _t in range(self.max_steps):
            action = self.forward_policy.predict(obs)
            obs, reward, done, info = self.env.step(action)
            self.forward_policy.store(obs, action, reward)
            if done:
                break

    def _reset_episode(self):
        """Drive the environment back toward its initial state."""
        obs = self.env.get_obs()
        init_state = self.env.get_initial_state()
        for _t in range(self.max_steps):
            # Goal-condition the reset policy on the initial state.
            goal_obs = np.concatenate([obs, init_state])
            action = self.reset_policy.predict(goal_obs)
            obs, _, _, _ = self.env.step(action)
            # Reward: negative distance to the initial state.
            reset_reward = -np.linalg.norm(obs - init_state)
            self.reset_policy.store(goal_obs, action, reset_reward)
            if np.linalg.norm(obs - init_state) < 0.1:
                break
Series Summary
Across 8 posts, we've gone from fundamentals to the frontier of RL for manipulation:
| Post | Topic | Key Takeaway |
|---|---|---|
| 1 | MDP & Reward | State/action design determines 50% of success |
| 2 | Grasping | Curriculum + SAC = grasping state-of-the-art |
| 3 | Force Control | RL + impedance control = best of both worlds |
| 4 | Pick-and-Place | HER is the key for sparse rewards |
| 5 | Carrying | Multi-objective reward = Pareto trade-offs |
| 6 | Contact-Rich | Tactile + Domain Randomization for sim-to-real |
| 7 | Tool Use | Affordance + two-phase learning |
| 8 | Multi-Step (this post) | Hierarchical + Curriculum for long-horizon |
References
- Relay Policy Learning: Solving Long-Horizon Tasks via Imitation and Reinforcement Learning — Gupta et al., CoRL 2019
- Composing Task-Agnostic Policies with Deep Reinforcement Learning — Lee et al., ICLR 2019
- Automatic Curriculum Learning for Deep RL: A Short Survey — Portelas et al., IJCAI 2020