Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement optional queueing for rospy publications (#169) #308

Merged
merged 1 commit into from
Nov 18, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions clients/rospy/src/rospy/impl/tcpros_pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,3 +355,78 @@ def topic_connection_handler(self, sock, client_addr, header):
topic.add_connection(transport)


class QueuedConnection(object):
"""
It wraps a Transport instance and behaves like one
but it queues the data written to it and relays them
asynchronously to the wrapped instance.
"""

def __init__(self, connection, queue_size):
"""
ctor.
@param connection: the wrapped transport instance
@type connection: Transport
@param queue_size: the maximum size of the queue, zero means infinite
@type queue_size: int
"""
super(QueuedConnection, self).__init__()
self._connection = connection
self._queue_size = queue_size

self._lock = threading.Lock()
self._cond_data_available = threading.Condition(self._lock)
self._cond_queue_swapped = threading.Condition(self._lock)
self._queue = []
self._waiting = False
self._error = None

self._thread = threading.Thread(target=self._run)
self._thread.start()

def __getattr__(self, name):
if name.startswith('__'):
raise AttributeError(name)
return getattr(self._connection, name)

def write_data(self, data):
with self._lock:
# if there was previously an error within the dispatch thread raise it
if self._error:
error = self._error
self._error = None
raise error
# pop oldest data if queue limit is reached
if self._queue_size > 0 and len(self._queue) == self._queue_size:
del self._queue[0]
self._queue.append(data)
self._cond_data_available.notify()
# ensure that thread has actually swapped the queues and is processig them
# if it was waiting for being notified
# to enforce behavior to be as close as possible to blocking
if self._waiting:
self._cond_queue_swapped.wait()
return True

def _run(self):
while not self._connection.done:
queue = []
with self._lock:
# wait for available data
while not self._queue and not self._connection.done:
self._waiting = True
self._cond_data_available.wait(1.0)
self._waiting = False
if self._queue:
self._cond_queue_swapped.notify()
# take all data from queue for processing outside of the lock
if self._queue:
queue = self._queue
self._queue = []
# relay all data
for data in queue:
try:
self._connection.write_data(data)
except Exception as e:
with self._cond:
self._error = e
13 changes: 12 additions & 1 deletion clients/rospy/src/rospy/topics.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ def isstring(s):

from rospy.impl.registration import get_topic_manager, set_topic_manager, Registration, get_registration_listeners
from rospy.impl.tcpros import get_tcpros_handler, DEFAULT_BUFF_SIZE
from rospy.impl.tcpros_pubsub import QueuedConnection

_logger = logging.getLogger('rospy.topics')

Expand Down Expand Up @@ -738,7 +739,7 @@ class Publisher(Topic):
Class for registering as a publisher of a ROS topic.
"""

def __init__(self, name, data_class, subscriber_listener=None, tcp_nodelay=False, latch=False, headers=None):
def __init__(self, name, data_class, subscriber_listener=None, tcp_nodelay=False, latch=False, headers=None, queue_size=None):
"""
Constructor
@param name: resource name of topic, e.g. 'laser'.
Expand Down Expand Up @@ -768,6 +769,8 @@ def __init__(self, name, data_class, subscriber_listener=None, tcp_nodelay=False
self.impl.enable_latch()
if headers:
self.impl.add_headers(headers)
if queue_size is not None:
self.impl.set_queue_size(queue_size)

def publish(self, *args, **kwds):
"""
Expand Down Expand Up @@ -827,6 +830,9 @@ def __init__(self, name, data_class):
self.is_latch = False
self.latch = None

# maximum queue size for publishing messages
self.queue_size = None

#STATS
self.message_data_sent = 0

Expand Down Expand Up @@ -858,6 +864,9 @@ def enable_latch(self):
"""
self.is_latch = True

def set_queue_size(self, queue_size):
self.queue_size = queue_size

def get_stats(self): # STATS
"""
Get the stats for this topic publisher
Expand Down Expand Up @@ -900,6 +909,8 @@ def add_connection(self, c):
@return: True if connection was added
@rtype: bool
"""
if self.queue_size is not None:
c = QueuedConnection(c, self.queue_size)
super(_PublisherImpl, self).add_connection(c)
def publish_single(data):
self.publish(data, connection_override=c)
Expand Down