ailerobotmulti-objectmanipulationsorting

Multi-Object Manipulation: Sorting, Stacking và Rearranging

Mở rộng từ single-object sang multi-object manipulation — sorting, stacking, rearranging với language-conditioned policies.

Nguyễn Anh Tuấn21 tháng 3, 20269 phút đọc
Multi-Object Manipulation: Sorting, Stacking và Rearranging

Giới thiệu: Từ một vật thể đến nhiều vật thể

Trong bài trước, chúng ta đã thành công train policy cho task pick-and-place một vật thể duy nhất. Nhưng thế giới thực hiếm khi đơn giản như vậy — robot trong nhà máy phải phân loại hàng trăm sản phẩm, robot gia đình phải dọn bàn với nhiều đồ vật khác nhau.

Bài viết này — bài thứ 4 trong series VLA & LeRobot Mastery — sẽ đưa bạn vào thế giới multi-object manipulation: sorting theo màu sắc/hình dạng, stacking khối, và rearranging vật thể đến vị trí đích. Đây là bước nhảy quan trọng về độ phức tạp, đòi hỏi policy phải hiểu ngữ cảnh không gian và thứ tự thực hiện.

Multi-object manipulation scene

Thách thức của Multi-Object Manipulation

So với single-object, multi-object manipulation khó hơn vì:

Thách thức Mô tả Giải pháp
Larger state space Nhiều vật thể = nhiều configurations hơn Cần nhiều demonstrations hơn
Object interaction Vật thể che khuất, va chạm nhau Camera placement cẩn thận
Task ambiguity Gắp cái nào trước? Language conditioning
Longer horizons Nhiều bước hơn per episode Curriculum learning
Generalization Vị trí random mỗi lần Domain randomization

Thiết kế Dataset cho Multi-Object

Tăng số lượng demonstrations

# Rule of thumb cho số demos:
# - Single object: 50-100 demos
# - 2-3 objects: 100-200 demos
# - 4+ objects: 200-500 demos
# - Language-conditioned: 2x thêm cho mỗi task variant

DEMO_REQUIREMENTS = {
    "sort_2_colors": {"min_demos": 100, "variants": 2},  # Red→bin1, Blue→bin2
    "sort_3_colors": {"min_demos": 200, "variants": 6},  # 3! permutations
    "stack_2_blocks": {"min_demos": 100, "variants": 2},  # AB, BA
    "stack_3_blocks": {"min_demos": 300, "variants": 6},  # 3! permutations
    "rearrange_pattern": {"min_demos": 150, "variants": 3},
}

Dataset diversity là chìa khóa

import numpy as np

def randomize_object_poses(n_objects=3, workspace_bounds=None):
    """Tạo vị trí ban đầu ngẫu nhiên cho vật thể.
    
    QUAN TRỌNG: Diversity trong initial poses giúp policy
    generalize tốt hơn. Không bao giờ bắt đầu từ cùng 1 vị trí!
    """
    if workspace_bounds is None:
        workspace_bounds = {
            "x": (-0.15, 0.15),
            "y": (-0.15, 0.15),
            "z": (0.02, 0.02),  # Trên mặt bàn
        }
    
    poses = []
    min_distance = 0.06  # Tối thiểu 6cm giữa các vật thể
    
    for i in range(n_objects):
        while True:
            x = np.random.uniform(*workspace_bounds["x"])
            y = np.random.uniform(*workspace_bounds["y"])
            z = workspace_bounds["z"][0]
            
            # Kiểm tra không overlap với vật thể đã đặt
            valid = True
            for prev_pose in poses:
                dist = np.sqrt((x - prev_pose[0])**2 + (y - prev_pose[1])**2)
                if dist < min_distance:
                    valid = False
                    break
            
            if valid:
                poses.append([x, y, z])
                break
    
    return np.array(poses)


def create_sorting_environment(n_objects=3, n_bins=3):
    """Tạo environment cho task sorting.
    
    Objects: Khối màu (red, green, blue)
    Bins: Vùng đích tương ứng mỗi màu
    """
    import robosuite as suite
    
    env = suite.make(
        env_name="PickPlace",
        robots="Panda",
        has_renderer=True,
        has_offscreen_renderer=True,
        use_camera_obs=True,
        camera_names=["agentview", "robot0_eye_in_hand"],
        camera_heights=480,
        camera_widths=640,
        # Multi-object settings
        num_objects=n_objects,
        object_type="cube",
        bin_type="colored",
    )
    return env

