← Back to Blog
navigationros2tutorialamr

ROS 2 A to Z (Part 2): Topics, Services, and Actions

Master the 3 core communication primitives in ROS 2 — when to use Topic, Service, or Action for your robot.

Nguyen Anh Tuan21 tháng 1, 20269 min read
ROS 2 A to Z (Part 2): Topics, Services, and Actions

Communication in ROS 2 — The Heart of Every Robot System

A robot system comprises dozens, even hundreds of nodes. Sensor nodes read LiDAR, camera nodes process images, planner nodes compute paths, motor nodes control wheels. They all need to communicate with each other. ROS 2 provides 3 primary communication mechanisms: Topics, Services, and Actions. Understanding when to use each is the most critical skill for a robotics engineer.

This post continues from Part 1: Setup and First Node, where you wrote simple publisher/subscriber code. Now we'll go deeper.

Network communication between ROS 2 nodes

Topics — Continuous Data Streams

When to Use Topics?

Topics are ideal when you need continuous data streaming from one or more sources to one or more receivers. Characteristics:

Real-world examples: sensor data (LiDAR scans, camera frames, IMU readings), robot state (position, velocity), log messages.

Custom Messages

In Part 1, we used std_msgs/Float32. In practice, you'll create custom messages. Create a package for custom messages:

cd ~/ros2_ws/src
ros2 pkg create --build-type ament_cmake my_robot_interfaces \
  --dependencies std_msgs geometry_msgs

Create a message file:

# ~/ros2_ws/src/my_robot_interfaces/msg/SensorData.msg
# Composite data from multiple sensors
std_msgs/Header header
float32 temperature
float32 humidity
float32 battery_voltage
bool emergency_stop
string status_message

Update CMakeLists.txt:

# Add to CMakeLists.txt
find_package(rosidl_default_generators REQUIRED)

rosidl_generate_interfaces(${PROJECT_NAME}
  "msg/SensorData.msg"
  DEPENDENCIES std_msgs geometry_msgs
)

Update package.xml:

<build_depend>rosidl_default_generators</build_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>

Build and verify:

cd ~/ros2_ws
colcon build --packages-select my_robot_interfaces
source install/setup.bash

# Verify message was created
ros2 interface show my_robot_interfaces/msg/SensorData

Publisher with Custom Message

# ~/ros2_ws/src/my_first_robot/my_first_robot/sensor_data_publisher.py
import rclpy
from rclpy.node import Node
from my_robot_interfaces.msg import SensorData
import random
from builtin_interfaces.msg import Time


class SensorDataPublisher(Node):
    def __init__(self):
        super().__init__('sensor_data_publisher')
        self.publisher_ = self.create_publisher(SensorData, '/sensor_data', 10)
        self.timer = self.create_timer(0.5, self.publish_sensor_data)
        self.get_logger().info('Sensor data publisher started (2 Hz)')

    def publish_sensor_data(self):
        msg = SensorData()
        msg.header.stamp = self.get_clock().now().to_msg()
        msg.header.frame_id = 'base_link'
        msg.temperature = 25.0 + random.uniform(-3.0, 3.0)
        msg.humidity = 60.0 + random.uniform(-10.0, 10.0)
        msg.battery_voltage = 12.6 - random.uniform(0.0, 0.5)
        msg.emergency_stop = False
        msg.status_message = 'nominal'

        self.publisher_.publish(msg)


def main(args=None):
    rclpy.init(args=args)
    node = SensorDataPublisher()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()

QoS — Quality of Service

QoS is one of ROS 2's strongest features compared to ROS 1. You can configure how data is transmitted for each topic:

from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy, DurabilityPolicy

# QoS for sensor data (acceptable to lose some messages)
sensor_qos = QoSProfile(
    reliability=ReliabilityPolicy.BEST_EFFORT,
    history=HistoryPolicy.KEEP_LAST,
    depth=5,
)

# QoS for control commands (must not lose messages)
command_qos = QoSProfile(
    reliability=ReliabilityPolicy.RELIABLE,
    history=HistoryPolicy.KEEP_LAST,
    depth=10,
)

