Skip to content

Commit

Permalink
Prevent unhandled background error on SPM shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
plamut committed May 23, 2019
1 parent d9127d7 commit db14d1d
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 4 deletions.
22 changes: 18 additions & 4 deletions pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,17 +125,31 @@ def drop(self, items):
Args:
items(Sequence[DropRequest]): The items to drop.
"""
self._manager.leaser.remove(items)
self._manager.maybe_resume_consumer()
# If the manager is in the process of being shut down, the leaser might
# not exist on it anymore, thus we need to obtain our own reference to
# it and check it for None to avoid errors.
# Nevertheless, calling maybe_resume_consumer() in the manager shutdown
# state is still fine, as it is effectively a no-op at that point.
leaser = getattr(self._manager, "leaser", None)
if leaser is not None:
leaser.remove(items)
self._manager.maybe_resume_consumer()

def lease(self, items):
"""Add the given messages to lease management.
Args:
items(Sequence[LeaseRequest]): The items to lease.
"""
self._manager.leaser.add(items)
self._manager.maybe_pause_consumer()
# If the manager is in the process of being shut down, the leaser might
# not exist on it anymore, thus we need to obtain our own reference to
# it and check it for None to avoid errors.
# Nevertheless, calling maybe_pause_consumer() in the manager shutdown
# state is still fine, as it is effectively a no-op at that point.
leaser = getattr(self._manager, "leaser", None)
if leaser is not None:
leaser.add(items)
self._manager.maybe_pause_consumer()

def modify_ack_deadline(self, items):
"""Modify the ack deadline for the given messages.
Expand Down
26 changes: 26 additions & 0 deletions pubsub/tests/unit/pubsub_v1/subscriber/test_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,19 @@ def test_lease():
manager.maybe_pause_consumer.assert_called_once()


def test_lease_no_manager_leaser():
manager = mock.create_autospec(
streaming_pull_manager.StreamingPullManager, instance=True
)
manager.leaser = None # simulate manager being shut down
dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue)

items = [requests.LeaseRequest(ack_id="ack_id_string", byte_size=10)]
dispatcher_.lease(items) # no error

manager.maybe_pause_consumer.assert_not_called()


def test_drop():
manager = mock.create_autospec(
streaming_pull_manager.StreamingPullManager, instance=True
Expand All @@ -121,6 +134,19 @@ def test_drop():
manager.maybe_resume_consumer.assert_called_once()


def test_drop_no_manager_leaser():
manager = mock.create_autospec(
streaming_pull_manager.StreamingPullManager, instance=True
)
manager.leaser = None # simulate manager being shut down
dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue)

items = [requests.DropRequest(ack_id="ack_id_string", byte_size=10)]
dispatcher_.drop(items) # no error

manager.maybe_resume_consumer.assert_not_called()


def test_nack():
manager = mock.create_autospec(
streaming_pull_manager.StreamingPullManager, instance=True
Expand Down

0 comments on commit db14d1d

Please sign in to comment.