navigationros2tutorialamr

ROS 2 A to Z (Part 2): Topics, Services, and Actions

Master the 3 core communication primitives in ROS 2 — when to use Topic, Service, or Action for your robot.

Nguyen Anh Tuan21 tháng 1, 20269 phút đọc
ROS 2 A to Z (Part 2): Topics, Services, and Actions

Communication in ROS 2 — The Heart of Every Robot System

A robot system comprises dozens, even hundreds of nodes. Sensor nodes read LiDAR, camera nodes process images, planner nodes compute paths, motor nodes control wheels. They all need to communicate with each other. ROS 2 provides 3 primary communication mechanisms: Topics, Services, and Actions. Understanding when to use each is the most critical skill for a robotics engineer.

This post continues from Part 1: Setup and First Node, where you wrote simple publisher/subscriber code. Now we'll go deeper.

Network communication between ROS 2 nodes

Topics — Continuous Data Streams

When to Use Topics?

Topics are ideal when you need continuous data streaming from one or more sources to one or more receivers. Characteristics:

  • Publish/Subscribe (1-to-many, many-to-many)
  • Asynchronous — publisher doesn't know who's subscribing
  • No response — fire and forget

Real-world examples: sensor data (LiDAR scans, camera frames, IMU readings), robot state (position, velocity), log messages.

Custom Messages

In Part 1, we used std_msgs/Float32. In practice, you'll create custom messages. Create a package for custom messages:

cd ~/ros2_ws/src
ros2 pkg create --build-type ament_cmake my_robot_interfaces \
  --dependencies std_msgs geometry_msgs

Create a message file:

# ~/ros2_ws/src/my_robot_interfaces/msg/SensorData.msg
# Composite data from multiple sensors
std_msgs/Header header
float32 temperature
float32 humidity
float32 battery_voltage
bool emergency_stop
string status_message

Update CMakeLists.txt:

# Add to CMakeLists.txt
find_package(rosidl_default_generators REQUIRED)

rosidl_generate_interfaces(${PROJECT_NAME}
  "msg/SensorData.msg"
  DEPENDENCIES std_msgs geometry_msgs
)

Update package.xml:

<build_depend>rosidl_default_generators</build_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>

Build and verify:

cd ~/ros2_ws
colcon build --packages-select my_robot_interfaces
source install/setup.bash

# Verify message was created
ros2 interface show my_robot_interfaces/msg/SensorData

Publisher with Custom Message

# ~/ros2_ws/src/my_first_robot/my_first_robot/sensor_data_publisher.py
import rclpy
from rclpy.node import Node
from my_robot_interfaces.msg import SensorData
import random
from builtin_interfaces.msg import Time


class SensorDataPublisher(Node):
    def __init__(self):
        super().__init__('sensor_data_publisher')
        self.publisher_ = self.create_publisher(SensorData, '/sensor_data', 10)
        self.timer = self.create_timer(0.5, self.publish_sensor_data)
        self.get_logger().info('Sensor data publisher started (2 Hz)')

    def publish_sensor_data(self):
        msg = SensorData()
        msg.header.stamp = self.get_clock().now().to_msg()
        msg.header.frame_id = 'base_link'
        msg.temperature = 25.0 + random.uniform(-3.0, 3.0)
        msg.humidity = 60.0 + random.uniform(-10.0, 10.0)
        msg.battery_voltage = 12.6 - random.uniform(0.0, 0.5)
        msg.emergency_stop = False
        msg.status_message = 'nominal'

        self.publisher_.publish(msg)


def main(args=None):
    rclpy.init(args=args)
    node = SensorDataPublisher()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()

QoS — Quality of Service

QoS is one of ROS 2's strongest features compared to ROS 1. You can configure how data is transmitted for each topic:

from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy, DurabilityPolicy

# QoS for sensor data (acceptable to lose some messages)
sensor_qos = QoSProfile(
    reliability=ReliabilityPolicy.BEST_EFFORT,
    history=HistoryPolicy.KEEP_LAST,
    depth=5,
)

# QoS for control commands (must not lose messages)
command_qos = QoSProfile(
    reliability=ReliabilityPolicy.RELIABLE,
    history=HistoryPolicy.KEEP_LAST,
    depth=10,
)

# QoS for map data (subscriber receives even if joining late)
map_qos = QoSProfile(
    reliability=ReliabilityPolicy.RELIABLE,
    durability=DurabilityPolicy.TRANSIENT_LOCAL,
    history=HistoryPolicy.KEEP_LAST,
    depth=1,
)

