
Multi-Step Manipulation: Curriculum Learning for Long-Horizon Tasks

Solving long-horizon manipulation with RL — curriculum learning, hierarchical RL, skill chaining, and IKEA furniture assembly benchmarks.

Nguyễn Anh Tuấn · April 7, 2026 · 8 min read

Imagine asking a robot to make a cup of coffee: open cabinet → grab cup → place on counter → open coffee machine → insert pod → press button → wait → retrieve cup → hand it over. That's 10+ sequential steps, each of which must succeed before the next one even matters. This is the long-horizon manipulation problem, arguably the hardest setting for RL in robotics.

This is the final post in the RL for Manipulation series. After covering fundamentals, grasping, force control, pick-and-place, carrying, contact-rich assembly, and tool use, we combine everything to tackle the most integrated problem.

Why Is Long-Horizon Manipulation Hard?

| Challenge | Impact |
|-----------|--------|
| Credit assignment | Reward arrives at step 10, but the mistake was at step 3; the policy can't tell what to fix |
| Exploration | The probability of stumbling through 10 steps at random is near zero |
| Compounding error | 5% error per step → ~40% failure over 10 steps (0.95¹⁰ ≈ 60% success) |
| State space | Exponential growth with the number of objects and their configurations |
| Reward sparsity | Only +1 when the entire chain completes |
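The compounding-error figure is easy to verify: with 95% per-step reliability, a 10-step chain succeeds only about 0.95¹⁰ ≈ 60% of the time, so roughly 40% of attempts fail.

```python
# With 95% per-step success, a 10-step chain succeeds ~60% of the time
p_step = 0.95
n_steps = 10
p_chain = p_step ** n_steps
print(f"chain success: {p_chain:.1%}, failure: {1 - p_chain:.1%}")
# chain success: 59.9%, failure: 40.1%
```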

Figure: a long-horizon manipulation task.

Curriculum Learning

Manual Curriculum

The simplest approach is to design the curriculum by hand, starting from the last step and gradually prepending earlier ones:

class ManualCurriculum:
    """Backward curriculum — start from the last step."""
    
    def __init__(self, subtasks):
        """
        subtasks: list of subtasks in order
        Example: ['open_cabinet', 'grasp_cup', 'place_cup', 
                'pour_water', 'serve']
        """
        self.subtasks = subtasks
        self.n_tasks = len(subtasks)
        self.current_start = self.n_tasks - 1  # Start from last task
        self.success_history = []
        self.success_threshold = 0.8
        self.window = 50
    
    def get_start_state(self):
        """Return the subtask index to start from.

        The environment is expected to fast-forward past earlier
        subtasks with a scripted policy before handing control to RL.
        """
        return self.current_start
    
    def update(self, success):
        self.success_history.append(success)
        if len(self.success_history) > self.window:
            self.success_history.pop(0)
        
        if len(self.success_history) >= self.window:
            rate = sum(self.success_history) / len(self.success_history)
            if rate >= self.success_threshold and self.current_start > 0:
                self.current_start -= 1
                self.success_history.clear()
                print(f"Curriculum: now starting from step "
                      f"{self.current_start}: "
                      f"{self.subtasks[self.current_start]}")
    
    def is_complete(self):
        """Has the full chain been mastered?"""
        return (self.current_start == 0 and 
                sum(self.success_history) / max(len(self.success_history), 1) 
                >= self.success_threshold)

Automatic Curriculum: ALP-GMM

Instead of manual curriculum design, ALP-GMM (Absolute Learning Progress — Gaussian Mixture Model) automatically selects task difficulty based on learning progress:

from sklearn.mixture import GaussianMixture
import numpy as np

class ALPGMM:
    """Automatic curriculum via Absolute Learning Progress."""
    
    def __init__(self, task_param_bounds, n_components=10):
        """
        task_param_bounds: [(min, max)] for each task parameter
        Example: [(0, 1)] for difficulty, 
                [(1, 10)] for number of subtasks
        """
        self.bounds = task_param_bounds
        self.n_dims = len(task_param_bounds)
        self.n_components = n_components
        
        # Episode history: dicts with 'params', 'lp' (learning progress), 'reward'
        self.history = []
        self.gmm = None
        self.min_samples = 50
    
    def sample_task(self):
        """Select the next task based on absolute learning progress."""
        if len(self.history) < self.min_samples:
            # Not enough data yet: sample task parameters uniformly
            return [np.random.uniform(low, high) for low, high in self.bounds]
        
        # Fit a GMM on (task_params, |learning progress|)
        X = np.array([h['params'] for h in self.history])
        abs_lp = np.abs(np.array([h['lp'] for h in self.history]))
        data = np.column_stack([X, abs_lp])
        n_comp = min(self.n_components, max(1, len(data) // 10))
        self.gmm = GaussianMixture(n_components=n_comp).fit(data)
        
        # Pick a component with probability proportional to its mean ALP,
        # then sample task parameters from that component
        comp_lp = np.clip(self.gmm.means_[:, -1], 1e-6, None)
        k = np.random.choice(n_comp, p=comp_lp / comp_lp.sum())
        sample = np.random.multivariate_normal(
            self.gmm.means_[k], self.gmm.covariances_[k])
        params = np.clip(sample[:-1],
                         [b[0] for b in self.bounds],
                         [b[1] for b in self.bounds])
        return params.tolist()
    
    def update(self, params, reward_before, reward_after):
        """Update after each training episode."""
        lp = reward_after - reward_before  # Learning Progress
        self.history.append({
            'params': params,
            'lp': lp,
            'reward': reward_after,
        })

Hierarchical RL: High-Level Planner + Low-Level Skills

Split the problem into two levels: a high-level policy selects which skill to execute, and low-level policies execute each skill.

class HierarchicalManipulation:
    """Two-level hierarchical RL for multi-step manipulation."""
    
    def __init__(self):
        # Low-level skills (trained independently)
        self.skills = {
            'reach': self.load_skill('reach_policy.zip'),
            'grasp': self.load_skill('grasp_policy.zip'),
            'lift': self.load_skill('lift_policy.zip'),
            'carry': self.load_skill('carry_policy.zip'),
            'place': self.load_skill('place_policy.zip'),
            'pour': self.load_skill('pour_policy.zip'),
            'open': self.load_skill('open_policy.zip'),
            'push': self.load_skill('push_policy.zip'),
        }
        self.skill_names = list(self.skills.keys())
        self.n_skills = len(self.skill_names)
    
    def load_skill(self, path):
        """Load pretrained skill policy."""
        from stable_baselines3 import SAC
        try:
            return SAC.load(path)
        except Exception:
            return None  # Placeholder until the skill is trained

Training the Hierarchical System

from stable_baselines3 import SAC, PPO

def train_hierarchical(env, skills, n_iterations=1000):
    """Train hierarchical system end-to-end."""
    
    # Phase 1: Train low-level skills independently
    # (ReachEnv, GraspEnv, ... are the per-skill envs from earlier posts)
    skill_envs = {
        'reach': ReachEnv(),
        'grasp': GraspEnv(),
        'lift': LiftEnv(),
        'carry': CarryEnv(),
        'place': PlaceEnv(),
    }
    
    trained_skills = {}
    for name, skill_env in skill_envs.items():
        model = SAC("MlpPolicy", skill_env, verbose=0)
        model.learn(total_timesteps=500_000)
        trained_skills[name] = model
        # evaluate() is assumed to roll out the policy and report success rate
        print(f"Skill '{name}' trained: "
              f"success rate = {evaluate(model, skill_env):.1%}")
    
    # Phase 2: Train high-level policy
    # High-level env: each step = 1 skill execution
    high_level_env = HighLevelEnv(
        base_env=env,
        skills=trained_skills,
        max_skills_per_episode=15,
    )
    
    # PPO works well for discrete actions (skill selection)
    high_level_model = PPO(
        "MlpPolicy",
        high_level_env,
        learning_rate=3e-4,
        n_steps=256,
        batch_size=64,
        n_epochs=10,
        gamma=0.99,
        verbose=1,
    )
    
    high_level_model.learn(total_timesteps=500_000)
    
    return high_level_model, trained_skills
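HighLevelEnv above is assumed rather than defined. A minimal sketch, duck-typed against the classic gym 4-tuple step API so it needs no gym dependency, could look like this: one high-level action runs one full skill rollout on the base environment.

```python
class HighLevelEnvSketch:
    """Hypothetical HighLevelEnv: one high-level step = one skill rollout.

    `base_env` follows the classic 4-tuple gym step API; `skills` maps
    skill names to policies exposing predict(obs, deterministic=True).
    """

    def __init__(self, base_env, skills, max_skills_per_episode=15,
                 skill_horizon=100):
        self.base_env = base_env
        self.skills = skills
        self.skill_names = list(skills.keys())
        self.max_skills = max_skills_per_episode
        self.skill_horizon = skill_horizon

    def reset(self):
        self.skills_used = 0
        self.obs = self.base_env.reset()
        return self.obs

    def step(self, skill_idx):
        """Execute the chosen skill until it terminates or times out."""
        skill = self.skills[self.skill_names[skill_idx]]
        total_reward, done = 0.0, False
        for _ in range(self.skill_horizon):
            action, _ = skill.predict(self.obs, deterministic=True)
            self.obs, reward, done, _ = self.base_env.step(action)
            total_reward += reward
            if done:
                break
        self.skills_used += 1
        done = done or self.skills_used >= self.max_skills
        return self.obs, total_reward, done, {}
```

From the high-level policy's point of view, the horizon shrinks from hundreds of low-level control steps to at most `max_skills_per_episode` decisions, which is what makes credit assignment tractable.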

Goal-Conditioned RL for Subtask Chaining

An alternative to hierarchical RL is a goal-conditioned policy: a single policy trained to reach arbitrary goals, which solves the long-horizon task by chaining a sequence of subgoals:

import numpy as np

class GoalConditionedChaining:
    """Chain goal-conditioned policies for long-horizon tasks."""
    
    def __init__(self, policy, subtask_goals):
        """
        policy: single goal-conditioned policy
        subtask_goals: list of intermediate goals
        """
        self.policy = policy
        self.subtask_goals = subtask_goals
        self.current_goal_idx = 0
    
    def get_action(self, obs, achieved_goal):
        """Select action based on current goal."""
        current_goal = self.subtask_goals[self.current_goal_idx]
        
        # Check if current goal is reached
        if self._goal_reached(achieved_goal, current_goal):
            self.current_goal_idx += 1
            if self.current_goal_idx >= len(self.subtask_goals):
                return None, True  # All done!
            current_goal = self.subtask_goals[self.current_goal_idx]
        
        # Concat goal into observation
        goal_obs = np.concatenate([obs, current_goal])
        action, _ = self.policy.predict(goal_obs, deterministic=True)
        
        return action, False
    
    def _goal_reached(self, achieved, desired, threshold=0.05):
        return np.linalg.norm(achieved - desired) < threshold

Figure: goal chaining visualization.

Kitchen Task: End-to-End Example

The classic benchmark: a robot performs a sequence of kitchen tasks.

class KitchenTaskEnv:
    """Multi-step kitchen manipulation task."""
    
    SUBTASKS = [
        'open_cabinet',     # Open cabinet
        'grasp_cup',        # Grab cup
        'place_on_counter', # Place on counter
        'open_faucet',      # Turn on faucet
        'fill_cup',         # Fill with water
        'close_faucet',     # Turn off faucet
        'carry_to_table',   # Carry to table
    ]
    
    def __init__(self):
        self.n_subtasks = len(self.SUBTASKS)
        self.current_subtask = 0
        self.subtask_completed = [False] * self.n_subtasks
        
    def compute_reward(self, state):
        """Reward based on subtask progress."""
        subtask = self.SUBTASKS[self.current_subtask]
        
        completion_checks = {
            'open_cabinet': lambda s: s['cabinet_angle'] > 1.2,
            'grasp_cup': lambda s: s['cup_grasped'],
            'place_on_counter': lambda s: (s['cup_on_counter'] and 
                                            not s['cup_grasped']),
            'open_faucet': lambda s: s['faucet_on'],
            'fill_cup': lambda s: s['cup_fill'] > 0.8,
            'close_faucet': lambda s: not s['faucet_on'],
            'carry_to_table': lambda s: s['cup_on_table'],
        }
        
        if completion_checks[subtask](state):
            self.subtask_completed[self.current_subtask] = True
            self.current_subtask += 1
            
            if self.current_subtask >= self.n_subtasks:
                return 100.0, True  # All done!
            
            return 10.0, False  # Subtask bonus
        
        # Dense reward for current subtask
        subtask_rewards = {
            'open_cabinet': lambda s: -np.tanh(3 * abs(1.2 - s['cabinet_angle'])),
            'grasp_cup': lambda s: 1 - np.tanh(5 * s['gripper_to_cup_dist']),
            'place_on_counter': lambda s: 1 - np.tanh(5 * s['cup_to_counter_dist']),
            'open_faucet': lambda s: 1 - np.tanh(5 * s['gripper_to_faucet_dist']),
            'fill_cup': lambda s: s['cup_fill'],
            'close_faucet': lambda s: 1 - np.tanh(5 * s['gripper_to_faucet_dist']),
            'carry_to_table': lambda s: 1 - np.tanh(3 * s['cup_to_table_dist']),
        }
        
        return subtask_rewards[subtask](state), False
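All the dense terms above use the shaping `1 - tanh(k * d)`: it is close to 1 at the goal, decays smoothly with distance `d`, and the gain `k` (3 or 5 above) sets how sharply reward falls off. A quick numeric check with `k = 5`:

```python
import numpy as np

# The dense shaping 1 - tanh(k*d) is ~1 at the goal and decays smoothly
# with distance d; the gain k controls how fast reward falls off.
for d in [0.0, 0.1, 0.5, 1.0]:
    print(f"d={d:.1f} -> reward {1 - np.tanh(5 * d):.3f}")
```

Note that the reward is nearly flat beyond `d ≈ 1/k`, so the gain should be matched to the workspace scale or the gradient signal vanishes far from the goal.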

Benchmark: IKEA Furniture Assembly

The IKEA Furniture Assembly environment is a standard benchmark for long-horizon manipulation. Success rates by method:

| Method | Table LACK | Chair (simple) | Shelf |
|--------|-----------|----------------|-------|
| Flat RL (SAC) | 5% | 1% | 0% |
| SAC + Curriculum | 32% | 18% | 8% |
| Hierarchical RL | 58% | 42% | 25% |
| Hier + Curriculum | 71% | 55% | 38% |
| Hier + Curr + HER | 78% | 62% | 45% |

The results are clear: hierarchical structure + curriculum learning + HER is the winning combination for long-horizon tasks. Flat RL essentially fails beyond 3-4 steps.

Reset-Free RL

In practice, robots can't "reset" the environment after each episode. Reset-free RL solves this by training a paired "undo" policy that returns the environment to its initial state:

class ResetFreeTraining:
    """Training without environment resets."""
    
    def __init__(self, forward_env, forward_policy, reset_policy,
                 max_episode_steps=500):
        self.env = forward_env
        self.max_steps = max_episode_steps
        
        # Two policies are trained in alternation:
        # 1. Forward policy: perform the task
        # 2. Reset policy: return the environment to its initial state
        self.forward_policy = forward_policy
        self.reset_policy = reset_policy
    
    def train(self):
        """Alternating training between forward and reset."""
        
        for iteration in range(10000):
            # Forward episode
            obs = self.env.get_obs()
            for step in range(self.max_steps):
                action = self.forward_policy.predict(obs)
                obs, reward, done, info = self.env.step(action)
                self.forward_policy.store(obs, action, reward)
                if done:
                    break
            
            # Reset episode — return to initial state
            obs = self.env.get_obs()
            init_state = self.env.get_initial_state()
            for step in range(self.max_steps):
                goal_obs = np.concatenate([obs, init_state])
                action = self.reset_policy.predict(goal_obs)
                obs, _, _, _ = self.env.step(action)
                
                reset_reward = -np.linalg.norm(obs - init_state)
                self.reset_policy.store(goal_obs, action, reset_reward)
                
                if np.linalg.norm(obs - init_state) < 0.1:
                    break
            
            # Update both policies
            self.forward_policy.update()
            self.reset_policy.update()

Series Summary

Across 8 posts, we've gone from fundamentals to the frontier of RL for manipulation:

| Post | Topic | Key Takeaway |
|------|-------|--------------|
| 1 | MDP & Reward | State/action design determines 50% of success |
| 2 | Grasping | Curriculum + SAC = state-of-the-art grasping |
| 3 | Force Control | RL + impedance control = best of both worlds |
| 4 | Pick-and-Place | HER is the key for sparse rewards |
| 5 | Carrying | Multi-objective reward = Pareto trade-offs |
| 6 | Contact-Rich | Tactile sensing + domain randomization for sim-to-real |
| 7 | Tool Use | Affordance + two-phase learning |
| 8 | Multi-Step (this post) | Hierarchical RL + curriculum for long-horizon tasks |

