Skip to content

Commit

Permalink
Proof of concept retaining endpoint concurrency limits on refresh
Browse files Browse the repository at this point in the history
  • Loading branch information
carterkozak committed Nov 18, 2024
1 parent 1e8fb68 commit f222bb9
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,12 @@ final class ConcurrencyLimitedChannel implements LimitedChannel {
static final ChannelState.Key<CautiousIncreaseAggressiveDecreaseConcurrencyLimiter> HOST_SPECIFIC_STATE_KEY =
new ChannelState.Key<>(
CautiousIncreaseAggressiveDecreaseConcurrencyLimiter.class,
ConcurrencyLimitedChannel::createHostSpecificState);
() -> new CautiousIncreaseAggressiveDecreaseConcurrencyLimiter(Behavior.HOST_LEVEL));

private static final ChannelState.Key<CautiousIncreaseAggressiveDecreaseConcurrencyLimiter>
ENDPOINT_SPECIFIC_STATE_KEY = new ChannelState.Key<>(
CautiousIncreaseAggressiveDecreaseConcurrencyLimiter.class,
() -> new CautiousIncreaseAggressiveDecreaseConcurrencyLimiter(Behavior.ENDPOINT_LEVEL));

private final NeverThrowChannel delegate;
private final CautiousIncreaseAggressiveDecreaseConcurrencyLimiter limiter;
Expand All @@ -63,10 +68,11 @@ static LimitedChannel createForHost(Config cf, Channel channel, int uriIndex, Ch
* Creates a concurrency limited channel for per-endpoint limiting.
* Metrics are not reported by this component per-endpoint, only by the per-endpoint queue.
*/
static LimitedChannel createForEndpoint(Channel channel, String channelName, int uriIndex, Endpoint endpoint) {
static LimitedChannel createForEndpoint(
Channel channel, String channelName, int uriIndex, Endpoint endpoint, ChannelState endpointChannelState) {
return new ConcurrencyLimitedChannel(
channel,
new CautiousIncreaseAggressiveDecreaseConcurrencyLimiter(Behavior.ENDPOINT_LEVEL),
endpointChannelState.getState(ENDPOINT_SPECIFIC_STATE_KEY),
new EndpointConcurrencyLimitedChannelInstrumentation(channelName, uriIndex, endpoint));
}

Expand All @@ -79,10 +85,6 @@ static LimitedChannel createForEndpoint(Channel channel, String channelName, int
this.channelNameForLogging = instrumentation.channelNameForLogging();
}

static CautiousIncreaseAggressiveDecreaseConcurrencyLimiter createHostSpecificState() {
return new CautiousIncreaseAggressiveDecreaseConcurrencyLimiter(Behavior.HOST_LEVEL);
}

@Override
public Optional<ListenableFuture<Response>> maybeExecute(
Endpoint endpoint, Request request, LimitEnforcement limitEnforcement) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package com.palantir.dialogue.core;

import com.codahale.metrics.Meter;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.Ticker;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -247,12 +249,17 @@ private static ImmutableList<LimitedChannel> createHostChannels(
LimitedChannel limitedChannel;
if (cf.isConcurrencyLimitingEnabled()) {
Channel unlimited = channel;
EndpointChannelState endpointChannelState = channelState.getState(EndpointChannelState.KEY);
channel = new ChannelToEndpointChannel(endpoint -> {
if (endpoint.tags().contains("dialogue-disable-endpoint-concurrency-limiting")) {
return unlimited;
}
LimitedChannel limited = ConcurrencyLimitedChannel.createForEndpoint(
unlimited, cf.channelName(), uriIndexForInstrumentation, endpoint);
unlimited,
cf.channelName(),
uriIndexForInstrumentation,
endpoint,
endpointChannelState.get(endpoint));
return QueuedChannel.create(cf, endpoint, limited);
});
limitedChannel = ConcurrencyLimitedChannel.createForHost(
Expand All @@ -266,6 +273,33 @@ private static ImmutableList<LimitedChannel> createHostChannels(
return perUriChannels.build();
}

/**
* {@link ChannelState} provider for per-endpoint channels like the endpoint concurrency limiter.
* This object is held in the per-host state, and can be used to look up a {@link ChannelState}
* scoped to an individual {@link Endpoint}.
* {@link Endpoint} state is held in a weak-keyed cache, equivalent to the one used in
* {@link ChannelToEndpointChannel}.
* {@link Endpoint} objects are usually enums, which will never be garbage collected, however it's possible
* that callers may build an endpoint instances on a per-call basis, so the weak-keyed map is defensive
* against short-lived endpoints.
* We don't use the same map because the {@link ChannelToEndpointChannel} retains full channel state which
* may or may not be designed to be reused across reloads, and we aim to be more precise with state that is
* kept across uri changes.
*/
private record EndpointChannelState(LoadingCache<Endpoint, ChannelState> cache) {
private static final ChannelState.Key<EndpointChannelState> KEY =
new ChannelState.Key<>(EndpointChannelState.class, EndpointChannelState::create);

ChannelState get(Endpoint endpoint) {
return cache.get(endpoint);
}

private static EndpointChannelState create() {
return new EndpointChannelState(
Caffeine.newBuilder().weakKeys().maximumSize(10_000).build(_key -> new ChannelState()));
}
}

private static EndpointChannelFactory createEndpointChannelFactory(Channel multiHostQueuedChannel, Config cf) {
Channel queuedChannel = new QueueOverrideChannel(multiHostQueuedChannel);
return endpoint -> {
Expand Down

0 comments on commit f222bb9

Please sign in to comment.