Language-Conditioned Policies

Khi có nhiều vật thể, robot cần biết nên gắp cái nào. Language conditioning cho phép bạn chỉ dẫn robot bằng ngôn ngữ tự nhiên.

Thêm language embeddings vào policy

from transformers import AutoTokenizer, AutoModel
import torch

class LanguageConditionedPolicy:
    """Policy có thể nhận instruction bằng ngôn ngữ.
    
    Ví dụ instructions:
    - "Pick the red block and place it in the left bin"
    - "Stack the blue block on top of the green block"
    - "Sort all blocks by color"
    """
    
    def __init__(self, base_policy, language_model="sentence-transformers/all-MiniLM-L6-v2"):
        self.policy = base_policy
        self.tokenizer = AutoTokenizer.from_pretrained(language_model)
        self.language_encoder = AutoModel.from_pretrained(language_model)
        self.language_encoder.eval()
        
        # Projection layer: language embedding → policy embedding space
        self.lang_proj = torch.nn.Linear(384, 512)  # MiniLM → policy dim
    
    def encode_instruction(self, instruction):
        """Mã hóa instruction thành embedding vector."""
        tokens = self.tokenizer(
            instruction, 
            return_tensors="pt", 
            padding=True, 
            truncation=True,
            max_length=64,
        )
        
        with torch.no_grad():
            output = self.language_encoder(**tokens)
            # Mean pooling
            embedding = output.last_hidden_state.mean(dim=1)
        
        return self.lang_proj(embedding)
    
    def predict_action(self, observation, instruction):
        """Predict action dựa trên observation + language instruction."""
        lang_embedding = self.encode_instruction(instruction)
        
        # Concatenate language embedding vào observation
        observation["language_embedding"] = lang_embedding
        
        with torch.no_grad():
            action = self.policy.select_action(observation)
        
        return action


# Sử dụng
policy = LanguageConditionedPolicy(base_act_policy)
action = policy.predict_action(
    observation=obs,
    instruction="Pick the red cube and place it in the left bin"
)

Dataset với language annotations

from lerobot.common.datasets.lerobot_dataset import LeRobotDataset

def create_language_conditioned_dataset(repo_id):
    """Tạo dataset với language annotations cho mỗi episode."""
    dataset = LeRobotDataset.create(
        repo_id=repo_id,
        fps=30,
        robot_type="franka",
        features={
            "observation.image": {
                "dtype": "video",
                "shape": (480, 640, 3),
                "names": ["height", "width", "channels"],
            },
            "observation.state": {
                "dtype": "float32",
                "shape": (7,),
                "names": ["joint_positions"],
            },
            "action": {
                "dtype": "float32",
                "shape": (7,),
                "names": ["joint_velocities"],
            },
            "language_instruction": {
                "dtype": "string",
                "shape": (1,),
                "names": ["instruction"],
            },
        },
    )
    return dataset


# Các instructions mẫu cho sorting task
SORTING_INSTRUCTIONS = [
    "Sort the red block into the red bin",
    "Sort the blue block into the blue bin",
    "Sort the green block into the green bin",
    "Sort all blocks by color into their matching bins",
    "Pick the red block first, then sort the remaining blocks",
]

Language-conditioned robot manipulation

Ví dụ 1: Sorting 3 khối màu vào bins

Setup environment

import numpy as np
import gymnasium as gym

class ColorSortingEnv:
    """Environment cho task sorting 3 khối màu."""
    
    def __init__(self):
        self.n_objects = 3
        self.colors = ["red", "green", "blue"]
        self.bin_positions = {
            "red": np.array([-0.15, 0.2, 0.0]),
            "green": np.array([0.0, 0.2, 0.0]),
            "blue": np.array([0.15, 0.2, 0.0]),
        }
        self.workspace = {"x": (-0.2, 0.2), "y": (-0.1, 0.1)}
    
    def reset(self):
        """Reset với vị trí object random."""
        self.object_poses = randomize_object_poses(
            self.n_objects, 
            {"x": self.workspace["x"], "y": self.workspace["y"], "z": (0.02, 0.02)}
        )
        self.sorted = [False] * self.n_objects
        return self._get_obs()
    
    def _get_obs(self):
        """Trả về observation."""
        return {
            "image": self._render(),
            "state": np.concatenate([
                self.robot_state,           # 7D joint positions
                self.object_poses.flatten(), # 3*3 = 9D object positions
                np.array(self.sorted, dtype=np.float32),  # 3D sorted flags
            ]),
        }
    
    def check_success(self):
        """Kiểm tra tất cả objects đã sorted đúng chưa."""
        for i, color in enumerate(self.colors):
            obj_pos = self.object_poses[i]
            bin_pos = self.bin_positions[color]
            distance = np.linalg.norm(obj_pos[:2] - bin_pos[:2])
            if distance > 0.03:  # Threshold 3cm
                return False
        return True

