← Quay lại Blog
navigationfleetamrprogramming

Multi-robot Coordination: Thuật toán phân công task

Các thuật toán phân công nhiệm vụ cho đội robot — từ Hungarian algorithm, auction-based đến RL-based task allocation.

Nguyễn Anh Tuấn20 tháng 3, 202612 phút đọc
Multi-robot Coordination: Thuật toán phân công task

Bài toán Multi-Robot Task Allocation (MRTA)

Multi-robot coordination bắt đầu từ một câu hỏi đơn giản: có N robot và M nhiệm vụ, gán robot nào cho nhiệm vụ nào để tổng chi phí (thời gian, năng lượng, quãng đường) là nhỏ nhất? Đây chính là bài toán MRTA — Multi-Robot Task Allocation — nền tảng của mọi hệ thống fleet management hiện đại.

Trong thực tế nhà máy, MRTA xuất hiện liên tục: 10 AMR trong kho hàng, mỗi phút có 5-10 đơn picking mới. Robot nào gần nhất? Robot nào còn đủ pin? Robot nào đang rảnh? Thuật toán phân công quyết định hiệu suất toàn bộ fleet — sai lầm nhỏ trong allocation có thể khiến throughput giảm 30-40%.

Phân loại MRTA theo taxonomy Gerkey-Matarić

Gerkey và Matarić (2004) phân loại MRTA theo 3 chiều:

Phần lớn ứng dụng kho hàng thuộc loại ST-SR-IA: mỗi robot làm 1 task, mỗi task cần 1 robot, gán tức thì khi task mới đến. Đây là dạng đơn giản nhất và có thuật toán tối ưu.

Robot AMR tự hành trong kho hàng logistics hiện đại

Hungarian Algorithm — Tối ưu cho bài toán gán 1-1

Hungarian algorithm (Kuhn-Munkres) giải bài toán assignment problem trong thời gian O(n³) — tìm phương án gán 1-1 giữa N robot và N task sao cho tổng chi phí nhỏ nhất. Đây là thuật toán tối ưu toàn cục, nghĩa là không có phương án gán nào tốt hơn.

Ý tưởng thuật toán

  1. Xây dựng ma trận chi phí C[i][j] = chi phí để robot i thực hiện task j
  2. Trừ mỗi hàng cho giá trị nhỏ nhất trong hàng đó
  3. Trừ mỗi cột cho giá trị nhỏ nhất trong cột đó
  4. Tìm phương án gán sao cho tất cả các giá trị được chọn đều bằng 0
  5. Nếu chưa tìm được, điều chỉnh ma trận và lặp lại

Python implementation với scipy

import numpy as np
from scipy.optimize import linear_sum_assignment

def allocate_tasks_hungarian(robot_positions, task_positions):
    """
    Phân công nhiệm vụ cho robot sử dụng Hungarian Algorithm.

    Args:
        robot_positions: list of (x, y) - vị trí hiện tại của mỗi robot
        task_positions: list of (x, y) - vị trí của mỗi task

    Returns:
        assignments: list of (robot_idx, task_idx) pairs
        total_cost: tổng khoảng cách di chuyển
    """
    n_robots = len(robot_positions)
    n_tasks = len(task_positions)

    # Xây dựng ma trận chi phí (Euclidean distance)
    cost_matrix = np.zeros((n_robots, n_tasks))
    for i, r_pos in enumerate(robot_positions):
        for j, t_pos in enumerate(task_positions):
            cost_matrix[i][j] = np.sqrt(
                (r_pos[0] - t_pos[0])**2 +
                (r_pos[1] - t_pos[1])**2
            )

    # Giải bằng scipy (Hungarian algorithm bên trong)
    row_ind, col_ind = linear_sum_assignment(cost_matrix)

    assignments = list(zip(row_ind.tolist(), col_ind.tolist()))
    total_cost = cost_matrix[row_ind, col_ind].sum()

    return assignments, total_cost

# Ví dụ: 4 robot, 4 task trong nhà máy
robots = [(0, 0), (10, 0), (0, 10), (10, 10)]
tasks = [(2, 3), (8, 1), (1, 8), (9, 9)]

assignments, cost = allocate_tasks_hungarian(robots, tasks)
print(f"Phân công tối ưu (tổng distance: {cost:.1f}m):")
for r_idx, t_idx in assignments:
    print(f"  Robot {r_idx} → Task {t_idx} "
          f"({robots[r_idx]} → {tasks[t_idx]})")

Output:

