diff --git a/components/client/src/main/java/com/hotels/styx/client/connectionpool/SimpleConnectionPool.java b/components/client/src/main/java/com/hotels/styx/client/connectionpool/SimpleConnectionPool.java index fb3d4f5922..4239249039 100644 --- a/components/client/src/main/java/com/hotels/styx/client/connectionpool/SimpleConnectionPool.java +++ b/components/client/src/main/java/com/hotels/styx/client/connectionpool/SimpleConnectionPool.java @@ -76,11 +76,11 @@ public Publisher borrowConnection() { return Mono.create(sink -> { Connection connection = dequeue(); if (connection != null) { - borrowedCount.incrementAndGet(); - sink.success(connection); + attemptBorrowConnection(sink, connection); } else { if (waitingSubscribers.size() < poolSettings.maxPendingConnectionsPerHost()) { - this.waitingSubscribers.add(sink.onDispose(() -> waitingSubscribers.remove(sink))); + this.waitingSubscribers.add(sink); + sink.onDispose(() -> waitingSubscribers.remove(sink)); newConnection(); } else { sink.error(new MaxPendingConnectionsExceededException( @@ -127,7 +127,8 @@ private Mono newConnection(int attempts) { } } - private Connection dequeue() { + @VisibleForTesting + Connection dequeue() { Connection connection = availableConnections.poll(); while (nonNull(connection) && !connection.isConnected()) { @@ -142,11 +143,18 @@ private void queueNewConnection(Connection connection) { if (subscriber == null) { availableConnections.add(connection); } else { - borrowedCount.incrementAndGet(); - subscriber.success(connection); + attemptBorrowConnection(subscriber, connection); } } + private void attemptBorrowConnection(MonoSink sink, Connection connection) { + borrowedCount.incrementAndGet(); + sink.onCancel(() -> { + returnConnection(connection); + }); + sink.success(connection); + } + public boolean returnConnection(Connection connection) { borrowedCount.decrementAndGet(); if (connection.isConnected()) { diff --git a/components/client/src/test/unit/java/com/hotels/styx/client/connectionpool/SimpleConnectionPoolTest.java b/components/client/src/test/unit/java/com/hotels/styx/client/connectionpool/SimpleConnectionPoolTest.java index f8587c6bc2..db90125208 100644 --- a/components/client/src/test/unit/java/com/hotels/styx/client/connectionpool/SimpleConnectionPoolTest.java +++ b/components/client/src/test/unit/java/com/hotels/styx/client/connectionpool/SimpleConnectionPoolTest.java @@ -21,6 +21,9 @@ import com.hotels.styx.client.Connection; import com.hotels.styx.client.ConnectionSettings; import org.mockito.Mockito; +import org.mockito.internal.stubbing.answers.AnswersWithDelay; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.reactivestreams.Publisher; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -39,6 +42,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; @@ -718,6 +722,88 @@ public void emitsExceptionWhenPendingConnectionTimesOut() { assertEquals(pool.stats().connectionAttempts(), 1); } + @Test + public void shouldNotHandoutConnectionToCancelledSubscriberWhenCreatingNewConnection() throws Exception { + when(connectionFactory.createConnection(any(Origin.class), any(ConnectionSettings.class))) + .thenReturn(Mono.just(connection1)); + + ConnectionPoolSettings poolSettings = new ConnectionPoolSettings.Builder() + .pendingConnectionTimeout(100, MILLISECONDS) + .build(); + + SimpleConnectionPool simpleConnectionPool = new SimpleConnectionPool(origin, poolSettings, connectionFactory); + SimpleConnectionPool pool = spy(simpleConnectionPool); + when(pool.dequeue()) + .thenAnswer(new AnswersWithDelay(200, new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + return invocation.callRealMethod(); + } + })); + StepVerifier.create(pool.borrowConnection()) + .expectError(MaxPendingConnectionTimeoutException.class) + .verify(); + + assertEquals(pool.stats().availableConnectionCount(), 1); + assertEquals(pool.stats().pendingConnectionCount(), 0); // Waiting subscribers + assertEquals(pool.stats().busyConnectionCount(), 0); // Borrowed count + } + + @Test + public void shouldNotHandoutConnectionToCancelledSubscriberWhenConnectionIsReturned() throws Exception { + EmitterProcessor processor = EmitterProcessor.create(); + when(connectionFactory.createConnection(any(Origin.class), any(ConnectionSettings.class))) + .thenReturn(Mono.from(processor)); + ConnectionPoolSettings poolSettings = new ConnectionPoolSettings.Builder() + .pendingConnectionTimeout(100, MILLISECONDS) + .build(); + + SimpleConnectionPool simpleConnectionPool = new SimpleConnectionPool(origin, poolSettings, connectionFactory); + SimpleConnectionPool pool = spy(simpleConnectionPool); + when(pool.dequeue()) + .thenAnswer(new AnswersWithDelay(200, new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + return invocation.callRealMethod(); + } + })); + + StepVerifier.create(pool.borrowConnection()) + .expectError(MaxPendingConnectionTimeoutException.class) + .verify(); + + pool.returnConnection(connection1); + + assertEquals(pool.stats().availableConnectionCount(), 1); + assertEquals(pool.stats().pendingConnectionCount(), 0); // Waiting subscribers + assertEquals(pool.stats().busyConnectionCount(), -1); // Borrowed count + } + + @Test + public void shouldNotHandoutConnectionToCancelledSubscriberWhenConnectionAlreadyInPool() throws Exception { + ConnectionPoolSettings poolSettings = new ConnectionPoolSettings.Builder() + .pendingConnectionTimeout(100, MILLISECONDS) + .build(); + + SimpleConnectionPool simpleConnectionPool = new SimpleConnectionPool(origin, poolSettings, connectionFactory); + SimpleConnectionPool pool = spy(simpleConnectionPool); + when(pool.dequeue()) + .thenAnswer(new AnswersWithDelay(200, new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + return connection1; + } + })); + + StepVerifier.create(pool.borrowConnection()) + .expectError(MaxPendingConnectionTimeoutException.class) + .verify(); + + assertEquals(pool.stats().availableConnectionCount(), 1); + assertEquals(pool.stats().pendingConnectionCount(), 0); // Waiting subscribers + assertEquals(pool.stats().busyConnectionCount(), 0); // Borrowed count + } + @Test public void registersAsConnectionListener() { when(connectionFactory.createConnection(any(Origin.class), any(ConnectionSettings.class))) diff --git a/components/proxy/src/main/java/com/hotels/styx/proxy/StyxBackendServiceClientFactory.java b/components/proxy/src/main/java/com/hotels/styx/proxy/StyxBackendServiceClientFactory.java index a9b5303e87..29c1652318 100644 --- a/components/proxy/src/main/java/com/hotels/styx/proxy/StyxBackendServiceClientFactory.java +++ b/components/proxy/src/main/java/com/hotels/styx/proxy/StyxBackendServiceClientFactory.java @@ -1,5 +1,5 @@ /* - Copyright (C) 2013-2018 Expedia Inc. + Copyright (C) 2013-2019 Expedia Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/pom.xml b/pom.xml index 36c639822a..b154632d57 100755 --- a/pom.xml +++ b/pom.xml @@ -105,7 +105,7 @@ 2.0.26.Final 1.1.6 1.0.2 - 3.2.0.RELEASE + 3.3.0.RELEASE 3.0.3