Skip to content

Commit

Permalink
Fix race condition in recv()'s usage of self.call.
Browse files Browse the repository at this point in the history
  • Loading branch information
theacodes committed Sep 11, 2018
1 parent 63dc7ab commit c6dc8fd
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 147 deletions.
38 changes: 3 additions & 35 deletions pubsub/google/cloud/pubsub_v1/subscriber/_protocol/bidi.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,21 +207,8 @@ def open(self):
else:
call.add_done_callback(self._on_call_done)

import time
import random
from unittest import mock
from google.cloud.pubsub_v1.proto import pubsub_pb2

mock_call = mock.MagicMock(wraps=call)

def mock_next():
#time.sleep(random.uniform(0.0, 1.0))
return pubsub_pb2.StreamingPullResponse()

mock_call.__next__.side_effect = mock_next

self._request_generator = request_generator
self.call = mock_call
self.call = call

def close(self):
"""Closes the stream."""
Expand Down Expand Up @@ -252,7 +239,6 @@ def send(self, request):
# to mean something semantically different.
if self.call.is_active():
self._request_queue.put(request)
pass
else:
# calling next should cause the call to raise.
next(self.call)
Expand Down Expand Up @@ -356,7 +342,7 @@ def _reopen(self):
# Another thread already managed to re-open this stream.
if self.call is not None and self.call.is_active():
_LOGGER.debug('Stream was already re-established.')
#return
return

self.call = None
# Request generator should exit cleanly since the RPC its bound to
Expand Down Expand Up @@ -454,8 +440,7 @@ def _recv(self):
return next(call)

def recv(self):
return self._recoverable(
super(ResumableBidiRpc, self).recv)
return self._recoverable(self._recv)

@property
def is_active(self):
Expand All @@ -471,16 +456,6 @@ def is_active(self):
return self.call is not None and not self._finalized


def meanie_thread_main(rpc):
import time
import random

while True:
time.sleep(random.uniform(0, 0.1))
print('Wahaha')
rpc._reopen()


class BackgroundConsumer(object):
"""A bi-directional stream consumer that runs in a separate thread.
Expand Down Expand Up @@ -536,13 +511,6 @@ def _thread_main(self):
self._bidi_rpc.add_done_callback(self._on_call_done)
self._bidi_rpc.open()

import functools
meanie_thread = threading.Thread(
name='meanie thread',
target=functools.partial(meanie_thread_main, self._bidi_rpc))
meanie_thread.deamon = True
meanie_thread.start()

while self._bidi_rpc.is_active:
# Do not allow the paused status to change at all during this
# section. There is a condition where we could be resumed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,7 @@ def send(self, request):
"""Queue a request to be sent to the RPC."""
if self._UNARY_REQUESTS:
try:
pass
#self._send_unary_request(request)
self._send_unary_request(request)
except exceptions.GoogleAPICallError as exc:
_LOGGER.debug(
'Exception while sending unary RPC. This is typically '
Expand Down
110 changes: 0 additions & 110 deletions pubsub/test.py

This file was deleted.

0 comments on commit c6dc8fd

Please sign in to comment.