Phân công tối ưu (tổng distance: 10.6m):
  Robot 0 → Task 0 ((0, 0) → (2, 3))
  Robot 1 → Task 1 ((10, 0) → (8, 1))
  Robot 2 → Task 2 ((0, 10) → (1, 8))
  Robot 3 → Task 3 ((10, 10) → (9, 9))

Hạn chế của Hungarian

Auction-based Allocation — Phân tán và linh hoạt

Auction-based allocation mô phỏng cơ chế đấu giá: mỗi task được "rao bán", các robot "đấu giá" (bid), robot đưa ra bid tốt nhất được gán task. Phương pháp này phân tán — mỗi robot tự tính bid dựa trên thông tin local, không cần server tập trung biết trạng thái toàn bộ fleet.

Sequential Single-Item Auction

Đây là dạng đơn giản nhất:

  1. Auctioneer (server hoặc robot leader) công bố task mới
  2. Mỗi robot tính bid = estimated cost để hoàn thành task
  3. Robot gửi bid về auctioneer
  4. Auctioneer chọn robot có bid thấp nhất, gán task
  5. Lặp lại cho task tiếp theo

Python implementation

import random
from dataclasses import dataclass, field
from typing import Optional
import math

@dataclass
class Robot:
    id: int
    x: float
    y: float
    battery: float  # 0.0 - 1.0
    current_task: Optional[int] = None

    def compute_bid(self, task) -> float:
        """Tính bid dựa trên khoảng cách và mức pin."""
        distance = math.sqrt(
            (self.x - task.x)**2 + (self.y - task.y)**2
        )
        # Penalty nếu pin thấp (bid cao hơn = ít muốn nhận task)
        battery_penalty = 1.0 / max(self.battery, 0.1)
        # Penalty nếu đang bận
        busy_penalty = 2.0 if self.current_task is not None else 1.0

        return distance * battery_penalty * busy_penalty

@dataclass
class Task:
    id: int
    x: float
    y: float
    priority: int = 1  # 1=normal, 2=urgent, 3=critical

@dataclass
class AuctionResult:
    task_id: int
    robot_id: int
    bid_value: float

def auction_allocate(robots: list[Robot],
                     tasks: list[Task]) -> list[AuctionResult]:
    """
    Sequential single-item auction allocation.
    Tasks được sắp xếp theo priority (critical trước).
    """
    results = []
    available_robots = {r.id: r for r in robots}

    # Ưu tiên task critical trước
    sorted_tasks = sorted(tasks, key=lambda t: -t.priority)

    for task in sorted_tasks:
        if not available_robots:
            print(f"  Task {task.id}: Không còn robot khả dụng!")
            break

        # Thu thập bids từ tất cả robot available
        bids = {}
        for robot in available_robots.values():
            bid = robot.compute_bid(task)
            bids[robot.id] = bid
            print(f"  Robot {robot.id} bids {bid:.2f} "
                  f"for Task {task.id}")

        # Robot có bid thấp nhất thắng
        winner_id = min(bids, key=bids.get)
        winner = available_robots.pop(winner_id)
        winner.current_task = task.id

        results.append(AuctionResult(
            task_id=task.id,
            robot_id=winner_id,
            bid_value=bids[winner_id]
        ))
        print(f"  → Robot {winner_id} wins Task {task.id} "
              f"(bid: {bids[winner_id]:.2f})\n")

    return results

# Demo
robots = [
    Robot(0, x=1.0, y=1.0, battery=0.9),
    Robot(1, x=8.0, y=2.0, battery=0.3),  # pin thấp
    Robot(2, x=5.0, y=7.0, battery=0.8),
    Robot(3, x=9.0, y=9.0, battery=0.6),
]

tasks = [
    Task(0, x=2.0, y=3.0, priority=1),
    Task(1, x=7.0, y=1.0, priority=3),   # critical
    Task(2, x=4.0, y=8.0, priority=2),   # urgent
]

print("=== Auction-based Task Allocation ===\n")
results = auction_allocate(robots, tasks)

print("\n=== Kết quả cuối cùng ===")
for r in results:
    print(f"Task {r.task_id} → Robot {r.robot_id} "
          f"(bid: {r.bid_value:.2f})")

Ưu điểm của auction-based

Nhược điểm

Hệ thống tự động hóa trong nhà máy sản xuất

Market-based Approach — Kết hợp tốt nhất hai thế giới

Market-based allocation mở rộng auction-based bằng cách cho phép robot trao đổi task sau khi đã gán. Ý tưởng: sau vòng auction ban đầu, nếu robot A nhận ra rằng hoán đổi task với robot B sẽ có lợi cho cả hai, chúng sẽ tự thương lượng và swap.