# Usage
self.publisher_ = self.create_publisher(LaserScan, '/scan', sensor_qos)

How to Choose QoS?

Situation Reliability Durability Depth
Sensor data (LiDAR, camera) BEST_EFFORT VOLATILE 5-10
Control commands (cmd_vel) RELIABLE VOLATILE 10
Map, TF static RELIABLE TRANSIENT_LOCAL 1
Logs, diagnostics BEST_EFFORT VOLATILE 20

Services — Ask and Answer

When to Use Services?

Services are ideal when you need to request a specific result and wait for a response:

  • Request/Response (1-to-1)
  • Synchronous — client sends request, waits for server reply
  • Not suitable for long-running tasks (will block)

Real-world examples: get battery status, trigger photo capture, reset sensor, change operating mode.

Create Custom Service

# ~/ros2_ws/src/my_robot_interfaces/srv/GetBatteryStatus.srv
# Request (above ---)
string robot_id
---
# Response (below ---)
float32 voltage
float32 percentage
float32 estimated_runtime_minutes
string health_status
bool is_charging

Update CMakeLists.txt:

rosidl_generate_interfaces(${PROJECT_NAME}
  "msg/SensorData.msg"
  "srv/GetBatteryStatus.srv"
  DEPENDENCIES std_msgs geometry_msgs
)

Service Server

# ~/ros2_ws/src/my_first_robot/my_first_robot/battery_service_server.py
import rclpy
from rclpy.node import Node
from my_robot_interfaces.srv import GetBatteryStatus


class BatteryServiceServer(Node):
    def __init__(self):
        super().__init__('battery_service_server')
        self.srv = self.create_service(
            GetBatteryStatus,
            '/get_battery_status',
            self.handle_battery_request
        )
        # Simulate battery state
        self.voltage = 12.4
        self.is_charging = False
        self.get_logger().info('Battery service server ready')

    def handle_battery_request(self, request, response):
        self.get_logger().info(f'Battery status requested for: {request.robot_id}')

        response.voltage = self.voltage
        response.percentage = max(0.0, (self.voltage - 10.0) / 2.6 * 100.0)
        response.estimated_runtime_minutes = response.percentage * 1.2
        response.is_charging = self.is_charging

        if response.percentage > 50.0:
            response.health_status = 'good'
        elif response.percentage > 20.0:
            response.health_status = 'low'
        else:
            response.health_status = 'critical'

        self.get_logger().info(
            f'Response: {response.voltage}V ({response.percentage:.0f}%)'
        )
        return response


def main(args=None):
    rclpy.init(args=args)
    node = BatteryServiceServer()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()

Service Client (async)

# ~/ros2_ws/src/my_first_robot/my_first_robot/battery_service_client.py
import rclpy
from rclpy.node import Node
from my_robot_interfaces.srv import GetBatteryStatus


class BatteryServiceClient(Node):
    def __init__(self):
        super().__init__('battery_service_client')
        self.client = self.create_client(
            GetBatteryStatus,
            '/get_battery_status'
        )
        # Wait for server to be ready
        while not self.client.wait_for_service(timeout_sec=1.0):
            self.get_logger().info('Waiting for battery service...')

    def send_request(self, robot_id):
        request = GetBatteryStatus.Request()
        request.robot_id = robot_id

        # Call async — doesn't block event loop
        future = self.client.call_async(request)
        return future


def main(args=None):
    rclpy.init(args=args)
    client = BatteryServiceClient()

    future = client.send_request('robot_01')
    rclpy.spin_until_future_complete(client, future)

    result = future.result()
    client.get_logger().info(
        f'Battery: {result.voltage}V | {result.percentage:.0f}% | '
        f'Status: {result.health_status} | Charging: {result.is_charging}'
    )

    client.destroy_node()
    rclpy.shutdown()

Test from command line:

# Terminal 1: Run server
ros2 run my_first_robot battery_server

# Terminal 2: Call service from CLI
ros2 service call /get_battery_status \
  my_robot_interfaces/srv/GetBatteryStatus \
  "{robot_id: 'robot_01'}"

ROS 2 service request-response diagram

Actions — Long-Running Tasks with Feedback

When to Use Actions?

Actions are for long-running tasks that need intermediate feedback and can be canceled mid-way:

  • Goal/Feedback/Result pattern
  • Asynchronous — client sends goal, receives continuous feedback, finally gets result
  • Cancelable — can abort task mid-way

Real-world examples: navigate to waypoint (feedback = current position), map sweep (feedback = % complete), move robot arm (feedback = current joint angles).

