Lesson 5.2: Implementing the Autonomous Humanoid: Integration Challenges
Overview
This lesson addresses the practical challenges of integrating all components from previous modules into a unified autonomous humanoid system. You'll learn to handle real-world integration issues, resolve timing and synchronization problems, and create a cohesive system from disparate components.
Learning Objectives
By the end of this lesson, you should be able to:
- Identify and resolve common integration challenges in multi-component robotics systems
- Implement robust data synchronization between components with different update rates
- Handle timing and latency issues in real-time robotics applications
- Create fault-tolerant integration patterns
- Debug and monitor integrated system performance
- Optimize system-wide performance across all components
Integration Architecture Overview
System Integration Challenges Map
┌──────────────────────────────────────────────────────────────────┐
│                      Integration Challenges                      │
└──────────────────────────────────────────────────────────────────┘
┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│      Voice      │     │  LLM Cognitive  │     │   Navigation    │
│   Processing    │────▶│    Planning     │────▶│   & Path Plan   │
│  (Variable Hz)  │     │  (Variable Hz)  │     │   (10-20 Hz)    │
│   [~5-20 Hz]    │     │    [~1-5 Hz]    │     │                 │
└─────────────────┘     └─────────────────┘     └─────────────────┘
┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│    Isaac Sim    │     │    Isaac ROS    │     │  Manipulation   │
│  (Simulation)   │     │  (Perception)   │     │  (Variable Hz)  │
│   (60-120 Hz)   │     │   (15-30 Hz)    │     │   [~5-10 Hz]    │
└─────────────────┘     └─────────────────┘     └─────────────────┘
         └──────────────────────┬──────────────────────┘
                                ▼
                    Real-Time Coordination
                    & System Integration
Challenge 1: Timing and Synchronization
Problem: Different Update Rates
Different components operate at different frequencies:
- Simulation: 60-120 Hz
- Vision processing: 15-30 Hz
- Voice processing: 5-20 Hz (triggered by voice activity)
- Navigation: 10-20 Hz
- LLM calls: 1-5 Hz (expensive operations)
Solution: Asynchronous Data Handling with Buffering
# capstone_integration/timing_manager.py
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from sensor_msgs.msg import Image, Imu
from nav_msgs.msg import Odometry
from geometry_msgs.msg import Pose
import threading
from collections import deque
import numpy as np
class TimingManager(Node):
def __init__(self):
super().__init__('timing_manager')
# Data buffers for different components
self.vision_buffer = deque(maxlen=10) # Store last 10 vision frames
self.voice_buffer = deque(maxlen=5) # Store last 5 voice commands
self.imu_buffer = deque(maxlen=20) # Store last 20 IMU readings
self.odom_buffer = deque(maxlen=20) # Store last 20 odometry readings
# Timestamp synchronization
self.last_vision_time = None
self.last_voice_time = None
self.last_nav_time = None
# Mutex for thread-safe access
self.data_mutex = threading.Lock()
# Subscriptions
self.vision_sub = self.create_subscription(
Image,
'/camera/color/image_rect_color',
self.vision_callback,
10
)
self.voice_sub = self.create_subscription(
String,
'/capstone/voice_command',
self.voice_callback,
10
)
self.odom_sub = self.create_subscription(
Odometry,
'/odom',
self.odom_callback,
10
)
self.imu_sub = self.create_subscription(
Imu,
'/imu/data',
self.imu_callback,
10
)
# Publishers
self.synced_data_pub = self.create_publisher(
String, # This would be a custom message in practice
'/capstone/synced_data',
10
)
# Timer for periodic synchronization checks
self.sync_timer = self.create_timer(0.1, self.check_synchronization_status)
self.get_logger().info('Timing manager initialized')
def vision_callback(self, msg):
"""Process incoming vision data with timestamp"""
with self.data_mutex:
# Add to buffer with timestamp
self.vision_buffer.append({
'data': msg,
'timestamp': msg.header.stamp,
'received_time': self.get_clock().now().to_msg()
})
self.last_vision_time = msg.header.stamp
def voice_callback(self, msg):
"""Process incoming voice commands with timestamp"""
with self.data_mutex:
self.voice_buffer.append({
'data': msg,
'timestamp': self.get_clock().now().to_msg(),
'received_time': self.get_clock().now().to_msg()
})
self.last_voice_time = self.get_clock().now().to_msg()
def odom_callback(self, msg):
"""Process incoming odometry data"""
with self.data_mutex:
self.odom_buffer.append({
'data': msg,
'timestamp': msg.header.stamp,
'received_time': self.get_clock().now().to_msg()
})
def imu_callback(self, msg):
"""Process incoming IMU data"""
with self.data_mutex:
self.imu_buffer.append({
'data': msg,
'timestamp': msg.header.stamp,
'received_time': self.get_clock().now().to_msg()
})
def check_synchronization_status(self):
"""Periodically check synchronization status"""
with self.data_mutex:
# Calculate time differences between components
current_time = self.get_clock().now()
            if self.last_vision_time:
                vision_age = (current_time.nanoseconds -
                              (self.last_vision_time.sec * 1e9 + self.last_vision_time.nanosec)) / 1e9
                if vision_age > 0.5:  # More than 500ms old
                    self.get_logger().warn(f'Vision data is {vision_age:.2f}s old')
            if self.last_voice_time:
                voice_age = (current_time.nanoseconds -
                             (self.last_voice_time.sec * 1e9 + self.last_voice_time.nanosec)) / 1e9
                if voice_age > 5.0:  # More than 5s old
                    self.get_logger().warn(f'Voice command is {voice_age:.2f}s old')
def get_synchronized_data(self, target_time, time_tolerance=0.1):
"""
Get data from all components synchronized to target_time
"""
with self.data_mutex:
# Find data closest to target_time
synchronized_data = {}
            # Find closest vision data
            if self.vision_buffer:
                closest_vision = min(
                    self.vision_buffer,
                    key=lambda x: abs(
                        (x['timestamp'].sec * 1e9 + x['timestamp'].nanosec) -
                        (target_time.sec * 1e9 + target_time.nanosec)
                    )
                )
                # Check if within tolerance
                time_diff = abs(
                    (closest_vision['timestamp'].sec * 1e9 + closest_vision['timestamp'].nanosec) -
                    (target_time.sec * 1e9 + target_time.nanosec)
                ) / 1e9
                if time_diff <= time_tolerance:
                    synchronized_data['vision'] = closest_vision['data']
                else:
                    self.get_logger().warn(f'Vision data too old for sync: {time_diff:.3f}s')
            # The same nearest-timestamp lookup applies to the other buffers; odometry, for example:
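            # Sketch (same pattern as vision above): pick the odometry sample
            # closest to target_time and accept it only within the tolerance
            if self.odom_buffer:
                closest_odom = min(
                    self.odom_buffer,
                    key=lambda x: abs(
                        (x['timestamp'].sec * 1e9 + x['timestamp'].nanosec) -
                        (target_time.sec * 1e9 + target_time.nanosec)
                    )
                )
                odom_diff = abs(
                    (closest_odom['timestamp'].sec * 1e9 + closest_odom['timestamp'].nanosec) -
                    (target_time.sec * 1e9 + target_time.nanosec)
                ) / 1e9
                if odom_diff <= time_tolerance:
                    synchronized_data['odometry'] = closest_odom['data']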
return synchronized_data
def interpolate_pose(self, pose1, pose2, alpha):
"""Interpolate between two poses based on time difference"""
# Linear interpolation for position
interpolated_pos = Pose()
interpolated_pos.position.x = pose1.position.x + alpha * (pose2.position.x - pose1.position.x)
interpolated_pos.position.y = pose1.position.y + alpha * (pose2.position.y - pose1.position.y)
interpolated_pos.position.z = pose1.position.z + alpha * (pose2.position.z - pose1.position.z)
        # Lerp + normalize for orientation (adequate only for small rotation steps);
        # a proper quaternion slerp sketch follows this listing
interpolated_pos.orientation.x = pose1.orientation.x + alpha * (pose2.orientation.x - pose1.orientation.x)
interpolated_pos.orientation.y = pose1.orientation.y + alpha * (pose2.orientation.y - pose1.orientation.y)
interpolated_pos.orientation.z = pose1.orientation.z + alpha * (pose2.orientation.z - pose1.orientation.z)
interpolated_pos.orientation.w = pose1.orientation.w + alpha * (pose2.orientation.w - pose1.orientation.w)
# Normalize quaternion
norm = np.sqrt(
interpolated_pos.orientation.x**2 +
interpolated_pos.orientation.y**2 +
interpolated_pos.orientation.z**2 +
interpolated_pos.orientation.w**2
)
if norm > 0:
interpolated_pos.orientation.x /= norm
interpolated_pos.orientation.y /= norm
interpolated_pos.orientation.z /= norm
interpolated_pos.orientation.w /= norm
return interpolated_pos
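The normalized lerp above drifts for large angular steps. Where orientation accuracy matters, spherical linear interpolation (slerp) is a drop-in replacement. Below is a minimal standalone sketch; the module path and helper name are illustrative, not part of the course code.
# capstone_integration/quaternion_utils.py (illustrative helper)
import numpy as np
from geometry_msgs.msg import Quaternion

def slerp_quaternion(q1: Quaternion, q2: Quaternion, alpha: float) -> Quaternion:
    """Spherical linear interpolation between two unit quaternions."""
    a = np.array([q1.x, q1.y, q1.z, q1.w], dtype=float)
    b = np.array([q2.x, q2.y, q2.z, q2.w], dtype=float)
    dot = float(np.dot(a, b))
    if dot < 0.0:  # Take the shorter arc
        b, dot = -b, -dot
    if dot > 0.9995:  # Nearly parallel: normalized lerp avoids division by ~0
        out = a + alpha * (b - a)
    else:
        theta = np.arccos(np.clip(dot, -1.0, 1.0))
        out = (np.sin((1.0 - alpha) * theta) * a + np.sin(alpha * theta) * b) / np.sin(theta)
    out /= np.linalg.norm(out)
    q = Quaternion()
    q.x, q.y, q.z, q.w = out.tolist()
    return q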
Advanced Synchronization: Time-Shifted Data Fusion
# capstone_integration/temporal_fusion.py
import time
import threading
import numpy as np
class TemporalDataFusion:
def __init__(self, max_history_seconds=5.0):
self.max_history = max_history_seconds
self.data_history = {} # Stores data by source
self.timestamps = {} # Stores timestamps by source
self.lock = threading.Lock()
def add_data(self, source_name, data, timestamp):
"""Add data from a source with its timestamp"""
with self.lock:
if source_name not in self.data_history:
self.data_history[source_name] = []
self.timestamps[source_name] = []
# Add data and timestamp
self.data_history[source_name].append(data)
self.timestamps[source_name].append(timestamp)
# Trim history to max_history_seconds
self.trim_history(source_name)
def trim_history(self, source_name):
"""Remove old data to maintain history size"""
current_time = time.time()
cutoff_time = current_time - self.max_history
# Remove old entries
valid_indices = [
i for i, ts in enumerate(self.timestamps[source_name])
if ts >= cutoff_time
]
self.data_history[source_name] = [
self.data_history[source_name][i] for i in valid_indices
]
self.timestamps[source_name] = [
self.timestamps[source_name][i] for i in valid_indices
]
def get_fused_data_at_time(self, target_time, sources_needed=None):
"""
Get fused data from all sources at a specific time point
Interpolates data to match target_time
"""
with self.lock:
fused_data = {}
sources = sources_needed if sources_needed else self.data_history.keys()
for source in sources:
if source not in self.data_history or len(self.data_history[source]) < 2:
# Not enough data for interpolation
if source in self.data_history and len(self.data_history[source]) > 0:
# Use latest available data
fused_data[source] = {
'data': self.data_history[source][-1],
'timestamp': self.timestamps[source][-1],
'interpolated': False
}
continue
# Perform temporal interpolation
timestamps = np.array(self.timestamps[source])
data_points = self.data_history[source]
if target_time < timestamps[0] or target_time > timestamps[-1]:
# Extrapolation needed - use nearest value
if target_time < timestamps[0]:
idx = 0
else:
idx = -1
fused_data[source] = {
'data': data_points[idx],
'timestamp': timestamps[idx],
'interpolated': False
}
else:
# Interpolation
interpolated_data = self.interpolate_data_at_time(
timestamps, data_points, target_time
)
fused_data[source] = {
'data': interpolated_data,
'timestamp': target_time,
'interpolated': True
}
return fused_data
def interpolate_data_at_time(self, timestamps, data_points, target_time):
"""Interpolate data to match target timestamp"""
# This is a simplified example - actual implementation would depend on data type
# For poses: interpolate position and slerp orientation
# For images: use nearest neighbor (can't interpolate images)
# For scalars: linear interpolation
# Find indices for interpolation
idx_after = np.searchsorted(timestamps, target_time, side='left')
idx_before = idx_after - 1
if idx_before < 0 or idx_after >= len(timestamps):
# Use nearest value
return data_points[min(max(idx_after, 0), len(data_points)-1)]
# Calculate interpolation factor
time_before = timestamps[idx_before]
time_after = timestamps[idx_after]
alpha = (target_time - time_before) / (time_after - time_before)
# Interpolate based on data type
data_before = data_points[idx_before]
data_after = data_points[idx_after]
# This is a simplified interpolation - real implementation would be type-specific
if isinstance(data_before, (int, float)):
# Scalar interpolation
return data_before + alpha * (data_after - data_before)
elif hasattr(data_before, '__dict__'):
# Object with attributes - recursively interpolate numeric attributes
return self.interpolate_object_attributes(data_before, data_after, alpha)
else:
# Default: use data_before if interpolation is not feasible
return data_before
    def interpolate_object_attributes(self, obj1, obj2, alpha):
        """Interpolate numeric instance attributes of an object"""
        import copy
        interpolated_obj = copy.deepcopy(obj1)
        # vars() yields only instance attributes, so methods and class-level
        # descriptors are never touched (dir() would include them)
        for attr_name, attr1 in vars(obj1).items():
            attr2 = getattr(obj2, attr_name)
            if isinstance(attr1, (int, float)):
                # Numeric attribute interpolation
                setattr(interpolated_obj, attr_name, attr1 + alpha * (attr2 - attr1))
            elif hasattr(attr1, '__dict__'):
                # Recursive interpolation for nested objects
                setattr(interpolated_obj, attr_name,
                        self.interpolate_object_attributes(attr1, attr2, alpha))
        return interpolated_obj
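A quick sanity check of the fusion buffer (illustrative values; wall-clock float timestamps, matching what trim_history assumes):
# Illustrative usage of TemporalDataFusion
import time

fusion = TemporalDataFusion(max_history_seconds=5.0)
now = time.time()
fusion.add_data('battery_voltage', 24.2, now - 0.2)
fusion.add_data('battery_voltage', 24.0, now)

# Query the state 0.1s in the past; scalar sources are linearly interpolated
fused = fusion.get_fused_data_at_time(now - 0.1, sources_needed=['battery_voltage'])
print(fused['battery_voltage']['data'])          # ~24.1
print(fused['battery_voltage']['interpolated'])  # True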
Challenge 2: Data Format and Type Conversion
Problem: Inconsistent Data Representations
Different components use different data formats:
- Isaac Sim: USD prims, Gf matrices
- ROS 2: Standard message types (Point, Pose, etc.)
- Isaac ROS: Specialized message types
- LLM: Text-based inputs/outputs
Solution: NITROS (NVIDIA Isaac Transport for ROS) Bridge
# capstone_integration/nitros_bridge.py
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from geometry_msgs.msg import Pose, Point, Quaternion
from std_msgs.msg import String
import numpy as np
import torch
from pxr import Gf
class NitrosBridge(Node):
def __init__(self):
super().__init__('nitros_bridge')
# Initialize converters
self.ros_to_isaac_converter = ROSIsaacConverter()
self.isaac_to_ros_converter = IsaacROSConverter()
self.text_to_action_converter = TextActionConverter()
# Subscriptions for different data types
self.image_sub = self.create_subscription(
Image,
'/camera/color/image_rect_color',
self.image_conversion_handler,
10
)
self.pose_sub = self.create_subscription(
Pose,
'/capstone/target_pose',
self.pose_conversion_handler,
10
)
self.text_sub = self.create_subscription(
String,
'/capstone/llm_command',
self.text_conversion_handler,
10
)
# Publishers for converted data
self.isaac_ready_pub = self.create_publisher(
String, # Actually custom Isaac-ready message
'/capstone/isaac_ready_data',
10
)
self.get_logger().info('NITROS Bridge initialized')
def image_conversion_handler(self, ros_image):
"""Convert ROS Image to Isaac-compatible format"""
# Convert ROS Image to numpy array
image_np = self.ros_image_to_numpy(ros_image)
# Convert to Isaac format (e.g., for perception pipeline)
isaac_image = self.ros_to_isaac_converter.convert_image(ros_image)
# Publish converted image
converted_msg = String()
converted_msg.data = f"Converted image: {isaac_image.shape}"
self.isaac_ready_pub.publish(converted_msg)
def pose_conversion_handler(self, ros_pose):
"""Convert ROS Pose to Isaac Transform"""
# Convert ROS Pose to USD Transform
transform_matrix = self.ros_to_isaac_converter.convert_pose_to_transform(ros_pose)
# Apply transform to Isaac Sim prim
self.apply_transform_to_isaac_sim(transform_matrix)
def text_conversion_handler(self, text_msg):
"""Convert natural language to robot actions"""
# Parse natural language command
action_sequence = self.text_to_action_converter.parse_command(text_msg.data)
# Convert to Isaac Sim actions
isaac_actions = self.text_to_action_converter.convert_to_isaac_actions(action_sequence)
# Publish Isaac-compatible action commands
action_msg = String()
action_msg.data = str(isaac_actions)
self.isaac_ready_pub.publish(action_msg)
    def ros_image_to_numpy(self, ros_image):
        """Convert ROS Image message to numpy array"""
        from cv_bridge import CvBridge
        bridge = CvBridge()
        return bridge.imgmsg_to_cv2(ros_image, desired_encoding='passthrough')
def apply_transform_to_isaac_sim(self, transform_matrix):
"""Apply transform to Isaac Sim prim"""
# This would interface with Isaac Sim's USD stage
# For now, just log the transform
self.get_logger().info(f'Applying transform: {transform_matrix}')
class ROSIsaacConverter:
"""Convert ROS data types to Isaac-compatible formats"""
@staticmethod
def convert_image(ros_image):
"""Convert ROS Image to Isaac perception format"""
# Convert to numpy array first
if ros_image.encoding in ['rgb8', 'bgr8']:
height, width = ros_image.height, ros_image.width
image_array = np.frombuffer(ros_image.data, dtype=np.uint8)
image_array = image_array.reshape((height, width, 3))
# Convert to tensor format expected by Isaac ROS
image_tensor = torch.from_numpy(image_array).permute(2, 0, 1).unsqueeze(0).float() / 255.0
return image_tensor
else:
# Handle other encodings
raise ValueError(f"Unsupported image encoding: {ros_image.encoding}")
@staticmethod
def convert_pose_to_transform(ros_pose):
"""Convert ROS Pose to USD Transform matrix"""
# Extract position and orientation from ROS Pose
pos = Gf.Vec3d(
ros_pose.position.x,
ros_pose.position.y,
ros_pose.position.z
)
        rot = Gf.Quatd(
            ros_pose.orientation.w,
            ros_pose.orientation.x,
            ros_pose.orientation.y,
            ros_pose.orientation.z
        )
        # Build the transform: set the rotation part, then the translation part
        transform = Gf.Matrix4d(1.0)  # Identity
        transform.SetRotateOnly(rot)
        transform.SetTranslateOnly(pos)
return transform
@staticmethod
def convert_twist_to_velocity(ros_twist):
"""Convert ROS Twist to Isaac velocity format"""
linear = Gf.Vec3d(
ros_twist.linear.x,
ros_twist.linear.y,
ros_twist.linear.z
)
angular = Gf.Vec3d(
ros_twist.angular.x,
ros_twist.angular.y,
ros_twist.angular.z
)
return linear, angular
class IsaacROSConverter:
"""Convert Isaac data types to ROS-compatible formats"""
@staticmethod
def convert_transform_to_pose(usd_transform):
"""Convert USD Transform to ROS Pose"""
# Extract translation (position)
translation = usd_transform.ExtractTranslation()
pos = Point()
pos.x = translation[0]
pos.y = translation[1]
pos.z = translation[2]
        # Extract rotation as a quaternion via the 3x3 rotation submatrix
        rotation_matrix = usd_transform.ExtractRotationMatrix()
        quat = IsaacROSConverter.matrix_to_quaternion(rotation_matrix)
orientation = Quaternion()
orientation.x = quat[0]
orientation.y = quat[1]
orientation.z = quat[2]
orientation.w = quat[3]
pose = Pose()
pose.position = pos
pose.orientation = orientation
return pose
@staticmethod
def matrix_to_quaternion(matrix):
"""Convert 3x3 rotation matrix to quaternion"""
# Implementation of rotation matrix to quaternion conversion
# This is a standard algorithm
trace = matrix[0][0] + matrix[1][1] + matrix[2][2]
if trace > 0:
s = np.sqrt(trace + 1.0) * 2 # s = 4 * qw
qw = 0.25 * s
qx = (matrix[2][1] - matrix[1][2]) / s
qy = (matrix[0][2] - matrix[2][0]) / s
qz = (matrix[1][0] - matrix[0][1]) / s
else:
if matrix[0][0] > matrix[1][1] and matrix[0][0] > matrix[2][2]:
s = np.sqrt(1.0 + matrix[0][0] - matrix[1][1] - matrix[2][2]) * 2
qw = (matrix[2][1] - matrix[1][2]) / s
qx = 0.25 * s
qy = (matrix[0][1] + matrix[1][0]) / s
qz = (matrix[0][2] + matrix[2][0]) / s
elif matrix[1][1] > matrix[2][2]:
s = np.sqrt(1.0 + matrix[1][1] - matrix[0][0] - matrix[2][2]) * 2
qw = (matrix[0][2] - matrix[2][0]) / s
qx = (matrix[0][1] + matrix[1][0]) / s
qy = 0.25 * s
qz = (matrix[1][2] + matrix[2][1]) / s
else:
s = np.sqrt(1.0 + matrix[2][2] - matrix[0][0] - matrix[1][1]) * 2
qw = (matrix[1][0] - matrix[0][1]) / s
qx = (matrix[0][2] + matrix[2][0]) / s
qy = (matrix[1][2] + matrix[2][1]) / s
qz = 0.25 * s
# Normalize
norm = np.sqrt(qx*qx + qy*qy + qz*qz + qw*qw)
return [qx/norm, qy/norm, qz/norm, qw/norm]
class TextActionConverter:
"""Convert natural language to robot actions"""
def __init__(self):
# Define action vocabulary and mappings
self.action_mappings = {
'move': ['go', 'navigate', 'travel', 'walk', 'drive'],
'grasp': ['pick up', 'take', 'grasp', 'catch', 'grab'],
'place': ['put', 'place', 'set', 'drop', 'release'],
'look': ['look at', 'observe', 'see', 'find', 'locate'],
'follow': ['follow', 'track', 'chase', 'accompany']
}
self.object_categories = [
'cube', 'sphere', 'cylinder', 'bottle', 'cup',
'box', 'object', 'person', 'wall', 'floor'
]
self.location_keywords = [
'kitchen', 'living room', 'bedroom', 'office',
'here', 'there', 'near', 'far', 'on', 'under', 'next to'
]
def parse_command(self, natural_language):
"""Parse natural language command into structured actions"""
command_lower = natural_language.lower()
# Extract action
action = self.extract_action(command_lower)
# Extract object
target_object = self.extract_object(command_lower)
# Extract location
target_location = self.extract_location(command_lower)
# Create structured command
structured_command = {
'raw_command': natural_language,
'parsed_action': action,
'target_object': target_object,
'target_location': target_location,
'confidence': self.estimate_parsing_confidence(command_lower)
}
return structured_command
def extract_action(self, command):
"""Extract action from command"""
for canonical_action, variations in self.action_mappings.items():
for variation in variations:
if variation in command:
return canonical_action
# If no match, return 'unknown'
return 'unknown'
    def extract_object(self, command):
        """Extract target object from command"""
        colors = ['red', 'blue', 'green', 'yellow', 'white', 'black', 'orange', 'purple']
        words = command.split()
        for i, word in enumerate(words):
            if word in self.object_categories:
                # Keep a color adjective that immediately precedes the object,
                # e.g. "the red cube" -> 'red_cube'
                if i > 0 and words[i - 1] in colors:
                    return f'{words[i - 1]}_{word}'
                return word
        return 'unknown_object'
def extract_location(self, command):
"""Extract target location from command"""
for location in self.location_keywords:
if location in command:
return location
return 'current_location'
def estimate_parsing_confidence(self, command):
"""Estimate confidence in parsing result"""
confidence = 0.5 # Base confidence
# Increase confidence if we found clear action/object/location
if any(action in command for variations in self.action_mappings.values() for action in variations):
confidence += 0.2
if any(obj in command for obj in self.object_categories):
confidence += 0.2
if any(loc in command for loc in self.location_keywords):
confidence += 0.1
return min(confidence, 1.0)
def convert_to_isaac_actions(self, structured_command):
"""Convert structured command to Isaac Sim actions"""
action_sequence = []
action = structured_command['parsed_action']
target_obj = structured_command['target_object']
target_loc = structured_command['target_location']
if action == 'move':
action_sequence.append({
'type': 'navigation',
'destination': self.resolve_location(target_loc),
'priority': 1
})
elif action == 'grasp':
action_sequence.append({
'type': 'navigation',
'destination': self.approach_object_location(target_obj),
'priority': 1
})
action_sequence.append({
'type': 'manipulation',
'action': 'grasp',
'target_object': target_obj,
'priority': 2
})
elif action == 'place':
action_sequence.append({
'type': 'navigation',
'destination': self.resolve_location(target_loc),
'priority': 1
})
action_sequence.append({
'type': 'manipulation',
'action': 'release',
'target_object': target_obj,
'priority': 2
})
elif action == 'look':
action_sequence.append({
'type': 'pan_tilt',
'action': 'look_at',
'target_object': target_obj,
'priority': 1
})
elif action == 'follow':
action_sequence.append({
'type': 'navigation',
'action': 'follow',
'target_object': target_obj,
'priority': 1
})
else:
# Unknown action - just navigate to a safe spot
action_sequence.append({
'type': 'navigation',
'destination': [0, 0, 0], # Return to home position
'priority': 1
})
return action_sequence
def resolve_location(self, location_keyword):
"""Resolve location keyword to coordinates"""
# This would map semantic locations to actual coordinates
# In practice, this would use a semantic map
location_map = {
'kitchen': [3, 2, 0],
'living room': [0, 3, 0],
'bedroom': [-2, 1, 0],
'office': [2, -1, 0],
'current_location': [0, 0, 0], # Relative to current position
'here': [0, 0, 0],
'there': [1, 1, 0] # Default "there" location
}
return location_map.get(location_keyword, [0, 0, 0])
def approach_object_location(self, target_object):
"""Calculate approach location for target object"""
# This would use object detection to find the object's location
# For now, return a default approach location
return [1, 0, 0] # 1 meter in front of current position
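A quick smoke test of the converter (illustrative; the expected outputs follow directly from the mappings above):
# Illustrative usage of TextActionConverter
converter = TextActionConverter()
parsed = converter.parse_command('pick up the red cube')
print(parsed['parsed_action'])   # 'grasp'
print(parsed['target_object'])   # 'red_cube'
print(parsed['confidence'])      # 0.9, per estimate_parsing_confidence
actions = converter.convert_to_isaac_actions(parsed)
# -> a navigation step toward the cube, then a 'grasp' manipulation step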
Challenge 3: Error Handling and Fault Tolerance
Problem: Component Failures and Degraded Performance
In a complex system, individual components may fail or perform poorly, requiring graceful degradation.
Solution: Resilient Integration Patterns
# capstone_integration/fault_tolerance.py
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
import threading
import time
from enum import Enum
from dataclasses import dataclass
class ComponentStatus(Enum):
HEALTHY = "healthy"
DEGRADED = "degraded"
FAILED = "failed"
UNKNOWN = "unknown"
@dataclass
class ComponentHealth:
name: str
status: ComponentStatus
last_heartbeat: float
error_count: int
performance_score: float # 0.0-1.0, where 1.0 is optimal
class HealthMonitor(Node):
def __init__(self):
super().__init__('health_monitor')
# Component health tracking
self.component_health = {}
self.health_mutex = threading.Lock()
# Component heartbeat timeouts (seconds)
self.heartbeat_timeouts = {
'vision_pipeline': 2.0,
'voice_processor': 5.0,
'navigation_stack': 1.0,
'llm_interface': 10.0,
'manipulation_planner': 2.0
}
# Performance thresholds
self.performance_thresholds = {
'vision_pipeline': 0.7, # 70% success rate
'voice_processor': 0.8, # 80% recognition accuracy
'navigation_stack': 0.85, # 85% success rate
'llm_interface': 0.9, # 90% availability
'manipulation_planner': 0.75 # 75% success rate
}
# Initialize component health
for comp_name in self.heartbeat_timeouts.keys():
self.component_health[comp_name] = ComponentHealth(
name=comp_name,
status=ComponentStatus.UNKNOWN,
last_heartbeat=time.time(),
error_count=0,
performance_score=0.0
)
# Subscriptions for component heartbeats
self.heartbeat_sub = self.create_subscription(
String,
'/capstone/component_heartbeat',
self.heartbeat_callback,
10
)
# Timer for health checks
self.health_check_timer = self.create_timer(1.0, self.perform_health_checks)
# Publisher for system status
self.status_pub = self.create_publisher(String, '/capstone/system_status', 10)
self.get_logger().info('Health monitor initialized')
def heartbeat_callback(self, msg):
"""Process component heartbeat"""
try:
# Expected format: "COMPONENT_NAME:STATUS:PERFORMANCE_SCORE"
parts = msg.data.split(':')
if len(parts) >= 3:
comp_name = parts[0]
status_str = parts[1]
perf_score = float(parts[2])
with self.health_mutex:
if comp_name in self.component_health:
self.component_health[comp_name].last_heartbeat = time.time()
self.component_health[comp_name].status = ComponentStatus(status_str)
self.component_health[comp_name].performance_score = perf_score
# Reset error count on successful heartbeat
self.component_health[comp_name].error_count = 0
self.get_logger().debug(f'Heartbeat received for {comp_name}: {status_str}, perf: {perf_score:.2f}')
except Exception as e:
self.get_logger().error(f'Error processing heartbeat: {str(e)}')
def perform_health_checks(self):
"""Check health of all components"""
current_time = time.time()
system_status = "healthy"
with self.health_mutex:
for comp_name, health in self.component_health.items():
# Check heartbeat timeout
time_since_heartbeat = current_time - health.last_heartbeat
if time_since_heartbeat > self.heartbeat_timeouts[comp_name]:
health.status = ComponentStatus.FAILED
health.error_count += 1
system_status = "degraded" # System is degraded if any component fails
self.get_logger().warn(f'Component {comp_name} heartbeat timeout: {time_since_heartbeat:.1f}s')
elif health.performance_score < self.performance_thresholds[comp_name]:
health.status = ComponentStatus.DEGRADED
system_status = "degraded"
self.get_logger().warn(f'Component {comp_name} performance degraded: {health.performance_score:.2f}')
else:
health.status = ComponentStatus.HEALTHY
# Publish overall system status
status_msg = String()
status_msg.data = system_status
self.status_pub.publish(status_msg)
def get_component_status(self, component_name):
"""Get status of a specific component"""
with self.health_mutex:
if component_name in self.component_health:
return self.component_health[component_name].status
return ComponentStatus.UNKNOWN
def is_component_available(self, component_name):
"""Check if component is available for use"""
status = self.get_component_status(component_name)
return status in [ComponentStatus.HEALTHY, ComponentStatus.DEGRADED]
class FallbackManager:
"""Manage fallback strategies for component failures"""
def __init__(self, health_monitor: HealthMonitor):
self.health_monitor = health_monitor
self.fallback_strategies = self.initialize_fallback_strategies()
def initialize_fallback_strategies(self):
"""Define fallback strategies for each component"""
return {
'vision_pipeline': [
# Primary: Full Isaac ROS detection
lambda: self.use_full_vision_pipeline(),
# Fallback 1: Simple color-based detection
lambda: self.use_color_based_detection(),
# Fallback 2: Use pre-recorded object locations
lambda: self.use_cached_object_locations(),
# Fallback 3: Abort and request human assistance
lambda: self.request_human_assistance()
],
'voice_processor': [
# Primary: Online LLM processing
lambda: self.use_online_voice_processing(),
# Fallback 1: Offline keyword spotting
lambda: self.use_offline_keyword_spotting(),
# Fallback 2: Use pre-programmed commands
lambda: self.use_preprogrammed_commands(),
# Fallback 3: Accept commands via GUI
lambda: self.use_gui_commands()
],
'navigation_stack': [
# Primary: Full Nav2 with global/local planners
lambda: self.use_full_navigation(),
# Fallback 1: Simple obstacle avoidance
lambda: self.use_simple_avoidance(),
# Fallback 2: Return to safe location
lambda: self.return_to_safe_location(),
# Fallback 3: Stop and wait for assistance
lambda: self.stop_and_wait()
]
}
def execute_with_fallback(self, primary_component, task_func):
"""Execute task with fallback strategies"""
if self.health_monitor.is_component_available(primary_component):
try:
result = task_func()
if result is not None:
return result
except Exception as e:
self.health_monitor.get_logger().warn(f'Primary {primary_component} failed: {str(e)}')
# Execute fallback strategies
if primary_component in self.fallback_strategies:
for i, fallback_strategy in enumerate(self.fallback_strategies[primary_component]):
try:
self.health_monitor.get_logger().info(f'Attempting fallback strategy {i+1} for {primary_component}')
result = fallback_strategy()
if result is not None:
self.health_monitor.get_logger().info(f'Fallback strategy {i+1} succeeded for {primary_component}')
return result
except Exception as e:
self.health_monitor.get_logger().warn(f'Fallback strategy {i+1} failed for {primary_component}: {str(e)}')
# All strategies failed
self.health_monitor.get_logger().error(f'All strategies failed for {primary_component}')
return None
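    # Illustrative usage (names hypothetical): wrap a detection call so the
    # fallback chain runs automatically when the vision pipeline degrades:
    #   fallback_mgr = FallbackManager(health_monitor)
    #   detections = fallback_mgr.execute_with_fallback(
    #       'vision_pipeline', lambda: detect_objects(latest_frame))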
def use_full_vision_pipeline(self):
"""Try full Isaac ROS vision pipeline"""
# This would interface with Isaac ROS detection
pass
def use_color_based_detection(self):
"""Use simple color-based detection as fallback"""
# Implement simple color-based object detection
pass
def use_cached_object_locations(self):
"""Use pre-recorded object locations"""
# Return cached object locations from previous runs
pass
def request_human_assistance(self):
"""Request human assistance"""
# Publish request for human intervention
pass
def use_online_voice_processing(self):
"""Try online voice processing"""
pass
def use_offline_keyword_spotting(self):
"""Use offline keyword detection"""
# Simple keyword-based command recognition
pass
def use_preprogrammed_commands(self):
"""Use pre-defined command mappings"""
# Map simple keywords to predefined actions
pass
def use_gui_commands(self):
"""Accept commands via GUI"""
# Listen for commands from a GUI interface
pass
def use_full_navigation(self):
"""Try full Nav2 navigation"""
pass
def use_simple_avoidance(self):
"""Use simple obstacle avoidance"""
# Implement basic reactive navigation
pass
def return_to_safe_location(self):
"""Return to a pre-defined safe location"""
# Navigate to home/base position
pass
def stop_and_wait(self):
"""Stop robot and wait for assistance"""
# Emergency stop and await human intervention
pass
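For the monitor to see anything, every component must publish heartbeats in the NAME:STATUS:SCORE string format that heartbeat_callback parses. A minimal publisher sketch (node name and timer period are illustrative):
# capstone_integration/heartbeat_publisher.py (illustrative sketch)
import rclpy
from rclpy.node import Node
from std_msgs.msg import String

class HeartbeatPublisher(Node):
    """Publishes periodic heartbeats in the NAME:STATUS:SCORE format."""
    def __init__(self, component_name='vision_pipeline', period=1.0):
        super().__init__(f'{component_name}_heartbeat')
        self.component_name = component_name
        self.performance_score = 1.0  # Updated by the owning component as it runs
        self.pub = self.create_publisher(String, '/capstone/component_heartbeat', 10)
        self.timer = self.create_timer(period, self.publish_heartbeat)

    def publish_heartbeat(self):
        msg = String()
        msg.data = f'{self.component_name}:healthy:{self.performance_score:.2f}'
        self.pub.publish(msg)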
Challenge 4: Performance Optimization Across Components
Problem: Bottleneck Identification and Mitigation
In a multi-component system, performance bottlenecks can emerge from unexpected places.
Solution: Performance Profiling and Optimization
# capstone_integration/performance_optimizer.py
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float32, String
import time
import threading
from collections import deque, defaultdict
import psutil
import GPUtil
from dataclasses import dataclass
@dataclass
class PerformanceMetric:
timestamp: float
value: float
unit: str
component: str
class PerformanceProfiler(Node):
def __init__(self):
super().__init__('performance_profiler')
# Performance tracking
self.metrics = defaultdict(lambda: deque(maxlen=100)) # Store last 100 samples
self.component_times = defaultdict(list)
self.lock = threading.Lock()
# Publishers for performance metrics
self.cpu_usage_pub = self.create_publisher(Float32, '/capstone/performance/cpu_usage', 10)
self.gpu_usage_pub = self.create_publisher(Float32, '/capstone/performance/gpu_usage', 10)
self.memory_usage_pub = self.create_publisher(Float32, '/capstone/performance/memory_usage', 10)
self.component_latency_pub = self.create_publisher(String, '/capstone/performance/component_latency', 10)
# Timer for performance reporting
self.perf_timer = self.create_timer(0.5, self.report_performance_metrics)
# Initialize monitoring
self.last_cpu_time = time.time()
self.last_gpu_query = time.time()
self.get_logger().info('Performance profiler initialized')
def start_component_timer(self, component_name):
"""Start timing for a component"""
start_time = time.time()
with self.lock:
self.component_times[component_name].append({'start': start_time, 'end': None})
return start_time
def end_component_timer(self, component_name, start_time):
"""End timing for a component and record latency"""
end_time = time.time()
latency = end_time - start_time
with self.lock:
# Update the timing record
if self.component_times[component_name] and self.component_times[component_name][-1]['start'] == start_time:
self.component_times[component_name][-1]['end'] = end_time
# Store latency metric
self.metrics[f'{component_name}_latency'].append(PerformanceMetric(
timestamp=end_time,
value=latency,
unit='seconds',
component=component_name
))
# Publish latency if it exceeds threshold (e.g., 100ms for real-time components)
if latency > 0.1:
self.get_logger().warn(f'High latency detected in {component_name}: {latency:.3f}s')
def report_performance_metrics(self):
"""Report current performance metrics"""
current_time = time.time()
# CPU Usage
cpu_percent = psutil.cpu_percent(interval=None)
cpu_msg = Float32()
cpu_msg.data = float(cpu_percent)
self.cpu_usage_pub.publish(cpu_msg)
# Memory Usage
memory_percent = psutil.virtual_memory().percent
memory_msg = Float32()
memory_msg.data = float(memory_percent)
self.memory_usage_pub.publish(memory_msg)
# GPU Usage (if available)
try:
gpus = GPUtil.getGPUs()
if gpus:
gpu_load = gpus[0].load * 100 # Use first GPU
gpu_msg = Float32()
gpu_msg.data = float(gpu_load)
self.gpu_usage_pub.publish(gpu_msg)
        except Exception:
            # GPU monitoring not available
            pass
# Component latencies
with self.lock:
for component, metrics in self.component_times.items():
recent_metrics = [m for m in metrics if m['end'] is not None and current_time - m['end'] < 5.0] # Last 5 seconds
if recent_metrics:
avg_latency = sum(m['end'] - m['start'] for m in recent_metrics) / len(recent_metrics)
latency_report = f"{component}: avg={avg_latency:.3f}s, count={len(recent_metrics)}"
latency_msg = String()
latency_msg.data = latency_report
self.component_latency_pub.publish(latency_msg)
# Log if latency is concerning
if avg_latency > 0.5: # More than 500ms average
self.get_logger().warn(f'High average latency in {component}: {avg_latency:.3f}s')
def get_performance_summary(self):
"""Get a summary of current performance"""
with self.lock:
summary = {}
for component, metrics in self.component_times.items():
recent_metrics = [m for m in metrics if m['end'] is not None]
if recent_metrics:
latencies = [(m['end'] - m['start']) for m in recent_metrics[-10:]] # Last 10
if latencies:
summary[component] = {
'avg_latency': sum(latencies) / len(latencies),
'max_latency': max(latencies),
'min_latency': min(latencies),
'sample_count': len(latencies)
}
return summary
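# Illustrative helper (not part of the course repo): a context manager that
# wraps the profiler's start/end timer API around a block of work
from contextlib import contextmanager

@contextmanager
def timed_component(profiler, component_name):
    """Record the wall-clock latency of a with-block in the profiler."""
    start = profiler.start_component_timer(component_name)
    try:
        yield
    finally:
        profiler.end_component_timer(component_name, start)

# Usage:
#   with timed_component(profiler, 'vision_processing'):
#       run_detection(frame)  # hypothetical workload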
class AdaptivePipelineOptimizer:
"""Dynamically optimize pipeline based on performance metrics"""
    def __init__(self, performance_profiler: PerformanceProfiler):
        self.profiler = performance_profiler
        # Borrow the profiler node's logger; this class is not a Node itself
        self.get_logger = performance_profiler.get_logger
        self.optimization_thresholds = {
            'vision_processing': 0.1,      # 100ms threshold
            'llm_processing': 2.0,         # 2s threshold
            'navigation_update': 0.1,      # 100ms threshold
            'manipulation_planning': 0.5   # 500ms threshold
        }
        self.pipeline_config = self.initialize_pipeline_config()
def initialize_pipeline_config(self):
"""Initialize pipeline configuration with default settings"""
return {
'vision_processing': {
'enabled': True,
'resolution_scale': 1.0,
'processing_rate': 15.0, # Hz
'feature_count': 1000
},
'llm_processing': {
'enabled': True,
'model_size': 'medium', # small, medium, large
'request_frequency': 2.0 # Hz
},
'navigation_update': {
'enabled': True,
'planning_frequency': 10.0, # Hz
'accuracy_level': 'standard' # low, standard, high
}
}
def optimize_pipeline(self):
"""Optimize pipeline configuration based on performance"""
perf_summary = self.profiler.get_performance_summary()
for component, stats in perf_summary.items():
if component in self.optimization_thresholds:
threshold = self.optimization_thresholds[component]
if stats['avg_latency'] > threshold:
self.get_logger().info(f'Optimizing {component} due to high latency: {stats["avg_latency"]:.3f}s > {threshold}s')
# Apply optimizations based on component
if component == 'vision_processing':
self.optimize_vision_processing(stats)
elif component == 'llm_processing':
self.optimize_llm_processing(stats)
elif component == 'navigation_update':
self.optimize_navigation_update(stats)
def optimize_vision_processing(self, stats):
"""Optimize vision processing pipeline"""
current_config = self.pipeline_config['vision_processing']
# Reduce processing rate if latency is high
if stats['avg_latency'] > self.optimization_thresholds['vision_processing'] * 2:
# More than 2x threshold - significant degradation
new_rate = max(5.0, current_config['processing_rate'] * 0.5) # Halve rate, min 5Hz
current_config['processing_rate'] = new_rate
self.get_logger().warn(f'Reduced vision processing rate to {new_rate}Hz due to high latency')
# Reduce resolution if still struggling
elif stats['avg_latency'] > self.optimization_thresholds['vision_processing']:
new_scale = max(0.5, current_config['resolution_scale'] * 0.8) # Reduce by 20%, min 50%
current_config['resolution_scale'] = new_scale
self.get_logger().info(f'Reduced vision resolution scale to {new_scale} due to latency')
# Reduce feature count if still struggling
if stats['avg_latency'] > self.optimization_thresholds['vision_processing'] * 1.5:
new_features = max(200, int(current_config['feature_count'] * 0.7)) # Reduce by 30%, min 200
current_config['feature_count'] = new_features
self.get_logger().info(f'Reduced feature count to {new_features} due to latency')
def optimize_llm_processing(self, stats):
"""Optimize LLM processing pipeline"""
current_config = self.pipeline_config['llm_processing']
# Switch to smaller model if latency is high
if stats['avg_latency'] > self.optimization_thresholds['llm_processing'] * 2:
# More than 2x threshold - switch to smaller model
if current_config['model_size'] == 'large':
current_config['model_size'] = 'medium'
self.get_logger().warn('Switched to medium LLM model due to high latency')
elif current_config['model_size'] == 'medium':
current_config['model_size'] = 'small'
self.get_logger().warn('Switched to small LLM model due to high latency')
# Reduce request frequency
if stats['avg_latency'] > self.optimization_thresholds['llm_processing']:
new_freq = max(0.5, current_config['request_frequency'] * 0.5) # Halve frequency, min 0.5Hz
current_config['request_frequency'] = new_freq
self.get_logger().info(f'Reduced LLM request frequency to {new_freq}Hz due to latency')
def optimize_navigation_update(self, stats):
"""Optimize navigation pipeline"""
current_config = self.pipeline_config['navigation_update']
# Reduce planning frequency if latency is high
if stats['avg_latency'] > self.optimization_thresholds['navigation_update'] * 2:
# More than 2x threshold - reduce frequency significantly
new_freq = max(2.0, current_config['planning_frequency'] * 0.3) # Reduce to 30%, min 2Hz
current_config['planning_frequency'] = new_freq
self.get_logger().warn(f'Reduced navigation planning frequency to {new_freq}Hz due to high latency')
# Reduce accuracy level
elif stats['avg_latency'] > self.optimization_thresholds['navigation_update']:
if current_config['accuracy_level'] == 'high':
current_config['accuracy_level'] = 'standard'
self.get_logger().info('Reduced navigation accuracy to standard level due to latency')
elif current_config['accuracy_level'] == 'standard':
current_config['accuracy_level'] = 'low'
self.get_logger().warn('Reduced navigation accuracy to low level due to latency')
def apply_pipeline_config(self):
"""Apply current pipeline configuration to running nodes"""
# This would send configuration updates to the relevant nodes
# For example, using ROS 2 parameters or services
self.get_logger().info('Applying updated pipeline configuration')
# Example: Update vision processing node parameters
# vision_node.set_parameters([
# Parameter('processing_rate', value=self.pipeline_config['vision_processing']['processing_rate'])
# ])
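One concrete way to push such updates (a sketch; the target node and parameter names are assumptions) is to call the other node's set_parameters service via rcl_interfaces:
# Illustrative sketch: update a parameter on another node via its
# set_parameters service (target node and parameter names assumed)
from rcl_interfaces.srv import SetParameters
from rcl_interfaces.msg import Parameter, ParameterValue, ParameterType

def push_processing_rate(node, target_node_name, rate_hz):
    client = node.create_client(SetParameters, f'/{target_node_name}/set_parameters')
    if not client.wait_for_service(timeout_sec=1.0):
        node.get_logger().warn(f'{target_node_name} parameter service unavailable')
        return
    param = Parameter(
        name='processing_rate',
        value=ParameterValue(type=ParameterType.PARAMETER_DOUBLE,
                             double_value=float(rate_hz))
    )
    client.call_async(SetParameters.Request(parameters=[param]))  # check the future in production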
Integration Testing and Validation
Comprehensive System Validation
# capstone_integration/integration_tester.py
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
import time
class IntegrationTester(Node):
def __init__(self):
super().__init__('integration_tester')
# Test results tracking
self.test_results = {}
self.test_start_times = {}
self.get_logger().info('Integration tester initialized')
def run_comprehensive_tests(self):
"""Run all integration tests"""
self.get_logger().info('Starting comprehensive integration tests')
# Test 1: Basic component communication
self.test_component_communication()
# Test 2: Data flow integrity
self.test_data_flow_integrity()
# Test 3: Timing and synchronization
self.test_timing_synchronization()
# Test 4: Error handling
self.test_error_handling()
# Test 5: End-to-end functionality
self.test_end_to_end_functionality()
self.print_test_summary()
def test_component_communication(self):
"""Test that all components can communicate"""
test_name = 'component_communication'
self.start_test(test_name)
# Check if all expected topics have publishers
expected_topics = [
'/capstone/voice_command',
'/capstone/vision_detections',
'/capstone/navigation_goal',
'/capstone/manipulation_command'
]
success = True
for topic in expected_topics:
publishers = self.get_publishers_info_by_topic(topic)
if not publishers:
self.get_logger().error(f'No publishers found for topic: {topic}')
success = False
self.end_test(test_name, success)
def test_data_flow_integrity(self):
"""Test that data flows correctly between components"""
test_name = 'data_flow_integrity'
self.start_test(test_name)
# Subscribe to intermediate topics and verify data format
# This would involve more complex validation in practice
success = True # Placeholder - implement actual validation
self.end_test(test_name, success)
def test_timing_synchronization(self):
"""Test that timing and synchronization work correctly"""
test_name = 'timing_synchronization'
self.start_test(test_name)
# Verify that timestamps are properly synchronized
# Check that data age is within acceptable limits
success = True # Placeholder - implement actual validation
self.end_test(test_name, success)
def test_error_handling(self):
"""Test that error handling works correctly"""
test_name = 'error_handling'
self.start_test(test_name)
# Simulate component failures and verify fallbacks
# This would require more sophisticated simulation
success = True # Placeholder - implement actual validation
self.end_test(test_name, success)
def test_end_to_end_functionality(self):
"""Test complete end-to-end functionality"""
test_name = 'end_to_end_functionality'
self.start_test(test_name)
# Send a complete command sequence and verify execution
# This would be the most comprehensive test
success = True # Placeholder - implement actual validation
self.end_test(test_name, success)
def start_test(self, test_name):
"""Start tracking a test"""
self.test_start_times[test_name] = time.time()
self.get_logger().info(f'Starting test: {test_name}')
def end_test(self, test_name, success):
"""End tracking a test"""
duration = time.time() - self.test_start_times[test_name]
self.test_results[test_name] = {
'success': success,
'duration': duration,
'timestamp': time.time()
}
status = 'PASSED' if success else 'FAILED'
self.get_logger().info(f'Test {test_name}: {status} in {duration:.2f}s')
def print_test_summary(self):
"""Print summary of all tests"""
self.get_logger().info('\n=== INTEGRATION TEST SUMMARY ===')
total_tests = len(self.test_results)
passed_tests = sum(1 for result in self.test_results.values() if result['success'])
failed_tests = total_tests - passed_tests
self.get_logger().info(f'Total tests: {total_tests}')
self.get_logger().info(f'Passed: {passed_tests}')
self.get_logger().info(f'Failed: {failed_tests}')
        if total_tests > 0:
            self.get_logger().info(f'Success rate: {passed_tests / total_tests * 100:.1f}%')
for test_name, result in self.test_results.items():
            status = 'PASS' if result['success'] else 'FAIL'
            self.get_logger().info(f'  {status}: {test_name} ({result["duration"]:.2f}s)')
self.get_logger().info('=== END TEST SUMMARY ===')
def main(args=None):
rclpy.init(args=args)
tester = IntegrationTester()
# Run tests
tester.run_comprehensive_tests()
# Clean up
tester.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Hands-On Exercise: Integration Challenge
- Set up the timing manager to handle different update rates
- Implement the NITROS bridge for data format conversion
- Create a basic health monitor for the system
- Add performance profiling to identify bottlenecks
- Test the integrated system with a simple voice command
- Introduce artificial delays to test fallback mechanisms
- Document any issues encountered during integration
Example commands to test integration:
# Launch the integrated system
ros2 launch isaac_ros_capstone full_integration.launch.py
# Send a test voice command
ros2 topic pub /capstone/voice_command std_msgs/String "data: 'pick up the red cube'"
# Monitor system health
ros2 topic echo /capstone/system_status
# Monitor performance metrics
ros2 topic echo /capstone/performance/component_latency
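# Simulate a component heartbeat to exercise the health monitor
# (format NAME:STATUS:SCORE, as parsed by heartbeat_callback)
ros2 topic pub /capstone/component_heartbeat std_msgs/String "data: 'vision_pipeline:healthy:0.95'"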
Summary
This lesson addressed the critical challenges of integrating multiple complex components into a unified autonomous humanoid system. You learned about timing synchronization, data format conversion, error handling, and performance optimization. The next lesson will cover testing, evaluation, and refinement of the capstone project.