Thuật toán Consensus-Based Bundle Algorithm (CBBA)

CBBA là thuật toán market-based phổ biến nhất cho multi-robot:

  1. Bundle Phase: Mỗi robot tự xây dựng "bundle" — danh sách task nó muốn nhận, sắp theo mức ưu tiên
  2. Consensus Phase: Robot trao đổi thông tin với neighbors, giải quyết xung đột (2 robot muốn cùng 1 task)
  3. Lặp lại cho đến khi hội tụ (không ai muốn đổi nữa)

CBBA đảm bảo hội tụ trong O(N × M) vòng giao tiếp và cho kết quả gần tối ưu (within 50% of optimal trong worst case, nhưng thường tốt hơn nhiều).

Reinforcement Learning cho Dynamic Task Allocation

Các phương pháp trên đều giải bài toán "ảnh chụp tĩnh" — cho biết trạng thái hiện tại, tìm assignment tốt nhất. Nhưng trong thực tế, task đến liên tục và không thể dự đoán. Reinforcement Learning (RL) học chính sách phân công dài hạn — chấp nhận gán suboptimal ngay bây giờ nếu biết 5 phút nữa sẽ có task critical ở gần robot đó.

Formulation

Multi-Agent RL (MARL)

Với fleet lớn, single-agent RL không scale. MARL cho phép mỗi robot là một agent độc lập:

import numpy as np

class SimpleTaskAllocationEnv:
    """
    Simplified multi-agent environment cho task allocation.
    Mỗi robot (agent) quyết định nhận task nào.
    """

    def __init__(self, n_robots=4, grid_size=10):
        self.n_robots = n_robots
        self.grid_size = grid_size
        self.reset()

    def reset(self):
        # Random vị trí robot
        self.robot_pos = np.random.randint(
            0, self.grid_size, size=(self.n_robots, 2)
        )
        self.robot_battery = np.ones(self.n_robots)

        # Random tasks
        self.tasks = []
        for _ in range(self.n_robots * 2):
            self.tasks.append({
                'pos': np.random.randint(
                    0, self.grid_size, size=2
                ),
                'assigned': False,
                'completed': False,
            })

        return self._get_obs()

    def _get_obs(self):
        """Observation cho mỗi robot: vị trí mình + tất cả task."""
        obs = {}
        for i in range(self.n_robots):
            obs[i] = {
                'my_pos': self.robot_pos[i],
                'battery': self.robot_battery[i],
                'tasks': [
                    t['pos'] for t in self.tasks
                    if not t['assigned']
                ],
            }
        return obs

    def step(self, actions: dict):
        """
        actions: {robot_id: task_index}
        Returns: obs, rewards, done
        """
        rewards = {}
        for robot_id, task_idx in actions.items():
            if task_idx < len(self.tasks) \
               and not self.tasks[task_idx]['assigned']:
                task = self.tasks[task_idx]
                dist = np.linalg.norm(
                    self.robot_pos[robot_id] - task['pos']
                )
                # Reward = negative distance (gần hơn = reward cao hơn)
                rewards[robot_id] = -dist / self.grid_size
                task['assigned'] = True
                self.robot_pos[robot_id] = task['pos']
            else:
                rewards[robot_id] = -1.0  # penalty cho invalid action

        done = all(t['assigned'] for t in self.tasks)
        return self._get_obs(), rewards, done

# Demo: random policy vs greedy policy
env = SimpleTaskAllocationEnv(n_robots=3, grid_size=20)

# Greedy: mỗi robot chọn task gần nhất
obs = env.reset()
total_reward = 0
step = 0
while True:
    actions = {}
    assigned_tasks = set()
    for robot_id in range(env.n_robots):
        robot_obs = obs[robot_id]
        if not robot_obs['tasks']:
            continue

        # Tìm task gần nhất chưa được chọn trong step này
        min_dist = float('inf')
        best_task = None
        for idx, t in enumerate(env.tasks):
            if t['assigned'] or idx in assigned_tasks:
                continue
            dist = np.linalg.norm(
                robot_obs['my_pos'] - t['pos']
            )
            if dist < min_dist:
                min_dist = dist
                best_task = idx

        if best_task is not None:
            actions[robot_id] = best_task
            assigned_tasks.add(best_task)

    if not actions:
        break

    obs, rewards, done = env.step(actions)
    step_reward = sum(rewards.values())
    total_reward += step_reward
    step += 1

    if done:
        break

