← Quay lại Blog
navigationros2tutorialamr

ROS 2 từ A đến Z (Phần 2): Topics, Services và Actions

Hiểu rõ 3 primitives giao tiếp trong ROS 2 — khi nào dùng Topic, Service hay Action cho robot của bạn.

Nguyễn Anh Tuấn21 tháng 1, 202610 phút đọc
ROS 2 từ A đến Z (Phần 2): Topics, Services và Actions

Giao tiếp trong ROS 2 — Trái tim của mọi hệ thống robot

Một hệ thống robot bao gồm hàng chục, thậm chí hàng trăm node. Sensor node đọc LiDAR, camera node xử lý ảnh, planner node tính đường đi, motor node điều khiển bánh xe. Tất cả cần giao tiếp với nhau. ROS 2 cung cấp 3 cơ chế giao tiếp chính: Topics, ServicesActions. Hiểu rõ khi nào dùng cái gì là kỹ năng quan trọng nhất của một kỹ sư robotics.

Bài này nối tiếp Phần 1: Cài đặt và Node đầu tiên, nơi bạn đã viết publisher/subscriber đơn giản. Giờ chúng ta sẽ đi sâu hơn.

Network giao tiếp giữa các node ROS 2

Topics — Dòng chảy dữ liệu liên tục

Khi nào dùng Topic?

Topic phù hợp khi bạn cần stream dữ liệu liên tục từ một hoặc nhiều nguồn đến một hoặc nhiều nơi nhận. Đặc điểm:

Ví dụ thực tế: sensor data (LiDAR scan, camera frames, IMU readings), robot state (vị trí, vận tốc), log messages.

Custom Message

Trong Phần 1, chúng ta dùng std_msgs/Float32. Trong thực tế, bạn sẽ cần tạo message riêng. Tạo package chứa custom messages:

cd ~/ros2_ws/src
ros2 pkg create --build-type ament_cmake my_robot_interfaces \
  --dependencies std_msgs geometry_msgs

Tạo file message:

# ~/ros2_ws/src/my_robot_interfaces/msg/SensorData.msg
# Dữ liệu tổng hợp từ nhiều sensor
std_msgs/Header header
float32 temperature
float32 humidity
float32 battery_voltage
bool emergency_stop
string status_message

Cập nhật CMakeLists.txt:

# Thêm vào CMakeLists.txt
find_package(rosidl_default_generators REQUIRED)

rosidl_generate_interfaces(${PROJECT_NAME}
  "msg/SensorData.msg"
  DEPENDENCIES std_msgs geometry_msgs
)

Cập nhật package.xml:

<build_depend>rosidl_default_generators</build_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>

Build và kiểm tra:

cd ~/ros2_ws
colcon build --packages-select my_robot_interfaces
source install/setup.bash

# Kiểm tra message đã được tạo
ros2 interface show my_robot_interfaces/msg/SensorData

Publisher với Custom Message

# ~/ros2_ws/src/my_first_robot/my_first_robot/sensor_data_publisher.py
import rclpy
from rclpy.node import Node
from my_robot_interfaces.msg import SensorData
import random
from builtin_interfaces.msg import Time


class SensorDataPublisher(Node):
    def __init__(self):
        super().__init__('sensor_data_publisher')
        self.publisher_ = self.create_publisher(SensorData, '/sensor_data', 10)
        self.timer = self.create_timer(0.5, self.publish_sensor_data)
        self.get_logger().info('Sensor data publisher started (2 Hz)')

    def publish_sensor_data(self):
        msg = SensorData()
        msg.header.stamp = self.get_clock().now().to_msg()
        msg.header.frame_id = 'base_link'
        msg.temperature = 25.0 + random.uniform(-3.0, 3.0)
        msg.humidity = 60.0 + random.uniform(-10.0, 10.0)
        msg.battery_voltage = 12.6 - random.uniform(0.0, 0.5)
        msg.emergency_stop = False
        msg.status_message = 'nominal'

        self.publisher_.publish(msg)


def main(args=None):
    rclpy.init(args=args)
    node = SensorDataPublisher()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()

QoS — Quality of Service

QoS là một trong những tính năng mạnh nhất của ROS 2 so với ROS 1. Bạn có thể cấu hình cách dữ liệu được truyền cho từng topic:

from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy, DurabilityPolicy

# QoS cho sensor data (chấp nhận mất vài message)
sensor_qos = QoSProfile(
    reliability=ReliabilityPolicy.BEST_EFFORT,
    history=HistoryPolicy.KEEP_LAST,
    depth=5,
)

