API review
Proposer: Jeremy Leibs and Tim Field
Present at review:
- Patrick
- Josh
- Bhaskara
- Caroline
- Lorenz
- Tully
New API
C++
The new C++ API works on the premise of creating "views" of a bag using "queries". A Query is an abstract class which defines a function that filters whether or not the messages from a connection are to be included. This function has access to topic_name, datatype, md5sum, message definition as well as the connection header. Additionally, each Query can specify a start and end time for the range of times it includes.
Multiple queries can be added to a View, including queries from different bags. The View then provides an iterator interface across the bags, sorted based on time.
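The view/query idea can be modelled without any ROS dependency. The sketch below is an illustration only: `Message`, `run_query`, and the Python `View` class are hypothetical stand-ins, bags are plain time-sorted lists, and the inclusive time bounds are an assumption of the sketch rather than documented rosbag behaviour.

```python
import heapq
from collections import namedtuple

# Hypothetical stand-in for a stored message; not a rosbag type.
Message = namedtuple("Message", ["time", "topic", "data"])


def run_query(bag, connection_filter, start_time=float("-inf"), end_time=float("inf")):
    """Yield messages from one time-sorted bag that pass the filter and time range."""
    for msg in bag:
        # Inclusive bounds are an assumption made for this sketch.
        if start_time <= msg.time <= end_time and connection_filter(msg.topic):
            yield msg


class View:
    """Aggregates any number of queries and iterates over them in time order."""

    def __init__(self):
        self._queries = []

    def add_query(self, bag, connection_filter=lambda topic: True, **time_range):
        # Queries are only ever added, so the view is the union of their results.
        self._queries.append((bag, connection_filter, time_range))

    def __iter__(self):
        streams = [run_query(bag, flt, **tr) for bag, flt, tr in self._queries]
        # Each stream is already sorted by time, so a k-way merge keeps the order.
        return heapq.merge(*streams, key=lambda m: m.time)


bag_a = [Message(1, "chatter", "a1"), Message(4, "numbers", 7), Message(5, "chatter", "a2")]
bag_b = [Message(2, "chatter", "b1"), Message(3, "odom", "b2")]

view = View()
view.add_query(bag_a, lambda topic: topic == "chatter")  # filter by topic
view.add_query(bag_b, end_time=2)                        # filter by time only
merged = [(m.time, m.data) for m in view]
```

Here `merged` interleaves the results of both queries by timestamp, which is the behaviour the paragraph above describes for a View spanning several bags.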
Python
The Python API is similar, except that the "query" is specified as optional arguments to the read_messages function, which returns the "view" as a generator.
C++
namespace rosbag {

class MessageInstance;  // defined below

namespace bagmode
{
    //! The possible modes to open a bag in
    enum BagMode
    {
        Read   = 1 << 0,
        Write  = 1 << 1,
        Append = 1 << 2
    };
}

namespace compression
{
    enum CompressionType
    {
        None = 0,
        BZ2  = 1
    };
}

class Bag
{
public:
    Bag();

    //! Open a bag file
    Bag(std::string const& filename, uint32_t mode = bagmode::Read);

    ~Bag();

    //! Open a bag file
    bool open(std::string const& filename, uint32_t mode = bagmode::Read);

    //! Close the bag file (write to disk, append index, etc.)
    void close();

    //! Get the filename of the bag
    std::string getFileName() const;

    //! Get the mode the bag is in
    uint32_t getMode() const;

    //! Get the major-version of the open bagfile
    uint32_t getMajorVersion() const;

    //! Get the minor-version of the open bagfile
    uint32_t getMinorVersion() const;

    //! Get the current size of the bagfile (a lower bound on bag-file size)
    uint64_t getSize() const;

    //! Set the compression method to use for writing chunks
    void setCompression(compression::CompressionType compression);

    //! Get the compression method to use for writing chunks
    compression::CompressionType getCompression() const;

    //! Set the threshold for creating new chunks
    void setChunkThreshold(uint32_t chunk_threshold);

    //! Get the threshold for creating new chunks
    uint32_t getChunkThreshold() const;

    //! Write a message into the bag file
    /*!
     * \param topic The topic name
     * \param event The message event to be added
     *
     * Can throw BagNotOpenException or BagIOException
     */
    template<class T>
    void write(std::string const& topic, ros::MessageEvent<T> const& event);

    //! Write a message into the bag file
    /*!
     * \param topic The topic name
     * \param time  Timestamp of the message
     * \param msg   The message to be added
     *
     * Can throw BagNotOpenException or BagIOException
     */
    template<class T>
    void write(std::string const& topic, ros::Time const& time, T const& msg);

    //! Write a message into the bag file
    /*!
     * \param topic The topic name
     * \param time  Timestamp of the message
     * \param msg   The message to be added
     *
     * Can throw BagNotOpenException or BagIOException
     */
    template<class T>
    void write(std::string const& topic, ros::Time const& time, T& msg);

    //! Write a message into the bag file
    /*!
     * \param topic             The topic name
     * \param time              Timestamp of the message
     * \param msg               A MessageInstance
     * \param connection_header A connection header.
     *
     * Can throw BagNotOpenException or BagIOException
     */
    void write(std::string const& topic, ros::Time const& time, MessageInstance const& msg,
               boost::shared_ptr<ros::M_string> connection_header = boost::shared_ptr<ros::M_string>());
};

//! A struct grouping together the information about a connection, used by Query
struct ConnectionInfo
{
    uint32_t    id;
    std::string topic;
    std::string datatype;
    std::string md5sum;
    std::string msg_def;

    boost::shared_ptr<ros::M_string> header;
};

class TopicQuery
{
public:
    TopicQuery(std::string const& topic);
    TopicQuery(std::vector<std::string> const& topics);

    bool operator()(ConnectionInfo const*) const;

    // ...
};

class TypeQuery
{
public:
    TypeQuery(std::string const& type);
    TypeQuery(std::vector<std::string> const& types);

    bool operator()(ConnectionInfo const*) const;

    // ...
};

class View
{
public:
    //! An iterator that points to a MessageInstance from a bag
    /*!
     * This iterator dereferences to a MessageInstance by VALUE, since
     * there does not actually exist a structure of MessageInstance
     * from which to return a reference.
     */
    class iterator
        : public boost::iterator_facade<iterator,
                                        MessageInstance,
                                        boost::forward_traversal_tag,
                                        MessageInstance>
    {
        // ...
    };

    //! Typedef to const_iterator
    typedef iterator const_iterator;

    //! Create a view and add a query
    /*!
     * \param bag        The bag file on which to run this query
     * \param start_time The beginning of the time_range for the query
     * \param end_time   The end of the time_range for the query
     */
    View(Bag const& bag, ros::Time const& start_time = ros::TIME_MIN, ros::Time const& end_time = ros::TIME_MAX);

    //! Create a view and add a query
    /*!
     * \param bag        The bag file on which to run this query
     * \param query      The actual query to evaluate which connections to include
     * \param start_time The beginning of the time_range for the query
     * \param end_time   The end of the time_range for the query
     */
    View(Bag const& bag, boost::function<bool(ConnectionInfo const*)> query,
         ros::Time const& start_time = ros::TIME_MIN, ros::Time const& end_time = ros::TIME_MAX);

    ~View();

    iterator begin();
    iterator end();
    uint32_t size() const;

    //! Add a query to a view
    /*!
     * \param bag        The bag file on which to run this query
     * \param start_time The beginning of the time_range for the query
     * \param end_time   The end of the time_range for the query
     */
    void addQuery(Bag const& bag, ros::Time const& start_time = ros::TIME_MIN, ros::Time const& end_time = ros::TIME_MAX);

    //! Add a query to a view
    /*!
     * \param bag        The bag file on which to run this query
     * \param query      The actual query to evaluate which connections to include
     * \param start_time The beginning of the time_range for the query
     * \param end_time   The end of the time_range for the query
     */
    void addQuery(Bag const& bag, boost::function<bool(ConnectionInfo const*)> query,
                  ros::Time const& start_time = ros::TIME_MIN, ros::Time const& end_time = ros::TIME_MAX);
};

//! A class pointing into a bag file
/*!
 * The MessageInstance class itself is fairly lightweight. It
 * simply contains a pointer to a bag-file and the index_entry
 * necessary to get access to the corresponding data.
 *
 * It adheres to the necessary ros::message_traits to be directly
 * serializable.
 *
 * Note, it has a private constructor and can only be returned via an
 * iterator from a rosbag::View.
 */
class MessageInstance
{
public:
    ros::Time const&   getTime() const;
    std::string const& getTopic() const;
    std::string const& getDataType() const;
    std::string const& getMD5Sum() const;
    std::string const& getMessageDefinition() const;

    boost::shared_ptr<ros::M_string> getConnectionHeader() const;

    std::string getCallerId() const;
    bool        isLatching() const;

    //! Templated call to instantiate a message
    /*!
     * \return NULL pointer if incompatible
     */
    template<class T>
    boost::shared_ptr<T const> instantiate() const;

    //! Write serialized message contents out to a stream
    template<typename Stream>
    void write(Stream& stream) const;

    //! Size of serialized message
    uint32_t size() const;
};

//! Helper function to create AdvertiseOptions from a MessageInstance
/*!
 * \param msg        The MessageInstance for which to generate advertise options
 * \param queue_size The size of the outgoing queue
 */
ros::AdvertiseOptions createAdvertiseOptions(MessageInstance const& msg, uint32_t queue_size);

} // namespace rosbag
Example usage for write:
Example usage for read:
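#include <rosbag/bag.h>
#include <rosbag/view.h>
#include <std_msgs/Int32.h>
#include <std_msgs/String.h>

#include <boost/foreach.hpp>
#include <gtest/gtest.h>
#define foreach BOOST_FOREACH

// Fragment taken from a test body: open the bag read-only.
rosbag::Bag bag;
bag.open("test.bag", rosbag::bagmode::Read);

// Select the connections to include by topic name.
std::vector<std::string> topics;
topics.push_back(std::string("chatter"));
topics.push_back(std::string("numbers"));

rosbag::View view(bag, rosbag::TopicQuery(topics));

// Messages come back in time order; instantiate() returns NULL on a type mismatch.
foreach(rosbag::MessageInstance const m, view)
{
    std_msgs::String::ConstPtr s = m.instantiate<std_msgs::String>();
    if (s != NULL)
        ASSERT_EQ(s->data, std::string("foo"));

    std_msgs::Int32::ConstPtr i = m.instantiate<std_msgs::Int32>();
    if (i != NULL)
        ASSERT_EQ(i->data, 42);
}

bag.close();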
Python
class ROSBagException(Exception):
    """
    Base class for exceptions in rosbag.
    """
    def __init__(self, value):
        self.value = value

class ROSBagFormatException(ROSBagException):
    """
    Exceptions for errors relating to the bag file format.
    """
    def __init__(self, value):
        ROSBagException.__init__(self, value)

class Compression:
    """
    Allowable compression types
    """
    NONE = 'none'
    BZ2  = 'bz2'

class Bag(object):
    """
    Bag objects serialize messages to and from disk using the bag format.
    """
    def __init__(self, f, mode='r', compression=Compression.NONE, chunk_threshold=768 * 1024, options=None):
        """
        Open a bag file.  The mode can be 'r', 'w', or 'a' for reading (default),
        writing or appending.  The file will be created if it doesn't exist
        when opened for writing or appending; it will be truncated when opened
        for writing.  Simultaneous reading and writing is allowed when in writing
        or appending mode.
        @param f: filename of bag to open or a stream to read from
        @type  f: str or file
        @param mode: mode, either 'r', 'w', or 'a'
        @type  mode: str
        @param compression: compression mode, see Compression
        @type  compression: str
        @param chunk_threshold: minimum number of uncompressed bytes per chunk
        @type  chunk_threshold: int
        @param options: the bag options specified via a dictionary (currently, compression and chunk_threshold)
        @type  options: dict
        @raise ValueError: if any argument is invalid
        @raise ROSBagException: if an error occurs opening file
        @raise ROSBagFormatException: if bag format is corrupted
        """

    def read_messages(self, topics=None, start_time=None, end_time=None, topic_filter=None, raw=False):
        """
        Read the messages from the bag file.
        @param topics: list of topics or a single topic [optional]
        @type  topics: list(str) or str
        @param start_time: earliest timestamp of message to return [optional]
        @type  start_time: U{roslib.rostime.Time}
        @param end_time: latest timestamp of message to return [optional]
        @type  end_time: U{roslib.rostime.Time}
        @param topic_filter: function to filter topics to include [optional]
        @type  topic_filter: function taking (topic, datatype, md5sum, msg_def) and returning bool
        @param raw: if True, then generate tuples of (datatype, data, md5sum, position, pytype)
        @type  raw: bool
        @return: generator of (topic, message, timestamp) tuples for each message in the bag file
        @rtype:  generator of tuples of (topic, message, timestamp)
        """

    def write(self, topic, msg, t=None, raw=False):
        """
        Write a message to the bag.  Messages must be written in chronological order for a given topic.
        @param topic: name of topic
        @type  topic: str
        @param msg: message to add to bag, or tuple (if raw)
        @type  msg: Message or tuple of raw message data
        @param t: ROS time of message publication
        @type  t: U{roslib.rostime.Time}
        @param raw: if True, msg is in raw format, i.e. (msg_type, serialized_bytes, md5sum, pytype)
        @type  raw: bool
        @raise ValueError: if arguments are invalid or bag is closed
        @raise ROSBagException: if message isn't in chronological order
        """

    def get_index(self):
        """
        Get the index.
        @return: the index
        @rtype: dict of topic -> (U{roslib.rostime.Time}, position), where position depends on the bag format.
        """

    def flush(self):
        """
        Write the open chunk to disk so subsequent reads will read all messages.
        @raise ValueError: if bag is closed
        """

    def close(self):
        """
        Close the bag file.  Closing an already closed bag does nothing.
        """

    @property
    def options(self):
        """Get the options."""

    @property
    def filename(self):
        """Get the filename."""

    @property
    def version(self):
        """Get the version."""

    @property
    def mode(self):
        """Get the mode."""

    @property
    def size(self):
        """Get the size in bytes."""

    def _get_compression(self):
        """Get the compression."""

    def _set_compression(self, compression):
        """Set the compression."""

    compression = property(_get_compression, _set_compression)

    def _get_chunk_threshold(self):
        """Get the chunk threshold."""

    def _set_chunk_threshold(self, chunk_threshold):
        """Set the chunk threshold."""

    chunk_threshold = property(_get_chunk_threshold, _set_chunk_threshold)
Example usage for write:
Example usage for read:
Question / concerns / comments
Enter your thoughts on the API and any questions / concerns you have here. Please sign your name. Anything you want to address in the API review should be marked down here before the start of the meeting.
Tully
- Are multiple compression types supported within one bag? What happens if I write some messages with one compression type and then call setCompression with another? What does getCompression return in that case?
Yes, compression types can vary by chunk (see Bags/Format/2.0). The set/getCompression methods refer to the compression method that will be used to write the next message to the bag.
If setCompression is called with a different compression type while a chunk is open, the chunk is closed.
getCompression returns the current compression method being used to write.
- does addQuery do an AND operator, OR operator or append to a list?
- addQuery essentially does an OR operator. The view provides access to the (sorted) aggregate of all messages returned from each bag with each query.
- What are the rules about modifying a view? What happens to existing iterators when a view is modified?
- Since Views and Bags are only additive (there is no removeQuery), iterators stay valid. Iteration semantics are the same as for an STL set: your iterator always points to the same element, but the value of *(iterator + 1) may change, either through adding new Queries to the View or new elements to the bag.
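The answer about per-chunk compression can be pictured with a toy model. This is only a sketch of the rule stated above (changing the compression type closes the open chunk); `ChunkedWriter` and its methods are hypothetical names, and real chunks are also closed when the chunk threshold is reached, which this sketch ignores.

```python
class ChunkedWriter:
    """Toy model: messages accumulate in an open chunk; each chunk has one compression type."""

    def __init__(self, compression="none"):
        self._compression = compression
        self._open_chunk = []   # messages written since the last chunk was closed
        self.chunks = []        # closed chunks as (compression, [messages])

    def get_compression(self):
        # The method that will be used for the next message written.
        return self._compression

    def set_compression(self, compression):
        # Switching to a different type closes the chunk that is currently open.
        if compression != self._compression:
            self._close_chunk()
        self._compression = compression

    def write(self, msg):
        self._open_chunk.append(msg)

    def _close_chunk(self):
        if self._open_chunk:
            self.chunks.append((self._compression, self._open_chunk))
            self._open_chunk = []

    def close(self):
        self._close_chunk()


writer = ChunkedWriter()
writer.write("m1")
writer.write("m2")
writer.set_compression("bz2")   # closes the uncompressed chunk holding m1 and m2
writer.write("m3")
current = writer.get_compression()
writer.close()
```

After this runs, `writer.chunks` holds one uncompressed chunk followed by one BZ2 chunk, so a single bag ends up mixing compression types.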
Patrick
The Bag::write() methods should be compatible with the roscpp 1.1 serialization API, where it's valid to have messages that don't inherit from ros::Message. They should be templated on the message type, see ros::Publisher::publish().
Of course - this was an oversight. Our implementation of write() directly calls a function templated on message type, so this is an easy fix.
I'd leave out bagmode::Default and default mode to bagmode::Read, it's clearer to me.
- Sounds good. Updated.
What does getVersion() return as compared to getMajorVersion() and getMinorVersion()?
- e.g. a bag with version 1.2 would have version=102, majorVersion=1, minorVersion=2. Here are the implementations:
As you can see, getVersion() is redundant. I'll remove it.
- Btw, I suppose minor version is there to signify forward compatibility of bag file formats, but that convention doesn't seem to have been followed (1.0 to 1.1 to 1.2 were all forwards incompatible). I bumped the new format version to 2.0 to indicate forwards incompatibility.
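The implementations referred to above are not reproduced on this page. As a rough illustration only, the example given (1.2 maps to 102) is consistent with an encoding of major * 100 + minor; the helper names below are hypothetical and the encoding is inferred from that single example.

```python
def combine_version(major, minor):
    """Pack a major/minor pair into one integer (assumed encoding: major * 100 + minor)."""
    return major * 100 + minor


def split_version(version):
    """Recover (major, minor) from the packed integer."""
    return divmod(version, 100)


packed = combine_version(1, 2)
unpacked = split_version(packed)
```

Under that assumption the packed value carries no information beyond the two parts, which is why a separate getVersion() would be redundant.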
What is CompressionType? An enum? What are the possible values?
- An enum. None and BZ2. (see above)
I'd like a Bag constructor that takes the filename and (optional) mode.
Sure. RAII FTW. I added an optional compression argument as well.
More:
I'm not sure the Query class really needs to exist. You could change View::addQuery() to
void addQuery(Bag& bag, const boost::function<bool(ConnectionInfo const*)>& query,
              ros::Time const& start_time = ros::TIME_MIN,
              ros::Time const& end_time = ros::TIME_MAX);

// And TopicQuery becomes
class TopicQuery
{
    TopicQuery(std::vector<std::string> const& topics);

    bool operator() (ConnectionInfo const*) const;
};
which is more flexible in the (unlikely?) case of implementing your own query filters.
In addQuery(), can bag be const? Does the view actually modify the bag?
TopicQuery and TypeQuery should have constructors taking just a single topic/type for convenience.
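The functor suggestion can be sketched in Python, where any callable plays the role of boost::function. The dictionary-based connection records and the `select` helper are hypothetical; the point is only that a class with a call operator and an ad-hoc function are interchangeable as queries, and that a single-topic constructor is cheap to support.

```python
class TopicQuery:
    """Callable query that accepts either one topic or a collection of topics."""

    def __init__(self, topics):
        # A bare string is treated as a single topic for convenience.
        self._topics = {topics} if isinstance(topics, str) else set(topics)

    def __call__(self, connection):
        return connection["topic"] in self._topics


def select(connections, query):
    """Keep the connections for which the query callable returns True."""
    return [c for c in connections if query(c)]


connections = [
    {"topic": "chatter", "datatype": "std_msgs/String"},
    {"topic": "numbers", "datatype": "std_msgs/Int32"},
    {"topic": "odom", "datatype": "nav_msgs/Odometry"},
]

by_topics = select(connections, TopicQuery(["chatter", "numbers"]))
by_single = select(connections, TopicQuery("odom"))
# A user-defined filter needs no base class, just a callable.
by_lambda = select(connections, lambda c: c["datatype"].startswith("std_msgs/"))
```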
Josh
It seems a bit odd to have to attempt to instantiate an object to check its type... what about a MsgInstance::isType<>() call?
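One way to picture the suggested type check is a comparison of stored metadata that avoids deserializing the payload. Everything in this sketch is hypothetical: the `StringMsg` and `Int32Msg` classes, the placeholder checksum strings, and the choice to compare datatype plus md5sum are assumptions, not the rosbag implementation.

```python
class StringMsg:
    DATATYPE = "std_msgs/String"
    MD5SUM = "placeholder-md5-string"   # placeholder, not a real checksum

    def __init__(self, payload):
        self.data = payload


class Int32Msg:
    DATATYPE = "std_msgs/Int32"
    MD5SUM = "placeholder-md5-int32"    # placeholder, not a real checksum

    def __init__(self, payload):
        self.data = int(payload)


class MessageInstance:
    def __init__(self, datatype, md5sum, payload):
        self.datatype = datatype
        self.md5sum = md5sum
        self.payload = payload

    def is_type(self, msg_class):
        # Cheap check on metadata only; the payload is never touched.
        return (self.datatype, self.md5sum) == (msg_class.DATATYPE, msg_class.MD5SUM)

    def instantiate(self, msg_class):
        # Returns None on a mismatch, mirroring the NULL return described in the listing.
        return msg_class(self.payload) if self.is_type(msg_class) else None


m = MessageInstance("std_msgs/String", "placeholder-md5-string", "foo")
matches_string = m.is_type(StringMsg)
matches_int = m.is_type(Int32Msg)
as_int = m.instantiate(Int32Msg)
as_string = m.instantiate(StringMsg)
```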
Python: why are _get_compression/_set_compression/_get_chunk.../_set_chunk... prefixed by an underscore?
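A likely reading, given the `property(...)` lines in the listing, is that the underscored functions are private accessors and the property is the public interface. The sketch below shows that general Python pattern; this `Bag` class is a simplified stand-in and the validation in the setter is a hypothetical addition.

```python
class Bag:
    def __init__(self, chunk_threshold=768 * 1024):
        self._chunk_threshold = chunk_threshold

    def _get_chunk_threshold(self):
        return self._chunk_threshold

    def _set_chunk_threshold(self, chunk_threshold):
        # Hypothetical validation, included to show why a setter function is useful.
        if chunk_threshold <= 0:
            raise ValueError("chunk_threshold must be positive")
        self._chunk_threshold = chunk_threshold

    # Callers use the attribute; the underscored accessors stay an implementation detail.
    chunk_threshold = property(_get_chunk_threshold, _set_chunk_threshold)


bag = Bag()
default_threshold = bag.chunk_threshold
bag.chunk_threshold = 1024 * 1024
```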
Meeting agenda
To be filled out by proposer based on comments gathered during API review period
Conclusion
Package status change (mark change in manifest)
Add API for write using MessageEvent directly (DONE)
All ints need to become sized (DONE)
No compression in Constructor (DONE)
Remove Query class and replace with functor (DONE)
View::addQuery should take a const Bag (DONE)
Add isType to MessageInstance (DONE)
Go through Python coding standards (DONE)
In Python read_messages, make topics accept either a single string or a list (DONE)
Look at adding python and C++ calls to repack (we decided against; simple to implement in user code)
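The conclusion about `topics` accepting a string or a list can be sketched as a small normalization step in front of a generator. The function names and the in-memory tuple list are hypothetical; only the (topic, message, timestamp) tuple shape comes from the docstring in the listing.

```python
def normalize_topics(topics):
    """Turn None, a single topic name, or an iterable of names into None or a list."""
    if topics is None:
        return None           # no topic restriction
    if isinstance(topics, str):
        return [topics]       # a lone string means one topic, not a sequence of characters
    return list(topics)


def read_messages(messages, topics=None):
    """Yield (topic, message, timestamp) tuples whose topic is in the requested set."""
    wanted = normalize_topics(topics)
    for topic, msg, t in messages:
        if wanted is None or topic in wanted:
            yield topic, msg, t


stored = [("chatter", "foo", 1), ("numbers", 42, 2), ("chatter", "bar", 3)]

one_topic = list(read_messages(stored, topics="chatter"))
two_topics = list(read_messages(stored, topics=["chatter", "numbers"]))
everything = list(read_messages(stored))
```

Without the string check, passing `"chatter"` would be iterated character by character and match nothing, which is the pitfall this convenience avoids.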