-
Notifications
You must be signed in to change notification settings - Fork 16
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Differentiate between limited and blacklisted requests #422
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,7 +20,8 @@ | |
import com.google.common.annotations.VisibleForTesting; | ||
import com.google.common.base.Suppliers; | ||
import com.google.common.util.concurrent.FutureCallback; | ||
import com.google.common.util.concurrent.ListenableFuture; | ||
import com.google.common.util.concurrent.Futures; | ||
import com.google.common.util.concurrent.MoreExecutors; | ||
import com.google.common.util.concurrent.ThreadFactoryBuilder; | ||
import com.palantir.dialogue.Endpoint; | ||
import com.palantir.dialogue.Request; | ||
|
@@ -49,7 +50,7 @@ | |
* unblacklisted. Without this functionality, hundreds of requests could be sent to a still-broken | ||
* server before the first of them returns and tells us it's still broken. | ||
*/ | ||
final class BlacklistingChannel implements LimitedChannel { | ||
final class BlacklistingChannel implements CompositeLimitedChannel { | ||
|
||
private static final Logger log = LoggerFactory.getLogger(BlacklistingChannel.class); | ||
private static final int NUM_PROBATION_REQUESTS = 5; | ||
|
@@ -64,25 +65,26 @@ final class BlacklistingChannel implements LimitedChannel { | |
.setDaemon(false) | ||
.build())); | ||
|
||
private final LimitedChannel delegate; | ||
private final CompositeLimitedChannel delegate; | ||
private final Duration duration; | ||
private final Ticker ticker; | ||
private final LimitedChannelListener listener; | ||
private final ScheduledExecutorService scheduler; | ||
private final AtomicReference<BlacklistState> channelBlacklistState; | ||
|
||
BlacklistingChannel(LimitedChannel delegate, Duration duration, LimitedChannelListener listener) { | ||
BlacklistingChannel(CompositeLimitedChannel delegate, Duration duration, LimitedChannelListener listener) { | ||
this(delegate, duration, listener, Ticker.systemTicker(), sharedScheduler.get()); | ||
} | ||
|
||
@VisibleForTesting | ||
BlacklistingChannel(LimitedChannel delegate, Duration duration, LimitedChannelListener listener, Ticker ticker) { | ||
BlacklistingChannel( | ||
CompositeLimitedChannel delegate, Duration duration, LimitedChannelListener listener, Ticker ticker) { | ||
this(delegate, duration, listener, ticker, sharedScheduler.get()); | ||
} | ||
|
||
@VisibleForTesting | ||
BlacklistingChannel( | ||
LimitedChannel delegate, | ||
CompositeLimitedChannel delegate, | ||
Duration duration, | ||
LimitedChannelListener listener, | ||
Ticker ticker, | ||
|
@@ -96,29 +98,27 @@ final class BlacklistingChannel implements LimitedChannel { | |
} | ||
|
||
@Override | ||
public Optional<ListenableFuture<Response>> maybeExecute(Endpoint endpoint, Request request) { | ||
public LimitedResponse maybeExecute(Endpoint endpoint, Request request) { | ||
BlacklistState state = channelBlacklistState.get(); | ||
if (state != null) { | ||
BlacklistStage stage = state.maybeProgressAndGet(); | ||
if (stage instanceof BlacklistUntil) { | ||
return Optional.empty(); | ||
return LimitedResponses.blacklisted(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Rather than |
||
} | ||
|
||
if (stage instanceof Probation) { | ||
Probation probation = (Probation) stage; | ||
if (probation.acquireStartPermit()) { | ||
log.debug("Probation channel request allowed"); | ||
return delegate.maybeExecute(endpoint, request).map(future -> DialogueFutures.addDirectCallback( | ||
future, new BlacklistingCallback(Optional.of(probation)))); | ||
return wrap(delegate.maybeExecute(endpoint, request), Optional.of(probation)); | ||
} else { | ||
log.debug("Probation channel request not allowed"); | ||
return Optional.empty(); | ||
return LimitedResponses.blacklisted(); | ||
} | ||
} | ||
} | ||
|
||
return delegate.maybeExecute(endpoint, request) | ||
.map(future -> DialogueFutures.addDirectCallback(future, new BlacklistingCallback(Optional.empty()))); | ||
return wrap(delegate.maybeExecute(endpoint, request), Optional.empty()); | ||
} | ||
|
||
private void blacklist() { | ||
|
@@ -131,6 +131,13 @@ private void probationComplete() { | |
listener.onChannelReady(); | ||
} | ||
|
||
private LimitedResponse wrap(LimitedResponse limitedResponse, Optional<Probation> probation) { | ||
LimitedResponses.getResponse(limitedResponse).ifPresent(response -> { | ||
Futures.addCallback(response, new BlacklistingCallback(probation), MoreExecutors.directExecutor()); | ||
}); | ||
return limitedResponse; | ||
} | ||
|
||
private final class BlacklistingCallback implements FutureCallback<Response> { | ||
private final Optional<Probation> probationPermit; | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -42,14 +42,15 @@ public static Channel create( | |
// n.b. This becomes cleaner once we support reloadable channels, the queue can be created first, and | ||
// each limited channel can be created later and passed a method reference to the queued channel. | ||
DeferredLimitedChannelListener queueListener = new DeferredLimitedChannelListener(); | ||
List<LimitedChannel> limitedChannels = channels.stream() | ||
List<CompositeLimitedChannel> limitedChannels = channels.stream() | ||
// Instrument inner-most channel with metrics so that we measure only the over-the-wire-time | ||
.map(channel -> new InstrumentedChannel(channel, clientMetrics)) | ||
// TracedChannel must wrap TracedRequestChannel to ensure requests have tracing headers. | ||
.map(TracedRequestChannel::new) | ||
.map(channel -> new TracedChannel(channel, "Dialogue-http-request")) | ||
.map(LimitedChannelAdapter::new) | ||
.map(CompositeLimitedChannelAdapter::new) | ||
.map(concurrencyLimiter(config, clientMetrics)) | ||
.map(channel -> new BlacklistingChannel(channel, config.failedUrlCooldown(), queueListener)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. after any failure, this will put the channel into probation mode by default (failedUrlCooldown == 0 using the default configuration). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ya good catch |
||
.map(channel -> new FixedLimitedChannel(channel, MAX_REQUESTS_PER_CHANNEL, clientMetrics)) | ||
.collect(ImmutableList.toImmutableList()); | ||
|
||
|
@@ -68,9 +69,12 @@ public static Channel create( | |
return channel; | ||
} | ||
|
||
private static LimitedChannel nodeSelectionStrategy(ClientConfiguration config, List<LimitedChannel> channels) { | ||
private static LimitedChannel nodeSelectionStrategy( | ||
ClientConfiguration config, List<CompositeLimitedChannel> channels) { | ||
// no fancy node selection heuristic can save us if our one node goes down | ||
if (channels.size() == 1) { | ||
return channels.get(0); // no fancy node selection heuristic can save us if our one node goes down | ||
return (endpoint, request) -> | ||
LimitedResponses.getResponse(channels.get(0).maybeExecute(endpoint, request)); | ||
} | ||
|
||
switch (config.nodeSelectionStrategy()) { | ||
|
@@ -85,7 +89,7 @@ private static LimitedChannel nodeSelectionStrategy(ClientConfiguration config, | |
"Unknown NodeSelectionStrategy", SafeArg.of("unknown", config.nodeSelectionStrategy())); | ||
} | ||
|
||
private static Function<LimitedChannel, LimitedChannel> concurrencyLimiter( | ||
private static Function<CompositeLimitedChannel, CompositeLimitedChannel> concurrencyLimiter( | ||
ClientConfiguration config, DialogueClientMetrics metrics) { | ||
ClientConfiguration.ClientQoS clientQoS = config.clientQoS(); | ||
switch (clientQoS) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
/* | ||
* (c) Copyright 2020 Palantir Technologies Inc. All rights reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.palantir.dialogue.core; | ||
|
||
import com.palantir.dialogue.Endpoint; | ||
import com.palantir.dialogue.Request; | ||
|
||
public interface CompositeLimitedChannel { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. package private There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there any reason we should have both LimitedChannel and CompositeLimitedChannel? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It was to better separate concerns. Node selection strategies should know about blacklisting and limiting while queueing/retrying only needs to know about whether a request occurred or not |
||
LimitedResponse maybeExecute(Endpoint endpoint, Request request); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,7 +20,8 @@ | |
import com.github.benmanes.caffeine.cache.Ticker; | ||
import com.google.common.annotations.VisibleForTesting; | ||
import com.google.common.util.concurrent.FutureCallback; | ||
import com.google.common.util.concurrent.ListenableFuture; | ||
import com.google.common.util.concurrent.Futures; | ||
import com.google.common.util.concurrent.MoreExecutors; | ||
import com.netflix.concurrency.limits.Limiter; | ||
import com.netflix.concurrency.limits.limit.AIMDLimit; | ||
import com.netflix.concurrency.limits.limit.WindowedLimit; | ||
|
@@ -36,22 +37,22 @@ | |
* requests allowed to a particular channel. If the channel's concurrency limit has been reached, the | ||
* {@link #maybeExecute} method returns empty. | ||
*/ | ||
final class ConcurrencyLimitedChannel implements LimitedChannel { | ||
final class ConcurrencyLimitedChannel implements CompositeLimitedChannel { | ||
private static final Void NO_CONTEXT = null; | ||
private static final Ticker SYSTEM_NANOTIME = System::nanoTime; | ||
|
||
private final Meter limitedMeter; | ||
private final LimitedChannel delegate; | ||
private final CompositeLimitedChannel delegate; | ||
private final Limiter<Void> limiter; | ||
|
||
@VisibleForTesting | ||
ConcurrencyLimitedChannel(LimitedChannel delegate, Limiter<Void> limiter, DialogueClientMetrics metrics) { | ||
this.delegate = new NeverThrowLimitedChannel(delegate); | ||
ConcurrencyLimitedChannel(CompositeLimitedChannel delegate, Limiter<Void> limiter, DialogueClientMetrics metrics) { | ||
this.delegate = new NeverThrowCompositeLimitedChannel(delegate); | ||
this.limitedMeter = metrics.limited(getClass().getSimpleName()); | ||
this.limiter = limiter; | ||
} | ||
|
||
static ConcurrencyLimitedChannel create(LimitedChannel delegate, DialogueClientMetrics metrics) { | ||
static ConcurrencyLimitedChannel create(CompositeLimitedChannel delegate, DialogueClientMetrics metrics) { | ||
return new ConcurrencyLimitedChannel( | ||
delegate, ConcurrencyLimitedChannel.createLimiter(SYSTEM_NANOTIME), metrics); | ||
} | ||
|
@@ -74,20 +75,18 @@ static Limiter<Void> createLimiter(Ticker nanoTimeClock) { | |
} | ||
|
||
@Override | ||
public Optional<ListenableFuture<Response>> maybeExecute(Endpoint endpoint, Request request) { | ||
public LimitedResponse maybeExecute(Endpoint endpoint, Request request) { | ||
Optional<Limiter.Listener> maybeListener = limiter.acquire(NO_CONTEXT); | ||
if (maybeListener.isPresent()) { | ||
Limiter.Listener listener = maybeListener.get(); | ||
Optional<ListenableFuture<Response>> result = delegate.maybeExecute(endpoint, request); | ||
if (result.isPresent()) { | ||
DialogueFutures.addDirectCallback(result.get(), new LimiterCallback(listener)); | ||
} else { | ||
listener.onIgnore(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why are we no longer calling |
||
} | ||
return result; | ||
LimitedResponse limitedResponse = delegate.maybeExecute(endpoint, request); | ||
LimitedResponses.getResponse(limitedResponse).ifPresent(response -> { | ||
Futures.addCallback(response, new LimiterCallback(listener), MoreExecutors.directExecutor()); | ||
}); | ||
return limitedResponse; | ||
} else { | ||
limitedMeter.mark(); | ||
return Optional.empty(); | ||
return LimitedResponses.limited(); | ||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
/* | ||
* (c) Copyright 2020 Palantir Technologies Inc. All rights reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.palantir.dialogue.core; | ||
|
||
import com.google.common.util.concurrent.ListenableFuture; | ||
import com.palantir.dialogue.Response; | ||
import org.derive4j.Data; | ||
|
||
@Data | ||
public interface LimitedResponse { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This appears to only be used by RoundRobinChannel, is that correct? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, all selection strategies convert from LimitedReponses to |
||
interface Cases<T> { | ||
T blacklisted(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this only set by blacklisting channel? As a consumer how do I handle this differently from a limited response? The blacklisted channel will become un-blacklisted eventually, much like a limited channel will open up. I think docs would help me a lot. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll add some docs |
||
|
||
T limited(); | ||
|
||
T response(ListenableFuture<Response> response); | ||
} | ||
|
||
<T> T matches(Cases<T> cases); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we tweak the names so that your CompositeLimitedChannel just becomes LimitedChannel and then rename LimitedChannel -> NodeSelectionChannel (because that's our PinUntilError, RoundRobin implementations)?