Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Differentiate between limited and blacklisted requests #422

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions changelog/@unreleased/pr-422.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
type: improvement
improvement:
description: Enable channel blacklisting by default and do not count requests to
blacklisted hosts against retry limit
links:
- https://github.com/palantir/dialogue/pull/422
5 changes: 4 additions & 1 deletion dialogue-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ dependencies {
testRuntimeOnly 'org.slf4j:slf4j-simple'

annotationProcessor 'org.immutables:value'
annotationProcessor 'org.derive4j:derive4j'
compileOnly 'org.derive4j:derive4j-annotation'
compileOnly 'org.immutables:value::annotations'

testAnnotationProcessor 'org.immutables:value'
compile 'org.immutables:value::annotations'
}

configurations.testCompileClasspath.exclude module: 'junit' // prefer junit5
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Suppliers;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.palantir.dialogue.Endpoint;
import com.palantir.dialogue.Request;
Expand Down Expand Up @@ -49,7 +50,7 @@
* unblacklisted. Without this functionality, hundreds of requests could be sent to a still-broken
* server before the first of them returns and tells us it's still broken.
*/
final class BlacklistingChannel implements LimitedChannel {
final class BlacklistingChannel implements CompositeLimitedChannel {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we tweak the names so that your CompositeLimitedChannel just becomes LimitedChannel and then rename LimitedChannel -> NodeSelectionChannel (because that's our PinUntilError, RoundRobin implementations)?


private static final Logger log = LoggerFactory.getLogger(BlacklistingChannel.class);
private static final int NUM_PROBATION_REQUESTS = 5;
Expand All @@ -64,25 +65,26 @@ final class BlacklistingChannel implements LimitedChannel {
.setDaemon(false)
.build()));

private final LimitedChannel delegate;
private final CompositeLimitedChannel delegate;
private final Duration duration;
private final Ticker ticker;
private final LimitedChannelListener listener;
private final ScheduledExecutorService scheduler;
private final AtomicReference<BlacklistState> channelBlacklistState;

BlacklistingChannel(LimitedChannel delegate, Duration duration, LimitedChannelListener listener) {
BlacklistingChannel(CompositeLimitedChannel delegate, Duration duration, LimitedChannelListener listener) {
this(delegate, duration, listener, Ticker.systemTicker(), sharedScheduler.get());
}

@VisibleForTesting
BlacklistingChannel(LimitedChannel delegate, Duration duration, LimitedChannelListener listener, Ticker ticker) {
BlacklistingChannel(
CompositeLimitedChannel delegate, Duration duration, LimitedChannelListener listener, Ticker ticker) {
this(delegate, duration, listener, ticker, sharedScheduler.get());
}

@VisibleForTesting
BlacklistingChannel(
LimitedChannel delegate,
CompositeLimitedChannel delegate,
Duration duration,
LimitedChannelListener listener,
Ticker ticker,
Expand All @@ -96,29 +98,27 @@ final class BlacklistingChannel implements LimitedChannel {
}

@Override
public Optional<ListenableFuture<Response>> maybeExecute(Endpoint endpoint, Request request) {
public LimitedResponse maybeExecute(Endpoint endpoint, Request request) {
BlacklistState state = channelBlacklistState.get();
if (state != null) {
BlacklistStage stage = state.maybeProgressAndGet();
if (stage instanceof BlacklistUntil) {
return Optional.empty();
return LimitedResponses.blacklisted();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than LimitedResponses.blacklisted, what do you think of LimitedResponses.backOff ?

}

if (stage instanceof Probation) {
Probation probation = (Probation) stage;
if (probation.acquireStartPermit()) {
log.debug("Probation channel request allowed");
return delegate.maybeExecute(endpoint, request).map(future -> DialogueFutures.addDirectCallback(
future, new BlacklistingCallback(Optional.of(probation))));
return wrap(delegate.maybeExecute(endpoint, request), Optional.of(probation));
} else {
log.debug("Probation channel request not allowed");
return Optional.empty();
return LimitedResponses.blacklisted();
}
}
}

return delegate.maybeExecute(endpoint, request)
.map(future -> DialogueFutures.addDirectCallback(future, new BlacklistingCallback(Optional.empty())));
return wrap(delegate.maybeExecute(endpoint, request), Optional.empty());
}

private void blacklist() {
Expand All @@ -131,6 +131,13 @@ private void probationComplete() {
listener.onChannelReady();
}

private LimitedResponse wrap(LimitedResponse limitedResponse, Optional<Probation> probation) {
LimitedResponses.getResponse(limitedResponse).ifPresent(response -> {
Futures.addCallback(response, new BlacklistingCallback(probation), MoreExecutors.directExecutor());
});
return limitedResponse;
}

private final class BlacklistingCallback implements FutureCallback<Response> {
private final Optional<Probation> probationPermit;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,15 @@ public static Channel create(
// n.b. This becomes cleaner once we support reloadable channels, the queue can be created first, and
// each limited channel can be created later and passed a method reference to the queued channel.
DeferredLimitedChannelListener queueListener = new DeferredLimitedChannelListener();
List<LimitedChannel> limitedChannels = channels.stream()
List<CompositeLimitedChannel> limitedChannels = channels.stream()
// Instrument inner-most channel with metrics so that we measure only the over-the-wire-time
.map(channel -> new InstrumentedChannel(channel, clientMetrics))
// TracedChannel must wrap TracedRequestChannel to ensure requests have tracing headers.
.map(TracedRequestChannel::new)
.map(channel -> new TracedChannel(channel, "Dialogue-http-request"))
.map(LimitedChannelAdapter::new)
.map(CompositeLimitedChannelAdapter::new)
.map(concurrencyLimiter(config, clientMetrics))
.map(channel -> new BlacklistingChannel(channel, config.failedUrlCooldown(), queueListener))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

after any failure, this will put the channel into probation mode by default (failedUrlCooldown == 0 using the default configuration).
This could be problematic for timelock, perhaps we should check if clientQos is disabled for this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ya good catch

.map(channel -> new FixedLimitedChannel(channel, MAX_REQUESTS_PER_CHANNEL, clientMetrics))
.collect(ImmutableList.toImmutableList());

Expand All @@ -68,9 +69,12 @@ public static Channel create(
return channel;
}

private static LimitedChannel nodeSelectionStrategy(ClientConfiguration config, List<LimitedChannel> channels) {
private static LimitedChannel nodeSelectionStrategy(
ClientConfiguration config, List<CompositeLimitedChannel> channels) {
// no fancy node selection heuristic can save us if our one node goes down
if (channels.size() == 1) {
return channels.get(0); // no fancy node selection heuristic can save us if our one node goes down
return (endpoint, request) ->
LimitedResponses.getResponse(channels.get(0).maybeExecute(endpoint, request));
}

switch (config.nodeSelectionStrategy()) {
Expand All @@ -85,7 +89,7 @@ private static LimitedChannel nodeSelectionStrategy(ClientConfiguration config,
"Unknown NodeSelectionStrategy", SafeArg.of("unknown", config.nodeSelectionStrategy()));
}

private static Function<LimitedChannel, LimitedChannel> concurrencyLimiter(
private static Function<CompositeLimitedChannel, CompositeLimitedChannel> concurrencyLimiter(
ClientConfiguration config, DialogueClientMetrics metrics) {
ClientConfiguration.ClientQoS clientQoS = config.clientQoS();
switch (clientQoS) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* (c) Copyright 2020 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.dialogue.core;

import com.palantir.dialogue.Endpoint;
import com.palantir.dialogue.Request;

public interface CompositeLimitedChannel {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

package private

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any reason we should have both LimitedChannel and CompositeLimitedChannel?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was to better separate concerns. Node selection strategies should know about blacklisting and limiting while queueing/retrying only needs to know about whether a request occurred or not

LimitedResponse maybeExecute(Endpoint endpoint, Request request);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,23 @@
*/
package com.palantir.dialogue.core;

import com.google.common.util.concurrent.ListenableFuture;
import com.palantir.dialogue.Channel;
import com.palantir.dialogue.Endpoint;
import com.palantir.dialogue.Request;
import com.palantir.dialogue.Response;
import com.palantir.logsafe.Preconditions;
import java.util.Optional;

/** Adapter from {@link Channel} to {@link LimitedChannel} which always returns a {@link Optional#isPresent() value}. */
final class LimitedChannelAdapter implements LimitedChannel {
/** Adapter from {@link Channel} to {@link CompositeLimitedChannel} which always returns a response value}. */
final class CompositeLimitedChannelAdapter implements CompositeLimitedChannel {

private final Channel delegate;

LimitedChannelAdapter(Channel delegate) {
CompositeLimitedChannelAdapter(Channel delegate) {
this.delegate = Preconditions.checkNotNull(delegate, "Channel");
}

@Override
public Optional<ListenableFuture<Response>> maybeExecute(Endpoint endpoint, Request request) {
return Optional.of(delegate.execute(endpoint, request));
public LimitedResponse maybeExecute(Endpoint endpoint, Request request) {
return LimitedResponses.response(delegate.execute(endpoint, request));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
import com.github.benmanes.caffeine.cache.Ticker;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import com.netflix.concurrency.limits.Limiter;
import com.netflix.concurrency.limits.limit.AIMDLimit;
import com.netflix.concurrency.limits.limit.WindowedLimit;
Expand All @@ -36,22 +37,22 @@
* requests allowed to a particular channel. If the channel's concurrency limit has been reached, the
* {@link #maybeExecute} method returns empty.
*/
final class ConcurrencyLimitedChannel implements LimitedChannel {
final class ConcurrencyLimitedChannel implements CompositeLimitedChannel {
private static final Void NO_CONTEXT = null;
private static final Ticker SYSTEM_NANOTIME = System::nanoTime;

private final Meter limitedMeter;
private final LimitedChannel delegate;
private final CompositeLimitedChannel delegate;
private final Limiter<Void> limiter;

@VisibleForTesting
ConcurrencyLimitedChannel(LimitedChannel delegate, Limiter<Void> limiter, DialogueClientMetrics metrics) {
this.delegate = new NeverThrowLimitedChannel(delegate);
ConcurrencyLimitedChannel(CompositeLimitedChannel delegate, Limiter<Void> limiter, DialogueClientMetrics metrics) {
this.delegate = new NeverThrowCompositeLimitedChannel(delegate);
this.limitedMeter = metrics.limited(getClass().getSimpleName());
this.limiter = limiter;
}

static ConcurrencyLimitedChannel create(LimitedChannel delegate, DialogueClientMetrics metrics) {
static ConcurrencyLimitedChannel create(CompositeLimitedChannel delegate, DialogueClientMetrics metrics) {
return new ConcurrencyLimitedChannel(
delegate, ConcurrencyLimitedChannel.createLimiter(SYSTEM_NANOTIME), metrics);
}
Expand All @@ -74,20 +75,18 @@ static Limiter<Void> createLimiter(Ticker nanoTimeClock) {
}

@Override
public Optional<ListenableFuture<Response>> maybeExecute(Endpoint endpoint, Request request) {
public LimitedResponse maybeExecute(Endpoint endpoint, Request request) {
Optional<Limiter.Listener> maybeListener = limiter.acquire(NO_CONTEXT);
if (maybeListener.isPresent()) {
Limiter.Listener listener = maybeListener.get();
Optional<ListenableFuture<Response>> result = delegate.maybeExecute(endpoint, request);
if (result.isPresent()) {
DialogueFutures.addDirectCallback(result.get(), new LimiterCallback(listener));
} else {
listener.onIgnore();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we no longer calling listener.onIgnore in the other two cases? I think using a visitor on the limitedResponse would be more reassuring to me here

}
return result;
LimitedResponse limitedResponse = delegate.maybeExecute(endpoint, request);
LimitedResponses.getResponse(limitedResponse).ifPresent(response -> {
Futures.addCallback(response, new LimiterCallback(listener), MoreExecutors.directExecutor());
});
return limitedResponse;
} else {
limitedMeter.mark();
return Optional.empty();
return LimitedResponses.limited();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,16 @@
/**
* Channel with a fixed number of available permits. This can be used to enforce a per-route request limit.
*/
final class FixedLimitedChannel implements LimitedChannel {
final class FixedLimitedChannel implements CompositeLimitedChannel {
private static final Logger log = LoggerFactory.getLogger(FixedLimitedChannel.class);

private final LimitedChannel delegate;
private final CompositeLimitedChannel delegate;
private final AtomicInteger usedPermits = new AtomicInteger(0);
private final Meter limitedMeter;
private final int totalPermits;
private final Runnable returnPermit;

FixedLimitedChannel(LimitedChannel delegate, int totalPermits, DialogueClientMetrics metrics) {
FixedLimitedChannel(CompositeLimitedChannel delegate, int totalPermits, DialogueClientMetrics metrics) {
this.delegate = delegate;
this.totalPermits = totalPermits;
this.limitedMeter = metrics.limited(getClass().getSimpleName());
Expand All @@ -50,22 +50,23 @@ final class FixedLimitedChannel implements LimitedChannel {
}

@Override
public Optional<ListenableFuture<Response>> maybeExecute(Endpoint endpoint, Request request) {
public LimitedResponse maybeExecute(Endpoint endpoint, Request request) {
boolean failedToOptimisticallyAcquirePermit = usedPermits.incrementAndGet() > totalPermits;
if (failedToOptimisticallyAcquirePermit) {
returnPermit.run();
limitedMeter.mark();
logExhaustion(endpoint);
return Optional.empty();
return LimitedResponses.limited();
}
boolean resetOptimisticallyConsumedPermit = true;
try {
Optional<ListenableFuture<Response>> result = delegate.maybeExecute(endpoint, request);
LimitedResponse response = delegate.maybeExecute(endpoint, request);
Optional<ListenableFuture<Response>> result = LimitedResponses.getResponse(response);
if (result.isPresent()) {
result.get().addListener(returnPermit, MoreExecutors.directExecutor());
resetOptimisticallyConsumedPermit = false;
}
return result;
return response;
} finally {
if (resetOptimisticallyConsumedPermit) {
returnPermit.run();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* (c) Copyright 2020 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.dialogue.core;

import com.google.common.util.concurrent.ListenableFuture;
import com.palantir.dialogue.Response;
import org.derive4j.Data;

@Data
public interface LimitedResponse {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This appears to only be used by RoundRobinChannel, is that correct?

Copy link
Contributor Author

@ferozco ferozco Feb 24, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, all selection strategies convert from LimitedReponses to Optional<Response>. The idea is that selection strategies need more information to properly select which channel to make a request to

interface Cases<T> {
T blacklisted();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this only set by blacklisting channel? As a consumer how do I handle this differently from a limited response? The blacklisted channel will become un-blacklisted eventually, much like a limited channel will open up.

I think docs would help me a lot.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll add some docs


T limited();

T response(ListenableFuture<Response> response);
}

<T> T matches(Cases<T> cases);
}
Loading