Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow saving timestamp-less messages to Cache (using rospy.Time.now() as their timestamp). #806

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 46 additions & 13 deletions utilities/message_filters/src/message_filters/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,13 @@ class Cache(SimpleFilter):

Given a stream of messages, the most recent ``cache_size`` messages
are cached in a ring buffer, from which time intervals of the cache
can then be retrieved by the client.
can then be retrieved by the client. The ``allow_headerless``
option specifies whether to allow storing headerless messages with
current ROS time instead of timestamp. You should avoid this as
much as you can, since the delays are unpredictable.
"""

def __init__(self, f, cache_size = 1):
def __init__(self, f, cache_size = 1, allow_headerless = False):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please remove the spaces around the = sign (following PEP 8) (also for the other keyword).

SimpleFilter.__init__(self)
self.connectInput(f)
self.cache_size = cache_size
Expand All @@ -99,18 +102,26 @@ def __init__(self, f, cache_size = 1):
# Array to store msgs times, auxiliary structure to facilitate
# sorted insertion
self.cache_times = []
# Whether to allow storing headerless messages with current ROS
# time instead of timestamp.
self._allow_headerless = allow_headerless
Copy link
Member

@dirk-thomas dirk-thomas May 18, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be ok to make the property "public" so that users can check the property later. So I would propose to remove the leading underscore.

Same for the other class.


def connectInput(self, f):
self.incoming_connection = f.registerCallback(self.add)

def add(self, msg):
# Cannot use message filters with non-stamped messages
if not hasattr(msg, 'header') or not hasattr(msg.header, 'stamp'):
rospy.logwarn("Cannot use message filters with non-stamped messages")
return
if self._allow_headerless:
stamp = rospy.Time.now()
else:
rospy.logwarn("Cannot use message filters with non-stamped messages. "
"Use the 'allow_headerless' constructor option to "
"auto-assign ROS time to headerless messages.")
return
Copy link
Member

@dirk-thomas dirk-thomas May 18, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest to turn around the condition to error / return early:

if not self._allow_headerless:
    rospy.logwarn("Cannot use message filters with non-stamped messages. "
                  "Use the 'allow_headerless' constructor option to "
                  "auto-assign ROS time to headerless messages.")
    return
stamp = rospy.Time.now()

Same for the other class.

else:
stamp = msg.header.stamp

# Insert sorted
stamp = msg.header.stamp
self.cache_times.append(stamp)
self.cache_msgs.append(msg)

Expand All @@ -125,19 +136,22 @@ def add(self, msg):
def getInterval(self, from_stamp, to_stamp):
"""Query the current cache content between from_stamp to to_stamp."""
assert from_stamp <= to_stamp
return [m for m in self.cache_msgs
if m.header.stamp >= from_stamp and m.header.stamp <= to_stamp]

return [msg for (msg, time) in zip(self.cache_msgs, self.cache_times)
if from_stamp <= time <= to_stamp]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not valid logic. The first operator results in a boolean which is then used in the second comparison.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand you objection correctly, I think it is incorrect :) Check http://stackoverflow.com/questions/26502775/pycharm-simplify-chained-comparison

Copy link
Member

@dirk-thomas dirk-thomas May 18, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Omg scary syntax, I guess I just learned something new. Thanks for the reference. Please forget my comment.


def getElemAfterTime(self, stamp):
"""Return the oldest element after or equal the passed time stamp."""
newer = [m for m in self.cache_msgs if m.header.stamp >= stamp]
newer = [msg for (msg, time) in zip(self.cache_msgs, self.cache_times)
if time >= stamp]
if not newer:
return None
return newer[0]

def getElemBeforeTime(self, stamp):
"""Return the newest element before or equal the passed time stamp."""
older = [m for m in self.cache_msgs if m.header.stamp <= stamp]
older = [msg for (msg, time) in zip(self.cache_msgs, self.cache_times)
if time <= stamp]
if not older:
return None
return older[-1]
Expand All @@ -153,6 +167,11 @@ def getOldestTime(self):
if not self.cache_times:
return None
return self.cache_times[0]

def getLast(self):
if self.getLastestTime() is None:
return None
return self.getElemAfterTime(self.getLastestTime())


class TimeSynchronizer(SimpleFilter):
Expand Down Expand Up @@ -209,16 +228,30 @@ class ApproximateTimeSynchronizer(TimeSynchronizer):
:class:`ApproximateTimeSynchronizer` synchronizes incoming message filters by the
timestamps contained in their messages' headers. The API is the same as TimeSynchronizer
except for an extra `slop` parameter in the constructor that defines the delay (in seconds)
with which messages can be synchronized
with which messages can be synchronized. The ``allow_headerless`` option specifies whether
to allow storing headerless messages with current ROS time instead of timestamp. You should
avoid this as much as you can, since the delays are unpredictable.
"""

