Of course! rospy is the official Python client library for ROS (Robot Operating System). It's the primary way you'll write ROS nodes in Python. This guide will cover the essentials, from basic concepts to common tasks.

Table of Contents
-
Core Concepts
- What is
rospy? - The Node
- The Publisher
- The Subscriber
- The Service & Service Client (RPC)
- The Parameter Server
rospyTime and Duration
- What is
-
Essential
rospyTutorials & Examples- "Hello World" Publisher
- "Hello World" Subscriber
- Complete Talker/Listener Example
- Simple Service Example
-
Common Tasks & Useful Tips
- Logging (
rospy.loginfo) - Rate Control (
rospy.Rate) - Arguments (
rospy.myargv) - Shutdown Handling
- Waiting for things (
rospy.wait_for_message,rospy.wait_for_service)
- Logging (
-
Troubleshooting & Best Practices
(图片来源网络,侵删)- Common Errors
- Best Practices
Core Concepts
What is rospy?
rospy is a Python library that provides the tools needed to write ROS nodes. It acts as a client to the roscore (ROS Master), which manages the communication graph (nodes, topics, services, etc.). All rospy code must be run inside a source'd ROS workspace (source devel/setup.bash).
The Node
Every ROS program is a node. In rospy, you create a node with rospy.init_node().
import rospy
# Must be called before any other rospy functions are called.
# 'listener_node' is the name of our node.
# anonymous=True makes the name unique, which is good practice.
rospy.init_node('listener_node', anonymous=True)
- Name: The name of your node (e.g.,
/listener_node). anonymous=True: Appends a random number to the end of your node's name to prevent conflicts if you run multiple instances of the same script.
The Publisher
A Publisher sends messages over a Topic. Other nodes can subscribe to that topic to receive the messages.
from std_msgs.msg import String # Import the message type
pub = rospy.Publisher('chatter', String, queue_size=10)
'chatter': The name of the topic to publish on.String: The data type of the messages being published. You must import the message type from its corresponding package (e.g.,std_msgs.msg).queue_size=10: The maximum number of messages to buffer before dropping old ones. Important for handling publishers that are faster than subscribers.
The Subscriber
A Subscriber listens to a Topic and executes a callback function whenever a new message is received.

def chatter_callback(data):
"""This function is called every time a message is received."""
rospy.loginfo("I heard: %s", data.data)
# 'chatter_callback' is the function to call when a message arrives.
# 'chatter' is the topic to listen on.
# 'String' is the expected message type.
rospy.Subscriber('chatter', String, chatter_callback)
chatter_callback: The function that will be executed when a message is received. It must take one argument, which is the message object.data: The received message object. Its structure depends on the message type (e.g.,data.datafor astd_msgs/String).
The Service & Service Client (RPC)
Services are a request/repair communication model, like a function call. One node provides a Service, and another node calls it with a Request and gets a Response.
Service Server (Provider):
from std_srvs.srv import SetBool, SetBoolResponse
def handle_set_bool(req):
"""Callback function for the service."""
rospy.loginfo("Request: %s", req.data)
success = not req.data # Example logic: invert the boolean
message = "Success!" if success else "Failure!"
return SetBoolResponse(success, message)
# 'set_bool_service' is the name of the service.
# SetBool is the service type (defined in .srv files).
s = rospy.Service('set_bool_service', SetBool, handle_set_bool)
rospy.loginfo("Ready to toggle boolean.")
rospy.spin() # Keep the node alive
Service Client (Caller):
from std_srvs.srv import SetBool
# Wait for the service to be available
rospy.wait_for_service('set_bool_service')
try:
# Create a handle to the service
set_bool = rospy.ServiceProxy('set_bool_service', SetBool)
# Call the service
response = set_bool(True) # Send a request with data=True
rospy.loginfo("Response: %s, %s", response.success, response.message)
except rospy.ServiceException as e:
rospy.logerr("Service call failed: %s", e)
The Parameter Server
The Parameter Server is a global dictionary for storing configuration data. Nodes can get, set, and monitor parameters.
# Set a parameter
rospy.set_param('/my_node/param_name', 'hello world')
# Get a parameter
param_value = rospy.get_param('/my_node/param_name')
rospy.loginfo("Param value is: %s", param_value)
# Get a parameter with a default value if it doesn't exist
param_value = rospy.get_param('/my_node/param_name', 'default_value')
# Monitor a parameter for changes
def param_callback(value, param_name):
rospy.loginfo("Parameter %s changed to %s", param_name, value)
rospy.set_param('/my_node/param_name', 'initial_value')
rospy.set_param('/my_node/another_param', 123)
param_name = '/my_node/param_name'
rospy.set_param(param_name, 'initial_value')
rospy.loginfo("Monitoring parameter: %s", param_name)
rospy.get_param_server().set_param_updates(True, param_name, param_callback)
rospy Time and Duration
ROS has its own clock system. You should use rospy.Time and rospy.Duration for all time-related operations.
import time
# Get the current ROS time (can be wall time or simulated time)
now = rospy.Time.now()
rospy.loginfo("Current ROS time: %s", now)
# Create a specific time
t = rospy.Time(10, 500) # 10 seconds and 500 nanoseconds
# Create a duration
d = rospy.Duration(1, 0) # 1 second
# Sleep for a duration
rospy.sleep(rospy.Duration(0.5)) # Sleep for 0.5 seconds
# A common pattern for timing
start_time = rospy.Time.now()
# ... do some work ...
end_time = rospy.Time.now()
elapsed = end_time - start_time
rospy.loginfo("Operation took %s seconds", elapsed.to_sec())
Essential rospy Tutorials & Examples
"Hello World" Publisher (talker.py)
This node continuously publishes a "hello world" message.
#!/usr/bin/env python
import rospy
from std_msgs.msg import String
def talker():
# Create a Publisher object
pub = rospy.Publisher('chatter', String, queue_size=10)
# Initialize the node
rospy.init_node('talker', anonymous=True)
# Set the publishing rate to 10 Hz
rate = rospy.Rate(10)
while not rospy.is_shutdown():
# Create the message
hello_str = "hello world %s" % rospy.get_time()
# Log the message to the console
rospy.loginfo(hello_str)
# Publish the message
pub.publish(hello_str)
# Sleep to maintain the rate
rate.sleep()
if __name__ == '__main__':
try:
talker()
except rospy.ROSInterruptException:
pass
"Hello World" Subscriber (listener.py)
This node listens to the chatter topic and prints any message it receives.
#!/usr/bin/env python
import rospy
from std_msgs.msg import String
def chatter_callback(data):
"""Callback function executed when a message is received."""
rospy.loginfo("I heard: [%s]", data.data)
def listener():
# Initialize the node
rospy.init_node('listener', anonymous=True)
# Create a Subscriber object, which will call chatter_callback
rospy.Subscriber('chatter', String, chatter_callback)
# spin() simply keeps python from exiting until this node is stopped
rospy.spin()
if __name__ == '__main__':
listener()
To run these examples:
- Make the files executable:
chmod +x talker.py listener.py - Open two terminals.
- In both terminals, source your ROS workspace:
source devel/setup.bash - In the first terminal, start
roscore:roscore - In the second terminal, run the listener:
./listener.py - In a third terminal, run the talker:
./talker.py
Common Tasks & Useful Tips
Logging (rospy.loginfo)
Use the rospy.log family of functions for printing output. They are timestamped and can be filtered by log level.
rospy.loginfo("Informative message") # Default level
rospy.logwarn("Warning message") # Yellow text
rospy.logerr("Error message") # Red text
rospy.logdebug("Debug message") # Only shown if debug logging is enabled
Rate Control (rospy.Rate)
Use rospy.Rate to create a loop that runs at a specific frequency. This is much more accurate than time.sleep().
rate = rospy.Rate(10) # 10 Hz
while not rospy.is_shutdown():
# Do work
rate.sleep() # Blocks until the next 10 Hz cycle
Arguments (rospy.myargv)
When you run a ROS node with arguments (e.g., rosrun my_pkg my_node.py _foo:=bar), rospy intercepts them. If you need to parse your own arguments, use rospy.myargv.
import sys
import rospy
# sys.argv will contain ROS arguments like '__name:=...'
# rospy.myargv strips them out, leaving only your script's arguments.
argv = rospy.myargv(sys.argv)
# Now you can parse argv with argparse, for example.
# parser = argparse.ArgumentParser()
# parser.add_argument('--my_arg', type=int, default=5)
# args = parser.parse_args(argv[1:])
Shutdown Handling
Use rospy.is_shutdown() in your loops to check if the user has requested the node to stop (e.g., by pressing Ctrl+C).
while not rospy.is_shutdown():
# Do work
if some_error_condition:
rospy.logerr("A critical error occurred.")
rospy.signal_shutdown("Critical error") # Gracefully shut down the node
break
Waiting for things
It's good practice to wait for a service or a message to become available before trying to use them.
# Wait for a service to be advertised
rospy.wait_for_service('add_two_ints')
# Wait for a message to be published on a topic
# (Useful for setup in tests)
msg = rospy.wait_for_message('chatter', String, timeout=5.0)
Troubleshooting & Best Practices
Common Errors
ROS path is not set: You forgot tosource devel/setup.bashin your terminal.[ERROR] [167...]: master does not exist:roscoreis not running. Start it in another terminal.[ERROR] [167...]: Failed to contact master:roscoreis running, but your terminal's ROS environment variables (ROS_MASTER_URI,ROS_IP) are not set correctly. Sourcing your workspace usually fixes this.[ERROR] [167...]: Topic /chatter does not exist: You have a subscriber trying to connect to a topic that no publisher is advertising yet. Make sure your publisher is running before your subscriber, or userospy.wait_for_message.[ERROR] [167...]: Failed to load nodelet [/my_node]: Often a Python import error. Check yourCMakeLists.txtandpackage.xmlfor correct dependencies and paths.
Best Practices
- Use
rospy.spin(): If your node is purely a subscriber or service provider and doesn't have its own main loop, callrospy.spin()at the end of your script. This blocks the main thread and keeps your node alive, allowing the ROS communication infrastructure to call your callbacks. - Name Your Topics and Nodes Uniquely: Use
anonymous=Truefor nodes to avoid name collisions. Use descriptive, unique names for topics. - Use
queue_size: Always specify aqueue_sizefor publishers to prevent your node from crashing if subscribers are slow. - Keep Callbacks Fast: The code inside your subscriber callback should be as fast as possible. Long-running computations in a callback will block the entire node and can cause other messages to be dropped. Offload heavy work to a separate thread or process.
- Use Launch Files: For complex systems, don't run nodes manually. Use
.launchfiles withroslaunchto start multiple nodes with the correct parameters and remappings.
