python rosbag2
时间: 2025-03-05 22:38:43 浏览: 96
### 使用Python在ROS 2中操作rosbag2
#### 安装必要的库
为了能够使用`rosbag2`功能,在Python环境中需要确保已经安装了`rosbag2_py`库。如果遇到错误提示"No module named 'rosbag2_py'",则表明该模块尚未安装[^2]。
#### 数据记录
对于数据的记录工作,可以通过调用特定API来启动录制过程。下面是一个简单的例子展示怎样创建一个新的Bag文件并指定要记录的话题:
```python
import rclpy
from rclpy.node import Node
from rosbag2_py import Writer, StorageOptions, ConverterOptions, TopicMetadata
from std_msgs.msg import String
class RecordNode(Node):
def __init__(self):
super().__init__('record_node')
storage_options = StorageOptions(uri='my_bag', storage_id='sqlite3')
converter_options = ConverterOptions(input_serialization_format='cdr', output_serialization_format='cdr')
writer = Writer()
writer.open(storage_options, converter_options)
topic_metadata = TopicMetadata(name='/chatter', type='std_msgs/msg/String', serialization_format='cdr')
writer.create_topic(topic_metadata)
msg = String(data="Hello World")
self.publisher_ = self.create_publisher(String, '/chatter', 10)
timer_period = 0.5 # seconds
self.timer = self.create_timer(timer_period, lambda: writer.write('/chatter', msg))
def main(args=None):
rclpy.init(args=args)
node = RecordNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
```
这段代码展示了如何设置一个名为`/chatter`的主题,并向其中写入字符串消息。这里选择了SQLite作为存储后端,这通常是默认选项之一[^4]。
#### 数据回放
当涉及到从已有的Bag文件中读取消息时,则可以利用相应的Reader API完成此任务。以下是关于如何打开现有Bag文件以及遍历其内部包含的消息的一个实例:
```python
import rclpy
from rclpy.node import Node
from rosbag2_py import SequentialReader, StorageOptions, ConverterOptions, TopicMetadata
from std_msgs.msg import String
class PlaybackNode(Node):
def __init__(self):
super().__init__('playback_node')
reader = SequentialReader()
storage_options = StorageOptions(uri='my_bag', storage_id='sqlite3')
converter_options = ConverterOptions(input_serialization_format='cdr', output_serialization_format='cdr')
reader.open(storage_options, converter_options)
topics_info = []
while True:
message_tuple = reader.read_next() # Returns tuple of (topic_name, serialized_message_bytes, timestamp_ns)
if not message_tuple:
break
topic_name, data, t = message_tuple
deserialized_msg = deserialize_cdr(data, str(TopicMetadata(type='std_msgs/msg/String')))
self.get_logger().info(f'Received on {topic_name}: "{deserialized_msg.data}" at time {t}')
def main(args=None):
rclpy.init(args=args)
node = PlaybackNode()
try:
rclpy.spin_once(node) # Only need to spin once as we're just reading the bag file.
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
```
上述脚本会逐条读取之前保存到Bag文件里的每一条消息,并打印出来。注意这里的反序列化部分依赖于具体的消息类型;在这个案例里假设所有的主题都携带的是标准字符串消息(`String`)类型的负载。
阅读全文
相关推荐
















