Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use cache with weak keys in ChannelToEndpointChannel #2367

Merged
merged 3 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelog/@unreleased/pr-2367.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type: feature
feature:
description: Dialogue produces timer metrics for all endpoints.
links:
- https://github.com/palantir/dialogue/pull/2367
Original file line number Diff line number Diff line change
Expand Up @@ -16,54 +16,25 @@

package com.palantir.dialogue.core;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.util.concurrent.ListenableFuture;
import com.palantir.dialogue.Channel;
import com.palantir.dialogue.Endpoint;
import com.palantir.dialogue.Request;
import com.palantir.dialogue.Response;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class ChannelToEndpointChannel implements Channel {

private final Function<Endpoint, Channel> adapter;
private final Map<Object, Channel> cache;
private final LoadingCache<Endpoint, Channel> cache;

ChannelToEndpointChannel(Function<Endpoint, Channel> adapter) {
this.adapter = adapter;
this.cache = new ConcurrentHashMap<>();
ChannelToEndpointChannel(Function<Endpoint, Channel> loader) {
this.cache = Caffeine.newBuilder().weakKeys().maximumSize(10_000).build(loader::apply);
}

@Override
public ListenableFuture<Response> execute(Endpoint endpoint, Request request) {
return channelFor(endpoint).execute(endpoint, request);
}

private Channel channelFor(Endpoint endpoint) {
return cache.computeIfAbsent(key(endpoint), _key -> adapter.apply(endpoint));
}

/**
* Constant {@link Endpoint endpoints} may be safely used as cache keys, as opposed to dynamically created
* {@link Endpoint} objects which would result in a memory leak.
*/
static boolean isConstant(Endpoint endpoint) {
// The conjure generator creates endpoints as enum values, which can safely be cached because they aren't
// dynamically created.
return endpoint instanceof Enum;
}

/**
* Creates a cache key for the given endpoint. Some consumers (CJR feign shim) may not use endpoint enums, so we
* cannot safely hold references to potentially short-lived objects. In such cases we use a string value based on
* the service-name endpoint-name tuple.
*/
private static Object key(Endpoint endpoint) {
return isConstant(endpoint) ? endpoint : stringKey(endpoint);
}

private static String stringKey(Endpoint endpoint) {
return endpoint.serviceName() + '.' + endpoint.endpointName();
return cache.get(endpoint).execute(endpoint, request);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -219,21 +219,25 @@ private static ImmutableList<LimitedChannel> createHostChannels(Config cf, List<
.build());
channel = RetryOtherValidatingChannel.create(cf, channel);
channel = HostMetricsChannel.create(cf, channel, targetUri.uri());
Channel tracingChannel =
channel =
new TraceEnrichingChannel(channel, DialogueTracing.tracingTags(cf, uriIndexForInstrumentation));
channel = cf.isConcurrencyLimitingEnabled()
? new ChannelToEndpointChannel(endpoint -> {
if (endpoint.tags().contains("dialogue-disable-endpoint-concurrency-limiting")) {
return tracingChannel;
}
LimitedChannel limited = ConcurrencyLimitedChannel.createForEndpoint(
tracingChannel, cf.channelName(), uriIndexForInstrumentation, endpoint);
return QueuedChannel.create(cf, endpoint, limited);
})
: tracingChannel;
LimitedChannel limitedChannel = cf.isConcurrencyLimitingEnabled()
? ConcurrencyLimitedChannel.createForHost(cf, channel, uriIndexForInstrumentation)
: new ChannelToLimitedChannelAdapter(channel);

LimitedChannel limitedChannel;
if (cf.isConcurrencyLimitingEnabled()) {
Channel unlimited = channel;
channel = new ChannelToEndpointChannel(endpoint -> {
if (endpoint.tags().contains("dialogue-disable-endpoint-concurrency-limiting")) {
return unlimited;
}
LimitedChannel limited = ConcurrencyLimitedChannel.createForEndpoint(
unlimited, cf.channelName(), uriIndexForInstrumentation, endpoint);
return QueuedChannel.create(cf, endpoint, limited);
});
limitedChannel = ConcurrencyLimitedChannel.createForHost(cf, channel, uriIndexForInstrumentation);
} else {
limitedChannel = new ChannelToLimitedChannelAdapter(channel);
}

perUriChannels.add(limitedChannel);
}
return perUriChannels.build();
Expand All @@ -253,11 +257,7 @@ private static EndpointChannelFactory createEndpointChannelFactory(Channel multi
channel = new RangeAcceptsIdentityEncodingChannel(channel);
channel = ContentEncodingChannel.of(channel, endpoint);
channel = TracedChannel.create(cf, channel, endpoint);
if (ChannelToEndpointChannel.isConstant(endpoint)) {
// Avoid producing metrics for non-constant endpoints which may produce
// high cardinality.
channel = TimingEndpointChannel.create(cf, channel, endpoint);
}
channel = TimingEndpointChannel.create(cf, channel, endpoint);
channel = new RequestBodyValidationChannel(channel);
channel = new InterruptionChannel(channel);
return new NeverThrowEndpointChannel(channel); // this must come last as a defensive backstop
Expand Down
Loading