Create Custom Action

# ~/ros2_ws/src/my_robot_interfaces/action/MoveToPosition.action
# Goal
float32 target_x
float32 target_y
float32 target_theta
---
# Result
bool success
float32 final_x
float32 final_y
float32 total_time_seconds
string message
---
# Feedback
float32 current_x
float32 current_y
float32 distance_remaining
float32 estimated_time_remaining

Update CMakeLists.txt:

find_package(action_msgs REQUIRED)

rosidl_generate_interfaces(${PROJECT_NAME}
  "msg/SensorData.msg"
  "srv/GetBatteryStatus.srv"
  "action/MoveToPosition.action"
  DEPENDENCIES std_msgs geometry_msgs action_msgs
)

Action Server

# ~/ros2_ws/src/my_first_robot/my_first_robot/move_action_server.py
import rclpy
from rclpy.node import Node
from rclpy.action import ActionServer, GoalResponse, CancelResponse
from my_robot_interfaces.action import MoveToPosition
import math
import time


class MoveActionServer(Node):
    def __init__(self):
        super().__init__('move_action_server')

        self._action_server = ActionServer(
            self,
            MoveToPosition,
            '/move_to_position',
            execute_callback=self.execute_callback,
            goal_callback=self.goal_callback,
            cancel_callback=self.cancel_callback,
        )

        # Current robot position
        self.current_x = 0.0
        self.current_y = 0.0
        self.speed = 0.5  # m/s

        self.get_logger().info('Move action server ready')

    def goal_callback(self, goal_request):
        self.get_logger().info(
            f'Received goal: ({goal_request.target_x}, {goal_request.target_y})'
        )
        return GoalResponse.ACCEPT

    def cancel_callback(self, goal_handle):
        self.get_logger().info('Received cancel request')
        return CancelResponse.ACCEPT

    async def execute_callback(self, goal_handle):
        self.get_logger().info('Executing move...')

        target_x = goal_handle.request.target_x
        target_y = goal_handle.request.target_y
        start_time = time.time()

        feedback_msg = MoveToPosition.Feedback()

        while True:
            # Calculate remaining distance
            dx = target_x - self.current_x
            dy = target_y - self.current_y
            distance = math.sqrt(dx * dx + dy * dy)

            # Reached destination?
            if distance < 0.05:
                break

            # Check for cancel
            if goal_handle.is_cancel_requested:
                goal_handle.canceled()
                result = MoveToPosition.Result()
                result.success = False
                result.message = 'Movement canceled'
                return result

            # Move one step
            step = min(self.speed * 0.1, distance)
            self.current_x += (dx / distance) * step
            self.current_y += (dy / distance) * step

            # Send feedback
            feedback_msg.current_x = self.current_x
            feedback_msg.current_y = self.current_y
            feedback_msg.distance_remaining = distance
            feedback_msg.estimated_time_remaining = distance / self.speed
            goal_handle.publish_feedback(feedback_msg)

            time.sleep(0.1)  # 10 Hz update

        # Complete
        goal_handle.succeed()
        result = MoveToPosition.Result()
        result.success = True
        result.final_x = self.current_x
        result.final_y = self.current_y
        result.total_time_seconds = time.time() - start_time
        result.message = 'Reached target position'

        self.get_logger().info(
            f'Move completed in {result.total_time_seconds:.1f}s'
        )
        return result


def main(args=None):
    rclpy.init(args=args)
    node = MoveActionServer()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()

Action Client

# ~/ros2_ws/src/my_first_robot/my_first_robot/move_action_client.py
import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from my_robot_interfaces.action import MoveToPosition


class MoveActionClient(Node):
    def __init__(self):
        super().__init__('move_action_client')
        self._action_client = ActionClient(
            self, MoveToPosition, '/move_to_position'
        )

    def send_goal(self, x, y, theta=0.0):
        goal_msg = MoveToPosition.Goal()
        goal_msg.target_x = x
        goal_msg.target_y = y
        goal_msg.target_theta = theta

        self._action_client.wait_for_server()
        self.get_logger().info(f'Sending goal: ({x}, {y})')

        self._send_goal_future = self._action_client.send_goal_async(
            goal_msg,
            feedback_callback=self.feedback_callback
        )
        self._send_goal_future.add_done_callback(self.goal_response_callback)

    def goal_response_callback(self, future):
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().info('Goal rejected')
            return

        self.get_logger().info('Goal accepted!')
        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self.get_result_callback)

    def feedback_callback(self, feedback_msg):
        fb = feedback_msg.feedback
        self.get_logger().info(
            f'Position: ({fb.current_x:.2f}, {fb.current_y:.2f}) | '
            f'Remaining: {fb.distance_remaining:.2f}m'
        )

    def get_result_callback(self, future):
        result = future.result().result
        if result.success:
            self.get_logger().info(
                f'Arrived at ({result.final_x:.2f}, {result.final_y:.2f}) '
                f'in {result.total_time_seconds:.1f}s'
            )
        else:
            self.get_logger().warn(f'Failed: {result.message}')
        rclpy.shutdown()


