From 69fb651de8f547c7d48d57ea9cc7c78a492d77d9 Mon Sep 17 00:00:00 2001 From: Dirk Thomas Date: Tue, 29 Oct 2013 16:14:59 -0700 Subject: [PATCH] implement optional queueing for rospy publications (#169) --- clients/rospy/src/rospy/impl/tcpros_pubsub.py | 75 +++++++++++++++++++ clients/rospy/src/rospy/topics.py | 13 +++- 2 files changed, 87 insertions(+), 1 deletion(-) diff --git a/clients/rospy/src/rospy/impl/tcpros_pubsub.py b/clients/rospy/src/rospy/impl/tcpros_pubsub.py index c996de1806..e786bb9cf5 100644 --- a/clients/rospy/src/rospy/impl/tcpros_pubsub.py +++ b/clients/rospy/src/rospy/impl/tcpros_pubsub.py @@ -355,3 +355,78 @@ def topic_connection_handler(self, sock, client_addr, header): topic.add_connection(transport) +class QueuedConnection(object): + """ + It wraps a Transport instance and behaves like one + but it queues the data written to it and relays them + asynchronously to the wrapped instance. + """ + + def __init__(self, connection, queue_size): + """ + ctor. + @param connection: the wrapped transport instance + @type connection: Transport + @param queue_size: the maximum size of the queue, zero means infinite + @type queue_size: int + """ + super(QueuedConnection, self).__init__() + self._connection = connection + self._queue_size = queue_size + + self._lock = threading.Lock() + self._cond_data_available = threading.Condition(self._lock) + self._cond_queue_swapped = threading.Condition(self._lock) + self._queue = [] + self._waiting = False + self._error = None + + self._thread = threading.Thread(target=self._run) + self._thread.start() + + def __getattr__(self, name): + if name.startswith('__'): + raise AttributeError(name) + return getattr(self._connection, name) + + def write_data(self, data): + with self._lock: + # if there was previously an error within the dispatch thread raise it + if self._error: + error = self._error + self._error = None + raise error + # pop oldest data if queue limit is reached + if self._queue_size > 0 and len(self._queue) == self._queue_size: + del self._queue[0] + self._queue.append(data) + self._cond_data_available.notify() + # ensure that thread has actually swapped the queues and is processig them + # if it was waiting for being notified + # to enforce behavior to be as close as possible to blocking + if self._waiting: + self._cond_queue_swapped.wait() + return True + + def _run(self): + while not self._connection.done: + queue = [] + with self._lock: + # wait for available data + while not self._queue and not self._connection.done: + self._waiting = True + self._cond_data_available.wait(1.0) + self._waiting = False + if self._queue: + self._cond_queue_swapped.notify() + # take all data from queue for processing outside of the lock + if self._queue: + queue = self._queue + self._queue = [] + # relay all data + for data in queue: + try: + self._connection.write_data(data) + except Exception as e: + with self._cond: + self._error = e diff --git a/clients/rospy/src/rospy/topics.py b/clients/rospy/src/rospy/topics.py index 5143bb6b42..60cae58141 100644 --- a/clients/rospy/src/rospy/topics.py +++ b/clients/rospy/src/rospy/topics.py @@ -92,6 +92,7 @@ def isstring(s): from rospy.impl.registration import get_topic_manager, set_topic_manager, Registration, get_registration_listeners from rospy.impl.tcpros import get_tcpros_handler, DEFAULT_BUFF_SIZE +from rospy.impl.tcpros_pubsub import QueuedConnection _logger = logging.getLogger('rospy.topics') @@ -738,7 +739,7 @@ class Publisher(Topic): Class for registering as a publisher of a ROS topic. """ - def __init__(self, name, data_class, subscriber_listener=None, tcp_nodelay=False, latch=False, headers=None): + def __init__(self, name, data_class, subscriber_listener=None, tcp_nodelay=False, latch=False, headers=None, queue_size=None): """ Constructor @param name: resource name of topic, e.g. 'laser'. @@ -768,6 +769,8 @@ def __init__(self, name, data_class, subscriber_listener=None, tcp_nodelay=False self.impl.enable_latch() if headers: self.impl.add_headers(headers) + if queue_size is not None: + self.impl.set_queue_size(queue_size) def publish(self, *args, **kwds): """ @@ -827,6 +830,9 @@ def __init__(self, name, data_class): self.is_latch = False self.latch = None + # maximum queue size for publishing messages + self.queue_size = None + #STATS self.message_data_sent = 0 @@ -858,6 +864,9 @@ def enable_latch(self): """ self.is_latch = True + def set_queue_size(self, queue_size): + self.queue_size = queue_size + def get_stats(self): # STATS """ Get the stats for this topic publisher @@ -900,6 +909,8 @@ def add_connection(self, c): @return: True if connection was added @rtype: bool """ + if self.queue_size is not None: + c = QueuedConnection(c, self.queue_size) super(_PublisherImpl, self).add_connection(c) def publish_single(data): self.publish(data, connection_override=c)