# QoS for map data (subscriber receives even if joining late)
map_qos = QoSProfile(
    reliability=ReliabilityPolicy.RELIABLE,
    durability=DurabilityPolicy.TRANSIENT_LOCAL,
    history=HistoryPolicy.KEEP_LAST,
    depth=1,
)

# Usage
self.publisher_ = self.create_publisher(LaserScan, '/scan', sensor_qos)

How to Choose QoS?

Situation Reliability Durability Depth
Sensor data (LiDAR, camera) BEST_EFFORT VOLATILE 5-10
Control commands (cmd_vel) RELIABLE VOLATILE 10
Map, TF static RELIABLE TRANSIENT_LOCAL 1
Logs, diagnostics BEST_EFFORT VOLATILE 20

Services — Ask and Answer

When to Use Services?

Services are ideal when you need to request a specific result and wait for a response:

Real-world examples: get battery status, trigger photo capture, reset sensor, change operating mode.

Create Custom Service

# ~/ros2_ws/src/my_robot_interfaces/srv/GetBatteryStatus.srv
# Request (above ---)
string robot_id
---
# Response (below ---)
float32 voltage
float32 percentage
float32 estimated_runtime_minutes
string health_status
bool is_charging

Update CMakeLists.txt:

rosidl_generate_interfaces(${PROJECT_NAME}
  "msg/SensorData.msg"
  "srv/GetBatteryStatus.srv"
  DEPENDENCIES std_msgs geometry_msgs
)

Service Server

# ~/ros2_ws/src/my_first_robot/my_first_robot/battery_service_server.py
import rclpy
from rclpy.node import Node
from my_robot_interfaces.srv import GetBatteryStatus


class BatteryServiceServer(Node):
    def __init__(self):
        super().__init__('battery_service_server')
        self.srv = self.create_service(
            GetBatteryStatus,
            '/get_battery_status',
            self.handle_battery_request
        )
        # Simulate battery state
        self.voltage = 12.4
        self.is_charging = False
        self.get_logger().info('Battery service server ready')

    def handle_battery_request(self, request, response):
        self.get_logger().info(f'Battery status requested for: {request.robot_id}')

        response.voltage = self.voltage
        response.percentage = max(0.0, (self.voltage - 10.0) / 2.6 * 100.0)
        response.estimated_runtime_minutes = response.percentage * 1.2
        response.is_charging = self.is_charging

        if response.percentage > 50.0:
            response.health_status = 'good'
        elif response.percentage > 20.0:
            response.health_status = 'low'
        else:
            response.health_status = 'critical'

        self.get_logger().info(
            f'Response: {response.voltage}V ({response.percentage:.0f}%)'
        )
        return response


def main(args=None):
    rclpy.init(args=args)
    node = BatteryServiceServer()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()

Service Client (async)

# ~/ros2_ws/src/my_first_robot/my_first_robot/battery_service_client.py
import rclpy
from rclpy.node import Node
from my_robot_interfaces.srv import GetBatteryStatus


class BatteryServiceClient(Node):
    def __init__(self):
        super().__init__('battery_service_client')
        self.client = self.create_client(
            GetBatteryStatus,
            '/get_battery_status'
        )
        # Wait for server to be ready
        while not self.client.wait_for_service(timeout_sec=1.0):
            self.get_logger().info('Waiting for battery service...')

    def send_request(self, robot_id):
        request = GetBatteryStatus.Request()
        request.robot_id = robot_id

        # Call async — doesn't block event loop
        future = self.client.call_async(request)
        return future


def main(args=None):
    rclpy.init(args=args)
    client = BatteryServiceClient()

    future = client.send_request('robot_01')
    rclpy.spin_until_future_complete(client, future)

    result = future.result()
    client.get_logger().info(
        f'Battery: {result.voltage}V | {result.percentage:.0f}% | '
        f'Status: {result.health_status} | Charging: {result.is_charging}'
    )

    client.destroy_node()
    rclpy.shutdown()

Test from command line:

# Terminal 1: Run server
ros2 run my_first_robot battery_server

# Terminal 2: Call service from CLI
ros2 service call /get_battery_status \
  my_robot_interfaces/srv/GetBatteryStatus \
  "{robot_id: 'robot_01'}"

ROS 2 service request-response diagram

