Tags: ai, lerobot, long-horizon, multi-step, task-planning

Long-Horizon Tasks: Chaining Actions for Multi-Step Manipulation

Building policies for long-horizon tasks — hierarchical planning, curriculum learning, and failure recovery for complex action sequences.

Nguyễn Anh Tuấn · March 24, 2026 · 8 min read

Introduction: When One Action Isn't Enough

In previous posts, we handled single pick-and-place (post 3) and multi-object sorting (post 4). But think about real-world tasks: making coffee (open lid, pour grounds, add water, stir), clearing a table (collect dishes, load dishwasher, wipe table), or assembly (grab screw, align, tighten).

These are all long-horizon tasks: action sequences spanning hundreds or even thousands of control steps, requiring the robot to plan ahead, execute sub-tasks in order, and handle mid-execution failures. This remains one of the hardest open problems in manipulation research.

Long-horizon task planning

Why Is Long-Horizon Hard?

| Challenge | Impact | Example |
|---|---|---|
| Error accumulation | Small errors compound into large failures | A 1 mm grasp misalignment topples the entire stack |
| Credit assignment | Hard to identify which step caused a failure | Failure surfaces at step 50; the actual error was at step 12 |
| Exponential search | Action combinations grow exponentially | 10 steps × 100 actions/step = 100^10 possible sequences |
| Partial observability | The robot can't see everything | An object becomes occluded after a rotation |
| Recovery | Must know how to recover from errors | Drop the object, pick it up again, continue |
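A back-of-envelope calculation makes the error-accumulation row concrete. If each step succeeds independently with probability p, an n-step task succeeds with probability p^n:

```python
# Back-of-envelope: if each step succeeds independently with
# probability p, an n-step task succeeds with probability p ** n.
def task_success_rate(p_step: float, n_steps: int) -> float:
    return p_step ** n_steps

print(f"{task_success_rate(0.99, 100):.3f}")   # 99%-reliable steps -> ~0.366
print(f"{task_success_rate(0.95, 100):.5f}")   # 95%-reliable steps -> ~0.00592
```

Even 99%-reliable steps give barely one-in-three end-to-end success over 100 steps. This is exactly why the approaches below add completion checks and recovery: detecting and fixing a failed step breaks the exponential decay.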

Approach 1: Hierarchical Policies

The idea: divide large tasks into sub-tasks, each with its own policy, and a high-level planner decides the order.

Designing the Hierarchical System

import numpy as np
import torch
from enum import Enum
from dataclasses import dataclass
from typing import List, Optional, Dict

class SubTask(Enum):
    """Sub-tasks for "Make Sandwich" task."""
    PICK_BREAD = "pick_bread"
    PLACE_BREAD = "place_bread"
    PICK_LETTUCE = "pick_lettuce"
    PLACE_LETTUCE = "place_lettuce"
    SPREAD_BUTTER = "spread_butter"
    CLOSE_SANDWICH = "close_sandwich"


@dataclass
class TaskPlan:
    """Task execution plan."""
    subtasks: List[SubTask]
    current_step: int = 0
    completed: Optional[List[bool]] = None
    
    def __post_init__(self):
        if self.completed is None:
            self.completed = [False] * len(self.subtasks)
    
    @property
    def current_subtask(self) -> Optional[SubTask]:
        if self.current_step < len(self.subtasks):
            return self.subtasks[self.current_step]
        return None
    
    @property
    def is_done(self) -> bool:
        return self.current_step >= len(self.subtasks)
    
    def advance(self):
        """Move to next sub-task."""
        self.completed[self.current_step] = True
        self.current_step += 1


class HierarchicalPolicy:
    """Hierarchical policy for long-horizon tasks.
    
    Two levels:
    - High-level: Task planner — decides which sub-task is next
    - Low-level: Sub-task policies — executes each sub-task
    """
    
    def __init__(self, subtask_policies: Dict[SubTask, object]):
        self.subtask_policies = subtask_policies
        self.current_plan = None
        self.subtask_detector = SubTaskCompletionDetector()
    
    def plan(self, task_description: str, observation: dict) -> TaskPlan:
        """High-level planning: analyze task and create plan."""
        if "sandwich" in task_description.lower():
            plan = TaskPlan(subtasks=[
                SubTask.PICK_BREAD,
                SubTask.PLACE_BREAD,
                SubTask.SPREAD_BUTTER,
                SubTask.PICK_LETTUCE,
                SubTask.PLACE_LETTUCE,
                SubTask.CLOSE_SANDWICH,
            ])
        else:
            raise ValueError(f"Unknown task: {task_description}")
        
        self.current_plan = plan
        return plan
    
    def act(self, observation: dict) -> np.ndarray:
        """Select action based on current sub-task."""
        if self.current_plan is None or self.current_plan.is_done:
            return np.zeros(7)
        
        current_subtask = self.current_plan.current_subtask
        policy = self.subtask_policies[current_subtask]
        action = policy.predict(observation)
        
        if self.subtask_detector.is_complete(current_subtask, observation):
            print(f"Sub-task {current_subtask.value} completed!")
            self.current_plan.advance()
        
        return action


class SubTaskCompletionDetector:
    """Detects when a sub-task is completed.
    
    Can use:
    1. Rule-based: Check object positions
    2. Learned: Train classifier from data
    3. Vision: Use vision model for recognition
    """
    
    def is_complete(self, subtask: SubTask, observation: dict) -> bool:
        state = observation.get("state", np.zeros(10))
        
        if subtask == SubTask.PICK_BREAD:
            gripper_closed = state[-1] < 0.5
            object_lifted = state[2] > 0.1
            return gripper_closed and object_lifted
        
        elif subtask == SubTask.PLACE_BREAD:
            target_pos = np.array([0.0, 0.0, 0.02])
            current_pos = state[:3]
            return np.linalg.norm(current_pos - target_pos) < 0.03
        
        return False

Approach 2: Task and Motion Planning (TAMP)

TAMP combines symbolic planning (classical AI) with motion planning (robotics):

class TAMPSolver:
    """Task and Motion Planning solver.
    
    Symbolic level: PDDL-like planning
    Motion level: RRT/PRM path planning
    """
    
    def __init__(self):
        self.symbolic_state = {}
        self.action_primitives = {
            "pick": self._plan_pick,
            "place": self._plan_place,
            "push": self._plan_push,
            "pour": self._plan_pour,
        }
    
    def solve(self, initial_state: dict, goal: dict) -> List[dict]:
        """Find action sequence from current state to goal.
        
        Returns:
            List[dict]: Sequence of (action_name, params, motion_plan)
        """
        # Step 1: Symbolic search — find skeleton plan
        skeleton = self._symbolic_search(initial_state, goal)
        
        # Step 2: Motion planning — fill in trajectories
        full_plan = []
        current_state = initial_state.copy()
        
        for action_name, params in skeleton:
            motion_plan = self.action_primitives[action_name](
                current_state, params
            )
            
            if motion_plan is None:
                return self._backtrack_and_replan(
                    initial_state, goal, full_plan
                )
            
            full_plan.append({
                "action": action_name,
                "params": params,
                "trajectory": motion_plan,
            })
            
            current_state = self._apply_action(current_state, action_name, params)
        
        return full_plan
    
    def _symbolic_search(self, state, goal):
        """PDDL-like symbolic search."""
        queue = [(state, [])]
        visited = set()
        
        while queue:
            current, actions = queue.pop(0)
            
            if self._satisfies_goal(current, goal):
                return actions
            
            state_key = str(sorted(current.items()))
            if state_key in visited:
                continue
            visited.add(state_key)
            
            # Expand successors. In practice a depth cap is essential here:
            # the branching factor makes unbounded BFS intractable.
            for action_name in self.action_primitives:
                for params in self._get_valid_params(current, action_name):
                    next_state = self._apply_action(current, action_name, params)
                    queue.append((next_state, actions + [(action_name, params)]))
        
        return None
    
    def _plan_pick(self, state, params):
        """Motion planning for pick action."""
        object_pos = state[f"object_{params['object_id']}_pos"]
        approach_pos = object_pos + np.array([0, 0, 0.1])
        
        trajectory = []
        trajectory.extend(self._interpolate(state["ee_pos"], approach_pos, n_steps=30))
        trajectory.extend(self._interpolate(approach_pos, object_pos, n_steps=20))
        trajectory.append({"pos": object_pos, "gripper": -1.0})
        trajectory.extend(self._interpolate(object_pos, approach_pos, n_steps=20))
        
        return trajectory


solver = TAMPSolver()
plan = solver.solve(
    initial_state={
        "bread_pos": np.array([0.1, 0.0, 0.02]),
        "lettuce_pos": np.array([-0.1, 0.05, 0.02]),
        "butter_pos": np.array([0.0, -0.1, 0.02]),
        "ee_pos": np.array([0.0, 0.0, 0.3]),
    },
    goal={"sandwich_assembled": True},
)

if plan is None:
    print("No feasible plan found")
else:
    for step in plan:
        print(f"Action: {step['action']}({step['params']})")
        print(f"  Trajectory: {len(step['trajectory'])} waypoints")

Task and Motion Planning hierarchy

Approach 3: Curriculum Learning

Instead of learning the entire long-horizon task at once, curriculum learning starts from simple sub-tasks and gradually combines them:

from lerobot.common.datasets.lerobot_dataset import LeRobotDataset

class CurriculumTrainer:
    """Train policy with curriculum from easy to hard.
    
    Stage 1: Learn individual sub-tasks
    Stage 2: Combine 2 consecutive sub-tasks
    Stage 3: Combine 3+ sub-tasks
    Stage 4: Full task
    """
    
    def __init__(self, policy, device="cuda"):
        self.policy = policy
        self.device = device
        self.stage = 1
        self.stage_thresholds = {
            1: 0.85,  # 85% success to advance
            2: 0.80,
            3: 0.75,
            4: 0.70,
        }
    
    def get_curriculum_dataset(self, stage: int) -> LeRobotDataset:
        """Get dataset appropriate for current stage."""
        # The episodes argument takes a list of episode indices;
        # select the first N episodes of each per-stage dataset.
        if stage == 1:
            return LeRobotDataset(
                "my-user/sandwich-subtasks",
                episodes=list(range(50)),
            )
        elif stage == 2:
            return LeRobotDataset(
                "my-user/sandwich-2steps",
                episodes=list(range(100)),
            )
        elif stage == 3:
            return LeRobotDataset("my-user/sandwich-multistep")
        else:
            return LeRobotDataset("my-user/sandwich-full")
    
    def train_stage(self, stage: int, max_epochs=100):
        """Train one stage of the curriculum."""
        dataset = self.get_curriculum_dataset(stage)
        dataloader = torch.utils.data.DataLoader(
            dataset, batch_size=8, shuffle=True
        )
        
        optimizer = torch.optim.AdamW(self.policy.parameters(), lr=1e-5)
        
        for epoch in range(max_epochs):
            total_loss = 0
            n_batches = 0
            
            for batch in dataloader:
                batch = {k: v.to(self.device) for k, v in batch.items()}
                output = self.policy.forward(batch)
                loss = output["loss"]
                
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                
                total_loss += loss.item()
                n_batches += 1
            
            avg_loss = total_loss / n_batches
            
            if (epoch + 1) % 10 == 0:
                print(f"Stage {stage} | Epoch {epoch+1} | Loss: {avg_loss:.4f}")
    
    def run_full_curriculum(self, env, max_stages=4):
        """Run the complete curriculum."""
        for stage in range(1, max_stages + 1):
            self.stage = stage
            print(f"\n{'='*50}")
            print(f"Starting Stage {stage}")
            print(f"{'='*50}")
            
            self.train_stage(stage)
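As written, run_full_curriculum trains each stage but never consults the stage_thresholds it defines. A minimal sketch of the intended gating, assuming a hypothetical evaluate_success_rate(policy, env, stage) rollout helper (not defined above):

```python
def run_gated_curriculum(trainer, env, evaluate_success_rate,
                         max_stages=4, max_attempts=3):
    """Advance through the curriculum only after clearing each stage's threshold.

    evaluate_success_rate(policy, env, stage) is a hypothetical rollout
    helper returning a success rate in [0, 1].
    """
    for stage in range(1, max_stages + 1):
        for _ in range(max_attempts):
            trainer.train_stage(stage)
            rate = evaluate_success_rate(trainer.policy, env, stage)
            if rate >= trainer.stage_thresholds[stage]:
                print(f"Stage {stage} passed with {rate:.0%} success")
                break  # threshold cleared, move to the next stage
            print(f"Stage {stage} at {rate:.0%}, below threshold; retraining")
        else:
            # Exhausted retries without clearing the threshold.
            print(f"Stage {stage} never cleared its threshold; stopping early")
            return stage - 1
    return max_stages
```

The gate matters: advancing a policy that hasn't mastered the current stage just moves the failure downstream, where credit assignment is harder.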

Approach 4: LLM-based Task Planning

Using Large Language Models to analyze tasks and create plans:

class LLMTaskPlanner:
    """Use LLM to decompose tasks into sub-tasks.
    
    Inspired by SayCan (Ahn et al. 2022) and Inner Monologue (Huang et al. 2022).
    """
    
    def __init__(self, available_skills: List[str]):
        self.skills = available_skills
        self.history = []
    
    def create_prompt(self, task: str, observation_description: str) -> str:
        """Create prompt for LLM."""
        skills_str = "\n".join(f"- {s}" for s in self.skills)
        history_str = "\n".join(
            f"Step {i+1}: {h['action']} -> {'Success' if h['success'] else 'Failed'}"
            for i, h in enumerate(self.history)
        )
        
        prompt = f"""You are a robot task planner. Given the current scene and available skills,
plan the next action to complete the task.

Task: {task}

Available skills:
{skills_str}

Current scene: {observation_description}

Previous actions:
{history_str if history_str else "None yet"}

What should the robot do next? Reply with exactly one skill name and parameters.
Format: skill_name(param1, param2)
"""
        return prompt
    
    def get_next_action(self, task: str, observation: dict) -> tuple:
        """Ask LLM for the next action."""
        obs_description = self._describe_observation(observation)
        prompt = self.create_prompt(task, obs_description)
        response = self._call_llm(prompt)
        skill_name, params = self._parse_response(response)
        return skill_name, params


planner = LLMTaskPlanner(
    available_skills=[
        "pick(object_name)",
        "place(target_position)",
        "push(object_name, direction)",
        "pour(source, target)",
        "spread(tool, surface)",
        "open_gripper()",
        "close_gripper()",
    ]
)
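The _parse_response step is where free-form LLM text becomes an executable skill, and in practice it fails more often than the planning itself. A minimal sketch, assuming replies follow the skill_name(param1, param2) format requested in the prompt (the helper name and regex are my own, not part of any library):

```python
import re

def parse_skill_call(response: str, known_skills: set) -> tuple:
    """Parse the first known 'skill_name(param1, param2)' call in an LLM reply.

    Raises ValueError when nothing parseable is found, so the caller can
    re-prompt the LLM instead of executing garbage.
    """
    for match in re.finditer(r"(\w+)\s*\(([^)]*)\)", response):
        skill_name = match.group(1)
        if skill_name in known_skills:
            params = [p.strip() for p in match.group(2).split(",") if p.strip()]
            return skill_name, params
    raise ValueError(f"No known skill call in: {response!r}")
```

Scanning for the first call that names a known skill (rather than trusting the whole reply) tolerates chatty model output like "Next, the robot should pick(bread)."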

Failure Recovery

Long-horizon tasks will fail. What matters is knowing how to recover:

class FailureRecoveryPolicy:
    """Policy with failure detection and recovery capabilities."""
    
    def __init__(self, main_policy, recovery_policies: dict):
        self.main_policy = main_policy
        self.recovery_policies = recovery_policies
        self.failure_detector = FailureDetector()
    
    def act_with_recovery(self, observation: dict, max_retries=3) -> np.ndarray:
        """Execute one control step with failure detection and recovery.

        Retry counting persists across calls: each step either runs the
        main policy or one recovery step for the detected failure.
        """
        failure_type = self.failure_detector.detect(observation)

        if failure_type is None:
            self.retry_count = 0
            return self.main_policy.predict(observation)

        self.retry_count = getattr(self, "retry_count", 0) + 1
        print(f"Detected failure: {failure_type} (attempt {self.retry_count}/{max_retries})")

        if self.retry_count > max_retries:
            print("Max retries exceeded! Aborting.")
            return np.zeros(7)

        if failure_type in self.recovery_policies:
            return self.recovery_policies[failure_type].predict(observation)

        print(f"No recovery policy for {failure_type}")
        return np.zeros(7)


class FailureDetector:
    """Detect common failure types."""
    
    def detect(self, observation: dict) -> Optional[str]:
        state = observation.get("state", np.zeros(10))
        
        if self._object_dropped(state):
            return "object_dropped"
        if self._collision_detected(state):
            return "collision"
        if self._is_stuck(state):
            return "stuck"
        return None
    
    def _object_dropped(self, state):
        gripper_state = state[-1]
        object_z = state[2]
        return gripper_state < 0.5 and object_z < 0.01
    
    def _collision_detected(self, state):
        contact_force = state[-2] if len(state) > 8 else 0
        return abs(contact_force) > 50.0
    
    def _is_stuck(self, state):
        if not hasattr(self, '_position_history'):
            self._position_history = []
        
        self._position_history.append(state[:3].copy())
        if len(self._position_history) > 30:
            self._position_history.pop(0)
        
        if len(self._position_history) >= 30:
            movement = np.linalg.norm(
                self._position_history[-1] - self._position_history[0]
            )
            return movement < 0.005
        return False

Reference Papers

  1. SayCan: Do As I Can, Not As I Say (Ahn et al., 2022): LLM + affordances for robot planning
  2. Inner Monologue: Embodied Reasoning through Planning with Language Models (Huang et al., 2022): feedback loop between LLM planner and robot
  3. Integrated Task and Motion Planning (Garrett et al., 2021): comprehensive TAMP survey

Conclusion and Next Steps

Long-horizon tasks demand systems thinking — not just good policies but also planning, monitoring, and recovery. Hierarchical decomposition and curriculum learning are currently the two most effective approaches.

The next post — Dual-Arm Robot: Setup & Calibration — will expand from single-arm to dual-arm, unlocking tasks that require coordinated bimanual manipulation.