# QoS cho lệnh điều khiển (không được mất message)
command_qos = QoSProfile(
    reliability=ReliabilityPolicy.RELIABLE,
    history=HistoryPolicy.KEEP_LAST,
    depth=10,
)

# QoS cho map data (subscriber nhận được dù join muộn)
map_qos = QoSProfile(
    reliability=ReliabilityPolicy.RELIABLE,
    durability=DurabilityPolicy.TRANSIENT_LOCAL,
    history=HistoryPolicy.KEEP_LAST,
    depth=1,
)

# Sử dụng
self.publisher_ = self.create_publisher(LaserScan, '/scan', sensor_qos)

Chọn QoS thế nào?

Tình huống Reliability Durability Depth
Sensor data (LiDAR, camera) BEST_EFFORT VOLATILE 5-10
Lệnh điều khiển (cmd_vel) RELIABLE VOLATILE 10
Map, TF static RELIABLE TRANSIENT_LOCAL 1
Log, diagnostics BEST_EFFORT VOLATILE 20

Services — Hỏi và trả lời

Khi nào dùng Service?

Service phù hợp khi bạn cần request một kết quả cụ thểchờ response:

Ví dụ thực tế: lấy trạng thái pin, trigger chụp ảnh, reset sensor, thay đổi mode hoạt động.

Tạo Custom Service

# ~/ros2_ws/src/my_robot_interfaces/srv/GetBatteryStatus.srv
# Request (trên dấu ---)
string robot_id
---
# Response (dưới dấu ---)
float32 voltage
float32 percentage
float32 estimated_runtime_minutes
string health_status
bool is_charging

Cập nhật CMakeLists.txt:

rosidl_generate_interfaces(${PROJECT_NAME}
  "msg/SensorData.msg"
  "srv/GetBatteryStatus.srv"
  DEPENDENCIES std_msgs geometry_msgs
)

Service Server

# ~/ros2_ws/src/my_first_robot/my_first_robot/battery_service_server.py
import rclpy
from rclpy.node import Node
from my_robot_interfaces.srv import GetBatteryStatus


class BatteryServiceServer(Node):
    def __init__(self):
        super().__init__('battery_service_server')
        self.srv = self.create_service(
            GetBatteryStatus,
            '/get_battery_status',
            self.handle_battery_request
        )
        # Giả lập trạng thái pin
        self.voltage = 12.4
        self.is_charging = False
        self.get_logger().info('Battery service server ready')

    def handle_battery_request(self, request, response):
        self.get_logger().info(f'Battery status requested for: {request.robot_id}')

        response.voltage = self.voltage
        response.percentage = max(0.0, (self.voltage - 10.0) / 2.6 * 100.0)
        response.estimated_runtime_minutes = response.percentage * 1.2
        response.is_charging = self.is_charging

        if response.percentage > 50.0:
            response.health_status = 'good'
        elif response.percentage > 20.0:
            response.health_status = 'low'
        else:
            response.health_status = 'critical'

        self.get_logger().info(
            f'Response: {response.voltage}V ({response.percentage:.0f}%)'
        )
        return response


def main(args=None):
    rclpy.init(args=args)
    node = BatteryServiceServer()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()

Service Client (async)

# ~/ros2_ws/src/my_first_robot/my_first_robot/battery_service_client.py
import rclpy
from rclpy.node import Node
from my_robot_interfaces.srv import GetBatteryStatus


class BatteryServiceClient(Node):
    def __init__(self):
        super().__init__('battery_service_client')
        self.client = self.create_client(
            GetBatteryStatus,
            '/get_battery_status'
        )
        # Đợi server sẵn sàng
        while not self.client.wait_for_service(timeout_sec=1.0):
            self.get_logger().info('Waiting for battery service...')

    def send_request(self, robot_id):
        request = GetBatteryStatus.Request()
        request.robot_id = robot_id

        # Gọi async — không block event loop
        future = self.client.call_async(request)
        return future


def main(args=None):
    rclpy.init(args=args)
    client = BatteryServiceClient()

    future = client.send_request('robot_01')
    rclpy.spin_until_future_complete(client, future)

    result = future.result()
    client.get_logger().info(
        f'Battery: {result.voltage}V | {result.percentage:.0f}% | '
        f'Status: {result.health_status} | Charging: {result.is_charging}'
    )

    client.destroy_node()
    rclpy.shutdown()

Test từ command line:

# Terminal 1: Chạy server
ros2 run my_first_robot battery_server

