Giới thiệu: Từ một vật thể đến nhiều vật thể
Trong bài trước, chúng ta đã thành công train policy cho task pick-and-place một vật thể duy nhất. Nhưng thế giới thực hiếm khi đơn giản như vậy — robot trong nhà máy phải phân loại hàng trăm sản phẩm, robot gia đình phải dọn bàn với nhiều đồ vật khác nhau.
Bài viết này — bài thứ 4 trong series VLA & LeRobot Mastery — sẽ đưa bạn vào thế giới multi-object manipulation: sorting theo màu sắc/hình dạng, stacking khối, và rearranging vật thể đến vị trí đích. Đây là bước nhảy quan trọng về độ phức tạp, đòi hỏi policy phải hiểu ngữ cảnh không gian và thứ tự thực hiện.
Thách thức của Multi-Object Manipulation
So với single-object, multi-object manipulation khó hơn vì:
| Thách thức | Mô tả | Giải pháp |
|---|---|---|
| Larger state space | Nhiều vật thể = nhiều configurations hơn | Cần nhiều demonstrations hơn |
| Object interaction | Vật thể che khuất, va chạm nhau | Camera placement cẩn thận |
| Task ambiguity | Gắp cái nào trước? | Language conditioning |
| Longer horizons | Nhiều bước hơn per episode | Curriculum learning |
| Generalization | Vị trí random mỗi lần | Domain randomization |
Thiết kế Dataset cho Multi-Object
Tăng số lượng demonstrations
# Rule of thumb cho số demos:
# - Single object: 50-100 demos
# - 2-3 objects: 100-200 demos
# - 4+ objects: 200-500 demos
# - Language-conditioned: 2x thêm cho mỗi task variant
DEMO_REQUIREMENTS = {
"sort_2_colors": {"min_demos": 100, "variants": 2}, # Red→bin1, Blue→bin2
"sort_3_colors": {"min_demos": 200, "variants": 6}, # 3! permutations
"stack_2_blocks": {"min_demos": 100, "variants": 2}, # AB, BA
"stack_3_blocks": {"min_demos": 300, "variants": 6}, # 3! permutations
"rearrange_pattern": {"min_demos": 150, "variants": 3},
}
Dataset diversity là chìa khóa
import numpy as np
def randomize_object_poses(n_objects=3, workspace_bounds=None):
"""Tạo vị trí ban đầu ngẫu nhiên cho vật thể.
QUAN TRỌNG: Diversity trong initial poses giúp policy
generalize tốt hơn. Không bao giờ bắt đầu từ cùng 1 vị trí!
"""
if workspace_bounds is None:
workspace_bounds = {
"x": (-0.15, 0.15),
"y": (-0.15, 0.15),
"z": (0.02, 0.02), # Trên mặt bàn
}
poses = []
min_distance = 0.06 # Tối thiểu 6cm giữa các vật thể
for i in range(n_objects):
while True:
x = np.random.uniform(*workspace_bounds["x"])
y = np.random.uniform(*workspace_bounds["y"])
z = workspace_bounds["z"][0]
# Kiểm tra không overlap với vật thể đã đặt
valid = True
for prev_pose in poses:
dist = np.sqrt((x - prev_pose[0])**2 + (y - prev_pose[1])**2)
if dist < min_distance:
valid = False
break
if valid:
poses.append([x, y, z])
break
return np.array(poses)
def create_sorting_environment(n_objects=3, n_bins=3):
"""Tạo environment cho task sorting.
Objects: Khối màu (red, green, blue)
Bins: Vùng đích tương ứng mỗi màu
"""
import robosuite as suite
env = suite.make(
env_name="PickPlace",
robots="Panda",
has_renderer=True,
has_offscreen_renderer=True,
use_camera_obs=True,
camera_names=["agentview", "robot0_eye_in_hand"],
camera_heights=480,
camera_widths=640,
# Multi-object settings
num_objects=n_objects,
object_type="cube",
bin_type="colored",
)
return env
Language-Conditioned Policies
Khi có nhiều vật thể, robot cần biết nên gắp cái nào. Language conditioning cho phép bạn chỉ dẫn robot bằng ngôn ngữ tự nhiên.
Thêm language embeddings vào policy
from transformers import AutoTokenizer, AutoModel
import torch
class LanguageConditionedPolicy:
"""Policy có thể nhận instruction bằng ngôn ngữ.
Ví dụ instructions:
- "Pick the red block and place it in the left bin"
- "Stack the blue block on top of the green block"
- "Sort all blocks by color"
"""
def __init__(self, base_policy, language_model="sentence-transformers/all-MiniLM-L6-v2"):
self.policy = base_policy
self.tokenizer = AutoTokenizer.from_pretrained(language_model)
self.language_encoder = AutoModel.from_pretrained(language_model)
self.language_encoder.eval()
# Projection layer: language embedding → policy embedding space
self.lang_proj = torch.nn.Linear(384, 512) # MiniLM → policy dim
def encode_instruction(self, instruction):
"""Mã hóa instruction thành embedding vector."""
tokens = self.tokenizer(
instruction,
return_tensors="pt",
padding=True,
truncation=True,
max_length=64,
)
with torch.no_grad():
output = self.language_encoder(**tokens)
# Mean pooling
embedding = output.last_hidden_state.mean(dim=1)
return self.lang_proj(embedding)
def predict_action(self, observation, instruction):
"""Predict action dựa trên observation + language instruction."""
lang_embedding = self.encode_instruction(instruction)
# Concatenate language embedding vào observation
observation["language_embedding"] = lang_embedding
with torch.no_grad():
action = self.policy.select_action(observation)
return action
# Sử dụng
policy = LanguageConditionedPolicy(base_act_policy)
action = policy.predict_action(
observation=obs,
instruction="Pick the red cube and place it in the left bin"
)
Dataset với language annotations
from lerobot.common.datasets.lerobot_dataset import LeRobotDataset
def create_language_conditioned_dataset(repo_id):
"""Tạo dataset với language annotations cho mỗi episode."""
dataset = LeRobotDataset.create(
repo_id=repo_id,
fps=30,
robot_type="franka",
features={
"observation.image": {
"dtype": "video",
"shape": (480, 640, 3),
"names": ["height", "width", "channels"],
},
"observation.state": {
"dtype": "float32",
"shape": (7,),
"names": ["joint_positions"],
},
"action": {
"dtype": "float32",
"shape": (7,),
"names": ["joint_velocities"],
},
"language_instruction": {
"dtype": "string",
"shape": (1,),
"names": ["instruction"],
},
},
)
return dataset
# Các instructions mẫu cho sorting task
SORTING_INSTRUCTIONS = [
"Sort the red block into the red bin",
"Sort the blue block into the blue bin",
"Sort the green block into the green bin",
"Sort all blocks by color into their matching bins",
"Pick the red block first, then sort the remaining blocks",
]
Ví dụ 1: Sorting 3 khối màu vào bins
Setup environment
import numpy as np
import gymnasium as gym
class ColorSortingEnv:
"""Environment cho task sorting 3 khối màu."""
def __init__(self):
self.n_objects = 3
self.colors = ["red", "green", "blue"]
self.bin_positions = {
"red": np.array([-0.15, 0.2, 0.0]),
"green": np.array([0.0, 0.2, 0.0]),
"blue": np.array([0.15, 0.2, 0.0]),
}
self.workspace = {"x": (-0.2, 0.2), "y": (-0.1, 0.1)}
def reset(self):
"""Reset với vị trí object random."""
self.object_poses = randomize_object_poses(
self.n_objects,
{"x": self.workspace["x"], "y": self.workspace["y"], "z": (0.02, 0.02)}
)
self.sorted = [False] * self.n_objects
return self._get_obs()
def _get_obs(self):
"""Trả về observation."""
return {
"image": self._render(),
"state": np.concatenate([
self.robot_state, # 7D joint positions
self.object_poses.flatten(), # 3*3 = 9D object positions
np.array(self.sorted, dtype=np.float32), # 3D sorted flags
]),
}
def check_success(self):
"""Kiểm tra tất cả objects đã sorted đúng chưa."""
for i, color in enumerate(self.colors):
obj_pos = self.object_poses[i]
bin_pos = self.bin_positions[color]
distance = np.linalg.norm(obj_pos[:2] - bin_pos[:2])
if distance > 0.03: # Threshold 3cm
return False
return True
Training pipeline cho sorting
def train_sorting_policy(dataset_repo_id, num_epochs=200):
"""Train policy cho multi-object sorting task."""
from lerobot.common.policies.act.configuration_act import ACTConfig
from lerobot.common.policies.act.modeling_act import ACTPolicy
dataset = LeRobotDataset(dataset_repo_id)
config = ACTConfig(
input_shapes={
"observation.image": [3, 480, 640],
"observation.state": [19], # 7 joints + 9 obj pos + 3 sorted flags
},
output_shapes={
"action": [7],
},
input_normalization_modes={
"observation.image": "mean_std",
"observation.state": "min_max",
},
output_normalization_modes={
"action": "min_max",
},
# Larger model cho multi-object
chunk_size=50, # Shorter chunks cho precision
dim_model=512,
n_heads=8,
n_layers=4, # Nhiều layers hơn cho task phức tạp
use_vae=True,
kl_weight=10.0,
)
policy = ACTPolicy(config)
device = torch.device("cuda")
policy.to(device)
optimizer = torch.optim.AdamW(policy.parameters(), lr=1e-5)
dataloader = torch.utils.data.DataLoader(
dataset, batch_size=8, shuffle=True, num_workers=4
)
best_loss = float('inf')
for epoch in range(num_epochs):
epoch_loss = 0
n_batches = 0
for batch in dataloader:
batch = {k: v.to(device) for k, v in batch.items()}
output = policy.forward(batch)
loss = output["loss"]
optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(policy.parameters(), 10.0)
optimizer.step()
epoch_loss += loss.item()
n_batches += 1
avg_loss = epoch_loss / n_batches
# Save best model
if avg_loss < best_loss:
best_loss = avg_loss
torch.save(policy.state_dict(), "best_sorting_policy.pt")
if (epoch + 1) % 20 == 0:
print(f"Epoch {epoch+1}/{num_epochs} | Loss: {avg_loss:.4f} | Best: {best_loss:.4f}")
return policy
Ví dụ 2: Block Stacking
Stacking khó hơn sorting vì đòi hỏi precision cao — đặt khối lên khối khác mà không làm đổ.
class StackingTaskConfig:
"""Configuration cho stacking tasks."""
# Thứ tự stacking ảnh hưởng lớn đến difficulty
DIFFICULTY_LEVELS = {
"easy": {
"n_blocks": 2,
"block_size": 0.04, # 4cm — dễ gắp
"tolerance": 0.02, # 2cm tolerance
"min_demos": 100,
},
"medium": {
"n_blocks": 3,
"block_size": 0.03, # 3cm — cần precision hơn
"tolerance": 0.015,
"min_demos": 200,
},
"hard": {
"n_blocks": 4,
"block_size": 0.025, # 2.5cm — rất khó
"tolerance": 0.01, # 1cm tolerance
"min_demos": 400,
},
}
def evaluate_stacking(policy, env, n_episodes=50):
"""Đánh giá stacking policy.
Metrics:
- Success rate: Tất cả blocks stacked đúng
- Partial success: Ít nhất 1 block stacked đúng
- Average height: Chiều cao trung bình của stack
"""
results = {
"full_success": 0,
"partial_success": 0,
"avg_blocks_stacked": 0,
"avg_steps": 0,
}
for ep in range(n_episodes):
obs, info = env.reset()
done = False
steps = 0
while not done:
action = policy.predict(obs)
obs, reward, terminated, truncated, info = env.step(action)
done = terminated or truncated
steps += 1
blocks_stacked = info.get("blocks_stacked", 0)
total_blocks = info.get("total_blocks", 3)
if blocks_stacked == total_blocks:
results["full_success"] += 1
if blocks_stacked > 0:
results["partial_success"] += 1
results["avg_blocks_stacked"] += blocks_stacked
results["avg_steps"] += steps
results["full_success"] /= n_episodes
results["partial_success"] /= n_episodes
results["avg_blocks_stacked"] /= n_episodes
results["avg_steps"] /= n_episodes
print(f"Full success rate: {results['full_success']:.1%}")
print(f"Partial success rate: {results['partial_success']:.1%}")
print(f"Avg blocks stacked: {results['avg_blocks_stacked']:.1f}")
print(f"Avg steps: {results['avg_steps']:.0f}")
return results
Evaluation với Randomized Object Poses
def evaluate_with_randomization(policy, env, n_episodes=100, n_pose_variants=5):
"""Đánh giá policy với nhiều initial configurations khác nhau.
Mỗi episode chạy với n_pose_variants vị trí ban đầu khác nhau
để đo generalization ability.
"""
per_variant_results = {i: [] for i in range(n_pose_variants)}
for variant in range(n_pose_variants):
# Set random seed khác nhau cho mỗi variant
np.random.seed(42 + variant)
successes = 0
for ep in range(n_episodes):
obs, info = env.reset()
done = False
while not done:
action = policy.predict(obs)
obs, reward, terminated, truncated, info = env.step(action)
done = terminated or truncated
if info.get("is_success", False):
successes += 1
success_rate = successes / n_episodes
per_variant_results[variant] = success_rate
print(f"Variant {variant}: {success_rate:.1%} success rate")
# Tổng hợp
rates = list(per_variant_results.values())
print(f"\nOverall: {np.mean(rates):.1%} +/- {np.std(rates):.1%}")
return per_variant_results
Papers tham khảo
- CLIPort: What and Where Pathways for Robotic Manipulation — Shridhar et al., CoRL 2022 — Language-conditioned manipulation kết hợp CLIP
- PerAct: Perceiver-Actor for Multi-Task Robotic Manipulation — Shridhar et al., CoRL 2023 — Multi-task manipulation trong 3D voxel space
- RVT: Robotic View Transformer — Goyal et al., CoRL 2023 — Multi-view rendering cho manipulation
Kết luận và bước tiếp theo
Multi-object manipulation là bước nhảy lớn về độ phức tạp. Key takeaways:
- Dataset diversity quan trọng hơn bao giờ hết — randomize object poses
- Language conditioning giúp policy biết phải làm gì trong tình huống ambiguous
- Evaluation phải kỹ lưỡng — test nhiều configurations, không chỉ 1
Bài tiếp theo — Long-Horizon Tasks: Chuỗi hành động cho Multi-Step Manipulation — sẽ nâng level tiếp khi chúng ta kết hợp nhiều sub-tasks thành một chuỗi hành động dài và phức tạp.
Bài viết liên quan
- Train Policy cho Single-Arm — Foundation trước khi xử lý multi-object
- VLA Models: Vision-Language-Action — Language conditioning cho robot
- Imitation Learning cho Manipulation — Nền tảng lý thuyết