Retain endpoint concurrency limits on node refresh #2418

Merged: 5 commits, Nov 18, 2024
5 changes: 5 additions & 0 deletions changelog/@unreleased/pr-2418.v2.yml
@@ -0,0 +1,5 @@
+type: improvement
+improvement:
+  description: Retain endpoint concurrency limits on node refresh
+  links:
+  - https://github.com/palantir/dialogue/pull/2418
@@ -44,7 +44,13 @@ final class ConcurrencyLimitedChannel implements LimitedChannel {
     static final ChannelState.Key<CautiousIncreaseAggressiveDecreaseConcurrencyLimiter> HOST_SPECIFIC_STATE_KEY =
             new ChannelState.Key<>(
                     CautiousIncreaseAggressiveDecreaseConcurrencyLimiter.class,
-                    ConcurrencyLimitedChannel::createHostSpecificState);
+                    () -> new CautiousIncreaseAggressiveDecreaseConcurrencyLimiter(Behavior.HOST_LEVEL));
+
+    @VisibleForTesting
+    static final ChannelState.Key<CautiousIncreaseAggressiveDecreaseConcurrencyLimiter> ENDPOINT_SPECIFIC_STATE_KEY =
+            new ChannelState.Key<>(
+                    CautiousIncreaseAggressiveDecreaseConcurrencyLimiter.class,
+                    () -> new CautiousIncreaseAggressiveDecreaseConcurrencyLimiter(Behavior.ENDPOINT_LEVEL));
 
     private final NeverThrowChannel delegate;
     private final CautiousIncreaseAggressiveDecreaseConcurrencyLimiter limiter;
@@ -63,10 +69,11 @@ static LimitedChannel createForHost(Config cf, Channel channel, int uriIndex, Ch
      * Creates a concurrency limited channel for per-endpoint limiting.
      * Metrics are not reported by this component per-endpoint, only by the per-endpoint queue.
      */
-    static LimitedChannel createForEndpoint(Channel channel, String channelName, int uriIndex, Endpoint endpoint) {
+    static LimitedChannel createForEndpoint(
+            Channel channel, String channelName, int uriIndex, Endpoint endpoint, ChannelState endpointChannelState) {
         return new ConcurrencyLimitedChannel(
                 channel,
-                new CautiousIncreaseAggressiveDecreaseConcurrencyLimiter(Behavior.ENDPOINT_LEVEL),
+                endpointChannelState.getState(ENDPOINT_SPECIFIC_STATE_KEY),
                 new EndpointConcurrencyLimitedChannelInstrumentation(channelName, uriIndex, endpoint));
     }
 
@@ -79,10 +86,6 @@ static LimitedChannel createForEndpoint(Channel channel, String channelName, int
         this.channelNameForLogging = instrumentation.channelNameForLogging();
     }
 
-    static CautiousIncreaseAggressiveDecreaseConcurrencyLimiter createHostSpecificState() {
-        return new CautiousIncreaseAggressiveDecreaseConcurrencyLimiter(Behavior.HOST_LEVEL);
-    }
-
     @Override
     public Optional<ListenableFuture<Response>> maybeExecute(
             Endpoint endpoint, Request request, LimitEnforcement limitEnforcement) {
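The endpoint limiter is now obtained through a ChannelState.Key lookup, the same mechanism already used for the host-level limiter, so a channel that is rebuilt during a node refresh gets back the limiter instance that already exists in the retained state. The sketch below illustrates that lookup pattern; StateHolder and its Key are illustrative stand-ins, assuming ChannelState behaves like a typed, lazily populated key/value store (consistent with the ChannelState.Key<>(Class, Supplier) and getState(...) calls in the diff above, but not dialogue's actual implementation):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Illustrative stand-in for ChannelState: each typed key maps to a lazily created value,
// and repeated lookups always return the same instance.
final class StateHolder {
    // Hypothetical key type pairing a value class with a default supplier, mirroring the
    // ChannelState.Key<>(Class, Supplier) constructions in the diff above. The class
    // component is kept for parity but unused in this sketch.
    record Key<T>(Class<T> type, Supplier<T> defaultSupplier) {}

    private final Map<Key<?>, Object> state = new ConcurrentHashMap<>();

    @SuppressWarnings("unchecked")
    <T> T getState(Key<T> key) {
        // computeIfAbsent creates the value once and then keeps handing back the same
        // object, which is what lets a rebuilt channel pick up the existing limiter.
        return (T) state.computeIfAbsent(key, k -> key.defaultSupplier().get());
    }

    public static void main(String[] args) {
        Key<StringBuilder> key = new Key<>(StringBuilder.class, StringBuilder::new);
        StateHolder holder = new StateHolder();
        // Two lookups with the same key return the same instance, so state accumulated
        // before a "refresh" is still visible afterwards.
        holder.getState(key).append("pre-refresh");
        System.out.println(holder.getState(key)); // prints "pre-refresh"
    }
}
```

As long as the state holder itself outlives the refresh, calling createForEndpoint again with the same ChannelState yields a limiter whose limit and in-flight count carry over, which is what the new endpoint test further down asserts.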
@@ -17,6 +17,8 @@
 package com.palantir.dialogue.core;
 
 import com.codahale.metrics.Meter;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
 import com.github.benmanes.caffeine.cache.Ticker;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
@@ -247,12 +249,20 @@ private static ImmutableList<LimitedChannel> createHostChannels(
             LimitedChannel limitedChannel;
             if (cf.isConcurrencyLimitingEnabled()) {
                 Channel unlimited = channel;
+                EndpointChannelState endpointChannelState = channelState.getState(EndpointChannelState.KEY);
                 channel = new ChannelToEndpointChannel(endpoint -> {
                     if (endpoint.tags().contains("dialogue-disable-endpoint-concurrency-limiting")) {
                         return unlimited;
                     }
                     LimitedChannel limited = ConcurrencyLimitedChannel.createForEndpoint(
-                            unlimited, cf.channelName(), uriIndexForInstrumentation, endpoint);
+                            unlimited,
+                            cf.channelName(),
+                            uriIndexForInstrumentation,
+                            endpoint,
+                            endpointChannelState.get(endpoint));

[Review comment from the PR author] I think this will fail in an edge case where the endpoint limiter is completely full from a previous refresh, so the queued channel created on the next line cannot sense the boundary condition to begin allowing requests through. (A toy sketch of this edge case follows the test diff at the end of this page.)

+                    // Note that because the queue is recreated when nodes are refreshed, it's critical that
+                    // the queue can force at least one request through at a time using the behavior introduced
+                    // by https://github.com/palantir/dialogue/pull/2422
                     return QueuedChannel.create(cf, endpoint, limited);
                 });
                 limitedChannel = ConcurrencyLimitedChannel.createForHost(
@@ -266,6 +276,33 @@
         return perUriChannels.build();
     }
 
+    /**
+     * {@link ChannelState} provider for per-endpoint channels like the endpoint concurrency limiter.
+     * This object is held in the per-host state, and can be used to look up a {@link ChannelState}
+     * scoped to an individual {@link Endpoint}.
+     * {@link Endpoint} state is held in a weak-keyed cache, equivalent to the one used in
+     * {@link ChannelToEndpointChannel}.
+     * {@link Endpoint} objects are usually enums, which will never be garbage collected; however, it's possible
+     * that callers may build endpoint instances on a per-call basis, so the weak-keyed map is defensive
+     * against short-lived endpoints.
+     * We don't use the same map because the {@link ChannelToEndpointChannel} retains full channel state which
+     * may or may not be designed to be reused across reloads, and we aim to be more precise with state that is
+     * kept across uri changes.
+     */
+    private record EndpointChannelState(LoadingCache<Endpoint, ChannelState> cache) {
+        private static final ChannelState.Key<EndpointChannelState> KEY =
+                new ChannelState.Key<>(EndpointChannelState.class, EndpointChannelState::create);
+
+        ChannelState get(Endpoint endpoint) {
+            return cache.get(endpoint);
+        }
+
+        private static EndpointChannelState create() {
+            return new EndpointChannelState(
+                    Caffeine.newBuilder().weakKeys().maximumSize(10_000).build(_key -> new ChannelState()));
+        }
+    }
+
     private static EndpointChannelFactory createEndpointChannelFactory(Channel multiHostQueuedChannel, Config cf) {
         Channel queuedChannel = new QueueOverrideChannel(multiHostQueuedChannel);
         return endpoint -> {
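The EndpointChannelState javadoc above explains why a weak-keyed cache is used. The snippet below is a standalone illustration of the same Caffeine pattern, with String keys standing in for Endpoint and plain Object values standing in for ChannelState (demo names only, not part of dialogue):

```java
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;

public final class WeakKeyStateCacheDemo {
    public static void main(String[] args) {
        LoadingCache<String, Object> perKeyState = Caffeine.newBuilder()
                // Weak keys allow state for short-lived, per-call key objects to be collected;
                // note that weakKeys() also switches key comparison to reference equality.
                .weakKeys()
                .maximumSize(10_000)
                // The loader lazily creates per-key state on first lookup.
                .build(key -> new Object());

        String endpoint = "getThing";
        // Repeated lookups with the same key object return the same state instance.
        System.out.println(perKeyState.get(endpoint) == perKeyState.get(endpoint)); // true
    }
}
```

Because weak keys imply reference equality, enum-based Endpoint instances act as stable cache keys, while endpoints built per call simply fall out of the cache once nothing references them, which is the "defensive against short-lived endpoints" behavior described in the javadoc.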
@@ -118,6 +118,30 @@ public void testReuseCachedLimiterState_host() {
         assertThat(limiter.getInflight()).isEqualTo(2);
     }
 
+    @Test
+    public void testReuseCachedLimiterState_endpoint() {
+        String channelName = "channel";
+        ChannelState state = new ChannelState();
+
+        // create two channels for the same endpoint, which should re-use the same AIMD state
+        LimitedChannel forEndpoint =
+                ConcurrencyLimitedChannel.createForEndpoint(delegate, channelName, 0, endpoint, state);
+        CautiousIncreaseAggressiveDecreaseConcurrencyLimiter limiter =
+                state.getState(ConcurrencyLimitedChannel.ENDPOINT_SPECIFIC_STATE_KEY);
+
+        assertThat(limiter.getInflight()).isEqualTo(0);
+
+        forEndpoint.maybeExecute(endpoint, request, LimitEnforcement.DEFAULT_ENABLED);
+        assertThat(limiter.getInflight()).isEqualTo(1);
+
+        // different uriIndex has no impact on whether state is shared, as indexes will shuffle when nodes go down
+        LimitedChannel forEndpoint2 =
+                ConcurrencyLimitedChannel.createForEndpoint(delegate, channelName, 1, endpoint, state);
+        forEndpoint2.maybeExecute(endpoint, request, LimitEnforcement.DEFAULT_ENABLED);
+
+        assertThat(limiter.getInflight()).isEqualTo(2);
+    }
+
     @Test
     public void testLimiterAvailable_successfulRequest_host() {
         mockHostLimitAvailable();
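The review comment earlier in the diff flags an edge case: the endpoint limiter now survives a node refresh, but the queue in front of it is recreated, so a fresh queue can start out behind a limiter that is already at capacity. The toy simulation below illustrates that reasoning; the classes and their semantics are simplified assumptions for this sketch, not dialogue's QueuedChannel or AIMD limiter. In this model the queue can only drain by acquiring a permit, and the requests that were in flight before the refresh never notify the new queue when they finish, so the queue stalls unless it is allowed to force at least one request through (the behavior referenced via https://github.com/palantir/dialogue/pull/2422):

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy model only: a simplified limiter and queue, not dialogue's implementations.
public final class RetainedLimiterStallDemo {

    /** A limiter whose in-flight count survives the "refresh" that recreates the queue. */
    static final class ToyLimiter {
        private final int limit;
        private int inflight;

        ToyLimiter(int limit, int inflight) {
            this.limit = limit;
            this.inflight = inflight;
        }

        boolean tryAcquire() {
            if (inflight < limit) {
                inflight++;
                return true;
            }
            return false;
        }
    }

    /** A queue that can only start work when the limiter hands out a permit. */
    static final class ToyQueue {
        private final ToyLimiter limiter;
        private final Queue<String> pending = new ArrayDeque<>();
        private int started;

        ToyQueue(ToyLimiter limiter) {
            this.limiter = limiter;
        }

        void submit(String request) {
            pending.add(request);
            drain();
        }

        private void drain() {
            while (!pending.isEmpty() && limiter.tryAcquire()) {
                pending.poll();
                started++;
            }
        }

        int started() {
            return started;
        }
    }

    public static void main(String[] args) {
        // The retained endpoint limiter is completely full from before the refresh (2 of 2 permits).
        ToyLimiter retained = new ToyLimiter(2, 2);
        // The queue is freshly created for the refreshed node set and has nothing in flight yet.
        ToyQueue freshQueue = new ToyQueue(retained);

        freshQueue.submit("first-request-after-refresh");

        // The pre-refresh requests will eventually finish, but in this model their completions are
        // not visible to the freshly created queue ("cannot sense the boundary condition"), so
        // nothing ever drains it. Letting the queue force one request through regardless of the
        // limit restores the completion-then-drain feedback loop.
        System.out.println("requests started without force-through: " + freshQueue.started()); // prints 0
    }
}
```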