Skip to content

Commit

Permalink
Ensure HttpClient uses the new ConnectionProvider after HttpResources…
Browse files Browse the repository at this point in the history
…#reset (#2020)

Fixes #1943
  • Loading branch information
violetagg authored Feb 7, 2022
1 parent 0dc2225 commit 50dd4b7
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 96 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2011-2021 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2011-2022 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;

import io.netty.resolver.AddressResolverGroup;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -46,6 +47,10 @@ public final class HttpResources extends TcpResources {
public static void disposeLoopsAndConnections() {
HttpResources resources = httpResources.getAndSet(null);
if (resources != null) {
ConnectionProvider provider = resources.http2ConnectionProvider.get();
if (provider != null) {
provider.dispose();
}
resources._dispose();
}
}
Expand Down Expand Up @@ -83,7 +88,12 @@ public static Mono<Void> disposeLoopsAndConnectionsLater(Duration quietPeriod, D
return Mono.defer(() -> {
HttpResources resources = httpResources.getAndSet(null);
if (resources != null) {
return resources._disposeLater(quietPeriod, timeout);
ConnectionProvider provider = resources.http2ConnectionProvider.get();
Mono<Void> disposeProvider = Mono.empty();
if (provider != null) {
disposeProvider = provider.disposeLater();
}
return Mono.when(disposeProvider, resources._disposeLater(quietPeriod, timeout));
}
return Mono.empty();
});
Expand Down Expand Up @@ -130,15 +140,39 @@ public static HttpResources set(LoopResources loops) {
return getOrCreate(httpResources, loops, null, ON_HTTP_NEW, "http");
}

final AtomicReference<ConnectionProvider> http2ConnectionProvider;

HttpResources(LoopResources loops, ConnectionProvider provider) {
super(loops, provider);
http2ConnectionProvider = new AtomicReference<>();
}

@Override
public AddressResolverGroup<?> getOrCreateDefaultResolver() {
return super.getOrCreateDefaultResolver();
}

/**
* Safely checks whether a {@link ConnectionProvider} for HTTP/2 traffic exists
* and proceed with a creation if it does not exist.
*
* @param create the create function provides the current {@link ConnectionProvider} for HTTP/1.1 traffic
* in case some {@link ConnectionProvider} configuration is needed.
* @return an existing or new {@link ConnectionProvider} for HTTP/2 traffic
* @since 1.0.16
*/
public ConnectionProvider getOrCreateHttp2ConnectionProvider(Function<ConnectionProvider, ConnectionProvider> create) {
ConnectionProvider provider = http2ConnectionProvider.get();
if (provider == null) {
ConnectionProvider newProvider = create.apply(this);
if (!http2ConnectionProvider.compareAndSet(null, newProvider)) {
newProvider.dispose();
}
provider = getOrCreateHttp2ConnectionProvider(create);
}
return provider;
}

static final BiFunction<LoopResources, ConnectionProvider, HttpResources> ON_HTTP_NEW;

static final AtomicReference<HttpResources> httpResources;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2011-2021 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2011-2022 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -52,7 +52,6 @@
import reactor.netty.channel.ChannelMetricsRecorder;
import reactor.netty.http.Http2SettingsSpec;
import reactor.netty.http.HttpProtocol;
import reactor.netty.http.HttpResources;
import reactor.netty.http.websocket.WebsocketInbound;
import reactor.netty.http.websocket.WebsocketOutbound;
import reactor.netty.resources.ConnectionProvider;
Expand Down Expand Up @@ -390,7 +389,7 @@ public interface RedirectSendHandler extends BiFunction<HttpClientRequest, Netty
* @return a {@link HttpClient}
*/
public static HttpClient create() {
return new HttpClientConnect(new HttpConnectionProvider(HttpResources.get(), Http2Resources::get));
return new HttpClientConnect(new HttpConnectionProvider());
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2021 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2020-2022 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,13 +19,15 @@
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.http.HttpResources;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.transport.TransportConfig;
import reactor.util.annotation.Nullable;

import java.net.SocketAddress;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;

/**
Expand All @@ -43,51 +45,60 @@ public Mono<? extends Connection> acquire(
@Nullable Supplier<? extends SocketAddress> remoteAddress,
@Nullable AddressResolverGroup<?> resolverGroup) {
if (((HttpClientConfig) config)._protocols == HttpClientConfig.h11) {
return http1ConnectionProvider.acquire(config, connectionObserver, remoteAddress, resolverGroup);
return http1ConnectionProvider().acquire(config, connectionObserver, remoteAddress, resolverGroup);
}
else if (h2ConnectionProviderSupplier != null) {
return h2ConnectionProviderSupplier.get().acquire(config, connectionObserver, remoteAddress, resolverGroup);
else if (http1ConnectionProvider == null) {
return HttpResources.get().getOrCreateHttp2ConnectionProvider(HTTP2_CONNECTION_PROVIDER_FACTORY)
.acquire(config, connectionObserver, remoteAddress, resolverGroup);
}
else {
return getOrCreate(http1ConnectionProvider).acquire(config, connectionObserver, remoteAddress, resolverGroup);
return getOrCreate().acquire(config, connectionObserver, remoteAddress, resolverGroup);
}
}

@Override
public void disposeWhen(SocketAddress address) {
http1ConnectionProvider.disposeWhen(address);
http1ConnectionProvider().disposeWhen(address);
}

@Override
public int maxConnections() {
return http1ConnectionProvider.maxConnections();
return http1ConnectionProvider().maxConnections();
}

@Override
public Map<SocketAddress, Integer> maxConnectionsPerHost() {
return http1ConnectionProvider.maxConnectionsPerHost();
return http1ConnectionProvider().maxConnectionsPerHost();
}

final ConnectionProvider http1ConnectionProvider;
final Supplier<ConnectionProvider> h2ConnectionProviderSupplier;

HttpConnectionProvider(ConnectionProvider http1ConnectionProvider) {
this(http1ConnectionProvider, null);
final AtomicReference<ConnectionProvider> h2ConnectionProvider = new AtomicReference<>();

HttpConnectionProvider() {
this(null);
}

HttpConnectionProvider(ConnectionProvider http1ConnectionProvider, @Nullable Supplier<ConnectionProvider> h2ConnectionProviderSupplier) {
HttpConnectionProvider(@Nullable ConnectionProvider http1ConnectionProvider) {
this.http1ConnectionProvider = http1ConnectionProvider;
this.h2ConnectionProviderSupplier = h2ConnectionProviderSupplier;
}

ConnectionProvider getOrCreate(ConnectionProvider http1ConnectionProvider) {
ConnectionProvider getOrCreate() {
ConnectionProvider provider = h2ConnectionProvider.get();
if (provider == null) {
h2ConnectionProvider.compareAndSet(null, new Http2ConnectionProvider(http1ConnectionProvider));
provider = getOrCreate(http1ConnectionProvider);
ConnectionProvider newProvider = HTTP2_CONNECTION_PROVIDER_FACTORY.apply(http1ConnectionProvider);
if (!h2ConnectionProvider.compareAndSet(null, newProvider)) {
newProvider.dispose();
}
provider = getOrCreate();
}
return provider;
}

final AtomicReference<ConnectionProvider> h2ConnectionProvider = new AtomicReference<>();
ConnectionProvider http1ConnectionProvider() {
return http1ConnectionProvider != null ? http1ConnectionProvider : HttpResources.get();
}

static final Function<ConnectionProvider, ConnectionProvider> HTTP2_CONNECTION_PROVIDER_FACTORY =
Http2ConnectionProvider::new;
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2011-2021 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2011-2022 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -105,6 +105,7 @@
import reactor.netty.http.Http11SslContextSpec;
import reactor.netty.http.Http2SslContextSpec;
import reactor.netty.http.HttpProtocol;
import reactor.netty.http.HttpResources;
import reactor.netty.http.server.HttpServer;
import reactor.netty.resources.ConnectionPoolMetrics;
import reactor.netty.resources.ConnectionProvider;
Expand Down Expand Up @@ -3130,4 +3131,67 @@ private void doTestSharedNameResolver(HttpClient client, boolean sharedClient) t
.block();
}
}

@Test
void testIssue1943Http11() {
doTestIssue1943(HttpProtocol.HTTP11);
}

@Test
void testIssue1943H2C() {
doTestIssue1943(HttpProtocol.H2C);
}

private void doTestIssue1943(HttpProtocol protocol) {
LoopResources serverLoop = LoopResources.create("testIssue1943");
disposableServer =
createServer()
.protocol(protocol)
.runOn(serverLoop)
.handle((req, res) -> res.sendString(Mono.just("testIssue1943")))
.bindNow();

HttpClient client = createClient(disposableServer.port()).protocol(protocol);
HttpClientConfig config = client.configuration();

LoopResources loopResources1 = config.loopResources();
ConnectionProvider provider1 = ((HttpConnectionProvider) config.connectionProvider()).http1ConnectionProvider();
AddressResolverGroup<?> resolverGroup1 = config.defaultAddressResolverGroup();

try {
client.get()
.uri("/")
.responseContent()
.aggregate()
.asString()
.as(StepVerifier::create)
.expectNext("testIssue1943")
.expectComplete()
.verify(Duration.ofSeconds(5));

HttpResources.reset();

LoopResources loopResources2 = config.loopResources();
ConnectionProvider provider2 = ((HttpConnectionProvider) config.connectionProvider()).http1ConnectionProvider();
AddressResolverGroup<?> resolverGroup2 = config.defaultAddressResolverGroup();

assertThat(loopResources1).isNotSameAs(loopResources2);
assertThat(provider1).isNotSameAs(provider2);
assertThat(resolverGroup1).isNotSameAs(resolverGroup2);

client.get()
.uri("/")
.responseContent()
.aggregate()
.asString()
.as(StepVerifier::create)
.expectNext("testIssue1943")
.expectComplete()
.verify(Duration.ofSeconds(5));
}
finally {
serverLoop.disposeLater()
.block(Duration.ofSeconds(5));
}
}
}
Loading

0 comments on commit 50dd4b7

Please sign in to comment.