ailerobotlong-horizonmulti-steptask-planning

Long-Horizon Tasks: Chuỗi hành động cho Multi-Step Manipulation

Xây dựng policy cho long-horizon tasks — hierarchical planning, curriculum learning, và failure recovery cho chuỗi hành động phức tạp.

Nguyễn Anh Tuấn24 tháng 3, 202611 phút đọc
Long-Horizon Tasks: Chuỗi hành động cho Multi-Step Manipulation

Giới thiệu: Khi một hành động không đủ

Trong các bài trước của series, chúng ta đã xử lý pick-and-place đơn lẻ (bài 3) và multi-object sorting (bài 4). Nhưng hãy nghĩ đến các tác vụ thực sự trong đời thường: pha cà phê (mở nắp, đổ bột, rót nước, khuấy), dọn bàn (gom đĩa, xếp vào máy rửa, lau bàn), hoặc lắp ráp (lấy ốc, căn chỉnh, vặn).

Tất cả đều là long-horizon tasks — chuỗi hành động kéo dài hàng trăm thậm chí hàng ngàn bước, đòi hỏi robot phải lên kế hoạch, thực thi tuần tự, và xử lý sai sót giữa chừng. Đây là bài toán khó nhất trong manipulation research hiện tại.

Long-horizon task planning

Tại sao Long-Horizon khó?

Thách thức Ảnh hưởng Ví dụ
Error accumulation Sai sót nhỏ tích lũy thành thất bại lớn Gắp lệch 1mm → đổ toàn bộ stack
Credit assignment Khó biết bước nào gây lỗi Thất bại ở bước 50, lỗi từ bước 12
Exponential search Số tổ hợp hành động tăng exponential 10 bước × 100 actions = 100^10
Partial observability Không thấy hết mọi thứ Object bị che khuất sau khi xoay
Recovery Phải biết quay lại khi sai Làm rơi đồ → nhặt lại → tiếp tục

Phương pháp 1: Hierarchical Policies

Ý tưởng: chia task lớn thành nhiều sub-tasks, mỗi sub-task có policy riêng, và có một high-level planner quyết định thứ tự.

Thiết kế Hierarchical System

import numpy as np
import torch
from enum import Enum
from dataclasses import dataclass
from typing import List, Optional, Dict

class SubTask(Enum):
    """Các sub-tasks cho task "Make Sandwich"."""
    PICK_BREAD = "pick_bread"
    PLACE_BREAD = "place_bread"
    PICK_LETTUCE = "pick_lettuce"
    PLACE_LETTUCE = "place_lettuce"
    SPREAD_BUTTER = "spread_butter"
    CLOSE_SANDWICH = "close_sandwich"


@dataclass
class TaskPlan:
    """Kế hoạch thực hiện task."""
    subtasks: List[SubTask]
    current_step: int = 0
    completed: List[bool] = None
    
    def __post_init__(self):
        if self.completed is None:
            self.completed = [False] * len(self.subtasks)
    
    @property
    def current_subtask(self) -> Optional[SubTask]:
        if self.current_step < len(self.subtasks):
            return self.subtasks[self.current_step]
        return None
    
    @property
    def is_done(self) -> bool:
        return self.current_step >= len(self.subtasks)
    
    def advance(self):
        """Chuyển sang sub-task tiếp theo."""
        self.completed[self.current_step] = True
        self.current_step += 1


