Skip to content

Commit

Permalink
fix: update resume token for restarting BiDi streams (#10282)
Browse files Browse the repository at this point in the history
  • Loading branch information
crwilcox authored Jan 31, 2020
1 parent f0bcbd6 commit 970ed4c
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 6 deletions.
16 changes: 11 additions & 5 deletions firestore/google/cloud/firestore_v1/watch.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,9 @@ def __init__(
self._closing = threading.Lock()
self._closed = False

initial_request = firestore_pb2.ListenRequest(
database=self._firestore._database_string, add_target=self._targets
)
self.resume_token = None

rpc_request = self._get_rpc_request

if ResumableBidiRpc is None:
ResumableBidiRpc = self.ResumableBidiRpc # FBO unit tests
Expand All @@ -224,7 +224,7 @@ def __init__(
self._api.transport.listen,
should_recover=_should_recover,
should_terminate=_should_terminate,
initial_request=initial_request,
initial_request=rpc_request,
metadata=self._firestore._rpc_metadata,
)

Expand Down Expand Up @@ -252,13 +252,19 @@ def __init__(
self.has_pushed = False

# The server assigns and updates the resume token.
self.resume_token = None
if BackgroundConsumer is None: # FBO unit tests
BackgroundConsumer = self.BackgroundConsumer

self._consumer = BackgroundConsumer(self._rpc, self.on_snapshot)
self._consumer.start()

def _get_rpc_request(self):
if self.resume_token is not None:
self._targets["resume_token"] = self.resume_token
return firestore_pb2.ListenRequest(
database=self._firestore._database_string, add_target=self._targets
)

@property
def is_active(self):
"""bool: True if this manager is actively streaming.
Expand Down
8 changes: 7 additions & 1 deletion firestore/tests/unit/v1/test_watch.py
Original file line number Diff line number Diff line change
Expand Up @@ -776,6 +776,12 @@ def test__reset_docs(self):
self.assertEqual(inst.resume_token, None)
self.assertFalse(inst.current)

def test_resume_token_sent_on_recovery(self):
inst = self._makeOne()
inst.resume_token = b"ABCD0123"
request = inst._get_rpc_request()
self.assertEqual(request.add_target.resume_token, b"ABCD0123")


class DummyFirestoreStub(object):
def Listen(self): # pragma: NO COVER
Expand Down Expand Up @@ -922,7 +928,7 @@ def __init__(
self.start_rpc = start_rpc
self.should_recover = should_recover
self.should_terminate = should_terminate
self.initial_request = initial_request
self.initial_request = initial_request()
self.metadata = metadata
self.closed = False
self.callbacks = []
Expand Down

0 comments on commit 970ed4c

Please sign in to comment.