Lesson 4.1: Voice-to-Action: Enabling Voice Commands with OpenAI Whisper

Overview

This lesson covers implementing voice command processing for humanoid robots using OpenAI Whisper. You will set up speech-to-text conversion, process natural language commands, and integrate voice input with robot control systems.

Learning Objectives

By the end of this lesson, you should be able to:

  • Set up OpenAI Whisper for speech-to-text conversion in robotics applications
  • Process voice commands and convert them to actionable robot instructions
  • Integrate voice processing with Isaac ROS and ROS 2 systems
  • Handle voice command ambiguity and implement error recovery
  • Design voice command grammars for humanoid robot control

Introduction to Voice Processing for Robotics

Why Voice Commands in Robotics?

Voice commands enable:

  • Natural Interaction: Intuitive human-robot communication
  • Hands-Free Operation: Useful when physical interfaces aren't practical
  • Accessibility: Enables robot control for users with limited mobility
  • Efficiency: Quick command execution without menu navigation

Voice Processing Pipeline for Humanoids

Voice Input (Microphone) --> STT Processing (Whisper) --> Command Parser (NLP/LLM) --> Robot Action Execution

Data passed between the stages: raw audio stream --> text transcript --> structured command
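
Before wiring this pipeline into ROS 2, it helps to see the core loop in isolation. The following standalone sketch (not one of the lesson's ROS nodes) records a short microphone clip with sounddevice and transcribes it with Whisper; the three-second capture window and the use of the default input device are assumptions you may need to adapt.

# minimal_whisper_loop.py - standalone sketch, assumes `pip install openai-whisper sounddevice`
import sounddevice as sd
import whisper

SAMPLE_RATE = 16000   # Whisper expects 16 kHz mono audio
RECORD_SECONDS = 3    # assumed capture window for one spoken command

model = whisper.load_model("base.en")  # small English-only model

print("Speak a command...")
audio = sd.rec(int(RECORD_SECONDS * SAMPLE_RATE),
               samplerate=SAMPLE_RATE, channels=1, dtype="float32")
sd.wait()  # block until the recording finishes

# Whisper accepts a float32 NumPy array with values in [-1, 1]
result = model.transcribe(audio.flatten(), fp16=False)
print("Transcript:", result["text"].strip())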

Setting Up OpenAI Whisper for Robotics

Installation and Dependencies

# Install Whisper and related dependencies
pip install openai-whisper
pip install sounddevice # For audio input
pip install pyaudio # Alternative audio input
pip install transformers # For additional NLP capabilities
pip install torch torchaudio # For Whisper models

# For Isaac ROS integration
sudo apt update
sudo apt install ros-humble-isaac-ros-audio-input # If available
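
To confirm the installation before writing any ROS code, you can run the Whisper CLI on a recorded clip and list the available models from Python; the audio filename below is a placeholder.

# Quick sanity check (sample_command.wav is a placeholder filename)
whisper sample_command.wav --model base.en

# List the model names Whisper can download
python3 -c "import whisper; print(whisper.available_models())"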

Whisper Model Selection for Robotics

Different Whisper models offer trade-offs between accuracy and speed:

# config/whisper_model_config.yaml
whisper_models:
  tiny:
    size: "75 MB"
    languages: ["en", "multilingual"]
    performance: "Fastest, lowest accuracy"
    use_case: "Real-time applications, limited compute"
    relative_speed: 32x
    relative_vram: 1 GB

  base:
    size: "145 MB"
    languages: ["en", "multilingual"]
    performance: "Fast, good accuracy"
    use_case: "Balanced real-time performance"
    relative_speed: 16x
    relative_vram: 1 GB

  small:
    size: "470 MB"
    languages: ["en", "multilingual"]
    performance: "Moderate speed, high accuracy"
    use_case: "Applications requiring higher accuracy"
    relative_speed: 6x
    relative_vram: 2 GB

  medium:
    size: "1.5 GB"
    languages: ["en", "multilingual"]
    performance: "Slow, very high accuracy"
    use_case: "Accuracy-critical applications"
    relative_speed: 2x
    relative_vram: 5 GB

  large:
    size: "3.0 GB"
    languages: ["multilingual only"]
    performance: "Slowest, highest accuracy"
    use_case: "Multilingual applications, research"
    relative_speed: 1x
    relative_vram: 10 GB
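
A node can read this file at startup to pick a model profile. The short sketch below does that with PyYAML; choosing "base" as the default and loading its English-only ".en" variant are assumptions, not part of the config file above.

# select_whisper_model.py - sketch: choose a Whisper model from the YAML above
import yaml
import whisper

with open("config/whisper_model_config.yaml") as f:
    config = yaml.safe_load(f)

# "base" is an assumed default for real-time use; swap in any key from whisper_models
model_name = "base"
if model_name in config["whisper_models"]:
    print("Selected model profile:", config["whisper_models"][model_name])

# ".en" variants exist for tiny/base/small/medium and are faster for English-only use
model = whisper.load_model(f"{model_name}.en")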

Basic Whisper Implementation

# robot_voice_processor.py
import json

import numpy as np
import rclpy
import whisper
from audio_common_msgs.msg import AudioData
from rclpy.node import Node
from std_msgs.msg import String


class RobotVoiceProcessor(Node):
    def __init__(self):
        super().__init__('robot_voice_processor')

        # Initialize Whisper model
        self.get_logger().info('Loading Whisper model...')
        try:
            # Use 'tiny' or 'base' for real-time robotics applications
            self.model = whisper.load_model("base.en")  # English-only model for better performance
            self.get_logger().info('Whisper model loaded successfully')
        except Exception as e:
            self.get_logger().error(f'Failed to load Whisper model: {str(e)}')
            # Fall back to CPU if GPU loading fails
            self.model = whisper.load_model("base.en", device="cpu")
            self.get_logger().warn('Loaded Whisper model on CPU due to error')

        # Audio processing parameters
        self.sample_rate = 16000       # Standard for speech recognition
        self.chunk_duration = 1.0      # Process audio in 1-second chunks
        self.vad_threshold = 0.3       # Voice activity detection threshold
        self.min_voice_duration = 0.5  # Minimum voice duration to process

        # Subscriptions and publishers
        self.audio_sub = self.create_subscription(
            AudioData,
            '/audio/input',
            self.audio_callback,
            10
        )

        self.voice_command_pub = self.create_publisher(
            String,
            '/voice_command/transcript',
            10
        )

        self.parsed_command_pub = self.create_publisher(
            String,
            '/voice_command/parsed',
            10
        )

        self.get_logger().info('Robot Voice Processor initialized')

    def audio_callback(self, msg):
        """Process incoming audio data from microphone"""
        try:
            # Convert audio data to a numpy array (assuming int16 samples)
            audio_data = np.frombuffer(msg.data, dtype=np.int16).astype(np.float32)

            # Normalize to [-1, 1] range
            audio_data = audio_data / 32768.0

            # Check for voice activity
            if self.is_voice_active(audio_data):
                self.get_logger().info(
                    f'Voice activity detected in chunk of {len(audio_data) / self.sample_rate:.2f}s')

                # Perform speech-to-text
                transcript = self.transcribe_audio(audio_data)

                if transcript.strip():  # Only publish if we got a transcript
                    self.publish_transcript(transcript)

                    # Parse the command for robot execution
                    parsed_command = self.parse_command(transcript)
                    self.publish_parsed_command(parsed_command)
            else:
                self.get_logger().debug('No significant voice activity detected')

        except Exception as e:
            self.get_logger().error(f'Error processing audio: {str(e)}')

    def is_voice_active(self, audio_data):
        """Simple voice activity detection based on energy threshold"""
        # Calculate RMS energy of the audio chunk
        rms_energy = np.sqrt(np.mean(audio_data ** 2))

        # Check if energy exceeds threshold and duration is sufficient
        duration = len(audio_data) / self.sample_rate

        return rms_energy > self.vad_threshold and duration >= self.min_voice_duration

    def transcribe_audio(self, audio_data):
        """Transcribe audio using Whisper model"""
        try:
            # Whisper expects 16 kHz float32 audio
            if len(audio_data) == 0:
                return ""

            # Pad audio to a minimum length if needed
            min_length = 16000  # 1 second at 16 kHz
            if len(audio_data) < min_length:
                padding = min_length - len(audio_data)
                audio_data = np.pad(audio_data, (0, padding), mode='constant')

            # Run Whisper transcription
            result = self.model.transcribe(audio_data)
            transcript = result['text'].strip()

            self.get_logger().info(f'Transcribed: "{transcript}"')
            return transcript

        except Exception as e:
            self.get_logger().error(f'Error in Whisper transcription: {str(e)}')
            return ""

    def parse_command(self, transcript):
        """Parse natural language command into robot instructions"""
        # This is a simple rule-based parser - in practice, you might use
        # more sophisticated NLP or LLM-based parsing
        transcript_lower = transcript.lower()

        # Define command patterns
        command_patterns = {
            'move_forward': ['move forward', 'go forward', 'forward', 'go ahead', 'straight'],
            'move_backward': ['move backward', 'go backward', 'backward', 'reverse', 'back'],
            'turn_left': ['turn left', 'rotate left', 'left', 'pivot left'],
            'turn_right': ['turn right', 'rotate right', 'right', 'pivot right'],
            'stop': ['stop', 'halt', 'freeze', 'pause'],
            'pick_up': ['pick up', 'grasp', 'take', 'grab', 'lift'],
            'place_down': ['place down', 'put down', 'release', 'drop', 'place'],
            'navigate_to': ['go to', 'navigate to', 'move to', 'go to location'],
            'find_object': ['find', 'locate', 'look for', 'search for', 'where is']
        }

        # Identify the command type
        command_type = None
        confidence = 0.0

        for cmd_type, patterns in command_patterns.items():
            for pattern in patterns:
                if pattern in transcript_lower:
                    command_type = cmd_type
                    confidence = 1.0  # Exact pattern match
                    break
            if command_type:
                break

        if not command_type:
            # Try fuzzy matching with similarity
            command_type, confidence = self.fuzzy_match_command(transcript_lower, command_patterns)

        # Extract parameters from the command
        parameters = self.extract_parameters(transcript_lower, command_type)

        parsed_command = {
            'type': command_type,
            'confidence': confidence,
            'original_transcript': transcript,
            'parameters': parameters,
            # Store the time as seconds so the dict stays JSON-serializable
            'timestamp': self.get_clock().now().nanoseconds / 1e9
        }

        return parsed_command

    def fuzzy_match_command(self, transcript, patterns):
        """Perform fuzzy matching for command identification"""
        # This is a simplified implementation
        # In practice, use libraries like fuzzywuzzy or difflib
        best_match = None
        best_score = 0.0

        for cmd_type, cmd_patterns in patterns.items():
            for pattern in cmd_patterns:
                # Simple similarity check
                score = self.calculate_similarity(transcript, pattern)
                if score > best_score:
                    best_score = score
                    best_match = cmd_type

        return best_match, best_score

    def calculate_similarity(self, str1, str2):
        """Calculate word-level Jaccard similarity between two strings"""
        # Simplified similarity calculation
        # In practice, use difflib.SequenceMatcher or fuzzywuzzy
        words1 = set(str1.split())
        words2 = set(str2.split())

        intersection = words1.intersection(words2)
        union = words1.union(words2)

        if len(union) == 0:
            return 0.0

        return len(intersection) / len(union)

    def extract_parameters(self, transcript, command_type):
        """Extract parameters from voice command"""
        parameters = {}

        # Extract location for navigation commands
        if command_type and 'navigate' in command_type:
            # Look for location keywords
            locations = ['kitchen', 'bedroom', 'living room', 'office', 'bathroom',
                         'dining room', 'hallway']
            for location in locations:
                if location in transcript:
                    parameters['target_location'] = location
                    break

        # Extract object for manipulation commands
        if command_type and any(keyword in command_type for keyword in ['pick', 'grasp', 'take', 'place']):
            # Look for object descriptions
            objects = ['cube', 'ball', 'box', 'cup', 'bottle', 'book', 'phone', 'keys']
            colors = ['red', 'blue', 'green', 'yellow', 'white', 'black', 'gray',
                      'orange', 'purple', 'pink']

            for obj in objects:
                if obj in transcript:
                    parameters['target_object'] = obj
                    break

            for color in colors:
                if color in transcript:
                    parameters['object_color'] = color
                    break

        return parameters

    def publish_transcript(self, transcript):
        """Publish the raw transcript"""
        transcript_msg = String()
        transcript_msg.data = transcript
        self.voice_command_pub.publish(transcript_msg)

    def publish_parsed_command(self, parsed_command):
        """Publish the parsed command structure as JSON"""
        command_msg = String()
        command_msg.data = json.dumps(parsed_command)
        self.parsed_command_pub.publish(command_msg)


def main(args=None):
    rclpy.init(args=args)
    node = RobotVoiceProcessor()

    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        node.get_logger().info('Shutting down Robot Voice Processor')
    finally:
        node.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()
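
The node above subscribes to /audio/input, but the lesson does not include a publisher for that topic. The following is a minimal microphone-publisher sketch; it assumes the audio_common_msgs package is installed and that sounddevice can open your default input device, and the one-second blocking capture per timer tick is a simplification rather than a production design.

# mic_publisher.py - sketch: stream int16 microphone chunks to /audio/input
import rclpy
import sounddevice as sd
from audio_common_msgs.msg import AudioData
from rclpy.node import Node


class MicPublisher(Node):
    def __init__(self):
        super().__init__('mic_publisher')
        self.pub = self.create_publisher(AudioData, '/audio/input', 10)
        self.sample_rate = 16000
        self.chunk_seconds = 1.0  # matches the processor's chunk duration
        # Publish one chunk per timer tick
        self.timer = self.create_timer(self.chunk_seconds, self.capture_chunk)

    def capture_chunk(self):
        # Record a single chunk of int16 mono audio and publish its raw bytes
        frames = int(self.sample_rate * self.chunk_seconds)
        audio = sd.rec(frames, samplerate=self.sample_rate, channels=1, dtype='int16')
        sd.wait()
        msg = AudioData()
        msg.data = audio.tobytes()
        self.pub.publish(msg)


def main(args=None):
    rclpy.init(args=args)
    node = MicPublisher()
    try:
        rclpy.spin(node)
    finally:
        node.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()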

Advanced Voice Command Processing

Context-Aware Command Understanding

# context_aware_voice_processor.py
import json
from dataclasses import dataclass, field
from typing import Dict, List, Optional

import rclpy
from geometry_msgs.msg import PoseStamped
from rclpy.node import Node
from std_msgs.msg import String


@dataclass
class RobotContext:
    """Current state and context of the robot"""
    position: Optional[List[float]] = None
    orientation: Optional[List[float]] = None
    detected_objects: List[Dict] = field(default_factory=list)
    available_actions: List[str] = field(default_factory=list)
    current_task: str = ""
    battery_level: float = 100.0


class ContextAwareVoiceProcessor(Node):
    def __init__(self):
        super().__init__('context_aware_voice_processor')

        # Robot context tracking
        self.context = RobotContext()

        # Subscriptions for context
        self.pose_sub = self.create_subscription(
            PoseStamped,
            '/robot/pose',
            self.pose_callback,
            10
        )

        self.objects_sub = self.create_subscription(
            String,
            '/vision/detected_objects',
            self.objects_callback,
            10
        )

        # Audio and command processing
        self.audio_sub = self.create_subscription(
            String,  # In practice, this would be AudioData
            '/audio/transcript',
            self.transcript_callback,
            10
        )

        self.contextual_command_pub = self.create_publisher(
            String,
            '/voice_command/contextual',
            10
        )

        # Update context periodically
        self.context_update_timer = self.create_timer(1.0, self.update_context)

        self.get_logger().info('Context-Aware Voice Processor initialized')

    def pose_callback(self, msg):
        """Update robot position in context"""
        self.context.position = [
            msg.pose.position.x,
            msg.pose.position.y,
            msg.pose.position.z
        ]
        self.context.orientation = [
            msg.pose.orientation.x,
            msg.pose.orientation.y,
            msg.pose.orientation.z,
            msg.pose.orientation.w
        ]

    def objects_callback(self, msg):
        """Update detected objects in context"""
        try:
            objects_data = json.loads(msg.data)
            self.context.detected_objects = objects_data.get('objects', [])
        except json.JSONDecodeError:
            self.get_logger().error('Failed to parse detected objects data')

    def transcript_callback(self, msg):
        """Process contextual voice command"""
        transcript = msg.data

        # Parse command with context awareness
        contextual_command = self.parse_contextual_command(transcript)

        if contextual_command:
            # Publish contextual command
            command_msg = String()
            command_msg.data = json.dumps(contextual_command)
            self.contextual_command_pub.publish(command_msg)

    def parse_contextual_command(self, transcript):
        """Parse command considering robot context"""
        command = self.basic_parse_command(transcript)

        if not command:
            return None

        # Enhance command with context
        enhanced_command = {
            'original_command': command,
            'context': self.get_current_context(),
            'resolved_parameters': {},
            'confidence_adjusted': self.adjust_confidence_with_context(command, self.context)
        }

        # Resolve ambiguous references based on context
        resolved_params = self.resolve_contextual_references(command, self.context)
        enhanced_command['resolved_parameters'] = resolved_params

        return enhanced_command

    def basic_parse_command(self, transcript):
        """Basic command parsing (similar to previous implementation)"""
        # Simplified command parsing; the transcript is kept so that
        # contextual reference resolution can inspect it later
        transcript_lower = transcript.lower()

        if any(word in transcript_lower for word in ['move forward', 'go forward']):
            return {'type': 'move_forward', 'confidence': 0.9, 'original_transcript': transcript}
        elif any(word in transcript_lower for word in ['turn left', 'rotate left']):
            return {'type': 'turn_left', 'confidence': 0.9, 'original_transcript': transcript}
        elif any(word in transcript_lower for word in ['pick up', 'grasp', 'take']):
            return {'type': 'pick_up', 'confidence': 0.8, 'original_transcript': transcript}
        elif any(word in transcript_lower for word in ['go to', 'navigate to']):
            return {'type': 'navigate_to', 'confidence': 0.85, 'original_transcript': transcript}
        else:
            return {'type': 'unknown', 'confidence': 0.1, 'original_transcript': transcript}

    def get_current_context(self):
        """Get current robot context as dictionary"""
        return {
            'position': self.context.position,
            'detected_objects': self.context.detected_objects,
            'available_actions': self.context.available_actions,
            'battery_level': self.context.battery_level
        }

    def adjust_confidence_with_context(self, command, context):
        """Adjust command confidence based on context"""
        base_confidence = command.get('confidence', 0.5)

        # Increase confidence if command is available in current context
        if (command.get('type') in context.available_actions or
                context.available_actions == []):  # Empty list means all actions available
            base_confidence += 0.1

        # Adjust for contextual relevance
        if (command.get('type') == 'navigate_to' and
                context.position is not None):
            base_confidence += 0.05  # More relevant when robot knows its position

        return min(base_confidence, 1.0)  # Cap at 1.0

    def resolve_contextual_references(self, command, context):
        """Resolve ambiguous references based on context"""
        resolved = {}

        # Resolve object references like "that one" or "the red one"
        if command.get('type') in ['pick_up', 'grasp', 'take', 'examine']:
            # Look for ambiguous object references
            transcript = command.get('original_transcript', '').lower()

            if 'that' in transcript or 'there' in transcript:
                # Use most recently detected object or closest object
                if context.detected_objects:
                    if context.position:
                        # Find closest object to the robot
                        closest_obj = min(
                            context.detected_objects,
                            key=lambda obj: self.calculate_distance_3d(
                                context.position,
                                obj.get('position', [0, 0, 0])
                            )
                        )
                        resolved['target_object'] = closest_obj
                    else:
                        # Use most recent object
                        resolved['target_object'] = context.detected_objects[-1]

        # Resolve location references like "over there" or "near me"
        elif command.get('type') == 'navigate_to':
            transcript = command.get('original_transcript', '').lower()

            if 'there' in transcript:
                # This would require more complex spatial reasoning;
                # for now, use the position of the most recently detected object
                if context.detected_objects:
                    target_obj = context.detected_objects[-1]
                    resolved['target_position'] = target_obj.get('position', [0, 0, 0])

        return resolved

    def calculate_distance_3d(self, pos1, pos2):
        """Calculate 3D Euclidean distance between two positions"""
        dx = pos1[0] - pos2[0]
        dy = pos1[1] - pos2[1]
        dz = pos1[2] - pos2[2]
        return (dx * dx + dy * dy + dz * dz) ** 0.5

    def update_context(self):
        """Periodically update context information"""
        self.get_logger().debug(f'Context updated. Objects: {len(self.context.detected_objects)}')


def main(args=None):
    rclpy.init(args=args)
    node = ContextAwareVoiceProcessor()

    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        node.get_logger().info('Shutting down Context-Aware Voice Processor')
    finally:
        node.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()
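
The objects_callback above expects a JSON string on /vision/detected_objects, but the lesson does not define that schema. The command below publishes an assumed example message whose shape matches what resolve_contextual_references reads (an "objects" list whose entries carry a "position"); treat the field names as illustrative.

# Publish an assumed detected-objects message for testing (schema is illustrative)
ros2 topic pub --once /vision/detected_objects std_msgs/msg/String \
  "data: '{\"objects\": [{\"name\": \"cube\", \"color\": \"red\", \"position\": [1.2, 0.4, 0.0]}]}'"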

Isaac ROS Audio Integration

Isaac ROS Audio Pipeline

# isaac_ros_audio_integration.py
import json
import queue
import threading

import numpy as np
import rclpy
import whisper
from audio_common_msgs.msg import AudioData  # AudioData lives in audio_common_msgs, not sensor_msgs
from rclpy.node import Node
from std_msgs.msg import String


class IsaacAudioProcessor(Node):
    def __init__(self):
        super().__init__('isaac_audio_processor')

        # Isaac Sim audio input (simulated)
        self.audio_input_sub = self.create_subscription(
            AudioData,
            '/isaac_sim/audio_input',
            self.isaac_audio_callback,
            10
        )

        # Processed command output
        self.processed_command_pub = self.create_publisher(
            String,
            '/isaac_audio/processed_command',
            10
        )

        # Initialize the Whisper model before starting the worker thread that uses it
        self.whisper_model = whisper.load_model("base.en")

        # Audio processing queue and thread
        self.audio_queue = queue.Queue()
        self.processing_thread = threading.Thread(target=self.process_audio_queue)
        self.processing_thread.daemon = True
        self.processing_thread.start()

        self.get_logger().info('Isaac Audio Processor initialized')

    def isaac_audio_callback(self, msg):
        """Handle audio input from Isaac Sim"""
        # Add audio data to processing queue
        self.audio_queue.put(msg)

    def process_audio_queue(self):
        """Process audio data in a separate thread"""
        while rclpy.ok():
            try:
                # Get audio data from queue with timeout
                audio_msg = self.audio_queue.get(timeout=1.0)

                # Process the audio
                transcript = self.process_isaac_audio(audio_msg)

                if transcript.strip():
                    # Parse and publish command
                    parsed_command = self.parse_isaac_command(transcript)
                    self.publish_isaac_command(parsed_command)

                self.audio_queue.task_done()

            except queue.Empty:
                continue  # No audio data, continue loop
            except Exception as e:
                self.get_logger().error(f'Error in audio processing thread: {str(e)}')

    def process_isaac_audio(self, audio_msg):
        """Process audio from Isaac Sim using Whisper"""
        try:
            # Convert int16 audio bytes to a normalized float32 array
            audio_data = np.frombuffer(audio_msg.data, dtype=np.int16).astype(np.float32) / 32768.0

            # Transcribe using Whisper
            result = self.whisper_model.transcribe(audio_data)
            return result['text'].strip()

        except Exception as e:
            self.get_logger().error(f'Error processing Isaac audio: {str(e)}')
            return ""

    def parse_isaac_command(self, transcript):
        """Parse command specifically for the Isaac Sim environment"""
        # Similar to the previous parser but adapted for Isaac Sim
        transcript_lower = transcript.lower()

        # Isaac Sim specific commands
        command_types = {
            'move_forward': ['move forward', 'go forward', 'forward', 'advance'],
            'move_backward': ['move backward', 'go backward', 'backward', 'retreat'],
            'turn_left': ['turn left', 'rotate left', 'left', 'pivot left'],
            'turn_right': ['turn right', 'rotate right', 'right', 'pivot right'],
            'move_to_object': ['go to', 'navigate to', 'move to', 'approach'],
            'grasp_object': ['pick up', 'grasp', 'take', 'grab', 'lift'],
            'release_object': ['place', 'put down', 'release', 'drop'],
            'inspect_object': ['look at', 'examine', 'inspect', 'check'],
            'reset_simulation': ['reset', 'restart', 'reinitialize'],
            'save_checkpoint': ['save', 'checkpoint', 'record state']
        }

        command_type = None
        for cmd_type, patterns in command_types.items():
            for pattern in patterns:
                if pattern in transcript_lower:
                    command_type = cmd_type
                    break
            if command_type:
                break

        # Extract Isaac Sim specific parameters
        parameters = self.extract_isaac_parameters(transcript_lower, command_type)

        return {
            'type': command_type,
            'original_transcript': transcript,
            'parameters': parameters,
            'source': 'isaac_sim_audio',
            # Store the time as seconds so the dict stays JSON-serializable
            'timestamp': self.get_clock().now().nanoseconds / 1e9
        }

    def extract_isaac_parameters(self, transcript, command_type):
        """Extract parameters for Isaac Sim commands"""
        parameters = {}

        # Extract object names from the Isaac Sim environment
        if command_type and any(keyword in command_type for keyword in ['grasp', 'move_to', 'inspect']):
            # Common object names in Isaac Sim scenes
            objects = [
                'cube', 'sphere', 'cylinder', 'capsule', 'cone', 'torus',
                'robot', 'table', 'chair', 'box', 'ball', 'container',
                'target', 'goal', 'obstacle', 'marker'
            ]

            for obj in objects:
                if obj in transcript:
                    parameters['target_object'] = obj
                    break

        # Extract scene locations
        if command_type and 'move_to' in command_type:
            locations = [
                'spawn', 'origin', 'center', 'home', 'base', 'workstation',
                'loading zone', 'inspection station', 'delivery point'
            ]

            for loc in locations:
                if loc in transcript:
                    parameters['target_location'] = loc
                    break

        return parameters

    def publish_isaac_command(self, command):
        """Publish processed command for Isaac Sim execution"""
        cmd_msg = String()
        cmd_msg.data = json.dumps(command)
        self.processed_command_pub.publish(cmd_msg)


def main(args=None):
    rclpy.init(args=args)
    node = IsaacAudioProcessor()

    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        node.get_logger().info('Shutting down Isaac Audio Processor')
    finally:
        node.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()
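
To close the voice-to-action loop, something has to consume /isaac_audio/processed_command and turn it into motion. The sketch below shows one minimal consumer; the mapping of motion commands to geometry_msgs/Twist on /cmd_vel and the velocity values are assumptions for illustration, and manipulation or navigation commands would dispatch to other interfaces instead.

# voice_command_executor.py - sketch: map parsed commands to simple velocity actions
import json

import rclpy
from geometry_msgs.msg import Twist
from rclpy.node import Node
from std_msgs.msg import String


class VoiceCommandExecutor(Node):
    def __init__(self):
        super().__init__('voice_command_executor')
        self.cmd_sub = self.create_subscription(
            String, '/isaac_audio/processed_command', self.command_callback, 10)
        self.cmd_vel_pub = self.create_publisher(Twist, '/cmd_vel', 10)

    def command_callback(self, msg):
        try:
            command = json.loads(msg.data)
        except json.JSONDecodeError:
            self.get_logger().error('Invalid command JSON')
            return

        twist = Twist()
        # Assumed velocities; tune for your robot
        if command.get('type') == 'move_forward':
            twist.linear.x = 0.2
        elif command.get('type') == 'move_backward':
            twist.linear.x = -0.2
        elif command.get('type') == 'turn_left':
            twist.angular.z = 0.5
        elif command.get('type') == 'turn_right':
            twist.angular.z = -0.5
        # Other command types (grasping, navigation) would go to
        # manipulation or Nav2 interfaces rather than /cmd_vel
        self.cmd_vel_pub.publish(twist)


def main(args=None):
    rclpy.init(args=args)
    node = VoiceCommandExecutor()
    try:
        rclpy.spin(node)
    finally:
        node.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()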

Voice Command Grammar and Validation

Structured Command Grammar

# config/voice_command_grammar.yaml
command_grammar:
  navigation_commands:
    patterns:
      - "go to [LOCATION]"
      - "navigate to [LOCATION]"
      - "move to [LOCATION]"
      - "go to the [LOCATION]"
      - "take me to [LOCATION]"
    locations:
      - "kitchen"
      - "bedroom"
      - "living room"
      - "office"
      - "bathroom"
      - "dining room"
      - "hallway"
      - "garage"
      - "garden"
      - "entrance"
      - "exit"
      - "home"
      - "base station"
    parameters:
      required: ["target_location"]
      optional: ["speed", "avoid_obstacles"]

  manipulation_commands:
    patterns:
      - "[ACTION] the [COLOR] [OBJECT]"
      - "[ACTION] [OBJECT]"
      - "[ACTION] [DESCRIPTOR] [OBJECT]"
      - "pick up [DESCRIPTOR] [OBJECT]"
      - "grasp the [OBJECT]"
    actions:
      - "pick up"
      - "grasp"
      - "take"
      - "lift"
      - "hold"
      - "release"
      - "place"
      - "put down"
      - "drop"
      - "manipulate"
    objects:
      - "cube"
      - "ball"
      - "box"
      - "cup"
      - "bottle"
      - "book"
      - "phone"
      - "keys"
      - "toy"
      - "tool"
      - "object"
    colors:
      - "red"
      - "blue"
      - "green"
      - "yellow"
      - "white"
      - "black"
      - "gray"
      - "orange"
      - "purple"
      - "pink"
    parameters:
      required: ["action", "target_object"]
      optional: ["color", "size", "position"]

  inspection_commands:
    patterns:
      - "find [OBJECT]"
      - "locate [OBJECT]"
      - "where is the [OBJECT]"
      - "show me the [OBJECT]"
      - "look for [OBJECT]"
    parameters:
      required: ["target_object"]
      optional: ["search_area", "scan_method"]

  system_commands:
    patterns:
      - "[ACTION] system"
      - "[ACTION] robot"
      - "robot [ACTION]"
    actions:
      - "stop"
      - "halt"
      - "pause"
      - "resume"
      - "reset"
      - "calibrate"
      - "charge"
      - "sleep"
      - "wake up"
      - "shutdown"
    parameters:
      required: ["action"]
      optional: []

  validation_rules:
    # Rules for validating parsed commands
    navigation:
      required_fields: ["target_location"]
      location_validity_check: true
      path_exists_check: true

    manipulation:
      required_fields: ["action", "target_object"]
      object_in_workspace: true
      reachable_check: true
      graspable_check: true

    inspection:
      required_fields: ["target_object"]
      object_recognizable: true
      viewable_check: true

    system:
      # Referenced by the system-command validator below
      required_fields: ["action"]
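
The bracketed placeholders such as [LOCATION] and [OBJECT] are not executable on their own; a parser has to expand them against the vocabulary lists before matching. A short sketch of that expansion for the navigation patterns is shown below, turning each pattern into a regular expression; the example transcript is illustrative.

# grammar_matcher.py - sketch: expand "[LOCATION]" patterns into regexes
import re
import yaml

with open("config/voice_command_grammar.yaml") as f:
    grammar = yaml.safe_load(f)

nav = grammar["command_grammar"]["navigation_commands"]
# Build one alternation group from the allowed locations, e.g. (kitchen|bedroom|...)
location_group = "(" + "|".join(re.escape(loc) for loc in nav["locations"]) + ")"

compiled = [
    re.compile(pattern.replace("[LOCATION]", location_group), re.IGNORECASE)
    for pattern in nav["patterns"]
]

transcript = "robot, go to the kitchen"
for regex in compiled:
    match = regex.search(transcript)
    if match:
        print("Matched location:", match.group(1))
        break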

Command Validation Implementation

# command_validator.py
from difflib import SequenceMatcher
from typing import Any, Dict, List

import yaml


class VoiceCommandValidator:
    def __init__(self, grammar_file_path):
        with open(grammar_file_path, 'r') as f:
            self.grammar = yaml.safe_load(f)

    def validate_command(self, parsed_command: Dict[str, Any]) -> Dict[str, Any]:
        """Validate a parsed voice command against grammar rules"""
        command_type = parsed_command.get('type', 'unknown')
        parameters = parsed_command.get('parameters', {})

        validation_result = {
            'is_valid': True,
            'errors': [],
            'warnings': [],
            'suggestions': []
        }

        if command_type == 'navigate_to':
            validation_result = self.validate_navigation_command(parameters, validation_result)
        elif command_type in ['pick_up', 'grasp', 'take', 'place', 'release']:
            validation_result = self.validate_manipulation_command(parameters, validation_result)
        elif command_type in ['find', 'locate', 'look_for']:
            validation_result = self.validate_inspection_command(parameters, validation_result)
        elif command_type in ['stop', 'pause', 'resume', 'reset']:
            validation_result = self.validate_system_command(parameters, validation_result)
        else:
            validation_result['warnings'].append(f'Unknown command type: {command_type}')

        return validation_result

    def validate_navigation_command(self, params: Dict[str, Any], result: Dict[str, Any]) -> Dict[str, Any]:
        """Validate navigation command parameters"""
        required_fields = self.grammar['command_grammar']['validation_rules']['navigation']['required_fields']

        for field in required_fields:
            if field not in params:
                result['is_valid'] = False
                result['errors'].append(f'Missing required field: {field}')

        # Validate location if provided
        if 'target_location' in params:
            valid_locations = self.grammar['command_grammar']['navigation_commands']['locations']
            target_location = params['target_location']

            if target_location not in valid_locations:
                # Check for similar locations (suggest corrections)
                similar_locs = self.find_similar_items(target_location, valid_locations, threshold=0.7)
                if similar_locs:
                    result['warnings'].append(
                        f'Unknown location: {target_location}. Did you mean: {similar_locs[0]}?')
                    result['suggestions'].append(f'Use location: {similar_locs[0]}')
                else:
                    result['errors'].append(f'Invalid location: {target_location}')

        return result

    def validate_manipulation_command(self, params: Dict[str, Any], result: Dict[str, Any]) -> Dict[str, Any]:
        """Validate manipulation command parameters"""
        required_fields = self.grammar['command_grammar']['validation_rules']['manipulation']['required_fields']

        for field in required_fields:
            if field not in params:
                result['is_valid'] = False
                result['errors'].append(f'Missing required field: {field}')

        # Validate action if provided
        if 'action' in params:
            valid_actions = self.grammar['command_grammar']['manipulation_commands']['actions']
            action = params['action']

            if action not in valid_actions:
                similar_actions = self.find_similar_items(action, valid_actions, threshold=0.7)
                if similar_actions:
                    result['warnings'].append(
                        f'Unknown action: {action}. Did you mean: {similar_actions[0]}?')
                    result['suggestions'].append(f'Use action: {similar_actions[0]}')
                else:
                    result['errors'].append(f'Invalid action: {action}')

        # Validate object if provided
        if 'target_object' in params:
            valid_objects = self.grammar['command_grammar']['manipulation_commands']['objects']
            target_object = params['target_object']

            if target_object not in valid_objects:
                similar_objects = self.find_similar_items(target_object, valid_objects, threshold=0.7)
                if similar_objects:
                    result['warnings'].append(
                        f'Unknown object: {target_object}. Did you mean: {similar_objects[0]}?')
                    result['suggestions'].append(f'Use object: {similar_objects[0]}')
                else:
                    result['warnings'].append(f'Uncommon object: {target_object}. May not be recognized.')

        return result

    def validate_inspection_command(self, params: Dict[str, Any], result: Dict[str, Any]) -> Dict[str, Any]:
        """Validate inspection command parameters"""
        required_fields = self.grammar['command_grammar']['validation_rules']['inspection']['required_fields']

        for field in required_fields:
            if field not in params:
                result['is_valid'] = False
                result['errors'].append(f'Missing required field: {field}')

        # Validate target object if provided
        if 'target_object' in params:
            valid_objects = self.grammar['command_grammar']['manipulation_commands']['objects']  # Reuse from manipulation
            target_object = params['target_object']

            if target_object not in valid_objects:
                similar_objects = self.find_similar_items(target_object, valid_objects, threshold=0.7)
                if similar_objects:
                    result['warnings'].append(
                        f'Unknown object: {target_object}. Did you mean: {similar_objects[0]}?')
                else:
                    result['warnings'].append(f'Uncommon object: {target_object}. May not be detectable.')

        return result

    def validate_system_command(self, params: Dict[str, Any], result: Dict[str, Any]) -> Dict[str, Any]:
        """Validate system command parameters"""
        required_fields = self.grammar['command_grammar']['validation_rules']['system']['required_fields']

        for field in required_fields:
            if field not in params:
                result['is_valid'] = False
                result['errors'].append(f'Missing required field: {field}')

        return result

    def find_similar_items(self, target: str, candidates: List[str], threshold: float = 0.7) -> List[str]:
        """Find items in candidates that are similar to target"""
        similar = []

        for candidate in candidates:
            similarity = self.calculate_string_similarity(target.lower(), candidate.lower())
            if similarity >= threshold:
                similar.append(candidate)

        # Sort by similarity (highest first)
        similar.sort(key=lambda x: self.calculate_string_similarity(target.lower(), x.lower()), reverse=True)
        return similar

    def calculate_string_similarity(self, str1: str, str2: str) -> float:
        """Calculate character-level similarity between two strings"""
        # SequenceMatcher handles single-word typos (e.g. "kitchn" vs "kitchen"),
        # which a word-level Jaccard score cannot; fuzzywuzzy is another option
        return SequenceMatcher(None, str1, str2).ratio()


# Example usage
def validate_voice_command_example():
    validator = VoiceCommandValidator('config/voice_command_grammar.yaml')

    # Example parsed command
    test_command = {
        'type': 'navigate_to',
        'parameters': {
            'target_location': 'kitchn'  # Intentional typo
        }
    }

    validation_result = validator.validate_command(test_command)

    print("Validation Result:")
    print(f"  Valid: {validation_result['is_valid']}")
    print(f"  Errors: {validation_result['errors']}")
    print(f"  Warnings: {validation_result['warnings']}")
    print(f"  Suggestions: {validation_result['suggestions']}")


if __name__ == '__main__':
    validate_voice_command_example()

Error Handling and Recovery

Voice Command Error Recovery Strategies

# error_recovery_handler.py
import json
import time

import rclpy
from rclpy.node import Node
from std_msgs.msg import String


class VoiceCommandRecoveryHandler(Node):
    def __init__(self):
        super().__init__('voice_command_recovery_handler')

        # Subscriptions
        self.command_sub = self.create_subscription(
            String,
            '/voice_command/parsed',
            self.command_callback,
            10
        )

        self.validation_sub = self.create_subscription(
            String,
            '/voice_command/validation',
            self.validation_callback,
            10
        )

        # Publishers
        self.recovery_command_pub = self.create_publisher(
            String,
            '/voice_command/recovery',
            10
        )

        self.user_prompt_pub = self.create_publisher(
            String,
            '/voice_command/user_prompt',
            10
        )

        # Internal state
        self.command_history = []
        self.max_history = 10
        self.last_recovery_time = 0
        self.recovery_cooldown = 2.0  # seconds

        self.get_logger().info('Voice Command Recovery Handler initialized')

    def command_callback(self, msg):
        """Process incoming voice commands and check for errors"""
        try:
            command_data = json.loads(msg.data)

            # Add to command history
            self.command_history.append({
                'command': command_data,
                'timestamp': time.time(),
                'status': 'pending'
            })

            # Keep history size manageable
            if len(self.command_history) > self.max_history:
                self.command_history.pop(0)

        except json.JSONDecodeError:
            self.get_logger().error('Invalid JSON in command message')

    def validation_callback(self, msg):
        """Process validation results and trigger recovery if needed"""
        try:
            validation_data = json.loads(msg.data)

            if not validation_data.get('is_valid', True):
                # Command validation failed, trigger recovery
                self.handle_validation_failure(validation_data)

        except json.JSONDecodeError:
            self.get_logger().error('Invalid JSON in validation message')

    def handle_validation_failure(self, validation_result):
        """Handle validation failures with an appropriate recovery strategy"""
        current_time = time.time()

        # Check cooldown to prevent prompt spam
        if current_time - self.last_recovery_time < self.recovery_cooldown:
            return

        self.last_recovery_time = current_time

        errors = validation_result.get('errors', [])
        warnings = validation_result.get('warnings', [])
        suggestions = validation_result.get('suggestions', [])

        if errors:
            self.get_logger().error(f'Command validation errors: {errors}')

            # Strategy 1: Ask for clarification
            if 'Missing required field' in str(errors):
                self.ask_for_missing_information(errors)
            elif 'Invalid location' in str(errors) or 'Unknown location' in str(errors):
                self.ask_for_corrected_location(errors, suggestions)
            elif 'Invalid action' in str(errors):
                self.ask_for_corrected_action(errors, suggestions)
            else:
                # General error - ask user to repeat
                self.prompt_user_to_repeat()

        elif warnings:
            self.get_logger().warning(f'Command validation warnings: {warnings}')

            # For warnings, we might still execute but inform the user
            if suggestions:
                self.inform_user_of_suggestions(suggestions)

    def ask_for_missing_information(self, errors):
        """Ask user for missing information"""
        missing_fields = []
        for error in errors:
            if 'Missing required field' in error:
                field_name = error.split(': ')[1] if ':' in error else 'unknown'
                missing_fields.append(field_name)

        if missing_fields:
            prompt = f"I'm missing some information. Could you please specify: {', '.join(missing_fields)}"
            self.publish_user_prompt(prompt)

    def ask_for_corrected_location(self, errors, suggestions):
        """Ask user to confirm or correct the location"""
        original_location = "unknown"  # Would extract from the original command

        if suggestions:
            corrected_location = suggestions[0].split(': ')[1] if ':' in suggestions[0] else "unknown"
            prompt = (f"Did you mean '{corrected_location}' instead of the location you mentioned? "
                      "Please confirm or say the correct location.")
        else:
            prompt = "I didn't recognize that location. Could you please repeat the location?"

        self.publish_user_prompt(prompt)

    def ask_for_corrected_action(self, errors, suggestions):
        """Ask user to confirm or correct the action"""
        if suggestions:
            corrected_action = suggestions[0].split(': ')[1] if ':' in suggestions[0] else "unknown"
            prompt = f"Did you mean to '{corrected_action}' instead? Please confirm or repeat your command."
        else:
            prompt = "I didn't understand that action. Could you please repeat your command?"

        self.publish_user_prompt(prompt)

    def prompt_user_to_repeat(self, specific_issue=None):
        """General prompt for the user to repeat a command"""
        if specific_issue:
            prompt = f"I had trouble with your command: {specific_issue}. Could you please repeat it?"
        else:
            prompt = "I didn't understand your command. Could you please repeat it?"

        self.publish_user_prompt(prompt)

    def inform_user_of_suggestions(self, suggestions):
        """Inform the user of validation suggestions"""
        if suggestions:
            prompt = f"I noticed something that might need attention: {suggestions[0]}"
            self.publish_user_prompt(prompt)

    def publish_user_prompt(self, prompt_text):
        """Publish a prompt for the user"""
        prompt_msg = String()
        prompt_msg.data = prompt_text
        self.user_prompt_pub.publish(prompt_msg)

        self.get_logger().info(f'User prompt: {prompt_text}')


def main(args=None):
    rclpy.init(args=args)
    node = VoiceCommandRecoveryHandler()

    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        node.get_logger().info('Shutting down Voice Command Recovery Handler')
    finally:
        node.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()
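
The handler above listens on /voice_command/validation, but none of the earlier nodes publishes that topic. The sketch below is one way to bridge the gap: it runs the VoiceCommandValidator from command_validator.py on each parsed command and forwards the result. The import path and the grammar file location are assumptions about how your package is laid out.

# validation_bridge.py - sketch: validate parsed commands and publish the results
import json

import rclpy
from rclpy.node import Node
from std_msgs.msg import String

from command_validator import VoiceCommandValidator  # assumes it is on the Python path


class ValidationBridge(Node):
    def __init__(self):
        super().__init__('validation_bridge')
        self.validator = VoiceCommandValidator('config/voice_command_grammar.yaml')
        self.parsed_sub = self.create_subscription(
            String, '/voice_command/parsed', self.parsed_callback, 10)
        self.validation_pub = self.create_publisher(String, '/voice_command/validation', 10)

    def parsed_callback(self, msg):
        try:
            parsed_command = json.loads(msg.data)
        except json.JSONDecodeError:
            self.get_logger().error('Invalid JSON in parsed command')
            return

        # Validate and forward the result to the recovery handler
        result = self.validator.validate_command(parsed_command)
        out = String()
        out.data = json.dumps(result)
        self.validation_pub.publish(out)


def main(args=None):
    rclpy.init(args=args)
    node = ValidationBridge()
    try:
        rclpy.spin(node)
    finally:
        node.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()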

Hands-On Exercise

  1. Set up OpenAI Whisper in your development environment
  2. Implement the basic voice processor node with speech-to-text functionality
  3. Test with recorded audio samples or live microphone input
  4. Implement the contextual voice processor that considers robot state
  5. Integrate with Isaac Sim audio input (simulated)
  6. Implement command validation and error recovery
  7. Test the complete voice-to-action pipeline with simulated commands

Example commands to test:

# Test the voice processing node
ros2 run your_voice_package robot_voice_processor

# Publish a sample audio chunk (the data array is abbreviated here)
ros2 topic pub /audio/input audio_common_msgs/msg/AudioData "data: [0, 1, 2, ...]"

# Monitor the processed commands
ros2 topic echo /voice_command/parsed
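
Typing raw sample values by hand quickly becomes impractical. For exercise step 3 you can instead replay a recorded clip; the helper below is a hedged sketch that streams a WAV file onto /audio/input one second at a time, assuming a 16 kHz, mono, 16-bit PCM file named test_command.wav.

# wav_publisher.py - sketch: replay a 16 kHz mono 16-bit WAV file onto /audio/input
import wave

import rclpy
from audio_common_msgs.msg import AudioData
from rclpy.node import Node


class WavPublisher(Node):
    def __init__(self, wav_path='test_command.wav'):
        super().__init__('wav_publisher')
        self.pub = self.create_publisher(AudioData, '/audio/input', 10)
        self.wav = wave.open(wav_path, 'rb')  # expected: 16 kHz, mono, 16-bit PCM
        self.chunk_frames = self.wav.getframerate()  # one second of audio per message
        self.timer = self.create_timer(1.0, self.publish_chunk)

    def publish_chunk(self):
        frames = self.wav.readframes(self.chunk_frames)
        if not frames:
            self.get_logger().info('Finished replaying WAV file')
            self.timer.cancel()
            return
        msg = AudioData()
        msg.data = frames
        self.pub.publish(msg)


def main(args=None):
    rclpy.init(args=args)
    node = WavPublisher()
    try:
        rclpy.spin(node)
    finally:
        node.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()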

Summary

This lesson covered implementing voice command processing for humanoid robots using OpenAI Whisper. You learned to set up speech-to-text conversion, process natural language commands, integrate with Isaac ROS systems, handle voice command ambiguity, and implement error recovery strategies. The next lesson will explore cognitive planning with LLMs for translating natural language to robot actions.