Introduction: When One Action Isn't Enough
In previous posts, we handled single pick-and-place (post 3) and multi-object sorting (post 4). But think about real-world tasks: making coffee (open lid, pour grounds, add water, stir), clearing a table (collect dishes, load dishwasher, wipe table), or assembly (grab screw, align, tighten).
These are all long-horizon tasks — action sequences spanning hundreds or even thousands of steps, requiring the robot to plan, execute sequentially, and handle mid-execution failures. This is the hardest problem in current manipulation research.
Why Is Long-Horizon Hard?
| Challenge | Impact | Example |
|---|---|---|
| Error accumulation | Small errors compound into large failures | 1mm misalignment in grasping topples entire stack |
| Credit assignment | Hard to identify which step caused failure | Failure at step 50, error from step 12 |
| Exponential search | Action combinations grow exponentially | 10 steps with 100 candidate actions each = 100^10 possible sequences |
| Partial observability | Can't see everything | Object occluded after rotation |
| Recovery | Must know how to recover from errors | Drop object, pick up again, continue |
Approach 1: Hierarchical Policies
The idea: divide large tasks into sub-tasks, each with its own policy, and a high-level planner decides the order.
Designing the Hierarchical System
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional

import numpy as np
import torch
class SubTask(Enum):
    """Sub-tasks for "Make Sandwich" task.

    Each member's value is the string identifier that appears in logs
    (e.g. the completion print in HierarchicalPolicy.act).
    """

    PICK_BREAD = "pick_bread"
    PLACE_BREAD = "place_bread"
    PICK_LETTUCE = "pick_lettuce"
    PLACE_LETTUCE = "place_lettuce"
    SPREAD_BUTTER = "spread_butter"
    CLOSE_SANDWICH = "close_sandwich"
@dataclass
class TaskPlan:
    """Task execution plan: an ordered list of sub-tasks plus a cursor.

    Attributes:
        subtasks: Ordered sub-tasks to execute.
        current_step: Index of the sub-task being executed now.
        completed: Per-sub-task done flags; pass None (the default) to
            auto-initialize to all-False in __post_init__.
    """

    subtasks: List[SubTask]
    current_step: int = 0
    # Fix: was annotated `List[bool] = None`, which lies about the type —
    # None is an accepted sentinel, so the annotation must be Optional.
    completed: Optional[List[bool]] = None

    def __post_init__(self):
        # Build the flags here rather than with a mutable default argument.
        if self.completed is None:
            self.completed = [False] * len(self.subtasks)

    @property
    def current_subtask(self) -> Optional[SubTask]:
        """The sub-task to execute now, or None once the plan is finished."""
        if self.current_step < len(self.subtasks):
            return self.subtasks[self.current_step]
        return None

    @property
    def is_done(self) -> bool:
        """True when every sub-task has been advanced past."""
        return self.current_step >= len(self.subtasks)

    def advance(self):
        """Mark the current sub-task completed and move to the next.

        Raises:
            IndexError: if called after the plan is already done.
        """
        self.completed[self.current_step] = True
        self.current_step += 1
