From db14d1ddec5236e68ac0ba8a44b072c41969a40b Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Thu, 23 May 2019 18:51:01 +0200 Subject: [PATCH] Prevent unhandled background error on SPM shutdown --- .../subscriber/_protocol/dispatcher.py | 22 +++++++++++++--- .../pubsub_v1/subscriber/test_dispatcher.py | 26 +++++++++++++++++++ 2 files changed, 44 insertions(+), 4 deletions(-) diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py index e41341afab3d8..95a932eca794e 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py @@ -125,8 +125,15 @@ def drop(self, items): Args: items(Sequence[DropRequest]): The items to drop. """ - self._manager.leaser.remove(items) - self._manager.maybe_resume_consumer() + # If the manager is in the process of being shut down, the leaser might + # not exist on it anymore, thus we need to obtain our own reference to + # it and check it for None to avoid errors. + # Nevertheless, calling maybe_resume_consumer() in the manager shutdown + # state is still fine, as it is effectively a no-op at that point. + leaser = getattr(self._manager, "leaser", None) + if leaser is not None: + leaser.remove(items) + self._manager.maybe_resume_consumer() def lease(self, items): """Add the given messages to lease management. @@ -134,8 +141,15 @@ def lease(self, items): Args: items(Sequence[LeaseRequest]): The items to lease. """ - self._manager.leaser.add(items) - self._manager.maybe_pause_consumer() + # If the manager is in the process of being shut down, the leaser might + # not exist on it anymore, thus we need to obtain our own reference to + # it and check it for None to avoid errors. + # Nevertheless, calling maybe_pause_consumer() in the manager shutdown + # state is still fine, as it is effectively a no-op at that point. + leaser = getattr(self._manager, "leaser", None) + if leaser is not None: + leaser.add(items) + self._manager.maybe_pause_consumer() def modify_ack_deadline(self, items): """Modify the ack deadline for the given messages. diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_dispatcher.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_dispatcher.py index 0e1e9744f6d9f..5fccfe6764cb0 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_dispatcher.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_dispatcher.py @@ -108,6 +108,19 @@ def test_lease(): manager.maybe_pause_consumer.assert_called_once() +def test_lease_no_manager_leaser(): + manager = mock.create_autospec( + streaming_pull_manager.StreamingPullManager, instance=True + ) + manager.leaser = None # simulate manager being shut down + dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue) + + items = [requests.LeaseRequest(ack_id="ack_id_string", byte_size=10)] + dispatcher_.lease(items) # no error + + manager.maybe_pause_consumer.assert_not_called() + + def test_drop(): manager = mock.create_autospec( streaming_pull_manager.StreamingPullManager, instance=True @@ -121,6 +134,19 @@ def test_drop(): manager.maybe_resume_consumer.assert_called_once() +def test_drop_no_manager_leaser(): + manager = mock.create_autospec( + streaming_pull_manager.StreamingPullManager, instance=True + ) + manager.leaser = None # simulate manager being shut down + dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue) + + items = [requests.DropRequest(ack_id="ack_id_string", byte_size=10)] + dispatcher_.drop(items) # no error + + manager.maybe_resume_consumer.assert_not_called() + + def test_nack(): manager = mock.create_autospec( streaming_pull_manager.StreamingPullManager, instance=True