Bài toán Multi-Robot Task Allocation (MRTA)
Multi-robot coordination bắt đầu từ một câu hỏi đơn giản: có N robot và M nhiệm vụ, gán robot nào cho nhiệm vụ nào để tổng chi phí (thời gian, năng lượng, quãng đường) là nhỏ nhất? Đây chính là bài toán MRTA — Multi-Robot Task Allocation — nền tảng của mọi hệ thống fleet management hiện đại.
Trong thực tế nhà máy, MRTA xuất hiện liên tục: 10 AMR trong kho hàng, mỗi phút có 5-10 đơn picking mới. Robot nào gần nhất? Robot nào còn đủ pin? Robot nào đang rảnh? Thuật toán phân công quyết định hiệu suất toàn bộ fleet — sai lầm nhỏ trong allocation có thể khiến throughput giảm 30-40%.
Phân loại MRTA theo taxonomy Gerkey-Matarić
Gerkey và Matarić (2004) phân loại MRTA theo 3 chiều:
- Single-Task (ST) vs Multi-Task (MT): Robot chỉ làm 1 task hay nhiều task cùng lúc?
- Single-Robot (SR) vs Multi-Robot (MR): Task cần 1 robot hay nhiều robot phối hợp?
- Instantaneous (IA) vs Time-extended (TA): Chỉ xét task hiện tại hay lập kế hoạch dài hạn?
Phần lớn ứng dụng kho hàng thuộc loại ST-SR-IA: mỗi robot làm 1 task, mỗi task cần 1 robot, gán tức thì khi task mới đến. Đây là dạng đơn giản nhất và có thuật toán tối ưu.
Hungarian Algorithm — Tối ưu cho bài toán gán 1-1
Hungarian algorithm (Kuhn-Munkres) giải bài toán assignment problem trong thời gian O(n³) — tìm phương án gán 1-1 giữa N robot và N task sao cho tổng chi phí nhỏ nhất. Đây là thuật toán tối ưu toàn cục, nghĩa là không có phương án gán nào tốt hơn.
Ý tưởng thuật toán
- Xây dựng ma trận chi phí C[i][j] = chi phí để robot i thực hiện task j
- Trừ mỗi hàng cho giá trị nhỏ nhất trong hàng đó
- Trừ mỗi cột cho giá trị nhỏ nhất trong cột đó
- Tìm phương án gán sao cho tất cả các giá trị được chọn đều bằng 0
- Nếu chưa tìm được, điều chỉnh ma trận và lặp lại
Python implementation với scipy
import numpy as np
from scipy.optimize import linear_sum_assignment
def allocate_tasks_hungarian(robot_positions, task_positions):
"""
Phân công nhiệm vụ cho robot sử dụng Hungarian Algorithm.
Args:
robot_positions: list of (x, y) - vị trí hiện tại của mỗi robot
task_positions: list of (x, y) - vị trí của mỗi task
Returns:
assignments: list of (robot_idx, task_idx) pairs
total_cost: tổng khoảng cách di chuyển
"""
n_robots = len(robot_positions)
n_tasks = len(task_positions)
# Xây dựng ma trận chi phí (Euclidean distance)
cost_matrix = np.zeros((n_robots, n_tasks))
for i, r_pos in enumerate(robot_positions):
for j, t_pos in enumerate(task_positions):
cost_matrix[i][j] = np.sqrt(
(r_pos[0] - t_pos[0])**2 +
(r_pos[1] - t_pos[1])**2
)
# Giải bằng scipy (Hungarian algorithm bên trong)
row_ind, col_ind = linear_sum_assignment(cost_matrix)
assignments = list(zip(row_ind.tolist(), col_ind.tolist()))
total_cost = cost_matrix[row_ind, col_ind].sum()
return assignments, total_cost
# Ví dụ: 4 robot, 4 task trong nhà máy
robots = [(0, 0), (10, 0), (0, 10), (10, 10)]
tasks = [(2, 3), (8, 1), (1, 8), (9, 9)]
assignments, cost = allocate_tasks_hungarian(robots, tasks)
print(f"Phân công tối ưu (tổng distance: {cost:.1f}m):")
for r_idx, t_idx in assignments:
print(f" Robot {r_idx} → Task {t_idx} "
f"({robots[r_idx]} → {tasks[t_idx]})")
Output:
Phân công tối ưu (tổng distance: 10.6m):
Robot 0 → Task 0 ((0, 0) → (2, 3))
Robot 1 → Task 1 ((10, 0) → (8, 1))
Robot 2 → Task 2 ((0, 10) → (1, 8))
Robot 3 → Task 3 ((10, 10) → (9, 9))
Hạn chế của Hungarian
- Yêu cầu số robot = số task (có thể pad bằng dummy)
- Chỉ xét cost hiện tại, không tính đến task sẽ đến trong tương lai
- Centralized — cần tính toán tại server, không scale tốt khi fleet > 100 robot
- Không xử lý được ràng buộc phức tạp (mức pin, loại robot, deadline)
Auction-based Allocation — Phân tán và linh hoạt
Auction-based allocation mô phỏng cơ chế đấu giá: mỗi task được "rao bán", các robot "đấu giá" (bid), robot đưa ra bid tốt nhất được gán task. Phương pháp này phân tán — mỗi robot tự tính bid dựa trên thông tin local, không cần server tập trung biết trạng thái toàn bộ fleet.
Sequential Single-Item Auction
Đây là dạng đơn giản nhất:
- Auctioneer (server hoặc robot leader) công bố task mới
- Mỗi robot tính bid = estimated cost để hoàn thành task
- Robot gửi bid về auctioneer
- Auctioneer chọn robot có bid thấp nhất, gán task
- Lặp lại cho task tiếp theo
Python implementation
import random
from dataclasses import dataclass, field
from typing import Optional
import math
@dataclass
class Robot:
id: int
x: float
y: float
battery: float # 0.0 - 1.0
current_task: Optional[int] = None
def compute_bid(self, task) -> float:
"""Tính bid dựa trên khoảng cách và mức pin."""
distance = math.sqrt(
(self.x - task.x)**2 + (self.y - task.y)**2
)
# Penalty nếu pin thấp (bid cao hơn = ít muốn nhận task)
battery_penalty = 1.0 / max(self.battery, 0.1)
# Penalty nếu đang bận
busy_penalty = 2.0 if self.current_task is not None else 1.0
return distance * battery_penalty * busy_penalty
@dataclass
class Task:
id: int
x: float
y: float
priority: int = 1 # 1=normal, 2=urgent, 3=critical
@dataclass
class AuctionResult:
task_id: int
robot_id: int
bid_value: float
def auction_allocate(robots: list[Robot],
tasks: list[Task]) -> list[AuctionResult]:
"""
Sequential single-item auction allocation.
Tasks được sắp xếp theo priority (critical trước).
"""
results = []
available_robots = {r.id: r for r in robots}
# Ưu tiên task critical trước
sorted_tasks = sorted(tasks, key=lambda t: -t.priority)
for task in sorted_tasks:
if not available_robots:
print(f" Task {task.id}: Không còn robot khả dụng!")
break
# Thu thập bids từ tất cả robot available
bids = {}
for robot in available_robots.values():
bid = robot.compute_bid(task)
bids[robot.id] = bid
print(f" Robot {robot.id} bids {bid:.2f} "
f"for Task {task.id}")
# Robot có bid thấp nhất thắng
winner_id = min(bids, key=bids.get)
winner = available_robots.pop(winner_id)
winner.current_task = task.id
results.append(AuctionResult(
task_id=task.id,
robot_id=winner_id,
bid_value=bids[winner_id]
))
print(f" → Robot {winner_id} wins Task {task.id} "
f"(bid: {bids[winner_id]:.2f})\n")
return results
# Demo
robots = [
Robot(0, x=1.0, y=1.0, battery=0.9),
Robot(1, x=8.0, y=2.0, battery=0.3), # pin thấp
Robot(2, x=5.0, y=7.0, battery=0.8),
Robot(3, x=9.0, y=9.0, battery=0.6),
]
tasks = [
Task(0, x=2.0, y=3.0, priority=1),
Task(1, x=7.0, y=1.0, priority=3), # critical
Task(2, x=4.0, y=8.0, priority=2), # urgent
]
print("=== Auction-based Task Allocation ===\n")
results = auction_allocate(robots, tasks)
print("\n=== Kết quả cuối cùng ===")
for r in results:
print(f"Task {r.task_id} → Robot {r.robot_id} "
f"(bid: {r.bid_value:.2f})")
Ưu điểm của auction-based
- Phân tán: Robot tự tính bid, giảm tải cho server
- Linh hoạt: Dễ thêm/bớt robot và task động (khi task mới đến, mở auction mới)
- Extensible: Bid function có thể tính thêm mức pin, deadline, loại hàng
- Robust: Nếu một robot offline, các robot còn lại vẫn đấu giá bình thường
Nhược điểm
- Không đảm bảo tối ưu toàn cục (suboptimal 10-15% so với Hungarian)
- Latency do phải chờ tất cả robot gửi bid
- Consensus protocol phức tạp hơn trong mạng không ổn định
Market-based Approach — Kết hợp tốt nhất hai thế giới
Market-based allocation mở rộng auction-based bằng cách cho phép robot trao đổi task sau khi đã gán. Ý tưởng: sau vòng auction ban đầu, nếu robot A nhận ra rằng hoán đổi task với robot B sẽ có lợi cho cả hai, chúng sẽ tự thương lượng và swap.
Thuật toán Consensus-Based Bundle Algorithm (CBBA)
CBBA là thuật toán market-based phổ biến nhất cho multi-robot:
- Bundle Phase: Mỗi robot tự xây dựng "bundle" — danh sách task nó muốn nhận, sắp theo mức ưu tiên
- Consensus Phase: Robot trao đổi thông tin với neighbors, giải quyết xung đột (2 robot muốn cùng 1 task)
- Lặp lại cho đến khi hội tụ (không ai muốn đổi nữa)
CBBA đảm bảo hội tụ trong O(N × M) vòng giao tiếp và cho kết quả gần tối ưu (within 50% of optimal trong worst case, nhưng thường tốt hơn nhiều).
Reinforcement Learning cho Dynamic Task Allocation
Các phương pháp trên đều giải bài toán "ảnh chụp tĩnh" — cho biết trạng thái hiện tại, tìm assignment tốt nhất. Nhưng trong thực tế, task đến liên tục và không thể dự đoán. Reinforcement Learning (RL) học chính sách phân công dài hạn — chấp nhận gán suboptimal ngay bây giờ nếu biết 5 phút nữa sẽ có task critical ở gần robot đó.
Formulation
- State: Vị trí tất cả robot, task queue, mức pin, occupancy map
- Action: Gán task T cho robot R, hoặc giữ robot idle
- Reward: -1 cho mỗi bước task chưa hoàn thành, +10 khi task xong, -100 khi deadline miss
Multi-Agent RL (MARL)
Với fleet lớn, single-agent RL không scale. MARL cho phép mỗi robot là một agent độc lập:
import numpy as np
class SimpleTaskAllocationEnv:
"""
Simplified multi-agent environment cho task allocation.
Mỗi robot (agent) quyết định nhận task nào.
"""
def __init__(self, n_robots=4, grid_size=10):
self.n_robots = n_robots
self.grid_size = grid_size
self.reset()
def reset(self):
# Random vị trí robot
self.robot_pos = np.random.randint(
0, self.grid_size, size=(self.n_robots, 2)
)
self.robot_battery = np.ones(self.n_robots)
# Random tasks
self.tasks = []
for _ in range(self.n_robots * 2):
self.tasks.append({
'pos': np.random.randint(
0, self.grid_size, size=2
),
'assigned': False,
'completed': False,
})
return self._get_obs()
def _get_obs(self):
"""Observation cho mỗi robot: vị trí mình + tất cả task."""
obs = {}
for i in range(self.n_robots):
obs[i] = {
'my_pos': self.robot_pos[i],
'battery': self.robot_battery[i],
'tasks': [
t['pos'] for t in self.tasks
if not t['assigned']
],
}
return obs
def step(self, actions: dict):
"""
actions: {robot_id: task_index}
Returns: obs, rewards, done
"""
rewards = {}
for robot_id, task_idx in actions.items():
if task_idx < len(self.tasks) \
and not self.tasks[task_idx]['assigned']:
task = self.tasks[task_idx]
dist = np.linalg.norm(
self.robot_pos[robot_id] - task['pos']
)
# Reward = negative distance (gần hơn = reward cao hơn)
rewards[robot_id] = -dist / self.grid_size
task['assigned'] = True
self.robot_pos[robot_id] = task['pos']
else:
rewards[robot_id] = -1.0 # penalty cho invalid action
done = all(t['assigned'] for t in self.tasks)
return self._get_obs(), rewards, done
# Demo: random policy vs greedy policy
env = SimpleTaskAllocationEnv(n_robots=3, grid_size=20)
# Greedy: mỗi robot chọn task gần nhất
obs = env.reset()
total_reward = 0
step = 0
while True:
actions = {}
assigned_tasks = set()
for robot_id in range(env.n_robots):
robot_obs = obs[robot_id]
if not robot_obs['tasks']:
continue
# Tìm task gần nhất chưa được chọn trong step này
min_dist = float('inf')
best_task = None
for idx, t in enumerate(env.tasks):
if t['assigned'] or idx in assigned_tasks:
continue
dist = np.linalg.norm(
robot_obs['my_pos'] - t['pos']
)
if dist < min_dist:
min_dist = dist
best_task = idx
if best_task is not None:
actions[robot_id] = best_task
assigned_tasks.add(best_task)
if not actions:
break
obs, rewards, done = env.step(actions)
step_reward = sum(rewards.values())
total_reward += step_reward
step += 1
if done:
break
print(f"Greedy policy: {step} steps, "
f"total reward: {total_reward:.2f}")
Kết quả nghiên cứu
Theo các paper gần đây (ICRA 2025, RSS 2025), MARL-based allocation vượt trội hơn auction-based 15-25% trong các kịch bản:
- Task arrival rate cao và không đều
- Robot có capability khác nhau (heterogeneous fleet)
- Môi trường có bottleneck (hành lang hẹp, thang máy)
Tuy nhiên, RL cần thời gian training lớn và khó giải thích quyết định (black box), nên trong production hiện tại, phần lớn hệ thống vẫn dùng auction-based hoặc hybrid.
Traffic Management: Conflict Detection và Resolution
Phân công task chỉ là nửa bài toán. Nửa còn lại: đảm bảo robot không va chạm trên đường đi.
Conflict Detection
Hai loại xung đột chính:
- Vertex conflict: 2 robot cùng đến 1 điểm tại cùng thời điểm
- Edge conflict: 2 robot đi ngược chiều trên cùng 1 đoạn đường
Priority-based Resolution
Cách đơn giản nhất: gán priority cho mỗi robot.
from enum import IntEnum
class Priority(IntEnum):
LOW = 1
NORMAL = 2
HIGH = 3
EMERGENCY = 4
def resolve_conflict(robot_a, robot_b, conflict_point):
"""
Giải quyết xung đột tại một điểm.
Robot có priority thấp hơn phải nhường.
"""
if robot_a.priority > robot_b.priority:
# Robot A đi trước, B chờ
robot_b.wait_at(robot_b.current_position)
return robot_a.id
elif robot_b.priority > robot_a.priority:
robot_a.wait_at(robot_a.current_position)
return robot_b.id
else:
# Cùng priority → robot gần hơn đi trước
dist_a = distance(robot_a.current_position,
conflict_point)
dist_b = distance(robot_b.current_position,
conflict_point)
if dist_a < dist_b:
robot_b.wait_at(robot_b.current_position)
return robot_a.id
else:
robot_a.wait_at(robot_a.current_position)
return robot_b.id
CBS — Conflict-Based Search
Cho kịch bản phức tạp hơn, CBS (Sharon et al., 2015) là thuật toán multi-agent pathfinding state-of-the-art:
- Low-level: Tìm đường ngắn nhất cho từng robot (A*)
- High-level: Phát hiện conflict, tạo constraint, tìm lại đường
- Lặp lại cho đến khi không còn conflict
CBS optimal cho bài toán MAPF (Multi-Agent Path Finding) nhưng exponential trong worst case. Các biến thể bounded-suboptimal như ECBS cho kết quả gần tối ưu trong thời gian polynomial.
Chọn thuật toán nào?
| Kịch bản | Thuật toán khuyến nghị |
|---|---|
| Fleet nhỏ (< 10), task đơn giản | Hungarian algorithm |
| Fleet vừa (10-50), task dynamic | Auction-based (sequential) |
| Fleet lớn (50+), heterogeneous | CBBA hoặc market-based |
| Nghiên cứu / task phức tạp | Multi-agent RL |
| Traffic management | CBS hoặc priority-based |
Trong thực tế, hệ thống production thường kết hợp: dùng auction cho task allocation và CBS cho path planning. Open-RMF sử dụng schedule-based traffic management kết hợp với configurable task dispatcher — một lựa chọn cân bằng tốt giữa hiệu suất và độ phức tạp.
VnRobo đang nghiên cứu hybrid approach: auction-based allocation cho real-time responsiveness, kết hợp RL fine-tuning cho các kịch bản lặp lại thường xuyên trong nhà máy.
Bài viết liên quan
- Quản lý đội robot trong nhà máy thông minh — Tổng quan kiến trúc Robot Fleet Management
- Open-RMF: Hệ thống mã nguồn mở quản lý đội robot — Framework multi-vendor fleet management với ROS 2
- Lập trình điều khiển robot với Python — Nền tảng Python cho robotics