Actions — Long-Running Tasks with Feedback

When to Use Actions?

Actions are for long-running tasks that need intermediate feedback and can be canceled mid-way:

Real-world examples: navigate to waypoint (feedback = current position), map sweep (feedback = % complete), move robot arm (feedback = current joint angles).

Create Custom Action

# ~/ros2_ws/src/my_robot_interfaces/action/MoveToPosition.action
# Goal
float32 target_x
float32 target_y
float32 target_theta
---
# Result
bool success
float32 final_x
float32 final_y
float32 total_time_seconds
string message
---
# Feedback
float32 current_x
float32 current_y
float32 distance_remaining
float32 estimated_time_remaining

Update CMakeLists.txt:

find_package(action_msgs REQUIRED)

rosidl_generate_interfaces(${PROJECT_NAME}
  "msg/SensorData.msg"
  "srv/GetBatteryStatus.srv"
  "action/MoveToPosition.action"
  DEPENDENCIES std_msgs geometry_msgs action_msgs
)

Action Server

# ~/ros2_ws/src/my_first_robot/my_first_robot/move_action_server.py
import rclpy
from rclpy.node import Node
from rclpy.action import ActionServer, GoalResponse, CancelResponse
from my_robot_interfaces.action import MoveToPosition
import math
import time


class MoveActionServer(Node):
    def __init__(self):
        super().__init__('move_action_server')

        self._action_server = ActionServer(
            self,
            MoveToPosition,
            '/move_to_position',
            execute_callback=self.execute_callback,
            goal_callback=self.goal_callback,
            cancel_callback=self.cancel_callback,
        )

        # Current robot position
        self.current_x = 0.0
        self.current_y = 0.0
        self.speed = 0.5  # m/s

        self.get_logger().info('Move action server ready')

    def goal_callback(self, goal_request):
        self.get_logger().info(
            f'Received goal: ({goal_request.target_x}, {goal_request.target_y})'
        )
        return GoalResponse.ACCEPT

    def cancel_callback(self, goal_handle):
        self.get_logger().info('Received cancel request')
        return CancelResponse.ACCEPT

    async def execute_callback(self, goal_handle):
        self.get_logger().info('Executing move...')

        target_x = goal_handle.request.target_x
        target_y = goal_handle.request.target_y
        start_time = time.time()

        feedback_msg = MoveToPosition.Feedback()

        while True:
            # Calculate remaining distance
            dx = target_x - self.current_x
            dy = target_y - self.current_y
            distance = math.sqrt(dx * dx + dy * dy)

            # Reached destination?
            if distance < 0.05:
                break

            # Check for cancel
            if goal_handle.is_cancel_requested:
                goal_handle.canceled()
                result = MoveToPosition.Result()
                result.success = False
                result.message = 'Movement canceled'
                return result

            # Move one step
            step = min(self.speed * 0.1, distance)
            self.current_x += (dx / distance) * step
            self.current_y += (dy / distance) * step

            # Send feedback
            feedback_msg.current_x = self.current_x
            feedback_msg.current_y = self.current_y
            feedback_msg.distance_remaining = distance
            feedback_msg.estimated_time_remaining = distance / self.speed
            goal_handle.publish_feedback(feedback_msg)

            time.sleep(0.1)  # 10 Hz update

        # Complete
        goal_handle.succeed()
        result = MoveToPosition.Result()
        result.success = True
        result.final_x = self.current_x
        result.final_y = self.current_y
        result.total_time_seconds = time.time() - start_time
        result.message = 'Reached target position'

        self.get_logger().info(
            f'Move completed in {result.total_time_seconds:.1f}s'
        )
        return result


def main(args=None):
    rclpy.init(args=args)
    node = MoveActionServer()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()

Action Client

# ~/ros2_ws/src/my_first_robot/my_first_robot/move_action_client.py
import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from my_robot_interfaces.action import MoveToPosition