print(f"Greedy policy: {step} steps, "
      f"total reward: {total_reward:.2f}")

Kết quả nghiên cứu

Theo các paper gần đây (ICRA 2025, RSS 2025), MARL-based allocation vượt trội hơn auction-based 15-25% trong các kịch bản:

Tuy nhiên, RL cần thời gian training lớn và khó giải thích quyết định (black box), nên trong production hiện tại, phần lớn hệ thống vẫn dùng auction-based hoặc hybrid.

Traffic Management: Conflict Detection và Resolution

Phân công task chỉ là nửa bài toán. Nửa còn lại: đảm bảo robot không va chạm trên đường đi.

Conflict Detection

Hai loại xung đột chính:

Priority-based Resolution

Cách đơn giản nhất: gán priority cho mỗi robot.

from enum import IntEnum

class Priority(IntEnum):
    LOW = 1
    NORMAL = 2
    HIGH = 3
    EMERGENCY = 4

def resolve_conflict(robot_a, robot_b, conflict_point):
    """
    Giải quyết xung đột tại một điểm.
    Robot có priority thấp hơn phải nhường.
    """
    if robot_a.priority > robot_b.priority:
        # Robot A đi trước, B chờ
        robot_b.wait_at(robot_b.current_position)
        return robot_a.id
    elif robot_b.priority > robot_a.priority:
        robot_a.wait_at(robot_a.current_position)
        return robot_b.id
    else:
        # Cùng priority → robot gần hơn đi trước
        dist_a = distance(robot_a.current_position,
                         conflict_point)
        dist_b = distance(robot_b.current_position,
                         conflict_point)
        if dist_a < dist_b:
            robot_b.wait_at(robot_b.current_position)
            return robot_a.id
        else:
            robot_a.wait_at(robot_a.current_position)
            return robot_b.id

CBS — Conflict-Based Search

Cho kịch bản phức tạp hơn, CBS (Sharon et al., 2015) là thuật toán multi-agent pathfinding state-of-the-art:

  1. Low-level: Tìm đường ngắn nhất cho từng robot (A*)
  2. High-level: Phát hiện conflict, tạo constraint, tìm lại đường
  3. Lặp lại cho đến khi không còn conflict

CBS optimal cho bài toán MAPF (Multi-Agent Path Finding) nhưng exponential trong worst case. Các biến thể bounded-suboptimal như ECBS cho kết quả gần tối ưu trong thời gian polynomial.

Kỹ sư lập trình và giám sát hệ thống robot tự động

Chọn thuật toán nào?

Kịch bản Thuật toán khuyến nghị
Fleet nhỏ (< 10), task đơn giản Hungarian algorithm
Fleet vừa (10-50), task dynamic Auction-based (sequential)
Fleet lớn (50+), heterogeneous CBBA hoặc market-based
Nghiên cứu / task phức tạp Multi-agent RL
Traffic management CBS hoặc priority-based

Trong thực tế, hệ thống production thường kết hợp: dùng auction cho task allocation và CBS cho path planning. Open-RMF sử dụng schedule-based traffic management kết hợp với configurable task dispatcher — một lựa chọn cân bằng tốt giữa hiệu suất và độ phức tạp.

VnRobo đang nghiên cứu hybrid approach: auction-based allocation cho real-time responsiveness, kết hợp RL fine-tuning cho các kịch bản lặp lại thường xuyên trong nhà máy.


Bài viết liên quan

Bài viết liên quan

Docker + K3s trên edge: GitOps cho robot fleet
devopsfleetkubernetes

Docker + K3s trên edge: GitOps cho robot fleet

Hướng dẫn triển khai Docker và K3s trên edge device — quản lý, cập nhật OTA và giám sát hàng trăm robot với GitOps workflow.

10/3/20268 phút đọc
Deep DiveImitation Learning: BC, DAgger và DAPG cho robot
ai-perceptionimitation-learningprogrammingPhần 2

Imitation Learning: BC, DAgger và DAPG cho robot

Tại sao imitation learning quan trọng hơn RL cho nhiều bài toán manipulation — cách thu thập data và train policy.

8/3/202611 phút đọc
TutorialRL cho Robotics: PPO, SAC và cách chọn algorithm
ai-perceptionrlprogrammingPhần 1

RL cho Robotics: PPO, SAC và cách chọn algorithm

Tổng quan RL algorithms cho robotics — PPO, SAC, TD3 và hướng dẫn chọn algorithm phù hợp cho từng bài toán robot.

5/3/202610 phút đọc