# Terminal 2: Gọi service từ CLI
ros2 service call /get_battery_status \
  my_robot_interfaces/srv/GetBatteryStatus \
  "{robot_id: 'robot_01'}"

Sơ đồ request-response của ROS 2 service

Actions — Nhiệm vụ dài hạn với feedback

Khi nào dùng Action?

Action dùng cho long-running tasks cần feedback trung gian và có thể cancel giữa chừng:

Ví dụ thực tế: navigation đến điểm đích (feedback = vị trí hiện tại), quét bản đồ (feedback = % hoàn thành), di chuyển cánh tay robot (feedback = joint angles hiện tại).

Tạo Custom Action

# ~/ros2_ws/src/my_robot_interfaces/action/MoveToPosition.action
# Goal
float32 target_x
float32 target_y
float32 target_theta
---
# Result
bool success
float32 final_x
float32 final_y
float32 total_time_seconds
string message
---
# Feedback
float32 current_x
float32 current_y
float32 distance_remaining
float32 estimated_time_remaining

Cập nhật CMakeLists.txt:

find_package(action_msgs REQUIRED)

rosidl_generate_interfaces(${PROJECT_NAME}
  "msg/SensorData.msg"
  "srv/GetBatteryStatus.srv"
  "action/MoveToPosition.action"
  DEPENDENCIES std_msgs geometry_msgs action_msgs
)

Action Server

# ~/ros2_ws/src/my_first_robot/my_first_robot/move_action_server.py
import rclpy
from rclpy.node import Node
from rclpy.action import ActionServer, GoalResponse, CancelResponse
from my_robot_interfaces.action import MoveToPosition
import math
import time


class MoveActionServer(Node):
    def __init__(self):
        super().__init__('move_action_server')

        self._action_server = ActionServer(
            self,
            MoveToPosition,
            '/move_to_position',
            execute_callback=self.execute_callback,
            goal_callback=self.goal_callback,
            cancel_callback=self.cancel_callback,
        )

        # Vị trí hiện tại của robot
        self.current_x = 0.0
        self.current_y = 0.0
        self.speed = 0.5  # m/s

        self.get_logger().info('Move action server ready')

    def goal_callback(self, goal_request):
        self.get_logger().info(
            f'Received goal: ({goal_request.target_x}, {goal_request.target_y})'
        )
        return GoalResponse.ACCEPT

    def cancel_callback(self, goal_handle):
        self.get_logger().info('Received cancel request')
        return CancelResponse.ACCEPT

    async def execute_callback(self, goal_handle):
        self.get_logger().info('Executing move...')

        target_x = goal_handle.request.target_x
        target_y = goal_handle.request.target_y
        start_time = time.time()

        feedback_msg = MoveToPosition.Feedback()

        while True:
            # Tính khoảng cách còn lại
            dx = target_x - self.current_x
            dy = target_y - self.current_y
            distance = math.sqrt(dx * dx + dy * dy)

            # Đã đến nơi?
            if distance < 0.05:
                break

            # Kiểm tra cancel
            if goal_handle.is_cancel_requested:
                goal_handle.canceled()
                result = MoveToPosition.Result()
                result.success = False
                result.message = 'Movement canceled'
                return result

            # Di chuyển một bước
            step = min(self.speed * 0.1, distance)
            self.current_x += (dx / distance) * step
            self.current_y += (dy / distance) * step

            # Gửi feedback
            feedback_msg.current_x = self.current_x
            feedback_msg.current_y = self.current_y
            feedback_msg.distance_remaining = distance
            feedback_msg.estimated_time_remaining = distance / self.speed
            goal_handle.publish_feedback(feedback_msg)

            time.sleep(0.1)  # 10 Hz update

        # Hoàn thành
        goal_handle.succeed()
        result = MoveToPosition.Result()
        result.success = True
        result.final_x = self.current_x
        result.final_y = self.current_y
        result.total_time_seconds = time.time() - start_time
        result.message = 'Reached target position'

        self.get_logger().info(
            f'Move completed in {result.total_time_seconds:.1f}s'
        )
        return result


def main(args=None):
    rclpy.init(args=args)
    node = MoveActionServer()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()

Action Client

# ~/ros2_ws/src/my_first_robot/my_first_robot/move_action_client.py
import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from my_robot_interfaces.action import MoveToPosition


