Skip to content

Message feed APIs

Michel Hidalgo edited this page Oct 28, 2024 · 1 revision

These APIs wrap message_filters to provide a simpler UX when dealing with streams of messages, including but not limited to ROS 2 topic subscriptions.

Message feeds are the stateful generalization of standard ROS 2 message filters. Any message filter can become a feed, allowing:

  • latest message retrieval
  • message history retrieval
  • message callback registration (and de-registration)
  • message stream iteration as they arrive, one-by-one or in batches
  • incoming message waits -- any message or those matching a predicate

Like message filters, most message feeds can be chained. This is true for all but those that externally source messages, ROS 2 topic subscriptions being the prime example of this. Other message feeds built into bdai_ros2_wrappers offer a vehicle for generic map-filter-reduce patterns, time synchronization across multiple message feeds, and synchronized tf lookups.

Important

While any message filter can become a feed, standard ROS 2 message filters are usually not thread-safe. See bdai_ros2_wrappers.filters for thread-safe (re)implementations.

Common use cases

Note

The following snippets make use of standard ROS 2 examples. You may use the publishers in the examples_rclpy_minimal_publisher package to test these.

Looping over topic messages

from contextlib import closing
from std_msgs.msg import String

from bdai_ros2_wrappers.subscription import Subscription
import bdai_ros2_wrappers.process as ros_process

@ros_process.main()
def main() -> None:
    topic_data = Subscription(String, "topic")
    with closing(topic_data.stream()) as stream:
        for message in stream:
            print(message.data)

if __name__ == "__main__":
    main()

Note that the topic message stream is managed and closed explicitly by the contextlib.closing context manager. This is important to stop message buffering as soon as it is no longer necessary. This can be deferred to the garbage collector but it may result in needless buffering for (much) longer.

Waiting for the next topic message

from std_msgs.msg import String

from bdai_ros2_wrappers.subscription import Subscription
from bdai_ros2_wrappers.futures import unwrap_future
import bdai_ros2_wrappers.process as ros_process

@ros_process.main()
def main() -> None:
    topic_data = Subscription(String, "topic")
    while main.context.ok():
        message = unwrap_future(topic_data.update, timeout_sec=5.0)
        print(message.data)

if __name__ == "__main__":
    main()

Note that the future update is unwrapped rather than waited on. If the future update does not become available in time, future unwrapping will raise.

Waiting for a specific topic message

from std_msgs.msg import Int32
from std_msgs.msg import String

from bdai_ros2_wrappers.subscription import Subscription
from bdai_ros2_wrappers.futures import unwrap_future
import bdai_ros2_wrappers.process as ros_process

def to_int32(message: String) -> Int32:
    return Int32(data=int(message.data.rpartition(" ")[-1]))

@ros_process.main()
def main() -> None:
    topic_data = Subscription(String, "topic")
    while main.context.ok():
        message = unwrap_future(topic_data.matching_update(
            lambda message: to_int32(message).data % 5 == 0
        ), timeout_sec=5.0)
        print(message.data)

if __name__ == "__main__":
    main()

Setting up a message callback

from std_msgs.msg import String

from bdai_ros2_wrappers.subscription import Subscription
import bdai_ros2_wrappers.process as ros_process

@ros_process.main()
def main() -> None:
    topic_data = Subscription(String, "topic")
    topic_data.recall(lambda message: print(message.data))
    main.wait_for_shutdown()

if __name__ == "__main__":
    main()

Synchronizing topic messages

from contextlib import closing
from std_msgs.msg import String

from bdai_ros2_wrappers.feeds import SynchronizedMessageFeed
from bdai_ros2_wrappers.futures import unwrap_future
from bdai_ros2_wrappers.subscription import Subscription
import bdai_ros2_wrappers.process as ros_process

@ros_process.main()
def main() -> None:
    topics_data = SynchronizedMessageFeed(
        Subscription(String, "topic0"),
        Subscription(String, "topic1"),
        allow_headerless=True
    )
    with closing(topics_data.stream()) as stream:
        for a, b in stream:
            print("a:", a.data, "matches b:", b.data)

if __name__ == "__main__":
    main()

Adapting topic messages

from contextlib import closing
from typing import Optional
from std_msgs.msg import String
from std_msgs.msg import Int32

from bdai_ros2_wrappers.feeds import AdaptedMessageFeed
from bdai_ros2_wrappers.subscription import Subscription
import bdai_ros2_wrappers.process as ros_process

def to_int32(message: String) -> Int32:
    return Int32(data=int(message.data.rpartition(" ")[-1]))

def keep_even(message: Int32) -> Optional[Int32]:
    return message if message.data % 2 == 0 else None

@ros_process.main()
def main() -> None:
    topic_data = AdaptedMessageFeed(
        Subscription(String, "topic"),
        lambda msg: keep_even(to_int32(msg))
    )
    with closing(topic_data.stream()) as stream:
        for message in stream:
            print(message.data)

if __name__ == "__main__":
    main()

Note that the adapter logic not only transforms the message type but also filters them. Returning None stops message propagation down the chain of message feeds.

Fetch the last 10 topic messages

from contextlib import closing

from std_msgs.msg import String

from bdai_ros2_wrappers.subscription import Subscription
from bdai_ros2_wrappers.time import Duration
import bdai_ros2_wrappers.process as ros_process

@ros_process.main()
def main() -> None:
    clock = main.node.get_clock()
    with closing(Subscription(String, "topic", history_length=10)) as topic_data:
        print("Waiting for 5 seconds...")
        clock.sleep_until(clock.now() + Duration(seconds=5.0))
    for message in topic_data.history:
        print(message.data)

if __name__ == "__main__":
    main()

Reference

Read up bdai_ros2_wrappers API documentation for further reference on all the features that message feed APIs offer.