Tags: navigation, fleet, amr, programming

Multi-Robot Coordination: Task Allocation Algorithms

Task allocation algorithms for robot teams — from the Hungarian algorithm and auction-based methods to RL-based approaches.

Nguyen Anh Tuan · March 20, 2026 · 11 min read

The Multi-Robot Task Allocation (MRTA) Problem

Multi-robot coordination starts with a simple question: given N robots and M tasks, assign each robot to a task such that the total cost (time, energy, distance) is minimized. This is the MRTA problem — the foundation of every modern fleet management system.

In a real warehouse, MRTA never stops: 10 AMRs on the floor, with 5-10 new picking orders arriving every minute. Which robot is closest? Which has enough battery? Which is idle? The allocation algorithm determines the entire fleet's performance — a small mistake can cut throughput by 30-40%.

MRTA Classification (Gerkey-Matarić Taxonomy)

Gerkey and Matarić (2004) classify MRTA along three dimensions:

  1. ST vs. MT: single-task robots (one task at a time) vs. multi-task robots
  2. SR vs. MR: single-robot tasks (one robot is enough) vs. multi-robot tasks
  3. IA vs. TA: instantaneous assignment vs. time-extended assignment (planning over a schedule)

Most warehouse applications fall into ST-SR-IA: each robot does one task at a time, each task needs one robot, and assignments are made immediately when new tasks arrive. This is the simplest class, and it admits optimal solutions.

Autonomous AMR robots in modern logistics warehouse

Hungarian Algorithm — Optimal for 1-to-1 Assignment

The Hungarian algorithm (Kuhn-Munkres) solves the assignment problem in O(n³) time — finding the 1-to-1 assignment between N robots and N tasks that minimizes total cost. This is a globally optimal algorithm, meaning no better assignment exists.

Algorithm Concept

  1. Build a cost matrix C[i][j] = cost for robot i to perform task j
  2. Subtract the minimum value in each row from that row
  3. Subtract the minimum value in each column from that column
  4. Find an assignment where all selected values equal 0
  5. If not found, adjust matrix and repeat
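To make steps 2-3 concrete, here is a minimal sketch of the row/column reduction on a hypothetical 3×3 cost matrix (the full algorithm also needs the zero-cover/adjustment loop of steps 4-5, which scipy handles for us):

```python
import numpy as np
from scipy.optimize import linear_sum_assignment

# Hypothetical cost matrix: C[i][j] = cost for robot i on task j
C = np.array([[4, 1, 3],
              [2, 0, 5],
              [3, 2, 2]], dtype=float)

# Step 2: subtract each row's minimum from that row
reduced = C - C.min(axis=1, keepdims=True)
# Step 3: subtract each column's minimum from that column
reduced = reduced - reduced.min(axis=0, keepdims=True)
print(reduced)
# Zeros mark candidate assignments. Here rows 0 and 1 share their
# only zero (column 1), so steps 4-5 must adjust the matrix before
# a complete zero-cost assignment exists.

# scipy runs the complete algorithm on the original matrix
rows, cols = linear_sum_assignment(C)
print(C[rows, cols].sum())  # optimal total cost
```

The reduction never changes which assignment is optimal — subtracting a constant from a row or column shifts every candidate assignment's cost by the same amount.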

Python Implementation with scipy

import numpy as np
from scipy.optimize import linear_sum_assignment

def allocate_tasks_hungarian(robot_positions, task_positions):
    """
    Allocate tasks to robots using Hungarian Algorithm.

    Args:
        robot_positions: list of (x, y) - current position of each robot
        task_positions: list of (x, y) - position of each task

    Returns:
        assignments: list of (robot_idx, task_idx) pairs
        total_cost: total travel distance
    """
    n_robots = len(robot_positions)
    n_tasks = len(task_positions)

    # Build cost matrix (Euclidean distance)
    cost_matrix = np.zeros((n_robots, n_tasks))
    for i, r_pos in enumerate(robot_positions):
        for j, t_pos in enumerate(task_positions):
            cost_matrix[i][j] = np.sqrt(
                (r_pos[0] - t_pos[0])**2 +
                (r_pos[1] - t_pos[1])**2
            )

    # Solve using scipy (Hungarian algorithm internally)
    row_ind, col_ind = linear_sum_assignment(cost_matrix)

    assignments = list(zip(row_ind.tolist(), col_ind.tolist()))
    total_cost = cost_matrix[row_ind, col_ind].sum()

    return assignments, total_cost

# Example: 4 robots, 4 tasks in factory
robots = [(0, 0), (10, 0), (0, 10), (10, 10)]
tasks = [(2, 3), (8, 1), (1, 8), (9, 9)]

assignments, cost = allocate_tasks_hungarian(robots, tasks)
print(f"Optimal allocation (total distance: {cost:.1f}m):")
for r_idx, t_idx in assignments:
    print(f"  Robot {r_idx} → Task {t_idx} "
          f"({robots[r_idx]} → {tasks[t_idx]})")

Output:

Optimal allocation (total distance: 9.5m):
  Robot 0 → Task 0 ((0, 0) → (2, 3))
  Robot 1 → Task 1 ((10, 0) → (8, 1))
  Robot 2 → Task 2 ((0, 10) → (1, 8))
  Robot 3 → Task 3 ((10, 10) → (9, 9))

Hungarian Algorithm Limitations

  1. Centralized: requires a server that knows the full cost matrix
  2. Assumes a square problem: with N ≠ M, the matrix must be padded with dummy rows or columns
  3. Static: it solves one snapshot, so every new task means re-solving from scratch
  4. O(n³) cost becomes noticeable for very large fleets

Auction-Based Allocation — Decentralized and Flexible

Auction-based allocation mimics an auction mechanism: each task is "announced", robots "bid" (propose prices), and the robot with the best bid wins the task. This is decentralized — each robot calculates its bid based on local information, without needing a central server to know the entire fleet state.

Sequential Single-Item Auction

This is the simplest form:

  1. Auctioneer (server or robot leader) announces a new task
  2. Each robot calculates its bid = estimated cost to complete the task
  3. Robots send bids to auctioneer
  4. Auctioneer picks robot with lowest bid, assigns task
  5. Repeat for next task

Python Implementation

from dataclasses import dataclass
from typing import Optional
import math

@dataclass
class Robot:
    id: int
    x: float
    y: float
    battery: float  # 0.0 - 1.0
    current_task: Optional[int] = None

    def compute_bid(self, task) -> float:
        """Compute bid based on distance and battery level."""
        distance = math.sqrt(
            (self.x - task.x)**2 + (self.y - task.y)**2
        )
        # Penalize low battery (higher bid = less willing to take the task)
        battery_penalty = 1.0 / max(self.battery, 0.1)
        # Penalty if busy
        busy_penalty = 2.0 if self.current_task is not None else 1.0

        return distance * battery_penalty * busy_penalty

@dataclass
class Task:
    id: int
    x: float
    y: float
    priority: int = 1  # 1=normal, 2=urgent, 3=critical

@dataclass
class AuctionResult:
    task_id: int
    robot_id: int
    bid_value: float

def auction_allocate(robots: list[Robot],
                     tasks: list[Task]) -> list[AuctionResult]:
    """
    Sequential single-item auction allocation.
    Tasks sorted by priority (critical first).
    """
    results = []
    available_robots = {r.id: r for r in robots}

    # Prioritize critical tasks first
    sorted_tasks = sorted(tasks, key=lambda t: -t.priority)

    for task in sorted_tasks:
        if not available_robots:
            print(f"  Task {task.id}: No robots available!")
            break

        # Collect bids from all available robots
        bids = {}
        for robot in available_robots.values():
            bid = robot.compute_bid(task)
            bids[robot.id] = bid
            print(f"  Robot {robot.id} bids {bid:.2f} "
                  f"for Task {task.id}")

        # Robot with lowest bid wins
        winner_id = min(bids, key=bids.get)
        winner = available_robots.pop(winner_id)
        winner.current_task = task.id

        results.append(AuctionResult(
            task_id=task.id,
            robot_id=winner_id,
            bid_value=bids[winner_id]
        ))
        print(f"  → Robot {winner_id} wins Task {task.id} "
              f"(bid: {bids[winner_id]:.2f})\n")

    return results

# Demo
robots = [
    Robot(0, x=1.0, y=1.0, battery=0.9),
    Robot(1, x=8.0, y=2.0, battery=0.3),  # low battery
    Robot(2, x=5.0, y=7.0, battery=0.8),
    Robot(3, x=9.0, y=9.0, battery=0.6),
]

tasks = [
    Task(0, x=2.0, y=3.0, priority=1),
    Task(1, x=7.0, y=1.0, priority=3),   # critical
    Task(2, x=4.0, y=8.0, priority=2),   # urgent
]

print("=== Auction-based Task Allocation ===\n")
results = auction_allocate(robots, tasks)

print("\n=== Final Results ===")
for r in results:
    print(f"Task {r.task_id} → Robot {r.robot_id} "
          f"(bid: {r.bid_value:.2f})")

Advantages of Auction-Based

  1. Decentralized: each robot computes its bid from local state; no global cost matrix is needed
  2. Scales well: one round of bids per task, regardless of fleet size
  3. Robust: a robot that drops offline simply stops bidding
  4. Extensible: the bid function can fold in battery, queue length, or capabilities

Disadvantages

  1. Greedy and myopic: sequential auctions carry no global optimality guarantee
  2. Allocation quality depends heavily on the order in which tasks are auctioned
  3. Communication overhead: every task triggers a full round of bids

Automated manufacturing facility with coordinated robots

Market-Based Approach — Best of Both Worlds

Market-based allocation extends auction-based by allowing robots to trade tasks after initial assignment. Idea: after the initial auction, if Robot A realizes swapping tasks with Robot B benefits both, they negotiate and swap.

Consensus-Based Bundle Algorithm (CBBA)

CBBA is the most popular market-based algorithm for multi-robot systems:

  1. Bundle Phase: Each robot builds its "bundle" — list of tasks it wants, sorted by priority
  2. Consensus Phase: Robots exchange info with neighbors, resolve conflicts (2 robots wanting same task)
  3. Repeat until convergence (nobody wants to trade)

CBBA guarantees convergence in a bounded number of communication rounds (proportional to the number of tasks times the network diameter) and, for suitable score functions, guarantees at least 50% of the optimal score in the worst case — usually much better in practice.
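A heavily simplified, single-round sketch of the bundle/consensus idea (real CBBA iterates both phases with score updates over a communication graph; `simplified_cbba_round` and its structure are illustrative, not the actual algorithm):

```python
import math

def simplified_cbba_round(robots, tasks, bundle_size=2):
    """One bundle-build + consensus round, heavily simplified.
    robots, tasks: lists of (x, y). Returns {task_idx: robot_idx}."""
    bids = {}     # (robot_idx, task_idx) -> cost bid (lower wins)
    bundles = {}  # robot_idx -> task indices the robot wants

    # Bundle phase: each robot greedily picks its cheapest tasks
    for ri, r_pos in enumerate(robots):
        by_cost = sorted(range(len(tasks)),
                         key=lambda ti: math.dist(r_pos, tasks[ti]))
        bundles[ri] = by_cost[:bundle_size]
        for ti in bundles[ri]:
            bids[(ri, ti)] = math.dist(r_pos, tasks[ti])

    # Consensus phase: each contested task goes to the lowest bidder
    winners = {}
    for (ri, ti), bid in bids.items():
        if ti not in winners or bid < bids[(winners[ti], ti)]:
            winners[ti] = ri
    return winners

robots = [(0.0, 0.0), (10.0, 10.0)]
tasks = [(1.0, 1.0), (9.0, 9.0), (4.0, 4.0)]
print(simplified_cbba_round(robots, tasks))
```

Both robots want the middle task (4, 4), but robot 0 bids lower, so consensus awards it to robot 0; the actual CBBA would then let robot 1 rebuild its bundle and rebid.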

Reinforcement Learning for Dynamic Task Allocation

The above methods solve the "static snapshot" problem — given current state, find best assignment. But in reality, tasks arrive continuously and unpredictably. Reinforcement Learning (RL) learns a policy for long-term allocation — accepting suboptimal assignments now if we know critical tasks will arrive nearby in 5 minutes.

Formulation

Dynamic task allocation becomes a Markov Decision Process: the state is the fleet snapshot (robot positions, battery levels, pending tasks), an action assigns a task (or no task) to each robot, and the reward combines negative travel cost with throughput bonuses. The learned policy maps states to assignments and is trained to maximize expected long-term reward, not just the cost of the immediate assignment.

Multi-Agent RL (MARL)

For large fleets, single-agent RL doesn't scale. MARL treats each robot as an independent agent:

import numpy as np

class SimpleTaskAllocationEnv:
    """
    Simplified multi-agent environment for task allocation.
    Each robot (agent) decides which task to accept.
    """

    def __init__(self, n_robots=4, grid_size=10):
        self.n_robots = n_robots
        self.grid_size = grid_size
        self.reset()

    def reset(self):
        # Random robot positions
        self.robot_pos = np.random.randint(
            0, self.grid_size, size=(self.n_robots, 2)
        )
        self.robot_battery = np.ones(self.n_robots)

        # Random tasks
        self.tasks = []
        for _ in range(self.n_robots * 2):
            self.tasks.append({
                'pos': np.random.randint(
                    0, self.grid_size, size=2
                ),
                'assigned': False,
                'completed': False,
            })

        return self._get_obs()

    def _get_obs(self):
        """Observation for each robot: own position + all tasks."""
        obs = {}
        for i in range(self.n_robots):
            obs[i] = {
                'my_pos': self.robot_pos[i],
                'battery': self.robot_battery[i],
                'tasks': [
                    t['pos'] for t in self.tasks
                    if not t['assigned']
                ],
            }
        return obs

    def step(self, actions: dict):
        """
        actions: {robot_id: task_index}
        Returns: obs, rewards, done
        """
        rewards = {}
        for robot_id, task_idx in actions.items():
            if task_idx < len(self.tasks) \
               and not self.tasks[task_idx]['assigned']:
                task = self.tasks[task_idx]
                dist = np.linalg.norm(
                    self.robot_pos[robot_id] - task['pos']
                )
                # Reward = negative distance (closer = higher reward)
                rewards[robot_id] = -dist / self.grid_size
                task['assigned'] = True
                self.robot_pos[robot_id] = task['pos']
            else:
                rewards[robot_id] = -1.0  # penalty for invalid action

        done = all(t['assigned'] for t in self.tasks)
        return self._get_obs(), rewards, done

# Demo: random vs greedy policy
env = SimpleTaskAllocationEnv(n_robots=3, grid_size=20)

# Greedy: each robot picks closest task
obs = env.reset()
total_reward = 0
step = 0
while True:
    actions = {}
    assigned_tasks = set()
    for robot_id in range(env.n_robots):
        robot_obs = obs[robot_id]
        if not robot_obs['tasks']:
            continue

        # Find closest unassigned task in this step
        min_dist = float('inf')
        best_task = None
        for idx, t in enumerate(env.tasks):
            if t['assigned'] or idx in assigned_tasks:
                continue
            dist = np.linalg.norm(
                robot_obs['my_pos'] - t['pos']
            )
            if dist < min_dist:
                min_dist = dist
                best_task = idx

        if best_task is not None:
            actions[robot_id] = best_task
            assigned_tasks.add(best_task)

    if not actions:
        break

    obs, rewards, done = env.step(actions)
    step_reward = sum(rewards.values())
    total_reward += step_reward
    step += 1

    if done:
        break

print(f"Greedy policy: {step} steps, "
      f"total reward: {total_reward:.2f}")

Research Results

Recent papers (ICRA 2025, RSS 2025) show MARL-based allocation outperforming auction-based methods by 15-25% in highly dynamic scenarios with continuous, unpredictable task arrivals.

However, RL requires significant training time and is hard to interpret (black box), so most production systems still use auction-based or hybrid approaches.

Traffic Management: Conflict Detection and Resolution

Task allocation is only half the problem. The other half: ensuring robots don't collide on their routes.

Conflict Detection

Two main conflict types:

  1. Vertex conflict: two robots occupy the same location at the same timestep
  2. Edge (swap) conflict: two robots traverse the same edge in opposite directions, swapping positions between consecutive timesteps
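Vertex conflicts (same cell, same time) and edge conflicts (position swap) reduce to a few comparisons once paths are time-discretized, one grid cell per timestep. A minimal sketch:

```python
def find_conflicts(path_a, path_b):
    """Detect vertex and edge (swap) conflicts between two
    time-indexed paths: lists of (x, y) cells, index = timestep.
    Returns a list of (timestep, kind) tuples."""
    conflicts = []
    horizon = min(len(path_a), len(path_b))
    for t in range(horizon):
        # Vertex conflict: same cell at the same timestep
        if path_a[t] == path_b[t]:
            conflicts.append((t, 'vertex'))
        # Edge conflict: the robots swap cells between t and t+1
        if (t + 1 < horizon
                and path_a[t + 1] == path_b[t]
                and path_b[t + 1] == path_a[t]):
            conflicts.append((t, 'edge'))
    return conflicts

# Head-on swap in a corridor: edge conflict at t=0
print(find_conflicts([(0, 0), (1, 0)], [(1, 0), (0, 0)]))
# Crossing through the same cell: vertex conflict at t=1
print(find_conflicts([(0, 1), (1, 1), (2, 1)],
                     [(1, 0), (1, 1), (1, 2)]))
```

In a fleet manager this check runs pairwise over all planned routes whenever a robot receives a new path.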

Priority-Based Resolution

Simplest approach: assign priority to each robot.

import math
from enum import IntEnum

class Priority(IntEnum):
    LOW = 1
    NORMAL = 2
    HIGH = 3
    EMERGENCY = 4

def distance(p, q):
    """Euclidean distance between two (x, y) points."""
    return math.hypot(p[0] - q[0], p[1] - q[1])

def resolve_conflict(robot_a, robot_b, conflict_point):
    """
    Resolve conflict at a point.
    Lower-priority robot yields.
    """
    if robot_a.priority > robot_b.priority:
        # Robot A goes first, B waits
        robot_b.wait_at(robot_b.current_position)
        return robot_a.id
    elif robot_b.priority > robot_a.priority:
        robot_a.wait_at(robot_a.current_position)
        return robot_b.id
    else:
        # Equal priority → closer robot goes first
        dist_a = distance(robot_a.current_position,
                         conflict_point)
        dist_b = distance(robot_b.current_position,
                         conflict_point)
        if dist_a < dist_b:
            robot_b.wait_at(robot_b.current_position)
            return robot_a.id
        else:
            robot_a.wait_at(robot_a.current_position)
            return robot_b.id

CBS — Conflict-Based Search

For more complex scenarios, CBS (Sharon et al., 2015) is the standard state-of-the-art algorithm for multi-agent pathfinding:

  1. Low-level: Find shortest path for each robot (A*)
  2. High-level: Detect conflicts, create constraints, replan
  3. Repeat until no conflicts

CBS is optimal for MAPF (Multi-Agent Path Finding) but exponential in worst case. Bounded-suboptimal variants like ECBS give near-optimal results in polynomial time.
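The interplay of the two levels can be sketched with a toy low-level planner: BFS over (cell, timestep) pairs that respects high-level constraints. This is a teaching sketch under simplifying assumptions (open grid, no constraint tree, BFS instead of A*), not real CBS:

```python
from collections import deque

def plan(grid_size, start, goal, constraints, max_t=20):
    """BFS over (cell, timestep) on an open grid, avoiding any
    (cell, t) pair in `constraints`. Returns a cell-per-timestep
    path, or None if no path exists within max_t steps."""
    frontier = deque([(start, 0, [start])])
    seen = {(start, 0)}
    while frontier:
        cell, t, path = frontier.popleft()
        if cell == goal:
            return path
        if t >= max_t:
            continue
        x, y = cell
        # Wait in place or move 4-connected
        for nx, ny in [(x, y), (x+1, y), (x-1, y), (x, y+1), (x, y-1)]:
            if not (0 <= nx < grid_size and 0 <= ny < grid_size):
                continue
            if ((nx, ny), t + 1) in seen or ((nx, ny), t + 1) in constraints:
                continue
            seen.add(((nx, ny), t + 1))
            frontier.append(((nx, ny), t + 1, path + [(nx, ny)]))
    return None

# Two robots whose shortest paths cross at cell (1, 1) at t=1
path_a = plan(3, (0, 1), (2, 1), constraints=set())
path_b = plan(3, (1, 0), (1, 2), constraints=set())
assert path_a[1] == path_b[1] == (1, 1)  # vertex conflict

# High-level step: constrain robot B away from (1, 1) at t=1, replan
path_b2 = plan(3, (1, 0), (1, 2), constraints={((1, 1), 1)})
print(path_b2)  # B waits one step, then passes through (1, 1)
```

Real CBS branches here: it spawns two child nodes, one constraining each robot, and expands the cheaper node first, which is what makes it optimal.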

Engineers programming and monitoring autonomous robot systems

Choosing the Right Algorithm

Scenario                             | Recommended Algorithm
Small fleet (< 10), simple tasks     | Hungarian algorithm
Medium fleet (10-50), dynamic tasks  | Sequential auction-based
Large fleet (50+), heterogeneous     | CBBA or market-based
Research / complex tasks             | Multi-agent RL
Traffic management                   | CBS or priority-based

In practice, production systems often combine approaches: auction-based task allocation plus CBS for path planning. Open-RMF, for example, pairs schedule-based traffic management with a configurable task dispatcher — a good balance between performance and complexity.


Related Posts

Docker + K3s on the Edge: GitOps for Robot Fleets
A guide to deploying Docker and K3s on edge devices — managing, OTA-updating, and monitoring hundreds of robots with a GitOps workflow.
March 10, 2026 · 8 min read

Imitation Learning: BC, DAgger, and DAPG for Robots
Why imitation learning matters more than RL for many manipulation problems — how to collect data and train a policy.
March 8, 2026 · 11 min read

RL for Robotics: PPO, SAC, and How to Choose an Algorithm
An overview of RL algorithms for robotics — PPO, SAC, TD3 — with guidance on picking the right algorithm for each robot problem.
March 5, 2026 · 10 min read