class MoveActionClient(Node):
    def __init__(self):
        super().__init__('move_action_client')
        self._action_client = ActionClient(
            self, MoveToPosition, '/move_to_position'
        )

    def send_goal(self, x, y, theta=0.0):
        goal_msg = MoveToPosition.Goal()
        goal_msg.target_x = x
        goal_msg.target_y = y
        goal_msg.target_theta = theta

        self._action_client.wait_for_server()
        self.get_logger().info(f'Sending goal: ({x}, {y})')

        self._send_goal_future = self._action_client.send_goal_async(
            goal_msg,
            feedback_callback=self.feedback_callback
        )
        self._send_goal_future.add_done_callback(self.goal_response_callback)

    def goal_response_callback(self, future):
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().info('Goal rejected')
            return

        self.get_logger().info('Goal accepted!')
        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self.get_result_callback)

    def feedback_callback(self, feedback_msg):
        fb = feedback_msg.feedback
        self.get_logger().info(
            f'Position: ({fb.current_x:.2f}, {fb.current_y:.2f}) | '
            f'Remaining: {fb.distance_remaining:.2f}m'
        )

    def get_result_callback(self, future):
        result = future.result().result
        if result.success:
            self.get_logger().info(
                f'Arrived at ({result.final_x:.2f}, {result.final_y:.2f}) '
                f'in {result.total_time_seconds:.1f}s'
            )
        else:
            self.get_logger().warn(f'Failed: {result.message}')
        rclpy.shutdown()


def main(args=None):
    rclpy.init(args=args)
    client = MoveActionClient()
    client.send_goal(3.0, 4.0)
    rclpy.spin(client)


if __name__ == '__main__':
    main()

So sánh: Khi nào dùng gì?

Đây là bảng tổng hợp giúp bạn quyết định nhanh:

Tiêu chí Topic Service Action
Pattern Pub/Sub Request/Response Goal/Feedback/Result
Đồng bộ Bất đồng bộ Đồng bộ (blocking) Bất đồng bộ
Feedback Liên tục (stream) Chỉ 1 response Feedback liên tục + 1 result
Cancel N/A Không
Use case Sensor data, state Query, trigger Navigation, long task
Ví dụ /scan, /cmd_vel /get_map, /set_mode /navigate_to_pose

Quy tắc đơn giản:

Sơ đồ so sánh Topic, Service và Action trong ROS 2

Debug giao tiếp với CLI

Một số lệnh CLI hữu ích cho debug:

# === Topics ===
ros2 topic list -t                     # List kèm message type
ros2 topic info /sensor_data --verbose # Chi tiết QoS
ros2 topic pub /cmd_vel geometry_msgs/msg/Twist \
  "{linear: {x: 0.5}, angular: {z: 0.3}}"  # Publish thủ công

# === Services ===
ros2 service list -t
ros2 service type /get_battery_status
ros2 service call /get_battery_status \
  my_robot_interfaces/srv/GetBatteryStatus \
  "{robot_id: 'robot_01'}"

# === Actions ===
ros2 action list -t
ros2 action info /move_to_position
ros2 action send_goal /move_to_position \
  my_robot_interfaces/action/MoveToPosition \
  "{target_x: 2.0, target_y: 3.0, target_theta: 0.0}" \
  --feedback  # Hiển thị feedback

Tổng kết

Bạn đã nắm được 3 primitives giao tiếp cốt lõi của ROS 2:

Hiểu rõ 3 cơ chế này là nền tảng để thiết kế bất kỳ hệ thống robot nào. Trong Phần 3, chúng ta sẽ áp dụng tất cả vào một ứng dụng thực tế: dùng Nav2 để robot tự lập bản đồ SLAM và di chuyển tự động.


Bài viết liên quan

Bài viết liên quan

Deep DiveDigital Twins và ROS 2: Simulation trong sản xuất
simulationros2digital-twinPhần 6

Digital Twins và ROS 2: Simulation trong sản xuất

Ứng dụng simulation trong công nghiệp — digital twins, ROS 2 + Gazebo/Isaac integration cho nhà máy thông minh.

3/4/202611 phút đọc
TutorialSim-to-Real Pipeline: Từ training đến robot thật
simulationsim2realtutorialPhần 5

Sim-to-Real Pipeline: Từ training đến robot thật

End-to-end guide: train policy trong sim, evaluate, domain randomization, deploy lên robot thật và iterate.

2/4/202615 phút đọc
TutorialBắt đầu với MuJoCo: Cài đặt đến mô phỏng robot đầu tiên
simulationmujocotutorialPhần 2

Bắt đầu với MuJoCo: Cài đặt đến mô phỏng robot đầu tiên

Hands-on tutorial MuJoCo — cài đặt, tạo MJCF model, simulate robot arm và visualize kết quả với Python.

30/3/202611 phút đọc