class HierarchicalPolicy:
    """Policy phân cấp cho long-horizon tasks.
    
    Gồm 2 tầng:
    - High-level: Task planner — quyết định sub-task nào tiếp theo
    - Low-level: Sub-task policies — thực hiện từng sub-task
    """
    
    def __init__(self, subtask_policies: Dict[SubTask, any]):
        self.subtask_policies = subtask_policies
        self.current_plan = None
        self.subtask_detector = SubTaskCompletionDetector()
    
    def plan(self, task_description: str, observation: dict) -> TaskPlan:
        """High-level planning: phân tích task và tạo kế hoạch."""
        # Trong thực tế, dùng LLM hoặc TAMP solver
        if "sandwich" in task_description.lower():
            plan = TaskPlan(subtasks=[
                SubTask.PICK_BREAD,
                SubTask.PLACE_BREAD,
                SubTask.SPREAD_BUTTER,
                SubTask.PICK_LETTUCE,
                SubTask.PLACE_LETTUCE,
                SubTask.CLOSE_SANDWICH,
            ])
        else:
            raise ValueError(f"Unknown task: {task_description}")
        
        self.current_plan = plan
        return plan
    
    def act(self, observation: dict) -> np.ndarray:
        """Chọn action dựa trên sub-task hiện tại."""
        if self.current_plan is None or self.current_plan.is_done:
            return np.zeros(7)  # No-op
        
        current_subtask = self.current_plan.current_subtask
        
        # Dùng policy tương ứng
        policy = self.subtask_policies[current_subtask]
        action = policy.predict(observation)
        
        # Kiểm tra sub-task đã hoàn thành chưa
        if self.subtask_detector.is_complete(current_subtask, observation):
            print(f"Sub-task {current_subtask.value} completed!")
            self.current_plan.advance()
        
        return action


class SubTaskCompletionDetector:
    """Phát hiện khi sub-task hoàn thành.
    
    Có thể dùng:
    1. Rule-based: Kiểm tra vị trí object
    2. Learned: Train classifier từ data
    3. Vision: Dùng vision model nhận diện
    """
    
    def is_complete(self, subtask: SubTask, observation: dict) -> bool:
        """Kiểm tra sub-task đã hoàn thành."""
        state = observation.get("state", np.zeros(10))
        
        if subtask == SubTask.PICK_BREAD:
            gripper_closed = state[-1] < 0.5
            object_lifted = state[2] > 0.1  # z > 10cm
            return gripper_closed and object_lifted
        
        elif subtask == SubTask.PLACE_BREAD:
            target_pos = np.array([0.0, 0.0, 0.02])
            current_pos = state[:3]
            return np.linalg.norm(current_pos - target_pos) < 0.03
        
        # ... các sub-tasks khác
        return False

Phương pháp 2: Task and Motion Planning (TAMP)

TAMP kết hợp symbolic planning (AI cổ điển) với motion planning (robotics):

class TAMPSolver:
    """Task and Motion Planning solver.
    
    Symbolic level: PDDL-like planning
    Motion level: RRT/PRM path planning
    """
    
    def __init__(self):
        self.symbolic_state = {}
        self.action_primitives = {
            "pick": self._plan_pick,
            "place": self._plan_place,
            "push": self._plan_push,
            "pour": self._plan_pour,
        }
    
    def solve(self, initial_state: dict, goal: dict) -> List[dict]:
        """Tìm chuỗi actions từ state hiện tại đến goal.
        
        Returns:
            List[dict]: Chuỗi (action_name, params, motion_plan)
        """
        # Bước 1: Symbolic search — tìm skeleton plan
        skeleton = self._symbolic_search(initial_state, goal)
        
        # Bước 2: Motion planning — fill in trajectories
        full_plan = []
        current_state = initial_state.copy()
        
        for action_name, params in skeleton:
            motion_plan = self.action_primitives[action_name](
                current_state, params
            )
            
            if motion_plan is None:
                # Backtrack nếu motion planning fail
                return self._backtrack_and_replan(
                    initial_state, goal, full_plan
                )
            
            full_plan.append({
                "action": action_name,
                "params": params,
                "trajectory": motion_plan,
            })
            
            # Update symbolic state
            current_state = self._apply_action(current_state, action_name, params)
        
        return full_plan
    
    def _symbolic_search(self, state, goal):
        """PDDL-like symbolic search."""
        # Simplified: BFS over action space
        queue = [(state, [])]
        visited = set()
        
        while queue:
            current, actions = queue.pop(0)
            
            if self._satisfies_goal(current, goal):
                return actions
            
            state_key = str(sorted(current.items()))
            if state_key in visited:
                continue
            visited.add(state_key)
            
            for action_name, planner in self.action_primitives.items():
                for params in self._get_valid_params(current, action_name):
                    next_state = self._apply_action(current, action_name, params)
                    queue.append((next_state, actions + [(action_name, params)]))
        
        return None  # No plan found
    
    def _plan_pick(self, state, params):
        """Motion planning cho pick action."""
        object_pos = state[f"object_{params['object_id']}_pos"]
        approach_pos = object_pos + np.array([0, 0, 0.1])  # 10cm above
        
        trajectory = []
        # Phase 1: Approach from above
        trajectory.extend(self._interpolate(state["ee_pos"], approach_pos, n_steps=30))
        # Phase 2: Descend
        trajectory.extend(self._interpolate(approach_pos, object_pos, n_steps=20))
        # Phase 3: Close gripper
        trajectory.append({"pos": object_pos, "gripper": -1.0})
        # Phase 4: Lift
        trajectory.extend(self._interpolate(object_pos, approach_pos, n_steps=20))
        
        return trajectory


