Giao tiếp trong ROS 2 — Trái tim của mọi hệ thống robot
Một hệ thống robot bao gồm hàng chục, thậm chí hàng trăm node. Sensor node đọc LiDAR, camera node xử lý ảnh, planner node tính đường đi, motor node điều khiển bánh xe. Tất cả cần giao tiếp với nhau. ROS 2 cung cấp 3 cơ chế giao tiếp chính: Topics, Services và Actions. Hiểu rõ khi nào dùng cái gì là kỹ năng quan trọng nhất của một kỹ sư robotics.
Bài này nối tiếp Phần 1: Cài đặt và Node đầu tiên, nơi bạn đã viết publisher/subscriber đơn giản. Giờ chúng ta sẽ đi sâu hơn.
Topics — Dòng chảy dữ liệu liên tục
Khi nào dùng Topic?
Topic phù hợp khi bạn cần stream dữ liệu liên tục từ một hoặc nhiều nguồn đến một hoặc nhiều nơi nhận. Đặc điểm:
- Publish/Subscribe (1-to-many, many-to-many)
- Bất đồng bộ — publisher không biết ai đang subscribe
- Không có response — gửi đi và quên
Ví dụ thực tế: sensor data (LiDAR scan, camera frames, IMU readings), robot state (vị trí, vận tốc), log messages.
Custom Message
Trong Phần 1, chúng ta dùng std_msgs/Float32. Trong thực tế, bạn sẽ cần tạo message riêng. Tạo package chứa custom messages:
cd ~/ros2_ws/src
ros2 pkg create --build-type ament_cmake my_robot_interfaces \
--dependencies std_msgs geometry_msgs
Tạo file message:
# ~/ros2_ws/src/my_robot_interfaces/msg/SensorData.msg
# Dữ liệu tổng hợp từ nhiều sensor
std_msgs/Header header
float32 temperature
float32 humidity
float32 battery_voltage
bool emergency_stop
string status_message
Cập nhật CMakeLists.txt:
# Thêm vào CMakeLists.txt
find_package(rosidl_default_generators REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
"msg/SensorData.msg"
DEPENDENCIES std_msgs geometry_msgs
)
Cập nhật package.xml:
<build_depend>rosidl_default_generators</build_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>
Build và kiểm tra:
cd ~/ros2_ws
colcon build --packages-select my_robot_interfaces
source install/setup.bash
# Kiểm tra message đã được tạo
ros2 interface show my_robot_interfaces/msg/SensorData
Publisher với Custom Message
# ~/ros2_ws/src/my_first_robot/my_first_robot/sensor_data_publisher.py
import rclpy
from rclpy.node import Node
from my_robot_interfaces.msg import SensorData
import random
from builtin_interfaces.msg import Time
class SensorDataPublisher(Node):
def __init__(self):
super().__init__('sensor_data_publisher')
self.publisher_ = self.create_publisher(SensorData, '/sensor_data', 10)
self.timer = self.create_timer(0.5, self.publish_sensor_data)
self.get_logger().info('Sensor data publisher started (2 Hz)')
def publish_sensor_data(self):
msg = SensorData()
msg.header.stamp = self.get_clock().now().to_msg()
msg.header.frame_id = 'base_link'
msg.temperature = 25.0 + random.uniform(-3.0, 3.0)
msg.humidity = 60.0 + random.uniform(-10.0, 10.0)
msg.battery_voltage = 12.6 - random.uniform(0.0, 0.5)
msg.emergency_stop = False
msg.status_message = 'nominal'
self.publisher_.publish(msg)
def main(args=None):
rclpy.init(args=args)
node = SensorDataPublisher()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
QoS — Quality of Service
QoS là một trong những tính năng mạnh nhất của ROS 2 so với ROS 1. Bạn có thể cấu hình cách dữ liệu được truyền cho từng topic:
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy, DurabilityPolicy
# QoS cho sensor data (chấp nhận mất vài message)
sensor_qos = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST,
depth=5,
)
# QoS cho lệnh điều khiển (không được mất message)
command_qos = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
history=HistoryPolicy.KEEP_LAST,
depth=10,
)
# QoS cho map data (subscriber nhận được dù join muộn)
map_qos = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
durability=DurabilityPolicy.TRANSIENT_LOCAL,
history=HistoryPolicy.KEEP_LAST,
depth=1,
)
# Sử dụng
self.publisher_ = self.create_publisher(LaserScan, '/scan', sensor_qos)
Chọn QoS thế nào?
| Tình huống | Reliability | Durability | Depth |
|---|---|---|---|
| Sensor data (LiDAR, camera) | BEST_EFFORT | VOLATILE | 5-10 |
| Lệnh điều khiển (cmd_vel) | RELIABLE | VOLATILE | 10 |
| Map, TF static | RELIABLE | TRANSIENT_LOCAL | 1 |
| Log, diagnostics | BEST_EFFORT | VOLATILE | 20 |
Services — Hỏi và trả lời
Khi nào dùng Service?
Service phù hợp khi bạn cần request một kết quả cụ thể và chờ response:
- Request/Response (1-to-1)
- Đồng bộ — client gửi request, đợi server trả lời
- Không phù hợp cho long-running tasks (sẽ block)
Ví dụ thực tế: lấy trạng thái pin, trigger chụp ảnh, reset sensor, thay đổi mode hoạt động.
Tạo Custom Service
# ~/ros2_ws/src/my_robot_interfaces/srv/GetBatteryStatus.srv
# Request (trên dấu ---)
string robot_id
---
# Response (dưới dấu ---)
float32 voltage
float32 percentage
float32 estimated_runtime_minutes
string health_status
bool is_charging
Cập nhật CMakeLists.txt:
rosidl_generate_interfaces(${PROJECT_NAME}
"msg/SensorData.msg"
"srv/GetBatteryStatus.srv"
DEPENDENCIES std_msgs geometry_msgs
)
Service Server
# ~/ros2_ws/src/my_first_robot/my_first_robot/battery_service_server.py
import rclpy
from rclpy.node import Node
from my_robot_interfaces.srv import GetBatteryStatus
class BatteryServiceServer(Node):
def __init__(self):
super().__init__('battery_service_server')
self.srv = self.create_service(
GetBatteryStatus,
'/get_battery_status',
self.handle_battery_request
)
# Giả lập trạng thái pin
self.voltage = 12.4
self.is_charging = False
self.get_logger().info('Battery service server ready')
def handle_battery_request(self, request, response):
self.get_logger().info(f'Battery status requested for: {request.robot_id}')
response.voltage = self.voltage
response.percentage = max(0.0, (self.voltage - 10.0) / 2.6 * 100.0)
response.estimated_runtime_minutes = response.percentage * 1.2
response.is_charging = self.is_charging
if response.percentage > 50.0:
response.health_status = 'good'
elif response.percentage > 20.0:
response.health_status = 'low'
else:
response.health_status = 'critical'
self.get_logger().info(
f'Response: {response.voltage}V ({response.percentage:.0f}%)'
)
return response
def main(args=None):
rclpy.init(args=args)
node = BatteryServiceServer()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
Service Client (async)
# ~/ros2_ws/src/my_first_robot/my_first_robot/battery_service_client.py
import rclpy
from rclpy.node import Node
from my_robot_interfaces.srv import GetBatteryStatus
class BatteryServiceClient(Node):
def __init__(self):
super().__init__('battery_service_client')
self.client = self.create_client(
GetBatteryStatus,
'/get_battery_status'
)
# Đợi server sẵn sàng
while not self.client.wait_for_service(timeout_sec=1.0):
self.get_logger().info('Waiting for battery service...')
def send_request(self, robot_id):
request = GetBatteryStatus.Request()
request.robot_id = robot_id
# Gọi async — không block event loop
future = self.client.call_async(request)
return future
def main(args=None):
rclpy.init(args=args)
client = BatteryServiceClient()
future = client.send_request('robot_01')
rclpy.spin_until_future_complete(client, future)
result = future.result()
client.get_logger().info(
f'Battery: {result.voltage}V | {result.percentage:.0f}% | '
f'Status: {result.health_status} | Charging: {result.is_charging}'
)
client.destroy_node()
rclpy.shutdown()
Test từ command line:
# Terminal 1: Chạy server
ros2 run my_first_robot battery_server
# Terminal 2: Gọi service từ CLI
ros2 service call /get_battery_status \
my_robot_interfaces/srv/GetBatteryStatus \
"{robot_id: 'robot_01'}"
Actions — Nhiệm vụ dài hạn với feedback
Khi nào dùng Action?
Action dùng cho long-running tasks cần feedback trung gian và có thể cancel giữa chừng:
- Goal/Feedback/Result pattern
- Bất đồng bộ — client gửi goal, nhận feedback liên tục, cuối cùng nhận result
- Cancelable — có thể huỷ task giữa chừng
Ví dụ thực tế: navigation đến điểm đích (feedback = vị trí hiện tại), quét bản đồ (feedback = % hoàn thành), di chuyển cánh tay robot (feedback = joint angles hiện tại).
Tạo Custom Action
# ~/ros2_ws/src/my_robot_interfaces/action/MoveToPosition.action
# Goal
float32 target_x
float32 target_y
float32 target_theta
---
# Result
bool success
float32 final_x
float32 final_y
float32 total_time_seconds
string message
---
# Feedback
float32 current_x
float32 current_y
float32 distance_remaining
float32 estimated_time_remaining
Cập nhật CMakeLists.txt:
find_package(action_msgs REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
"msg/SensorData.msg"
"srv/GetBatteryStatus.srv"
"action/MoveToPosition.action"
DEPENDENCIES std_msgs geometry_msgs action_msgs
)
Action Server
# ~/ros2_ws/src/my_first_robot/my_first_robot/move_action_server.py
import rclpy
from rclpy.node import Node
from rclpy.action import ActionServer, GoalResponse, CancelResponse
from my_robot_interfaces.action import MoveToPosition
import math
import time
class MoveActionServer(Node):
def __init__(self):
super().__init__('move_action_server')
self._action_server = ActionServer(
self,
MoveToPosition,
'/move_to_position',
execute_callback=self.execute_callback,
goal_callback=self.goal_callback,
cancel_callback=self.cancel_callback,
)
# Vị trí hiện tại của robot
self.current_x = 0.0
self.current_y = 0.0
self.speed = 0.5 # m/s
self.get_logger().info('Move action server ready')
def goal_callback(self, goal_request):
self.get_logger().info(
f'Received goal: ({goal_request.target_x}, {goal_request.target_y})'
)
return GoalResponse.ACCEPT
def cancel_callback(self, goal_handle):
self.get_logger().info('Received cancel request')
return CancelResponse.ACCEPT
async def execute_callback(self, goal_handle):
self.get_logger().info('Executing move...')
target_x = goal_handle.request.target_x
target_y = goal_handle.request.target_y
start_time = time.time()
feedback_msg = MoveToPosition.Feedback()
while True:
# Tính khoảng cách còn lại
dx = target_x - self.current_x
dy = target_y - self.current_y
distance = math.sqrt(dx * dx + dy * dy)
# Đã đến nơi?
if distance < 0.05:
break
# Kiểm tra cancel
if goal_handle.is_cancel_requested:
goal_handle.canceled()
result = MoveToPosition.Result()
result.success = False
result.message = 'Movement canceled'
return result
# Di chuyển một bước
step = min(self.speed * 0.1, distance)
self.current_x += (dx / distance) * step
self.current_y += (dy / distance) * step
# Gửi feedback
feedback_msg.current_x = self.current_x
feedback_msg.current_y = self.current_y
feedback_msg.distance_remaining = distance
feedback_msg.estimated_time_remaining = distance / self.speed
goal_handle.publish_feedback(feedback_msg)
time.sleep(0.1) # 10 Hz update
# Hoàn thành
goal_handle.succeed()
result = MoveToPosition.Result()
result.success = True
result.final_x = self.current_x
result.final_y = self.current_y
result.total_time_seconds = time.time() - start_time
result.message = 'Reached target position'
self.get_logger().info(
f'Move completed in {result.total_time_seconds:.1f}s'
)
return result
def main(args=None):
rclpy.init(args=args)
node = MoveActionServer()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
Action Client
# ~/ros2_ws/src/my_first_robot/my_first_robot/move_action_client.py
import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from my_robot_interfaces.action import MoveToPosition
class MoveActionClient(Node):
def __init__(self):
super().__init__('move_action_client')
self._action_client = ActionClient(
self, MoveToPosition, '/move_to_position'
)
def send_goal(self, x, y, theta=0.0):
goal_msg = MoveToPosition.Goal()
goal_msg.target_x = x
goal_msg.target_y = y
goal_msg.target_theta = theta
self._action_client.wait_for_server()
self.get_logger().info(f'Sending goal: ({x}, {y})')
self._send_goal_future = self._action_client.send_goal_async(
goal_msg,
feedback_callback=self.feedback_callback
)
self._send_goal_future.add_done_callback(self.goal_response_callback)
def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().info('Goal rejected')
return
self.get_logger().info('Goal accepted!')
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)
def feedback_callback(self, feedback_msg):
fb = feedback_msg.feedback
self.get_logger().info(
f'Position: ({fb.current_x:.2f}, {fb.current_y:.2f}) | '
f'Remaining: {fb.distance_remaining:.2f}m'
)
def get_result_callback(self, future):
result = future.result().result
if result.success:
self.get_logger().info(
f'Arrived at ({result.final_x:.2f}, {result.final_y:.2f}) '
f'in {result.total_time_seconds:.1f}s'
)
else:
self.get_logger().warn(f'Failed: {result.message}')
rclpy.shutdown()
def main(args=None):
rclpy.init(args=args)
client = MoveActionClient()
client.send_goal(3.0, 4.0)
rclpy.spin(client)
if __name__ == '__main__':
main()
So sánh: Khi nào dùng gì?
Đây là bảng tổng hợp giúp bạn quyết định nhanh:
| Tiêu chí | Topic | Service | Action |
|---|---|---|---|
| Pattern | Pub/Sub | Request/Response | Goal/Feedback/Result |
| Đồng bộ | Bất đồng bộ | Đồng bộ (blocking) | Bất đồng bộ |
| Feedback | Liên tục (stream) | Chỉ 1 response | Feedback liên tục + 1 result |
| Cancel | N/A | Không | Có |
| Use case | Sensor data, state | Query, trigger | Navigation, long task |
| Ví dụ | /scan, /cmd_vel |
/get_map, /set_mode |
/navigate_to_pose |
Quy tắc đơn giản:
- Dữ liệu chảy liên tục? => Topic
- Hỏi/đáp nhanh (< 1 giây)? => Service
- Task chạy lâu, cần theo dõi tiến độ? => Action
Debug giao tiếp với CLI
Một số lệnh CLI hữu ích cho debug:
# === Topics ===
ros2 topic list -t # List kèm message type
ros2 topic info /sensor_data --verbose # Chi tiết QoS
ros2 topic pub /cmd_vel geometry_msgs/msg/Twist \
"{linear: {x: 0.5}, angular: {z: 0.3}}" # Publish thủ công
# === Services ===
ros2 service list -t
ros2 service type /get_battery_status
ros2 service call /get_battery_status \
my_robot_interfaces/srv/GetBatteryStatus \
"{robot_id: 'robot_01'}"
# === Actions ===
ros2 action list -t
ros2 action info /move_to_position
ros2 action send_goal /move_to_position \
my_robot_interfaces/action/MoveToPosition \
"{target_x: 2.0, target_y: 3.0, target_theta: 0.0}" \
--feedback # Hiển thị feedback
Tổng kết
Bạn đã nắm được 3 primitives giao tiếp cốt lõi của ROS 2:
- Topics cho dữ liệu stream liên tục với QoS control
- Services cho request/response nhanh
- Actions cho long-running tasks với feedback và cancel
Hiểu rõ 3 cơ chế này là nền tảng để thiết kế bất kỳ hệ thống robot nào. Trong Phần 3, chúng ta sẽ áp dụng tất cả vào một ứng dụng thực tế: dùng Nav2 để robot tự lập bản đồ SLAM và di chuyển tự động.
Bài viết liên quan
- ROS 2 từ A đến Z (Phần 1): Cài đặt và Node đầu tiên — Cài đặt, workspace, publisher/subscriber
- Giới thiệu ROS 2: Nền tảng lập trình robot thế hệ mới — Tổng quan kiến trúc ROS 2
- MQTT cho giao tiếp robot IoT — So sánh MQTT và ROS 2 DDS