Skip to content

Commit

Permalink
Add API for configuring minimum connections for HTTP/2 connection pool
Browse files Browse the repository at this point in the history
Fixes #1808
  • Loading branch information
violetagg committed Apr 19, 2022
1 parent a6aa5a4 commit 4926455
Show file tree
Hide file tree
Showing 6 changed files with 314 additions and 66 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2021 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2020-2022 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,6 +21,8 @@

import java.util.Objects;

import static io.netty.handler.codec.http2.Http2CodecUtil.NUM_STANDARD_SETTINGS;

/**
* A configuration builder to fine tune the {@link Http2Settings}.
*
Expand Down Expand Up @@ -78,6 +80,15 @@ public interface Builder {
*/
Builder maxHeaderListSize(long maxHeaderListSize);

/**
* Sets the {@code SETTINGS_MIN_CONNECTIONS} value.
*
* @param minConnections the {@code SETTINGS_MAX_HEADER_LIST_SIZE} value
* @return {@code this}
* @since 1.0.19
*/
Builder minConnections(int minConnections);

/**
* Sets the {@code SETTINGS_ENABLE_PUSH} value.
*
Expand Down Expand Up @@ -147,6 +158,18 @@ public Long maxHeaderListSize() {
return maxHeaderListSize;
}

/**
* Returns the configured {@code SETTINGS_MIN_CONNECTIONS} value or
* the default {@code 0}.
*
* @return the configured {@code SETTINGS_MIN_CONNECTIONS} value or
* the default {@code 0}.
* @since 1.0.19
*/
public Integer minConnections() {
return minConnections;
}