# Sử dụng TAMP solver
solver = TAMPSolver()
plan = solver.solve(
    initial_state={
        "bread_pos": np.array([0.1, 0.0, 0.02]),
        "lettuce_pos": np.array([-0.1, 0.05, 0.02]),
        "butter_pos": np.array([0.0, -0.1, 0.02]),
        "ee_pos": np.array([0.0, 0.0, 0.3]),
    },
    goal={
        "sandwich_assembled": True,
    },
)

for step in plan:
    print(f"Action: {step['action']}({step['params']})")
    print(f"  Trajectory: {len(step['trajectory'])} waypoints")

Task and Motion Planning hierarchy

Phương pháp 3: Curriculum Learning

Thay vì học toàn bộ long-horizon task cùng lúc, curriculum learning bắt đầu từ sub-tasks đơn giản rồi ghép lại dần:

class CurriculumTrainer:
    """Huấn luyện policy theo curriculum từ dễ đến khó.
    
    Stage 1: Học từng sub-task riêng biệt
    Stage 2: Ghép 2 sub-tasks liên tiếp
    Stage 3: Ghép 3+ sub-tasks
    Stage 4: Full task
    """
    
    def __init__(self, policy, device="cuda"):
        self.policy = policy
        self.device = device
        self.stage = 1
        self.stage_thresholds = {
            1: 0.85,  # 85% success → lên stage 2
            2: 0.80,
            3: 0.75,
            4: 0.70,
        }
    
    def get_curriculum_dataset(self, stage: int) -> LeRobotDataset:
        """Tạo dataset phù hợp với stage hiện tại."""
        if stage == 1:
            # Chỉ lấy episodes ngắn (1 sub-task)
            return LeRobotDataset(
                "my-user/sandwich-subtasks",
                episodes=[0, 50],  # First 50 episodes = individual subtasks
            )
        elif stage == 2:
            # Episodes 2 sub-tasks
            return LeRobotDataset(
                "my-user/sandwich-2steps",
                episodes=[0, 100],
            )
        elif stage == 3:
            # Episodes 3-4 sub-tasks
            return LeRobotDataset(
                "my-user/sandwich-multistep",
            )
        else:
            # Full task
            return LeRobotDataset("my-user/sandwich-full")
    
    def train_stage(self, stage: int, max_epochs=100):
        """Train một stage của curriculum."""
        dataset = self.get_curriculum_dataset(stage)
        dataloader = torch.utils.data.DataLoader(
            dataset, batch_size=8, shuffle=True
        )
        
        optimizer = torch.optim.AdamW(self.policy.parameters(), lr=1e-5)
        
        for epoch in range(max_epochs):
            total_loss = 0
            n_batches = 0
            
            for batch in dataloader:
                batch = {k: v.to(self.device) for k, v in batch.items()}
                output = self.policy.forward(batch)
                loss = output["loss"]
                
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                
                total_loss += loss.item()
                n_batches += 1
            
            avg_loss = total_loss / n_batches
            
            if (epoch + 1) % 10 == 0:
                print(f"Stage {stage} | Epoch {epoch+1} | Loss: {avg_loss:.4f}")
    
    def evaluate_and_advance(self, env, n_episodes=50):
        """Đánh giá và tự động lên stage nếu đủ điều kiện."""
        success_rate = self._evaluate(env, n_episodes)
        threshold = self.stage_thresholds.get(self.stage, 0.7)
        
        print(f"Stage {self.stage}: {success_rate:.1%} "
              f"(threshold: {threshold:.1%})")
        
        if success_rate >= threshold:
            self.stage += 1
            print(f"Advancing to stage {self.stage}!")
            return True
        return False
    
    def run_full_curriculum(self, env, max_stages=4):
        """Chạy toàn bộ curriculum."""
        for stage in range(1, max_stages + 1):
            self.stage = stage
            print(f"\n{'='*50}")
            print(f"Starting Stage {stage}")
            print(f"{'='*50}")
            
            self.train_stage(stage)
            
            if not self.evaluate_and_advance(env):
                print(f"Failed to pass stage {stage}. Retraining...")
                self.train_stage(stage, max_epochs=200)