Training pipeline cho sorting

def train_sorting_policy(dataset_repo_id, num_epochs=200):
    """Train policy cho multi-object sorting task."""
    from lerobot.common.policies.act.configuration_act import ACTConfig
    from lerobot.common.policies.act.modeling_act import ACTPolicy
    
    dataset = LeRobotDataset(dataset_repo_id)
    
    config = ACTConfig(
        input_shapes={
            "observation.image": [3, 480, 640],
            "observation.state": [19],  # 7 joints + 9 obj pos + 3 sorted flags
        },
        output_shapes={
            "action": [7],
        },
        input_normalization_modes={
            "observation.image": "mean_std",
            "observation.state": "min_max",
        },
        output_normalization_modes={
            "action": "min_max",
        },
        # Larger model cho multi-object
        chunk_size=50,       # Shorter chunks cho precision
        dim_model=512,
        n_heads=8,
        n_layers=4,          # Nhiều layers hơn cho task phức tạp
        use_vae=True,
        kl_weight=10.0,
    )
    
    policy = ACTPolicy(config)
    device = torch.device("cuda")
    policy.to(device)
    
    optimizer = torch.optim.AdamW(policy.parameters(), lr=1e-5)
    dataloader = torch.utils.data.DataLoader(
        dataset, batch_size=8, shuffle=True, num_workers=4
    )
    
    best_loss = float('inf')
    
    for epoch in range(num_epochs):
        epoch_loss = 0
        n_batches = 0
        
        for batch in dataloader:
            batch = {k: v.to(device) for k, v in batch.items()}
            output = policy.forward(batch)
            loss = output["loss"]
            
            optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(policy.parameters(), 10.0)
            optimizer.step()
            
            epoch_loss += loss.item()
            n_batches += 1
        
        avg_loss = epoch_loss / n_batches
        
        # Save best model
        if avg_loss < best_loss:
            best_loss = avg_loss
            torch.save(policy.state_dict(), "best_sorting_policy.pt")
        
        if (epoch + 1) % 20 == 0:
            print(f"Epoch {epoch+1}/{num_epochs} | Loss: {avg_loss:.4f} | Best: {best_loss:.4f}")
    
    return policy

Ví dụ 2: Block Stacking

Stacking khó hơn sorting vì đòi hỏi precision cao — đặt khối lên khối khác mà không làm đổ.

class StackingTaskConfig:
    """Configuration cho stacking tasks."""
    
    # Thứ tự stacking ảnh hưởng lớn đến difficulty
    DIFFICULTY_LEVELS = {
        "easy": {
            "n_blocks": 2,
            "block_size": 0.04,      # 4cm — dễ gắp
            "tolerance": 0.02,        # 2cm tolerance
            "min_demos": 100,
        },
        "medium": {
            "n_blocks": 3,
            "block_size": 0.03,      # 3cm — cần precision hơn
            "tolerance": 0.015,
            "min_demos": 200,
        },
        "hard": {
            "n_blocks": 4,
            "block_size": 0.025,     # 2.5cm — rất khó
            "tolerance": 0.01,       # 1cm tolerance
            "min_demos": 400,
        },
    }


def evaluate_stacking(policy, env, n_episodes=50):
    """Đánh giá stacking policy.
    
    Metrics:
    - Success rate: Tất cả blocks stacked đúng
    - Partial success: Ít nhất 1 block stacked đúng
    - Average height: Chiều cao trung bình của stack
    """
    results = {
        "full_success": 0,
        "partial_success": 0,
        "avg_blocks_stacked": 0,
        "avg_steps": 0,
    }
    
    for ep in range(n_episodes):
        obs, info = env.reset()
        done = False
        steps = 0
        
        while not done:
            action = policy.predict(obs)
            obs, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated
            steps += 1
        
        blocks_stacked = info.get("blocks_stacked", 0)
        total_blocks = info.get("total_blocks", 3)
        
        if blocks_stacked == total_blocks:
            results["full_success"] += 1
        if blocks_stacked > 0:
            results["partial_success"] += 1
        
        results["avg_blocks_stacked"] += blocks_stacked
        results["avg_steps"] += steps
    
    results["full_success"] /= n_episodes
    results["partial_success"] /= n_episodes
    results["avg_blocks_stacked"] /= n_episodes
    results["avg_steps"] /= n_episodes
    
    print(f"Full success rate: {results['full_success']:.1%}")
    print(f"Partial success rate: {results['partial_success']:.1%}")
    print(f"Avg blocks stacked: {results['avg_blocks_stacked']:.1f}")
    print(f"Avg steps: {results['avg_steps']:.0f}")
    
    return results

