diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 2af1b8dcd2a8..4beab699d92a 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -148,21 +148,6 @@ private Publisher(Builder builder) throws IOException { .setTotalTimeout(Duration.ofNanos(Long.MAX_VALUE)); } - Set retryCodes; - if (enableMessageOrdering) { - retryCodes = EnumSet.allOf(StatusCode.Code.class); - } else { - retryCodes = - EnumSet.of( - StatusCode.Code.ABORTED, - StatusCode.Code.CANCELLED, - StatusCode.Code.DEADLINE_EXCEEDED, - StatusCode.Code.INTERNAL, - StatusCode.Code.RESOURCE_EXHAUSTED, - StatusCode.Code.UNKNOWN, - StatusCode.Code.UNAVAILABLE); - } - PublisherStubSettings.Builder stubSettings = PublisherStubSettings.newBuilder() .setCredentialsProvider(builder.credentialsProvider) @@ -170,7 +155,14 @@ private Publisher(Builder builder) throws IOException { .setTransportChannelProvider(builder.channelProvider); stubSettings .publishSettings() - .setRetryableCodes(retryCodes) + .setRetryableCodes(EnumSet.of( + StatusCode.Code.ABORTED, + StatusCode.Code.CANCELLED, + StatusCode.Code.DEADLINE_EXCEEDED, + StatusCode.Code.INTERNAL, + StatusCode.Code.RESOURCE_EXHAUSTED, + StatusCode.Code.UNKNOWN, + StatusCode.Code.UNAVAILABLE)) .setRetrySettings(retrySettingsBuilder.build()) .setBatchingSettings(BatchingSettings.newBuilder().setIsEnabled(false).build()); this.publisherStub = GrpcPublisherStub.create(stubSettings.build()); @@ -263,6 +255,18 @@ public void run() { return outstandingPublish.publishResult; } + /** + * There may be non-recoverable problems with a request for an ordering key. In that case, all + * subsequent requests will fail until this method is called. If the key is not currently paused, + * calling this method will be a no-op. + * + * @param key The key for which to resume publishing. + */ + public void resumePublish(String key) { + Preconditions.checkState(!shutdown.get(), "Cannot publish on a shut-down publisher."); + sequentialExecutor.resumePublish(key); + } + private void setupAlarm() { if (!messagesBatches.isEmpty()) { if (!activeAlarm.getAndSet(true)) { diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java index 8adc87cd3c58..5530e2b65958 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java @@ -23,10 +23,14 @@ import com.google.api.core.ApiFutures; import com.google.api.core.BetaApi; import com.google.api.core.SettableApiFuture; + +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedList; import java.util.Map; import java.util.Queue; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.Executor; @@ -147,6 +151,8 @@ static class CallbackExecutor extends SequentialExecutor { new CancellationException( "Execution cancelled because executing previous runnable failed."); + private final Set keysWithErrors = Collections.synchronizedSet(new HashSet()); + CallbackExecutor(Executor executor) { super(executor); } @@ -186,6 +192,11 @@ ApiFuture submit(final String key, final Callable> callable) // Step 1: create a future for the user final SettableApiFuture future = SettableApiFuture.create(); + if (keysWithErrors.contains(key)) { + future.setException(CANCELLATION_EXCEPTION); + return future; + } + // Step 2: create the CancellableRunnable // Step 3: add the task to queue via `execute` CancellableRunnable task = @@ -213,6 +224,7 @@ public void onSuccess(T msg) { // Step 5.2: on failure @Override public void onFailure(Throwable e) { + keysWithErrors.add(key); future.setException(e); cancelQueuedTasks(key, CANCELLATION_EXCEPTION); } @@ -233,7 +245,11 @@ public void cancel(Throwable e) { return future; } - /** Cancels every task in the queue assoicated with {@code key}. */ + void resumePublish(String key) { + keysWithErrors.remove(key); + } + + /** Cancels every task in the queue associated with {@code key}. */ private void cancelQueuedTasks(final String key, Throwable e) { // TODO(kimkyung-goog): Ensure execute() fails once cancelQueueTasks() has been ever invoked, // so that no more tasks are scheduled. diff --git a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index e38009f1e7bd..319c1de62e5f 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -47,6 +47,7 @@ import java.util.concurrent.TimeUnit; import org.easymock.EasyMock; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -443,6 +444,95 @@ public void testEnableMessageOrdering_dontSendWhileInflight() throws Exception { publisher.shutdown(); } + @Test + /** + * Make sure that resume publishing works as expected: + * + *
    + *
  1. publish with key orderA which returns a failure.
  2. + *
  3. publish with key orderA again, which should fail immediately
  4. + *
  5. publish with key orderB, which should succeed
  6. + *
  7. resume publishing on key orderA
  8. + *
  9. publish with key orderA, which should now succeed
  10. + *
+ * + */ + public void testResumePublish() throws Exception { + Publisher publisher = + getTestPublisherBuilder() + .setBatchingSettings( + Publisher.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(2L) + .build()) + .setEnableMessageOrdering(true) + .build(); + + ApiFuture future1 = sendTestMessageWithOrderingKey(publisher, "m1", "orderA"); + ApiFuture future2 = sendTestMessageWithOrderingKey(publisher, "m2", "orderA"); + + fakeExecutor.advanceTime(Duration.ZERO); + assertFalse(future1.isDone()); + assertFalse(future2.isDone()); + + // This exception should stop future publishing to the same key + testPublisherServiceImpl.addPublishError(new StatusException(Status.INVALID_ARGUMENT)); + + fakeExecutor.advanceTime(Duration.ZERO); + + try { + future1.get(); + Assert.fail("This should fail."); + } catch (ExecutionException e) { + } + + try { + future2.get(); + Assert.fail("This should fail."); + } catch (ExecutionException e) { + } + + // Submit new requests with orderA that should fail. + ApiFuture future3 = sendTestMessageWithOrderingKey(publisher, "m3", "orderA"); + ApiFuture future4 = sendTestMessageWithOrderingKey(publisher, "m4", "orderA"); + + try { + future3.get(); + Assert.fail("This should fail."); + } catch (ExecutionException e) { + assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); + } + + try { + future4.get(); + Assert.fail("This should fail."); + } catch (ExecutionException e) { + assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); + } + + // Submit a new request with orderB, which should succeed + ApiFuture future5 = sendTestMessageWithOrderingKey(publisher, "m5", "orderB"); + ApiFuture future6 = sendTestMessageWithOrderingKey(publisher, "m6", "orderB"); + + testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("5").addMessageIds("6")); + + Assert.assertEquals("5", future5.get()); + Assert.assertEquals("6", future6.get()); + + // Resume publishing of "orderA", which should now succeed + publisher.resumePublish("orderA"); + + ApiFuture future7 = sendTestMessageWithOrderingKey(publisher, "m7", "orderA"); + ApiFuture future8 = sendTestMessageWithOrderingKey(publisher, "m8", "orderA"); + + testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("7").addMessageIds("8")); + + Assert.assertEquals("7", future7.get()); + Assert.assertEquals("8", future8.get()); + + publisher.shutdown(); + } + private ApiFuture sendTestMessageWithOrderingKey( Publisher publisher, String data, String orderingKey) { return publisher.publish(