Phương pháp 4: LLM-based Task Planning

Sử dụng Large Language Models để phân tích task và tạo plan:

class LLMTaskPlanner:
    """Dùng LLM để phân tích task thành sub-tasks.
    
    Inspired by SayCan (Ahn et al. 2022) và Inner Monologue (Huang et al. 2022).
    """
    
    def __init__(self, available_skills: List[str]):
        self.skills = available_skills
        self.history = []
    
    def create_prompt(self, task: str, observation_description: str) -> str:
        """Tạo prompt cho LLM."""
        skills_str = "\n".join(f"- {s}" for s in self.skills)
        history_str = "\n".join(
            f"Step {i+1}: {h['action']} -> {'Success' if h['success'] else 'Failed'}"
            for i, h in enumerate(self.history)
        )
        
        prompt = f"""You are a robot task planner. Given the current scene and available skills,
plan the next action to complete the task.

Task: {task}

Available skills:
{skills_str}

Current scene: {observation_description}

Previous actions:
{history_str if history_str else "None yet"}

What should the robot do next? Reply with exactly one skill name and parameters.
Format: skill_name(param1, param2)
"""
        return prompt
    
    def get_next_action(self, task: str, observation: dict) -> tuple:
        """Hỏi LLM để lấy action tiếp theo."""
        # Mô tả observation bằng text (hoặc dùng VLM)
        obs_description = self._describe_observation(observation)
        
        prompt = self.create_prompt(task, obs_description)
        
        # Gọi LLM API (ví dụ: OpenAI, Claude, etc.)
        response = self._call_llm(prompt)
        
        # Parse response thành skill + params
        skill_name, params = self._parse_response(response)
        
        return skill_name, params
    
    def _describe_observation(self, observation: dict) -> str:
        """Mô tả observation bằng text."""
        state = observation.get("state", {})
        description = []
        
        # Object positions
        for key, value in state.items():
            if "pos" in key:
                obj_name = key.replace("_pos", "")
                description.append(
                    f"{obj_name} is at position ({value[0]:.2f}, {value[1]:.2f}, {value[2]:.2f})"
                )
        
        # Gripper state
        gripper = state.get("gripper", 1.0)
        description.append(f"Gripper is {'open' if gripper > 0.5 else 'closed'}")
        
        return ". ".join(description)


# Sử dụng
planner = LLMTaskPlanner(
    available_skills=[
        "pick(object_name)",
        "place(target_position)",
        "push(object_name, direction)",
        "pour(source, target)",
        "spread(tool, surface)",
        "open_gripper()",
        "close_gripper()",
    ]
)

skill, params = planner.get_next_action(
    task="Make a peanut butter sandwich",
    observation=current_obs,
)
print(f"Next action: {skill}({params})")

Failure Recovery

Long-horizon tasks sẽ fail. Quan trọng là biết recovery:

class FailureRecoveryPolicy:
    """Policy với khả năng phát hiện và recovery từ failures."""
    
    def __init__(self, main_policy, recovery_policies: dict):
        self.main_policy = main_policy
        self.recovery_policies = recovery_policies
        self.failure_detector = FailureDetector()
    
    def act_with_recovery(self, observation: dict, max_retries=3) -> np.ndarray:
        """Execute action với failure detection và recovery."""
        for attempt in range(max_retries):
            # Kiểm tra failure
            failure_type = self.failure_detector.detect(observation)
            
            if failure_type is None:
                # Không có failure — dùng main policy
                return self.main_policy.predict(observation)
            
            print(f"Detected failure: {failure_type} (attempt {attempt+1}/{max_retries})")
            
            # Dùng recovery policy tương ứng
            if failure_type in self.recovery_policies:
                recovery = self.recovery_policies[failure_type]
                action = recovery.predict(observation)
                return action
            else:
                # Không có recovery → dừng
                print(f"No recovery policy for {failure_type}")
                return np.zeros(7)
        
        print("Max retries exceeded!")
        return np.zeros(7)


