diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java index 207700622fb4..3e45fce3eb11 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java @@ -702,9 +702,10 @@ interface MessageConsumer extends AutoCloseable { /** * Pulls messages from the provided subscription. This method possibly returns no messages if no * message was available at the time the request was processed by the Pub/Sub service (i.e. the - * system is not allowed to wait until at least one message is available). Pulled messages have - * their acknowledge deadline automatically renewed until they are explicitly consumed using - * {@link Iterator#next()}. + * system is not allowed to wait until at least one message is available - + * return_immediately + * option is set to {@code true}). Pulled messages have their acknowledge deadline automatically + * renewed until they are explicitly consumed using {@link Iterator#next()}. * *
Example of pulling a maximum number of messages from a subscription. *
{@code @@ -728,9 +729,12 @@ interface MessageConsumer extends AutoCloseable { /** * Sends a request for pulling messages from the provided subscription. This method returns a * {@code Future} object to consume the result. {@link Future#get()} returns a message iterator. - * This method possibly returns no messages if no message was available at the time the request - * was processed by the Pub/Sub service (i.e. the system is not allowed to wait until at least one - * message is available). + * When using this method the system is allowed to wait until at least one message is available + * rather than returning no messages (i.e. + * return_immediately + * option is set to {@code false}). The client may cancel the request by calling + * {@link Future#cancel(boolean)} if it does not wish to wait any longer. Notice that the Pub/Sub + * service might still return no messages if a timeout is reached on the service side. * *Example of asynchronously pulling a maximum number of messages from a subscription. *
{@code diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java index 494bb3ff43a1..369d87935cb6 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java @@ -512,18 +512,13 @@ public Future> listSubscriptionsAsync(String topic, return listSubscriptionsAsync(topic, getOptions(), optionMap(options)); } - @Override - public Iterator pull(String subscription, int maxMessages) { - return get(pullAsync(subscription, maxMessages)); - } - - @Override - public Future > pullAsync(final String subscription, int maxMessages) { - PullRequest request = PullRequest.newBuilder().setReturnImmediately(true) + private Future > pullAsync(final String subscription, + int maxMessages, boolean returnImmediately) { + PullRequest request = PullRequest.newBuilder() .setSubscription( SubscriberApi.formatSubscriptionName(getOptions().getProjectId(), subscription)) .setMaxMessages(maxMessages) - .setReturnImmediately(true) + .setReturnImmediately(returnImmediately) .build(); PullFuture future = rpc.pull(request); future.addCallback(new PubSubRpc.PullCallback() { @@ -555,6 +550,16 @@ public ReceivedMessage apply(com.google.pubsub.v1.ReceivedMessage receivedMessag }); } + @Override + public Iterator pull(String subscription, int maxMessages) { + return get(pullAsync(subscription, maxMessages, true)); + } + + @Override + public Future > pullAsync(String subscription, int maxMessages) { + return pullAsync(subscription, maxMessages, false); + } + @Override public MessageConsumer pullAsync(String subscription, MessageProcessor callback, PullOption... options) { diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java index 76f321d0ebf9..bbdaab1bd1b2 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java @@ -68,6 +68,8 @@ import io.grpc.netty.NegotiationType; import io.grpc.netty.NettyChannelBuilder; +import org.joda.time.Duration; + import java.io.IOException; import java.util.Set; import java.util.concurrent.Future; @@ -77,6 +79,7 @@ public class DefaultPubSubRpc implements PubSubRpc { private final PublisherApi publisherApi; private final SubscriberApi subscriberApi; + private final SubscriberApi noTimeoutSubscriberApi; private final ScheduledExecutorService executor; private final ProviderManager providerManager; private final ExecutorFactory executorFactory; @@ -164,6 +167,12 @@ public DefaultPubSubRpc(PubSubOptions options) throws IOException { .applyToAllApiMethods(callSettingsBuilder); publisherApi = PublisherApi.create(pubBuilder.build()); subscriberApi = SubscriberApi.create(subBuilder.build()); + callSettingsBuilder.setRetrySettingsBuilder(callSettingsBuilder.getRetrySettingsBuilder() + .setTotalTimeout(Duration.millis(Long.MAX_VALUE)) + .setInitialRpcTimeout(Duration.millis(Long.MAX_VALUE)) + .setMaxRpcTimeout(Duration.millis(Long.MAX_VALUE))); + subBuilder.applyToAllApiMethods(callSettingsBuilder); + noTimeoutSubscriberApi = SubscriberApi.create(subBuilder.build()); } catch (Exception ex) { throw new IOException(ex); } @@ -256,9 +265,14 @@ public Future acknowledge(AcknowledgeRequest request) { return translate(subscriberApi.acknowledgeCallable().futureCall(request), false); } + private static PullFuture pull(SubscriberApi subscriberApi, PullRequest request) { + return new PullFutureImpl(translate(subscriberApi.pullCallable().futureCall(request), false)); + } + @Override public PullFuture pull(PullRequest request) { - return new PullFutureImpl(translate(subscriberApi.pullCallable().futureCall(request), false)); + return request.getReturnImmediately() + ? pull(subscriberApi, request) : pull(noTimeoutSubscriberApi, request); } @Override @@ -290,6 +304,7 @@ public void close() throws Exception { } closed = true; subscriberApi.close(); + noTimeoutSubscriberApi.close(); publisherApi.close(); providerManager.getChannel().shutdown(); executorFactory.release(executor); diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java index b30d7a0f8c7b..d9524af5925e 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java @@ -549,6 +549,24 @@ public void testPullMessagesAsync() throws ExecutionException, InterruptedExcept assertTrue(pubsub().deleteTopic(topic)); } + @Test + public void testPullMessagesAsyncNonImmediately() throws ExecutionException, InterruptedException { + String topic = formatForTest("test-pull-messages-async-non-immediately-topic"); + pubsub().create(TopicInfo.of(topic)); + String subscription = formatForTest("test-pull-messages-async-subscription"); + pubsub().create(SubscriptionInfo.of(topic, subscription)); + Future > future = pubsub().pullAsync(subscription, 2); + Message message1 = Message.of("payload1"); + Message message2 = Message.of("payload2"); + List messageIds = pubsub().publish(topic, ImmutableList.of(message1, message2)); + assertEquals(2, messageIds.size()); + Iterator iterator = future.get(); + assertEquals(message1.getPayloadAsString(), iterator.next().getPayloadAsString()); + assertEquals(message2.getPayloadAsString(), iterator.next().getPayloadAsString()); + assertTrue(pubsub().deleteSubscription(subscription)); + assertTrue(pubsub().deleteTopic(topic)); + } + @Test public void testPullAsyncNonExistingSubscription() throws ExecutionException, InterruptedException { diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java index 8b5c32bbceae..b3fa6d5178fd 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java @@ -1302,7 +1302,7 @@ public void testPullMessagesAsync() throws ExecutionException, InterruptedExcept PullRequest request = PullRequest.newBuilder() .setSubscription(SUBSCRIPTION_NAME_PB) .setMaxMessages(42) - .setReturnImmediately(true) + .setReturnImmediately(false) .build(); List messageList = ImmutableList.of( ReceivedMessage.fromPb(pubsub, SUBSCRIPTION, MESSAGE_PB1), @@ -1363,7 +1363,7 @@ public void testPullMessagesAsyncError() throws ExecutionException, InterruptedE PullRequest request = PullRequest.newBuilder() .setSubscription(SUBSCRIPTION_NAME_PB) .setMaxMessages(42) - .setReturnImmediately(true) + .setReturnImmediately(false) .build(); PubSubException exception = new PubSubException(new IOException(), false); PullFuture futureMock = EasyMock.createStrictMock(PullFuture.class);