Skip to content

Commit

Permalink
[1808] For HTTP/2 Use a minimum num of socket connections..
Browse files Browse the repository at this point in the history
before spawning multiple streams on any given socket connection.

 reactor#1808
  • Loading branch information
Samuel Cox committed Mar 9, 2022
1 parent 5758ea6 commit 714e576
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,14 @@ public interface Builder {
* @return {@code this}
*/
//Builder pushEnabled(boolean pushEnabled);

/**
* Sets the {@code SETTINGS_MIN_CONNECTIONS} value.
*
* @param minConnections the {@code SETTINGS_MIN_CONNECTIONS} value
* @return {@code this}
*/
Builder minConnections(int minConnections);
}

/**
Expand Down Expand Up @@ -157,6 +165,16 @@ public Boolean pushEnabled() {
return pushEnabled;
}

/**
* Returns the configured minimum connections to create before adding more than one stream.
* Defaults to 0.
*
* @return TODO
*/
public int minConnections() {
return minConnections;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -171,7 +189,8 @@ public boolean equals(Object o) {
Objects.equals(maxConcurrentStreams, that.maxConcurrentStreams) &&
Objects.equals(maxFrameSize, that.maxFrameSize) &&
maxHeaderListSize.equals(that.maxHeaderListSize) &&
Objects.equals(pushEnabled, that.pushEnabled);
Objects.equals(pushEnabled, that.pushEnabled) &&
minConnections == that.minConnections;
}

@Override
Expand All @@ -185,6 +204,7 @@ public int hashCode() {
final Integer maxFrameSize;
final Long maxHeaderListSize;
final Boolean pushEnabled;
final int minConnections;

Http2SettingsSpec(Build build) {
Http2Settings settings = build.http2Settings;
Expand All @@ -194,10 +214,12 @@ public int hashCode() {
maxFrameSize = settings.maxFrameSize();
maxHeaderListSize = settings.maxHeaderListSize();
pushEnabled = settings.pushEnabled();
minConnections = build.minConnections;
}

static final class Build implements Builder {
final Http2Settings http2Settings = Http2Settings.defaultSettings();
private int minConnections;

@Override
public Http2SettingsSpec build() {
Expand Down Expand Up @@ -241,5 +263,12 @@ public Builder pushEnabled(boolean pushEnabled) {
return this;
}
*/

@Override
public Builder minConnections(int minConnections) {
this.minConnections = minConnections;
return this;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ static final class PooledConnectionAllocator {
this.remoteAddress = remoteAddress;
this.resolver = resolver;
this.pool = poolFactory.newPool(connectChannel(), null, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
poolConFig -> new Http2Pool(poolConFig, poolFactory.maxLifeTime()));
poolConFig -> new Http2Pool(poolConFig, poolFactory.maxLifeTime(), this.config.http2MinConnections()));
}

Publisher<Connection> connectChannel() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,20 +132,31 @@ final class Http2Pool implements InstrumentedPool<Connection>, InstrumentedPool.

long lastInteractionTimestamp;

Http2Pool(PoolConfig<Connection> poolConfig, long maxLifeTime) {
if (poolConfig.allocationStrategy().getPermits(0) != 0) {
throw new IllegalArgumentException("No support for configuring minimum number of connections");
}
/**
* The minimum amount of connections to create before creating more than 1 stream
* on any given connection.
*/
int minConnections;

Http2Pool(PoolConfig<Connection> poolConfig, long maxLifeTime, int minConnections) {
this.clock = poolConfig.clock();
this.connections = new ConcurrentLinkedQueue<>();
this.lastInteractionTimestamp = clock.millis();
this.maxLifeTime = maxLifeTime;
this.pending = new ConcurrentLinkedDeque<>();
this.poolConfig = poolConfig;
if (minConnections < 0) {
throw new IllegalArgumentException("HTTP/2 minimum connections must be non-negative.");
}
this.minConnections = minConnections;

recordInteractionTimestamp();
}

Http2Pool(PoolConfig<Connection> poolConfig, long maxLifeTime) {
this(poolConfig, maxLifeTime, 0);
}

@Override
public Mono<PooledRef<Connection>> acquire() {
return new BorrowerMono(this, Duration.ZERO);
Expand Down Expand Up @@ -384,10 +395,15 @@ else if (sig.isOnError()) {
}
}

// TODO think on better names for this. Some kind of signal that this only
// returns a connection from the
@Nullable
Slot findConnection(ConcurrentLinkedQueue<Slot> resources) {
int resourcesCount = resources.size();
while (resourcesCount > 0) {
// TODO this is the KEY idea. I'm curious on reactor-netty's team
// thought on this. Completely wrong place even if we do some name changes,
// cleanup, etc?
while (resourcesCount > (minConnections == 0 ? 0 : minConnections + 1)) {
// There are connections in the queue

resourcesCount--;
Expand Down Expand Up @@ -758,4 +774,4 @@ long lifeTime() {
return pool.clock.millis() - creationTimestamp;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,12 @@ boolean checkProtocol(int protocol) {
return (_protocols & protocol) == protocol;
}

// TODO clean up names and whatnot. maybe better structure around http2 stuff if RN
// team does accept this.
int http2MinConnections() {
return http2Settings != null ? http2Settings.minConnections() : 0;
}

Http2Settings http2Settings() {
Http2Settings settings = Http2Settings.defaultSettings();

Expand Down Expand Up @@ -768,6 +774,7 @@ static final class H2OrHttp11Codec extends ChannelInboundHandlerAdapter {
final boolean acceptGzip;
final HttpResponseDecoderSpec decoder;
final Http2Settings http2Settings;
final int http2MinConnections;
final ChannelMetricsRecorder metricsRecorder;
final ConnectionObserver observer;
final Function<String, String> uriTagValue;
Expand All @@ -776,6 +783,7 @@ static final class H2OrHttp11Codec extends ChannelInboundHandlerAdapter {
this.acceptGzip = initializer.acceptGzip;
this.decoder = initializer.decoder;
this.http2Settings = initializer.http2Settings;
this.http2MinConnections = initializer.http2MinConnections;
this.metricsRecorder = initializer.metricsRecorder;
this.observer = observer;
this.uriTagValue = initializer.uriTagValue;
Expand Down Expand Up @@ -812,6 +820,7 @@ static final class HttpClientChannelInitializer implements ChannelPipelineConfig
final boolean acceptGzip;
final HttpResponseDecoderSpec decoder;
final Http2Settings http2Settings;
final int http2MinConnections;
final ChannelMetricsRecorder metricsRecorder;
final ChannelOperations.OnSetup opsFactory;
final int protocols;
Expand All @@ -822,6 +831,7 @@ static final class HttpClientChannelInitializer implements ChannelPipelineConfig
this.acceptGzip = config.acceptGzip;
this.decoder = config.decoder;
this.http2Settings = config.http2Settings();
this.http2MinConnections = config.http2MinConnections();
this.metricsRecorder = config.metricsRecorderInternal();
this.opsFactory = config.channelOperationsProvider();
this.protocols = config._protocols;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,18 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;
import reactor.netty.BaseHttpTest;
import reactor.netty.ByteBufFlux;
import reactor.netty.ByteBufMono;
import reactor.netty.Connection;
import reactor.netty.DisposableServer;
import reactor.netty.FutureMono;
import reactor.netty.NettyPipeline;
import reactor.netty.SocketUtils;
Expand All @@ -107,6 +111,7 @@
import reactor.netty.http.HttpProtocol;
import reactor.netty.http.HttpResources;
import reactor.netty.http.server.HttpServer;
import reactor.netty.http.server.HttpServerRoutes;
import reactor.netty.resources.ConnectionPoolMetrics;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;
Expand Down Expand Up @@ -2080,6 +2085,111 @@ void testConnectionNoLifeTimeFixedPoolHttp2() throws Exception {
}
}

@ParameterizedTest
@ValueSource(ints = {0, 4})
void samstest(final int minConnections) throws Exception {
Http2SslContextSpec serverCtx = Http2SslContextSpec.forServer(ssc.certificate(), ssc.privateKey());
Http2SslContextSpec clientCtx =
Http2SslContextSpec.forClient()
.configure(builder -> builder.trustManager(InsecureTrustManagerFactory.INSTANCE));

int maxConns = 10;
ConnectionProvider provider =
ConnectionProvider.builder("testConnectionNoLifeTimeFixedPoolHttp2")
.maxConnections(maxConns)
//.pendingAcquireTimeout(Duration.ofMillis(100))
.build();

final AtomicInteger ctr = new AtomicInteger(1);

final class ServerPaths {
final static String HANG = "/hang";
final static String HELLO = "/hello";
}

// TODO This test needs work to make it bullet-proof
final Consumer<HttpServerRoutes> routes = base -> {
base.get(ServerPaths.HELLO, (req, resp) -> {
System.out.println("hello called!");
return resp.sendString(Mono.just("Hello World!").doOnNext(s -> System.out.println("hello returning!")));
}).get(ServerPaths.HANG, (req, resp) -> {
System.out.println("hang called!");
return resp.sendString(
Mono.just("You waited for this?").delayElement(Duration.ofMillis(5_000))
.doOnNext(s -> System.out.println("hang returning!"))
);
});
};

final DisposableServer disposableServer =
createServer()
.doOnConnection(conn -> {
System.out.println(ctr.getAndIncrement() + ". A conn was made to the server!");
})
.protocol(HttpProtocol.H2).secure(spec -> spec.sslContext(serverCtx))
.route(routes)
.bindNow();

Set<ChannelId> channelIds = new java.util.HashSet<>();

assertThat(maxConns).isGreaterThan(minConnections);

final HttpClient client =
createClient(
provider,
disposableServer.port()
).protocol(HttpProtocol.H2)
.http2Settings(settings -> settings.minConnections(minConnections))
.secure(spec -> spec.sslContext(clientCtx))
.doOnConnected(conn -> {
channelIds.add(conn.channel().id());
});

try {
final Function<String, Flux<ChannelId>> getChannelIds = path ->
client.get()
.uri(path)
.responseConnection((res, conn) -> {
assertThat(client.configuration().checkProtocol(HttpClientConfig.h2))
.isTrue()
.describedAs("Should be an HTTP/2 connection");
final Channel streamParent = conn.channel().parent();
return Mono.just(streamParent.id())
.delayUntil(ch -> conn.inbound().receive());
});

// hows this for some hack..
final List<ChannelId> fastReqs = new ArrayList<>();
final List<ChannelId> hangingReqs = new ArrayList<>();

// I will clean all this up to avoid thread sleep + other hacks. Just fast-track to design PR..
Flux.range(1, 2).flatMap(ignore -> getChannelIds.apply("/hang"))
.subscribeOn(Schedulers.boundedElastic())
.subscribe(hangingReqs::add);

Thread.sleep(1_000);

Flux.range(1, 2).flatMap(ignore -> getChannelIds.apply("/hello"))
.subscribeOn(Schedulers.boundedElastic())
.subscribe(fastReqs::add);

Thread.sleep(10_000);

final List<ChannelId> ids = new ArrayList<>();
ids.addAll(hangingReqs);
ids.addAll(fastReqs);

final int expectedSize =
minConnections == 0 ? hangingReqs.size() : minConnections;
assertThat(channelIds).hasSize(expectedSize);
}
finally {
provider.disposeLater()
.block(Duration.ofSeconds(5));
}
}


@Test
void testConnectionNoLifeTimeElasticPoolHttp1() throws Exception {
ConnectionProvider provider =
Expand Down

0 comments on commit 714e576

Please sign in to comment.