Tags: ai, lerobot, multi-object, manipulation, sorting

Multi-Object Manipulation: Sorting, Stacking & Rearranging

Expanding from single-object to multi-object manipulation — sorting, stacking, rearranging with language-conditioned policies.

Nguyễn Anh Tuấn · March 21, 2026 · 8 min read

Introduction: From One Object to Many

In the previous post, we trained a policy for a single-object pick-and-place task. But the real world is rarely that simple: factory robots must sort hundreds of products, and home robots must clear tables cluttered with assorted objects.

This post, the fourth in the VLA & LeRobot Mastery series, moves into multi-object manipulation: sorting by color or shape, stacking blocks, and rearranging objects into target positions. This is a significant jump in complexity, requiring policies to reason about spatial context and execution order.

Multi-object manipulation scene

Challenges of Multi-Object Manipulation

Compared to single-object, multi-object manipulation is harder because:

Challenge           | Description                          | Solution
Larger state space  | More objects = more configurations   | More demonstrations
Object interaction  | Objects occlude and collide          | Careful camera placement
Task ambiguity      | Which object to grasp first?         | Language conditioning
Longer horizons     | More steps per episode               | Curriculum learning
Generalization      | Random positions each reset          | Domain randomization
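The table lists curriculum learning as the answer to longer horizons; in practice this can be a simple staged schedule that starts with few, widely spaced objects and tightens the task as performance improves. A minimal sketch with illustrative thresholds (CURRICULUM and next_stage are not LeRobot APIs):

```python
# Staged curriculum: each stage adds objects and shrinks spacing.
# The success thresholds for advancing are illustrative heuristics.
CURRICULUM = [
    {"n_objects": 1, "min_distance": 0.10, "success_to_advance": 0.8},
    {"n_objects": 2, "min_distance": 0.08, "success_to_advance": 0.7},
    {"n_objects": 3, "min_distance": 0.06, "success_to_advance": 0.6},
]

def next_stage(stage_idx, recent_success_rate):
    """Advance to the next curriculum stage once the policy is good enough."""
    stage = CURRICULUM[stage_idx]
    at_last_stage = stage_idx + 1 >= len(CURRICULUM)
    if recent_success_rate >= stage["success_to_advance"] and not at_last_stage:
        return stage_idx + 1
    return stage_idx
```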

Designing Datasets for Multi-Object Tasks

Scaling Up Demonstrations

# Rule of thumb for demo counts:
# - Single object: 50-100 demos
# - 2-3 objects: 100-200 demos
# - 4+ objects: 200-500 demos
# - Language-conditioned: 2x more for each task variant

DEMO_REQUIREMENTS = {
    "sort_2_colors": {"min_demos": 100, "variants": 2},  # Red->bin1, Blue->bin2
    "sort_3_colors": {"min_demos": 200, "variants": 6},  # 3! permutations
    "stack_2_blocks": {"min_demos": 100, "variants": 2},  # AB, BA
    "stack_3_blocks": {"min_demos": 300, "variants": 6},  # 3! permutations
    "rearrange_pattern": {"min_demos": 150, "variants": 3},
}
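The rule of thumb above can be wrapped in a small helper that returns a demo-count range; the function name and tier boundaries are illustrative, not a fixed recipe:

```python
def recommended_demos(n_objects, language_conditioned=False):
    """Rough demo-count range following the rule of thumb above.

    Returns (low, high). Language-conditioned task variants roughly
    double the requirement. Thresholds are heuristics, not hard rules.
    """
    if n_objects <= 1:
        low, high = 50, 100
    elif n_objects <= 3:
        low, high = 100, 200
    else:
        low, high = 200, 500
    if language_conditioned:
        low, high = 2 * low, 2 * high
    return low, high
```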

Dataset Diversity Is Key

import numpy as np

def randomize_object_poses(n_objects=3, workspace_bounds=None):
    """Generate random initial poses for objects.
    
    IMPORTANT: Diversity in initial poses helps the policy
    generalize better. Never start from the same position!
    """
    if workspace_bounds is None:
        workspace_bounds = {
            "x": (-0.15, 0.15),
            "y": (-0.15, 0.15),
            "z": (0.02, 0.02),  # On the table
        }
    
    poses = []
    min_distance = 0.06  # Minimum 6cm between objects
    
    for _ in range(n_objects):
        for _attempt in range(1000):  # Cap attempts so a crowded workspace fails loudly
            x = np.random.uniform(*workspace_bounds["x"])
            y = np.random.uniform(*workspace_bounds["y"])
            z = workspace_bounds["z"][0]
            
            # Accept only if far enough from every already-placed object
            valid = all(
                np.hypot(x - px, y - py) >= min_distance
                for px, py, _pz in poses
            )
            if valid:
                poses.append([x, y, z])
                break
        else:
            raise RuntimeError(
                "Could not place all objects without overlap; "
                "enlarge the workspace or lower min_distance."
            )
    
    return np.array(poses)


def create_sorting_environment(n_objects=3, n_bins=3):
    """Create environment for sorting task.
    
    Objects: Colored cubes (red, green, blue)
    Bins: Target zones corresponding to each color
    """
    import robosuite as suite
    
    env = suite.make(
        env_name="PickPlace",
        robots="Panda",
        has_renderer=True,
        has_offscreen_renderer=True,
        use_camera_obs=True,
        camera_names=["agentview", "robot0_eye_in_hand"],
        camera_heights=480,
        camera_widths=640,
        num_objects=n_objects,
        object_type="cube",
        bin_type="colored",
    )
    return env

Language-Conditioned Policies

With multiple objects, the robot needs to know which one to grasp. Language conditioning allows you to instruct the robot using natural language.

Adding Language Embeddings to Policy

from transformers import AutoTokenizer, AutoModel
import torch

class LanguageConditionedPolicy:
    """Policy that can receive natural language instructions.
    
    Example instructions:
    - "Pick the red block and place it in the left bin"
    - "Stack the blue block on top of the green block"
    - "Sort all blocks by color"
    """
    
    def __init__(self, base_policy, language_model="sentence-transformers/all-MiniLM-L6-v2"):
        self.policy = base_policy
        self.tokenizer = AutoTokenizer.from_pretrained(language_model)
        self.language_encoder = AutoModel.from_pretrained(language_model)
        self.language_encoder.eval()
        
        # Projection layer: language embedding -> policy embedding space
        self.lang_proj = torch.nn.Linear(384, 512)  # MiniLM -> policy dim
    
    def encode_instruction(self, instruction):
        """Encode instruction into embedding vector."""
        tokens = self.tokenizer(
            instruction, 
            return_tensors="pt", 
            padding=True, 
            truncation=True,
            max_length=64,
        )
        
        with torch.no_grad():
            output = self.language_encoder(**tokens)
            # Mean-pool over tokens, masking out padding positions
            mask = tokens["attention_mask"].unsqueeze(-1).float()
            embedding = (output.last_hidden_state * mask).sum(dim=1) / mask.sum(dim=1)
        
        return self.lang_proj(embedding)
    
    def predict_action(self, observation, instruction):
        """Predict action based on observation + language instruction."""
        lang_embedding = self.encode_instruction(instruction)
        observation["language_embedding"] = lang_embedding
        
        with torch.no_grad():
            action = self.policy.select_action(observation)
        
        return action


# Usage
policy = LanguageConditionedPolicy(base_act_policy)
action = policy.predict_action(
    observation=obs,
    instruction="Pick the red cube and place it in the left bin"
)

Dataset with Language Annotations

from lerobot.common.datasets.lerobot_dataset import LeRobotDataset

def create_language_conditioned_dataset(repo_id):
    """Create dataset with language annotations for each episode."""
    dataset = LeRobotDataset.create(
        repo_id=repo_id,
        fps=30,
        robot_type="franka",
        features={
            "observation.image": {
                "dtype": "video",
                "shape": (480, 640, 3),
                "names": ["height", "width", "channels"],
            },
            "observation.state": {
                "dtype": "float32",
                "shape": (7,),
                "names": ["joint_positions"],
            },
            "action": {
                "dtype": "float32",
                "shape": (7,),
                "names": ["joint_velocities"],
            },
            "language_instruction": {
                "dtype": "string",
                "shape": (1,),
                "names": ["instruction"],
            },
        },
    )
    return dataset


# Sample instructions for sorting task
SORTING_INSTRUCTIONS = [
    "Sort the red block into the red bin",
    "Sort the blue block into the blue bin",
    "Sort the green block into the green bin",
    "Sort all blocks by color into their matching bins",
    "Pick the red block first, then sort the remaining blocks",
]
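When recording demonstrations, each episode needs a matching instruction, and varied phrasing helps the language encoder generalize. A small template sampler avoids hand-writing every string; this is a sketch, and the helper and templates are illustrative, not a LeRobot API:

```python
import random

# Hypothetical templates for the color-sorting task; extend as needed.
SORT_TEMPLATES = [
    "Sort the {color} block into the {color} bin",
    "Pick up the {color} block and place it in the {color} bin",
    "Put the {color} cube in the matching bin",
]

def sample_instruction(color, rng=None):
    """Return one natural-language instruction for sorting `color`."""
    rng = rng or random.Random()
    return rng.choice(SORT_TEMPLATES).format(color=color)
```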

Language-conditioned robot manipulation

Example 1: Sorting 3 Colored Blocks into Bins

Environment Setup

import numpy as np

class ColorSortingEnv:
    """Environment for sorting 3 colored blocks."""
    
    def __init__(self):
        self.n_objects = 3
        self.colors = ["red", "green", "blue"]
        self.bin_positions = {
            "red": np.array([-0.15, 0.2, 0.0]),
            "green": np.array([0.0, 0.2, 0.0]),
            "blue": np.array([0.15, 0.2, 0.0]),
        }
        self.workspace = {"x": (-0.2, 0.2), "y": (-0.1, 0.1)}
        self.robot_state = np.zeros(7, dtype=np.float32)  # Updated by the robot/sim each step
    
    def reset(self):
        """Reset with randomized object positions."""
        self.object_poses = randomize_object_poses(
            self.n_objects, 
            {"x": self.workspace["x"], "y": self.workspace["y"], "z": (0.02, 0.02)}
        )
        self.sorted = [False] * self.n_objects
        return self._get_obs()
    
    def _get_obs(self):
        """Return observation."""
        return {
            "image": self._render(),
            "state": np.concatenate([
                self.robot_state,             # 7D joint positions
                self.object_poses.flatten(),  # 3*3 = 9D object positions
                np.array(self.sorted, dtype=np.float32),  # 3D sorted flags
            ]),
        }
    
    def _render(self):
        """Placeholder camera render; wire this to your simulator or camera."""
        return np.zeros((480, 640, 3), dtype=np.uint8)
    
    def check_success(self):
        """Check if all objects are correctly sorted."""
        for i, color in enumerate(self.colors):
            obj_pos = self.object_poses[i]
            bin_pos = self.bin_positions[color]
            distance = np.linalg.norm(obj_pos[:2] - bin_pos[:2])
            if distance > 0.03:  # 3cm threshold
                return False
        return True

Training Pipeline for Sorting

def train_sorting_policy(dataset_repo_id, num_epochs=200):
    """Train policy for multi-object sorting task."""
    import torch
    from lerobot.common.datasets.lerobot_dataset import LeRobotDataset
    from lerobot.common.policies.act.configuration_act import ACTConfig
    from lerobot.common.policies.act.modeling_act import ACTPolicy
    
    dataset = LeRobotDataset(dataset_repo_id)
    
    config = ACTConfig(
        input_shapes={
            "observation.image": [3, 480, 640],
            "observation.state": [19],  # 7 joints + 9 obj pos + 3 sorted flags
        },
        output_shapes={
            "action": [7],
        },
        input_normalization_modes={
            "observation.image": "mean_std",
            "observation.state": "min_max",
        },
        output_normalization_modes={
            "action": "min_max",
        },
        chunk_size=50,       # Shorter chunks for precision
        dim_model=512,
        n_heads=8,
        n_layers=4,          # More layers for complex tasks
        use_vae=True,
        kl_weight=10.0,
    )
    
    policy = ACTPolicy(config)
    device = torch.device("cuda")
    policy.to(device)
    
    optimizer = torch.optim.AdamW(policy.parameters(), lr=1e-5)
    dataloader = torch.utils.data.DataLoader(
        dataset, batch_size=8, shuffle=True, num_workers=4
    )
    
    best_loss = float('inf')
    
    for epoch in range(num_epochs):
        epoch_loss = 0
        n_batches = 0
        
        for batch in dataloader:
            batch = {k: v.to(device) for k, v in batch.items()}
            output = policy.forward(batch)
            loss = output["loss"]
            
            optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(policy.parameters(), 10.0)
            optimizer.step()
            
            epoch_loss += loss.item()
            n_batches += 1
        
        avg_loss = epoch_loss / n_batches
        
        if avg_loss < best_loss:
            best_loss = avg_loss
            torch.save(policy.state_dict(), "best_sorting_policy.pt")
        
        if (epoch + 1) % 20 == 0:
            print(f"Epoch {epoch+1}/{num_epochs} | Loss: {avg_loss:.4f} | Best: {best_loss:.4f}")
    
    return policy

Example 2: Block Stacking

Stacking is harder than sorting because it demands high precision — placing a block on another without toppling.
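A stacking success check needs both a horizontal and a vertical tolerance. Here is a minimal sketch assuming block positions come ordered bottom to top (count_stacked is a hypothetical helper, not part of LeRobot):

```python
import numpy as np

def count_stacked(block_positions, block_size=0.04, tolerance=0.02):
    """Count how many blocks sit correctly on the stack.

    block_positions: (N, 3) array of centers, ordered bottom to top.
    A block counts as stacked when it is horizontally within `tolerance`
    of the block below it and roughly one block height above it.
    """
    stacked = 1  # The base block always counts
    for below, above in zip(block_positions[:-1], block_positions[1:]):
        horizontal = np.linalg.norm(above[:2] - below[:2])
        vertical = above[2] - below[2]
        if horizontal <= tolerance and abs(vertical - block_size) <= tolerance:
            stacked += 1
        else:
            break  # Anything above a misplaced block is off the stack
    return stacked
```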

class StackingTaskConfig:
    """Configuration for stacking tasks."""
    
    DIFFICULTY_LEVELS = {
        "easy": {
            "n_blocks": 2,
            "block_size": 0.04,      # 4cm — easy to grasp
            "tolerance": 0.02,        # 2cm tolerance
            "min_demos": 100,
        },
        "medium": {
            "n_blocks": 3,
            "block_size": 0.03,      # 3cm — needs more precision
            "tolerance": 0.015,
            "min_demos": 200,
        },
        "hard": {
            "n_blocks": 4,
            "block_size": 0.025,     # 2.5cm — very challenging
            "tolerance": 0.01,       # 1cm tolerance
            "min_demos": 400,
        },
    }


def evaluate_stacking(policy, env, n_episodes=50):
    """Evaluate stacking policy.
    
    Metrics:
    - Success rate: All blocks correctly stacked
    - Partial success: At least 1 block correctly stacked
    - Average height: Average stack height
    """
    results = {
        "full_success": 0,
        "partial_success": 0,
        "avg_blocks_stacked": 0,
        "avg_steps": 0,
    }
    
    for ep in range(n_episodes):
        obs, info = env.reset()
        done = False
        steps = 0
        
        while not done:
            action = policy.predict(obs)
            obs, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated
            steps += 1
        
        blocks_stacked = info.get("blocks_stacked", 0)
        total_blocks = info.get("total_blocks", 3)
        
        if blocks_stacked == total_blocks:
            results["full_success"] += 1
        if blocks_stacked > 0:
            results["partial_success"] += 1
        
        results["avg_blocks_stacked"] += blocks_stacked
        results["avg_steps"] += steps
    
    results["full_success"] /= n_episodes
    results["partial_success"] /= n_episodes
    results["avg_blocks_stacked"] /= n_episodes
    results["avg_steps"] /= n_episodes
    
    print(f"Full success rate: {results['full_success']:.1%}")
    print(f"Partial success rate: {results['partial_success']:.1%}")
    print(f"Avg blocks stacked: {results['avg_blocks_stacked']:.1f}")
    print(f"Avg steps: {results['avg_steps']:.0f}")
    
    return results

Evaluation with Randomized Object Poses

def evaluate_with_randomization(policy, env, n_episodes=100, n_pose_variants=5):
    """Evaluate the policy across several randomization seeds.
    
    Runs n_episodes for each of n_pose_variants seeds, so success rates
    can be reported as a mean and spread across variants, which measures
    generalization better than a single seed.
    """
    per_variant_results = {}
    
    for variant in range(n_pose_variants):
        np.random.seed(42 + variant)
        
        successes = 0
        for ep in range(n_episodes):
            obs, info = env.reset()
            done = False
            
            while not done:
                action = policy.predict(obs)
                obs, reward, terminated, truncated, info = env.step(action)
                done = terminated or truncated
            
            if info.get("is_success", False):
                successes += 1
        
        success_rate = successes / n_episodes
        per_variant_results[variant] = success_rate
        print(f"Variant {variant}: {success_rate:.1%} success rate")
    
    rates = list(per_variant_results.values())
    print(f"\nOverall: {np.mean(rates):.1%} +/- {np.std(rates):.1%}")
    
    return per_variant_results

Reference Papers

  1. CLIPort: What and Where Pathways for Robotic Manipulation (Shridhar et al., CoRL 2021): language-conditioned manipulation combining CLIP with a spatial pathway
  2. PerAct: A Multi-Task Transformer for Robotic Manipulation (Shridhar et al., CoRL 2022): multi-task manipulation in 3D voxel space
  3. RVT: Robotic View Transformer (Goyal et al., CoRL 2023): multi-view rendering for manipulation

Conclusion and Next Steps

Multi-object manipulation is a major complexity jump. Key takeaways:

  1. Scale demonstrations with the number of objects and task variants (roughly 100-500 demos for 2-4 objects).
  2. Randomize initial object poses in every demonstration; never record from the same layout.
  3. Use language conditioning to resolve which object to act on when the scene is ambiguous.
  4. Evaluate across multiple randomization seeds and report mean and spread, not a single run.

The next post — Long-Horizon Tasks: Chaining Actions for Multi-Step Manipulation — will level up further as we combine multiple sub-tasks into long, complex action sequences.