class HierarchicalPolicy:
    """Hierarchical policy for long-horizon tasks.

    Two levels:
    - High-level: Task planner — decides which sub-task is next
    - Low-level: Sub-task policies — executes each sub-task
    """

    def __init__(self, subtask_policies: Dict[SubTask, Any]):
        # Fix: the annotation used the builtin function `any` as a type;
        # `typing.Any` is the correct spelling.  Values only need a
        # .predict(observation) -> action method.
        self.subtask_policies = subtask_policies
        self.current_plan = None
        self.subtask_detector = SubTaskCompletionDetector()

    def plan(self, task_description: str, observation: dict) -> TaskPlan:
        """High-level planning: analyze task and create plan.

        Raises:
            ValueError: if the task description is not recognized.
        """
        if "sandwich" in task_description.lower():
            plan = TaskPlan(subtasks=[
                SubTask.PICK_BREAD,
                SubTask.PLACE_BREAD,
                SubTask.SPREAD_BUTTER,
                SubTask.PICK_LETTUCE,
                SubTask.PLACE_LETTUCE,
                SubTask.CLOSE_SANDWICH,
            ])
        else:
            raise ValueError(f"Unknown task: {task_description}")
        self.current_plan = plan
        return plan

    def act(self, observation: dict) -> np.ndarray:
        """Select action based on current sub-task.

        Returns a zero (no-op) action when no plan exists or the plan is
        finished.
        """
        if self.current_plan is None or self.current_plan.is_done:
            return np.zeros(7)
        current_subtask = self.current_plan.current_subtask
        policy = self.subtask_policies[current_subtask]
        action = policy.predict(observation)
        # Completion is checked after computing the action so the last
        # action of a sub-task is still returned to the caller.
        if self.subtask_detector.is_complete(current_subtask, observation):
            print(f"Sub-task {current_subtask.value} completed!")
            self.current_plan.advance()
        return action
class SubTaskCompletionDetector:
    """Detects when a sub-task is completed.

    Can use:
    1. Rule-based: Check object positions
    2. Learned: Train classifier from data
    3. Vision: Use vision model for recognition
    """

    def is_complete(self, subtask: SubTask, observation: dict) -> bool:
        """Return True once `subtask` is judged finished for this observation.

        Only PICK_BREAD and PLACE_BREAD have rules here; every other
        sub-task never auto-completes.
        """
        state = observation.get("state", np.zeros(10))
        if subtask == SubTask.PICK_BREAD:
            # Done when the gripper is closed and the object has cleared
            # the table surface.
            return (state[-1] < 0.5) and (state[2] > 0.1)
        if subtask == SubTask.PLACE_BREAD:
            # Done when the end effector is within 3 cm of the target pose.
            goal = np.array([0.0, 0.0, 0.02])
            return np.linalg.norm(state[:3] - goal) < 0.03
        return False
Approach 2: Task and Motion Planning (TAMP)
TAMP combines symbolic planning (classical AI) with motion planning (robotics):
class TAMPSolver:
    """Task and Motion Planning solver.

    Symbolic level: PDDL-like planning
    Motion level: RRT/PRM path planning

    NOTE(review): several helpers referenced below (_plan_place,
    _plan_push, _plan_pour, _satisfies_goal, _get_valid_params,
    _apply_action, _backtrack_and_replan, _interpolate) are not defined
    in this excerpt — presumably defined elsewhere; confirm before use.
    """

    def __init__(self):
        # Symbolic facts about the world (populated elsewhere).
        self.symbolic_state = {}
        # Dispatch table: symbolic action name -> motion planner for it.
        self.action_primitives = {
            "pick": self._plan_pick,
            "place": self._plan_place,
            "push": self._plan_push,
            "pour": self._plan_pour,
        }

    def solve(self, initial_state: dict, goal: dict) -> List[dict]:
        """Find action sequence from current state to goal.

        Returns:
            List[dict]: Sequence of (action_name, params, motion_plan)
        """
        # Step 1: Symbolic search — find skeleton plan
        skeleton = self._symbolic_search(initial_state, goal)
        # Step 2: Motion planning — fill in trajectories
        full_plan = []
        current_state = initial_state.copy()
        for action_name, params in skeleton:
            motion_plan = self.action_primitives[action_name](
                current_state, params
            )
            if motion_plan is None:
                # Geometric failure: the symbolic skeleton is infeasible
                # at the motion level, so fall back to replanning.
                return self._backtrack_and_replan(
                    initial_state, goal, full_plan
                )
            full_plan.append({
                "action": action_name,
                "params": params,
                "trajectory": motion_plan,
            })
            # Advance the abstract state so later actions plan from the
            # post-condition of this one.
            current_state = self._apply_action(current_state, action_name, params)
        return full_plan

    def _symbolic_search(self, state, goal):
        """PDDL-like symbolic search (breadth-first over abstract states).

        Returns the action list reaching `goal`, or None if the search
        space is exhausted.  NOTE(review): `queue.pop(0)` on a list is
        O(n) per pop; collections.deque is the usual choice here.
        """
        queue = [(state, [])]
        visited = set()
        while queue:
            current, actions = queue.pop(0)
            if self._satisfies_goal(current, goal):
                return actions
            # Canonical string key so dict states can be deduplicated.
            state_key = str(sorted(current.items()))
            if state_key in visited:
                continue
            visited.add(state_key)
            # Expand: every valid parameterization of every primitive.
            for action_name, planner in self.action_primitives.items():
                for params in self._get_valid_params(current, action_name):
                    next_state = self._apply_action(current, action_name, params)
                    queue.append((next_state, actions + [(action_name, params)]))
        return None

    def _plan_pick(self, state, params):
        """Motion planning for pick action.

        Approach from 10 cm above the object, descend, close the
        gripper, then retreat to the approach pose.
        NOTE(review): gripper value -1.0 is assumed to mean "close" —
        confirm the convention with the controller.
        """
        object_pos = state[f"object_{params['object_id']}_pos"]
        # Pre-grasp pose directly above the object.
        approach_pos = object_pos + np.array([0, 0, 0.1])
        trajectory = []
        trajectory.extend(self._interpolate(state["ee_pos"], approach_pos, n_steps=30))
        trajectory.extend(self._interpolate(approach_pos, object_pos, n_steps=20))
        trajectory.append({"pos": object_pos, "gripper": -1.0})
        trajectory.extend(self._interpolate(object_pos, approach_pos, n_steps=20))
        return trajectory
