The Multi-Robot Task Allocation (MRTA) Problem
Multi-robot coordination starts with a simple question: given N robots and M tasks, assign each robot to a task such that the total cost (time, energy, distance) is minimized. This is the MRTA problem — the foundation of every modern fleet management system.
In a real warehouse, MRTA is solved continuously: 10 AMRs on the floor, with 5-10 new picking orders arriving every minute. Which robot is closest? Which has enough battery? Which is idle? The allocation algorithm determines the entire fleet's performance: a poor allocation policy can reduce throughput by 30-40%.
MRTA Classification (Gerkey-Matarić Taxonomy)
Gerkey and Matarić (2004) classify MRTA along three dimensions:
- Single-Task (ST) vs Multi-Task (MT): Can a robot execute only one task at a time, or several simultaneously?
- Single-Robot (SR) vs Multi-Robot (MR): Does each task need one robot, or multiple robots cooperating?
- Instantaneous Assignment (IA) vs Time-extended Assignment (TA): Do we optimize only the current assignment, or plan over a longer horizon?
Most warehouse applications fall into ST-SR-IA: each robot handles one task, each task needs one robot, and assignments are made immediately as new tasks arrive. This is the simplest variant; it reduces to the classic assignment problem, which can be solved optimally in polynomial time.
Hungarian Algorithm — Optimal for 1-to-1 Assignment
The Hungarian algorithm (Kuhn-Munkres) solves the assignment problem in O(n³) time — finding the 1-to-1 assignment between N robots and N tasks that minimizes total cost. This is a globally optimal algorithm, meaning no better assignment exists.
Algorithm Concept
- Build a cost matrix C[i][j] = cost for robot i to perform task j
- Subtract the minimum value in each row from that row
- Subtract the minimum value in each column from that column
- Find an assignment where all selected values equal 0
- If not found, adjust matrix and repeat
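The reduction steps above can be sketched in a few lines of NumPy (a minimal illustration with a hand-picked matrix; the zero-cover test and the adjust-and-repeat step are omitted):

```python
import numpy as np

# Toy 3x3 cost matrix: C[i][j] = cost for robot i to perform task j
C = np.array([[1.0, 4.0, 5.0],
              [5.0, 2.0, 6.0],
              [6.0, 7.0, 3.0]])

# Step 1: subtract each row's minimum from that row
reduced = C - C.min(axis=1, keepdims=True)
# Step 2: subtract each column's minimum from that column
reduced = reduced - reduced.min(axis=0, keepdims=True)

print(reduced)
# In this easy case the zeros already form a complete assignment
# (the diagonal); harder matrices need the adjust-and-repeat step.
```

After both subtractions every entry is non-negative and each row and column contains a zero, so an all-zero assignment, when one exists, must be optimal.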
Python Implementation with scipy
```python
import numpy as np
from scipy.optimize import linear_sum_assignment

def allocate_tasks_hungarian(robot_positions, task_positions):
    """
    Allocate tasks to robots using the Hungarian algorithm.

    Args:
        robot_positions: list of (x, y) - current position of each robot
        task_positions: list of (x, y) - position of each task

    Returns:
        assignments: list of (robot_idx, task_idx) pairs
        total_cost: total travel distance
    """
    n_robots = len(robot_positions)
    n_tasks = len(task_positions)

    # Build cost matrix (Euclidean distance)
    cost_matrix = np.zeros((n_robots, n_tasks))
    for i, r_pos in enumerate(robot_positions):
        for j, t_pos in enumerate(task_positions):
            cost_matrix[i, j] = np.sqrt(
                (r_pos[0] - t_pos[0])**2 +
                (r_pos[1] - t_pos[1])**2
            )

    # Solve using scipy (Hungarian algorithm internally)
    row_ind, col_ind = linear_sum_assignment(cost_matrix)
    assignments = list(zip(row_ind.tolist(), col_ind.tolist()))
    total_cost = cost_matrix[row_ind, col_ind].sum()
    return assignments, total_cost

# Example: 4 robots, 4 tasks in a factory
robots = [(0, 0), (10, 0), (0, 10), (10, 10)]
tasks = [(2, 3), (8, 1), (1, 8), (9, 9)]

assignments, cost = allocate_tasks_hungarian(robots, tasks)
print(f"Optimal allocation (total distance: {cost:.1f}m):")
for r_idx, t_idx in assignments:
    print(f"  Robot {r_idx} → Task {t_idx} "
          f"({robots[r_idx]} → {tasks[t_idx]})")
```
Output:

```
Optimal allocation (total distance: 9.5m):
  Robot 0 → Task 0 ((0, 0) → (2, 3))
  Robot 1 → Task 1 ((10, 0) → (8, 1))
  Robot 2 → Task 2 ((0, 10) → (1, 8))
  Robot 3 → Task 3 ((10, 10) → (9, 9))
```
Hungarian Algorithm Limitations
- The classic formulation requires an equal number of robots and tasks (pad the matrix with dummy rows or columns; scipy's implementation also accepts rectangular matrices directly)
- Only considers current costs; does not account for future tasks
- Centralized: requires global state and computation at a server, and must be fully re-run whenever the fleet or task set changes, which becomes costly for very large fleets
- Cannot natively handle complex constraints (battery level, robot type, deadlines)
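The first limitation is the easiest to work around in practice: scipy's `linear_sum_assignment` accepts rectangular cost matrices directly, leaving the surplus robots (or tasks) unassigned, so no manual dummy padding is needed. A small sketch with illustrative values:

```python
import numpy as np
from scipy.optimize import linear_sum_assignment

# 5 robots, 3 tasks: the two surplus robots simply stay unassigned
cost = np.array([
    [2.0, 9.0, 7.0],
    [6.0, 4.0, 3.0],
    [8.0, 5.0, 8.0],
    [7.0, 6.0, 5.0],
    [3.0, 8.0, 9.0],
])
rows, cols = linear_sum_assignment(cost)
total = cost[rows, cols].sum()
print(list(zip(rows.tolist(), cols.tolist())), total)
```

Here only three of the five robots receive a task; the pairing minimizes the total cost over all ways of matching three robots to the three tasks.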
Auction-Based Allocation — Decentralized and Flexible
Auction-based allocation mimics an auction mechanism: each task is "announced", robots bid by submitting cost estimates, and the robot with the best bid wins the task. The approach is decentralized: each robot calculates its bid from local information, so no central server needs to know the entire fleet state.
Sequential Single-Item Auction
This is the simplest form:
- Auctioneer (server or robot leader) announces a new task
- Each robot calculates its bid = estimated cost to complete the task
- Robots send bids to auctioneer
- Auctioneer picks robot with lowest bid, assigns task
- Repeat for next task
Python Implementation
```python
import math
from dataclasses import dataclass
from typing import Optional

@dataclass
class Robot:
    id: int
    x: float
    y: float
    battery: float  # 0.0 - 1.0
    current_task: Optional[int] = None

    def compute_bid(self, task) -> float:
        """Compute bid based on distance and battery level."""
        distance = math.sqrt(
            (self.x - task.x)**2 + (self.y - task.y)**2
        )
        # Penalty if battery is low (higher bid = less willing to accept)
        battery_penalty = 1.0 / max(self.battery, 0.1)
        # Penalty if busy
        busy_penalty = 2.0 if self.current_task is not None else 1.0
        return distance * battery_penalty * busy_penalty

@dataclass
class Task:
    id: int
    x: float
    y: float
    priority: int = 1  # 1=normal, 2=urgent, 3=critical

@dataclass
class AuctionResult:
    task_id: int
    robot_id: int
    bid_value: float

def auction_allocate(robots: list[Robot],
                     tasks: list[Task]) -> list[AuctionResult]:
    """
    Sequential single-item auction allocation.
    Tasks are sorted by priority (critical first).
    """
    results = []
    available_robots = {r.id: r for r in robots}

    # Prioritize critical tasks first
    sorted_tasks = sorted(tasks, key=lambda t: -t.priority)

    for task in sorted_tasks:
        if not available_robots:
            print(f"  Task {task.id}: No robots available!")
            break

        # Collect bids from all available robots
        bids = {}
        for robot in available_robots.values():
            bid = robot.compute_bid(task)
            bids[robot.id] = bid
            print(f"  Robot {robot.id} bids {bid:.2f} "
                  f"for Task {task.id}")

        # The robot with the lowest bid wins
        winner_id = min(bids, key=bids.get)
        winner = available_robots.pop(winner_id)
        winner.current_task = task.id
        results.append(AuctionResult(
            task_id=task.id,
            robot_id=winner_id,
            bid_value=bids[winner_id]
        ))
        print(f"  → Robot {winner_id} wins Task {task.id} "
              f"(bid: {bids[winner_id]:.2f})\n")

    return results

# Demo
robots = [
    Robot(0, x=1.0, y=1.0, battery=0.9),
    Robot(1, x=8.0, y=2.0, battery=0.3),  # low battery
    Robot(2, x=5.0, y=7.0, battery=0.8),
    Robot(3, x=9.0, y=9.0, battery=0.6),
]
tasks = [
    Task(0, x=2.0, y=3.0, priority=1),
    Task(1, x=7.0, y=1.0, priority=3),  # critical
    Task(2, x=4.0, y=8.0, priority=2),  # urgent
]

print("=== Auction-based Task Allocation ===\n")
results = auction_allocate(robots, tasks)

print("\n=== Final Results ===")
for r in results:
    print(f"Task {r.task_id} → Robot {r.robot_id} "
          f"(bid: {r.bid_value:.2f})")
```
Advantages of Auction-Based
- Decentralized: Robots calculate bids independently, reduces server load
- Flexible: Easy to add/remove robots and tasks dynamically (new task = new auction)
- Extensible: Bid function can include battery level, deadline, cargo type
- Robust: If one robot goes offline, others continue bidding normally
Disadvantages
- Not guaranteed globally optimal (typically 10-15% worse than the Hungarian optimum)
- Latency: the auctioneer must wait for all robots to submit their bids
- Consensus protocols become complex over unstable networks
Market-Based Approach — Best of Both Worlds
Market-based allocation extends auction-based methods by letting robots trade tasks after the initial assignment: if Robot A realizes that swapping tasks with Robot B would benefit both, they negotiate and swap.
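The trading idea can be sketched as a pairwise-swap improvement loop (an illustrative toy, not CBBA itself: real systems negotiate over a network and use richer cost models, and all names here are assumptions):

```python
import math

def travel_cost(robot_pos, task_pos):
    """Euclidean travel distance as the cost model."""
    return math.dist(robot_pos, task_pos)

def improve_by_swaps(robot_pos, task_pos, assignment):
    """Greedy pairwise trading: keep swapping task pairs between
    robots while any swap lowers the combined travel cost.
    assignment[i] = index of the task currently held by robot i."""
    improved = True
    while improved:
        improved = False
        for i in range(len(assignment)):
            for j in range(i + 1, len(assignment)):
                ti, tj = assignment[i], assignment[j]
                current = (travel_cost(robot_pos[i], task_pos[ti]) +
                           travel_cost(robot_pos[j], task_pos[tj]))
                swapped = (travel_cost(robot_pos[i], task_pos[tj]) +
                           travel_cost(robot_pos[j], task_pos[ti]))
                if swapped < current - 1e-9:
                    assignment[i], assignment[j] = tj, ti
                    improved = True
    return assignment

# A deliberately bad initial assignment: each robot holds the far task
robots = [(0.0, 0.0), (10.0, 10.0)]
tasks = [(9.0, 9.0), (1.0, 1.0)]
print(improve_by_swaps(robots, tasks, [0, 1]))  # → [1, 0]
```

Pairwise swapping converges to a local optimum of the total cost; it cannot escape configurations where only a three-way (or larger) trade helps, which is one motivation for the richer consensus machinery in CBBA below.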
Consensus-Based Bundle Algorithm (CBBA)
CBBA is the most popular market-based algorithm for multi-robot systems:
- Bundle Phase: Each robot builds its "bundle" — list of tasks it wants, sorted by priority
- Consensus Phase: Robots exchange info with neighbors, resolve conflicts (2 robots wanting same task)
- Repeat until convergence (nobody wants to trade)
CBBA is guaranteed to converge in O(N × M) communication rounds and produces near-optimal results: for the score functions it supports, the solution achieves at least 50% of the optimal score in the worst case, and is usually much closer in practice.
Reinforcement Learning for Dynamic Task Allocation
The above methods solve the "static snapshot" problem — given current state, find best assignment. But in reality, tasks arrive continuously and unpredictably. Reinforcement Learning (RL) learns a policy for long-term allocation — accepting suboptimal assignments now if we know critical tasks will arrive nearby in 5 minutes.
Formulation
- State: Position of all robots, task queue, battery levels, occupancy map
- Action: Assign task T to robot R, or keep robot idle
- Reward: -1 per step task incomplete, +10 when task done, -100 on deadline miss
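The reward terms above can be written as a small shaping function (a sketch; the function name and exact weights are illustrative choices, not a fixed standard):

```python
def step_reward(task_completed: bool,
                deadline_missed: bool,
                n_pending: int) -> float:
    """Per-step reward: -1 for each task still incomplete,
    +10 when a task is finished, -100 on a deadline miss."""
    reward = -1.0 * n_pending
    if task_completed:
        reward += 10.0
    if deadline_missed:
        reward -= 100.0
    return reward

print(step_reward(True, False, 2))   # → 8.0
print(step_reward(False, True, 1))   # → -101.0
```

The large deadline penalty dominates the per-step terms, which pushes the learned policy to keep robots free near locations where urgent tasks tend to appear.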
Multi-Agent RL (MARL)
For large fleets, single-agent RL doesn't scale. MARL treats each robot as an independent agent:
```python
import numpy as np

class SimpleTaskAllocationEnv:
    """
    Simplified multi-agent environment for task allocation.
    Each robot (agent) decides which task to accept.
    """
    def __init__(self, n_robots=4, grid_size=10):
        self.n_robots = n_robots
        self.grid_size = grid_size
        self.reset()

    def reset(self):
        # Random robot positions
        self.robot_pos = np.random.randint(
            0, self.grid_size, size=(self.n_robots, 2)
        )
        self.robot_battery = np.ones(self.n_robots)
        # Random tasks
        self.tasks = []
        for _ in range(self.n_robots * 2):
            self.tasks.append({
                'pos': np.random.randint(0, self.grid_size, size=2),
                'assigned': False,
                'completed': False,
            })
        return self._get_obs()

    def _get_obs(self):
        """Observation for each robot: own position plus open tasks."""
        obs = {}
        for i in range(self.n_robots):
            obs[i] = {
                'my_pos': self.robot_pos[i],
                'battery': self.robot_battery[i],
                'tasks': [
                    t['pos'] for t in self.tasks
                    if not t['assigned']
                ],
            }
        return obs

    def step(self, actions: dict):
        """
        actions: {robot_id: task_index}
        Returns: obs, rewards, done
        """
        rewards = {}
        for robot_id, task_idx in actions.items():
            if task_idx < len(self.tasks) \
                    and not self.tasks[task_idx]['assigned']:
                task = self.tasks[task_idx]
                dist = np.linalg.norm(
                    self.robot_pos[robot_id] - task['pos']
                )
                # Reward = negative distance (closer = higher reward)
                rewards[robot_id] = -dist / self.grid_size
                task['assigned'] = True
                self.robot_pos[robot_id] = task['pos']
            else:
                rewards[robot_id] = -1.0  # penalty for invalid action
        done = all(t['assigned'] for t in self.tasks)
        return self._get_obs(), rewards, done

# Demo: greedy baseline policy (each robot takes the closest open task)
env = SimpleTaskAllocationEnv(n_robots=3, grid_size=20)
obs = env.reset()
total_reward = 0
step = 0
while True:
    actions = {}
    assigned_tasks = set()
    for robot_id in range(env.n_robots):
        robot_obs = obs[robot_id]
        if not robot_obs['tasks']:
            continue
        # Find the closest task not yet claimed in this step
        min_dist = float('inf')
        best_task = None
        for idx, t in enumerate(env.tasks):
            if t['assigned'] or idx in assigned_tasks:
                continue
            dist = np.linalg.norm(robot_obs['my_pos'] - t['pos'])
            if dist < min_dist:
                min_dist = dist
                best_task = idx
        if best_task is not None:
            actions[robot_id] = best_task
            assigned_tasks.add(best_task)
    if not actions:
        break
    obs, rewards, done = env.step(actions)
    total_reward += sum(rewards.values())
    step += 1
    if done:
        break

print(f"Greedy policy: {step} steps, "
      f"total reward: {total_reward:.2f}")
```
Research Results
Recent papers (ICRA 2025, RSS 2025) show MARL-based allocation outperforms auction-based by 15-25% in scenarios with:
- High and irregular task arrival rates
- Heterogeneous fleet (robots with different capabilities)
- Bottlenecks (narrow corridors, elevators)
However, RL requires significant training time and is hard to interpret (black box), so most production systems still use auction-based or hybrid approaches.
Traffic Management: Conflict Detection and Resolution
Task allocation is only half the problem. The other half: ensuring robots don't collide on their routes.
Conflict Detection
Two main conflict types:
- Vertex conflict: two robots occupy the same point at the same time
- Edge conflict: two robots traverse the same path segment in opposite directions at the same time (a swap)
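Assuming each route is discretized into one grid cell per timestep, both conflict types can be detected by direct comparison. A minimal sketch (`find_conflicts` and the wait-at-goal padding convention are illustrative assumptions):

```python
def find_conflicts(path_a, path_b):
    """Detect vertex and edge conflicts between two timed paths.
    Each path is a list of grid cells, one per timestep; a robot
    that arrives early is assumed to wait at its final cell."""
    horizon = max(len(path_a), len(path_b))
    pad = lambda p, t: p[t] if t < len(p) else p[-1]
    conflicts = []
    for t in range(horizon):
        a, b = pad(path_a, t), pad(path_b, t)
        if a == b:  # vertex conflict: same cell at the same timestep
            conflicts.append(('vertex', t, a))
        if t + 1 < horizon:
            a2, b2 = pad(path_a, t + 1), pad(path_b, t + 1)
            if a == b2 and b == a2 and a != b:
                # edge (swap) conflict: they pass through each other
                conflicts.append(('edge', t, (a, b)))
    return conflicts

# Two robots crossing the same corridor head-on
p1 = [(0, 0), (1, 0), (2, 0), (3, 0)]
p2 = [(3, 0), (2, 0), (1, 0), (0, 0)]
print(find_conflicts(p1, p2))  # → [('edge', 1, ((1, 0), (2, 0)))]
```

The head-on pair never occupies the same cell at the same timestep, so a vertex check alone would miss the collision; this is exactly why the edge (swap) case needs its own test.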
Priority-Based Resolution
Simplest approach: assign priority to each robot.
```python
import math
from dataclasses import dataclass
from enum import IntEnum

class Priority(IntEnum):
    LOW = 1
    NORMAL = 2
    HIGH = 3
    EMERGENCY = 4

@dataclass
class ManagedRobot:
    """Minimal stand-in for the fleet manager's robot handle;
    a real system commands the robot over its own interface."""
    id: int
    priority: Priority
    current_position: tuple

    def wait_at(self, position):
        print(f"Robot {self.id} holding at {position}")

def distance(p, q):
    return math.dist(p, q)

def resolve_conflict(robot_a, robot_b, conflict_point):
    """
    Resolve a conflict at a point.
    The lower-priority robot yields; on a tie, the robot closer
    to the conflict point goes first.
    """
    if robot_a.priority > robot_b.priority:
        # Robot A goes first, B waits
        robot_b.wait_at(robot_b.current_position)
        return robot_a.id
    elif robot_b.priority > robot_a.priority:
        robot_a.wait_at(robot_a.current_position)
        return robot_b.id
    else:
        # Equal priority → closer robot goes first
        dist_a = distance(robot_a.current_position, conflict_point)
        dist_b = distance(robot_b.current_position, conflict_point)
        if dist_a < dist_b:
            robot_b.wait_at(robot_b.current_position)
            return robot_a.id
        else:
            robot_a.wait_at(robot_a.current_position)
            return robot_b.id
```
CBS — Conflict-Based Search
For more complex scenarios, CBS (Sharon et al., 2015) is the state-of-the-art approach to multi-agent pathfinding:
- Low-level: Find shortest path for each robot (A*)
- High-level: Detect conflicts, create constraints, replan
- Repeat until no conflicts
CBS is optimal for MAPF (Multi-Agent Path Finding) but exponential in worst case. Bounded-suboptimal variants like ECBS give near-optimal results in polynomial time.
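The two-level loop can be sketched compactly. This is a deliberately minimal version, not production CBS: it branches on vertex conflicts only (full CBS also constrains edge conflicts), uses breadth-first search instead of A* at the low level, and all names are illustrative:

```python
from collections import deque
from heapq import heappush, heappop
from itertools import count

def low_level(grid, start, goal, constraints, max_t=50):
    """Space-time BFS: shortest path obeying vertex constraints,
    given as a set of ((row, col), timestep) pairs."""
    rows, cols = len(grid), len(grid[0])
    queue = deque([(start, 0, [start])])
    seen = {(start, 0)}
    while queue:
        cell, t, path = queue.popleft()
        # Stop only if no future constraint forbids waiting at the goal
        if cell == goal and all((goal, t2) not in constraints
                                for t2 in range(t, max_t)):
            return path
        for dr, dc in ((0, 0), (0, 1), (0, -1), (1, 0), (-1, 0)):
            nxt = (cell[0] + dr, cell[1] + dc)
            if (0 <= nxt[0] < rows and 0 <= nxt[1] < cols
                    and grid[nxt[0]][nxt[1]] == 0 and t + 1 < max_t
                    and (nxt, t + 1) not in constraints
                    and (nxt, t + 1) not in seen):
                seen.add((nxt, t + 1))
                queue.append((nxt, t + 1, path + [nxt]))
    return None

def first_vertex_conflict(paths):
    """Earliest (agent_i, agent_j, cell, t) vertex conflict, or None.
    Robots are assumed to wait at their goals after arriving."""
    pad = lambda p, t: p[t] if t < len(p) else p[-1]
    for t in range(max(map(len, paths))):
        for i in range(len(paths)):
            for j in range(i + 1, len(paths)):
                if pad(paths[i], t) == pad(paths[j], t):
                    return i, j, pad(paths[i], t), t
    return None

def cbs(grid, starts, goals):
    """High-level best-first search over the constraint tree."""
    tie = count()  # tiebreaker so the heap never compares path lists
    paths = [low_level(grid, s, g, set()) for s, g in zip(starts, goals)]
    open_list = [(sum(map(len, paths)), next(tie),
                  [set() for _ in starts], paths)]
    while open_list:
        _, _, constraints, paths = heappop(open_list)
        conflict = first_vertex_conflict(paths)
        if conflict is None:
            return paths  # conflict-free solution
        i, j, cell, t = conflict
        # Branch: forbid the conflicting (cell, t) for each agent in turn
        for agent in (i, j):
            child = [set(c) for c in constraints]
            child[agent].add((cell, t))
            new_path = low_level(grid, starts[agent], goals[agent],
                                 child[agent])
            if new_path is not None:
                new_paths = list(paths)
                new_paths[agent] = new_path
                heappush(open_list, (sum(map(len, new_paths)),
                                     next(tie), child, new_paths))
    return None

# Two robots crossing perpendicular corridors through the center cell
grid = [[0, 0, 0],
        [0, 0, 0],
        [0, 0, 0]]
solution = cbs(grid, starts=[(0, 1), (1, 0)], goals=[(2, 1), (1, 2)])
for p in solution:
    print(p)
```

In this example both shortest paths pass through the center cell at the same timestep; the high-level search adds one constraint, one robot waits a step, and the combined path length grows from 6 to the conflict-free minimum of 7.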
Choosing the Right Algorithm
| Scenario | Recommended Algorithm |
|---|---|
| Small fleet (< 10), simple tasks | Hungarian algorithm |
| Medium fleet (10-50), dynamic tasks | Sequential auction-based |
| Large fleet (50+), heterogeneous | CBBA or market-based |
| Research / complex tasks | Multi-agent RL |
| Traffic management | CBS or priority-based |
In practice, production systems often combine approaches: an auction for task allocation plus CBS for path planning. Open-RMF uses schedule-based traffic management combined with a configurable task dispatcher, a good balance between performance and complexity.
Related Articles
- Robot Fleet Management in Smart Factories — Fleet management system architecture overview
- Open-RMF: Open-Source Robot Fleet Management — Multi-vendor fleet management framework with ROS 2
- Robot Control Programming with Python — Python fundamentals for robotics