def main(args=None):
    rclpy.init(args=args)
    client = MoveActionClient()
    client.send_goal(3.0, 4.0)
    rclpy.spin(client)


if __name__ == '__main__':
    main()

Comparison: When to Use What?

Here's a summary table to help you decide quickly:

Criterion Topic Service Action
Pattern Pub/Sub Request/Response Goal/Feedback/Result
Synchronous Asynchronous Synchronous (blocking) Asynchronous
Feedback Continuous (stream) Single response only Continuous feedback + 1 result
Cancel N/A No Yes
Use case Sensor data, state Query, trigger Navigation, long task
Example /scan, /cmd_vel /get_map, /set_mode /navigate_to_pose

Simple rules:

  • Data flowing continuously? => Topic
  • Quick question/answer (< 1 second)? => Service
  • Long-running task, need progress tracking? => Action

Topic, Service, and Action comparison diagram in ROS 2

Debug Communication with CLI

Some useful CLI commands for debugging:

# === Topics ===
ros2 topic list -t                     # List with message type
ros2 topic info /sensor_data --verbose # Detailed QoS info
ros2 topic pub /cmd_vel geometry_msgs/msg/Twist \
  "{linear: {x: 0.5}, angular: {z: 0.3}}"  # Publish manually

# === Services ===
ros2 service list -t
ros2 service type /get_battery_status
ros2 service call /get_battery_status \
  my_robot_interfaces/srv/GetBatteryStatus \
  "{robot_id: 'robot_01'}"

# === Actions ===
ros2 action list -t
ros2 action info /move_to_position
ros2 action send_goal /move_to_position \
  my_robot_interfaces/action/MoveToPosition \
  "{target_x: 2.0, target_y: 3.0, target_theta: 0.0}" \
  --feedback  # Display feedback

Summary

You now understand the 3 core communication primitives of ROS 2:

  • Topics for continuous data streams with QoS control
  • Services for quick request/response
  • Actions for long-running tasks with feedback and cancellation

Mastering these 3 mechanisms is the foundation for designing any robot system. In Part 3, we'll apply all of this to a real-world application: using Nav2 for your robot to auto-map with SLAM and move autonomously.


NT

Nguyễn Anh Tuấn

Robotics & AI Engineer. Building VnRobo — sharing knowledge about robot learning, VLA models, and automation.

Bài viết liên quan

NEWSo sánh
Lựa chọn thay thế InOrbit 2026: So sánh 5 nền tảng Fleet AMR
inorbitamrwarehouserobot-fleetcomparisonvda-5050

Lựa chọn thay thế InOrbit 2026: So sánh 5 nền tảng Fleet AMR

Tìm alternative InOrbit cho fleet AMR warehouse? So sánh thẳng thắn 5 nền tảng — giá, VDA 5050, hỗ trợ MiR/OTTO.

16/4/20267 phút đọc
NEWSo sánh
Lựa chọn thay thế Freedom Robotics 2026: So sánh 5 nền tảng Fleet Robot
freedom-roboticsrobot-fleetfleet-managementros2comparison

Lựa chọn thay thế Freedom Robotics 2026: So sánh 5 nền tảng Fleet Robot

Freedom Robotics tốt, nhưng có thể không hợp bạn. So sánh thẳng thắn 5 alternatives — giá, ROS 2, free tier, video live. Gồm VnRobo, Formant, InOrbit.

15/4/20268 phút đọc
NEWTutorial
Cách giám sát robot ROS 2 từ xa: Hướng dẫn đầy đủ 2026
ros2monitoringrobot-fleettutorialpythoniot

Cách giám sát robot ROS 2 từ xa: Hướng dẫn đầy đủ 2026

Hướng dẫn từng bước giám sát robot ROS 2 từ bất kỳ đâu — pin, CPU, trạng thái, heartbeat, alerts. Code chạy được, setup 10 phút. Miễn phí 3 robots.

14/4/20267 phút đọc