Giới thiệu: Khi một hành động không đủ
Trong các bài trước của series, chúng ta đã xử lý pick-and-place đơn lẻ (bài 3) và multi-object sorting (bài 4). Nhưng hãy nghĩ đến các tác vụ thực sự trong đời thường: pha cà phê (mở nắp, đổ bột, rót nước, khuấy), dọn bàn (gom đĩa, xếp vào máy rửa, lau bàn), hoặc lắp ráp (lấy ốc, căn chỉnh, vặn).
Tất cả đều là long-horizon tasks — chuỗi hành động kéo dài hàng trăm thậm chí hàng ngàn bước, đòi hỏi robot phải lên kế hoạch, thực thi tuần tự, và xử lý sai sót giữa chừng. Đây là bài toán khó nhất trong manipulation research hiện tại.
Tại sao Long-Horizon khó?
| Thách thức | Ảnh hưởng | Ví dụ |
|---|---|---|
| Error accumulation | Sai sót nhỏ tích lũy thành thất bại lớn | Gắp lệch 1mm → đổ toàn bộ stack |
| Credit assignment | Khó biết bước nào gây lỗi | Thất bại ở bước 50, lỗi từ bước 12 |
| Exponential search | Số tổ hợp hành động tăng exponential | 10 bước × 100 actions = 100^10 |
| Partial observability | Không thấy hết mọi thứ | Object bị che khuất sau khi xoay |
| Recovery | Phải biết quay lại khi sai | Làm rơi đồ → nhặt lại → tiếp tục |
Phương pháp 1: Hierarchical Policies
Ý tưởng: chia task lớn thành nhiều sub-tasks, mỗi sub-task có policy riêng, và có một high-level planner quyết định thứ tự.
Thiết kế Hierarchical System
import numpy as np
import torch
from enum import Enum
from dataclasses import dataclass
from typing import List, Optional, Dict
class SubTask(Enum):
"""Các sub-tasks cho task "Make Sandwich"."""
PICK_BREAD = "pick_bread"
PLACE_BREAD = "place_bread"
PICK_LETTUCE = "pick_lettuce"
PLACE_LETTUCE = "place_lettuce"
SPREAD_BUTTER = "spread_butter"
CLOSE_SANDWICH = "close_sandwich"
@dataclass
class TaskPlan:
"""Kế hoạch thực hiện task."""
subtasks: List[SubTask]
current_step: int = 0
completed: List[bool] = None
def __post_init__(self):
if self.completed is None:
self.completed = [False] * len(self.subtasks)
@property
def current_subtask(self) -> Optional[SubTask]:
if self.current_step < len(self.subtasks):
return self.subtasks[self.current_step]
return None
@property
def is_done(self) -> bool:
return self.current_step >= len(self.subtasks)
def advance(self):
"""Chuyển sang sub-task tiếp theo."""
self.completed[self.current_step] = True
self.current_step += 1
class HierarchicalPolicy:
"""Policy phân cấp cho long-horizon tasks.
Gồm 2 tầng:
- High-level: Task planner — quyết định sub-task nào tiếp theo
- Low-level: Sub-task policies — thực hiện từng sub-task
"""
def __init__(self, subtask_policies: Dict[SubTask, any]):
self.subtask_policies = subtask_policies
self.current_plan = None
self.subtask_detector = SubTaskCompletionDetector()
def plan(self, task_description: str, observation: dict) -> TaskPlan:
"""High-level planning: phân tích task và tạo kế hoạch."""
# Trong thực tế, dùng LLM hoặc TAMP solver
if "sandwich" in task_description.lower():
plan = TaskPlan(subtasks=[
SubTask.PICK_BREAD,
SubTask.PLACE_BREAD,
SubTask.SPREAD_BUTTER,
SubTask.PICK_LETTUCE,
SubTask.PLACE_LETTUCE,
SubTask.CLOSE_SANDWICH,
])
else:
raise ValueError(f"Unknown task: {task_description}")
self.current_plan = plan
return plan
def act(self, observation: dict) -> np.ndarray:
"""Chọn action dựa trên sub-task hiện tại."""
if self.current_plan is None or self.current_plan.is_done:
return np.zeros(7) # No-op
current_subtask = self.current_plan.current_subtask
# Dùng policy tương ứng
policy = self.subtask_policies[current_subtask]
action = policy.predict(observation)
# Kiểm tra sub-task đã hoàn thành chưa
if self.subtask_detector.is_complete(current_subtask, observation):
print(f"Sub-task {current_subtask.value} completed!")
self.current_plan.advance()
return action
class SubTaskCompletionDetector:
"""Phát hiện khi sub-task hoàn thành.
Có thể dùng:
1. Rule-based: Kiểm tra vị trí object
2. Learned: Train classifier từ data
3. Vision: Dùng vision model nhận diện
"""
def is_complete(self, subtask: SubTask, observation: dict) -> bool:
"""Kiểm tra sub-task đã hoàn thành."""
state = observation.get("state", np.zeros(10))
if subtask == SubTask.PICK_BREAD:
gripper_closed = state[-1] < 0.5
object_lifted = state[2] > 0.1 # z > 10cm
return gripper_closed and object_lifted
elif subtask == SubTask.PLACE_BREAD:
target_pos = np.array([0.0, 0.0, 0.02])
current_pos = state[:3]
return np.linalg.norm(current_pos - target_pos) < 0.03
# ... các sub-tasks khác
return False
Phương pháp 2: Task and Motion Planning (TAMP)
TAMP kết hợp symbolic planning (AI cổ điển) với motion planning (robotics):
class TAMPSolver:
"""Task and Motion Planning solver.
Symbolic level: PDDL-like planning
Motion level: RRT/PRM path planning
"""
def __init__(self):
self.symbolic_state = {}
self.action_primitives = {
"pick": self._plan_pick,
"place": self._plan_place,
"push": self._plan_push,
"pour": self._plan_pour,
}
def solve(self, initial_state: dict, goal: dict) -> List[dict]:
"""Tìm chuỗi actions từ state hiện tại đến goal.
Returns:
List[dict]: Chuỗi (action_name, params, motion_plan)
"""
# Bước 1: Symbolic search — tìm skeleton plan
skeleton = self._symbolic_search(initial_state, goal)
# Bước 2: Motion planning — fill in trajectories
full_plan = []
current_state = initial_state.copy()
for action_name, params in skeleton:
motion_plan = self.action_primitives[action_name](
current_state, params
)
if motion_plan is None:
# Backtrack nếu motion planning fail
return self._backtrack_and_replan(
initial_state, goal, full_plan
)
full_plan.append({
"action": action_name,
"params": params,
"trajectory": motion_plan,
})
# Update symbolic state
current_state = self._apply_action(current_state, action_name, params)
return full_plan
def _symbolic_search(self, state, goal):
"""PDDL-like symbolic search."""
# Simplified: BFS over action space
queue = [(state, [])]
visited = set()
while queue:
current, actions = queue.pop(0)
if self._satisfies_goal(current, goal):
return actions
state_key = str(sorted(current.items()))
if state_key in visited:
continue
visited.add(state_key)
for action_name, planner in self.action_primitives.items():
for params in self._get_valid_params(current, action_name):
next_state = self._apply_action(current, action_name, params)
queue.append((next_state, actions + [(action_name, params)]))
return None # No plan found
def _plan_pick(self, state, params):
"""Motion planning cho pick action."""
object_pos = state[f"object_{params['object_id']}_pos"]
approach_pos = object_pos + np.array([0, 0, 0.1]) # 10cm above
trajectory = []
# Phase 1: Approach from above
trajectory.extend(self._interpolate(state["ee_pos"], approach_pos, n_steps=30))
# Phase 2: Descend
trajectory.extend(self._interpolate(approach_pos, object_pos, n_steps=20))
# Phase 3: Close gripper
trajectory.append({"pos": object_pos, "gripper": -1.0})
# Phase 4: Lift
trajectory.extend(self._interpolate(object_pos, approach_pos, n_steps=20))
return trajectory
# Sử dụng TAMP solver
solver = TAMPSolver()
plan = solver.solve(
initial_state={
"bread_pos": np.array([0.1, 0.0, 0.02]),
"lettuce_pos": np.array([-0.1, 0.05, 0.02]),
"butter_pos": np.array([0.0, -0.1, 0.02]),
"ee_pos": np.array([0.0, 0.0, 0.3]),
},
goal={
"sandwich_assembled": True,
},
)
for step in plan:
print(f"Action: {step['action']}({step['params']})")
print(f" Trajectory: {len(step['trajectory'])} waypoints")
Phương pháp 3: Curriculum Learning
Thay vì học toàn bộ long-horizon task cùng lúc, curriculum learning bắt đầu từ sub-tasks đơn giản rồi ghép lại dần:
class CurriculumTrainer:
"""Huấn luyện policy theo curriculum từ dễ đến khó.
Stage 1: Học từng sub-task riêng biệt
Stage 2: Ghép 2 sub-tasks liên tiếp
Stage 3: Ghép 3+ sub-tasks
Stage 4: Full task
"""
def __init__(self, policy, device="cuda"):
self.policy = policy
self.device = device
self.stage = 1
self.stage_thresholds = {
1: 0.85, # 85% success → lên stage 2
2: 0.80,
3: 0.75,
4: 0.70,
}
def get_curriculum_dataset(self, stage: int) -> LeRobotDataset:
"""Tạo dataset phù hợp với stage hiện tại."""
if stage == 1:
# Chỉ lấy episodes ngắn (1 sub-task)
return LeRobotDataset(
"my-user/sandwich-subtasks",
episodes=[0, 50], # First 50 episodes = individual subtasks
)
elif stage == 2:
# Episodes 2 sub-tasks
return LeRobotDataset(
"my-user/sandwich-2steps",
episodes=[0, 100],
)
elif stage == 3:
# Episodes 3-4 sub-tasks
return LeRobotDataset(
"my-user/sandwich-multistep",
)
else:
# Full task
return LeRobotDataset("my-user/sandwich-full")
def train_stage(self, stage: int, max_epochs=100):
"""Train một stage của curriculum."""
dataset = self.get_curriculum_dataset(stage)
dataloader = torch.utils.data.DataLoader(
dataset, batch_size=8, shuffle=True
)
optimizer = torch.optim.AdamW(self.policy.parameters(), lr=1e-5)
for epoch in range(max_epochs):
total_loss = 0
n_batches = 0
for batch in dataloader:
batch = {k: v.to(self.device) for k, v in batch.items()}
output = self.policy.forward(batch)
loss = output["loss"]
optimizer.zero_grad()
loss.backward()
optimizer.step()
total_loss += loss.item()
n_batches += 1
avg_loss = total_loss / n_batches
if (epoch + 1) % 10 == 0:
print(f"Stage {stage} | Epoch {epoch+1} | Loss: {avg_loss:.4f}")
def evaluate_and_advance(self, env, n_episodes=50):
"""Đánh giá và tự động lên stage nếu đủ điều kiện."""
success_rate = self._evaluate(env, n_episodes)
threshold = self.stage_thresholds.get(self.stage, 0.7)
print(f"Stage {self.stage}: {success_rate:.1%} "
f"(threshold: {threshold:.1%})")
if success_rate >= threshold:
self.stage += 1
print(f"Advancing to stage {self.stage}!")
return True
return False
def run_full_curriculum(self, env, max_stages=4):
"""Chạy toàn bộ curriculum."""
for stage in range(1, max_stages + 1):
self.stage = stage
print(f"\n{'='*50}")
print(f"Starting Stage {stage}")
print(f"{'='*50}")
self.train_stage(stage)
if not self.evaluate_and_advance(env):
print(f"Failed to pass stage {stage}. Retraining...")
self.train_stage(stage, max_epochs=200)
Phương pháp 4: LLM-based Task Planning
Sử dụng Large Language Models để phân tích task và tạo plan:
class LLMTaskPlanner:
"""Dùng LLM để phân tích task thành sub-tasks.
Inspired by SayCan (Ahn et al. 2022) và Inner Monologue (Huang et al. 2022).
"""
def __init__(self, available_skills: List[str]):
self.skills = available_skills
self.history = []
def create_prompt(self, task: str, observation_description: str) -> str:
"""Tạo prompt cho LLM."""
skills_str = "\n".join(f"- {s}" for s in self.skills)
history_str = "\n".join(
f"Step {i+1}: {h['action']} -> {'Success' if h['success'] else 'Failed'}"
for i, h in enumerate(self.history)
)
prompt = f"""You are a robot task planner. Given the current scene and available skills,
plan the next action to complete the task.
Task: {task}
Available skills:
{skills_str}
Current scene: {observation_description}
Previous actions:
{history_str if history_str else "None yet"}
What should the robot do next? Reply with exactly one skill name and parameters.
Format: skill_name(param1, param2)
"""
return prompt
def get_next_action(self, task: str, observation: dict) -> tuple:
"""Hỏi LLM để lấy action tiếp theo."""
# Mô tả observation bằng text (hoặc dùng VLM)
obs_description = self._describe_observation(observation)
prompt = self.create_prompt(task, obs_description)
# Gọi LLM API (ví dụ: OpenAI, Claude, etc.)
response = self._call_llm(prompt)
# Parse response thành skill + params
skill_name, params = self._parse_response(response)
return skill_name, params
def _describe_observation(self, observation: dict) -> str:
"""Mô tả observation bằng text."""
state = observation.get("state", {})
description = []
# Object positions
for key, value in state.items():
if "pos" in key:
obj_name = key.replace("_pos", "")
description.append(
f"{obj_name} is at position ({value[0]:.2f}, {value[1]:.2f}, {value[2]:.2f})"
)
# Gripper state
gripper = state.get("gripper", 1.0)
description.append(f"Gripper is {'open' if gripper > 0.5 else 'closed'}")
return ". ".join(description)
# Sử dụng
planner = LLMTaskPlanner(
available_skills=[
"pick(object_name)",
"place(target_position)",
"push(object_name, direction)",
"pour(source, target)",
"spread(tool, surface)",
"open_gripper()",
"close_gripper()",
]
)
skill, params = planner.get_next_action(
task="Make a peanut butter sandwich",
observation=current_obs,
)
print(f"Next action: {skill}({params})")
Failure Recovery
Long-horizon tasks sẽ fail. Quan trọng là biết recovery:
class FailureRecoveryPolicy:
"""Policy với khả năng phát hiện và recovery từ failures."""
def __init__(self, main_policy, recovery_policies: dict):
self.main_policy = main_policy
self.recovery_policies = recovery_policies
self.failure_detector = FailureDetector()
def act_with_recovery(self, observation: dict, max_retries=3) -> np.ndarray:
"""Execute action với failure detection và recovery."""
for attempt in range(max_retries):
# Kiểm tra failure
failure_type = self.failure_detector.detect(observation)
if failure_type is None:
# Không có failure — dùng main policy
return self.main_policy.predict(observation)
print(f"Detected failure: {failure_type} (attempt {attempt+1}/{max_retries})")
# Dùng recovery policy tương ứng
if failure_type in self.recovery_policies:
recovery = self.recovery_policies[failure_type]
action = recovery.predict(observation)
return action
else:
# Không có recovery → dừng
print(f"No recovery policy for {failure_type}")
return np.zeros(7)
print("Max retries exceeded!")
return np.zeros(7)
class FailureDetector:
"""Phát hiện các loại failure phổ biến."""
def detect(self, observation: dict) -> Optional[str]:
"""Phát hiện failure từ observation."""
state = observation.get("state", np.zeros(10))
# Failure 1: Object dropped
if self._object_dropped(state):
return "object_dropped"
# Failure 2: Collision detected
if self._collision_detected(state):
return "collision"
# Failure 3: Timeout (stuck)
if self._is_stuck(state):
return "stuck"
return None
def _object_dropped(self, state):
"""Kiểm tra object có bị rơi không."""
gripper_state = state[-1]
object_z = state[2]
return gripper_state < 0.5 and object_z < 0.01 # Gripper đóng nhưng object trên sàn
def _collision_detected(self, state):
"""Kiểm tra va chạm."""
contact_force = state[-2] if len(state) > 8 else 0
return abs(contact_force) > 50.0 # Force threshold
def _is_stuck(self, state):
"""Kiểm tra robot bị kẹt."""
# So sánh vị trí hiện tại với vị trí 10 bước trước
if not hasattr(self, '_position_history'):
self._position_history = []
self._position_history.append(state[:3].copy())
if len(self._position_history) > 30:
self._position_history.pop(0)
if len(self._position_history) >= 30:
movement = np.linalg.norm(
self._position_history[-1] - self._position_history[0]
)
return movement < 0.005 # Di chuyển < 5mm trong 30 bước
return False
Papers tham khảo
- SayCan: Do As I Can, Not As I Say — Ahn et al., 2022 — Dùng LLM + affordance để lập kế hoạch robot
- Inner Monologue: Embodied Reasoning through Planning with Language Models — Huang et al., 2022 — Feedback loop giữa LLM planner và robot
- TAMP Survey: Integrated Task and Motion Planning — Garrett et al., 2021 — Survey về TAMP methods
Kết luận và bước tiếp theo
Long-horizon tasks đòi hỏi tư duy hệ thống — không chỉ policy tốt mà còn cần planning, monitoring, và recovery. Hierarchical decomposition và curriculum learning là hai phương pháp hiệu quả nhất hiện tại.
Bài tiếp theo — Dual-Arm Robot: Setup và Calibration — sẽ mở rộng từ single-arm sang dual-arm, mở ra khả năng thực hiện các task cần hai tay phối hợp.
Bài viết liên quan
- Multi-Object Manipulation — Nền tảng cho long-horizon tasks
- VLA Models cho Robot — Language conditioning cho task planning
- Sim-to-Real Pipeline — Deploy policy phức tạp lên robot thật