class MoveActionClient(Node):
    def __init__(self):
        super().__init__('move_action_client')
        self._action_client = ActionClient(
            self, MoveToPosition, '/move_to_position'
        )

    def send_goal(self, x, y, theta=0.0):
        goal_msg = MoveToPosition.Goal()
        goal_msg.target_x = x
        goal_msg.target_y = y
        goal_msg.target_theta = theta

        self._action_client.wait_for_server()
        self.get_logger().info(f'Sending goal: ({x}, {y})')

        self._send_goal_future = self._action_client.send_goal_async(
            goal_msg,
            feedback_callback=self.feedback_callback
        )
        self._send_goal_future.add_done_callback(self.goal_response_callback)

    def goal_response_callback(self, future):
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().info('Goal rejected')
            return

        self.get_logger().info('Goal accepted!')
        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self.get_result_callback)

    def feedback_callback(self, feedback_msg):
        fb = feedback_msg.feedback
        self.get_logger().info(
            f'Position: ({fb.current_x:.2f}, {fb.current_y:.2f}) | '
            f'Remaining: {fb.distance_remaining:.2f}m'
        )

    def get_result_callback(self, future):
        result = future.result().result
        if result.success:
            self.get_logger().info(
                f'Arrived at ({result.final_x:.2f}, {result.final_y:.2f}) '
                f'in {result.total_time_seconds:.1f}s'
            )
        else:
            self.get_logger().warn(f'Failed: {result.message}')
        rclpy.shutdown()


def main(args=None):
    rclpy.init(args=args)
    client = MoveActionClient()
    client.send_goal(3.0, 4.0)
    rclpy.spin(client)


if __name__ == '__main__':
    main()

Comparison: When to Use What?

Here's a summary table to help you decide quickly:

Criterion Topic Service Action
Pattern Pub/Sub Request/Response Goal/Feedback/Result
Synchronous Asynchronous Synchronous (blocking) Asynchronous
Feedback Continuous (stream) Single response only Continuous feedback + 1 result
Cancel N/A No Yes
Use case Sensor data, state Query, trigger Navigation, long task
Example /scan, /cmd_vel /get_map, /set_mode /navigate_to_pose

Simple rules:

Topic, Service, and Action comparison diagram in ROS 2

Debug Communication with CLI

Some useful CLI commands for debugging:

# === Topics ===
ros2 topic list -t                     # List with message type
ros2 topic info /sensor_data --verbose # Detailed QoS info
ros2 topic pub /cmd_vel geometry_msgs/msg/Twist \
  "{linear: {x: 0.5}, angular: {z: 0.3}}"  # Publish manually

# === Services ===
ros2 service list -t
ros2 service type /get_battery_status
ros2 service call /get_battery_status \
  my_robot_interfaces/srv/GetBatteryStatus \
  "{robot_id: 'robot_01'}"

# === Actions ===
ros2 action list -t
ros2 action info /move_to_position
ros2 action send_goal /move_to_position \
  my_robot_interfaces/action/MoveToPosition \
  "{target_x: 2.0, target_y: 3.0, target_theta: 0.0}" \
  --feedback  # Display feedback

Summary

You now understand the 3 core communication primitives of ROS 2:

Mastering these 3 mechanisms is the foundation for designing any robot system. In Part 3, we'll apply all of this to a real-world application: using Nav2 for your robot to auto-map with SLAM and move autonomously.


Related Articles

Related Posts

Deep DiveDigital Twins và ROS 2: Simulation trong sản xuất
simulationros2digital-twinPart 6

Digital Twins và ROS 2: Simulation trong sản xuất

Ứng dụng simulation trong công nghiệp — digital twins, ROS 2 + Gazebo/Isaac integration cho nhà máy thông minh.

3/4/202611 min read
TutorialSim-to-Real Pipeline: Từ training đến robot thật
simulationsim2realtutorialPart 5

Sim-to-Real Pipeline: Từ training đến robot thật

End-to-end guide: train policy trong sim, evaluate, domain randomization, deploy lên robot thật và iterate.

2/4/202615 min read
TutorialBắt đầu với MuJoCo: Cài đặt đến mô phỏng robot đầu tiên
simulationmujocotutorialPart 2

Bắt đầu với MuJoCo: Cài đặt đến mô phỏng robot đầu tiên

Hands-on tutorial MuJoCo — cài đặt, tạo MJCF model, simulate robot arm và visualize kết quả với Python.

30/3/202611 min read