def __init__(self, fs, queue_size, slop):
def __init__(self, fs, queue_size, slop, allow_headerless=False):
TimeSynchronizer.__init__(self, fs, queue_size)
self.slop = rospy.Duration.from_sec(slop)
self._allow_headerless = allow_headerless

def add(self, msg, my_queue):
if not hasattr(msg, 'header') or not hasattr(msg.header, 'stamp'):
if self._allow_headerless:
stamp = rospy.Time.now()
else:
rospy.logwarn("Cannot use message filters with non-stamped messages. "
"Use the 'allow_headerless' constructor option to "
"auto-assign ROS time to headerless messages.")
return
else:
stamp = msg.header.stamp

self.lock.acquire()
my_queue[msg.header.stamp] = msg
my_queue[stamp] = msg
while len(my_queue) > self.queue_size:
del my_queue[min(my_queue)]
for vv in itertools.product(*[list(q.keys()) for q in self.queues]):
Expand Down
26 changes: 26 additions & 0 deletions utilities/message_filters/test/test_approxsync.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ def __init__(self, stamp, data):
self.header.stamp = stamp
self.data = data

class MockHeaderlessMessage:
def __init__(self, data):
self.data = data

class MockFilter(message_filters.SimpleFilter):
pass

Expand Down Expand Up @@ -92,6 +96,28 @@ def test_approx(self):
m1.signalMessage(msg)
self.assertEqual(set(self.collector), set(zip(seq0, seq1)))

# Scramble sequences of length N of headerless and header-having messages.
# Make sure that TimeSequencer recombines them.
rospy.rostime.set_rostime_initialized(True)
random.seed(0)
for N in range(1, 10):
m0 = MockFilter()
m1 = MockFilter()
seq0 = [MockMessage(rospy.Time(t), random.random()) for t in range(N)]
seq1 = [MockHeaderlessMessage(random.random()) for t in range(N)]
# random.shuffle(seq0)
ts = ApproximateTimeSynchronizer([m0, m1], N, 0.1, allow_headerless=True)
ts.registerCallback(self.cb_collector_2msg)
self.collector = []
for msg in random.sample(seq0, N):
m0.signalMessage(msg)
self.assertEqual(self.collector, [])
for i in random.sample(range(N), N):
msg = seq1[i]
rospy.rostime._set_rostime(rospy.Time(i+0.05))
m1.signalMessage(msg)
self.assertEqual(set(self.collector), set(zip(seq0, seq1)))

if __name__ == '__main__':
if 1:
rostest.unitrun('camera_calibration', 'testapproxsync', TestApproxSync)
Expand Down
35 changes: 35 additions & 0 deletions utilities/message_filters/test/test_message_filters_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,41 @@ def test_all_funcs(self):
self.assertEqual(s, rospy.Time(1),
"wrong message discarded")

def test_headerless(self):
sub = Subscriber("/empty", String)
cache = Cache(sub, 5, allow_headerless=False)

msg = String()
cache.add(msg)

self.assertIsNone(cache.getElemAfterTime(rospy.Time(0)),
"Headerless message invalidly added.")

cache = Cache(sub, 5, allow_headerless=True)

rospy.rostime.set_rostime_initialized(True)

rospy.rostime._set_rostime(rospy.Time(0))
cache.add(msg)

s = cache.getElemAfterTime(rospy.Time(0))
self.assertEqual(s, msg,
"invalid msg returned in headerless scenario")

s = cache.getElemAfterTime(rospy.Time(1))
self.assertIsNone(s, "invalid msg returned in headerless scenario")

rospy.rostime._set_rostime(rospy.Time(2))
cache.add(msg)

s = cache.getInterval(rospy.Time(0), rospy.Time(1))
self.assertEqual(s, [msg],
"invalid msg returned in headerless scenario")

s = cache.getInterval(rospy.Time(0), rospy.Time(2))
self.assertEqual(s, [msg, msg],
"invalid msg returned in headerless scenario")


if __name__ == '__main__':
import rosunit
Expand Down