Skip to content

Commit

Permalink
fix #351 fix #302 fix #312 Replace QueuedChannel with a backoff based…
Browse files Browse the repository at this point in the history
… retryer (#432)

Replace QueuedChannel with a backoff based retryer
  • Loading branch information
carterkozak authored Feb 26, 2020
1 parent 7b456ec commit 7d32799
Show file tree
Hide file tree
Showing 89 changed files with 374 additions and 943 deletions.
5 changes: 5 additions & 0 deletions changelog/@unreleased/pr-432.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type: improvement
improvement:
description: Replace QueuedChannel with a backoff based retryer
links:
- https://github.com/palantir/dialogue/pull/432
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,17 @@

import com.github.benmanes.caffeine.cache.Ticker;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Suppliers;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.palantir.dialogue.Endpoint;
import com.palantir.dialogue.Request;
import com.palantir.dialogue.Response;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.UnsafeArg;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -53,45 +46,21 @@ final class BlacklistingChannel implements LimitedChannel {

private static final Logger log = LoggerFactory.getLogger(BlacklistingChannel.class);
private static final int NUM_PROBATION_REQUESTS = 5;
/*
* Shared single thread executor is reused between all blacklisting channels. If it becomes oversaturated
* we may wait longer than expected before resuming requests to blacklisted channels, but this is an
* edge case where things are already operating in a degraded state.
*/
private static final Supplier<ScheduledExecutorService> sharedScheduler = Suppliers.memoize(
() -> Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
.setNameFormat("dialogue-BlacklistingChannel-scheduler-%d")
.setDaemon(false)
.build()));

private final LimitedChannel delegate;
private final Duration duration;
private final Ticker ticker;
private final LimitedChannelListener listener;
private final ScheduledExecutorService scheduler;
private final AtomicReference<BlacklistState> channelBlacklistState;

BlacklistingChannel(LimitedChannel delegate, Duration duration, LimitedChannelListener listener) {
this(delegate, duration, listener, Ticker.systemTicker(), sharedScheduler.get());
BlacklistingChannel(LimitedChannel delegate, Duration duration) {
this(delegate, duration, Ticker.systemTicker());
}

@VisibleForTesting
BlacklistingChannel(LimitedChannel delegate, Duration duration, LimitedChannelListener listener, Ticker ticker) {
this(delegate, duration, listener, ticker, sharedScheduler.get());
}

@VisibleForTesting
BlacklistingChannel(
LimitedChannel delegate,
Duration duration,
LimitedChannelListener listener,
Ticker ticker,
ScheduledExecutorService scheduler) {
BlacklistingChannel(LimitedChannel delegate, Duration duration, Ticker ticker) {
this.delegate = delegate;
this.duration = duration;
this.ticker = ticker;
this.listener = listener;
this.scheduler = scheduler;
this.channelBlacklistState = new AtomicReference<>();
}

Expand All @@ -112,15 +81,12 @@ public String toString() {

private final class BlacklistState {
private final AtomicReference<Probation> probation = new AtomicReference<>();
private final ScheduledFuture<?> scheduledFuture;
private final long blacklistUntilNanos;
private final int probationPermits;

BlacklistState(Duration duration, int probationPermits) {
this.blacklistUntilNanos = ticker.read() + duration.toNanos();
this.probationPermits = probationPermits;
this.scheduledFuture =
scheduler.schedule(this::maybeBeginProbation, duration.toNanos(), TimeUnit.NANOSECONDS);
}

Optional<ListenableFuture<Response>> maybeExecute(Endpoint endpoint, Request request) {
Expand Down Expand Up @@ -148,8 +114,6 @@ Optional<Probation> maybeBeginProbation() {
if (log.isDebugEnabled()) {
log.debug("Channel {} is entering probation", UnsafeArg.of("channel", delegate));
}
listener.onChannelReady();
scheduledFuture.cancel(false);
}
return Optional.ofNullable(probation.get());
}
Expand All @@ -161,7 +125,7 @@ void markSuccess() {
if (maybeProbation != null && maybeProbation.checkIfProbationIsComplete()) {
log.debug("Clearing probation state");
if (channelBlacklistState.compareAndSet(this, null)) {
listener.onChannelReady();
log.debug("Channel is no longer blacklisted");
} else {
log.debug("Blacklist state has already been updated");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
import java.util.Optional;

/** Adapter from {@link Channel} to {@link LimitedChannel} which always returns a {@link Optional#isPresent() value}. */
final class LimitedChannelAdapter implements LimitedChannel {
final class ChannelToLimitedChannelAdapter implements LimitedChannel {

private final Channel delegate;

LimitedChannelAdapter(Channel delegate) {
ChannelToLimitedChannelAdapter(Channel delegate) {
this.delegate = Preconditions.checkNotNull(delegate, "Channel");
}

Expand All @@ -39,6 +39,6 @@ public Optional<ListenableFuture<Response>> maybeExecute(Endpoint endpoint, Requ

@Override
public String toString() {
return "LimitedChannelAdapter{delegate=" + delegate + '}';
return "ChannelToLimitedChannelAdapter{delegate=" + delegate + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.Collection;
import java.util.List;
import java.util.function.Function;
import javax.annotation.Nullable;

public final class Channels {

Expand All @@ -39,26 +38,24 @@ public static Channel create(Collection<? extends Channel> channels, ClientConfi
Preconditions.checkArgument(config.userAgent().isPresent(), "config.userAgent() must be specified");

DialogueClientMetrics clientMetrics = DialogueClientMetrics.of(config.taggedMetricRegistry());
// n.b. This becomes cleaner once we support reloadable channels, the queue can be created first, and
// each limited channel can be created later and passed a method reference to the queued channel.
DeferredLimitedChannelListener queueListener = new DeferredLimitedChannelListener();
List<LimitedChannel> limitedChannels = channels.stream()
// Instrument inner-most channel with metrics so that we measure only the over-the-wire-time
.map(channel -> new InstrumentedChannel(channel, clientMetrics))
// TracedChannel must wrap TracedRequestChannel to ensure requests have tracing headers.
.map(TracedRequestChannel::new)
.map(channel -> new TracedChannel(channel, "Dialogue-http-request"))
.map(LimitedChannelAdapter::new)
.map(ChannelToLimitedChannelAdapter::new)
.map(concurrencyLimiter(config, clientMetrics))
.map(channel -> new FixedLimitedChannel(channel, MAX_REQUESTS_PER_CHANNEL, clientMetrics))
.collect(ImmutableList.toImmutableList());

LimitedChannel limited = nodeSelectionStrategy(config, limitedChannels);
QueuedChannel queuedChannel = new QueuedChannel(limited, DispatcherMetrics.of(config.taggedMetricRegistry()));
queueListener.delegate = queuedChannel::schedule;
Channel channel = queuedChannel;
Channel channel = new LimitedChannelToChannelAdapter(limited);
channel = new TracedChannel(channel, "Dialogue-request-attempt");
channel = new RetryingChannel(channel, config.maxNumRetries(), config.serverQoS());
if (config.maxNumRetries() > 0) {
channel =
new RetryingChannel(channel, config.maxNumRetries(), config.backoffSlotSize(), config.serverQoS());
}
channel = new UserAgentChannel(channel, config.userAgent().get());
channel = new DeprecationWarningChannel(channel, clientMetrics);
channel = new ContentDecodingChannel(channel);
Expand Down Expand Up @@ -97,15 +94,4 @@ private static Function<LimitedChannel, LimitedChannel> concurrencyLimiter(
throw new SafeIllegalStateException(
"Encountered unknown client QoS configuration", SafeArg.of("ClientQoS", clientQoS));
}

private static final class DeferredLimitedChannelListener implements LimitedChannelListener {
@Nullable
private LimitedChannelListener delegate;

@Override
public void onChannelReady() {
Preconditions.checkNotNull(delegate, "Delegate listener has not been initialized")
.onChannelReady();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@
* Similar to {@link com.palantir.dialogue.Channel}, but may not actually execute the call (eg: when the channel is
* overloaded). Semantics match {@link com.palantir.dialogue.Channel} aside from returning an
* {@link Optional optional response}.
* Limited channels must limit exclusively based on the state of the {@link com.palantir.dialogue.Channel}, not
* the {@link Endpoint} or {@link Request} arguments, otherwise the caller (generally a {@link QueuedChannel})
* may prevent <i>all</i> requests from proceeding.
*/
interface LimitedChannel {
Optional<ListenableFuture<Response>> maybeExecute(Endpoint endpoint, Request request);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* (c) Copyright 2020 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.palantir.dialogue.core;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.palantir.dialogue.Channel;
import com.palantir.dialogue.Endpoint;
import com.palantir.dialogue.Request;
import com.palantir.dialogue.Response;
import com.palantir.logsafe.Preconditions;
import com.palantir.logsafe.exceptions.SafeRuntimeException;
import java.util.function.Supplier;

/** Adapter from {@link LimitedChannel} to {@link Channel} which produces a failed future when results are limited. */
final class LimitedChannelToChannelAdapter implements Channel {

// Avoid method reference allocations
@SuppressWarnings("UnnecessaryLambda")
private static final Supplier<ListenableFuture<Response>> limitedResultSupplier =
() -> Futures.immediateFailedFuture(new SafeRuntimeException("Failed to make a request"));

private final LimitedChannel delegate;

LimitedChannelToChannelAdapter(LimitedChannel delegate) {
this.delegate = Preconditions.checkNotNull(delegate, "LimitedChannel");
}

@Override
public ListenableFuture<Response> execute(Endpoint endpoint, Request request) {
return delegate.maybeExecute(endpoint, request).orElseGet(limitedResultSupplier);
}

@Override
public String toString() {
return "LimitedChannelToChannelAdapter{delegate=" + delegate + '}';
}
}
Loading

0 comments on commit 7d32799

Please sign in to comment.