Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use returnImmediately=false and disable timeouts for pullAsync #1387

Merged
merged 2 commits into from
Nov 11, 2016
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -702,9 +702,10 @@ interface MessageConsumer extends AutoCloseable {
/**
* Pulls messages from the provided subscription. This method possibly returns no messages if no
* message was available at the time the request was processed by the Pub/Sub service (i.e. the
* system is not allowed to wait until at least one message is available). Pulled messages have
* their acknowledge deadline automatically renewed until they are explicitly consumed using
* {@link Iterator#next()}.
* system is not allowed to wait until at least one message is available -
* <a href="https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.PullRequest.FIELDS.bool.google.pubsub.v1.PullRequest.return_immediately">return_immediately</a>
* option is set to {@code true}). Pulled messages have their acknowledge deadline automatically
* renewed until they are explicitly consumed using {@link Iterator#next()}.
*
* <p>Example of pulling a maximum number of messages from a subscription.
* <pre> {@code
Expand All @@ -728,9 +729,12 @@ interface MessageConsumer extends AutoCloseable {
/**
* Sends a request for pulling messages from the provided subscription. This method returns a
* {@code Future} object to consume the result. {@link Future#get()} returns a message iterator.
* This method possibly returns no messages if no message was available at the time the request
* was processed by the Pub/Sub service (i.e. the system is not allowed to wait until at least one
* message is available).
* When using this method the system is allowed to wait until at least one message is available
* rather than returning no messages (i.e.
* <a href="https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.PullRequest.FIELDS.bool.google.pubsub.v1.PullRequest.return_immediately">return_immediately</a>
* option is set to {@code false}). The client may cancel the request by calling
* {@link Future#cancel(boolean)} if it does not wish to wait any longer. Notice that the Pub/Sub
* service might still return no messages if a timeout is reached on the service side.
*
* <p>Example of asynchronously pulling a maximum number of messages from a subscription.
* <pre> {@code
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -512,18 +512,13 @@ public Future<AsyncPage<SubscriptionId>> listSubscriptionsAsync(String topic,
return listSubscriptionsAsync(topic, getOptions(), optionMap(options));
}

@Override
public Iterator<ReceivedMessage> pull(String subscription, int maxMessages) {
return get(pullAsync(subscription, maxMessages));
}

@Override
public Future<Iterator<ReceivedMessage>> pullAsync(final String subscription, int maxMessages) {
private Future<Iterator<ReceivedMessage>> pullAsync(final String subscription,
int maxMessages, boolean returnImmediately) {
PullRequest request = PullRequest.newBuilder().setReturnImmediately(true)
.setSubscription(
SubscriberApi.formatSubscriptionName(getOptions().getProjectId(), subscription))
.setMaxMessages(maxMessages)
.setReturnImmediately(true)
.setReturnImmediately(returnImmediately)

This comment was marked as spam.

This comment was marked as spam.

.build();
PullFuture future = rpc.pull(request);
future.addCallback(new PubSubRpc.PullCallback() {
Expand Down Expand Up @@ -555,6 +550,16 @@ public ReceivedMessage apply(com.google.pubsub.v1.ReceivedMessage receivedMessag
});
}

@Override
public Iterator<ReceivedMessage> pull(String subscription, int maxMessages) {
return get(pullAsync(subscription, maxMessages, true));
}

@Override
public Future<Iterator<ReceivedMessage>> pullAsync(String subscription, int maxMessages) {
return pullAsync(subscription, maxMessages, false);
}

@Override
public MessageConsumer pullAsync(String subscription, MessageProcessor callback,
PullOption... options) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;

import org.joda.time.Duration;

import java.io.IOException;
import java.util.Set;
import java.util.concurrent.Future;
Expand All @@ -77,6 +79,7 @@ public class DefaultPubSubRpc implements PubSubRpc {

private final PublisherApi publisherApi;
private final SubscriberApi subscriberApi;
private final SubscriberApi noTimeoutSubscriberApi;
private final ScheduledExecutorService executor;
private final ProviderManager providerManager;
private final ExecutorFactory<ScheduledExecutorService> executorFactory;
Expand Down Expand Up @@ -164,6 +167,12 @@ public DefaultPubSubRpc(PubSubOptions options) throws IOException {
.applyToAllApiMethods(callSettingsBuilder);
publisherApi = PublisherApi.create(pubBuilder.build());
subscriberApi = SubscriberApi.create(subBuilder.build());
callSettingsBuilder.setRetrySettingsBuilder(callSettingsBuilder.getRetrySettingsBuilder()
.setTotalTimeout(Duration.millis(Long.MAX_VALUE))
.setInitialRpcTimeout(Duration.millis(Long.MAX_VALUE))
.setMaxRpcTimeout(Duration.millis(Long.MAX_VALUE)));
subBuilder.applyToAllApiMethods(callSettingsBuilder);
noTimeoutSubscriberApi = SubscriberApi.create(subBuilder.build());
} catch (Exception ex) {
throw new IOException(ex);
}
Expand Down Expand Up @@ -256,9 +265,14 @@ public Future<Empty> acknowledge(AcknowledgeRequest request) {
return translate(subscriberApi.acknowledgeCallable().futureCall(request), false);
}

private static PullFuture pull(SubscriberApi subscriberApi, PullRequest request) {
return new PullFutureImpl(translate(subscriberApi.pullCallable().futureCall(request), false));
}

@Override
public PullFuture pull(PullRequest request) {
return new PullFutureImpl(translate(subscriberApi.pullCallable().futureCall(request), false));
return request.getReturnImmediately()
? pull(subscriberApi, request) : pull(noTimeoutSubscriberApi, request);
}

@Override
Expand Down Expand Up @@ -290,6 +304,7 @@ public void close() throws Exception {
}
closed = true;
subscriberApi.close();
noTimeoutSubscriberApi.close();
publisherApi.close();
providerManager.getChannel().shutdown();
executorFactory.release(executor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,24 @@ public void testPullMessagesAsync() throws ExecutionException, InterruptedExcept
assertTrue(pubsub().deleteTopic(topic));
}

@Test
public void testPullMessagesAsyncNonImmediately() throws ExecutionException, InterruptedException {
String topic = formatForTest("test-pull-messages-async-non-immediately-topic");
pubsub().create(TopicInfo.of(topic));
String subscription = formatForTest("test-pull-messages-async-subscription");
pubsub().create(SubscriptionInfo.of(topic, subscription));
Future<Iterator<ReceivedMessage>> future = pubsub().pullAsync(subscription, 2);
Message message1 = Message.of("payload1");
Message message2 = Message.of("payload2");
List<String> messageIds = pubsub().publish(topic, ImmutableList.of(message1, message2));
assertEquals(2, messageIds.size());
Iterator<ReceivedMessage> iterator = future.get();
assertEquals(message1.getPayloadAsString(), iterator.next().getPayloadAsString());
assertEquals(message2.getPayloadAsString(), iterator.next().getPayloadAsString());
assertTrue(pubsub().deleteSubscription(subscription));
assertTrue(pubsub().deleteTopic(topic));
}

@Test
public void testPullAsyncNonExistingSubscription()
throws ExecutionException, InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1302,7 +1302,7 @@ public void testPullMessagesAsync() throws ExecutionException, InterruptedExcept
PullRequest request = PullRequest.newBuilder()
.setSubscription(SUBSCRIPTION_NAME_PB)
.setMaxMessages(42)
.setReturnImmediately(true)
.setReturnImmediately(false)
.build();
List<ReceivedMessage> messageList = ImmutableList.of(
ReceivedMessage.fromPb(pubsub, SUBSCRIPTION, MESSAGE_PB1),
Expand Down Expand Up @@ -1363,7 +1363,7 @@ public void testPullMessagesAsyncError() throws ExecutionException, InterruptedE
PullRequest request = PullRequest.newBuilder()
.setSubscription(SUBSCRIPTION_NAME_PB)
.setMaxMessages(42)
.setReturnImmediately(true)
.setReturnImmediately(false)
.build();
PubSubException exception = new PubSubException(new IOException(), false);
PullFuture futureMock = EasyMock.createStrictMock(PullFuture.class);
Expand Down