From f222bb966cdfcb34d8ce6c97380c8fd83e9882c4 Mon Sep 17 00:00:00 2001 From: Carter Kozak Date: Wed, 13 Nov 2024 10:07:48 -0500 Subject: [PATCH 1/5] Proof of concept retaining endpoint concurrency limits on refresh --- .../core/ConcurrencyLimitedChannel.java | 16 +++++---- .../dialogue/core/DialogueChannel.java | 36 ++++++++++++++++++- 2 files changed, 44 insertions(+), 8 deletions(-) diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/ConcurrencyLimitedChannel.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/ConcurrencyLimitedChannel.java index 8da6c8912..5cbda920d 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/ConcurrencyLimitedChannel.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/ConcurrencyLimitedChannel.java @@ -44,7 +44,12 @@ final class ConcurrencyLimitedChannel implements LimitedChannel { static final ChannelState.Key HOST_SPECIFIC_STATE_KEY = new ChannelState.Key<>( CautiousIncreaseAggressiveDecreaseConcurrencyLimiter.class, - ConcurrencyLimitedChannel::createHostSpecificState); + () -> new CautiousIncreaseAggressiveDecreaseConcurrencyLimiter(Behavior.HOST_LEVEL)); + + private static final ChannelState.Key + ENDPOINT_SPECIFIC_STATE_KEY = new ChannelState.Key<>( + CautiousIncreaseAggressiveDecreaseConcurrencyLimiter.class, + () -> new CautiousIncreaseAggressiveDecreaseConcurrencyLimiter(Behavior.ENDPOINT_LEVEL)); private final NeverThrowChannel delegate; private final CautiousIncreaseAggressiveDecreaseConcurrencyLimiter limiter; @@ -63,10 +68,11 @@ static LimitedChannel createForHost(Config cf, Channel channel, int uriIndex, Ch * Creates a concurrency limited channel for per-endpoint limiting. * Metrics are not reported by this component per-endpoint, only by the per-endpoint queue. */ - static LimitedChannel createForEndpoint(Channel channel, String channelName, int uriIndex, Endpoint endpoint) { + static LimitedChannel createForEndpoint( + Channel channel, String channelName, int uriIndex, Endpoint endpoint, ChannelState endpointChannelState) { return new ConcurrencyLimitedChannel( channel, - new CautiousIncreaseAggressiveDecreaseConcurrencyLimiter(Behavior.ENDPOINT_LEVEL), + endpointChannelState.getState(ENDPOINT_SPECIFIC_STATE_KEY), new EndpointConcurrencyLimitedChannelInstrumentation(channelName, uriIndex, endpoint)); } @@ -79,10 +85,6 @@ static LimitedChannel createForEndpoint(Channel channel, String channelName, int this.channelNameForLogging = instrumentation.channelNameForLogging(); } - static CautiousIncreaseAggressiveDecreaseConcurrencyLimiter createHostSpecificState() { - return new CautiousIncreaseAggressiveDecreaseConcurrencyLimiter(Behavior.HOST_LEVEL); - } - @Override public Optional> maybeExecute( Endpoint endpoint, Request request, LimitEnforcement limitEnforcement) { diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueChannel.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueChannel.java index f84e1930f..e28bd90db 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueChannel.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueChannel.java @@ -17,6 +17,8 @@ package com.palantir.dialogue.core; import com.codahale.metrics.Meter; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; import com.github.benmanes.caffeine.cache.Ticker; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; @@ -247,12 +249,17 @@ private static ImmutableList createHostChannels( LimitedChannel limitedChannel; if (cf.isConcurrencyLimitingEnabled()) { Channel unlimited = channel; + EndpointChannelState endpointChannelState = channelState.getState(EndpointChannelState.KEY); channel = new ChannelToEndpointChannel(endpoint -> { if (endpoint.tags().contains("dialogue-disable-endpoint-concurrency-limiting")) { return unlimited; } LimitedChannel limited = ConcurrencyLimitedChannel.createForEndpoint( - unlimited, cf.channelName(), uriIndexForInstrumentation, endpoint); + unlimited, + cf.channelName(), + uriIndexForInstrumentation, + endpoint, + endpointChannelState.get(endpoint)); return QueuedChannel.create(cf, endpoint, limited); }); limitedChannel = ConcurrencyLimitedChannel.createForHost( @@ -266,6 +273,33 @@ private static ImmutableList createHostChannels( return perUriChannels.build(); } + /** + * {@link ChannelState} provider for per-endpoint channels like the endpoint concurrency limiter. + * This object is held in the per-host state, and can be used to look up a {@link ChannelState} + * scoped to an individual {@link Endpoint}. + * {@link Endpoint} state is held in a weak-keyed cache, equivalent to the one used in + * {@link ChannelToEndpointChannel}. + * {@link Endpoint} objects are usually enums, which will never be garbage collected, however it's possible + * that callers may build an endpoint instances on a per-call basis, so the weak-keyed map is defensive + * against short-lived endpoints. + * We don't use the same map because the {@link ChannelToEndpointChannel} retains full channel state which + * may or may not be designed to be reused across reloads, and we aim to be more precise with state that is + * kept across uri changes. + */ + private record EndpointChannelState(LoadingCache cache) { + private static final ChannelState.Key KEY = + new ChannelState.Key<>(EndpointChannelState.class, EndpointChannelState::create); + + ChannelState get(Endpoint endpoint) { + return cache.get(endpoint); + } + + private static EndpointChannelState create() { + return new EndpointChannelState( + Caffeine.newBuilder().weakKeys().maximumSize(10_000).build(_key -> new ChannelState())); + } + } + private static EndpointChannelFactory createEndpointChannelFactory(Channel multiHostQueuedChannel, Config cf) { Channel queuedChannel = new QueueOverrideChannel(multiHostQueuedChannel); return endpoint -> { From 896bc03c57e49c3905573bc5a717b207aa20ab0f Mon Sep 17 00:00:00 2001 From: svc-changelog Date: Mon, 18 Nov 2024 17:52:09 +0000 Subject: [PATCH 2/5] Add generated changelog entries --- changelog/@unreleased/pr-2418.v2.yml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 changelog/@unreleased/pr-2418.v2.yml diff --git a/changelog/@unreleased/pr-2418.v2.yml b/changelog/@unreleased/pr-2418.v2.yml new file mode 100644 index 000000000..7cd1a9d71 --- /dev/null +++ b/changelog/@unreleased/pr-2418.v2.yml @@ -0,0 +1,5 @@ +type: improvement +improvement: + description: Proof of concept retaining endpoint concurrency limits on refresh + links: + - https://github.com/palantir/dialogue/pull/2418 From 9cd534d7c770adb8b39bc563c4f38337674c50ff Mon Sep 17 00:00:00 2001 From: svc-changelog Date: Mon, 18 Nov 2024 17:52:17 +0000 Subject: [PATCH 3/5] Add generated changelog entries --- changelog/@unreleased/pr-2418.v2.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog/@unreleased/pr-2418.v2.yml b/changelog/@unreleased/pr-2418.v2.yml index 7cd1a9d71..edf7d4ecb 100644 --- a/changelog/@unreleased/pr-2418.v2.yml +++ b/changelog/@unreleased/pr-2418.v2.yml @@ -1,5 +1,5 @@ type: improvement improvement: - description: Proof of concept retaining endpoint concurrency limits on refresh + description: Retain endpoint concurrency limits on node refresh links: - https://github.com/palantir/dialogue/pull/2418 From a5a0de3984b97034ce02134cbe3a3e37e2688eed Mon Sep 17 00:00:00 2001 From: Carter Kozak Date: Mon, 18 Nov 2024 12:55:57 -0500 Subject: [PATCH 4/5] comment --- .../main/java/com/palantir/dialogue/core/DialogueChannel.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueChannel.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueChannel.java index e28bd90db..84a4abf17 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueChannel.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueChannel.java @@ -260,6 +260,9 @@ private static ImmutableList createHostChannels( uriIndexForInstrumentation, endpoint, endpointChannelState.get(endpoint)); + // Note that because the queue is recreated when nodes are refreshed, it's critical that + // the queue can force at least one request through at a time using the behavior introduced + // by https://github.com/palantir/dialogue/pull/2422 return QueuedChannel.create(cf, endpoint, limited); }); limitedChannel = ConcurrencyLimitedChannel.createForHost( From f812a2899bfe824d3c6f520ae40f8a0eefae757f Mon Sep 17 00:00:00 2001 From: Carter Kozak Date: Mon, 18 Nov 2024 13:06:19 -0500 Subject: [PATCH 5/5] test coverage --- .../core/ConcurrencyLimitedChannel.java | 5 ++-- .../core/ConcurrencyLimitedChannelTest.java | 24 +++++++++++++++++++ 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/ConcurrencyLimitedChannel.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/ConcurrencyLimitedChannel.java index 5cbda920d..f93eeb747 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/ConcurrencyLimitedChannel.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/ConcurrencyLimitedChannel.java @@ -46,8 +46,9 @@ final class ConcurrencyLimitedChannel implements LimitedChannel { CautiousIncreaseAggressiveDecreaseConcurrencyLimiter.class, () -> new CautiousIncreaseAggressiveDecreaseConcurrencyLimiter(Behavior.HOST_LEVEL)); - private static final ChannelState.Key - ENDPOINT_SPECIFIC_STATE_KEY = new ChannelState.Key<>( + @VisibleForTesting + static final ChannelState.Key ENDPOINT_SPECIFIC_STATE_KEY = + new ChannelState.Key<>( CautiousIncreaseAggressiveDecreaseConcurrencyLimiter.class, () -> new CautiousIncreaseAggressiveDecreaseConcurrencyLimiter(Behavior.ENDPOINT_LEVEL)); diff --git a/dialogue-core/src/test/java/com/palantir/dialogue/core/ConcurrencyLimitedChannelTest.java b/dialogue-core/src/test/java/com/palantir/dialogue/core/ConcurrencyLimitedChannelTest.java index f3aed1843..a52ebe4ef 100644 --- a/dialogue-core/src/test/java/com/palantir/dialogue/core/ConcurrencyLimitedChannelTest.java +++ b/dialogue-core/src/test/java/com/palantir/dialogue/core/ConcurrencyLimitedChannelTest.java @@ -118,6 +118,30 @@ public void testReuseCachedLimiterState_host() { assertThat(limiter.getInflight()).isEqualTo(2); } + @Test + public void testReuseCachedLimiterState_endpoint() { + String channelName = "channel"; + ChannelState state = new ChannelState(); + + // create two channels for the same endpoint, which should re-use the same AIMD state + LimitedChannel forEndpoint = + ConcurrencyLimitedChannel.createForEndpoint(delegate, channelName, 0, endpoint, state); + CautiousIncreaseAggressiveDecreaseConcurrencyLimiter limiter = + state.getState(ConcurrencyLimitedChannel.ENDPOINT_SPECIFIC_STATE_KEY); + + assertThat(limiter.getInflight()).isEqualTo(0); + + forEndpoint.maybeExecute(endpoint, request, LimitEnforcement.DEFAULT_ENABLED); + assertThat(limiter.getInflight()).isEqualTo(1); + + // different uriIndex has no impact on whether state is shared, as indexes will shuffle when nodes go down + LimitedChannel forEndpoint2 = + ConcurrencyLimitedChannel.createForEndpoint(delegate, channelName, 1, endpoint, state); + forEndpoint2.maybeExecute(endpoint, request, LimitEnforcement.DEFAULT_ENABLED); + + assertThat(limiter.getInflight()).isEqualTo(2); + } + @Test public void testLimiterAvailable_successfulRequest_host() { mockHostLimitAvailable();