Skip to content

Commit c527c2e

Browse files
fix: Stop using api_core default timeouts in publish since they are
broken
1 parent f648f65 commit c527c2e

File tree

2 files changed

+22
-2
lines changed

2 files changed

+22
-2
lines changed

google/cloud/pubsub_v1/types.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
from google.protobuf import timestamp_pb2
3636

3737
from google.api_core.protobuf_helpers import get_messages
38+
from google.api_core.timeout import ExponentialTimeout
3839

3940
from google.pubsub_v1.types import pubsub as pubsub_gapic_types
4041

@@ -191,7 +192,12 @@ class PublisherOptions(NamedTuple):
191192
"an instance of :class:`google.api_core.retry.Retry`."
192193
)
193194

194-
timeout: "OptionalTimeout" = gapic_v1.method.DEFAULT # use api_core default
195+
# Use ExponentialTimeout instead of api_core default because the default
196+
# value results in retries with zero deadline.
197+
# Refer https://github.com/googleapis/python-api-core/issues/654
198+
timeout: "OptionalTimeout" = ExponentialTimeout(
199+
initial=5, maximum=60, multiplier=1.3, deadline=600
200+
)
195201
(
196202
"Timeout settings for message publishing by the client. It should be "
197203
"compatible with :class:`~.pubsub_v1.types.TimeoutType`."

tests/unit/pubsub_v1/publisher/test_publisher_client.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import sys
2020

2121
import grpc
22+
import math
2223

2324
# special case python < 3.8
2425
if sys.version_info.major == 3 and sys.version_info.minor < 8:
@@ -35,6 +36,7 @@
3536
from google.api_core import gapic_v1
3637
from google.api_core import retry as retries
3738
from google.api_core.gapic_v1.client_info import METRICS_METADATA_KEY
39+
from google.api_core.timeout import ExponentialTimeout
3840

3941
from google.cloud.pubsub_v1 import publisher
4042
from google.cloud.pubsub_v1 import types
@@ -646,12 +648,15 @@ def test_publish_new_batch_needed(creds):
646648

647649
# Actually mock the batch class now.
648650
batch_class = mock.Mock(spec=(), return_value=batch2)
651+
649652
client._set_batch_class(batch_class)
650653

651654
# Publish a message.
652655
future = client.publish(topic, b"foo", bar=b"baz")
653656
assert future is mock.sentinel.future
654657

658+
call_args = batch_class.call_args
659+
655660
# Check the mocks.
656661
batch_class.assert_called_once_with(
657662
client=mock.ANY,
@@ -660,8 +665,17 @@ def test_publish_new_batch_needed(creds):
660665
batch_done_callback=None,
661666
commit_when_full=True,
662667
commit_retry=gapic_v1.method.DEFAULT,
663-
commit_timeout=gapic_v1.method.DEFAULT,
668+
commit_timeout=mock.ANY,
664669
)
670+
commit_timeout_arg = call_args[1]["commit_timeout"]
671+
assert isinstance(commit_timeout_arg, ExponentialTimeout)
672+
assert math.isclose(commit_timeout_arg._initial, 5) is True
673+
assert math.isclose(commit_timeout_arg._maximum, 60) is True
674+
assert math.isclose(commit_timeout_arg._multiplier, 1.3) is True
675+
assert math.isclose(commit_timeout_arg._deadline, 600) is True
676+
# ExponentialTimeout(
677+
# initial=5, maximum=60, multiplier=1.3, deadline=600
678+
# ),
665679
message_pb = gapic_types.PubsubMessage(data=b"foo", attributes={"bar": "baz"})
666680
wrapper = PublishMessageWrapper(message=message_pb)
667681
batch1.publish.assert_called_once_with(wrapper)

0 commit comments

Comments
 (0)