# Demo: plan a sandwich assembly from a simple geometric initial state.
# NOTE(review): these keys ("bread_pos", ...) do not match the
# "object_{object_id}_pos" pattern read by TAMPSolver._plan_pick —
# verify the state naming convention.
solver = TAMPSolver()
plan = solver.solve(
    initial_state={
        "bread_pos": np.array([0.1, 0.0, 0.02]),
        "lettuce_pos": np.array([-0.1, 0.05, 0.02]),
        "butter_pos": np.array([0.0, -0.1, 0.02]),
        "ee_pos": np.array([0.0, 0.0, 0.3]),
    },
    goal={"sandwich_assembled": True},
)
# Print the resulting symbolic actions and trajectory sizes.
for step in plan:
    print(f"Action: {step['action']}({step['params']})")
    print(f" Trajectory: {len(step['trajectory'])} waypoints")
Approach 3: Curriculum Learning
Instead of learning the entire long-horizon task at once, curriculum learning starts from simple sub-tasks and gradually combines them:
class CurriculumTrainer:
    """Train policy with curriculum from easy to hard.

    Stage 1: Learn individual sub-tasks
    Stage 2: Combine 2 consecutive sub-tasks
    Stage 3: Combine 3+ sub-tasks
    Stage 4: Full task
    """

    def __init__(self, policy, device="cuda"):
        self.policy = policy
        self.device = device
        self.stage = 1
        # Success rate required to advance past each stage.
        # NOTE(review): these thresholds are never consulted in
        # run_full_curriculum below — the evaluation/gating step appears
        # to be elided from this excerpt.
        self.stage_thresholds = {
            1: 0.85,  # 85% success to advance
            2: 0.80,
            3: 0.75,
            4: 0.70,
        }

    def get_curriculum_dataset(self, stage: int) -> LeRobotDataset:
        """Get dataset appropriate for current stage.

        NOTE(review): LeRobotDataset is not imported in this excerpt —
        presumably provided by the lerobot package; the semantics of
        `episodes=[0, 50]` (list vs range of episode indices) should be
        verified against that API.
        """
        if stage == 1:
            return LeRobotDataset(
                "my-user/sandwich-subtasks",
                episodes=[0, 50],
            )
        elif stage == 2:
            return LeRobotDataset(
                "my-user/sandwich-2steps",
                episodes=[0, 100],
            )
        elif stage == 3:
            return LeRobotDataset("my-user/sandwich-multistep")
        else:
            return LeRobotDataset("my-user/sandwich-full")

    def train_stage(self, stage: int, max_epochs=100):
        """Train one stage of the curriculum (plain supervised loop)."""
        dataset = self.get_curriculum_dataset(stage)
        dataloader = torch.utils.data.DataLoader(
            dataset, batch_size=8, shuffle=True
        )
        # Fresh optimizer per stage; low LR suited to fine-tuning.
        optimizer = torch.optim.AdamW(self.policy.parameters(), lr=1e-5)
        for epoch in range(max_epochs):
            total_loss = 0
            n_batches = 0
            for batch in dataloader:
                # Move every tensor in the batch onto the training device.
                batch = {k: v.to(self.device) for k, v in batch.items()}
                # The policy's forward is expected to return a dict
                # containing a scalar "loss" tensor.
                output = self.policy.forward(batch)
                loss = output["loss"]
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                total_loss += loss.item()
                n_batches += 1
            avg_loss = total_loss / n_batches
            # Log every 10 epochs to keep output readable.
            if (epoch + 1) % 10 == 0:
                print(f"Stage {stage} | Epoch {epoch+1} | Loss: {avg_loss:.4f}")

    def run_full_curriculum(self, env, max_stages=4):
        """Run the complete curriculum.

        NOTE(review): `env` is accepted but unused here — presumably
        intended for success-rate evaluation between stages.
        """
        for stage in range(1, max_stages + 1):
            self.stage = stage
            print(f"\n{'='*50}")
            print(f"Starting Stage {stage}")
            print(f"{'='*50}")
            self.train_stage(stage)
