杰瑞科技汇

Python rospy怎么用?基础教程或常见问题解答

Of course! rospy is the official Python client library for ROS (Robot Operating System). It's the primary way you'll write ROS nodes in Python. This guide will cover the essentials, from basic concepts to common tasks.

Python rospy怎么用?基础教程或常见问题解答-图1
(图片来源网络,侵删)

Table of Contents

  1. Core Concepts

    • What is rospy?
    • The Node
    • The Publisher
    • The Subscriber
    • The Service & Service Client (RPC)
    • The Parameter Server
    • rospy Time and Duration
  2. Essential rospy Tutorials & Examples

    • "Hello World" Publisher
    • "Hello World" Subscriber
    • Complete Talker/Listener Example
    • Simple Service Example
  3. Common Tasks & Useful Tips

    • Logging (rospy.loginfo)
    • Rate Control (rospy.Rate)
    • Arguments (rospy.myargv)
    • Shutdown Handling
    • Waiting for things (rospy.wait_for_message, rospy.wait_for_service)
  4. Troubleshooting & Best Practices

    Python rospy怎么用?基础教程或常见问题解答-图2
    (图片来源网络,侵删)
    • Common Errors
    • Best Practices

Core Concepts

What is rospy?

rospy is a Python library that provides the tools needed to write ROS nodes. It acts as a client to the roscore (ROS Master), which manages the communication graph (nodes, topics, services, etc.). All rospy code must be run inside a source'd ROS workspace (source devel/setup.bash).

The Node

Every ROS program is a node. In rospy, you create a node with rospy.init_node().

import rospy
# Must be called before any other rospy functions are called.
# 'listener_node' is the name of our node.
# anonymous=True makes the name unique, which is good practice.
rospy.init_node('listener_node', anonymous=True)
  • Name: The name of your node (e.g., /listener_node).
  • anonymous=True: Appends a random number to the end of your node's name to prevent conflicts if you run multiple instances of the same script.

The Publisher

A Publisher sends messages over a Topic. Other nodes can subscribe to that topic to receive the messages.

from std_msgs.msg import String # Import the message type
pub = rospy.Publisher('chatter', String, queue_size=10)
  • 'chatter': The name of the topic to publish on.
  • String: The data type of the messages being published. You must import the message type from its corresponding package (e.g., std_msgs.msg).
  • queue_size=10: The maximum number of messages to buffer before dropping old ones. Important for handling publishers that are faster than subscribers.

The Subscriber

A Subscriber listens to a Topic and executes a callback function whenever a new message is received.

Python rospy怎么用?基础教程或常见问题解答-图3
(图片来源网络,侵删)
def chatter_callback(data):
    """This function is called every time a message is received."""
    rospy.loginfo("I heard: %s", data.data)
# 'chatter_callback' is the function to call when a message arrives.
# 'chatter' is the topic to listen on.
# 'String' is the expected message type.
rospy.Subscriber('chatter', String, chatter_callback)
  • chatter_callback: The function that will be executed when a message is received. It must take one argument, which is the message object.
  • data: The received message object. Its structure depends on the message type (e.g., data.data for a std_msgs/String).

The Service & Service Client (RPC)

Services are a request/repair communication model, like a function call. One node provides a Service, and another node calls it with a Request and gets a Response.

Service Server (Provider):

from std_srvs.srv import SetBool, SetBoolResponse
def handle_set_bool(req):
    """Callback function for the service."""
    rospy.loginfo("Request: %s", req.data)
    success = not req.data  # Example logic: invert the boolean
    message = "Success!" if success else "Failure!"
    return SetBoolResponse(success, message)
# 'set_bool_service' is the name of the service.
# SetBool is the service type (defined in .srv files).
s = rospy.Service('set_bool_service', SetBool, handle_set_bool)
rospy.loginfo("Ready to toggle boolean.")
rospy.spin() # Keep the node alive

Service Client (Caller):

from std_srvs.srv import SetBool
# Wait for the service to be available
rospy.wait_for_service('set_bool_service')
try:
    # Create a handle to the service
    set_bool = rospy.ServiceProxy('set_bool_service', SetBool)
    # Call the service
    response = set_bool(True) # Send a request with data=True
    rospy.loginfo("Response: %s, %s", response.success, response.message)
except rospy.ServiceException as e:
    rospy.logerr("Service call failed: %s", e)

The Parameter Server

The Parameter Server is a global dictionary for storing configuration data. Nodes can get, set, and monitor parameters.

# Set a parameter
rospy.set_param('/my_node/param_name', 'hello world')
# Get a parameter
param_value = rospy.get_param('/my_node/param_name')
rospy.loginfo("Param value is: %s", param_value)
# Get a parameter with a default value if it doesn't exist
param_value = rospy.get_param('/my_node/param_name', 'default_value')
# Monitor a parameter for changes
def param_callback(value, param_name):
    rospy.loginfo("Parameter %s changed to %s", param_name, value)
rospy.set_param('/my_node/param_name', 'initial_value')
rospy.set_param('/my_node/another_param', 123)
param_name = '/my_node/param_name'
rospy.set_param(param_name, 'initial_value')
rospy.loginfo("Monitoring parameter: %s", param_name)
rospy.get_param_server().set_param_updates(True, param_name, param_callback)

rospy Time and Duration

ROS has its own clock system. You should use rospy.Time and rospy.Duration for all time-related operations.

import time
# Get the current ROS time (can be wall time or simulated time)
now = rospy.Time.now()
rospy.loginfo("Current ROS time: %s", now)
# Create a specific time
t = rospy.Time(10, 500) # 10 seconds and 500 nanoseconds
# Create a duration
d = rospy.Duration(1, 0) # 1 second
# Sleep for a duration
rospy.sleep(rospy.Duration(0.5)) # Sleep for 0.5 seconds
# A common pattern for timing
start_time = rospy.Time.now()
# ... do some work ...
end_time = rospy.Time.now()
elapsed = end_time - start_time
rospy.loginfo("Operation took %s seconds", elapsed.to_sec())

