Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #301: QueuedChannel futures don't wait forever when delegates throw #387

Merged
merged 3 commits into from
Feb 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelog/@unreleased/pr-387.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type: fix
fix:
description: QueuedChannel futures don't wait forever when delegates throw
links:
- https://github.com/palantir/dialogue/pull/387
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* (c) Copyright 2020 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.dialogue.core;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.palantir.dialogue.Endpoint;
import com.palantir.dialogue.Request;
import com.palantir.dialogue.Response;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The contract of {@link LimitedChannel} requires that the {@link LimitedChannel#maybeExecute} method never throws.
* This is a defensive backstop so that callers can rely on this invariant.
*/
final class NeverThrowLimitedChannel implements LimitedChannel {

private static final Logger log = LoggerFactory.getLogger(NeverThrowLimitedChannel.class);
private final LimitedChannel delegate;

NeverThrowLimitedChannel(LimitedChannel delegate) {
this.delegate = delegate;
}

@Override
public Optional<ListenableFuture<Response>> maybeExecute(Endpoint endpoint, Request request) {
try {
return delegate.maybeExecute(endpoint, request);
} catch (RuntimeException | Error e) {
log.error("Dialogue channels should never throw. This may be a bug in the channel implementation", e);
return Optional.of(Futures.immediateFailedFuture(e));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ final class QueuedChannel implements Channel {
@VisibleForTesting
@SuppressWarnings("FutureReturnValueIgnored")
QueuedChannel(LimitedChannel delegate, int maxQueueSize, DispatcherMetrics metrics) {
this.delegate = delegate;
this.delegate = new NeverThrowLimitedChannel(delegate);
this.queuedCalls = new LinkedBlockingDeque<>(maxQueueSize);

metrics.callsQueued(queuedCalls::size);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.stubbing.OngoingStubbing;

Expand Down Expand Up @@ -114,6 +115,30 @@ public void testQueuedRequestExecutedOnNextSubmission() {
verify(delegate, times(3)).maybeExecute(endpoint, request);
}

@Test
public void testQueuedRequestExecutedOnNextSubmission_throws() throws ExecutionException, InterruptedException {
// First request is limited by the channel and queued
Request queuedRequest = Mockito.mock(Request.class);
when(delegate.maybeExecute(endpoint, queuedRequest)).thenReturn(Optional.empty());
ListenableFuture<Response> queuedFuture = queuedChannel.execute(endpoint, queuedRequest);
verify(delegate, times(2)).maybeExecute(endpoint, queuedRequest);
assertThat(queuedFuture).isNotDone();

// Second request succeeds and the queued request is attempted, but throws an exception
futureResponse.set(mockResponse);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would find it helpful to leave little comments explaining what you expect to be happening since just reading the test case requires tracing through the queuedChannel impl to understand why this is testing what it says it is

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good call, updated.

when(delegate.maybeExecute(endpoint, request)).thenReturn(maybeResponse);
when(delegate.maybeExecute(endpoint, queuedRequest)).thenThrow(new NullPointerException("expected"));
ListenableFuture<Response> completed = queuedChannel.execute(endpoint, request);
// Both results should be completed. The thrown exception should
// be converted into a failed future by NeverThrowLimitedChannel
assertThat(completed).isDone();
assertThat(queuedFuture).isDone();
assertThat(completed.get()).isEqualTo(mockResponse);
assertThatThrownBy(queuedFuture::get).hasRootCauseMessage("expected");
verify(delegate, times(1)).maybeExecute(endpoint, request);
verify(delegate, times(3)).maybeExecute(endpoint, queuedRequest);
}

@Test
@SuppressWarnings("FutureReturnValueIgnored")
public void testQueuedRequestExecutedWhenRunningRequestCompletes() {
Expand Down