Approach 4: LLM-based Task Planning
Using Large Language Models to analyze tasks and create plans:
class LLMTaskPlanner:
    """Use LLM to decompose tasks into sub-tasks.

    Inspired by SayCan (Ahn et al. 2022) and Inner Monologue (Huang et al. 2022).
    """

    def __init__(self, available_skills: List[str]):
        self.skills = available_skills
        self.history = []

    def create_prompt(self, task: str, observation_description: str) -> str:
        """Create prompt for LLM."""
        # Bullet list of skills the model may choose from.
        skill_lines = [f"- {skill}" for skill in self.skills]
        skills_str = "\n".join(skill_lines)
        # One line per past step with its outcome.
        step_lines = []
        for idx, entry in enumerate(self.history):
            outcome = "Success" if entry["success"] else "Failed"
            step_lines.append(f"Step {idx+1}: {entry['action']} -> {outcome}")
        history_str = "\n".join(step_lines)
        history_block = history_str if history_str else "None yet"
        prompt = f"""You are a robot task planner. Given the current scene and available skills,
plan the next action to complete the task.
Task: {task}
Available skills:
{skills_str}
Current scene: {observation_description}
Previous actions:
{history_block}
What should the robot do next? Reply with exactly one skill name and parameters.
Format: skill_name(param1, param2)
"""
        return prompt

    def get_next_action(self, task: str, observation: dict) -> tuple:
        """Ask LLM for the next action."""
        scene_text = self._describe_observation(observation)
        llm_reply = self._call_llm(self.create_prompt(task, scene_text))
        return self._parse_response(llm_reply)
