Bridging Python Agents to ROS Controllers with rclpy
Overview
This lesson covers how to integrate Python-based AI agents with ROS 2 robot controllers using the rclpy client library. This lesson will cover writing Python code to interact with ROS 2 systems, enabling intelligent control.
Learning Objectives
By the end of this lesson, you should be able to:
- Understand how to use rclpy to interface Python agents with ROS 2
- Create ROS 2 nodes that act as bridges between AI agents and robot controllers
- Implement communication patterns between Python agents and ROS 2 systems
- Design architectures for AI-driven robot control
Introduction to rclpy
rclpy is the Python client library for ROS 2. It provides Python bindings for the ROS 2 client library (rcl) and the ROS 2 middleware layer (rmw). This allows Python programs to interact with ROS 2 systems.
Architecture of AI-Agent-to-ROS Bridge
The bridge between AI agents and ROS 2 controllers typically involves:
- An AI agent running as a Python process
- A ROS 2 bridge node that translates between the AI agent and ROS 2
- ROS 2 controllers that execute commands on the robot
Example: Simple AI Agent Communicating with ROS 2
Let's create a simple example where a Python AI agent makes decisions and sends commands to a robot:
AI Agent Node
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from geometry_msgs.msg import Twist
import random
class AIAgentNode(Node):
def __init__(self):
super().__init__('ai_agent_node')
# Publisher to send commands to robot
self.cmd_vel_publisher = self.create_publisher(Twist, '/cmd_vel', 10)
# Subscriber to receive sensor data
self.sensor_subscriber = self.create_subscription(
String, '/sensor_data', self.sensor_callback, 10)
# Timer for decision making
self.timer = self.create_timer(1.0, self.make_decision)
self.sensor_data = None
self.get_logger().info('AI Agent Node initialized')
def sensor_callback(self, msg):
self.sensor_data = msg.data
self.get_logger().info(f'Received sensor data: {self.sensor_data}')
def make_decision(self):
# Simple AI logic - in real applications, this could involve complex ML models
twist_msg = Twist()
if self.sensor_data and 'obstacle' in self.sensor_data:
# If obstacle detected, turn
twist_msg.linear.x = 0.0
twist_msg.angular.z = 0.5 # Turn right
else:
# Otherwise, move forward
twist_msg.linear.x = 0.5 # Move forward
twist_msg.angular.z = 0.0
self.cmd_vel_publisher.publish(twist_msg)
self.get_logger().info(f'Published command: linear.x={twist_msg.linear.x}, angular.z={twist_msg.angular.z}')
def main(args=None):
rclpy.init(args=args)
ai_agent_node = AIAgentNode()
try:
rclpy.spin(ai_agent_node)
except KeyboardInterrupt:
pass
finally:
ai_agent_node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Advanced: Using External AI Libraries
You can integrate more sophisticated AI libraries like TensorFlow, PyTorch, or scikit-learn:
Example with Decision Logic
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import LaserScan
from geometry_msgs.msg import Twist
import numpy as np
from collections import deque
class AdvancedAIAgentNode(Node):
def __init__(self):
super().__init__('advanced_ai_agent_node')
# Publishers and subscribers
self.cmd_vel_publisher = self.create_publisher(Twist, '/cmd_vel', 10)
self.scan_subscriber = self.create_subscription(
LaserScan, '/scan', self.scan_callback, 10)
# Timer for decision making
self.timer = self.create_timer(0.1, self.make_decision) # 10Hz
# State management
self.scan_data = None
self.command_history = deque(maxlen=10) # Keep last 10 commands
self.obstacle_threshold = 1.0 # meters
self.get_logger().info('Advanced AI Agent Node initialized')
def scan_callback(self, msg):
# Store the latest scan data
self.scan_data = np.array(msg.ranges)
# Filter out invalid readings (inf, nan)
self.scan_data = np.where((self.scan_data == float('inf')) |
np.isnan(self.scan_data),
msg.range_max,
self.scan_data)
def simple_navigation_logic(self):
"""Simple navigation algorithm"""
if self.scan_data is None:
return 0.0, 0.0 # Stop if no data
# Find the minimum distance in front (�30 degrees)
front_indices = slice(len(self.scan_data)//2 - 30, len(self.scan_data)//2 + 30)
front_distances = self.scan_data[front_indices]
min_distance = np.min(front_distances)
if min_distance < self.obstacle_threshold:
# Obstacle detected, turn
linear_vel = 0.0
angular_vel = 0.5
else:
# Clear path, move forward
linear_vel = 0.3
angular_vel = 0.0
return linear_vel, angular_vel
def make_decision(self):
if self.scan_data is None:
return
linear_vel, angular_vel = self.simple_navigation_logic()
# Create and publish twist message
twist_msg = Twist()
twist_msg.linear.x = linear_vel
twist_msg.angular.z = angular_vel
self.cmd_vel_publisher.publish(twist_msg)
# Store command in history
self.command_history.append((linear_vel, angular_vel))
self.get_logger().info(f'Command: linear.x={linear_vel:.2f}, angular.z={angular_vel:.2f}')
def main(args=None):
rclpy.init(args=args)
ai_agent_node = AdvancedAIAgentNode()
try:
rclpy.spin(ai_agent_node)
except KeyboardInterrupt:
pass
finally:
ai_agent_node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Integration Patterns
1. Direct Integration
The AI agent runs as a ROS 2 node directly, using rclpy for all communication.
2. Bridge Pattern
The AI agent runs separately and communicates with a bridge node via inter-process communication (IPC) like shared memory, TCP sockets, or message queues.
3. Service-Based Integration
The AI agent calls ROS 2 services to request specific actions from the robot.
Hands-On Exercise
-
Create a new ROS 2 package for this lesson:
cd ~/ros2_ws/src
ros2 pkg create --build-type ament_python ai_ros_bridge
cd ai_ros_bridge -
Create the AI agent node in
ai_ros_bridge/ai_ros_bridge/simple_ai_agent.pywith the first example code above. -
Update the
setup.pyfile to include your new node as an executable. -
Build and source your workspace:
cd ~/ros2_ws
colcon build --packages-select ai_ros_bridge
source install/setup.bash -
In a simulation environment (like Gazebo), run your AI agent:
ros2 run ai_ros_bridge simple_ai_agent
Troubleshooting Tips
- Ensure proper ROS 2 environment setup before running nodes
- Check topic names match between publisher and subscriber
- Verify that the robot simulation or hardware is properly connected
- Use
ros2 topic listandros2 node listto verify communication
Summary
This lesson demonstrated how to bridge Python-based AI agents with ROS 2 robot controllers using rclpy. This integration enables intelligent control of robots using AI algorithms. The next lesson will cover understanding URDF (Unified Robot Description Format) for humanoids.