Evaluation với Randomized Object Poses

def evaluate_with_randomization(policy, env, n_episodes=100, n_pose_variants=5):
    """Đánh giá policy với nhiều initial configurations khác nhau.
    
    Mỗi episode chạy với n_pose_variants vị trí ban đầu khác nhau
    để đo generalization ability.
    """
    per_variant_results = {i: [] for i in range(n_pose_variants)}
    
    for variant in range(n_pose_variants):
        # Set random seed khác nhau cho mỗi variant
        np.random.seed(42 + variant)
        
        successes = 0
        for ep in range(n_episodes):
            obs, info = env.reset()
            done = False
            
            while not done:
                action = policy.predict(obs)
                obs, reward, terminated, truncated, info = env.step(action)
                done = terminated or truncated
            
            if info.get("is_success", False):
                successes += 1
        
        success_rate = successes / n_episodes
        per_variant_results[variant] = success_rate
        print(f"Variant {variant}: {success_rate:.1%} success rate")
    
    # Tổng hợp
    rates = list(per_variant_results.values())
    print(f"\nOverall: {np.mean(rates):.1%} +/- {np.std(rates):.1%}")
    
    return per_variant_results

Papers tham khảo

  1. CLIPort: What and Where Pathways for Robotic ManipulationShridhar et al., CoRL 2022 — Language-conditioned manipulation kết hợp CLIP
  2. PerAct: Perceiver-Actor for Multi-Task Robotic ManipulationShridhar et al., CoRL 2023 — Multi-task manipulation trong 3D voxel space
  3. RVT: Robotic View TransformerGoyal et al., CoRL 2023 — Multi-view rendering cho manipulation

Kết luận và bước tiếp theo

Multi-object manipulation là bước nhảy lớn về độ phức tạp. Key takeaways:

  • Dataset diversity quan trọng hơn bao giờ hết — randomize object poses
  • Language conditioning giúp policy biết phải làm gì trong tình huống ambiguous
  • Evaluation phải kỹ lưỡng — test nhiều configurations, không chỉ 1

Bài tiếp theo — Long-Horizon Tasks: Chuỗi hành động cho Multi-Step Manipulation — sẽ nâng level tiếp khi chúng ta kết hợp nhiều sub-tasks thành một chuỗi hành động dài và phức tạp.

Bài viết liên quan

NT

Nguyễn Anh Tuấn

Robotics & AI Engineer. Building VnRobo — sharing knowledge about robot learning, VLA models, and automation.

Bài viết liên quan

NEWTutorial
Hướng dẫn GigaBrain-0: VLA + World Model + RL
vlaworld-modelreinforcement-learninggigabrainroboticsmanipulation

Hướng dẫn GigaBrain-0: VLA + World Model + RL

Hướng dẫn chi tiết huấn luyện VLA bằng World Model và Reinforcement Learning với framework RAMP từ GigaBrain — open-source, 3.5B params.

12/4/202611 phút đọc
NEWTutorial
SimpleVLA-RL (7): Collect Data cho OpenArm
openarmlerobotdata-collectionteleoperationPhần 7

SimpleVLA-RL (7): Collect Data cho OpenArm

Hướng dẫn từng bước setup OpenArm, calibrate, teleoperate và thu thập 50 episodes gắp hộp carton với LeRobot.

11/4/202616 phút đọc
NEWTutorial
SimpleVLA-RL (6): OpenArm — Phân tích Lộ trình
openarmvlareinforcement-learninglerobotpi0Phần 6

SimpleVLA-RL (6): OpenArm — Phân tích Lộ trình

Phân tích chi tiết cách tiếp cận training robot OpenArm 7-DoF gắp hộp carton — so sánh 2 lộ trình: LeRobot native vs SimpleVLA-RL.

11/4/202613 phút đọc