Skip to content

Commit

Permalink
Add close() method to connectionPool so connections are closed when o…
Browse files Browse the repository at this point in the history
…rigins are reloaded (#551)

Implemented close() method in SimpleConnectionPool so old connections are purged when an origin is reloaded.
  • Loading branch information
dvlato authored Dec 4, 2019
1 parent 7235bb3 commit 9da2fd3
Show file tree
Hide file tree
Showing 2 changed files with 165 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class SimpleConnectionPool implements ConnectionPool, Connection.Listener
private final AtomicInteger terminatedConnections = new AtomicInteger();
private final AtomicInteger connectionFailures = new AtomicInteger();
private final AtomicInteger connectionsInEstablishment = new AtomicInteger();
private volatile boolean active;


public SimpleConnectionPool(Origin origin, ConnectionPoolSettings poolSettings, Connection.Factory connectionFactory) {
Expand All @@ -65,6 +66,7 @@ public SimpleConnectionPool(Origin origin, ConnectionPoolSettings poolSettings,
this.connectionFactory = requireNonNull(connectionFactory);
this.availableConnections = new ConcurrentLinkedDeque<>();
this.waitingSubscribers = new ConcurrentLinkedDeque<>();
this.active = true;
}

public Origin getOrigin() {
Expand All @@ -73,25 +75,29 @@ public Origin getOrigin() {

@Override
public Publisher<Connection> borrowConnection() {
return Mono.<Connection>create(sink -> {
Connection connection = dequeue();
if (connection != null) {
attemptBorrowConnection(sink, connection);
} else {
if (waitingSubscribers.size() < poolSettings.maxPendingConnectionsPerHost()) {
this.waitingSubscribers.add(sink);
sink.onDispose(() -> waitingSubscribers.remove(sink));
newConnection();
if (active) {
return Mono.<Connection>create(sink -> {
Connection connection = dequeue();
if (connection != null) {
attemptBorrowConnection(sink, connection);
} else {
sink.error(new MaxPendingConnectionsExceededException(
origin,
poolSettings.maxPendingConnectionsPerHost(),
poolSettings.maxPendingConnectionsPerHost()));
if (waitingSubscribers.size() < poolSettings.maxPendingConnectionsPerHost()) {
this.waitingSubscribers.add(sink);
sink.onDispose(() -> waitingSubscribers.remove(sink));
newConnection();
} else {
sink.error(new MaxPendingConnectionsExceededException(
origin,
poolSettings.maxPendingConnectionsPerHost(),
poolSettings.maxPendingConnectionsPerHost()));
}
}
}
}).timeout(
Duration.ofMillis(poolSettings.pendingConnectionTimeoutMillis()),
Mono.error(() -> new MaxPendingConnectionTimeoutException(origin, connectionSettings.connectTimeoutMillis())));
}).timeout(
Duration.ofMillis(poolSettings.pendingConnectionTimeoutMillis()),
Mono.error(() -> new MaxPendingConnectionTimeoutException(origin, connectionSettings.connectTimeoutMillis())));
} else {
return Mono.error(() -> new IllegalStateException("Pool is closed"));
}
}

private void newConnection() {
Expand Down Expand Up @@ -158,17 +164,26 @@ private void attemptBorrowConnection(MonoSink<Connection> sink, Connection conne
public boolean returnConnection(Connection connection) {
borrowedCount.decrementAndGet();
if (connection.isConnected()) {
queueNewConnection(connection);
if (active) {
queueNewConnection(connection);
} else {
doCloseConnection(connection);
}
}
return false;
}

public boolean closeConnection(Connection connection) {
private void doCloseConnection(Connection connection) {
connection.close();
borrowedCount.decrementAndGet();
closedConnections.incrementAndGet();
}

newConnection();
public boolean closeConnection(Connection connection) {
borrowedCount.decrementAndGet();
doCloseConnection(connection);
if (active) {
newConnection();
}
return true;
}

Expand All @@ -191,6 +206,17 @@ public void connectionClosed(Connection connection) {
availableConnections.remove(connection);
}

@Override
public void close() {
active = false;
Connection con;
while ((con = availableConnections.poll()) != null) {
if (con.isConnected()) {
doCloseConnection(con);
}
}
}

public ConnectionPool.Stats stats() {
return this.stats;
}
Expand Down Expand Up @@ -237,6 +263,7 @@ public int connectionsInEstablishment() {
return connectionsInEstablishment.get();
}


@Override
public String toString() {
return MoreObjects.toStringHelper(this)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -418,12 +419,8 @@ public void closesConnections() {
.thenReturn(Mono.just(connection3))
.thenReturn(Mono.just(connection4));

ConnectionPoolSettings poolSettings = new ConnectionPoolSettings.Builder()
.maxConnectionsPerHost(2)
.maxPendingConnectionsPerHost(2)
.build();

SimpleConnectionPool pool = new SimpleConnectionPool(origin, poolSettings, connectionFactory);
SimpleConnectionPool pool = new SimpleConnectionPool(origin, defaultConnectionPoolSettings(), connectionFactory);

StepVerifier.create(pool.borrowConnection())
.expectNext(connection1)
Expand All @@ -444,12 +441,7 @@ public void closeConnectionDecrementsBorrowedCount() {
.thenReturn(Mono.just(connection3))
.thenReturn(Mono.just(connection4));

ConnectionPoolSettings poolSettings = new ConnectionPoolSettings.Builder()
.maxConnectionsPerHost(2)
.maxPendingConnectionsPerHost(2)
.build();

SimpleConnectionPool pool = new SimpleConnectionPool(origin, poolSettings, connectionFactory);
SimpleConnectionPool pool = new SimpleConnectionPool(origin, defaultConnectionPoolSettings(), connectionFactory);

// Saturate pool:
StepVerifier.create(pool.borrowConnection())
Expand Down Expand Up @@ -537,12 +529,8 @@ public void idleActiveConnectionMakesRoomForOthers() {
.thenReturn(Mono.just(connection3))
.thenReturn(Mono.just(connection4));

ConnectionPoolSettings poolSettings = new ConnectionPoolSettings.Builder()
.maxConnectionsPerHost(2)
.maxPendingConnectionsPerHost(2)
.build();

SimpleConnectionPool pool = new SimpleConnectionPool(origin, poolSettings, connectionFactory);
SimpleConnectionPool pool = new SimpleConnectionPool(origin, defaultConnectionPoolSettings(), connectionFactory);

// Create a new connection
StepVerifier.create(pool.borrowConnection())
Expand Down Expand Up @@ -591,12 +579,8 @@ public void borrowRetriesThreeTimesOnConnectionEstablishmentFailure() {
.thenReturn(Mono.error(new OriginUnreachableException(origin, new RuntimeException())))
.thenReturn(Mono.just(connection4));

ConnectionPoolSettings poolSettings = new ConnectionPoolSettings.Builder()
.maxConnectionsPerHost(2)
.maxPendingConnectionsPerHost(2)
.build();

SimpleConnectionPool pool = new SimpleConnectionPool(origin, poolSettings, connectionFactory);
SimpleConnectionPool pool = new SimpleConnectionPool(origin, defaultConnectionPoolSettings(), connectionFactory);

StepVerifier.create(pool.borrowConnection())
.expectNext(connection4)
Expand All @@ -611,12 +595,7 @@ public void borrowRetriesThreeTimesOnFailureDueToConnectionClosure() {
.thenReturn(Mono.error(new OriginUnreachableException(origin, new RuntimeException())))
.thenReturn(Mono.just(connection4));

ConnectionPoolSettings poolSettings = new ConnectionPoolSettings.Builder()
.maxConnectionsPerHost(2)
.maxPendingConnectionsPerHost(2)
.build();

SimpleConnectionPool pool = new SimpleConnectionPool(origin, poolSettings, connectionFactory);
SimpleConnectionPool pool = new SimpleConnectionPool(origin, defaultConnectionPoolSettings(), connectionFactory);

StepVerifier.create(pool.borrowConnection())
.expectNext(connection1)
Expand All @@ -643,12 +622,7 @@ public void borrowGivesUpConnectionEstablishmentAttemptAfterThreeTries() {
.thenReturn(Mono.error(new OriginUnreachableException(origin, new RuntimeException())))
.thenReturn(Mono.error(new OriginUnreachableException(origin, new RuntimeException())));

ConnectionPoolSettings poolSettings = new ConnectionPoolSettings.Builder()
.maxConnectionsPerHost(2)
.maxPendingConnectionsPerHost(2)
.build();

SimpleConnectionPool pool = new SimpleConnectionPool(origin, poolSettings, connectionFactory);
SimpleConnectionPool pool = new SimpleConnectionPool(origin, defaultConnectionPoolSettings(), connectionFactory);

Mono.from(pool.borrowConnection()).subscribe();

Expand All @@ -667,12 +641,8 @@ public void connectionEstablishmentFailureRetryThreeTimesOnlyAtConnectionClosure
.thenReturn(Mono.error(new OriginUnreachableException(origin, new RuntimeException())))
.thenReturn(Mono.error(new OriginUnreachableException(origin, new RuntimeException())));

ConnectionPoolSettings poolSettings = new ConnectionPoolSettings.Builder()
.maxConnectionsPerHost(2)
.maxPendingConnectionsPerHost(2)
.build();

SimpleConnectionPool pool = new SimpleConnectionPool(origin, poolSettings, connectionFactory);
SimpleConnectionPool pool = new SimpleConnectionPool(origin, defaultConnectionPoolSettings(), connectionFactory);

StepVerifier.create(pool.borrowConnection())
.expectNext(connection1)
Expand Down Expand Up @@ -921,4 +891,113 @@ public void ensureInEstablishmentCountIsDecrementedInError() throws InterruptedE
.verifyComplete();
}

@Test
public void availableConnectionsAreClosedAsPartOfPoolClosure() {
when(connectionFactory.createConnection(any(Origin.class), any(ConnectionSettings.class)))
.thenReturn(Mono.just(connection1));

SimpleConnectionPool pool = new SimpleConnectionPool(origin, defaultConnectionPoolSettings(), connectionFactory);

StepVerifier.create(pool.borrowConnection())
.consumeNextWith(pool::returnConnection)
.verifyComplete();

assertEquals(1, pool.stats().availableConnectionCount());
pool.close();

verify(connection1).close();
assertEquals(0, pool.stats().availableConnectionCount());
assertEquals(0, pool.stats().busyConnectionCount());
}

@Test
public void emitsExceptionWhenBrrowingFromClosedPool() {

SimpleConnectionPool pool = new SimpleConnectionPool(origin, defaultConnectionPoolSettings(), connectionFactory);
pool.close();
StepVerifier.create(pool.borrowConnection())
.expectError(IllegalStateException.class)
.verify();
assertEquals(pool.stats().connectionAttempts(), 0);
assertEquals(pool.stats().busyConnectionCount(), 0);
assertEquals(pool.stats().availableConnectionCount(), 0);
assertEquals(pool.stats().pendingConnectionCount(), 0);
}

@Test
public void justClosesConnectionWhenReturningToClosedPool() {
when(connectionFactory.createConnection(any(Origin.class), any(ConnectionSettings.class))).thenReturn(Mono.just(connection1));
SimpleConnectionPool pool = new SimpleConnectionPool(origin, defaultConnectionPoolSettings(), connectionFactory);

StepVerifier.create(pool.borrowConnection())
.expectNext(connection1)
.verifyComplete();
assertEquals(pool.stats().connectionAttempts(), 1);
assertEquals(pool.stats().busyConnectionCount(), 1);

pool.close();
assertEquals(0, pool.stats(). closedConnections());

doAnswer( arg -> {
when(connection1.isConnected()).thenReturn(false);
return true;
}).when(connection1).close();
pool.returnConnection(connection1);
assertEquals(pool.stats().connectionAttempts(), 1);
assertEquals(pool.stats().busyConnectionCount(), 0);
assertEquals(1, pool.stats(). closedConnections());
}

@Test
public void closedPoolDoesNotOpenNewConnectionWhenConnectionIsClosed() {

when(connectionFactory.createConnection(any(Origin.class), any(ConnectionSettings.class)))
.thenReturn(Mono.just(connection1));


SimpleConnectionPool pool = new SimpleConnectionPool(origin, defaultConnectionPoolSettings(), connectionFactory);

StepVerifier.create(pool.borrowConnection())
.expectNext(connection1)
.verifyComplete();

assertEquals(1, pool.stats().busyConnectionCount());
//Close connection pool
pool.close();
pool.closeConnection(connection1);

verify(connection1).close();
assertEquals(pool.stats().terminatedConnections(), 0);
assertEquals(pool.stats().closedConnections(), 1);
assertEquals(0, pool.stats().busyConnectionCount());
}

@Test
public void purgesTerminatedConnectionsForClosedPools() {
when(connectionFactory.createConnection(any(Origin.class), any(ConnectionSettings.class)))
.thenReturn(Mono.just(connection1));

// Create a new connection
SimpleConnectionPool pool = new SimpleConnectionPool(origin, defaultConnectionPoolSettings(), connectionFactory);

StepVerifier.create(pool.borrowConnection())
.expectNext(connection1)
.verifyComplete();
//Close the pool
pool.close();
//Close and return the connection
pool.connectionClosed(connection1);
when(connection1.isConnected()).thenReturn(false);
pool.returnConnection(connection1);

assertEquals(pool.stats().connectionAttempts(), 1);
assertEquals(pool.stats().pendingConnectionCount(), 0);
assertEquals(pool.stats().busyConnectionCount(), 0);
assertEquals(pool.stats().availableConnectionCount(), 0);

assertEquals(pool.stats().terminatedConnections(), 1);
assertEquals(pool.stats().closedConnections(), 0);
}


}

0 comments on commit 9da2fd3

Please sign in to comment.