diff --git a/components/client/src/main/java/com/hotels/styx/client/connectionpool/SimpleConnectionPool.java b/components/client/src/main/java/com/hotels/styx/client/connectionpool/SimpleConnectionPool.java index 67952be3ef..fb3d4f5922 100644 --- a/components/client/src/main/java/com/hotels/styx/client/connectionpool/SimpleConnectionPool.java +++ b/components/client/src/main/java/com/hotels/styx/client/connectionpool/SimpleConnectionPool.java @@ -48,7 +48,7 @@ public class SimpleConnectionPool implements ConnectionPool, Connection.Listener private final Origin origin; private final ConcurrentLinkedDeque> waitingSubscribers; - private final Queue activeConnections; + private final Queue availableConnections; private final AtomicInteger borrowedCount = new AtomicInteger(); private final SimpleConnectionPool.ConnectionPoolStats stats = new SimpleConnectionPool.ConnectionPoolStats(); private final AtomicInteger connectionAttempts = new AtomicInteger(); @@ -63,7 +63,7 @@ public SimpleConnectionPool(Origin origin, ConnectionPoolSettings poolSettings, this.poolSettings = requireNonNull(poolSettings); this.connectionSettings = new ConnectionSettings(poolSettings.connectTimeoutMillis()); this.connectionFactory = requireNonNull(connectionFactory); - this.activeConnections = new ConcurrentLinkedDeque<>(); + this.availableConnections = new ConcurrentLinkedDeque<>(); this.waitingSubscribers = new ConcurrentLinkedDeque<>(); } @@ -76,26 +76,17 @@ public Publisher borrowConnection() { return Mono.create(sink -> { Connection connection = dequeue(); if (connection != null) { - if (borrowedCount.getAndIncrement() < poolSettings.maxConnectionsPerHost()) { - sink.success(connection); - } else { - borrowedCount.decrementAndGet(); - queueNewConnection(connection); - } + borrowedCount.incrementAndGet(); + sink.success(connection); } else { - if (waitingSubscribers.size() >= poolSettings.maxPendingConnectionsPerHost()) { + if (waitingSubscribers.size() < poolSettings.maxPendingConnectionsPerHost()) { + this.waitingSubscribers.add(sink.onDispose(() -> waitingSubscribers.remove(sink))); + newConnection(); + } else { sink.error(new MaxPendingConnectionsExceededException( origin, poolSettings.maxPendingConnectionsPerHost(), poolSettings.maxPendingConnectionsPerHost())); - } else { - this.waitingSubscribers.add(sink.onCancel(() -> waitingSubscribers.remove(sink))); - int borrowed = borrowedCount.get(); - int inEstablishment = connectionsInEstablishment.getAndIncrement(); - - if ((borrowed + inEstablishment) < poolSettings.maxConnectionsPerHost()) { - newConnection(); - } } } }).timeout( @@ -104,6 +95,14 @@ public Publisher borrowConnection() { } private void newConnection() { + int borrowed = borrowedCount.get(); + int inEstablishment = connectionsInEstablishment.getAndIncrement(); + + if ((borrowed + inEstablishment) >= poolSettings.maxConnectionsPerHost()) { + connectionsInEstablishment.decrementAndGet(); + return; + } + connectionAttempts.incrementAndGet(); newConnection(MAX_ATTEMPTS) .doOnNext(it -> it.addConnectionListener(SimpleConnectionPool.this)) @@ -129,10 +128,10 @@ private Mono newConnection(int attempts) { } private Connection dequeue() { - Connection connection = activeConnections.poll(); + Connection connection = availableConnections.poll(); while (nonNull(connection) && !connection.isConnected()) { - connection = activeConnections.poll(); + connection = availableConnections.poll(); } return connection; @@ -141,7 +140,7 @@ private Connection dequeue() { private void queueNewConnection(Connection connection) { MonoSink subscriber = waitingSubscribers.poll(); if (subscriber == null) { - activeConnections.add(connection); + availableConnections.add(connection); } else { borrowedCount.incrementAndGet(); subscriber.success(connection); @@ -181,7 +180,7 @@ public ConnectionPoolSettings settings() { @Override public void connectionClosed(Connection connection) { terminatedConnections.incrementAndGet(); - activeConnections.remove(connection); + availableConnections.remove(connection); } public ConnectionPool.Stats stats() { @@ -192,7 +191,7 @@ public ConnectionPool.Stats stats() { private class ConnectionPoolStats implements Stats { @Override public int availableConnectionCount() { - return activeConnections.size(); + return availableConnections.size(); } @Override @@ -233,7 +232,7 @@ public int connectionsInEstablishment() { @Override public String toString() { return MoreObjects.toStringHelper(this) - .add("\nactiveConnections", availableConnectionCount()) + .add("\navailableConnections", availableConnectionCount()) .add("\npendingConnections", pendingConnectionCount()) .add("\nbusyConnections", busyConnectionCount()) .add("\nconnectionAttempts", connectionAttempts()) diff --git a/components/client/src/main/java/com/hotels/styx/client/connectionpool/StatsReportingConnectionPool.java b/components/client/src/main/java/com/hotels/styx/client/connectionpool/StatsReportingConnectionPool.java index 3e8a2e73b6..2b2fc6c7ea 100644 --- a/components/client/src/main/java/com/hotels/styx/client/connectionpool/StatsReportingConnectionPool.java +++ b/components/client/src/main/java/com/hotels/styx/client/connectionpool/StatsReportingConnectionPool.java @@ -90,7 +90,7 @@ private static void registerMetrics(ConnectionPool hostConnectionPool, MetricReg scopedRegistry.register("connection-failures", (Gauge) () -> (int) stats.connectionFailures()); scopedRegistry.register("connections-closed", (Gauge) () -> (int) stats.closedConnections()); scopedRegistry.register("connections-terminated", (Gauge) () -> (int) stats.terminatedConnections()); - scopedRegistry.register("in-establishment", (Gauge) () -> (int) stats.connectionsInEstablishment()); + scopedRegistry.register("connections-in-establishment", (Gauge) () -> (int) stats.connectionsInEstablishment()); } private void registerMetrics() { @@ -107,7 +107,7 @@ private void removeMetrics() { MetricRegistry scopedRegistry = getMetricScope(connectionPool); asList("busy-connections", "pending-connections", "available-connections", "ttfb", "connection-attempts", "connection-failures", "connections-closed", "connections-terminated", - "in-establishment") + "connections-in-establishment") .forEach(scopedRegistry::deregister); } diff --git a/components/client/src/test/unit/java/com/hotels/styx/client/connectionpool/SimpleConnectionPoolFactoryTest.java b/components/client/src/test/unit/java/com/hotels/styx/client/connectionpool/SimpleConnectionPoolFactoryTest.java index 33a885e095..01109b2660 100644 --- a/components/client/src/test/unit/java/com/hotels/styx/client/connectionpool/SimpleConnectionPoolFactoryTest.java +++ b/components/client/src/test/unit/java/com/hotels/styx/client/connectionpool/SimpleConnectionPoolFactoryTest.java @@ -49,7 +49,7 @@ public void registersMetricsUnderOriginsScope() { "origins.test-app.origin-X.connectionspool.pending-connections", "origins.test-app.origin-X.connectionspool.available-connections", "origins.test-app.origin-X.connectionspool.busy-connections", - "origins.test-app.origin-X.connectionspool.in-establishment" + "origins.test-app.origin-X.connectionspool.connections-in-establishment" )); } } \ No newline at end of file diff --git a/components/client/src/test/unit/java/com/hotels/styx/client/connectionpool/StatsReportingConnectionPoolTest.java b/components/client/src/test/unit/java/com/hotels/styx/client/connectionpool/StatsReportingConnectionPoolTest.java index c9501e92f1..b50f096842 100644 --- a/components/client/src/test/unit/java/com/hotels/styx/client/connectionpool/StatsReportingConnectionPoolTest.java +++ b/components/client/src/test/unit/java/com/hotels/styx/client/connectionpool/StatsReportingConnectionPoolTest.java @@ -45,7 +45,7 @@ public void removesRegisteredMetricsOnClose() { "origins.generic-app.backend-01.connectionspool.busy-connections", "origins.generic-app.backend-01.connectionspool.pending-connections", "origins.generic-app.backend-01.connectionspool.connection-attempts", - "origins.generic-app.backend-01.connectionspool.in-establishment", + "origins.generic-app.backend-01.connectionspool.connections-in-establishment", "origins.generic-app.backend-01.connectionspool.connection-failures", "origins.generic-app.backend-01.connectionspool.connections-closed", "origins.generic-app.backend-01.connectionspool.connections-terminated"