Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #351 fix #302 fix #312 Replace QueuedChannel with a backoff based retryer #432

Merged
merged 17 commits into from
Feb 26, 2020
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelog/@unreleased/pr-432.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type: improvement
improvement:
description: Replace QueuedChannel with a backoff based retryer
links:
- https://github.com/palantir/dialogue/pull/432
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.dialogue.core;

@FunctionalInterface
interface LimitedChannelListener {
import java.time.Duration;

/** Defines a strategy for waiting in between successive retries of an operation that is subject to failure. */
interface BackoffStrategy {

/**
* Invoked when requests may succeed. There is no guarantee that requests will be accepted.
* This is only necessary if edge-triggering is not sufficient. For example if permits are based
* on the number of active requests, when existing requests complete this is triggered automatically.
* LimitedChannel implementations <i>should</i> invoke {@link #onChannelReady()} when another
* event (scheduled timeout) allows requests to proceed.
*/
void onChannelReady();
/** Returns the next suggested backoff duration. */
Duration backoffDuration(int failures);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,17 @@

import com.github.benmanes.caffeine.cache.Ticker;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Suppliers;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.palantir.dialogue.Endpoint;
import com.palantir.dialogue.Request;
import com.palantir.dialogue.Response;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.UnsafeArg;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -53,45 +46,21 @@ final class BlacklistingChannel implements LimitedChannel {

private static final Logger log = LoggerFactory.getLogger(BlacklistingChannel.class);
private static final int NUM_PROBATION_REQUESTS = 5;
/*
* Shared single thread executor is reused between all blacklisting channels. If it becomes oversaturated
* we may wait longer than expected before resuming requests to blacklisted channels, but this is an
* edge case where things are already operating in a degraded state.
*/
private static final Supplier<ScheduledExecutorService> sharedScheduler = Suppliers.memoize(
() -> Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
.setNameFormat("dialogue-BlacklistingChannel-scheduler-%d")
.setDaemon(false)
.build()));

private final LimitedChannel delegate;
private final Duration duration;
private final Ticker ticker;
private final LimitedChannelListener listener;
private final ScheduledExecutorService scheduler;
private final AtomicReference<BlacklistState> channelBlacklistState;

BlacklistingChannel(LimitedChannel delegate, Duration duration, LimitedChannelListener listener) {
this(delegate, duration, listener, Ticker.systemTicker(), sharedScheduler.get());
BlacklistingChannel(LimitedChannel delegate, Duration duration) {
this(delegate, duration, Ticker.systemTicker());
}

@VisibleForTesting
BlacklistingChannel(LimitedChannel delegate, Duration duration, LimitedChannelListener listener, Ticker ticker) {
this(delegate, duration, listener, ticker, sharedScheduler.get());
}

@VisibleForTesting
BlacklistingChannel(
LimitedChannel delegate,
Duration duration,
LimitedChannelListener listener,
Ticker ticker,
ScheduledExecutorService scheduler) {
BlacklistingChannel(LimitedChannel delegate, Duration duration, Ticker ticker) {
this.delegate = delegate;
this.duration = duration;
this.ticker = ticker;
this.listener = listener;
this.scheduler = scheduler;
this.channelBlacklistState = new AtomicReference<>();
}

Expand All @@ -112,15 +81,12 @@ public String toString() {

private final class BlacklistState {
private final AtomicReference<Probation> probation = new AtomicReference<>();
private final ScheduledFuture<?> scheduledFuture;
private final long blacklistUntilNanos;
private final int probationPermits;

BlacklistState(Duration duration, int probationPermits) {
this.blacklistUntilNanos = ticker.read() + duration.toNanos();
this.probationPermits = probationPermits;
this.scheduledFuture =
scheduler.schedule(this::maybeBeginProbation, duration.toNanos(), TimeUnit.NANOSECONDS);
}

Optional<ListenableFuture<Response>> maybeExecute(Endpoint endpoint, Request request) {
Expand Down Expand Up @@ -148,8 +114,6 @@ Optional<Probation> maybeBeginProbation() {
if (log.isDebugEnabled()) {
log.debug("Channel {} is entering probation", UnsafeArg.of("channel", delegate));
}
listener.onChannelReady();
scheduledFuture.cancel(false);
}
return Optional.ofNullable(probation.get());
}
Expand All @@ -161,7 +125,7 @@ void markSuccess() {
if (maybeProbation != null && maybeProbation.checkIfProbationIsComplete()) {
log.debug("Clearing probation state");
if (channelBlacklistState.compareAndSet(this, null)) {
listener.onChannelReady();
log.debug("Channel is no longer blacklisted");
} else {
log.debug("Blacklist state has already been updated");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
import java.util.Optional;

/** Adapter from {@link Channel} to {@link LimitedChannel} which always returns a {@link Optional#isPresent() value}. */
final class LimitedChannelAdapter implements LimitedChannel {
final class ChannelToLimitedChannelAdapter implements LimitedChannel {

private final Channel delegate;

LimitedChannelAdapter(Channel delegate) {
ChannelToLimitedChannelAdapter(Channel delegate) {
this.delegate = Preconditions.checkNotNull(delegate, "Channel");
}

Expand All @@ -39,6 +39,6 @@ public Optional<ListenableFuture<Response>> maybeExecute(Endpoint endpoint, Requ

@Override
public String toString() {
return "LimitedChannelAdapter{delegate=" + delegate + '}';
return "ChannelToLimitedChannelAdapter{delegate=" + delegate + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.Collection;
import java.util.List;
import java.util.function.Function;
import javax.annotation.Nullable;

public final class Channels {

Expand All @@ -39,26 +38,24 @@ public static Channel create(Collection<? extends Channel> channels, ClientConfi
Preconditions.checkArgument(config.userAgent().isPresent(), "config.userAgent() must be specified");

DialogueClientMetrics clientMetrics = DialogueClientMetrics.of(config.taggedMetricRegistry());
// n.b. This becomes cleaner once we support reloadable channels, the queue can be created first, and
// each limited channel can be created later and passed a method reference to the queued channel.
DeferredLimitedChannelListener queueListener = new DeferredLimitedChannelListener();
List<LimitedChannel> limitedChannels = channels.stream()
// Instrument inner-most channel with metrics so that we measure only the over-the-wire-time
.map(channel -> new InstrumentedChannel(channel, clientMetrics))
// TracedChannel must wrap TracedRequestChannel to ensure requests have tracing headers.
.map(TracedRequestChannel::new)
.map(channel -> new TracedChannel(channel, "Dialogue-http-request"))
.map(LimitedChannelAdapter::new)
.map(ChannelToLimitedChannelAdapter::new)
.map(concurrencyLimiter(config, clientMetrics))
.map(channel -> new FixedLimitedChannel(channel, MAX_REQUESTS_PER_CHANNEL, clientMetrics))
.collect(ImmutableList.toImmutableList());

LimitedChannel limited = nodeSelectionStrategy(config, limitedChannels);
QueuedChannel queuedChannel = new QueuedChannel(limited, DispatcherMetrics.of(config.taggedMetricRegistry()));
queueListener.delegate = queuedChannel::schedule;
Channel channel = queuedChannel;
Channel channel = new LimitedChannelToChannelAdapter(limited);
channel = new TracedChannel(channel, "Dialogue-request-attempt");
channel = new RetryingChannel(channel, config.maxNumRetries(), config.serverQoS());
if (config.maxNumRetries() > 0) {
channel =
new RetryingChannel(channel, config.maxNumRetries(), config.backoffSlotSize(), config.serverQoS());
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cute little optimization :)

channel = new UserAgentChannel(channel, config.userAgent().get());
channel = new DeprecationWarningChannel(channel, clientMetrics);
channel = new ContentDecodingChannel(channel);
Expand Down Expand Up @@ -97,15 +94,4 @@ private static Function<LimitedChannel, LimitedChannel> concurrencyLimiter(
throw new SafeIllegalStateException(
"Encountered unknown client QoS configuration", SafeArg.of("ClientQoS", clientQoS));
}

private static final class DeferredLimitedChannelListener implements LimitedChannelListener {
@Nullable
private LimitedChannelListener delegate;

@Override
public void onChannelReady() {
Preconditions.checkNotNull(delegate, "Delegate listener has not been initialized")
.onChannelReady();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* (c) Copyright 2020 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.dialogue.core;

import java.time.Duration;
import java.util.function.DoubleSupplier;

final class ExponentialBackoffStrategy implements BackoffStrategy {
carterkozak marked this conversation as resolved.
Show resolved Hide resolved

private final Duration backoffSlotSize;
private final DoubleSupplier random;

ExponentialBackoffStrategy(Duration backoffSlotSize, DoubleSupplier random) {
this.backoffSlotSize = backoffSlotSize;
this.random = random;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/random/jitter/ - it's fine for this to always return 1 in testing, and be a random implementation in prod?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's important to get jitter in our simulations as well, otherwise the we may cause waves of requests. In tests using a constant is fine, but ideally we don't measure wall clock time in tests and watch the request rate/etc.

}

@Override
public Duration backoffDuration(int failures) {
if (failures == 0) {
return Duration.ZERO;
}
int upperBound = (int) Math.pow(2, failures - 1);
return Duration.ofNanos(Math.round(backoffSlotSize.toNanos() * random.getAsDouble() * upperBound));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@
* Similar to {@link com.palantir.dialogue.Channel}, but may not actually execute the call (eg: when the channel is
* overloaded). Semantics match {@link com.palantir.dialogue.Channel} aside from returning an
* {@link Optional optional response}.
* Limited channels must limit exclusively based on the state of the {@link com.palantir.dialogue.Channel}, not
* the {@link Endpoint} or {@link Request} arguments, otherwise the caller (generally a {@link QueuedChannel})
* may prevent <i>all</i> requests from proceeding.
*/
interface LimitedChannel {
Optional<ListenableFuture<Response>> maybeExecute(Endpoint endpoint, Request request);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* (c) Copyright 2020 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.palantir.dialogue.core;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.palantir.dialogue.Channel;
import com.palantir.dialogue.Endpoint;
import com.palantir.dialogue.Request;
import com.palantir.dialogue.Response;
import com.palantir.logsafe.Preconditions;
import com.palantir.logsafe.exceptions.SafeRuntimeException;
import java.util.function.Supplier;

/** Adapter from {@link LimitedChannel} to {@link Channel} which produces a failed future when results are limited. */
final class LimitedChannelToChannelAdapter implements Channel {

// Avoid method reference allocations
@SuppressWarnings("UnnecessaryLambda")
private static final Supplier<ListenableFuture<Response>> limitedResultSupplier =
() -> Futures.immediateFailedFuture(new SafeRuntimeException("Failed to make a request"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we're going to need to give people more information in this exception message (or possibly a comment here) otherwise we're going to be fielding questions in #dev-foundry-infra from people asking "why dialogue is breaking them" and telling them they can't make requests ;)

When people hit this, we probably don't want them to do their own retrying right? Perhaps we should even put some instrumentation on this, so we know exactly how many times our client refused to send a request out the door?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, I like the idea of replacing the limited-channel result of Optional<future<response>> with a union of limited-reason or future<response>. Also like the idea of forcefully attempting a request through the limiter if all nodes have limited our result to avoid purely client-side badness.


private final LimitedChannel delegate;

LimitedChannelToChannelAdapter(LimitedChannel delegate) {
this.delegate = Preconditions.checkNotNull(delegate, "LimitedChannel");
}

@Override
public ListenableFuture<Response> execute(Endpoint endpoint, Request request) {
return delegate.maybeExecute(endpoint, request).orElseGet(limitedResultSupplier);
}

@Override
public String toString() {
return "LimitedChannelToChannelAdapter{delegate=" + delegate + '}';
}
}
Loading