/**
* Returns the configured {@code SETTINGS_ENABLE_PUSH} value or null.
*
Expand All @@ -171,19 +194,24 @@ public boolean equals(Object o) {
Objects.equals(maxConcurrentStreams, that.maxConcurrentStreams) &&
Objects.equals(maxFrameSize, that.maxFrameSize) &&
maxHeaderListSize.equals(that.maxHeaderListSize) &&
minConnections.equals(that.minConnections) &&
Objects.equals(pushEnabled, that.pushEnabled);
}

@Override
public int hashCode() {
return Objects.hash(headerTableSize, initialWindowSize, maxConcurrentStreams, maxFrameSize, maxHeaderListSize, pushEnabled);
return Objects.hash(headerTableSize, initialWindowSize, maxConcurrentStreams, maxFrameSize, maxHeaderListSize,
minConnections, pushEnabled);
}

static final char SETTINGS_MIN_CONNECTIONS = NUM_STANDARD_SETTINGS + 1;

final Long headerTableSize;
final Integer initialWindowSize;
final Long maxConcurrentStreams;
final Integer maxFrameSize;
final Long maxHeaderListSize;
final Integer minConnections;
final Boolean pushEnabled;

Http2SettingsSpec(Build build) {
Expand All @@ -193,11 +221,19 @@ public int hashCode() {
maxConcurrentStreams = settings.maxConcurrentStreams();
maxFrameSize = settings.maxFrameSize();
maxHeaderListSize = settings.maxHeaderListSize();
minConnections = settings.getIntValue(SETTINGS_MIN_CONNECTIONS);
pushEnabled = settings.pushEnabled();
}

static final class Build implements Builder {
final Http2Settings http2Settings = Http2Settings.defaultSettings();
static final Long DEFAULT_MIN_CONNECTIONS = 0L;

final Http2Settings http2Settings;

Build() {
http2Settings = Http2Settings.defaultSettings();
http2Settings.put(SETTINGS_MIN_CONNECTIONS, DEFAULT_MIN_CONNECTIONS);
}

@Override
public Http2SettingsSpec build() {
Expand Down Expand Up @@ -234,6 +270,15 @@ public Builder maxHeaderListSize(long maxHeaderListSize) {
return this;
}

@Override
public Builder minConnections(int minConnections) {
if (minConnections < 0) {
throw new IllegalArgumentException("Setting MIN_CONNECTIONS is invalid: " + minConnections);
}
http2Settings.put(SETTINGS_MIN_CONNECTIONS, Long.valueOf(minConnections));
return this;
}

/*
@Override
public Builder pushEnabled(boolean pushEnabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,8 @@ static final class PooledConnectionAllocator {
this.remoteAddress = remoteAddress;
this.resolver = resolver;
this.pool = poolFactory.newPool(connectChannel(), null, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
poolConFig -> new Http2Pool(poolConFig, poolFactory.maxLifeTime()));
poolConFig -> new Http2Pool(poolConFig, poolFactory.maxLifeTime(),
this.config.http2Settings != null ? this.config.http2Settings.minConnections() : 0));
}

Publisher<Connection> connectChannel() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@
* <li>{@link PoolMetrics#idleSize()} always returns {@code 0}.</li>
* </ul>
* <p>
* If minimum connections is specified, the cached connections with active streams will be kept at that minimum
* (can be the best effort). However, if the cached connections have reached max concurrent streams,
* then new connections will be allocated up to the maximum connections limit.
* <p>
* Configurations that are not applicable
* <ul>
* <li>{@link PoolConfig#destroyHandler()} - the destroy handler cannot be used as the destruction is more complex.</li>
Expand All @@ -94,7 +98,6 @@
* <li>{@link PoolConfig#reuseIdleResourcesInLruOrder()} - FIFO is used when checking the connections.</li>
* <li>FIFO is used when obtaining the pending borrowers</li>
* <li>Warm up functionality is not supported</li>
* <li>Setting minimum connections configuration is not supported</li>
* </ul>
* <p>This class is based on
* https://github.com/reactor/reactor-pool/blob/v0.2.7/src/main/java/reactor/pool/SimpleDequePool.java
Expand Down Expand Up @@ -141,18 +144,24 @@ final class Http2Pool implements InstrumentedPool<Connection>, InstrumentedPool.

final Clock clock;
final long maxLifeTime;
final int minConnections;
final PoolConfig<Connection> poolConfig;

long lastInteractionTimestamp;

Http2Pool(PoolConfig<Connection> poolConfig, long maxLifeTime) {
Http2Pool(PoolConfig<Connection> poolConfig, long maxLifeTime, int minConnections) {
if (poolConfig.allocationStrategy().getPermits(0) != 0) {
throw new IllegalArgumentException("No support for configuring minimum number of connections");
throw new IllegalArgumentException(
"No support for configuring minimum number of connections via AllocationStrategy");
}
if (minConnections > poolConfig.allocationStrategy().permitMaximum()) {
throw new IllegalArgumentException("Minimum number of connections must be less than or equal to maximum");
}
this.clock = poolConfig.clock();
this.connections = new ConcurrentLinkedQueue<>();
this.lastInteractionTimestamp = clock.millis();
this.maxLifeTime = maxLifeTime;
this.minConnections = minConnections;
this.pending = new ConcurrentLinkedDeque<>();
this.poolConfig = poolConfig;

Expand Down Expand Up @@ -310,8 +319,11 @@ void drainLoop() {
int borrowersCount = pendingSize;

if (borrowersCount != 0) {
int resourcesCount = resources.size();
// find a connection that can be used for opening a new stream
Slot slot = findConnection(resources);
// when cached connections are below minimum connections, then allocate a new connection
boolean belowMinConnections = minConnections > 0 && resourcesCount < minConnections;
Slot slot = belowMinConnections ? null : findConnection(resources, resourcesCount);
if (slot != null) {
Borrower borrower = pollPending(borrowers, true);
if (borrower == null) {
Expand All @@ -338,54 +350,59 @@ void drainLoop() {
}
}
else {
int permits = poolConfig.allocationStrategy().getPermits(1);
if (permits <= 0) {
if (maxPending >= 0) {
borrowersCount = pendingSize;
int toCull = borrowersCount - maxPending;
for (int i = 0; i < toCull; i++) {
Borrower extraneous = pollPending(borrowers, true);
if (extraneous != null) {
pendingAcquireLimitReached(extraneous, maxPending);
}
}
}
if (belowMinConnections && poolConfig.allocationStrategy().permitGranted() >= minConnections) {
// connections allocations were triggered
}
else {
Borrower borrower = pollPending(borrowers, true);
if (borrower == null) {
continue;
int permits = poolConfig.allocationStrategy().getPermits(1);
if (permits <= 0) {
if (maxPending >= 0) {
borrowersCount = pendingSize;
int toCull = borrowersCount - maxPending;
for (int i = 0; i < toCull; i++) {
Borrower extraneous = pollPending(borrowers, true);
if (extraneous != null) {
pendingAcquireLimitReached(extraneous, maxPending);
}
}
}
}
if (isDisposed()) {
borrower.fail(new PoolShutdownException());
return;
else {
Borrower borrower = pollPending(borrowers, true);
if (borrower == null) {
continue;
}
if (isDisposed()) {
borrower.fail(new PoolShutdownException());
return;
}
borrower.stopPendingCountdown();
Mono<Connection> allocator = poolConfig.allocator();
Mono<Connection> primary =
allocator.doOnEach(sig -> {
if (sig.isOnNext()) {
Connection newInstance = sig.get();
assert newInstance != null;
Slot newSlot = new Slot(this, newInstance);
if (log.isDebugEnabled()) {
log.debug(format(newInstance.channel(), "Channel activated"));
}
ACQUIRED.incrementAndGet(this);
newSlot.incrementConcurrencyAndGet();
newSlot.deactivate();
borrower.deliver(new Http2PooledRef(newSlot));
}
else if (sig.isOnError()) {
Throwable error = sig.getThrowable();
assert error != null;
poolConfig.allocationStrategy().returnPermits(1);
borrower.fail(error);
}
})
.contextWrite(borrower.currentContext());

primary.subscribe(alreadyPropagated -> {}, alreadyPropagatedOrLogged -> drain(), this::drain);
}
borrower.stopPendingCountdown();
Mono<Connection> allocator = poolConfig.allocator();
Mono<Connection> primary =
allocator.doOnEach(sig -> {
if (sig.isOnNext()) {
Connection newInstance = sig.get();
assert newInstance != null;
Slot newSlot = new Slot(this, newInstance);
if (log.isDebugEnabled()) {
log.debug(format(newInstance.channel(), "Channel activated"));
}
ACQUIRED.incrementAndGet(this);
newSlot.incrementConcurrencyAndGet();
newSlot.deactivate();
borrower.deliver(new Http2PooledRef(newSlot));
}
else if (sig.isOnError()) {
Throwable error = sig.getThrowable();
assert error != null;
poolConfig.allocationStrategy().returnPermits(1);
borrower.fail(error);
}
})
.contextWrite(borrower.currentContext());

primary.subscribe(alreadyPropagated -> {}, alreadyPropagatedOrLogged -> drain(), this::drain);
}
}
}
Expand All @@ -398,8 +415,7 @@ else if (sig.isOnError()) {
}

@Nullable
Slot findConnection(ConcurrentLinkedQueue<Slot> resources) {
int resourcesCount = resources.size();
Slot findConnection(ConcurrentLinkedQueue<Slot> resources, int resourcesCount) {
while (resourcesCount > 0) {
// There are connections in the queue

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2021 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2020-2022 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,6 +21,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static reactor.netty.http.Http2SettingsSpec.Build.DEFAULT_MIN_CONNECTIONS;

class Http2SettingsSpecTests {

Expand All @@ -40,6 +41,7 @@ void headerTableSize() {
assertThat(spec.maxConcurrentStreams()).isNull();
assertThat(spec.maxFrameSize()).isNull();
assertThat(spec.maxHeaderListSize()).isEqualTo(Http2CodecUtil.DEFAULT_HEADER_LIST_SIZE);
assertThat(spec.minConnections()).isEqualTo(DEFAULT_MIN_CONNECTIONS.intValue());
assertThat(spec.pushEnabled()).isNull();
}

Expand All @@ -59,6 +61,7 @@ void initialWindowSize() {
assertThat(spec.maxConcurrentStreams()).isNull();
assertThat(spec.maxFrameSize()).isNull();
assertThat(spec.maxHeaderListSize()).isEqualTo(Http2CodecUtil.DEFAULT_HEADER_LIST_SIZE);
assertThat(spec.minConnections()).isEqualTo(DEFAULT_MIN_CONNECTIONS.intValue());
assertThat(spec.pushEnabled()).isNull();
}

Expand All @@ -78,6 +81,7 @@ void maxConcurrentStreams() {
assertThat(spec.maxConcurrentStreams()).isEqualTo(123);
assertThat(spec.maxFrameSize()).isNull();
assertThat(spec.maxHeaderListSize()).isEqualTo(Http2CodecUtil.DEFAULT_HEADER_LIST_SIZE);
assertThat(spec.minConnections()).isEqualTo(DEFAULT_MIN_CONNECTIONS.intValue());
assertThat(spec.pushEnabled()).isNull();
}

Expand All @@ -97,6 +101,7 @@ void maxFrameSize() {
assertThat(spec.maxConcurrentStreams()).isNull();
assertThat(spec.maxFrameSize()).isEqualTo(16384);
assertThat(spec.maxHeaderListSize()).isEqualTo(Http2CodecUtil.DEFAULT_HEADER_LIST_SIZE);
assertThat(spec.minConnections()).isEqualTo(DEFAULT_MIN_CONNECTIONS.intValue());
assertThat(spec.pushEnabled()).isNull();
}

Expand All @@ -116,6 +121,7 @@ void maxHeaderListSize() {
assertThat(spec.maxConcurrentStreams()).isNull();
assertThat(spec.maxFrameSize()).isNull();
assertThat(spec.maxHeaderListSize()).isEqualTo(123);
assertThat(spec.minConnections()).isEqualTo(DEFAULT_MIN_CONNECTIONS.intValue());
assertThat(spec.pushEnabled()).isNull();
}

Expand All @@ -126,6 +132,26 @@ void maxHeaderListSizeBadValues() {
.withMessage("Setting MAX_HEADER_LIST_SIZE is invalid: -1");
}

@Test
void minConnections() {
builder.minConnections(4);
Http2SettingsSpec spec = builder.build();
assertThat(spec.headerTableSize()).isNull();
assertThat(spec.initialWindowSize()).isNull();
assertThat(spec.maxConcurrentStreams()).isNull();
assertThat(spec.maxFrameSize()).isNull();
assertThat(spec.maxHeaderListSize()).isEqualTo(Http2CodecUtil.DEFAULT_HEADER_LIST_SIZE);
assertThat(spec.minConnections()).isEqualTo(4);
assertThat(spec.pushEnabled()).isNull();
}

@Test
void minConnectionsBadValues() {
assertThatExceptionOfType(IllegalArgumentException.class)
.isThrownBy(() -> builder.minConnections(-1))
.withMessage("Setting MIN_CONNECTIONS is invalid: -1");
}

/*
@Test
public void pushEnabled() {
Expand Down
Loading

0 comments on commit 4926455

Please sign in to comment.