manipulationrlcurriculum-learninglong-horizonmanipulation

Multi-Step Manipulation: Curriculum Learning cho Long-Horizon

Giải quyết manipulation dài hơi bằng RL — curriculum learning, hierarchical RL, skill chaining, và benchmark IKEA furniture assembly.

Nguyễn Anh Tuấn7 tháng 4, 202610 phút đọc
Multi-Step Manipulation: Curriculum Learning cho Long-Horizon

Hãy tưởng tượng bạn yêu cầu robot pha một cốc cà phê: mở tủ → lấy cốc → đặt lên bàn → mở máy pha → lấy viên cà phê → bỏ vào máy → nhấn nút → đợi → lấy cốc ra → đưa cho bạn. Đó là 10+ bước tuần tự, mỗi bước phải thành công thì bước sau mới có ý nghĩa. Đây là bài toán long-horizon manipulation — và nó là thách thức lớn nhất của RL trong robotics.

Đây là bài cuối cùng trong series RL for Manipulation. Sau khi đã đi qua fundamentals, grasping, force control, pick-and-place, carrying, contact-rich assembly, và tool use, chúng ta kết hợp tất cả để giải quyết bài toán tổng hợp nhất.

Tại sao Long-Horizon khó?

Thách thức Ảnh hưởng
Credit assignment Reward ở bước 10, nhưng lỗi ở bước 3 — policy không biết sửa gì
Exploration Xác suất hoàn thành 10 bước random = gần 0
Compounding error Sai 5% mỗi bước → 60% fail sau 10 bước
State space State tăng exponential (vị trí tất cả objects)
Reward sparsity Chỉ +1 khi hoàn thành toàn bộ chuỗi

Long horizon manipulation task

Curriculum Learning

Manual Curriculum

Cách đơn giản nhất: thiết kế curriculum thủ công, bắt đầu từ bước cuối và dần thêm bước:

class ManualCurriculum:
    """Backward curriculum — bắt đầu từ bước cuối."""
    
    def __init__(self, subtasks):
        """
        subtasks: danh sách subtasks theo thứ tự
        Ví dụ: ['open_cabinet', 'grasp_cup', 'place_cup', 
                'pour_water', 'serve']
        """
        self.subtasks = subtasks
        self.n_tasks = len(subtasks)
        self.current_start = self.n_tasks - 1  # Bắt đầu từ task cuối
        self.success_history = []
        self.success_threshold = 0.8
        self.window = 50
    
    def get_start_state(self):
        """Trả về state bắt đầu cho curriculum hiện tại."""
        # Tua nhanh đến bước current_start
        # Bỏ qua các bước trước đó bằng scripted policy
        return self.current_start
    
    def update(self, success):
        self.success_history.append(success)
        if len(self.success_history) > self.window:
            self.success_history.pop(0)
        
        if len(self.success_history) >= self.window:
            rate = sum(self.success_history) / len(self.success_history)
            if rate >= self.success_threshold and self.current_start > 0:
                self.current_start -= 1
                self.success_history.clear()
                print(f"Curriculum: now starting from step "
                      f"{self.current_start}: "
                      f"{self.subtasks[self.current_start]}")
    
    def is_complete(self):
        """Toàn bộ chuỗi đã master chưa?"""
        return (self.current_start == 0 and 
                sum(self.success_history) / max(len(self.success_history), 1) 
                >= self.success_threshold)

Automatic Curriculum: ALP-GMM

Thay vì thiết kế curriculum thủ công, ALP-GMM (Absolute Learning Progress - Gaussian Mixture Model) tự động chọn task difficulty dựa trên learning progress:

from sklearn.mixture import GaussianMixture
import numpy as np

