diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py
index 9fb489967..ae3635892 100644
--- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py
+++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py
@@ -163,7 +163,8 @@ def _process_requests(
     ack_reqs_dict: Dict[str, requests.AckRequest],
     errors_dict: Optional[Dict[str, str]],
 ):
-    """Process requests by referring to error_status and errors_dict.
+    """Process requests when exactly-once delivery is enabled by referring to
+    error_status and errors_dict.
 
     The errors returned by the server in as `error_status` or in `errors_dict`
     are used to complete the request futures in `ack_reqs_dict` (with a success
@@ -599,14 +600,23 @@ def send_unary_ack(
             error_status = _get_status(exc)
             ack_errors_dict = _get_ack_errors(exc)
         except exceptions.RetryError as exc:
-            status = status_pb2.Status()
-            # Choose a non-retriable error code so the futures fail with
-            # exceptions.
-            status.code = code_pb2.UNKNOWN
+            exactly_once_delivery_enabled = self._exactly_once_delivery_enabled()
             # Makes sure to complete futures so they don't block forever.
-            _process_requests(status, ack_reqs_dict, None)
+            for req in ack_reqs_dict.values():
+                # Futures may be present even with exactly-once delivery
+                # disabled, in transition periods after the setting is changed on
+                # the subscription.
+                if req.future:
+                    if exactly_once_delivery_enabled:
+                        e = AcknowledgeError(
+                            AcknowledgeStatus.OTHER, "RetryError while sending ack RPC."
+                        )
+                        req.future.set_exception(e)
+                    else:
+                        req.future.set_result(AcknowledgeStatus.SUCCESS)
+
             _LOGGER.debug(
-                "RetryError while sending unary RPC. Waiting on a transient "
+                "RetryError while sending ack RPC. Waiting on a transient "
                 "error resolution for too long, will now trigger shutdown.",
                 exc_info=False,
             )
@@ -615,9 +625,23 @@ def send_unary_ack(
             self._on_rpc_done(exc)
             raise
 
-        requests_completed, requests_to_retry = _process_requests(
-            error_status, ack_reqs_dict, ack_errors_dict
-        )
+        if self._exactly_once_delivery_enabled():
+            requests_completed, requests_to_retry = _process_requests(
+                error_status, ack_reqs_dict, ack_errors_dict
+            )
+        else:
+            requests_completed = []
+            requests_to_retry = []
+            # When exactly-once delivery is NOT enabled, acks/modacks are considered
+            # best-effort. So, they always succeed even if the RPC fails.
+            for req in ack_reqs_dict.values():
+                # Futures may be present even with exactly-once delivery
+                # disabled, in transition periods after the setting is changed on
+                # the subscription.
+                if req.future:
+                    req.future.set_result(AcknowledgeStatus.SUCCESS)
+                requests_completed.append(req)
+
         return requests_completed, requests_to_retry
 
     def send_unary_modack(
@@ -655,14 +679,24 @@ def send_unary_modack(
             error_status = _get_status(exc)
             modack_errors_dict = _get_ack_errors(exc)
         except exceptions.RetryError as exc:
-            status = status_pb2.Status()
-            # Choose a non-retriable error code so the futures fail with
-            # exceptions.
-            status.code = code_pb2.UNKNOWN
+            exactly_once_delivery_enabled = self._exactly_once_delivery_enabled()
             # Makes sure to complete futures so they don't block forever.
-            _process_requests(status, ack_reqs_dict, None)
+            for req in ack_reqs_dict.values():
+                # Futures may be present even with exactly-once delivery
+                # disabled, in transition periods after the setting is changed on
+                # the subscription.
+                if req.future:
+                    if exactly_once_delivery_enabled:
+                        e = AcknowledgeError(
+                            AcknowledgeStatus.OTHER,
+                            "RetryError while sending modack RPC.",
+                        )
+                        req.future.set_exception(e)
+                    else:
+                        req.future.set_result(AcknowledgeStatus.SUCCESS)
+
             _LOGGER.debug(
-                "RetryError while sending unary RPC. Waiting on a transient "
+                "RetryError while sending modack RPC. Waiting on a transient "
                 "error resolution for too long, will now trigger shutdown.",
                 exc_info=False,
             )
@@ -671,9 +705,23 @@ def send_unary_modack(
             self._on_rpc_done(exc)
             raise
 
-        requests_completed, requests_to_retry = _process_requests(
-            error_status, ack_reqs_dict, modack_errors_dict
-        )
+        if self._exactly_once_delivery_enabled():
+            requests_completed, requests_to_retry = _process_requests(
+                error_status, ack_reqs_dict, modack_errors_dict
+            )
+        else:
+            requests_completed = []
+            requests_to_retry = []
+            # When exactly-once delivery is NOT enabled, acks/modacks are considered
+            # best-effort. So, they always succeed even if the RPC fails.
+            for req in ack_reqs_dict.values():
+                # Futures may be present even with exactly-once delivery
+                # disabled, in transition periods after the setting is changed on
+                # the subscription.
+                if req.future:
+                    req.future.set_result(AcknowledgeStatus.SUCCESS)
+                requests_completed.append(req)
+
         return requests_completed, requests_to_retry
 
     def heartbeat(self) -> bool:
diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py
index e9554deda..8a1460951 100644
--- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py
+++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py
@@ -521,6 +521,67 @@ def test_send_unary_ack():
     )
 
 
+def test_send_unary_ack_exactly_once_enabled_with_futures():
+    manager = make_manager()
+    manager._exactly_once_enabled = True
+
+    future1 = futures.Future()
+    future2 = futures.Future()
+    ack_reqs_dict = {
+        "ack_id1": requests.AckRequest(
+            ack_id="ack_id1",
+            byte_size=0,
+            time_to_ack=20,
+            ordering_key="",
+            future=future1,
+        ),
+        "ack_id2": requests.AckRequest(
+            ack_id="ack_id2",
+            byte_size=0,
+            time_to_ack=20,
+            ordering_key="",
+            future=future2,
+        ),
+    }
+    manager.send_unary_ack(ack_ids=["ack_id1", "ack_id2"], ack_reqs_dict=ack_reqs_dict)
+
+    manager._client.acknowledge.assert_called_once_with(
+        subscription=manager._subscription, ack_ids=["ack_id1", "ack_id2"]
+    )
+    assert future1.result() == subscriber_exceptions.AcknowledgeStatus.SUCCESS
+    assert future2.result() == subscriber_exceptions.AcknowledgeStatus.SUCCESS
+
+
+def test_send_unary_ack_exactly_once_disabled_with_futures():
+    manager = make_manager()
+
+    future1 = futures.Future()
+    future2 = futures.Future()
+    ack_reqs_dict = {
+        "ack_id1": requests.AckRequest(
+            ack_id="ack_id1",
+            byte_size=0,
+            time_to_ack=20,
+            ordering_key="",
+            future=future1,
+        ),
+        "ack_id2": requests.AckRequest(
+            ack_id="ack_id2",
+            byte_size=0,
+            time_to_ack=20,
+            ordering_key="",
+            future=future2,
+        ),
+    }
+    manager.send_unary_ack(ack_ids=["ack_id1", "ack_id2"], ack_reqs_dict=ack_reqs_dict)
+
+    manager._client.acknowledge.assert_called_once_with(
+        subscription=manager._subscription, ack_ids=["ack_id1", "ack_id2"]
+    )
+    assert future1.result() == subscriber_exceptions.AcknowledgeStatus.SUCCESS
+    assert future2.result() == subscriber_exceptions.AcknowledgeStatus.SUCCESS
+
+
 def test_send_unary_modack():
     manager = make_manager()
 
@@ -552,6 +613,81 @@ def test_send_unary_modack():
     )
 
 
+def test_send_unary_modack_exactly_once_enabled_with_futures():
+    manager = make_manager()
+    manager._exactly_once_enabled = True
+
+    future1 = futures.Future()
+    future2 = futures.Future()
+    future3 = futures.Future()
+    ack_reqs_dict = {
+        "ack_id3": requests.ModAckRequest(ack_id="ack_id3", seconds=60, future=future1),
+        "ack_id4": requests.ModAckRequest(ack_id="ack_id4", seconds=60, future=future2),
+        "ack_id5": requests.ModAckRequest(ack_id="ack_id5", seconds=60, future=future3),
+    }
+    manager.send_unary_modack(
+        modify_deadline_ack_ids=["ack_id3", "ack_id4", "ack_id5"],
+        modify_deadline_seconds=[10, 20, 20],
+        ack_reqs_dict=ack_reqs_dict,
+    )
+
+    manager._client.modify_ack_deadline.assert_has_calls(
+        [
+            mock.call(
+                subscription=manager._subscription,
+                ack_ids=["ack_id3"],
+                ack_deadline_seconds=10,
+            ),
+            mock.call(
+                subscription=manager._subscription,
+                ack_ids=["ack_id4", "ack_id5"],
+                ack_deadline_seconds=20,
+            ),
+        ],
+        any_order=True,
+    )
+    assert future1.result() == subscriber_exceptions.AcknowledgeStatus.SUCCESS
+    assert future2.result() == subscriber_exceptions.AcknowledgeStatus.SUCCESS
+    assert future3.result() == subscriber_exceptions.AcknowledgeStatus.SUCCESS
+
+
+def test_send_unary_modack_exactly_once_disabled_with_futures():
+    manager = make_manager()
+
+    future1 = futures.Future()
+    future2 = futures.Future()
+    future3 = futures.Future()
+    ack_reqs_dict = {
+        "ack_id3": requests.ModAckRequest(ack_id="ack_id3", seconds=60, future=future1),
+        "ack_id4": requests.ModAckRequest(ack_id="ack_id4", seconds=60, future=future2),
+        "ack_id5": requests.ModAckRequest(ack_id="ack_id5", seconds=60, future=future3),
+    }
+    manager.send_unary_modack(
+        modify_deadline_ack_ids=["ack_id3", "ack_id4", "ack_id5"],
+        modify_deadline_seconds=[10, 20, 20],
+        ack_reqs_dict=ack_reqs_dict,
+    )
+
+    manager._client.modify_ack_deadline.assert_has_calls(
+        [
+            mock.call(
+                subscription=manager._subscription,
+                ack_ids=["ack_id3"],
+                ack_deadline_seconds=10,
+            ),
+            mock.call(
+                subscription=manager._subscription,
+                ack_ids=["ack_id4", "ack_id5"],
+                ack_deadline_seconds=20,
+            ),
+        ],
+        any_order=True,
+    )
+    assert future1.result() == subscriber_exceptions.AcknowledgeStatus.SUCCESS
+    assert future2.result() == subscriber_exceptions.AcknowledgeStatus.SUCCESS
+    assert future3.result() == subscriber_exceptions.AcknowledgeStatus.SUCCESS
+
+
 def test_send_unary_ack_api_call_error(caplog):
     caplog.set_level(logging.DEBUG)
 
@@ -606,10 +742,123 @@ def test_send_unary_modack_api_call_error(caplog):
     assert "The front fell off" in caplog.text
 
 
-def test_send_unary_ack_retry_error(caplog):
+def test_send_unary_ack_retry_error_exactly_once_disabled_no_futures(caplog):
     caplog.set_level(logging.DEBUG)
 
     manager, _, _, _, _, _ = make_running_manager()
+    manager._exactly_once_enabled = False
+
+    error = exceptions.RetryError(
+        "Too long a transient error", cause=Exception("Out of time!")
+    )
+    manager._client.acknowledge.side_effect = error
+
+    ack_reqs_dict = {
+        "ack_id1": requests.AckRequest(
+            ack_id="ack_id1",
+            byte_size=0,
+            time_to_ack=20,
+            ordering_key="",
+            future=None,
+        ),
+        "ack_id2": requests.AckRequest(
+            ack_id="ack_id2",
+            byte_size=0,
+            time_to_ack=20,
+            ordering_key="",
+            future=None,
+        ),
+    }
+    with pytest.raises(exceptions.RetryError):
+        manager.send_unary_ack(
+            ack_ids=["ack_id1", "ack_id2"], ack_reqs_dict=ack_reqs_dict
+        )
+
+    assert "RetryError while sending ack RPC" in caplog.text
+    assert "signaled streaming pull manager shutdown" in caplog.text
+
+
+def test_send_unary_ack_retry_error_exactly_once_disabled_with_futures(caplog):
+    caplog.set_level(logging.DEBUG)
+
+    manager, _, _, _, _, _ = make_running_manager()
+    manager._exactly_once_enabled = False
+
+    error = exceptions.RetryError(
+        "Too long a transient error", cause=Exception("Out of time!")
+    )
+    manager._client.acknowledge.side_effect = error
+
+    future1 = futures.Future()
+    future2 = futures.Future()
+    ack_reqs_dict = {
+        "ack_id1": requests.AckRequest(
+            ack_id="ack_id1",
+            byte_size=0,
+            time_to_ack=20,
+            ordering_key="",
+            future=future1,
+        ),
+        "ack_id2": requests.AckRequest(
+            ack_id="ack_id2",
+            byte_size=0,
+            time_to_ack=20,
+            ordering_key="",
+            future=future2,
+        ),
+    }
+    with pytest.raises(exceptions.RetryError):
+        manager.send_unary_ack(
+            ack_ids=["ack_id1", "ack_id2"], ack_reqs_dict=ack_reqs_dict
+        )
+
+    assert "RetryError while sending ack RPC" in caplog.text
+    assert "signaled streaming pull manager shutdown" in caplog.text
+    assert future1.result() == subscriber_exceptions.AcknowledgeStatus.SUCCESS
+    assert future2.result() == subscriber_exceptions.AcknowledgeStatus.SUCCESS
+
+
+def test_send_unary_ack_retry_error_exactly_once_enabled_no_futures(caplog):
+    caplog.set_level(logging.DEBUG)
+
+    manager, _, _, _, _, _ = make_running_manager()
+    manager._exactly_once_enabled = True
+
+    error = exceptions.RetryError(
+        "Too long a transient error", cause=Exception("Out of time!")
+    )
+    manager._client.acknowledge.side_effect = error
+
+    ack_reqs_dict = {
+        "ack_id1": requests.AckRequest(
+            ack_id="ack_id1",
+            byte_size=0,
+            time_to_ack=20,
+            ordering_key="",
+            future=None,
+        ),
+        "ack_id2": requests.AckRequest(
+            ack_id="ack_id2",
+            byte_size=0,
+            time_to_ack=20,
+            ordering_key="",
+            future=None,
+        ),
+    }
+    with pytest.raises(exceptions.RetryError):
+        manager.send_unary_ack(
+            ack_ids=["ack_id1", "ack_id2"], ack_reqs_dict=ack_reqs_dict
+        )
+
+    assert "RetryError while sending ack RPC" in caplog.text
+    assert "signaled streaming pull manager shutdown" in caplog.text
+
+
+def test_send_unary_ack_retry_error_exactly_once_enabled_with_futures(caplog):
+    caplog.set_level(logging.DEBUG)
+
+    manager, _, _, _, _, _ = make_running_manager()
+    manager._exactly_once_enabled = True
 
     error = exceptions.RetryError(
         "Too long a transient error", cause=Exception("Out of time!")
@@ -639,7 +888,7 @@ def test_send_unary_ack_retry_error(caplog):
             ack_ids=["ack_id1", "ack_id2"], ack_reqs_dict=ack_reqs_dict
         )
 
-    assert "RetryError while sending unary RPC" in caplog.text
+    assert "RetryError while sending ack RPC" in caplog.text
     assert "signaled streaming pull manager shutdown" in caplog.text
     assert isinstance(future1.exception(), subscriber_exceptions.AcknowledgeError)
     assert (
@@ -651,10 +900,94 @@ def test_send_unary_ack_retry_error(caplog):
     )
 
 
-def test_send_unary_modack_retry_error(caplog):
+def test_send_unary_modack_retry_error_exactly_once_disabled_no_future(caplog):
+    caplog.set_level(logging.DEBUG)
+
+    manager, _, _, _, _, _ = make_running_manager()
+    manager._exactly_once_enabled = False
+
+    error = exceptions.RetryError(
+        "Too long a transient error", cause=Exception("Out of time!")
+    )
+    manager._client.modify_ack_deadline.side_effect = error
+
+    ack_reqs_dict = {
+        "ackid1": requests.ModAckRequest(ack_id="ackid1", seconds=60, future=None)
+    }
+    with pytest.raises(exceptions.RetryError):
+        manager.send_unary_modack(
+            modify_deadline_ack_ids=["ackid1"],
+            modify_deadline_seconds=[0],
+            ack_reqs_dict=ack_reqs_dict,
+        )
+
+    assert "RetryError while sending modack RPC" in caplog.text
+    assert "signaled streaming pull manager shutdown" in caplog.text
+
+
+def test_send_unary_modack_retry_error_exactly_once_disabled_with_futures(
+    caplog,
+):
     caplog.set_level(logging.DEBUG)
 
     manager, _, _, _, _, _ = make_running_manager()
+    manager._exactly_once_enabled = False
+
+    error = exceptions.RetryError(
+        "Too long a transient error", cause=Exception("Out of time!")
+    )
+    manager._client.modify_ack_deadline.side_effect = error
+
+    future = futures.Future()
+    ack_reqs_dict = {
+        "ackid1": requests.ModAckRequest(ack_id="ackid1", seconds=60, future=future)
+    }
+    with pytest.raises(exceptions.RetryError):
+        manager.send_unary_modack(
+            modify_deadline_ack_ids=["ackid1"],
+            modify_deadline_seconds=[0],
+            ack_reqs_dict=ack_reqs_dict,
+        )
+
+    assert "RetryError while sending modack RPC" in caplog.text
+    assert "signaled streaming pull manager shutdown" in caplog.text
+    assert future.result() == subscriber_exceptions.AcknowledgeStatus.SUCCESS
+
+
+def test_send_unary_modack_retry_error_exactly_once_enabled_no_futures(
+    caplog,
+):
+    caplog.set_level(logging.DEBUG)
+
+    manager, _, _, _, _, _ = make_running_manager()
+    manager._exactly_once_enabled = True
+
+    error = exceptions.RetryError(
+        "Too long a transient error", cause=Exception("Out of time!")
+    )
+    manager._client.modify_ack_deadline.side_effect = error
+
+    ack_reqs_dict = {
+        "ackid1": requests.ModAckRequest(ack_id="ackid1", seconds=60, future=None)
+    }
+    with pytest.raises(exceptions.RetryError):
+        manager.send_unary_modack(
+            modify_deadline_ack_ids=["ackid1"],
+            modify_deadline_seconds=[0],
+            ack_reqs_dict=ack_reqs_dict,
+        )
+
+    assert "RetryError while sending modack RPC" in caplog.text
+    assert "signaled streaming pull manager shutdown" in caplog.text
+
+
+def test_send_unary_modack_retry_error_exactly_once_enabled_with_futures(
+    caplog,
+):
+    caplog.set_level(logging.DEBUG)
+
+    manager, _, _, _, _, _ = make_running_manager()
+    manager._exactly_once_enabled = True
 
     error = exceptions.RetryError(
         "Too long a transient error", cause=Exception("Out of time!")
@@ -672,7 +1005,7 @@ def test_send_unary_modack_retry_error(caplog):
         ack_reqs_dict=ack_reqs_dict,
     )
 
-    assert "RetryError while sending unary RPC" in caplog.text
+    assert "RetryError while sending modack RPC" in caplog.text
     assert "signaled streaming pull manager shutdown" in caplog.text
     assert isinstance(future.exception(), subscriber_exceptions.AcknowledgeError)
     assert (