Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix queue blocking #464

Merged
merged 6 commits into from
Apr 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

from collections import deque
from threading import Thread, Condition
from time import time
import traceback
Expand Down Expand Up @@ -107,7 +108,7 @@ def __init__(self, previous_handler):
Thread.__init__(self)
MessageHandler.__init__(self, previous_handler)
self.daemon = True
self.queue = []
self.queue = deque(maxlen=self.queue_length)
self.c = Condition()
self.alive = True
self.start()
Expand All @@ -116,8 +117,6 @@ def handle_message(self, msg):
with self.c:
should_notify = len(self.queue) == 0
self.queue.append(msg)
if len(self.queue) > self.queue_length:
del self.queue[0:len(self.queue) - self.queue_length]
if should_notify:
self.c.notify()

Expand All @@ -130,8 +129,10 @@ def transition(self):
return ThrottleMessageHandler(self)
else:
with self.c:
if len(self.queue) > self.queue_length:
del self.queue[0:len(self.queue) - self.queue_length]
old_queue = self.queue
self.queue = deque(maxlen=self.queue_length)
while len(old_queue) > 0:
self.queue.append(old_queue.popleft())
self.c.notify()
return self

Expand All @@ -146,21 +147,21 @@ def finish(self):

def run(self):
while self.alive:
msg = None
with self.c:
while self.alive and (self.time_remaining() > 0 or len(self.queue) == 0):
if len(self.queue) == 0:
self.c.wait()
else:
self.c.wait(self.time_remaining())
if len(self.queue) == 0:
self.c.wait()
else:
self.c.wait(self.time_remaining())
if self.alive and self.time_remaining() == 0 and len(self.queue) > 0:
try:
MessageHandler.handle_message(self, self.queue[0])
except:
traceback.print_exc(file=sys.stderr)
del self.queue[0]
msg = self.queue.popleft()
if msg is not None:
try:
MessageHandler.handle_message(self, msg)
except:
traceback.print_exc(file=sys.stderr)
while self.time_remaining() == 0 and len(self.queue) > 0:
try:
MessageHandler.handle_message(self, self.queue[0])
MessageHandler.handle_message(self, self.queue.popleft())
except:
traceback.print_exc(file=sys.stderr)
del self.queue[0]
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,41 @@ def cb(msg):

handler.finish()

def test_queue_message_handler_dropping(self):
received = {"msgs": []}

def cb(msg):
received["msgs"].append(msg)
time.sleep(1)

queue_length = 5
msgs = range(queue_length * 5)

handler = subscribe.MessageHandler(None, cb)

handler = handler.set_queue_length(queue_length)
self.assertIsInstance(handler, subscribe.QueueMessageHandler)

# send all messages at once.
# only the first and the last queue_length should get through,
# because the callbacks are blocked.
for x in msgs:
handler.handle_message(x)
# yield the thread so the first callback can append,
# otherwise the first handled value is non-deterministic.
time.sleep(0)

# wait long enough for all the callbacks, and then some.
time.sleep(queue_length + 3)

try:
self.assertEqual([msgs[0]] + msgs[-queue_length:], received["msgs"])
except:
handler.finish()
raise

handler.finish()

def test_queue_message_handler_rate(self):
handler = subscribe.MessageHandler(None, self.dummy_cb)
self.help_test_queue_rate(handler, 50, 10)
Expand Down