Skip to content

Commit

Permalink
Cache AddressResolverGroupMetrics per resolverGroup and recorder (#1548)
Browse files Browse the repository at this point in the history
Fixes #1547
  • Loading branch information
violetagg authored Mar 12, 2021
1 parent 21dea09 commit 53be7f6
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.PlatformDependent;
import reactor.netty.channel.ChannelMetricsRecorder;

import java.net.SocketAddress;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

import static reactor.netty.Metrics.ERROR;
Expand All @@ -35,11 +38,19 @@
*/
final class AddressResolverGroupMetrics<T extends SocketAddress> extends AddressResolverGroup<T> {

static final ConcurrentMap<Integer, AddressResolverGroupMetrics<?>> cache = PlatformDependent.newConcurrentHashMap();

static AddressResolverGroupMetrics<?> getOrCreate(
AddressResolverGroup<?> resolverGroup, ChannelMetricsRecorder recorder) {
return cache.computeIfAbsent(Objects.hash(resolverGroup, recorder),
key -> new AddressResolverGroupMetrics<>(resolverGroup, recorder));
}

final AddressResolverGroup<T> resolverGroup;

final ChannelMetricsRecorder recorder;

AddressResolverGroupMetrics(AddressResolverGroup<T> resolverGroup, ChannelMetricsRecorder recorder) {
private AddressResolverGroupMetrics(AddressResolverGroup<T> resolverGroup, ChannelMetricsRecorder recorder) {
this.resolverGroup = resolverGroup;
this.recorder = recorder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ protected void proxyProvider(ProxyProvider proxyProvider) {
protected AddressResolverGroup<?> resolverInternal() {
AddressResolverGroup<?> resolverGroup = resolver != null ? resolver : defaultAddressResolverGroup();
if (metricsRecorder != null) {
return new AddressResolverGroupMetrics<>(resolverGroup,
return AddressResolverGroupMetrics.getOrCreate(resolverGroup,
Objects.requireNonNull(metricsRecorder.get(), "Metrics recorder supplier returned null"));
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import io.netty.channel.ChannelId;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.unix.DomainSocketAddress;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.HttpClientCodec;
Expand All @@ -83,6 +84,7 @@
import io.netty.handler.ssl.util.SelfSignedCertificate;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.resolver.AddressResolverGroup;
import io.netty.resolver.dns.DnsAddressResolverGroup;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.DefaultEventExecutor;
import org.junit.jupiter.api.BeforeAll;
Expand Down Expand Up @@ -2647,16 +2649,26 @@ private void doTestProtocolsAndDefaultSslProviderAvailability(HttpClient client,
}

@Test
void testSameNameResolver_WithConnectionPool() {
doTestSameNameResolver(true);
void testSameNameResolver_WithConnectionPoolNoMetrics() {
doTestSameNameResolver(true, false);
}

@Test
void testSameNameResolver_NoConnectionPool() {
doTestSameNameResolver(false);
void testSameNameResolver_WithConnectionPoolWithMetrics() {
doTestSameNameResolver(true, true);
}

private void doTestSameNameResolver(boolean useConnectionPool) {
@Test
void testSameNameResolver_NoConnectionPoolNoMetrics() {
doTestSameNameResolver(false, false);
}

@Test
void testSameNameResolver_NoConnectionPoolWithMetrics() {
doTestSameNameResolver(false, true);
}

private void doTestSameNameResolver(boolean useConnectionPool, boolean enableMetrics) {
disposableServer =
createServer()
.handle((req, res) -> res.sendString(Mono.just("doTestSameNameResolver")))
Expand All @@ -2666,8 +2678,10 @@ private void doTestSameNameResolver(boolean useConnectionPool) {
AtomicReference<List<AddressResolverGroup<?>>> resolvers = new AtomicReference<>(new ArrayList<>());
Flux.range(0, 2)
.flatMap(i -> {
// HttpClient creation multiple times is deliberate
HttpClient client = useConnectionPool ? createClient(port) : createClientNewConnection(port);
return client.doOnConnect(config -> resolvers.get().add(config.resolverInternal()))
return client.metrics(enableMetrics, Function.identity())
.doOnConnect(config -> resolvers.get().add(config.resolverInternal()))
.get()
.uri("/")
.responseContent()
Expand All @@ -2682,4 +2696,63 @@ private void doTestSameNameResolver(boolean useConnectionPool) {
assertThat(resolvers.get()).isNotNull();
assertThat(resolvers.get().get(0)).isSameAs(resolvers.get().get(1));
}

@Test
void testIssue1547() throws Exception {
disposableServer =
createServer()
.handle((req, res) -> res.sendString(Mono.just("testIssue1547")))
.bindNow();

NioEventLoopGroup loop = new NioEventLoopGroup(1);
AtomicReference<List<AddressResolverGroup<?>>> resolvers = new AtomicReference<>(new ArrayList<>());
AtomicReference<List<AddressResolverGroup<?>>> resolversInternal = new AtomicReference<>(new ArrayList<>());
try {
HttpClient client = createClientNewConnection(disposableServer.port()).runOn(useNative -> loop);

Flux.range(0, 2)
.flatMap(i -> client.metrics(true, Function.identity())
.doOnConnect(config -> {
resolvers.get().add(config.resolver());
resolversInternal.get().add(config.resolverInternal());
})
.get()
.uri("/")
.responseContent()
.aggregate()
.asString())
.as(StepVerifier::create)
.expectNext("testIssue1547", "testIssue1547")
.expectComplete()
.verify(Duration.ofSeconds(5));

assertThat(resolvers.get()).isNotNull();
assertThat(resolvers.get().get(0))
.isSameAs(resolvers.get().get(1))
.isInstanceOf(DnsAddressResolverGroup.class);

assertThat(resolversInternal.get()).isNotNull();
assertThat(resolversInternal.get().get(0)).isSameAs(resolversInternal.get().get(1));
assertThat(resolversInternal.get().get(0).getClass().getSimpleName()).isEqualTo("AddressResolverGroupMetrics");
}
finally {
// Closing the executor cleans the AddressResolverGroup internal structures and closes the resolver
loop.shutdownGracefully()
.get(500, TimeUnit.SECONDS);
}

assertThatExceptionOfType(IllegalStateException.class)
.isThrownBy(() ->
resolvers.get()
.get(0)
.getResolver(loop.next()))
.withMessage("executor not accepting a task");

assertThatExceptionOfType(IllegalStateException.class)
.isThrownBy(() ->
resolversInternal.get()
.get(0)
.getResolver(loop.next()))
.withMessage("executor not accepting a task");
}
}

0 comments on commit 53be7f6

Please sign in to comment.