Essential rospy Tutorials & Examples

"Hello World" Publisher (talker.py)

This node continuously publishes a "hello world" message.

#!/usr/bin/env python
import rospy
from std_msgs.msg import String
def talker():
    # Create a Publisher object
    pub = rospy.Publisher('chatter', String, queue_size=10)
    # Initialize the node
    rospy.init_node('talker', anonymous=True)
    # Set the publishing rate to 10 Hz
    rate = rospy.Rate(10) 
    while not rospy.is_shutdown():
        # Create the message
        hello_str = "hello world %s" % rospy.get_time()
        # Log the message to the console
        rospy.loginfo(hello_str)
        # Publish the message
        pub.publish(hello_str)
        # Sleep to maintain the rate
        rate.sleep()
if __name__ == '__main__':
    try:
        talker()
    except rospy.ROSInterruptException:
        pass

"Hello World" Subscriber (listener.py)

This node listens to the chatter topic and prints any message it receives.

#!/usr/bin/env python
import rospy
from std_msgs.msg import String
def chatter_callback(data):
    """Callback function executed when a message is received."""
    rospy.loginfo("I heard: [%s]", data.data)
def listener():
    # Initialize the node
    rospy.init_node('listener', anonymous=True)
    # Create a Subscriber object, which will call chatter_callback
    rospy.Subscriber('chatter', String, chatter_callback)
    # spin() simply keeps python from exiting until this node is stopped
    rospy.spin()
if __name__ == '__main__':
    listener()

To run these examples:

  1. Make the files executable: chmod +x talker.py listener.py
  2. Open two terminals.
  3. In both terminals, source your ROS workspace: source devel/setup.bash
  4. In the first terminal, start roscore: roscore
  5. In the second terminal, run the listener: ./listener.py
  6. In a third terminal, run the talker: ./talker.py

Common Tasks & Useful Tips

Logging (rospy.loginfo)

Use the rospy.log family of functions for printing output. They are timestamped and can be filtered by log level.

rospy.loginfo("Informative message")   # Default level
rospy.logwarn("Warning message")     # Yellow text
rospy.logerr("Error message")        # Red text
rospy.logdebug("Debug message")      # Only shown if debug logging is enabled

Rate Control (rospy.Rate)

Use rospy.Rate to create a loop that runs at a specific frequency. This is much more accurate than time.sleep().

rate = rospy.Rate(10) # 10 Hz
while not rospy.is_shutdown():
    # Do work
    rate.sleep() # Blocks until the next 10 Hz cycle

Arguments (rospy.myargv)

When you run a ROS node with arguments (e.g., rosrun my_pkg my_node.py _foo:=bar), rospy intercepts them. If you need to parse your own arguments, use rospy.myargv.

import sys
import rospy
# sys.argv will contain ROS arguments like '__name:=...'
# rospy.myargv strips them out, leaving only your script's arguments.
argv = rospy.myargv(sys.argv)
# Now you can parse argv with argparse, for example.
# parser = argparse.ArgumentParser()
# parser.add_argument('--my_arg', type=int, default=5)
# args = parser.parse_args(argv[1:])

Shutdown Handling

Use rospy.is_shutdown() in your loops to check if the user has requested the node to stop (e.g., by pressing Ctrl+C).

while not rospy.is_shutdown():
    # Do work
    if some_error_condition:
        rospy.logerr("A critical error occurred.")
        rospy.signal_shutdown("Critical error") # Gracefully shut down the node
        break

Waiting for things

It's good practice to wait for a service or a message to become available before trying to use them.

# Wait for a service to be advertised
rospy.wait_for_service('add_two_ints')
# Wait for a message to be published on a topic
# (Useful for setup in tests)
msg = rospy.wait_for_message('chatter', String, timeout=5.0)

Troubleshooting & Best Practices

Common Errors

  • ROS path is not set: You forgot to source devel/setup.bash in your terminal.
  • [ERROR] [167...]: master does not exist: roscore is not running. Start it in another terminal.
  • [ERROR] [167...]: Failed to contact master: roscore is running, but your terminal's ROS environment variables (ROS_MASTER_URI, ROS_IP) are not set correctly. Sourcing your workspace usually fixes this.
  • [ERROR] [167...]: Topic /chatter does not exist: You have a subscriber trying to connect to a topic that no publisher is advertising yet. Make sure your publisher is running before your subscriber, or use rospy.wait_for_message.
  • [ERROR] [167...]: Failed to load nodelet [/my_node]: Often a Python import error. Check your CMakeLists.txt and package.xml for correct dependencies and paths.

Best Practices

  1. Use rospy.spin(): If your node is purely a subscriber or service provider and doesn't have its own main loop, call rospy.spin() at the end of your script. This blocks the main thread and keeps your node alive, allowing the ROS communication infrastructure to call your callbacks.
  2. Name Your Topics and Nodes Uniquely: Use anonymous=True for nodes to avoid name collisions. Use descriptive, unique names for topics.
  3. Use queue_size: Always specify a queue_size for publishers to prevent your node from crashing if subscribers are slow.
  4. Keep Callbacks Fast: The code inside your subscriber callback should be as fast as possible. Long-running computations in a callback will block the entire node and can cause other messages to be dropped. Offload heavy work to a separate thread or process.
  5. Use Launch Files: For complex systems, don't run nodes manually. Use .launch files with roslaunch to start multiple nodes with the correct parameters and remappings.
分享:
扫描分享到社交APP
上一篇
下一篇