Lesson 5.3: Testing, Evaluation, and Refinement of the Capstone Project
Overview
This lesson covers systematic testing, evaluation, and refinement of the integrated autonomous humanoid system. You will learn to design and run comprehensive tests, validate performance against defined metrics, and iteratively improve the system based on the evaluation results.
Learning Objectives
By the end of this lesson, you should be able to:
- Design comprehensive test suites for multi-modal robotics systems
- Implement automated testing for integrated VSLAM and navigation
- Evaluate system performance against defined metrics
- Identify and debug integration issues
- Refine system components based on testing results
- Conduct user acceptance testing for the capstone project
Testing Strategy for Integrated Systems
Multi-Level Testing Approach
Capstone System Testing Strategy:
- Unit Tests (Components): VSLAM nodes, Nav2 plugins, perception pipelines
- Integration Tests (APIs): ROS bridge, message flow, TF chain, service calls
- System Tests (End-to-End): full task execution, performance benchmarks
- Regression Testing: verify no regressions, compare results against baselines
- Performance & Stress Testing: maximum throughput, failure modes, robustness, edge cases
- Acceptance Testing: user workflows, task success, usability, safety
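One lightweight way to keep these levels separable in a single test tree is to register a pytest marker per level so that suites can be selected individually (for example, pytest -m integration). A minimal sketch, assuming the marker names below:
# tests/conftest.py (sketch - marker names are illustrative)
def pytest_configure(config):
    # Register one marker per test level so suites can be run selectively with `pytest -m <level>`
    for level in ('unit', 'integration', 'system', 'regression', 'performance', 'acceptance'):
        config.addinivalue_line('markers', f'{level}: {level}-level test for the capstone system')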
Unit Testing Individual Components
Testing Isaac ROS VSLAM Components
# tests/test_vslam_components.py
import unittest
import rclpy
from rclpy.executors import SingleThreadedExecutor
from sensor_msgs.msg import Image, CameraInfo
from geometry_msgs.msg import PoseStamped
from std_msgs.msg import String
import time
import numpy as np
import cv2
from cv_bridge import CvBridge
class TestVSLAMComponents(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        # Guard against double initialization when several test classes share this module
        if not rclpy.ok():
            rclpy.init()
@classmethod
def tearDownClass(cls):
rclpy.shutdown()
def setUp(self):
self.node = rclpy.create_node('test_vslam_components')
self.executor = SingleThreadedExecutor()
self.executor.add_node(self.node)
self.bridge = CvBridge()
def tearDown(self):
self.node.destroy_node()
def test_feature_extraction_node(self):
"""Test Isaac ROS feature extraction node"""
# Create mock image data
test_image = self.create_test_image(640, 480)
image_msg = self.bridge.cv2_to_imgmsg(test_image, encoding='bgr8')
# Set up publisher and subscription for testing
pub = self.node.create_publisher(Image, '/test_image', 10)
received_features = []
def feature_callback(msg):
received_features.append(msg)
sub = self.node.create_subscription(
String, # In practice, this would be a custom feature message
'/test_features',
feature_callback,
10
)
# Publish test image and wait for response
pub.publish(image_msg)
# Spin to process messages
start_time = time.time()
while len(received_features) == 0 and time.time() - start_time < 5.0:
self.executor.spin_once(timeout_sec=0.1)
# Assert that features were extracted
self.assertGreater(len(received_features), 0, "No features extracted from test image")
# Additional assertions on feature quality
# (In practice, would check feature count, distribution, etc.)
print(f"Extracted {len(received_features)} features from test image")
def test_pose_estimation_accuracy(self):
"""Test pose estimation accuracy with known transformations"""
# Create test data with known pose
known_translation = [1.0, 2.0, 0.5] # meters
known_rotation = [0.0, 0.0, 0.0, 1.0] # identity quaternion
# Simulate visual features with known correspondences
features_1 = self.generate_test_features(100, known_translation, known_rotation)
features_2 = self.generate_test_features(100, known_translation, known_rotation)
# Add small motion between frames
motion_translation = [0.1, 0.05, 0.0]
motion_rotation = [0.0, 0.0, 0.05, 0.9987] # Small rotation (~3 degrees)
features_2 = self.apply_motion_to_features(features_2, motion_translation, motion_rotation)
# Test pose estimation algorithm
estimated_motion = self.estimate_pose_from_features(features_1, features_2)
# Check accuracy
translation_error = np.linalg.norm(
np.array(estimated_motion[:3]) - np.array(motion_translation)
)
rotation_error = self.calculate_rotation_error(
estimated_motion[3:], motion_rotation
)
# Assertions with reasonable tolerances
self.assertLess(translation_error, 0.05, "Translation error too high") # 5cm tolerance
self.assertLess(rotation_error, 0.1, "Rotation error too high") # 0.1 rad tolerance (~5.7 degrees)
def create_test_image(self, width, height):
"""Create a test image with distinctive features"""
# Create an image with a grid pattern and random shapes
img = np.ones((height, width, 3), dtype=np.uint8) * 255 # White background
# Add a grid pattern
for i in range(0, height, 50):
cv2.line(img, (0, i), (width, i), (0, 0, 0), 1)
for i in range(0, width, 50):
cv2.line(img, (i, 0), (i, height), (0, 0, 0), 1)
# Add random circles and rectangles for features
for _ in range(20):
x = np.random.randint(50, width-50)
y = np.random.randint(50, height-50)
if np.random.rand() > 0.5:
# Draw circle
radius = np.random.randint(10, 20)
                color = tuple(int(c) for c in np.random.randint(0, 255, 3))  # cast to Python ints for OpenCV
cv2.circle(img, (x, y), radius, color, -1)
else:
# Draw rectangle
w = np.random.randint(15, 30)
h = np.random.randint(15, 30)
                color = tuple(int(c) for c in np.random.randint(0, 255, 3))  # cast to Python ints for OpenCV
cv2.rectangle(img, (x-w//2, y-h//2), (x+w//2, y+h//2), color, -1)
return img
def generate_test_features(self, count, translation, rotation):
"""Generate synthetic features for testing"""
features = []
for i in range(count):
# Create feature with some randomness but known relationship
x = np.random.uniform(-2.0, 2.0)
y = np.random.uniform(-2.0, 2.0)
z = np.random.uniform(0.5, 3.0)
features.append({
'id': i,
'position': [x, y, z],
'descriptor': np.random.rand(128).tolist(), # SIFT-like descriptor
'confidence': np.random.uniform(0.7, 1.0)
})
return features
def apply_motion_to_features(self, features, translation, rotation):
"""Apply known motion to features"""
transformed_features = []
for feat in features:
# Apply translation
new_pos = [
feat['position'][0] + translation[0],
feat['position'][1] + translation[1],
feat['position'][2] + translation[2]
]
            # Apply rotation about z (simplified - in practice would use the full transform)
            # For small rotations a planar rotation of x and y is a reasonable approximation;
            # rotate from the original coordinates so x is not overwritten before y is computed
            x_t, y_t = new_pos[0], new_pos[1]
            new_pos[0] = x_t * np.cos(rotation[2]) - y_t * np.sin(rotation[2])
            new_pos[1] = x_t * np.sin(rotation[2]) + y_t * np.cos(rotation[2])
transformed_features.append({
'id': feat['id'],
'position': new_pos,
'descriptor': feat['descriptor'],
'confidence': feat['confidence']
})
return transformed_features
def estimate_pose_from_features(self, features1, features2):
"""Estimate pose difference between feature sets (mock implementation)"""
# In a real test, this would call the actual pose estimation algorithm
# For this mock, return a plausible result based on the known motion
return [0.1, 0.05, 0.0, 0.0, 0.0, 0.05, 0.9987]
def calculate_rotation_error(self, quat1, quat2):
"""Calculate rotation error between two quaternions"""
# Convert to rotation matrices and compare
# Or use quaternion dot product for angle difference
dot_product = abs(sum(a*b for a, b in zip(quat1, quat2)))
# Clamp to avoid numerical errors
dot_product = max(-1.0, min(1.0, dot_product))
angle_error = 2 * np.arccos(dot_product)
return angle_error
class TestNavigationComponents(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        # rclpy may already be initialized by another test class in this module
        if not rclpy.ok():
            rclpy.init()
    def setUp(self):
        self.node = rclpy.create_node('test_navigation_components')
        self.executor = SingleThreadedExecutor()
        self.executor.add_node(self.node)
    def tearDown(self):
        self.node.destroy_node()
def test_global_planner(self):
"""Test global path planning with known map and start/goal"""
# Create a simple test map (mock)
test_map = self.create_test_map(10, 10) # 10x10 grid map
start = (1, 1)
goal = (8, 8)
# Test path planning
path = self.plan_path_global(test_map, start, goal)
# Verify path is valid
self.assertIsNotNone(path, "Path should not be None")
self.assertGreater(len(path), 1, "Path should have at least 2 points")
# Verify path starts at start and ends at goal
self.assertEqual(path[0], start, "Path should start at start position")
self.assertEqual(path[-1], goal, "Path should end at goal position")
# Verify path is collision-free
for x, y in path:
self.assertEqual(test_map[y, x], 0, f"Path contains obstacle at ({x}, {y})")
def test_local_planner(self):
"""Test local path planning with obstacle avoidance"""
# Test local planner with dynamic obstacles
current_pose = (5, 5)
global_path = [(5, 5), (5, 6), (5, 7), (5, 8), (5, 9)]
obstacles = [(5, 6), (5, 7)] # Obstacles on planned path
# Test local replanning
local_path = self.plan_local_path(current_pose, global_path, obstacles)
# Verify local path avoids obstacles
for x, y in local_path:
self.assertNotIn((x, y), obstacles, f"Local path should not go through obstacle at ({x}, {y})")
def create_test_map(self, width, height):
"""Create a test occupancy grid map"""
# 0 = free space, 100 = obstacle, -1 = unknown
test_map = np.zeros((height, width), dtype=np.int8)
# Add some obstacles
test_map[3, 3:7] = 100 # Horizontal wall
test_map[6:8, 6] = 100 # Vertical wall
return test_map
def plan_path_global(self, occupancy_map, start, goal):
"""Mock global path planning function"""
# This would normally call Nav2's global planner
# For testing, return a simple path
path = [start]
current = start
# Simple path to goal (Manhattan-style)
while current != goal:
dx = goal[0] - current[0]
dy = goal[1] - current[1]
if dx != 0:
next_x = current[0] + (1 if dx > 0 else -1)
current = (next_x, current[1])
elif dy != 0:
next_y = current[1] + (1 if dy > 0 else -1)
current = (current[0], next_y)
# Check if path is valid
if occupancy_map[current[1], current[0]] == 100: # Obstacle
return None # Path blocked
path.append(current)
# Prevent infinite loops
if len(path) > 100:
return None # Too long path
return path
def plan_local_path(self, current_pose, global_path, obstacles):
"""Mock local path planning function"""
# This would normally call Nav2's local planner
# For testing, return a path that avoids obstacles
local_path = [current_pose]
# Check if global path is blocked and replan locally
if any(obstacle in global_path for obstacle in obstacles):
# Simple local replanning around obstacles
# In practice, this would use DWA, MPC, or similar
for step in range(5): # Plan 5 steps ahead
next_pose = (
current_pose[0] + np.random.randint(-1, 2),
current_pose[1] + np.random.randint(-1, 2)
)
# Ensure next pose is valid and not an obstacle
if (0 <= next_pose[0] < 10 and 0 <= next_pose[1] < 10 and
next_pose not in obstacles):
local_path.append(next_pose)
current_pose = next_pose
return local_path
if __name__ == '__main__':
unittest.main()
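The pose-accuracy test above exercises a single known motion. With unittest sub-tests, the same check can sweep several motions so that one failing case does not mask the others. A minimal sketch to add inside TestVSLAMComponents; the motions listed are illustrative, and the mock estimate_pose_from_features would need to be replaced by the real estimator for the results to be meaningful:
    def test_pose_estimation_multiple_motions(self):
        """Sweep several known motions and check translation accuracy for each"""
        test_motions = [
            ([0.10, 0.00, 0.0], [0.0, 0.0, 0.00, 1.0]),     # pure forward translation
            ([0.00, 0.10, 0.0], [0.0, 0.0, 0.00, 1.0]),     # pure lateral translation
            ([0.10, 0.05, 0.0], [0.0, 0.0, 0.05, 0.9987]),  # translation plus a small yaw
        ]
        for translation, rotation in test_motions:
            with self.subTest(translation=translation, rotation=rotation):
                features_1 = self.generate_test_features(100, [0.0, 0.0, 0.0], [0.0, 0.0, 0.0, 1.0])
                features_2 = self.apply_motion_to_features(features_1, translation, rotation)
                estimated = self.estimate_pose_from_features(features_1, features_2)
                error = np.linalg.norm(np.array(estimated[:3]) - np.array(translation))
                self.assertLess(error, 0.05, f"Translation error too high for motion {translation}")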
Integration Testing
ROS 2 Integration Tests
# tests/test_integration.py
import unittest
import rclpy
from rclpy.executors import SingleThreadedExecutor
from rclpy.qos import QoSProfile
from std_msgs.msg import String
from geometry_msgs.msg import PoseStamped, Twist
from sensor_msgs.msg import Image, CameraInfo, Imu
import time
import threading
import sys
class TestIntegration(unittest.TestCase):
@classmethod
def setUpClass(cls):
rclpy.init()
@classmethod
def tearDownClass(cls):
rclpy.shutdown()
def setUp(self):
self.node = rclpy.create_node('integration_tester')
self.executor = SingleThreadedExecutor()
self.executor.add_node(self.node)
# Test data storage
self.received_data = {
'voice_commands': [],
'vision_detections': [],
'navigation_goals': [],
'robot_poses': [],
'velocities': []
}
# Set up subscriptions to monitor system behavior
self.voice_sub = self.node.create_subscription(
String, '/capstone/voice_command', self.voice_callback, 10
)
self.vision_sub = self.node.create_subscription(
String, '/capstone/vision_detections', self.vision_callback, 10
)
self.nav_goal_sub = self.node.create_subscription(
PoseStamped, '/capstone/navigation_goal', self.nav_goal_callback, 10
)
self.pose_sub = self.node.create_subscription(
PoseStamped, '/capstone/robot_pose', self.pose_callback, 10
)
self.vel_sub = self.node.create_subscription(
Twist, '/cmd_vel', self.velocity_callback, 10
)
def voice_callback(self, msg):
self.received_data['voice_commands'].append(msg)
def vision_callback(self, msg):
self.received_data['vision_detections'].append(msg)
def nav_goal_callback(self, msg):
self.received_data['navigation_goals'].append(msg)
def pose_callback(self, msg):
self.received_data['robot_poses'].append(msg)
def velocity_callback(self, msg):
self.received_data['velocities'].append(msg)
def test_voice_to_navigation_flow(self):
"""Test complete flow from voice command to navigation execution"""
# Clear previous data
self.received_data['navigation_goals'].clear()
self.received_data['velocities'].clear()
# Send a voice command
voice_pub = self.node.create_publisher(String, '/capstone/voice_command', 10)
cmd_msg = String()
cmd_msg.data = 'navigate to the kitchen'
voice_pub.publish(cmd_msg)
        # Wait for the system to respond while processing incoming messages
        end_time = time.time() + 3.0
        while time.time() < end_time:
            self.executor.spin_once(timeout_sec=0.1)
# Verify navigation goal was published
self.assertGreater(
len(self.received_data['navigation_goals']), 0,
"Navigation goal should be published after voice command"
)
# Verify velocity commands were sent
self.assertGreater(
len(self.received_data['velocities']), 0,
"Velocity commands should be sent for navigation"
)
# Check that navigation goal corresponds to "kitchen"
goal = self.received_data['navigation_goals'][-1]
# In a real test, we would verify the goal coordinates match "kitchen" location
print(f"Received navigation goal: {goal.pose.position}")
def test_vision_to_manipulation_flow(self):
"""Test flow from vision detection to manipulation"""
# Clear previous data
self.received_data['vision_detections'].clear()
# In a real test, we would also monitor manipulation commands
# Simulate vision detection (in practice, this would come from Isaac ROS pipeline)
vision_pub = self.node.create_publisher(String, '/capstone/vision_detections', 10)
detection_msg = String()
detection_msg.data = '{"objects": [{"name": "red_cube", "position": [1.5, 1.0, 0.1], "confidence": 0.95}]}'
vision_pub.publish(detection_msg)
        # Wait for the system to respond while processing incoming messages
        end_time = time.time() + 2.0
        while time.time() < end_time:
            self.executor.spin_once(timeout_sec=0.1)
# Verify vision detection was received
self.assertGreater(
len(self.received_data['vision_detections']), 0,
"Vision detections should be received"
)
# In a real test, we would verify manipulation commands were generated
# based on the detected object
detection_data = self.received_data['vision_detections'][-1].data
self.assertIn('red_cube', detection_data, "Should detect red cube")
def test_multi_modal_integration(self):
"""Test integration of voice, vision, and navigation"""
# Clear data
for key in self.received_data.keys():
self.received_data[key].clear()
# Send coordinated commands
voice_pub = self.node.create_publisher(String, '/capstone/voice_command', 10)
cmd_msg = String()
cmd_msg.data = 'pick up the red cube'
voice_pub.publish(cmd_msg)
# Simulate vision detecting the cube
vision_pub = self.node.create_publisher(String, '/capstone/vision_detections', 10)
detection_msg = String()
detection_msg.data = '{"objects": [{"name": "red_cube", "position": [1.5, 1.0, 0.1], "confidence": 0.95}]}'
vision_pub.publish(detection_msg)
# Wait for system processing
time.sleep(5.0)
# Process messages
for _ in range(50): # Process multiple cycles
self.executor.spin_once(timeout_sec=0.1)
time.sleep(0.02)
# Verify all components responded appropriately
self.assertGreater(
len(self.received_data['navigation_goals']), 0,
"Navigation should be triggered for manipulation task"
)
self.assertGreater(
len(self.received_data['velocities']), 0,
"Robot should move during manipulation task"
)
print(f"Processed {len(self.received_data['navigation_goals'])} navigation goals")
print(f"Sent {len(self.received_data['velocities'])} velocity commands")
def test_timing_consistency(self):
"""Test timing consistency between components"""
# Send multiple commands in quick succession
voice_pub = self.node.create_publisher(String, '/capstone/voice_command', 10)
commands = [
'go to location A',
'go to location B',
'pick up object',
'place object'
]
start_time = time.time()
for cmd in commands:
cmd_msg = String()
cmd_msg.data = cmd
voice_pub.publish(cmd_msg)
time.sleep(0.1) # Small delay between commands
# Wait for processing
time.sleep(4.0)
# Process messages
for _ in range(100):
self.executor.spin_once(timeout_sec=0.05)
total_time = time.time() - start_time
print(f"Processed {len(commands)} commands in {total_time:.2f}s")
print(f"Average processing time: {total_time/len(commands):.2f}s per command")
# Verify system handled rapid command sequence
self.assertGreater(
len(self.received_data['navigation_goals']) + len(self.received_data['velocities']),
len(commands) * 0.5, # At least half should produce responses
"System should handle rapid command sequence"
)
def run_integration_tests():
"""Run all integration tests"""
# First, ensure all required nodes are running
# In practice, you'd launch the system under test first
print("Running integration tests...")
# Run the tests
suite = unittest.TestLoader().loadTestsFromTestCase(TestIntegration)
runner = unittest.TextTestRunner(verbosity=2)
result = runner.run(suite)
return result.wasSuccessful()
if __name__ == '__main__':
success = run_integration_tests()
sys.exit(0 if success else 1)
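run_integration_tests assumes the capstone stack is already up. One way to guarantee that is a small bringup launch file started before the test run; a minimal sketch, where the package and executable names are placeholders for the actual capstone components:
# launch/system_under_test.launch.py (sketch - package/executable names are placeholders)
from launch import LaunchDescription
from launch_ros.actions import Node

def generate_launch_description():
    # Bring up the components that the integration tests publish to and monitor
    return LaunchDescription([
        Node(package='capstone_voice', executable='voice_command_node', name='voice_interface'),
        Node(package='capstone_perception', executable='vision_pipeline_node', name='vision'),
        Node(package='capstone_navigation', executable='navigation_bridge_node', name='navigation'),
    ])
With the stack launched, the integration tests can then be run with pytest as shown in the commands at the end of this lesson.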
Performance Benchmarking
Performance Evaluation Suite
# tests/performance_benchmarks.py
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float32, String
from sensor_msgs.msg import Image
import time
import statistics
import threading
from collections import deque
import psutil
import GPUtil
from cv_bridge import CvBridge
class PerformanceBenchmarkNode(Node):
def __init__(self):
        super().__init__('performance_benchmark_node')
        self.cv_bridge = CvBridge()  # used to convert generated test images to ROS Image messages
# Performance metrics storage
self.latency_measurements = deque(maxlen=1000)
self.throughput_measurements = deque(maxlen=1000)
self.resource_usage = deque(maxlen=1000)
# Publishers for metrics
self.latency_pub = self.create_publisher(Float32, '/benchmark/latency', 10)
self.throughput_pub = self.create_publisher(Float32, '/benchmark/throughput', 10)
self.cpu_usage_pub = self.create_publisher(Float32, '/benchmark/cpu_usage', 10)
self.gpu_usage_pub = self.create_publisher(Float32, '/benchmark/gpu_usage', 10)
# Timers for continuous monitoring
self.monitor_timer = self.create_timer(0.1, self.monitor_resources)
self.report_timer = self.create_timer(1.0, self.report_performance)
# Performance test triggers
self.test_trigger_sub = self.create_subscription(
String,
'/benchmark/trigger_test',
self.trigger_test_callback,
10
)
self.get_logger().info('Performance benchmark node initialized')
def trigger_test_callback(self, msg):
"""Trigger specific performance tests"""
test_type = msg.data
if test_type == 'vslam_throughput':
self.run_vslam_throughput_test()
elif test_type == 'navigation_latency':
self.run_navigation_latency_test()
elif test_type == 'memory_stress':
self.run_memory_stress_test()
elif test_type == 'full_system':
self.run_full_system_benchmark()
else:
self.get_logger().warn(f'Unknown test type: {test_type}')
def run_vslam_throughput_test(self):
"""Test maximum throughput of VSLAM pipeline"""
self.get_logger().info('Starting VSLAM throughput test...')
# Record start time
start_time = time.time()
processed_frames = 0
# Simulate high-rate image input
image_pub = self.create_publisher(Image, '/camera/color/image_rect_color', 10)
# Generate test images at high frequency
test_duration = 10.0 # seconds
target_rate = 30.0 # Hz
def publish_images():
nonlocal processed_frames
rate = self.create_rate(target_rate)
image_counter = 0
while time.time() - start_time < test_duration:
# Create and publish test image
test_image = self.create_test_image(640, 480)
image_msg = self.cv_bridge.cv2_to_imgmsg(test_image, encoding='bgr8')
image_msg.header.stamp = self.get_clock().now().to_msg()
image_msg.header.frame_id = 'camera_color_optical_frame'
image_pub.publish(image_msg)
processed_frames += 1
try:
rate.sleep()
except:
break
# Start image publishing in separate thread
publish_thread = threading.Thread(target=publish_images)
publish_thread.start()
        # Monitor processing rate (assumes another callback appends to
        # self.latency_measurements as VSLAM output messages arrive)
initial_processed = len(self.latency_measurements)
time.sleep(test_duration)
final_processed = len(self.latency_measurements)
publish_thread.join()
actual_throughput = (final_processed - initial_processed) / test_duration
self.get_logger().info(f'VSLAM throughput: {actual_throughput:.2f} Hz')
# Publish results
throughput_msg = Float32()
throughput_msg.data = float(actual_throughput)
self.throughput_pub.publish(throughput_msg)
def run_navigation_latency_test(self):
"""Test navigation system latency"""
self.get_logger().info('Starting navigation latency test...')
# This would involve sending navigation goals and measuring response time
# For this example, we'll simulate the test
latencies = []
for i in range(50):
start_time = time.time()
# Simulate navigation goal processing
# In practice, this would involve sending actual navigation goals
# and measuring time to first velocity command
time.sleep(0.05 + 0.02 * (i % 5)) # Simulate varying processing time
end_time = time.time()
latency = end_time - start_time
latencies.append(latency)
avg_latency = statistics.mean(latencies)
std_latency = statistics.stdev(latencies) if len(latencies) > 1 else 0.0
max_latency = max(latencies)
self.get_logger().info(f'Navigation latency: avg={avg_latency:.3f}s, std={std_latency:.3f}s, max={max_latency:.3f}s')
# Publish average latency
latency_msg = Float32()
latency_msg.data = float(avg_latency)
self.latency_pub.publish(latency_msg)
def run_memory_stress_test(self):
"""Test system behavior under memory pressure"""
self.get_logger().info('Starting memory stress test...')
# Allocate large amounts of memory to simulate stress
memory_blocks = []
initial_memory = psutil.virtual_memory().percent
try:
# Try to allocate memory until we reach 80% usage
while psutil.virtual_memory().percent < 80:
# Allocate 10MB blocks
block = bytearray(10 * 1024 * 1024)
memory_blocks.append(block)
# Fill with data to prevent optimization
for i in range(0, len(block), 1024):
block[i:i+10] = b'test_data'
time.sleep(0.01) # Small delay to allow system to respond
peak_memory = psutil.virtual_memory().percent
self.get_logger().info(f'Reached {peak_memory:.1f}% memory usage')
# Hold for a bit to see system response
time.sleep(5.0)
# Deallocate memory gradually
while memory_blocks:
memory_blocks.pop()
time.sleep(0.001)
final_memory = psutil.virtual_memory().percent
self.get_logger().info(f'Memory returned to {final_memory:.1f}%')
except MemoryError:
self.get_logger().warn('Memory stress test stopped due to allocation failure')
def run_full_system_benchmark(self):
"""Run comprehensive system benchmark"""
self.get_logger().info('Starting full system benchmark...')
# Combine multiple test scenarios
test_threads = []
# Start throughput test
throughput_thread = threading.Thread(target=self.run_vslam_throughput_test)
test_threads.append(throughput_thread)
# Start navigation test
nav_thread = threading.Thread(target=self.run_navigation_latency_test)
test_threads.append(nav_thread)
# Start monitoring resources
monitor_start = time.time()
# Start all threads
for thread in test_threads:
thread.start()
# Run for specified duration
benchmark_duration = 30.0
while time.time() - monitor_start < benchmark_duration:
time.sleep(0.1)
# Wait for threads to complete
for thread in test_threads:
thread.join(timeout=5.0) # Wait up to 5 seconds for each thread
self.get_logger().info('Full system benchmark completed')
def monitor_resources(self):
"""Monitor system resources continuously"""
current_time = time.time()
# CPU usage
cpu_percent = psutil.cpu_percent(interval=None)
cpu_msg = Float32()
cpu_msg.data = float(cpu_percent)
self.cpu_usage_pub.publish(cpu_msg)
# Memory usage
memory_percent = psutil.virtual_memory().percent
# GPU usage (if available)
try:
gpus = GPUtil.getGPUs()
if gpus:
gpu_load = gpus[0].load * 100
gpu_msg = Float32()
gpu_msg.data = float(gpu_load)
self.gpu_usage_pub.publish(gpu_msg)
except:
# GPU monitoring not available
pass
# Store resource usage data
self.resource_usage.append({
'timestamp': current_time,
'cpu_percent': cpu_percent,
'memory_percent': memory_percent,
'gpu_load': gpu_load if 'gpu_load' in locals() else 0.0
})
def report_performance(self):
"""Report current performance metrics"""
if self.resource_usage:
recent_stats = list(self.resource_usage)[-10:] # Last 10 measurements
avg_cpu = statistics.mean([stat['cpu_percent'] for stat in recent_stats])
avg_memory = statistics.mean([stat['memory_percent'] for stat in recent_stats])
avg_gpu = statistics.mean([stat['gpu_load'] for stat in recent_stats])
self.get_logger().info(
f'Resources - CPU: {avg_cpu:.1f}%, '
f'Mem: {avg_memory:.1f}%, '
f'GPU: {avg_gpu:.1f}%'
)
if self.latency_measurements:
recent_latencies = list(self.latency_measurements)[-20:]
avg_latency = statistics.mean(recent_latencies)
self.get_logger().info(f'Avg latency: {avg_latency:.3f}s')
if self.throughput_measurements:
recent_throughput = list(self.throughput_measurements)[-20:]
avg_throughput = statistics.mean(recent_throughput)
self.get_logger().info(f'Avg throughput: {avg_throughput:.2f} Hz')
def create_test_image(self, width, height):
"""Create a test image for performance testing"""
import numpy as np
import cv2
# Create an image with rich features for processing
img = np.random.randint(0, 255, (height, width, 3), dtype=np.uint8)
# Add some structured patterns that are computationally intensive to process
for i in range(0, height, 20):
cv2.line(img, (0, i), (width, i), (128, 128, 128), 1)
for i in range(0, width, 20):
cv2.line(img, (i, 0), (i, height), (128, 128, 128), 1)
# Add random shapes for feature detection
for _ in range(50):
center = (np.random.randint(0, width), np.random.randint(0, height))
radius = np.random.randint(5, 15)
color = (int(np.random.randint(0, 255)), int(np.random.randint(0, 255)), int(np.random.randint(0, 255)))
thickness = -1 if np.random.rand() > 0.5 else 2
cv2.circle(img, center, radius, color, thickness)
return img
def main(args=None):
rclpy.init(args=args)
node = PerformanceBenchmarkNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
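Once the benchmark node is running, individual tests can be triggered and observed from the command line; for example (assuming the file above is saved as tests/performance_benchmarks.py):
# Start the benchmark node
python3 tests/performance_benchmarks.py
# Trigger the VSLAM throughput test
ros2 topic pub --once /benchmark/trigger_test std_msgs/msg/String "{data: 'vslam_throughput'}"
# Watch the reported throughput
ros2 topic echo /benchmark/throughput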
System Evaluation Metrics
Defining Success Criteria
# evaluation/metrics.py
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
import statistics
import time
@dataclass
class PerformanceMetric:
name: str
value: float
unit: str
timestamp: float
component: str
@dataclass
class SuccessCriteria:
"""Define success criteria for the capstone project"""
# Navigation metrics
navigation_success_rate: float = 0.85 # 85% success rate
navigation_accuracy: float = 0.3 # 30cm final position accuracy
navigation_time_efficiency: float = 0.9 # 90% of optimal time
# VSLAM metrics
vslam_tracking_success_rate: float = 0.90 # 90% tracking success
vslam_drift_rate: float = 0.05 # 5cm drift per minute
vslam_initialization_time: float = 30.0 # 30 seconds max initialization
# Manipulation metrics
manipulation_success_rate: float = 0.75 # 75% success rate
manipulation_precision: float = 0.02 # 2cm precision
# System-wide metrics
system_response_time: float = 2.0 # 2 seconds max response
system_reliability: float = 0.95 # 95% uptime during operation
    resource_utilization: Optional[Dict[str, float]] = None  # CPU, GPU, memory limits (defaults set by SystemEvaluator)
class SystemEvaluator:
def __init__(self, criteria: SuccessCriteria = None):
self.criteria = criteria or SuccessCriteria()
self.metrics_collection = []
self.test_results = {}
# Initialize resource limits
if self.criteria.resource_utilization is None:
self.criteria.resource_utilization = {
'cpu_percent': 80.0,
'gpu_percent': 85.0,
'memory_percent': 75.0
}
def evaluate_navigation_performance(self, test_data: List[Dict]) -> Dict:
"""Evaluate navigation performance against criteria"""
results = {
'success_rate': 0.0,
'average_accuracy': 0.0,
'average_efficiency': 0.0,
'failures': [],
'metrics': []
}
if not test_data:
return results
successful_navigations = 0
total_accuracy_errors = []
total_time_efficiencies = []
for test_case in test_data:
if test_case.get('completed', False):
successful_navigations += 1
# Calculate accuracy error
if 'final_position' in test_case and 'target_position' in test_case:
pos_error = self.calculate_position_error(
test_case['final_position'],
test_case['target_position']
)
total_accuracy_errors.append(pos_error)
# Calculate time efficiency
if 'actual_time' in test_case and 'optimal_time' in test_case:
efficiency = test_case['optimal_time'] / test_case['actual_time']
total_time_efficiencies.append(efficiency)
else:
results['failures'].append(test_case.get('test_id', 'unknown'))
# Calculate averages
results['success_rate'] = successful_navigations / len(test_data)
if total_accuracy_errors:
results['average_accuracy'] = statistics.mean(total_accuracy_errors)
if total_time_efficiencies:
results['average_efficiency'] = statistics.mean(total_time_efficiencies)
# Evaluate against criteria
results['meets_criteria'] = (
results['success_rate'] >= self.criteria.navigation_success_rate and
(not total_accuracy_errors or results['average_accuracy'] <= self.criteria.navigation_accuracy) and
(not total_time_efficiencies or results['average_efficiency'] >= self.criteria.navigation_time_efficiency)
)
return results
def evaluate_vslam_performance(self, test_data: List[Dict]) -> Dict:
"""Evaluate VSLAM performance against criteria"""
results = {
'tracking_success_rate': 0.0,
'average_drift': 0.0,
'initialization_time': 0.0,
'failures': [],
'metrics': []
}
if not test_data:
return results
total_tracking_success = 0
total_drift = []
initialization_times = []
for test_case in test_data:
if 'tracking_results' in test_case:
tracking_data = test_case['tracking_results']
successful_trackings = sum(1 for track in tracking_data if track.get('success', False))
total_tracking_success += successful_trackings / len(tracking_data) if tracking_data else 0
if 'drift_measurements' in test_case:
total_drift.extend(test_case['drift_measurements'])
if 'initialization_time' in test_case:
initialization_times.append(test_case['initialization_time'])
# Calculate averages
if len(test_data) > 0:
results['tracking_success_rate'] = total_tracking_success / len(test_data)
if total_drift:
results['average_drift'] = statistics.mean(total_drift)
if initialization_times:
results['initialization_time'] = statistics.mean(initialization_times)
# Evaluate against criteria
results['meets_criteria'] = (
results['tracking_success_rate'] >= self.criteria.vslam_tracking_success_rate and
(not total_drift or results['average_drift'] <= self.criteria.vslam_drift_rate) and
(not initialization_times or results['initialization_time'] <= self.criteria.vslam_initialization_time)
)
return results
def evaluate_manipulation_performance(self, test_data: List[Dict]) -> Dict:
"""Evaluate manipulation performance against criteria"""
results = {
'success_rate': 0.0,
'average_precision': 0.0,
'failures': [],
'metrics': []
}
if not test_data:
return results
successful_manipulations = 0
total_precision_errors = []
for test_case in test_data:
if test_case.get('completed', False):
successful_manipulations += 1
if 'precision_error' in test_case:
total_precision_errors.append(test_case['precision_error'])
results['success_rate'] = successful_manipulations / len(test_data) if test_data else 0.0
if total_precision_errors:
results['average_precision'] = statistics.mean(total_precision_errors)
# Evaluate against criteria
results['meets_criteria'] = (
results['success_rate'] >= self.criteria.manipulation_success_rate and
(not total_precision_errors or results['average_precision'] <= self.criteria.manipulation_precision)
)
return results
def evaluate_system_responsiveness(self, test_data: List[Dict]) -> Dict:
"""Evaluate system response time performance"""
results = {
'average_response_time': 0.0,
'max_response_time': 0.0,
'response_times': [],
'metrics': []
}
if not test_data:
return results
response_times = []
for test_case in test_data:
if 'response_time' in test_case:
response_times.append(test_case['response_time'])
metric = PerformanceMetric(
name='response_time',
value=test_case['response_time'],
unit='seconds',
timestamp=test_case.get('timestamp', time.time()),
component='system'
)
results['metrics'].append(metric)
if response_times:
results['average_response_time'] = statistics.mean(response_times)
results['max_response_time'] = max(response_times)
results['response_times'] = response_times
# Evaluate against criteria
results['meets_criteria'] = (
not response_times or results['average_response_time'] <= self.criteria.system_response_time
)
return results
def calculate_position_error(self, actual_pos: Tuple[float, float, float],
target_pos: Tuple[float, float, float]) -> float:
"""Calculate Euclidean distance between actual and target positions"""
dx = actual_pos[0] - target_pos[0]
dy = actual_pos[1] - target_pos[1]
dz = actual_pos[2] - target_pos[2]
return (dx*dx + dy*dy + dz*dz) ** 0.5
def generate_evaluation_report(self, evaluation_results: Dict) -> str:
"""Generate a comprehensive evaluation report"""
report = []
report.append("# Capstone Project Evaluation Report\n")
report.append(f"**Evaluation Date**: {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
report.append("## Executive Summary\n")
# Overall success
all_criteria_met = all(result.get('meets_criteria', False) for result in evaluation_results.values())
        report.append(f"- **Overall Success**: {'PASS' if all_criteria_met else 'FAIL'}\n")
# Component summaries
for component, result in evaluation_results.items():
meets_criteria = result.get('meets_criteria', False)
            report.append(f"- **{component.replace('_', ' ').title()}**: {'PASS' if meets_criteria else 'FAIL'}")
            component_prefix = component.split('_')[0]
            if 'success_rate' in result:
                target_rate = getattr(self.criteria, f'{component_prefix}_success_rate', 0.0)
                report.append(f"  - Success Rate: {result['success_rate']:.2%} (Target: {target_rate:.2%})")
            if 'average_accuracy' in result:
                target_accuracy = getattr(self.criteria, f'{component_prefix}_accuracy', 0.0)
                report.append(f"  - Average Accuracy: {result['average_accuracy']:.3f}m (Target: <= {target_accuracy:.3f}m)")
            if 'average_response_time' in result:
                report.append(f"  - Avg Response Time: {result['average_response_time']:.3f}s (Target: <= {self.criteria.system_response_time:.3f}s)")
report.append("\n## Detailed Results\n")
for component, result in evaluation_results.items():
report.append(f"### {component.replace('_', ' ').title()}\n")
report.append(f"- Met Criteria: {'Yes' if result.get('meets_criteria', False) else 'No'}\n")
# Add component-specific metrics
if 'success_rate' in result:
report.append(f"- Success Rate: {result['success_rate']:.2%}\n")
if 'average_accuracy' in result:
report.append(f"- Average Accuracy: {result['average_accuracy']:.3f}m\n")
if 'average_response_time' in result:
report.append(f"- Average Response Time: {result['average_response_time']:.3f}s\n")
if 'failures' in result and result['failures']:
report.append(f"- Failures: {len(result['failures'])}\n")
report.append(f"- Failure Details: {result['failures'][:3]}...\n") # Show first 3 failures
report.append("\n")
report.append("## Recommendations\n")
if not all_criteria_met:
report.append("- Identify and address components that did not meet success criteria\n")
report.append("- Consider performance optimization for underperforming components\n")
report.append("- Implement additional error handling and fallback mechanisms\n")
else:
report.append("- All success criteria met - system is ready for deployment\n")
report.append("- Continue monitoring performance in operational environment\n")
return "\n".join(report)
def save_evaluation_report(self, evaluation_results: Dict, filename: str = "evaluation_report.md"):
"""Save evaluation report to file"""
report_content = self.generate_evaluation_report(evaluation_results)
with open(filename, 'w') as f:
f.write(report_content)
print(f"Evaluation report saved to {filename}")
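A minimal sketch of how the evaluator might be driven once test data has been collected; the sample values below are illustrative rather than real results:
# evaluation/run_evaluation.py (sketch)
from metrics import SystemEvaluator, SuccessCriteria

evaluator = SystemEvaluator(SuccessCriteria())
# Illustrative navigation test data collected from earlier test runs
navigation_tests = [
    {'test_id': 'nav_01', 'completed': True,
     'final_position': (2.05, 1.10, 0.0), 'target_position': (2.0, 1.0, 0.0),
     'actual_time': 42.0, 'optimal_time': 38.0},
    {'test_id': 'nav_02', 'completed': False},
]
results = {'navigation_performance': evaluator.evaluate_navigation_performance(navigation_tests)}
evaluator.save_evaluation_report(results, filename='evaluation_report.md')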
Debugging and Refinement Techniques
System Debugging Framework
# debugging/debug_framework.py
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue
from visualization_msgs.msg import Marker, MarkerArray
import time
import traceback
import threading
import numpy as np
from collections import defaultdict, deque
from typing import Dict, List
# SuccessCriteria and SystemEvaluator are defined in evaluation/metrics.py (shown above);
# this assumes the evaluation package is on the Python path
from evaluation.metrics import SuccessCriteria, SystemEvaluator
class SystemDebugger(Node):
def __init__(self):
super().__init__('system_debugger')
# Debug data storage
self.debug_logs = defaultdict(deque)
self.error_counts = defaultdict(int)
self.warning_counts = defaultdict(int)
self.component_status = defaultdict(str)
# Publishers for debugging information
self.diagnostic_pub = self.create_publisher(DiagnosticArray, '/diagnostics', 10)
self.debug_marker_pub = self.create_publisher(MarkerArray, '/debug/markers', 10)
self.log_pub = self.create_publisher(String, '/debug/log', 10)
# Subscriptions for monitoring system state
self.system_state_sub = self.create_subscription(
String,
'/capstone/system_state',
self.system_state_callback,
10
)
# Timer for diagnostic publishing
self.diag_timer = self.create_timer(1.0, self.publish_diagnostics)
self.get_logger().info('System debugger initialized')
def system_state_callback(self, msg):
"""Monitor system state changes"""
try:
# Parse system state message
state_data = self.parse_system_state(msg.data)
# Update component status
for component, status in state_data.get('components', {}).items():
self.component_status[component] = status
# Log status changes
if status == 'error':
self.error_counts[component] += 1
self.log_debug(f"Component {component} in error state", severity='error')
elif status == 'warning':
self.warning_counts[component] += 1
self.log_debug(f"Component {component} in warning state", severity='warning')
except Exception as e:
self.log_debug(f"Error parsing system state: {str(e)}", severity='error')
self.error_counts['system_parser'] += 1
def parse_system_state(self, state_string):
"""Parse system state string into structured data"""
# This would parse the actual system state message format
# For now, returning a mock structure
return {
'timestamp': time.time(),
'components': {
'vslam': 'running',
'navigation': 'running',
'manipulation': 'idle',
'vision': 'running',
'voice': 'idle'
},
'resources': {
'cpu': 65.0,
'memory': 45.0,
'gpu': 70.0
}
}
def log_debug(self, message: str, severity: str = 'info', component: str = 'system'):
"""Log debug information with severity and component"""
log_entry = {
'timestamp': time.time(),
'message': message,
'severity': severity,
'component': component
}
self.debug_logs[component].append(log_entry)
# Keep only recent logs (last 100 entries per component)
if len(self.debug_logs[component]) > 100:
self.debug_logs[component].popleft()
# Publish to ROS topic
log_msg = String()
log_msg.data = f"[{severity.upper()}][{component}] {message}"
self.log_pub.publish(log_msg)
# Log to console with appropriate level
if severity == 'error':
self.get_logger().error(message)
elif severity == 'warning':
self.get_logger().warning(message)
else:
self.get_logger().info(message)
def publish_diagnostics(self):
"""Publish system diagnostics"""
diag_array = DiagnosticArray()
diag_array.header.stamp = self.get_clock().now().to_msg()
# Create diagnostic status for each component
for component, status in self.component_status.items():
diag_status = DiagnosticStatus()
diag_status.name = f"capstone_{component}"
diag_status.hardware_id = component
if status == 'running':
diag_status.level = DiagnosticStatus.OK
diag_status.message = "Component operating normally"
elif status == 'warning':
diag_status.level = DiagnosticStatus.WARN
diag_status.message = f"Component has warnings ({self.warning_counts[component]} warnings)"
elif status == 'error':
diag_status.level = DiagnosticStatus.ERROR
diag_status.message = f"Component has errors ({self.error_counts[component]} errors)"
else:
diag_status.level = DiagnosticStatus.STALE
diag_status.message = f"Component status unknown: {status}"
            # Add key-value pairs for additional information
            diag_status.values.extend([
                KeyValue(key='Error Count', value=str(self.error_counts[component])),
                KeyValue(key='Warning Count', value=str(self.warning_counts[component])),
                KeyValue(key='Last Status Update', value=str(time.time()))
            ])
diag_array.status.append(diag_status)
self.diagnostic_pub.publish(diag_array)
def create_debug_marker(self, marker_id: int, position, marker_type: int,
scale, color, text: str = ""):
"""Create a debug visualization marker"""
marker = Marker()
marker.header.frame_id = "map"
marker.header.stamp = self.get_clock().now().to_msg()
marker.ns = "debug"
marker.id = marker_id
marker.type = marker_type
marker.action = Marker.ADD
# Set position
marker.pose.position.x = position[0]
marker.pose.position.y = position[1]
marker.pose.position.z = position[2] if len(position) > 2 else 0.0
marker.pose.orientation.w = 1.0
# Set scale
marker.scale.x = scale[0] if len(scale) > 0 else 1.0
marker.scale.y = scale[1] if len(scale) > 1 else 1.0
marker.scale.z = scale[2] if len(scale) > 2 else 1.0
# Set color
marker.color.r = color[0] if len(color) > 0 else 1.0
marker.color.g = color[1] if len(color) > 1 else 0.0
marker.color.b = color[2] if len(color) > 2 else 0.0
marker.color.a = color[3] if len(color) > 3 else 1.0
# Set text if provided
if text:
marker.text = text
return marker
def visualize_path(self, path_points, path_name: str = "debug_path"):
"""Visualize a path using markers"""
marker_array = MarkerArray()
for i, point in enumerate(path_points):
marker = self.create_debug_marker(
i,
point,
Marker.SPHERE,
[0.1, 0.1, 0.1],
[0.0, 1.0, 0.0, 1.0], # Green
f"{path_name}_{i}"
)
marker_array.markers.append(marker)
# Add line strip to connect points
line_marker = self.create_debug_marker(
len(path_points),
[0, 0, 0],
Marker.LINE_STRIP,
[0.05, 0.05, 0.05],
[0.0, 1.0, 0.0, 0.8], # Green with transparency
)
        line_marker.points = []
        from geometry_msgs.msg import Point
        for point in path_points:
p = Point()
p.x = point[0]
p.y = point[1]
p.z = point[2] if len(point) > 2 else 0.0
line_marker.points.append(p)
marker_array.markers.append(line_marker)
self.debug_marker_pub.publish(marker_array)
def debug_vslam_trace(self, trajectory_points):
"""Visualize VSLAM trajectory for debugging"""
marker_array = MarkerArray()
# Create sphere markers for each pose in trajectory
for i, pose in enumerate(trajectory_points):
marker = self.create_debug_marker(
i,
[pose[0], pose[1], pose[2]], # Position
Marker.ARROW, # Use arrow to show orientation
[0.2, 0.05, 0.05], # Scale: length, head width, shaft width
[1.0, 0.0, 0.0, 1.0], # Red for trajectory
f"pose_{i}"
)
# Set orientation
if len(pose) >= 6: # If orientation is provided as [x, y, z, rx, ry, rz]
                # Convert Euler angles to a quaternion using the helper below
                orientation = self.euler_to_quaternion(pose[3], pose[4], pose[5])
marker.pose.orientation.x = orientation[0]
marker.pose.orientation.y = orientation[1]
marker.pose.orientation.z = orientation[2]
marker.pose.orientation.w = orientation[3]
marker_array.markers.append(marker)
self.debug_marker_pub.publish(marker_array)
def euler_to_quaternion(self, roll, pitch, yaw):
"""Convert Euler angles to quaternion"""
# Simplified conversion (in practice, use tf2 for robust conversion)
cr = np.cos(roll * 0.5)
sr = np.sin(roll * 0.5)
cp = np.cos(pitch * 0.5)
sp = np.sin(pitch * 0.5)
cy = np.cos(yaw * 0.5)
sy = np.sin(yaw * 0.5)
w = cr * cp * cy + sr * sp * sy
x = sr * cp * cy - cr * sp * sy
y = cr * sp * cy + sr * cp * sy
z = cr * cp * sy - sr * sp * cy
return [x, y, z, w]
def get_component_diagnostics(self, component_name: str):
"""Get detailed diagnostics for a specific component"""
diagnostics = {
'component': component_name,
'status': self.component_status.get(component_name, 'unknown'),
'error_count': self.error_counts[component_name],
'warning_count': self.warning_counts[component_name],
'recent_logs': list(self.debug_logs[component_name])[-10:], # Last 10 logs
'timestamp': time.time()
}
return diagnostics
def run_diagnostic_scan(self):
"""Run comprehensive diagnostic scan of all components"""
self.log_debug("Starting comprehensive diagnostic scan", severity='info')
# Check all component statuses
for component, status in self.component_status.items():
diagnostics = self.get_component_diagnostics(component)
if diagnostics['error_count'] > 0:
self.log_debug(
f"Component {component} has {diagnostics['error_count']} errors",
severity='error'
)
elif diagnostics['warning_count'] > 0:
self.log_debug(
f"Component {component} has {diagnostics['warning_count']} warnings",
severity='warning'
)
self.log_debug("Diagnostic scan completed", severity='info')
class RefinementEngine:
"""Engine for refining and optimizing system performance based on evaluation results"""
def __init__(self, evaluator: SystemEvaluator):
self.evaluator = evaluator
self.refinement_strategies = self.define_refinement_strategies()
def define_refinement_strategies(self):
"""Define strategies for refining different system components"""
return {
'navigation': {
'low_success_rate': self.refine_navigation_success,
'high_error': self.refine_navigation_accuracy,
'slow_execution': self.refine_navigation_efficiency
},
'vslam': {
'low_tracking_success': self.refine_vslam_tracking,
'high_drift': self.refine_vslam_stability,
'long_initialization': self.refine_vslam_initialization
},
'manipulation': {
'low_success_rate': self.refine_manipulation_success,
'low_precision': self.refine_manipulation_precision
},
'system': {
'high_response_time': self.refine_system_responsiveness,
'resource_overuse': self.refine_resource_utilization
}
}
def analyze_evaluation_results(self, results: Dict):
"""Analyze evaluation results to identify refinement opportunities"""
refinement_opportunities = []
for component, result in results.items():
if not result.get('meets_criteria', True):
# Identify specific issues
if component == 'navigation_performance':
if result.get('success_rate', 0) < self.evaluator.criteria.navigation_success_rate:
refinement_opportunities.append({
'component': 'navigation',
'issue': 'low_success_rate',
'current': result['success_rate'],
'target': self.evaluator.criteria.navigation_success_rate
})
if result.get('average_accuracy', float('inf')) > self.evaluator.criteria.navigation_accuracy:
refinement_opportunities.append({
'component': 'navigation',
'issue': 'high_error',
'current': result['average_accuracy'],
'target': self.evaluator.criteria.navigation_accuracy
})
elif component == 'vslam_performance':
if result.get('tracking_success_rate', 0) < self.evaluator.criteria.vslam_tracking_success_rate:
refinement_opportunities.append({
'component': 'vslam',
'issue': 'low_tracking_success',
'current': result['tracking_success_rate'],
'target': self.evaluator.criteria.vslam_tracking_success_rate
})
if result.get('average_drift', 0) > self.evaluator.criteria.vslam_drift_rate:
refinement_opportunities.append({
'component': 'vslam',
'issue': 'high_drift',
'current': result['average_drift'],
'target': self.evaluator.criteria.vslam_drift_rate
})
return refinement_opportunities
def apply_refinements(self, refinement_opportunities: List[Dict]):
"""Apply refinements based on evaluation results"""
applied_refinements = []
for opportunity in refinement_opportunities:
component = opportunity['component']
issue = opportunity['issue']
if component in self.refinement_strategies:
if issue in self.refinement_strategies[component]:
strategy = self.refinement_strategies[component][issue]
try:
result = strategy(opportunity)
applied_refinements.append({
'opportunity': opportunity,
'result': result,
'status': 'applied'
})
print(f"Applied refinement for {component}.{issue}: {result}")
except Exception as e:
print(f"Failed to apply refinement for {component}.{issue}: {str(e)}")
applied_refinements.append({
'opportunity': opportunity,
'error': str(e),
'status': 'failed'
})
return applied_refinements
def refine_navigation_success(self, opportunity):
"""Refine navigation for better success rate"""
# Possible refinements:
# - Increase obstacle clearance threshold
# - Adjust local planner parameters
# - Improve sensor fusion
# - Add more robust recovery behaviors
current_rate = opportunity['current']
target_rate = opportunity['target']
# Example: Increase obstacle inflation radius
if current_rate < 0.7:
# Significant underperformance - major parameter adjustment
new_inflation = 0.8 # meters
return f"Increased obstacle inflation radius to {new_inflation}m for better success rate"
else:
# Moderate underperformance - minor adjustment
new_inflation = 0.6 # meters
return f"Increased obstacle inflation radius to {new_inflation}m for improved success rate"
def refine_navigation_accuracy(self, opportunity):
"""Refine navigation for better accuracy"""
# Possible refinements:
# - Tune controller parameters (P, I, D gains)
# - Improve odometry sources
# - Adjust goal tolerance parameters
current_error = opportunity['current']
target_error = opportunity['target']
# Example: Adjust goal tolerances
new_xy_tolerance = min(0.5, target_error * 1.5) # Be more forgiving but improve gradually
return f"Adjusted XY goal tolerance to {new_xy_tolerance}m for better accuracy"
def refine_vslam_tracking(self, opportunity):
"""Refine VSLAM for better tracking success"""
# Possible refinements:
# - Adjust feature detection parameters
# - Improve lighting conditions
# - Use more robust feature descriptors
# - Increase processing power allocation
current_rate = opportunity['current']
target_rate = opportunity['target']
# Example: Adjust feature detection thresholds
if current_rate < 0.8:
# Significantly below target - aggressive tuning
return "Increased feature detection sensitivity and reduced minimum disparity threshold"
else:
# Closer to target - subtle adjustment
return "Fine-tuned feature detection parameters for improved tracking"
def refine_system_responsiveness(self, opportunity):
"""Refine system for better response time"""
# Possible refinements:
# - Optimize data processing pipelines
# - Reduce unnecessary computations
# - Improve message passing efficiency
# - Prioritize critical tasks
current_time = opportunity['current']
target_time = opportunity['target']
# Example: Suggest pipeline optimization
improvement_needed = current_time - target_time
return f"Suggest pipeline optimization to reduce response time by {improvement_needed:.3f}s"
def main(args=None):
rclpy.init(args=args)
# Initialize debugger
debugger = SystemDebugger()
# Initialize evaluator
criteria = SuccessCriteria()
evaluator = SystemEvaluator(criteria)
# Initialize refinement engine
refiner = RefinementEngine(evaluator)
try:
# Run diagnostic scan
debugger.run_diagnostic_scan()
# Example evaluation results (in practice, these would come from actual tests)
test_results = {
'navigation_performance': {
'success_rate': 0.82,
'average_accuracy': 0.35,
'average_efficiency': 0.85,
'meets_criteria': False
},
'vslam_performance': {
'tracking_success_rate': 0.88,
'average_drift': 0.06,
'initialization_time': 28.0,
'meets_criteria': False
},
'manipulation_performance': {
'success_rate': 0.78,
'average_precision': 0.018,
'meets_criteria': True
}
}
# Analyze results and identify refinement opportunities
opportunities = refiner.analyze_evaluation_results(test_results)
print(f"Identified {len(opportunities)} refinement opportunities")
for opp in opportunities:
print(f" - {opp['component']}.{opp['issue']}: {opp['current']} vs target {opp['target']}")
# Apply refinements
if opportunities:
refinements = refiner.apply_refinements(opportunities)
print(f"Applied {len(refinements)} refinements")
# Continue running
rclpy.spin(debugger)
except KeyboardInterrupt:
pass
finally:
debugger.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
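To exercise the debugger without the full stack running, a system-state message can be published by hand and the resulting diagnostics inspected. The topic names come from the node above; the payload shown is only a placeholder, since parse_system_state currently returns mocked data regardless of the input:
# Publish a placeholder system-state message and inspect the aggregated output
ros2 topic pub --once /capstone/system_state std_msgs/msg/String "{data: 'status_update'}"
ros2 topic echo /diagnostics
ros2 topic echo /debug/log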
Hands-On Exercise: Testing and Evaluation
- Set up the unit and integration test frameworks for Isaac ROS components
- Run performance benchmarks on your VSLAM and navigation pipelines
- Evaluate system performance against defined metrics
- Use the debugging framework to identify and resolve issues
- Apply refinement strategies based on evaluation results
- Re-run tests to validate improvements
- Generate a comprehensive evaluation report
Example commands to run tests:
# Run unit tests
python3 -m pytest tests/test_vslam_components.py -v
# Run integration tests
python3 -m pytest tests/test_integration.py -v
# Run performance benchmarks
ros2 run isaac_ros_benchmark performance_node --ros-args -p test_type:=full_system
# Monitor system diagnostics and plot benchmark metrics
ros2 topic echo /diagnostics
ros2 run rqt_plot rqt_plot /benchmark/cpu_usage/data /benchmark/latency/data
# Visualize debug markers
ros2 run rviz2 rviz2
Summary
This lesson covered systematic testing, evaluation, and refinement of the integrated autonomous humanoid system. You learned to implement unit and integration tests, conduct performance benchmarking, evaluate system performance against defined metrics, debug system issues, and apply refinement strategies based on evaluation results. The next lesson will explore future trends in Physical AI and Humanoid Robotics.