Introduction: From One Object to Many
In the previous post, we trained a policy for a single-object pick-and-place task. The real world is rarely that simple: factory robots must sort hundreds of products, and home robots must clear tables holding all kinds of objects.
This post — the fourth in the VLA & LeRobot Mastery series — takes you into the world of multi-object manipulation: sorting by color or shape, block stacking, and rearranging objects into target positions. This is a significant jump in complexity, requiring policies that understand spatial context and execution order.
Challenges of Multi-Object Manipulation
Compared to the single-object setting, multi-object manipulation is harder for several reasons:
| Challenge | Description | Solution |
|---|---|---|
| Larger state space | More objects = more configurations | Need more demonstrations |
| Object interaction | Objects occlude, collide with each other | Careful camera placement |
| Task ambiguity | Which object to grasp first? | Language conditioning |
| Longer horizons | More steps per episode | Curriculum learning |
| Generalization | Random positions each time | Domain randomization |
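The table lists curriculum learning as a mitigation for longer horizons. One minimal way to realize it is a staged schedule that adds objects as the policy improves; the stage definitions and promotion thresholds below are illustrative sketches, not a LeRobot feature:

```python
# Staged curriculum: start simple, add objects once the current stage is
# mastered. Stage names and thresholds are illustrative choices.
CURRICULUM = [
    {"name": "1_object", "n_objects": 1, "promote_at": 0.9},
    {"name": "2_objects", "n_objects": 2, "promote_at": 0.8},
    {"name": "3_objects", "n_objects": 3, "promote_at": 0.7},
]

def next_stage(stage_idx: int, success_rate: float) -> int:
    """Return the next stage index if the current stage is mastered,
    otherwise stay on the current stage."""
    mastered = success_rate >= CURRICULUM[stage_idx]["promote_at"]
    if mastered and stage_idx + 1 < len(CURRICULUM):
        return stage_idx + 1
    return stage_idx
```

During training, `next_stage` would be called after each evaluation round, rebuilding the environment with the new object count.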
Designing Datasets for Multi-Object Tasks
Scaling Up Demonstrations
# Rule of thumb for demo counts:
# - Single object: 50-100 demos
# - 2-3 objects: 100-200 demos
# - 4+ objects: 200-500 demos
# - Language-conditioned: 2x more for each task variant
DEMO_REQUIREMENTS = {
"sort_2_colors": {"min_demos": 100, "variants": 2}, # Red->bin1, Blue->bin2
"sort_3_colors": {"min_demos": 200, "variants": 6}, # 3! permutations
"stack_2_blocks": {"min_demos": 100, "variants": 2}, # AB, BA
"stack_3_blocks": {"min_demos": 300, "variants": 6}, # 3! permutations
"rearrange_pattern": {"min_demos": 150, "variants": 3},
}
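If it helps to budget collection time, the rule of thumb above can be folded into a tiny helper. The per-variant interpretation and the 2x language multiplier are assumptions taken from the comments above, not a LeRobot utility:

```python
def estimate_demos(demos_per_variant: int, n_variants: int,
                   language_conditioned: bool = False) -> int:
    """Total demo budget: per-variant count times number of variants,
    doubled when language conditioning adds instruction paraphrases."""
    total = demos_per_variant * n_variants
    return total * 2 if language_conditioned else total
```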
Dataset Diversity Is Key
import numpy as np
def randomize_object_poses(n_objects=3, workspace_bounds=None):
"""Generate random initial poses for objects.
IMPORTANT: Diversity in initial poses helps the policy
generalize better. Never start from the same position!
"""
if workspace_bounds is None:
workspace_bounds = {
"x": (-0.15, 0.15),
"y": (-0.15, 0.15),
"z": (0.02, 0.02), # On the table
}
poses = []
min_distance = 0.06 # Minimum 6cm between objects
    for i in range(n_objects):
        for _ in range(1000):  # cap attempts so a crowded workspace cannot hang
            x = np.random.uniform(*workspace_bounds["x"])
            y = np.random.uniform(*workspace_bounds["y"])
            z = workspace_bounds["z"][0]
            # Reject samples that land within min_distance of a placed object
            if all(
                np.hypot(x - p[0], y - p[1]) >= min_distance for p in poses
            ):
                poses.append([x, y, z])
                break
        else:
            raise RuntimeError(f"Could not place object {i} without overlap")
return np.array(poses)
def create_sorting_environment(n_objects=3, n_bins=3):
"""Create environment for sorting task.
Objects: Colored cubes (red, green, blue)
Bins: Target zones corresponding to each color
"""
import robosuite as suite
env = suite.make(
env_name="PickPlace",
robots="Panda",
has_renderer=True,
has_offscreen_renderer=True,
use_camera_obs=True,
camera_names=["agentview", "robot0_eye_in_hand"],
camera_heights=480,
camera_widths=640,
        # NOTE: the kwargs below are illustrative. Stock robosuite PickPlace
        # exposes `object_type` and `single_object_mode`; colored bins and an
        # arbitrary object count require a custom environment subclass.
        num_objects=n_objects,
        object_type="cube",
        bin_type="colored",
)
return env
Language-Conditioned Policies
With multiple objects, the robot needs to know which one to grasp. Language conditioning allows you to instruct the robot using natural language.
Adding Language Embeddings to Policy
from transformers import AutoTokenizer, AutoModel
import torch
class LanguageConditionedPolicy:
"""Policy that can receive natural language instructions.
Example instructions:
- "Pick the red block and place it in the left bin"
- "Stack the blue block on top of the green block"
- "Sort all blocks by color"
"""
def __init__(self, base_policy, language_model="sentence-transformers/all-MiniLM-L6-v2"):
self.policy = base_policy
self.tokenizer = AutoTokenizer.from_pretrained(language_model)
self.language_encoder = AutoModel.from_pretrained(language_model)
self.language_encoder.eval()
# Projection layer: language embedding -> policy embedding space
self.lang_proj = torch.nn.Linear(384, 512) # MiniLM -> policy dim
def encode_instruction(self, instruction):
"""Encode instruction into embedding vector."""
tokens = self.tokenizer(
instruction,
return_tensors="pt",
padding=True,
truncation=True,
max_length=64,
)
        with torch.no_grad():
            output = self.language_encoder(**tokens)
        # Masked mean-pooling so padding tokens don't dilute the embedding
        mask = tokens["attention_mask"].unsqueeze(-1)
        embedding = (output.last_hidden_state * mask).sum(dim=1) / mask.sum(dim=1)
        return self.lang_proj(embedding)
def predict_action(self, observation, instruction):
"""Predict action based on observation + language instruction."""
lang_embedding = self.encode_instruction(instruction)
observation["language_embedding"] = lang_embedding
with torch.no_grad():
action = self.policy.select_action(observation)
return action
# Usage
policy = LanguageConditionedPolicy(base_act_policy)
action = policy.predict_action(
observation=obs,
instruction="Pick the red cube and place it in the left bin"
)
Dataset with Language Annotations
from lerobot.common.datasets.lerobot_dataset import LeRobotDataset
def create_language_conditioned_dataset(repo_id):
"""Create dataset with language annotations for each episode."""
dataset = LeRobotDataset.create(
repo_id=repo_id,
fps=30,
robot_type="franka",
features={
"observation.image": {
"dtype": "video",
"shape": (480, 640, 3),
"names": ["height", "width", "channels"],
},
"observation.state": {
"dtype": "float32",
"shape": (7,),
"names": ["joint_positions"],
},
"action": {
"dtype": "float32",
"shape": (7,),
"names": ["joint_velocities"],
},
"language_instruction": {
"dtype": "string",
"shape": (1,),
"names": ["instruction"],
},
},
)
return dataset
# Sample instructions for sorting task
SORTING_INSTRUCTIONS = [
"Sort the red block into the red bin",
"Sort the blue block into the blue bin",
"Sort the green block into the green bin",
"Sort all blocks by color into their matching bins",
"Pick the red block first, then sort the remaining blocks",
]
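When recording episodes, varying the phrasing of each instruction helps the language encoder generalize beyond one fixed sentence. A minimal template-based sampler (the templates and helper name are illustrative) might look like:

```python
import random

# Paraphrase templates for the sorting task; extend freely during collection
TEMPLATES = [
    "Sort the {color} block into the {color} bin",
    "Pick the {color} block and place it in the {color} bin",
    "Put the {color} block in the matching bin",
]

def sample_instruction(color: str, rng: random.Random) -> str:
    """Draw one paraphrase of the sorting instruction for a target color."""
    return rng.choice(TEMPLATES).format(color=color)
```

Each recorded episode then stores the sampled string in its `language_instruction` field.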
Example 1: Sorting 3 Colored Blocks into Bins
Environment Setup
import numpy as np
class ColorSortingEnv:
"""Environment for sorting 3 colored blocks."""
def __init__(self):
self.n_objects = 3
self.colors = ["red", "green", "blue"]
self.bin_positions = {
"red": np.array([-0.15, 0.2, 0.0]),
"green": np.array([0.0, 0.2, 0.0]),
"blue": np.array([0.15, 0.2, 0.0]),
}
self.workspace = {"x": (-0.2, 0.2), "y": (-0.1, 0.1)}
def reset(self):
"""Reset with randomized object positions."""
self.object_poses = randomize_object_poses(
self.n_objects,
{"x": self.workspace["x"], "y": self.workspace["y"], "z": (0.02, 0.02)}
)
self.sorted = [False] * self.n_objects
return self._get_obs()
def _get_obs(self):
"""Return observation."""
return {
"image": self._render(),
"state": np.concatenate([
self.robot_state, # 7D joint positions
self.object_poses.flatten(), # 3*3 = 9D object positions
np.array(self.sorted, dtype=np.float32), # 3D sorted flags
]),
}
def check_success(self):
"""Check if all objects are correctly sorted."""
for i, color in enumerate(self.colors):
obj_pos = self.object_poses[i]
bin_pos = self.bin_positions[color]
distance = np.linalg.norm(obj_pos[:2] - bin_pos[:2])
if distance > 0.03: # 3cm threshold
return False
return True
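The success criterion inside `check_success` can be isolated into a small, self-contained predicate, which makes it easy to sanity-check the threshold before wiring it into the environment. A sketch using the same 3 cm threshold (the function name is illustrative):

```python
import numpy as np

def is_sorted_into_bin(obj_pos, bin_pos, threshold=0.03):
    """An object counts as sorted when its XY distance to the bin center
    is below the threshold (3 cm by default, matching check_success)."""
    obj_xy = np.asarray(obj_pos, dtype=float)[:2]
    bin_xy = np.asarray(bin_pos, dtype=float)[:2]
    return bool(np.linalg.norm(obj_xy - bin_xy) < threshold)
```

A block 1.4 cm from its bin center passes; one across the workspace does not.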
Training Pipeline for Sorting
def train_sorting_policy(dataset_repo_id, num_epochs=200):
"""Train policy for multi-object sorting task."""
from lerobot.common.policies.act.configuration_act import ACTConfig
from lerobot.common.policies.act.modeling_act import ACTPolicy
dataset = LeRobotDataset(dataset_repo_id)
config = ACTConfig(
input_shapes={
"observation.image": [3, 480, 640],
"observation.state": [19], # 7 joints + 9 obj pos + 3 sorted flags
},
output_shapes={
"action": [7],
},
input_normalization_modes={
"observation.image": "mean_std",
"observation.state": "min_max",
},
output_normalization_modes={
"action": "min_max",
},
chunk_size=50, # Shorter chunks for precision
dim_model=512,
n_heads=8,
n_layers=4, # More layers for complex tasks
use_vae=True,
kl_weight=10.0,
)
policy = ACTPolicy(config)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
policy.to(device)
optimizer = torch.optim.AdamW(policy.parameters(), lr=1e-5)
dataloader = torch.utils.data.DataLoader(
dataset, batch_size=8, shuffle=True, num_workers=4
)
best_loss = float('inf')
for epoch in range(num_epochs):
epoch_loss = 0
n_batches = 0
for batch in dataloader:
batch = {k: v.to(device) for k, v in batch.items()}
output = policy.forward(batch)
loss = output["loss"]
optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(policy.parameters(), 10.0)
optimizer.step()
epoch_loss += loss.item()
n_batches += 1
avg_loss = epoch_loss / n_batches
if avg_loss < best_loss:
best_loss = avg_loss
torch.save(policy.state_dict(), "best_sorting_policy.pt")
if (epoch + 1) % 20 == 0:
print(f"Epoch {epoch+1}/{num_epochs} | Loss: {avg_loss:.4f} | Best: {best_loss:.4f}")
return policy
Example 2: Block Stacking
Stacking is harder than sorting because it demands high precision — placing one block on another without toppling the stack.
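Before training, it is worth pinning down what "stacked" means geometrically. One way to check a single stack step, assuming cubes of known size (the function name and exact criterion are illustrative, with defaults matching the "easy" difficulty settings):

```python
import numpy as np

def is_stacked(bottom_pos, top_pos, block_size=0.04, tolerance=0.02):
    """The top block counts as stacked when its XY center lies within
    `tolerance` of the bottom block's center and it rests roughly one
    block height above it."""
    bottom = np.asarray(bottom_pos, dtype=float)
    top = np.asarray(top_pos, dtype=float)
    xy_offset = np.linalg.norm(top[:2] - bottom[:2])
    z_gap = top[2] - bottom[2]
    return bool(xy_offset < tolerance and abs(z_gap - block_size) < tolerance / 2)
```

Applying the check pairwise from the bottom block up yields the `blocks_stacked` count used in evaluation.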
class StackingTaskConfig:
"""Configuration for stacking tasks."""
DIFFICULTY_LEVELS = {
"easy": {
"n_blocks": 2,
"block_size": 0.04, # 4cm — easy to grasp
"tolerance": 0.02, # 2cm tolerance
"min_demos": 100,
},
"medium": {
"n_blocks": 3,
"block_size": 0.03, # 3cm — needs more precision
"tolerance": 0.015,
"min_demos": 200,
},
"hard": {
"n_blocks": 4,
"block_size": 0.025, # 2.5cm — very challenging
"tolerance": 0.01, # 1cm tolerance
"min_demos": 400,
},
}
def evaluate_stacking(policy, env, n_episodes=50):
"""Evaluate stacking policy.
Metrics:
- Success rate: All blocks correctly stacked
- Partial success: At least 1 block correctly stacked
- Average height: Average stack height
"""
results = {
"full_success": 0,
"partial_success": 0,
"avg_blocks_stacked": 0,
"avg_steps": 0,
}
for ep in range(n_episodes):
obs, info = env.reset()
done = False
steps = 0
while not done:
action = policy.predict(obs)
obs, reward, terminated, truncated, info = env.step(action)
done = terminated or truncated
steps += 1
blocks_stacked = info.get("blocks_stacked", 0)
total_blocks = info.get("total_blocks", 3)
if blocks_stacked == total_blocks:
results["full_success"] += 1
if blocks_stacked > 0:
results["partial_success"] += 1
results["avg_blocks_stacked"] += blocks_stacked
results["avg_steps"] += steps
results["full_success"] /= n_episodes
results["partial_success"] /= n_episodes
results["avg_blocks_stacked"] /= n_episodes
results["avg_steps"] /= n_episodes
print(f"Full success rate: {results['full_success']:.1%}")
print(f"Partial success rate: {results['partial_success']:.1%}")
print(f"Avg blocks stacked: {results['avg_blocks_stacked']:.1f}")
print(f"Avg steps: {results['avg_steps']:.0f}")
return results
Evaluation with Randomized Object Poses
def evaluate_with_randomization(policy, env, n_episodes=100, n_pose_variants=5):
"""Evaluate policy with multiple initial configurations.
Each episode runs with n_pose_variants different initial positions
to measure generalization ability.
"""
per_variant_results = {i: [] for i in range(n_pose_variants)}
for variant in range(n_pose_variants):
np.random.seed(42 + variant)
successes = 0
for ep in range(n_episodes):
obs, info = env.reset()
done = False
while not done:
action = policy.predict(obs)
obs, reward, terminated, truncated, info = env.step(action)
done = terminated or truncated
if info.get("is_success", False):
successes += 1
success_rate = successes / n_episodes
per_variant_results[variant] = success_rate
print(f"Variant {variant}: {success_rate:.1%} success rate")
rates = list(per_variant_results.values())
print(f"\nOverall: {np.mean(rates):.1%} +/- {np.std(rates):.1%}")
return per_variant_results
Reference Papers
- CLIPort: What and Where Pathways for Robotic Manipulation — Shridhar et al., CoRL 2021 — Language-conditioned manipulation built on CLIP features
- PerAct: A Multi-Task Transformer for Robotic Manipulation — Shridhar et al., CoRL 2022 — Multi-task manipulation in a 3D voxel space
- RVT: Robotic View Transformer for 3D Object Manipulation — Goyal et al., CoRL 2023 — Re-rendered multi-view images for manipulation
Conclusion and Next Steps
Multi-object manipulation is a major complexity jump. Key takeaways:
- Dataset diversity matters more than ever — randomize object poses
- Language conditioning helps policies know what to do in ambiguous situations
- Evaluation must be thorough — test many configurations, not just one
The next post — Long-Horizon Tasks: Chaining Actions for Multi-Step Manipulation — will level up further as we combine multiple sub-tasks into long, complex action sequences.
Related Posts
- Training Single-Arm Policies — Foundation before handling multi-object
- VLA Models: Vision-Language-Action — Language conditioning for robots
- Imitation Learning for Manipulation — Theoretical foundations