# Instantiate the planner with the robot's primitive skill library.
# Skill signatures are plain strings so the LLM can copy the call
# format verbatim in its reply (parsed later by _parse_response).
planner = LLMTaskPlanner(
    available_skills=[
        "pick(object_name)",
        "place(target_position)",
        "push(object_name, direction)",
        "pour(source, target)",
        "spread(tool, surface)",
        "open_gripper()",
        "close_gripper()",
    ]
)
Failure Recovery
Long-horizon tasks will fail. What matters is knowing how to recover:
class FailureRecoveryPolicy:
    """Policy with failure detection and recovery capabilities.

    Wraps a main policy: before each action the failure detector is
    consulted; on a known failure the matching recovery policy supplies
    the action instead, otherwise a safe zero action is returned.
    """

    def __init__(self, main_policy, recovery_policies: dict):
        self.main_policy = main_policy              # nominal-behavior policy
        self.recovery_policies = recovery_policies  # failure type -> policy
        self.failure_detector = FailureDetector()

    def act_with_recovery(self, observation: dict, max_retries=3) -> np.ndarray:
        """Execute action with failure detection and recovery.

        One observation yields exactly one action, so only a single
        detection/recovery decision is possible per call.  (The original
        wrapped this body in `for attempt in range(max_retries)`, but
        every branch returned on the first iteration, making the retry
        loop and the "Max retries exceeded!" message dead code; genuine
        retries would require a fresh observation per attempt.)
        `max_retries` is kept for interface compatibility and is only
        echoed in the log message.
        """
        failure_type = self.failure_detector.detect(observation)
        if failure_type is None:
            # Nominal case: delegate to the main policy.
            return self.main_policy.predict(observation)
        print(f"Detected failure: {failure_type} (attempt 1/{max_retries})")
        if failure_type in self.recovery_policies:
            return self.recovery_policies[failure_type].predict(observation)
        print(f"No recovery policy for {failure_type}")
        return np.zeros(7)  # safe no-op action
class FailureDetector:
    """Detect common failure types from a low-dimensional state vector.

    Assumed state layout (from the indexing below): state[:3] = xyz
    position, state[-2] = contact force (when len(state) > 8),
    state[-1] = gripper opening.  TODO(review): confirm against the
    robot's observation spec.
    """

    # Number of recent positions used to judge whether the arm is stuck.
    _HISTORY_LEN = 30

    def __init__(self):
        # Fix: the original created this attribute lazily via hasattr()
        # inside _is_stuck; eager initialization is clearer and makes the
        # instance's state explicit from construction.
        self._position_history = []

    def detect(self, observation: dict) -> Optional[str]:
        """Return "object_dropped", "collision", "stuck", or None.

        Checks run in fixed priority order; the first match wins.
        Also records the current position for the stuck check.
        """
        state = observation.get("state", np.zeros(10))
        if self._object_dropped(state):
            return "object_dropped"
        if self._collision_detected(state):
            return "collision"
        if self._is_stuck(state):
            return "stuck"
        return None

    def _object_dropped(self, state):
        # Dropped = gripper open (< 0.5) while the object sits at table height.
        gripper_state = state[-1]
        object_z = state[2]
        return gripper_state < 0.5 and object_z < 0.01

    def _collision_detected(self, state):
        # The contact-force channel only exists for state vectors longer than 8.
        contact_force = state[-2] if len(state) > 8 else 0
        return abs(contact_force) > 50.0

    def _is_stuck(self, state):
        # Stuck = end-effector moved less than 5 mm over the last
        # _HISTORY_LEN observations (net displacement, not path length).
        self._position_history.append(state[:3].copy())
        if len(self._position_history) > self._HISTORY_LEN:
            self._position_history.pop(0)
        if len(self._position_history) >= self._HISTORY_LEN:
            movement = np.linalg.norm(
                self._position_history[-1] - self._position_history[0]
            )
            return movement < 0.005
        return False
Reference Papers
- SayCan: Do As I Can, Not As I Say — Ahn et al., 2022 — LLM + affordance for robot planning
- Inner Monologue: Embodied Reasoning through Planning with Language Models — Huang et al., 2022 — Feedback loop between LLM planner and robot
- Integrated Task and Motion Planning Survey — Garrett et al., 2021 — Comprehensive TAMP survey
Conclusion and Next Steps
Long-horizon tasks demand systems thinking — not just good policies but also planning, monitoring, and recovery. Hierarchical decomposition and curriculum learning are currently the two most effective approaches.
The next post — Dual-Arm Robot: Setup & Calibration — will expand from single-arm to dual-arm, unlocking tasks that require coordinated bimanual manipulation.
Related Posts
- Multi-Object Manipulation — Foundation for long-horizon tasks
- VLA Models for Robots — Language conditioning for task planning
- Sim-to-Real Pipeline — Deploying complex policies to real robots