From 75f6e55055e57fa913fd8f3e03c087cc17deb90d Mon Sep 17 00:00:00 2001 From: Carter Kozak Date: Thu, 20 Feb 2020 09:02:23 -0500 Subject: [PATCH 1/3] fix #301: QueuedChannel futures don't wait forever when delegates throw --- .../core/NeverThrowLimitedChannel.java | 50 +++++++++++++++++++ .../palantir/dialogue/core/QueuedChannel.java | 2 +- .../dialogue/core/QueuedChannelTest.java | 21 ++++++++ 3 files changed, 72 insertions(+), 1 deletion(-) create mode 100644 dialogue-core/src/main/java/com/palantir/dialogue/core/NeverThrowLimitedChannel.java diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/NeverThrowLimitedChannel.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/NeverThrowLimitedChannel.java new file mode 100644 index 000000000..8fa9fbb67 --- /dev/null +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/NeverThrowLimitedChannel.java @@ -0,0 +1,50 @@ +/* + * (c) Copyright 2020 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.dialogue.core; + +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.palantir.dialogue.Endpoint; +import com.palantir.dialogue.Request; +import com.palantir.dialogue.Response; +import java.util.Optional; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The contract of {@link LimitedChannel} requires that the {@link LimitedChannel#maybeExecute} method never throws. + * This is a defensive backstop so that callers can rely on this invariant. + */ +final class NeverThrowLimitedChannel implements LimitedChannel { + + private static final Logger log = LoggerFactory.getLogger(NeverThrowLimitedChannel.class); + private final LimitedChannel delegate; + + NeverThrowLimitedChannel(LimitedChannel delegate) { + this.delegate = delegate; + } + + @Override + public Optional> maybeExecute(Endpoint endpoint, Request request) { + try { + return delegate.maybeExecute(endpoint, request); + } catch (RuntimeException | Error e) { + log.error("Dialogue channels should never throw. This may be a bug in the channel implementation", e); + return Optional.of(Futures.immediateFailedFuture(e)); + } + } +} diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/QueuedChannel.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/QueuedChannel.java index 2fb2c9681..f0852c8ef 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/QueuedChannel.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/QueuedChannel.java @@ -75,7 +75,7 @@ final class QueuedChannel implements Channel { @VisibleForTesting @SuppressWarnings("FutureReturnValueIgnored") QueuedChannel(LimitedChannel delegate, int maxQueueSize, DispatcherMetrics metrics) { - this.delegate = delegate; + this.delegate = new NeverThrowLimitedChannel(delegate); this.queuedCalls = new LinkedBlockingDeque<>(maxQueueSize); metrics.callsQueued(queuedCalls::size); diff --git a/dialogue-core/src/test/java/com/palantir/dialogue/core/QueuedChannelTest.java b/dialogue-core/src/test/java/com/palantir/dialogue/core/QueuedChannelTest.java index 06a439652..b21ee4c8a 100644 --- a/dialogue-core/src/test/java/com/palantir/dialogue/core/QueuedChannelTest.java +++ b/dialogue-core/src/test/java/com/palantir/dialogue/core/QueuedChannelTest.java @@ -37,6 +37,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.stubbing.OngoingStubbing; @@ -114,6 +115,26 @@ public void testQueuedRequestExecutedOnNextSubmission() { verify(delegate, times(3)).maybeExecute(endpoint, request); } + @Test + public void testQueuedRequestExecutedOnNextSubmission_throws() throws ExecutionException, InterruptedException { + Request queuedRequest = Mockito.mock(Request.class); + when(delegate.maybeExecute(endpoint, queuedRequest)).thenReturn(Optional.empty()); + ListenableFuture queuedFuture = queuedChannel.execute(endpoint, queuedRequest); + verify(delegate, times(2)).maybeExecute(endpoint, queuedRequest); + assertThat(queuedFuture).isNotDone(); + + futureResponse.set(mockResponse); + when(delegate.maybeExecute(endpoint, request)).thenReturn(maybeResponse); + when(delegate.maybeExecute(endpoint, queuedRequest)).thenThrow(new NullPointerException("expected")); + ListenableFuture completed = queuedChannel.execute(endpoint, request); + assertThat(completed).isDone(); + assertThat(queuedFuture).isDone(); + assertThat(completed.get()).isEqualTo(mockResponse); + assertThatThrownBy(queuedFuture::get).hasRootCauseMessage("expected"); + verify(delegate, times(1)).maybeExecute(endpoint, request); + verify(delegate, times(3)).maybeExecute(endpoint, queuedRequest); + } + @Test @SuppressWarnings("FutureReturnValueIgnored") public void testQueuedRequestExecutedWhenRunningRequestCompletes() { From 0a811708f1d4c160b8194f9f98286cd80eeadd8d Mon Sep 17 00:00:00 2001 From: Carter Kozak Date: Thu, 20 Feb 2020 10:03:56 -0500 Subject: [PATCH 2/3] test docs --- .../java/com/palantir/dialogue/core/QueuedChannelTest.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/dialogue-core/src/test/java/com/palantir/dialogue/core/QueuedChannelTest.java b/dialogue-core/src/test/java/com/palantir/dialogue/core/QueuedChannelTest.java index b21ee4c8a..fe3cb7d0f 100644 --- a/dialogue-core/src/test/java/com/palantir/dialogue/core/QueuedChannelTest.java +++ b/dialogue-core/src/test/java/com/palantir/dialogue/core/QueuedChannelTest.java @@ -117,16 +117,20 @@ public void testQueuedRequestExecutedOnNextSubmission() { @Test public void testQueuedRequestExecutedOnNextSubmission_throws() throws ExecutionException, InterruptedException { + // First request is limited by the channel and queued Request queuedRequest = Mockito.mock(Request.class); when(delegate.maybeExecute(endpoint, queuedRequest)).thenReturn(Optional.empty()); ListenableFuture queuedFuture = queuedChannel.execute(endpoint, queuedRequest); verify(delegate, times(2)).maybeExecute(endpoint, queuedRequest); assertThat(queuedFuture).isNotDone(); + // Second request succeeds and the queued request is attempted, but throws an exception futureResponse.set(mockResponse); when(delegate.maybeExecute(endpoint, request)).thenReturn(maybeResponse); when(delegate.maybeExecute(endpoint, queuedRequest)).thenThrow(new NullPointerException("expected")); ListenableFuture completed = queuedChannel.execute(endpoint, request); + // Both results should be completed. The thrown exception should + // be converted into a failed future by NeverThrowLimitedChannel assertThat(completed).isDone(); assertThat(queuedFuture).isDone(); assertThat(completed.get()).isEqualTo(mockResponse); From e59ca671d4064caae8c0f081dbd4ea61f2e8e28d Mon Sep 17 00:00:00 2001 From: Carter Kozak Date: Thu, 20 Feb 2020 15:03:56 +0000 Subject: [PATCH 3/3] Add generated changelog entries --- changelog/@unreleased/pr-387.v2.yml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 changelog/@unreleased/pr-387.v2.yml diff --git a/changelog/@unreleased/pr-387.v2.yml b/changelog/@unreleased/pr-387.v2.yml new file mode 100644 index 000000000..634841076 --- /dev/null +++ b/changelog/@unreleased/pr-387.v2.yml @@ -0,0 +1,5 @@ +type: fix +fix: + description: QueuedChannel futures don't wait forever when delegates throw + links: + - https://github.com/palantir/dialogue/pull/387