class ALPGMM:
    """Automatic curriculum via Absolute Learning Progress."""
    
    def __init__(self, task_param_bounds, n_components=10):
        """
        task_param_bounds: [(min, max)] cho mỗi task parameter
        Ví dụ: [(0, 1)] cho difficulty, 
                [(1, 10)] cho number of subtasks
        """
        self.bounds = task_param_bounds
        self.n_dims = len(task_param_bounds)
        self.n_components = n_components
        
        # History: (task_params, competence_before, competence_after)
        self.history = []
        self.gmm = None
        self.min_samples = 50
    
    def sample_task(self):
        """Chọn task tiếp theo dựa trên learning progress."""
        if len(self.history) < self.min_samples:
            # Random sampling ban đầu
            params = []
            for low, high in self.bounds:
                params.append(np.random.uniform(low, high))
            return params
        
        # Fit GMM trên (task_params, learning_progress)
        X = np.array([h['params'] for h in self.history])
        LP = np.array([h['lp'] for h in self.history])
        
        # Absolute learning progress
        abs_lp = np.abs(LP)
        
        # Fit GMM
        XLP = np.column_stack([X, abs_lp])
        self.gmm = GaussianMixture(
            n_components=min(self.n_components, len(X) // 5),
            covariance_type='full'
        )
        self.gmm.fit(XLP)
        
        # Sample from regions with high learning progress
        # Weight components by their average LP
        component_lps = []
        for k in range(self.gmm.n_components):
            mask = self.gmm.predict(XLP) == k
            if mask.sum() > 0:
                component_lps.append(abs_lp[mask].mean())
            else:
                component_lps.append(0)
        
        # Sample from highest LP component
        best_k = np.argmax(component_lps)
        sample = self.gmm._estimate_weighted_log_prob(
            XLP[self.gmm.predict(XLP) == best_k]
        )
        
        # Simplified: sample near high-LP regions
        high_lp_idx = np.argsort(abs_lp)[-10:]
        center = X[np.random.choice(high_lp_idx)]
        noise = np.random.randn(self.n_dims) * 0.1
        params = np.clip(center + noise, 
                         [b[0] for b in self.bounds],
                         [b[1] for b in self.bounds])
        
        return params.tolist()
    
    def update(self, params, reward_before, reward_after):
        """Cập nhật sau mỗi training episode."""
        lp = reward_after - reward_before  # Learning Progress
        self.history.append({
            'params': params,
            'lp': lp,
            'reward': reward_after,
        })

Hierarchical RL: High-Level Planner + Low-Level Skills

Chia bài toán thành 2 cấp: high-level policy chọn skill nào thực hiện, low-level policies thực hiện từng skill.

class HierarchicalManipulation:
    """Two-level hierarchical RL cho multi-step manipulation."""
    
    def __init__(self):
        # Low-level skills (đã trained riêng)
        self.skills = {
            'reach': self.load_skill('reach_policy.zip'),
            'grasp': self.load_skill('grasp_policy.zip'),
            'lift': self.load_skill('lift_policy.zip'),
            'carry': self.load_skill('carry_policy.zip'),
            'place': self.load_skill('place_policy.zip'),
            'pour': self.load_skill('pour_policy.zip'),
            'open': self.load_skill('open_policy.zip'),
            'push': self.load_skill('push_policy.zip'),
        }
        self.skill_names = list(self.skills.keys())
        self.n_skills = len(self.skill_names)
    
    def load_skill(self, path):
        """Load pretrained skill policy."""
        from stable_baselines3 import SAC
        try:
            return SAC.load(path)
        except:
            return None  # Placeholder
    
    def get_skill_obs(self, env_obs, skill_name):
        """Trích xuất observation phù hợp cho skill."""
        # Mỗi skill cần observation khác nhau
        if skill_name in ['reach', 'grasp']:
            return env_obs[:20]  # Robot + target object
        elif skill_name in ['carry', 'pour']:
            return env_obs[:25]  # + object in hand
        else:
            return env_obs[:30]  # Full observation


class HighLevelPolicy:
    """High-level policy chọn skill và subgoal."""
    
    def __init__(self, n_skills, obs_dim, goal_dim):
        self.n_skills = n_skills
        
        # Observation: scene state + task progress
        # Action: skill_id (discrete) + subgoal (continuous)
        self.obs_space = gym.spaces.Box(-np.inf, np.inf, (obs_dim,))
        self.action_space = gym.spaces.Dict({
            'skill': gym.spaces.Discrete(n_skills),
            'subgoal': gym.spaces.Box(-1, 1, (goal_dim,)),
        })
    
    def select_skill(self, obs, task_plan=None):
        """Chọn skill tiếp theo."""
        if task_plan:
            # Nếu có plan, follow plan
            return task_plan.pop(0)
        
        # Learned selection
        # ... neural network forward pass ...
        skill_probs = self._compute_skill_probs(obs)
        skill_id = np.random.choice(self.n_skills, p=skill_probs)
        subgoal = self._compute_subgoal(obs, skill_id)
        
        return self.skill_names[skill_id], subgoal

Training Hierarchical System

from stable_baselines3 import SAC, PPO

def train_hierarchical(env, skills, n_iterations=1000):
    """Train hierarchical system end-to-end."""
    
    # Phase 1: Train low-level skills independently
    skill_envs = {
        'reach': ReachEnv(),
        'grasp': GraspEnv(),
        'lift': LiftEnv(),
        'carry': CarryEnv(),
        'place': PlaceEnv(),
    }
    
    trained_skills = {}
    for name, skill_env in skill_envs.items():
        model = SAC("MlpPolicy", skill_env, verbose=0)
        model.learn(total_timesteps=500_000)
        trained_skills[name] = model
        print(f"Skill '{name}' trained: "
              f"success rate = {evaluate(model, skill_env):.1%}")
    
    # Phase 2: Train high-level policy
    # High-level env: mỗi step = 1 skill execution
    high_level_env = HighLevelEnv(
        base_env=env,
        skills=trained_skills,
        max_skills_per_episode=15,
    )
    
    # PPO tốt cho discrete action (skill selection)
    high_level_model = PPO(
        "MlpPolicy",
        high_level_env,
        learning_rate=3e-4,
        n_steps=256,
        batch_size=64,
        n_epochs=10,
        gamma=0.99,
        verbose=1,
    )
    
    high_level_model.learn(total_timesteps=500_000)
    
    # Phase 3: Fine-tune end-to-end (optional)
    # Unfreeze low-level skills, train everything together
    # with lower learning rate
    
    return high_level_model, trained_skills

Goal-Conditioned RL cho Subtask Chaining

Thay vì hierarchical RL, một approach khác là goal-conditioned policies — mỗi policy biết đến bất kỳ goal nào, và chúng ta chain goals lại:

class GoalConditionedChaining:
    """Chain goal-conditioned policies cho long-horizon tasks."""
    
    def __init__(self, policy, subtask_goals):
        """
        policy: single goal-conditioned policy
        subtask_goals: list of intermediate goals
        """
        self.policy = policy
        self.subtask_goals = subtask_goals
        self.current_goal_idx = 0
    
    def get_action(self, obs, achieved_goal):
        """Chọn action dựa trên goal hiện tại."""
        current_goal = self.subtask_goals[self.current_goal_idx]
        
        # Kiểm tra đã đạt goal hiện tại chưa
        if self._goal_reached(achieved_goal, current_goal):
            self.current_goal_idx += 1
            if self.current_goal_idx >= len(self.subtask_goals):
                return None, True  # Hoàn thành!
            current_goal = self.subtask_goals[self.current_goal_idx]
        
        # Concat goal vào observation
        goal_obs = np.concatenate([obs, current_goal])
        action, _ = self.policy.predict(goal_obs, deterministic=True)
        
        return action, False
    
    def _goal_reached(self, achieved, desired, threshold=0.05):
        return np.linalg.norm(achieved - desired) < threshold

Goal chaining visualization

Kitchen Task: Ví dụ End-to-End

Bài toán kinh điển: robot thực hiện chuỗi tác vụ trong bếp.

class KitchenTaskEnv:
    """Multi-step kitchen manipulation task."""
    
    SUBTASKS = [
        'open_cabinet',     # Mở tủ
        'grasp_cup',        # Lấy cốc
        'place_on_counter', # Đặt lên bàn
        'open_faucet',      # Mở vòi nước
        'fill_cup',         # Đổ nước
        'close_faucet',     # Đóng vòi
        'carry_to_table',   # Bê ra bàn
    ]
    
    def __init__(self):
        self.n_subtasks = len(self.SUBTASKS)
        self.current_subtask = 0
        self.subtask_completed = [False] * self.n_subtasks
        
    def compute_reward(self, state):
        """Reward dựa trên subtask progress."""
        
        # Check subtask completion
        subtask = self.SUBTASKS[self.current_subtask]
        
        completion_checks = {
            'open_cabinet': lambda s: s['cabinet_angle'] > 1.2,
            'grasp_cup': lambda s: s['cup_grasped'],
            'place_on_counter': lambda s: (s['cup_on_counter'] and 
                                            not s['cup_grasped']),
            'open_faucet': lambda s: s['faucet_on'],
            'fill_cup': lambda s: s['cup_fill'] > 0.8,
            'close_faucet': lambda s: not s['faucet_on'],
            'carry_to_table': lambda s: s['cup_on_table'],
        }
        
        if completion_checks[subtask](state):
            self.subtask_completed[self.current_subtask] = True
            self.current_subtask += 1
            
            if self.current_subtask >= self.n_subtasks:
                return 100.0, True  # All done!
            
            return 10.0, False  # Subtask bonus
        
        # Dense reward cho subtask hiện tại
        subtask_rewards = {
            'open_cabinet': lambda s: -np.tanh(3 * abs(1.2 - s['cabinet_angle'])),
            'grasp_cup': lambda s: 1 - np.tanh(5 * s['gripper_to_cup_dist']),
            'place_on_counter': lambda s: 1 - np.tanh(5 * s['cup_to_counter_dist']),
            'open_faucet': lambda s: 1 - np.tanh(5 * s['gripper_to_faucet_dist']),
            'fill_cup': lambda s: s['cup_fill'],
            'close_faucet': lambda s: 1 - np.tanh(5 * s['gripper_to_faucet_dist']),
            'carry_to_table': lambda s: 1 - np.tanh(3 * s['cup_to_table_dist']),
        }
        
        return subtask_rewards[subtask](state), False

Benchmark: IKEA Furniture Assembly

IKEA Furniture Assembly là benchmark tiêu chuẩn cho long-horizon manipulation:

# IKEA Furniture Assembly benchmark results
benchmark_results = {
    'table_lack': {  # 4 legs + 1 top
        'flat_rl': {'success': 0.05, 'steps': 'N/A'},
        'curriculum': {'success': 0.32, 'steps': '~800'},
        'hierarchical': {'success': 0.58, 'steps': '~500'},
        'hierarchical_curriculum': {'success': 0.71, 'steps': '~400'},
    },
    'chair_simple': {  # 4 legs + seat + back
        'flat_rl': {'success': 0.01, 'steps': 'N/A'},
        'curriculum': {'success': 0.18, 'steps': '~1200'},
        'hierarchical': {'success': 0.42, 'steps': '~800'},
        'hierarchical_curriculum': {'success': 0.55, 'steps': '~650'},
    },
}
Method Table LACK Chair Simple Shelf
Flat RL (SAC) 5% 1% 0%
SAC + Curriculum 32% 18% 8%
Hierarchical RL 58% 42% 25%
Hier + Curriculum 71% 55% 38%
Hier + Curr + HER 78% 62% 45%

Reset-Free RL

Trong thực tế, robot không thể "reset" môi trường sau mỗi episode. Reset-free RL giải quyết bằng cách train policy "undo" (đưa về trạng thái ban đầu):

class ResetFreeTraining:
    """Training mà không cần reset environment."""
    
    def __init__(self, forward_env, max_episode_steps=500):
        self.env = forward_env
        self.max_steps = max_episode_steps
        
        # Train 2 policies:
        # 1. Forward policy: thực hiện task
        # 2. Reset policy: đưa về trạng thái ban đầu
        self.forward_policy = None
        self.reset_policy = None
    
    def train(self):
        """Alternating training giữa forward và reset."""
        
        for iteration in range(10000):
            # Forward episode
            obs = self.env.get_obs()
            for step in range(self.max_steps):
                action = self.forward_policy.predict(obs)
                obs, reward, done, info = self.env.step(action)
                self.forward_policy.store(obs, action, reward)
                
                if done:
                    break
            
            # Reset episode — đưa về initial state
            obs = self.env.get_obs()
            init_state = self.env.get_initial_state()
            for step in range(self.max_steps):
                # Goal = initial state
                goal_obs = np.concatenate([obs, init_state])
                action = self.reset_policy.predict(goal_obs)
                obs, _, _, _ = self.env.step(action)
                
                # Reward = closeness to initial state
                reset_reward = -np.linalg.norm(obs - init_state)
                self.reset_policy.store(goal_obs, action, reset_reward)
                
                if np.linalg.norm(obs - init_state) < 0.1:
                    break
            
            # Update both policies
            self.forward_policy.update()
            self.reset_policy.update()

Tổng kết Series

Qua 8 bài viết, chúng ta đã đi từ nền tảng đến frontier của RL cho manipulation:

Bài Chủ đề Key Takeaway
1 MDP & Reward State/action design quyết định 50% thành công
2 Grasping Curriculum + SAC = grasping state-of-the-art
3 Force Control RL + impedance control = best of both worlds
4 Pick-and-Place HER là chìa khóa cho sparse reward
5 Carrying Multi-objective reward = Pareto trade-offs
6 Contact-Rich Tactile + Domain Randomization cho sim-to-real
7 Tool Use Affordance + two-phase learning
8 Multi-Step Hierarchical + Curriculum cho long-horizon

Tài liệu tham khảo

  1. RELAY: Reinforcement Learning with Action Hierarchies — Gupta et al., 2020
  2. Composing Task-Agnostic Policies with Deep Reinforcement Learning — Lee et al., ICLR 2019
  3. Automatic Curriculum Learning for Deep RL: A Short Survey — Portelas et al., IJCAI 2020

Bài viết liên quan

NT

Nguyễn Anh Tuấn

Robotics & AI Engineer. Building VnRobo — sharing knowledge about robot learning, VLA models, and automation.

Bài viết liên quan

NEWTutorial
Hướng dẫn GigaBrain-0: VLA + World Model + RL
vlaworld-modelreinforcement-learninggigabrainroboticsmanipulation

Hướng dẫn GigaBrain-0: VLA + World Model + RL

Hướng dẫn chi tiết huấn luyện VLA bằng World Model và Reinforcement Learning với framework RAMP từ GigaBrain — open-source, 3.5B params.

12/4/202611 phút đọc
NEWDeep Dive
WholebodyVLA Open-Source: Hướng Dẫn Kiến Trúc & Code
vlahumanoidloco-manipulationiclrrlopen-sourceisaac-lab

WholebodyVLA Open-Source: Hướng Dẫn Kiến Trúc & Code

Deep-dive vào codebase WholebodyVLA — kiến trúc latent action, LMO RL policy, và cách xây dựng pipeline whole-body loco-manipulation cho humanoid.

12/4/202619 phút đọc
NEWTutorial
Sim-to-Real cho Humanoid: Deployment Best Practices
sim2realhumanoiddeploymentrlunitreePhần 10

Sim-to-Real cho Humanoid: Deployment Best Practices

Pipeline hoàn chỉnh deploy RL locomotion policy lên robot humanoid thật — domain randomization, system identification, safety, và Unitree SDK.

9/4/202611 phút đọc