Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add close() method to connectionPool so connections are closed when origins are reloaded #551

Merged
merged 8 commits into from
Dec 4, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class SimpleConnectionPool implements ConnectionPool, Connection.Listener
private final AtomicInteger terminatedConnections = new AtomicInteger();
private final AtomicInteger connectionFailures = new AtomicInteger();
private final AtomicInteger connectionsInEstablishment = new AtomicInteger();
private volatile boolean active;


public SimpleConnectionPool(Origin origin, ConnectionPoolSettings poolSettings, Connection.Factory connectionFactory) {
Expand All @@ -65,6 +66,7 @@ public SimpleConnectionPool(Origin origin, ConnectionPoolSettings poolSettings,
this.connectionFactory = requireNonNull(connectionFactory);
this.availableConnections = new ConcurrentLinkedDeque<>();
this.waitingSubscribers = new ConcurrentLinkedDeque<>();
this.active = true;
}

public Origin getOrigin() {
Expand All @@ -73,25 +75,29 @@ public Origin getOrigin() {

@Override
public Publisher<Connection> borrowConnection() {
return Mono.<Connection>create(sink -> {
Connection connection = dequeue();
if (connection != null) {
attemptBorrowConnection(sink, connection);
} else {
if (waitingSubscribers.size() < poolSettings.maxPendingConnectionsPerHost()) {
this.waitingSubscribers.add(sink);
sink.onDispose(() -> waitingSubscribers.remove(sink));
newConnection();
if (active) {
return Mono.<Connection>create(sink -> {
Connection connection = dequeue();
if (connection != null) {
attemptBorrowConnection(sink, connection);
} else {
sink.error(new MaxPendingConnectionsExceededException(
origin,
poolSettings.maxPendingConnectionsPerHost(),
poolSettings.maxPendingConnectionsPerHost()));
if (waitingSubscribers.size() < poolSettings.maxPendingConnectionsPerHost()) {
this.waitingSubscribers.add(sink);
sink.onDispose(() -> waitingSubscribers.remove(sink));
newConnection();
} else {
sink.error(new MaxPendingConnectionsExceededException(
origin,
poolSettings.maxPendingConnectionsPerHost(),
poolSettings.maxPendingConnectionsPerHost()));
}
}
}
}).timeout(
Duration.ofMillis(poolSettings.pendingConnectionTimeoutMillis()),
Mono.error(() -> new MaxPendingConnectionTimeoutException(origin, connectionSettings.connectTimeoutMillis())));
}).timeout(
Duration.ofMillis(poolSettings.pendingConnectionTimeoutMillis()),
Mono.error(() -> new MaxPendingConnectionTimeoutException(origin, connectionSettings.connectTimeoutMillis())));
} else {
return Mono.error(() -> new IllegalStateException("Pool is closed"));
}
}

private void newConnection() {
Expand Down Expand Up @@ -158,17 +164,26 @@ private void attemptBorrowConnection(MonoSink<Connection> sink, Connection conne
public boolean returnConnection(Connection connection) {
borrowedCount.decrementAndGet();
if (connection.isConnected()) {
queueNewConnection(connection);
if (active) {
queueNewConnection(connection);
} else {
doCloseConnection(connection);
}
dvlato marked this conversation as resolved.
Show resolved Hide resolved
}
return false;
}

public boolean closeConnection(Connection connection) {
private void doCloseConnection(Connection connection) {
connection.close();
borrowedCount.decrementAndGet();
closedConnections.incrementAndGet();
}

newConnection();
public boolean closeConnection(Connection connection) {
borrowedCount.decrementAndGet();
doCloseConnection(connection);
if (active) {
newConnection();
}
dvlato marked this conversation as resolved.
Show resolved Hide resolved
return true;
}

Expand All @@ -191,6 +206,17 @@ public void connectionClosed(Connection connection) {
availableConnections.remove(connection);
}

@Override
public void close() {
active = false;
Connection con;
while ((con = availableConnections.poll()) != null) {
if (con.isConnected()) {
doCloseConnection(con);
}
}
}

public ConnectionPool.Stats stats() {
return this.stats;
}
Expand Down Expand Up @@ -237,6 +263,7 @@ public int connectionsInEstablishment() {
return connectionsInEstablishment.get();
}


@Override
public String toString() {
return MoreObjects.toStringHelper(this)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -418,12 +419,8 @@ public void closesConnections() {
.thenReturn(Mono.just(connection3))
.thenReturn(Mono.just(connection4));

ConnectionPoolSettings poolSettings = new ConnectionPoolSettings.Builder()
.maxConnectionsPerHost(2)
.maxPendingConnectionsPerHost(2)
.build();

SimpleConnectionPool pool = new SimpleConnectionPool(origin, poolSettings, connectionFactory);
SimpleConnectionPool pool = new SimpleConnectionPool(origin, defaultConnectionPoolSettings(), connectionFactory);

StepVerifier.create(pool.borrowConnection())
.expectNext(connection1)
Expand All @@ -444,12 +441,7 @@ public void closeConnectionDecrementsBorrowedCount() {
.thenReturn(Mono.just(connection3))
.thenReturn(Mono.just(connection4));

ConnectionPoolSettings poolSettings = new ConnectionPoolSettings.Builder()
.maxConnectionsPerHost(2)
.maxPendingConnectionsPerHost(2)
.build();

SimpleConnectionPool pool = new SimpleConnectionPool(origin, poolSettings, connectionFactory);
SimpleConnectionPool pool = new SimpleConnectionPool(origin, defaultConnectionPoolSettings(), connectionFactory);

// Saturate pool:
StepVerifier.create(pool.borrowConnection())
Expand Down Expand Up @@ -537,12 +529,8 @@ public void idleActiveConnectionMakesRoomForOthers() {
.thenReturn(Mono.just(connection3))
.thenReturn(Mono.just(connection4));

ConnectionPoolSettings poolSettings = new ConnectionPoolSettings.Builder()
.maxConnectionsPerHost(2)
.maxPendingConnectionsPerHost(2)
.build();

SimpleConnectionPool pool = new SimpleConnectionPool(origin, poolSettings, connectionFactory);
SimpleConnectionPool pool = new SimpleConnectionPool(origin, defaultConnectionPoolSettings(), connectionFactory);

// Create a new connection
StepVerifier.create(pool.borrowConnection())
Expand Down Expand Up @@ -591,12 +579,8 @@ public void borrowRetriesThreeTimesOnConnectionEstablishmentFailure() {
.thenReturn(Mono.error(new OriginUnreachableException(origin, new RuntimeException())))
.thenReturn(Mono.just(connection4));

ConnectionPoolSettings poolSettings = new ConnectionPoolSettings.Builder()
.maxConnectionsPerHost(2)
.maxPendingConnectionsPerHost(2)
.build();

SimpleConnectionPool pool = new SimpleConnectionPool(origin, poolSettings, connectionFactory);
SimpleConnectionPool pool = new SimpleConnectionPool(origin, defaultConnectionPoolSettings(), connectionFactory);

StepVerifier.create(pool.borrowConnection())
.expectNext(connection4)
Expand All @@ -611,12 +595,7 @@ public void borrowRetriesThreeTimesOnFailureDueToConnectionClosure() {
.thenReturn(Mono.error(new OriginUnreachableException(origin, new RuntimeException())))
.thenReturn(Mono.just(connection4));

ConnectionPoolSettings poolSettings = new ConnectionPoolSettings.Builder()
.maxConnectionsPerHost(2)
.maxPendingConnectionsPerHost(2)
.build();

SimpleConnectionPool pool = new SimpleConnectionPool(origin, poolSettings, connectionFactory);
SimpleConnectionPool pool = new SimpleConnectionPool(origin, defaultConnectionPoolSettings(), connectionFactory);

StepVerifier.create(pool.borrowConnection())
.expectNext(connection1)
Expand All @@ -643,12 +622,7 @@ public void borrowGivesUpConnectionEstablishmentAttemptAfterThreeTries() {
.thenReturn(Mono.error(new OriginUnreachableException(origin, new RuntimeException())))
.thenReturn(Mono.error(new OriginUnreachableException(origin, new RuntimeException())));

ConnectionPoolSettings poolSettings = new ConnectionPoolSettings.Builder()
.maxConnectionsPerHost(2)
.maxPendingConnectionsPerHost(2)
.build();

SimpleConnectionPool pool = new SimpleConnectionPool(origin, poolSettings, connectionFactory);
SimpleConnectionPool pool = new SimpleConnectionPool(origin, defaultConnectionPoolSettings(), connectionFactory);

Mono.from(pool.borrowConnection()).subscribe();

Expand All @@ -667,12 +641,8 @@ public void connectionEstablishmentFailureRetryThreeTimesOnlyAtConnectionClosure
.thenReturn(Mono.error(new OriginUnreachableException(origin, new RuntimeException())))
.thenReturn(Mono.error(new OriginUnreachableException(origin, new RuntimeException())));

ConnectionPoolSettings poolSettings = new ConnectionPoolSettings.Builder()
.maxConnectionsPerHost(2)
.maxPendingConnectionsPerHost(2)
.build();

SimpleConnectionPool pool = new SimpleConnectionPool(origin, poolSettings, connectionFactory);
SimpleConnectionPool pool = new SimpleConnectionPool(origin, defaultConnectionPoolSettings(), connectionFactory);

StepVerifier.create(pool.borrowConnection())
.expectNext(connection1)
Expand Down Expand Up @@ -921,4 +891,113 @@ public void ensureInEstablishmentCountIsDecrementedInError() throws InterruptedE
.verifyComplete();
}

@Test
public void availableConnectionsAreClosedAsPartOfPoolClosure() {
when(connectionFactory.createConnection(any(Origin.class), any(ConnectionSettings.class)))
.thenReturn(Mono.just(connection1));

SimpleConnectionPool pool = new SimpleConnectionPool(origin, defaultConnectionPoolSettings(), connectionFactory);

StepVerifier.create(pool.borrowConnection())
.consumeNextWith(pool::returnConnection)
.verifyComplete();

assertEquals(1, pool.stats().availableConnectionCount());
pool.close();

verify(connection1).close();
assertEquals(0, pool.stats().availableConnectionCount());
assertEquals(0, pool.stats().busyConnectionCount());
}

@Test
public void emitsExceptionWhenBrrowingFromClosedPool() {

SimpleConnectionPool pool = new SimpleConnectionPool(origin, defaultConnectionPoolSettings(), connectionFactory);
pool.close();
StepVerifier.create(pool.borrowConnection())
.expectError(IllegalStateException.class)
.verify();
assertEquals(pool.stats().connectionAttempts(), 0);
assertEquals(pool.stats().busyConnectionCount(), 0);
assertEquals(pool.stats().availableConnectionCount(), 0);
assertEquals(pool.stats().pendingConnectionCount(), 0);
}

@Test
public void justClosesConnectionWhenReturningToClosedPool() {
when(connectionFactory.createConnection(any(Origin.class), any(ConnectionSettings.class))).thenReturn(Mono.just(connection1));
SimpleConnectionPool pool = new SimpleConnectionPool(origin, defaultConnectionPoolSettings(), connectionFactory);

StepVerifier.create(pool.borrowConnection())
.expectNext(connection1)
.verifyComplete();
assertEquals(pool.stats().connectionAttempts(), 1);
assertEquals(pool.stats().busyConnectionCount(), 1);

pool.close();
assertEquals(0, pool.stats(). closedConnections());

doAnswer( arg -> {
when(connection1.isConnected()).thenReturn(false);
return true;
}).when(connection1).close();
pool.returnConnection(connection1);
assertEquals(pool.stats().connectionAttempts(), 1);
assertEquals(pool.stats().busyConnectionCount(), 0);
assertEquals(1, pool.stats(). closedConnections());
}

@Test
public void closedPoolDoesNotOpenNewConnectionWhenConnectionIsClosed() {

when(connectionFactory.createConnection(any(Origin.class), any(ConnectionSettings.class)))
.thenReturn(Mono.just(connection1));


SimpleConnectionPool pool = new SimpleConnectionPool(origin, defaultConnectionPoolSettings(), connectionFactory);

StepVerifier.create(pool.borrowConnection())
.expectNext(connection1)
.verifyComplete();

assertEquals(1, pool.stats().busyConnectionCount());
//Close connection pool
pool.close();
pool.closeConnection(connection1);

verify(connection1).close();
assertEquals(pool.stats().terminatedConnections(), 0);
assertEquals(pool.stats().closedConnections(), 1);
assertEquals(0, pool.stats().busyConnectionCount());
}

@Test
public void purgesTerminatedConnectionsForClosedPools() {
when(connectionFactory.createConnection(any(Origin.class), any(ConnectionSettings.class)))
.thenReturn(Mono.just(connection1));

// Create a new connection
SimpleConnectionPool pool = new SimpleConnectionPool(origin, defaultConnectionPoolSettings(), connectionFactory);

StepVerifier.create(pool.borrowConnection())
.expectNext(connection1)
.verifyComplete();
//Close the pool
pool.close();
//Close and return the connection
pool.connectionClosed(connection1);
when(connection1.isConnected()).thenReturn(false);
pool.returnConnection(connection1);

assertEquals(pool.stats().connectionAttempts(), 1);
assertEquals(pool.stats().pendingConnectionCount(), 0);
assertEquals(pool.stats().busyConnectionCount(), 0);
assertEquals(pool.stats().availableConnectionCount(), 0);

assertEquals(pool.stats().terminatedConnections(), 1);
assertEquals(pool.stats().closedConnections(), 0);
}


dvlato marked this conversation as resolved.
Show resolved Hide resolved
}