class FailureDetector:
    """Phát hiện các loại failure phổ biến."""
    
    def detect(self, observation: dict) -> Optional[str]:
        """Phát hiện failure từ observation."""
        state = observation.get("state", np.zeros(10))
        
        # Failure 1: Object dropped
        if self._object_dropped(state):
            return "object_dropped"
        
        # Failure 2: Collision detected
        if self._collision_detected(state):
            return "collision"
        
        # Failure 3: Timeout (stuck)
        if self._is_stuck(state):
            return "stuck"
        
        return None
    
    def _object_dropped(self, state):
        """Kiểm tra object có bị rơi không."""
        gripper_state = state[-1]
        object_z = state[2]
        return gripper_state < 0.5 and object_z < 0.01  # Gripper đóng nhưng object trên sàn
    
    def _collision_detected(self, state):
        """Kiểm tra va chạm."""
        contact_force = state[-2] if len(state) > 8 else 0
        return abs(contact_force) > 50.0  # Force threshold
    
    def _is_stuck(self, state):
        """Kiểm tra robot bị kẹt."""
        # So sánh vị trí hiện tại với vị trí 10 bước trước
        if not hasattr(self, '_position_history'):
            self._position_history = []
        
        self._position_history.append(state[:3].copy())
        if len(self._position_history) > 30:
            self._position_history.pop(0)
        
        if len(self._position_history) >= 30:
            movement = np.linalg.norm(
                self._position_history[-1] - self._position_history[0]
            )
            return movement < 0.005  # Di chuyển < 5mm trong 30 bước
        
        return False

Papers tham khảo

  1. SayCan: Do As I Can, Not As I SayAhn et al., 2022 — Dùng LLM + affordance để lập kế hoạch robot
  2. Inner Monologue: Embodied Reasoning through Planning with Language ModelsHuang et al., 2022 — Feedback loop giữa LLM planner và robot
  3. TAMP Survey: Integrated Task and Motion PlanningGarrett et al., 2021 — Survey về TAMP methods

Kết luận và bước tiếp theo

Long-horizon tasks đòi hỏi tư duy hệ thống — không chỉ policy tốt mà còn cần planning, monitoring, và recovery. Hierarchical decomposition và curriculum learning là hai phương pháp hiệu quả nhất hiện tại.

Bài tiếp theo — Dual-Arm Robot: Setup và Calibration — sẽ mở rộng từ single-arm sang dual-arm, mở ra khả năng thực hiện các task cần hai tay phối hợp.

Bài viết liên quan

NT

Nguyễn Anh Tuấn

Robotics & AI Engineer. Building VnRobo — sharing knowledge about robot learning, VLA models, and automation.

Bài viết liên quan

NEWTutorial
SimpleVLA-RL (7): Collect Data cho OpenArm
openarmlerobotdata-collectionteleoperationPhần 7

SimpleVLA-RL (7): Collect Data cho OpenArm

Hướng dẫn từng bước setup OpenArm, calibrate, teleoperate và thu thập 50 episodes gắp hộp carton với LeRobot.

11/4/202616 phút đọc
NEWTutorial
SimpleVLA-RL (6): OpenArm — Phân tích Lộ trình
openarmvlareinforcement-learninglerobotpi0Phần 6

SimpleVLA-RL (6): OpenArm — Phân tích Lộ trình

Phân tích chi tiết cách tiếp cận training robot OpenArm 7-DoF gắp hộp carton — so sánh 2 lộ trình: LeRobot native vs SimpleVLA-RL.

11/4/202613 phút đọc
NEWTutorial
PEFT/LoRA Fine-tune & Deploy VLA
lerobotpeftloradeploymentvlaPhần 15

PEFT/LoRA Fine-tune & Deploy VLA

Fine-tune VLA lớn bằng LoRA trên GPU nhỏ, deploy lên robot thật với Real-Time Chunking — production-ready workflow.

11/4/202612 phút đọc