diff --git a/components/client/src/main/java/com/hotels/styx/client/connectionpool/SimpleConnectionPool.java b/components/client/src/main/java/com/hotels/styx/client/connectionpool/SimpleConnectionPool.java index 314ebfbbcd..a756202886 100644 --- a/components/client/src/main/java/com/hotels/styx/client/connectionpool/SimpleConnectionPool.java +++ b/components/client/src/main/java/com/hotels/styx/client/connectionpool/SimpleConnectionPool.java @@ -56,6 +56,7 @@ public class SimpleConnectionPool implements ConnectionPool, Connection.Listener private final AtomicInteger terminatedConnections = new AtomicInteger(); private final AtomicInteger connectionFailures = new AtomicInteger(); private final AtomicInteger connectionsInEstablishment = new AtomicInteger(); + private volatile boolean active; public SimpleConnectionPool(Origin origin, ConnectionPoolSettings poolSettings, Connection.Factory connectionFactory) { @@ -65,6 +66,7 @@ public SimpleConnectionPool(Origin origin, ConnectionPoolSettings poolSettings, this.connectionFactory = requireNonNull(connectionFactory); this.availableConnections = new ConcurrentLinkedDeque<>(); this.waitingSubscribers = new ConcurrentLinkedDeque<>(); + this.active = true; } public Origin getOrigin() { @@ -73,25 +75,29 @@ public Origin getOrigin() { @Override public Publisher borrowConnection() { - return Mono.create(sink -> { - Connection connection = dequeue(); - if (connection != null) { - attemptBorrowConnection(sink, connection); - } else { - if (waitingSubscribers.size() < poolSettings.maxPendingConnectionsPerHost()) { - this.waitingSubscribers.add(sink); - sink.onDispose(() -> waitingSubscribers.remove(sink)); - newConnection(); + if (active) { + return Mono.create(sink -> { + Connection connection = dequeue(); + if (connection != null) { + attemptBorrowConnection(sink, connection); } else { - sink.error(new MaxPendingConnectionsExceededException( - origin, - poolSettings.maxPendingConnectionsPerHost(), - poolSettings.maxPendingConnectionsPerHost())); + if (waitingSubscribers.size() < poolSettings.maxPendingConnectionsPerHost()) { + this.waitingSubscribers.add(sink); + sink.onDispose(() -> waitingSubscribers.remove(sink)); + newConnection(); + } else { + sink.error(new MaxPendingConnectionsExceededException( + origin, + poolSettings.maxPendingConnectionsPerHost(), + poolSettings.maxPendingConnectionsPerHost())); + } } - } - }).timeout( - Duration.ofMillis(poolSettings.pendingConnectionTimeoutMillis()), - Mono.error(() -> new MaxPendingConnectionTimeoutException(origin, connectionSettings.connectTimeoutMillis()))); + }).timeout( + Duration.ofMillis(poolSettings.pendingConnectionTimeoutMillis()), + Mono.error(() -> new MaxPendingConnectionTimeoutException(origin, connectionSettings.connectTimeoutMillis()))); + } else { + return Mono.error(() -> new IllegalStateException("Pool is closed")); + } } private void newConnection() { @@ -158,17 +164,26 @@ private void attemptBorrowConnection(MonoSink sink, Connection conne public boolean returnConnection(Connection connection) { borrowedCount.decrementAndGet(); if (connection.isConnected()) { - queueNewConnection(connection); + if (active) { + queueNewConnection(connection); + } else { + doCloseConnection(connection); + } } return false; } - public boolean closeConnection(Connection connection) { + private void doCloseConnection(Connection connection) { connection.close(); - borrowedCount.decrementAndGet(); closedConnections.incrementAndGet(); + } - newConnection(); + public boolean closeConnection(Connection connection) { + borrowedCount.decrementAndGet(); + doCloseConnection(connection); + if (active) { + newConnection(); + } return true; } @@ -191,6 +206,17 @@ public void connectionClosed(Connection connection) { availableConnections.remove(connection); } + @Override + public void close() { + active = false; + Connection con; + while ((con = availableConnections.poll()) != null) { + if (con.isConnected()) { + doCloseConnection(con); + } + } + } + public ConnectionPool.Stats stats() { return this.stats; } @@ -237,6 +263,7 @@ public int connectionsInEstablishment() { return connectionsInEstablishment.get(); } + @Override public String toString() { return MoreObjects.toStringHelper(this) diff --git a/components/client/src/test/unit/java/com/hotels/styx/client/connectionpool/SimpleConnectionPoolTest.java b/components/client/src/test/unit/java/com/hotels/styx/client/connectionpool/SimpleConnectionPoolTest.java index 30863c2109..20eb4fba1c 100644 --- a/components/client/src/test/unit/java/com/hotels/styx/client/connectionpool/SimpleConnectionPoolTest.java +++ b/components/client/src/test/unit/java/com/hotels/styx/client/connectionpool/SimpleConnectionPoolTest.java @@ -44,6 +44,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -418,12 +419,8 @@ public void closesConnections() { .thenReturn(Mono.just(connection3)) .thenReturn(Mono.just(connection4)); - ConnectionPoolSettings poolSettings = new ConnectionPoolSettings.Builder() - .maxConnectionsPerHost(2) - .maxPendingConnectionsPerHost(2) - .build(); - SimpleConnectionPool pool = new SimpleConnectionPool(origin, poolSettings, connectionFactory); + SimpleConnectionPool pool = new SimpleConnectionPool(origin, defaultConnectionPoolSettings(), connectionFactory); StepVerifier.create(pool.borrowConnection()) .expectNext(connection1) @@ -444,12 +441,7 @@ public void closeConnectionDecrementsBorrowedCount() { .thenReturn(Mono.just(connection3)) .thenReturn(Mono.just(connection4)); - ConnectionPoolSettings poolSettings = new ConnectionPoolSettings.Builder() - .maxConnectionsPerHost(2) - .maxPendingConnectionsPerHost(2) - .build(); - - SimpleConnectionPool pool = new SimpleConnectionPool(origin, poolSettings, connectionFactory); + SimpleConnectionPool pool = new SimpleConnectionPool(origin, defaultConnectionPoolSettings(), connectionFactory); // Saturate pool: StepVerifier.create(pool.borrowConnection()) @@ -537,12 +529,8 @@ public void idleActiveConnectionMakesRoomForOthers() { .thenReturn(Mono.just(connection3)) .thenReturn(Mono.just(connection4)); - ConnectionPoolSettings poolSettings = new ConnectionPoolSettings.Builder() - .maxConnectionsPerHost(2) - .maxPendingConnectionsPerHost(2) - .build(); - SimpleConnectionPool pool = new SimpleConnectionPool(origin, poolSettings, connectionFactory); + SimpleConnectionPool pool = new SimpleConnectionPool(origin, defaultConnectionPoolSettings(), connectionFactory); // Create a new connection StepVerifier.create(pool.borrowConnection()) @@ -591,12 +579,8 @@ public void borrowRetriesThreeTimesOnConnectionEstablishmentFailure() { .thenReturn(Mono.error(new OriginUnreachableException(origin, new RuntimeException()))) .thenReturn(Mono.just(connection4)); - ConnectionPoolSettings poolSettings = new ConnectionPoolSettings.Builder() - .maxConnectionsPerHost(2) - .maxPendingConnectionsPerHost(2) - .build(); - SimpleConnectionPool pool = new SimpleConnectionPool(origin, poolSettings, connectionFactory); + SimpleConnectionPool pool = new SimpleConnectionPool(origin, defaultConnectionPoolSettings(), connectionFactory); StepVerifier.create(pool.borrowConnection()) .expectNext(connection4) @@ -611,12 +595,7 @@ public void borrowRetriesThreeTimesOnFailureDueToConnectionClosure() { .thenReturn(Mono.error(new OriginUnreachableException(origin, new RuntimeException()))) .thenReturn(Mono.just(connection4)); - ConnectionPoolSettings poolSettings = new ConnectionPoolSettings.Builder() - .maxConnectionsPerHost(2) - .maxPendingConnectionsPerHost(2) - .build(); - - SimpleConnectionPool pool = new SimpleConnectionPool(origin, poolSettings, connectionFactory); + SimpleConnectionPool pool = new SimpleConnectionPool(origin, defaultConnectionPoolSettings(), connectionFactory); StepVerifier.create(pool.borrowConnection()) .expectNext(connection1) @@ -643,12 +622,7 @@ public void borrowGivesUpConnectionEstablishmentAttemptAfterThreeTries() { .thenReturn(Mono.error(new OriginUnreachableException(origin, new RuntimeException()))) .thenReturn(Mono.error(new OriginUnreachableException(origin, new RuntimeException()))); - ConnectionPoolSettings poolSettings = new ConnectionPoolSettings.Builder() - .maxConnectionsPerHost(2) - .maxPendingConnectionsPerHost(2) - .build(); - - SimpleConnectionPool pool = new SimpleConnectionPool(origin, poolSettings, connectionFactory); + SimpleConnectionPool pool = new SimpleConnectionPool(origin, defaultConnectionPoolSettings(), connectionFactory); Mono.from(pool.borrowConnection()).subscribe(); @@ -667,12 +641,8 @@ public void connectionEstablishmentFailureRetryThreeTimesOnlyAtConnectionClosure .thenReturn(Mono.error(new OriginUnreachableException(origin, new RuntimeException()))) .thenReturn(Mono.error(new OriginUnreachableException(origin, new RuntimeException()))); - ConnectionPoolSettings poolSettings = new ConnectionPoolSettings.Builder() - .maxConnectionsPerHost(2) - .maxPendingConnectionsPerHost(2) - .build(); - SimpleConnectionPool pool = new SimpleConnectionPool(origin, poolSettings, connectionFactory); + SimpleConnectionPool pool = new SimpleConnectionPool(origin, defaultConnectionPoolSettings(), connectionFactory); StepVerifier.create(pool.borrowConnection()) .expectNext(connection1) @@ -921,4 +891,113 @@ public void ensureInEstablishmentCountIsDecrementedInError() throws InterruptedE .verifyComplete(); } + @Test + public void availableConnectionsAreClosedAsPartOfPoolClosure() { + when(connectionFactory.createConnection(any(Origin.class), any(ConnectionSettings.class))) + .thenReturn(Mono.just(connection1)); + + SimpleConnectionPool pool = new SimpleConnectionPool(origin, defaultConnectionPoolSettings(), connectionFactory); + + StepVerifier.create(pool.borrowConnection()) + .consumeNextWith(pool::returnConnection) + .verifyComplete(); + + assertEquals(1, pool.stats().availableConnectionCount()); + pool.close(); + + verify(connection1).close(); + assertEquals(0, pool.stats().availableConnectionCount()); + assertEquals(0, pool.stats().busyConnectionCount()); + } + + @Test + public void emitsExceptionWhenBrrowingFromClosedPool() { + + SimpleConnectionPool pool = new SimpleConnectionPool(origin, defaultConnectionPoolSettings(), connectionFactory); + pool.close(); + StepVerifier.create(pool.borrowConnection()) + .expectError(IllegalStateException.class) + .verify(); + assertEquals(pool.stats().connectionAttempts(), 0); + assertEquals(pool.stats().busyConnectionCount(), 0); + assertEquals(pool.stats().availableConnectionCount(), 0); + assertEquals(pool.stats().pendingConnectionCount(), 0); + } + + @Test + public void justClosesConnectionWhenReturningToClosedPool() { + when(connectionFactory.createConnection(any(Origin.class), any(ConnectionSettings.class))).thenReturn(Mono.just(connection1)); + SimpleConnectionPool pool = new SimpleConnectionPool(origin, defaultConnectionPoolSettings(), connectionFactory); + + StepVerifier.create(pool.borrowConnection()) + .expectNext(connection1) + .verifyComplete(); + assertEquals(pool.stats().connectionAttempts(), 1); + assertEquals(pool.stats().busyConnectionCount(), 1); + + pool.close(); + assertEquals(0, pool.stats(). closedConnections()); + + doAnswer( arg -> { + when(connection1.isConnected()).thenReturn(false); + return true; + }).when(connection1).close(); + pool.returnConnection(connection1); + assertEquals(pool.stats().connectionAttempts(), 1); + assertEquals(pool.stats().busyConnectionCount(), 0); + assertEquals(1, pool.stats(). closedConnections()); + } + + @Test + public void closedPoolDoesNotOpenNewConnectionWhenConnectionIsClosed() { + + when(connectionFactory.createConnection(any(Origin.class), any(ConnectionSettings.class))) + .thenReturn(Mono.just(connection1)); + + + SimpleConnectionPool pool = new SimpleConnectionPool(origin, defaultConnectionPoolSettings(), connectionFactory); + + StepVerifier.create(pool.borrowConnection()) + .expectNext(connection1) + .verifyComplete(); + + assertEquals(1, pool.stats().busyConnectionCount()); + //Close connection pool + pool.close(); + pool.closeConnection(connection1); + + verify(connection1).close(); + assertEquals(pool.stats().terminatedConnections(), 0); + assertEquals(pool.stats().closedConnections(), 1); + assertEquals(0, pool.stats().busyConnectionCount()); + } + + @Test + public void purgesTerminatedConnectionsForClosedPools() { + when(connectionFactory.createConnection(any(Origin.class), any(ConnectionSettings.class))) + .thenReturn(Mono.just(connection1)); + + // Create a new connection + SimpleConnectionPool pool = new SimpleConnectionPool(origin, defaultConnectionPoolSettings(), connectionFactory); + + StepVerifier.create(pool.borrowConnection()) + .expectNext(connection1) + .verifyComplete(); + //Close the pool + pool.close(); + //Close and return the connection + pool.connectionClosed(connection1); + when(connection1.isConnected()).thenReturn(false); + pool.returnConnection(connection1); + + assertEquals(pool.stats().connectionAttempts(), 1); + assertEquals(pool.stats().pendingConnectionCount(), 0); + assertEquals(pool.stats().busyConnectionCount(), 0); + assertEquals(pool.stats().availableConnectionCount(), 0); + + assertEquals(pool.stats().terminatedConnections(), 1); + assertEquals(pool.stats().closedConnections(), 0); + } + + }