diff --git a/client/src/main/java/org/asynchttpclient/AsyncHttpClientConfig.java b/client/src/main/java/org/asynchttpclient/AsyncHttpClientConfig.java index e0f8413662..862aa2ce9f 100644 --- a/client/src/main/java/org/asynchttpclient/AsyncHttpClientConfig.java +++ b/client/src/main/java/org/asynchttpclient/AsyncHttpClientConfig.java @@ -65,6 +65,14 @@ public interface AsyncHttpClientConfig { */ int getMaxConnectionsPerHost(); + /** + * Return the maximum duration in milliseconds an {@link AsyncHttpClient} can wait to acquire a free channel + * + * @return Return the maximum duration in milliseconds an {@link AsyncHttpClient} can wait to acquire a free channel + */ + int getAcquireFreeChannelTimeout(); + + /** * Return the maximum time in millisecond an {@link AsyncHttpClient} can wait when connecting to a remote host * diff --git a/client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClientConfig.java b/client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClientConfig.java index 96d95f91bd..d26612fb6d 100644 --- a/client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClientConfig.java +++ b/client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClientConfig.java @@ -84,6 +84,7 @@ public class DefaultAsyncHttpClientConfig implements AsyncHttpClientConfig { private final int connectionTtl; private final int maxConnections; private final int maxConnectionsPerHost; + private final int acquireFreeChannelTimeout; private final ChannelPool channelPool; private final ConnectionSemaphoreFactory connectionSemaphoreFactory; private final KeepAliveStrategy keepAliveStrategy; @@ -163,6 +164,7 @@ private DefaultAsyncHttpClientConfig(// http int connectionTtl, int maxConnections, int maxConnectionsPerHost, + int acquireFreeChannelTimeout, ChannelPool channelPool, ConnectionSemaphoreFactory connectionSemaphoreFactory, KeepAliveStrategy keepAliveStrategy, @@ -250,6 +252,7 @@ private DefaultAsyncHttpClientConfig(// http this.connectionTtl = connectionTtl; this.maxConnections = maxConnections; this.maxConnectionsPerHost = maxConnectionsPerHost; + this.acquireFreeChannelTimeout = acquireFreeChannelTimeout; this.channelPool = channelPool; this.connectionSemaphoreFactory = connectionSemaphoreFactory; this.keepAliveStrategy = keepAliveStrategy; @@ -445,6 +448,9 @@ public int getMaxConnectionsPerHost() { return maxConnectionsPerHost; } + @Override + public int getAcquireFreeChannelTimeout() { return acquireFreeChannelTimeout; } + @Override public ChannelPool getChannelPool() { return channelPool; @@ -696,6 +702,7 @@ public static class Builder { private int connectionTtl = defaultConnectionTtl(); private int maxConnections = defaultMaxConnections(); private int maxConnectionsPerHost = defaultMaxConnectionsPerHost(); + private int acquireFreeChannelTimeout = defaultAcquireFreeChannelTimeout(); private ChannelPool channelPool; private ConnectionSemaphoreFactory connectionSemaphoreFactory; private KeepAliveStrategy keepAliveStrategy = new DefaultKeepAliveStrategy(); @@ -991,6 +998,16 @@ public Builder setMaxConnectionsPerHost(int maxConnectionsPerHost) { return this; } + /** + * Sets the maximum duration in milliseconds to acquire a free channel to send a request + * @param acquireFreeChannelTimeout maximum duration in milliseconds to acquire a free channel to send a request + * @return the same builder instance + */ + public Builder setAcquireFreeChannelTimeout(int acquireFreeChannelTimeout) { + this.acquireFreeChannelTimeout = acquireFreeChannelTimeout; + return this; + } + public Builder setChannelPool(ChannelPool channelPool) { this.channelPool = channelPool; return this; @@ -1249,6 +1266,7 @@ public DefaultAsyncHttpClientConfig build() { connectionTtl, maxConnections, maxConnectionsPerHost, + acquireFreeChannelTimeout, channelPool, connectionSemaphoreFactory, keepAliveStrategy, diff --git a/client/src/main/java/org/asynchttpclient/config/AsyncHttpClientConfigDefaults.java b/client/src/main/java/org/asynchttpclient/config/AsyncHttpClientConfigDefaults.java index 274537a6ad..fa073bc82f 100644 --- a/client/src/main/java/org/asynchttpclient/config/AsyncHttpClientConfigDefaults.java +++ b/client/src/main/java/org/asynchttpclient/config/AsyncHttpClientConfigDefaults.java @@ -22,6 +22,7 @@ public final class AsyncHttpClientConfigDefaults { public static final String THREAD_POOL_NAME_CONFIG = "threadPoolName"; public static final String MAX_CONNECTIONS_CONFIG = "maxConnections"; public static final String MAX_CONNECTIONS_PER_HOST_CONFIG = "maxConnectionsPerHost"; + public static final String ACQUIRE_FREE_CHANNEL_TIMEOUT = "acquireFreeChannelTimeout"; public static final String CONNECTION_TIMEOUT_CONFIG = "connectTimeout"; public static final String POOLED_CONNECTION_IDLE_TIMEOUT_CONFIG = "pooledConnectionIdleTimeout"; public static final String CONNECTION_POOL_CLEANER_PERIOD_CONFIG = "connectionPoolCleanerPeriod"; @@ -39,7 +40,7 @@ public final class AsyncHttpClientConfigDefaults { public static final String USE_PROXY_PROPERTIES_CONFIG = "useProxyProperties"; public static final String VALIDATE_RESPONSE_HEADERS_CONFIG = "validateResponseHeaders"; public static final String AGGREGATE_WEBSOCKET_FRAME_FRAGMENTS_CONFIG = "aggregateWebSocketFrameFragments"; - public static final String ENABLE_WEBSOCKET_COMPRESSION_CONFIG= "enableWebSocketCompression"; + public static final String ENABLE_WEBSOCKET_COMPRESSION_CONFIG = "enableWebSocketCompression"; public static final String STRICT_302_HANDLING_CONFIG = "strict302Handling"; public static final String KEEP_ALIVE_CONFIG = "keepAlive"; public static final String MAX_REQUEST_RETRY_CONFIG = "maxRequestRetry"; @@ -97,6 +98,10 @@ public static int defaultMaxConnectionsPerHost() { return AsyncHttpClientConfigHelper.getAsyncHttpClientConfig().getInt(ASYNC_CLIENT_CONFIG_ROOT + MAX_CONNECTIONS_PER_HOST_CONFIG); } + public static int defaultAcquireFreeChannelTimeout() { + return AsyncHttpClientConfigHelper.getAsyncHttpClientConfig().getInt(ASYNC_CLIENT_CONFIG_ROOT + ACQUIRE_FREE_CHANNEL_TIMEOUT); + } + public static int defaultConnectTimeout() { return AsyncHttpClientConfigHelper.getAsyncHttpClientConfig().getInt(ASYNC_CLIENT_CONFIG_ROOT + CONNECTION_TIMEOUT_CONFIG); } diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/CombinedConnectionSemaphore.java b/client/src/main/java/org/asynchttpclient/netty/channel/CombinedConnectionSemaphore.java new file mode 100644 index 0000000000..04549fd80d --- /dev/null +++ b/client/src/main/java/org/asynchttpclient/netty/channel/CombinedConnectionSemaphore.java @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2018 AsyncHttpClient Project. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package org.asynchttpclient.netty.channel; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +/** + * A combined {@link ConnectionSemaphore} with two limits - a global limit and a per-host limit + */ +public class CombinedConnectionSemaphore extends PerHostConnectionSemaphore { + protected final MaxConnectionSemaphore globalMaxConnectionSemaphore; + + CombinedConnectionSemaphore(int maxConnections, int maxConnectionsPerHost, int acquireTimeout) { + super(maxConnectionsPerHost, acquireTimeout); + this.globalMaxConnectionSemaphore = new MaxConnectionSemaphore(maxConnections, acquireTimeout); + } + + @Override + public void acquireChannelLock(Object partitionKey) throws IOException { + long remainingTime = super.acquireTimeout > 0 ? acquireGlobalTimed(partitionKey) : acquireGlobal(partitionKey); + + try { + if (remainingTime < 0 || !getFreeConnectionsForHost(partitionKey).tryAcquire(remainingTime, TimeUnit.MILLISECONDS)) { + releaseGlobal(partitionKey); + throw tooManyConnectionsPerHost; + } + } catch (InterruptedException e) { + releaseGlobal(partitionKey); + throw new RuntimeException(e); + } + } + + protected void releaseGlobal(Object partitionKey) { + this.globalMaxConnectionSemaphore.releaseChannelLock(partitionKey); + } + + protected long acquireGlobal(Object partitionKey) throws IOException { + this.globalMaxConnectionSemaphore.acquireChannelLock(partitionKey); + return 0; + } + + /* + * Acquires the global lock and returns the remaining time, in millis, to acquire the per-host lock + */ + protected long acquireGlobalTimed(Object partitionKey) throws IOException { + long beforeGlobalAcquire = System.currentTimeMillis(); + acquireGlobal(partitionKey); + long lockTime = System.currentTimeMillis() - beforeGlobalAcquire; + return this.acquireTimeout - lockTime; + } + + @Override + public void releaseChannelLock(Object partitionKey) { + this.globalMaxConnectionSemaphore.releaseChannelLock(partitionKey); + super.releaseChannelLock(partitionKey); + } +} diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/DefaultConnectionSemaphoreFactory.java b/client/src/main/java/org/asynchttpclient/netty/channel/DefaultConnectionSemaphoreFactory.java index a102f1def8..eba42186ee 100644 --- a/client/src/main/java/org/asynchttpclient/netty/channel/DefaultConnectionSemaphoreFactory.java +++ b/client/src/main/java/org/asynchttpclient/netty/channel/DefaultConnectionSemaphoreFactory.java @@ -17,14 +17,21 @@ public class DefaultConnectionSemaphoreFactory implements ConnectionSemaphoreFactory { - public ConnectionSemaphore newConnectionSemaphore(AsyncHttpClientConfig config) { - ConnectionSemaphore semaphore = new NoopConnectionSemaphore(); - if (config.getMaxConnections() > 0) { - semaphore = new MaxConnectionSemaphore(config.getMaxConnections()); - } - if (config.getMaxConnectionsPerHost() > 0) { - semaphore = new PerHostConnectionSemaphore(config.getMaxConnectionsPerHost(), semaphore); - } - return semaphore; + public ConnectionSemaphore newConnectionSemaphore(AsyncHttpClientConfig config) { + int acquireFreeChannelTimeout = Math.max(0, config.getAcquireFreeChannelTimeout()); + int maxConnections = config.getMaxConnections(); + int maxConnectionsPerHost = config.getMaxConnectionsPerHost(); + + if (maxConnections > 0 && maxConnectionsPerHost > 0) { + return new CombinedConnectionSemaphore(maxConnections, maxConnectionsPerHost, acquireFreeChannelTimeout); + } + if (maxConnections > 0) { + return new MaxConnectionSemaphore(maxConnections, acquireFreeChannelTimeout); } + if (maxConnectionsPerHost > 0) { + return new CombinedConnectionSemaphore(maxConnections, maxConnectionsPerHost, acquireFreeChannelTimeout); + } + + return new NoopConnectionSemaphore(); + } } diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/InfiniteSemaphore.java b/client/src/main/java/org/asynchttpclient/netty/channel/InfiniteSemaphore.java new file mode 100644 index 0000000000..97b8224739 --- /dev/null +++ b/client/src/main/java/org/asynchttpclient/netty/channel/InfiniteSemaphore.java @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2018 AsyncHttpClient Project. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package org.asynchttpclient.netty.channel; + +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +/** + * A java.util.concurrent.Semaphore that always has Integer.Integer.MAX_VALUE free permits + * + * @author Alex Maltinsky + */ +public class InfiniteSemaphore extends Semaphore { + + public static final InfiniteSemaphore INSTANCE = new InfiniteSemaphore(); + private static final long serialVersionUID = 1L; + + private InfiniteSemaphore() { + super(Integer.MAX_VALUE); + } + + @Override + public void acquire() { + // NO-OP + } + + @Override + public void acquireUninterruptibly() { + // NO-OP + } + + @Override + public boolean tryAcquire() { + return true; + } + + @Override + public boolean tryAcquire(long timeout, TimeUnit unit) { + return true; + } + + @Override + public void release() { + // NO-OP + } + + @Override + public void acquire(int permits) { + // NO-OP + } + + @Override + public void acquireUninterruptibly(int permits) { + // NO-OP + } + + @Override + public boolean tryAcquire(int permits) { + return true; + } + + @Override + public boolean tryAcquire(int permits, long timeout, TimeUnit unit) { + return true; + } + + @Override + public void release(int permits) { + // NO-OP + } + + @Override + public int availablePermits() { + return Integer.MAX_VALUE; + } + + @Override + public int drainPermits() { + return Integer.MAX_VALUE; + } + + @Override + protected void reducePermits(int reduction) { + // NO-OP + } + + @Override + public boolean isFair() { + return true; + } + + @Override + protected Collection getQueuedThreads() { + return Collections.emptyList(); + } +} + diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/MaxConnectionSemaphore.java b/client/src/main/java/org/asynchttpclient/netty/channel/MaxConnectionSemaphore.java index 99bd6a4be4..99c318afac 100644 --- a/client/src/main/java/org/asynchttpclient/netty/channel/MaxConnectionSemaphore.java +++ b/client/src/main/java/org/asynchttpclient/netty/channel/MaxConnectionSemaphore.java @@ -16,6 +16,8 @@ import org.asynchttpclient.exception.TooManyConnectionsException; import java.io.IOException; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import static org.asynchttpclient.util.ThrowableUtil.unknownStackTrace; @@ -23,21 +25,29 @@ * Max connections limiter. * * @author Stepan Koltsov + * @author Alex Maltinsky */ public class MaxConnectionSemaphore implements ConnectionSemaphore { - private final NonBlockingSemaphoreLike freeChannels; - private final IOException tooManyConnections; + protected final Semaphore freeChannels; + protected final IOException tooManyConnections; + protected final int acquireTimeout; - MaxConnectionSemaphore(int maxConnections) { + MaxConnectionSemaphore(int maxConnections, int acquireTimeout) { tooManyConnections = unknownStackTrace(new TooManyConnectionsException(maxConnections), MaxConnectionSemaphore.class, "acquireChannelLock"); - freeChannels = maxConnections > 0 ? new NonBlockingSemaphore(maxConnections) : NonBlockingSemaphoreInfinite.INSTANCE; + freeChannels = maxConnections > 0 ? new Semaphore(maxConnections) : InfiniteSemaphore.INSTANCE; + this.acquireTimeout = Math.max(0, acquireTimeout); } @Override public void acquireChannelLock(Object partitionKey) throws IOException { - if (!freeChannels.tryAcquire()) - throw tooManyConnections; + try { + if (!freeChannels.tryAcquire(acquireTimeout, TimeUnit.MILLISECONDS)) { + throw tooManyConnections; + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } @Override diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/NonBlockingSemaphore.java b/client/src/main/java/org/asynchttpclient/netty/channel/NonBlockingSemaphore.java deleted file mode 100644 index a7bd2eacfe..0000000000 --- a/client/src/main/java/org/asynchttpclient/netty/channel/NonBlockingSemaphore.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright (c) 2017 AsyncHttpClient Project. All rights reserved. - * - * This program is licensed to you under the Apache License Version 2.0, - * and you may not use this file except in compliance with the Apache License Version 2.0. - * You may obtain a copy of the Apache License Version 2.0 at - * http://www.apache.org/licenses/LICENSE-2.0. - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the Apache License Version 2.0 is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. - */ -package org.asynchttpclient.netty.channel; - -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Semaphore-like API, but without blocking. - * - * @author Stepan Koltsov - */ -class NonBlockingSemaphore implements NonBlockingSemaphoreLike { - - private final AtomicInteger permits; - - NonBlockingSemaphore(int permits) { - this.permits = new AtomicInteger(permits); - } - - @Override - public void release() { - permits.incrementAndGet(); - } - - @Override - public boolean tryAcquire() { - for (; ; ) { - int count = permits.get(); - if (count <= 0) { - return false; - } - if (permits.compareAndSet(count, count - 1)) { - return true; - } - } - } - - @Override - public String toString() { - // mimic toString of Semaphore class - return super.toString() + "[Permits = " + permits + "]"; - } -} diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/NonBlockingSemaphoreInfinite.java b/client/src/main/java/org/asynchttpclient/netty/channel/NonBlockingSemaphoreInfinite.java deleted file mode 100644 index 3d4fb91dbd..0000000000 --- a/client/src/main/java/org/asynchttpclient/netty/channel/NonBlockingSemaphoreInfinite.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright (c) 2017 AsyncHttpClient Project. All rights reserved. - * - * This program is licensed to you under the Apache License Version 2.0, - * and you may not use this file except in compliance with the Apache License Version 2.0. - * You may obtain a copy of the Apache License Version 2.0 at - * http://www.apache.org/licenses/LICENSE-2.0. - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the Apache License Version 2.0 is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. - */ -package org.asynchttpclient.netty.channel; - -/** - * Non-blocking semaphore-like object with infinite permits. - *

- * So try-acquire always succeeds. - * - * @author Stepan Koltsov - */ -enum NonBlockingSemaphoreInfinite implements NonBlockingSemaphoreLike { - INSTANCE; - - @Override - public void release() { - } - - @Override - public boolean tryAcquire() { - return true; - } - - @Override - public String toString() { - return NonBlockingSemaphore.class.getName(); - } -} diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/NonBlockingSemaphoreLike.java b/client/src/main/java/org/asynchttpclient/netty/channel/NonBlockingSemaphoreLike.java deleted file mode 100644 index 44303c9dfc..0000000000 --- a/client/src/main/java/org/asynchttpclient/netty/channel/NonBlockingSemaphoreLike.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright (c) 2017 AsyncHttpClient Project. All rights reserved. - * - * This program is licensed to you under the Apache License Version 2.0, - * and you may not use this file except in compliance with the Apache License Version 2.0. - * You may obtain a copy of the Apache License Version 2.0 at - * http://www.apache.org/licenses/LICENSE-2.0. - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the Apache License Version 2.0 is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. - */ -package org.asynchttpclient.netty.channel; - -/** - * Non-blocking semaphore API. - * - * @author Stepan Koltsov - */ -interface NonBlockingSemaphoreLike { - void release(); - - boolean tryAcquire(); -} diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/PerHostConnectionSemaphore.java b/client/src/main/java/org/asynchttpclient/netty/channel/PerHostConnectionSemaphore.java index 5ebb348abf..9ce1f20e93 100644 --- a/client/src/main/java/org/asynchttpclient/netty/channel/PerHostConnectionSemaphore.java +++ b/client/src/main/java/org/asynchttpclient/netty/channel/PerHostConnectionSemaphore.java @@ -17,6 +17,8 @@ import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import static org.asynchttpclient.util.ThrowableUtil.unknownStackTrace; @@ -25,37 +27,36 @@ */ public class PerHostConnectionSemaphore implements ConnectionSemaphore { - private final ConnectionSemaphore globalSemaphore; + protected final ConcurrentHashMap freeChannelsPerHost = new ConcurrentHashMap<>(); + protected final int maxConnectionsPerHost; + protected final IOException tooManyConnectionsPerHost; + protected final int acquireTimeout; - private final ConcurrentHashMap freeChannelsPerHost = new ConcurrentHashMap<>(); - private final int maxConnectionsPerHost; - private final IOException tooManyConnectionsPerHost; - - PerHostConnectionSemaphore(int maxConnectionsPerHost, ConnectionSemaphore globalSemaphore) { - this.globalSemaphore = globalSemaphore; + PerHostConnectionSemaphore(int maxConnectionsPerHost, int acquireTimeout) { tooManyConnectionsPerHost = unknownStackTrace(new TooManyConnectionsPerHostException(maxConnectionsPerHost), PerHostConnectionSemaphore.class, "acquireChannelLock"); this.maxConnectionsPerHost = maxConnectionsPerHost; + this.acquireTimeout = Math.max(0, acquireTimeout); } @Override public void acquireChannelLock(Object partitionKey) throws IOException { - globalSemaphore.acquireChannelLock(partitionKey); - - if (!getFreeConnectionsForHost(partitionKey).tryAcquire()) { - globalSemaphore.releaseChannelLock(partitionKey); - throw tooManyConnectionsPerHost; + try { + if (!getFreeConnectionsForHost(partitionKey).tryAcquire(acquireTimeout, TimeUnit.MILLISECONDS)) { + throw tooManyConnectionsPerHost; + } + } catch (InterruptedException e) { + throw new RuntimeException(e); } } @Override public void releaseChannelLock(Object partitionKey) { - globalSemaphore.releaseChannelLock(partitionKey); getFreeConnectionsForHost(partitionKey).release(); } - private NonBlockingSemaphoreLike getFreeConnectionsForHost(Object partitionKey) { + protected Semaphore getFreeConnectionsForHost(Object partitionKey) { return maxConnectionsPerHost > 0 ? - freeChannelsPerHost.computeIfAbsent(partitionKey, pk -> new NonBlockingSemaphore(maxConnectionsPerHost)) : - NonBlockingSemaphoreInfinite.INSTANCE; + freeChannelsPerHost.computeIfAbsent(partitionKey, pk -> new Semaphore(maxConnectionsPerHost)) : + InfiniteSemaphore.INSTANCE; } } diff --git a/client/src/main/resources/org/asynchttpclient/config/ahc-default.properties b/client/src/main/resources/org/asynchttpclient/config/ahc-default.properties index cdc632f701..c6fb355d75 100644 --- a/client/src/main/resources/org/asynchttpclient/config/ahc-default.properties +++ b/client/src/main/resources/org/asynchttpclient/config/ahc-default.properties @@ -1,6 +1,7 @@ org.asynchttpclient.threadPoolName=AsyncHttpClient org.asynchttpclient.maxConnections=-1 org.asynchttpclient.maxConnectionsPerHost=-1 +org.asynchttpclient.acquireFreeChannelTimeout=0 org.asynchttpclient.connectTimeout=5000 org.asynchttpclient.pooledConnectionIdleTimeout=60000 org.asynchttpclient.connectionPoolCleanerPeriod=1000 diff --git a/client/src/test/java/org/asynchttpclient/netty/channel/NonBlockingSemaphoreTest.java b/client/src/test/java/org/asynchttpclient/netty/channel/NonBlockingSemaphoreTest.java deleted file mode 100644 index a387ba408b..0000000000 --- a/client/src/test/java/org/asynchttpclient/netty/channel/NonBlockingSemaphoreTest.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright (c) 2017 AsyncHttpClient Project. All rights reserved. - * - * This program is licensed to you under the Apache License Version 2.0, - * and you may not use this file except in compliance with the Apache License Version 2.0. - * You may obtain a copy of the Apache License Version 2.0 at - * http://www.apache.org/licenses/LICENSE-2.0. - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the Apache License Version 2.0 is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. - */ -package org.asynchttpclient.netty.channel; - -import org.testng.annotations.Test; - -import java.util.concurrent.Semaphore; - -import static org.testng.Assert.*; - -/** - * @author Stepan Koltsov - */ -public class NonBlockingSemaphoreTest { - - @Test - public void test0() { - Mirror mirror = new Mirror(0); - assertFalse(mirror.tryAcquire()); - } - - @Test - public void three() { - Mirror mirror = new Mirror(3); - for (int i = 0; i < 3; ++i) { - assertTrue(mirror.tryAcquire()); - } - assertFalse(mirror.tryAcquire()); - mirror.release(); - assertTrue(mirror.tryAcquire()); - } - - @Test - public void negative() { - Mirror mirror = new Mirror(-1); - assertFalse(mirror.tryAcquire()); - mirror.release(); - assertFalse(mirror.tryAcquire()); - mirror.release(); - assertTrue(mirror.tryAcquire()); - } - - private static class Mirror { - private final Semaphore real; - private final NonBlockingSemaphore nonBlocking; - - Mirror(int permits) { - real = new Semaphore(permits); - nonBlocking = new NonBlockingSemaphore(permits); - } - - boolean tryAcquire() { - boolean a = real.tryAcquire(); - boolean b = nonBlocking.tryAcquire(); - assertEquals(a, b); - return a; - } - - void release() { - real.release(); - nonBlocking.release(); - } - } - -} diff --git a/client/src/test/java/org/asynchttpclient/netty/channel/SemaphoreRunner.java b/client/src/test/java/org/asynchttpclient/netty/channel/SemaphoreRunner.java new file mode 100644 index 0000000000..7bff799ceb --- /dev/null +++ b/client/src/test/java/org/asynchttpclient/netty/channel/SemaphoreRunner.java @@ -0,0 +1,52 @@ +package org.asynchttpclient.netty.channel; + +class SemaphoreRunner { + + final ConnectionSemaphore semaphore; + final Thread acquireThread; + + volatile long acquireTime; + volatile Exception acquireException; + + public SemaphoreRunner(ConnectionSemaphore semaphore, Object partitionKey) { + this.semaphore = semaphore; + this.acquireThread = new Thread(() -> { + long beforeAcquire = System.currentTimeMillis(); + try { + semaphore.acquireChannelLock(partitionKey); + } catch (Exception e) { + acquireException = e; + } finally { + acquireTime = System.currentTimeMillis() - beforeAcquire; + } + }); + } + + public void acquire() { + this.acquireThread.start(); + } + + public void interrupt() { + this.acquireThread.interrupt(); + } + + public void await() { + try { + this.acquireThread.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + public boolean finished() { + return !this.acquireThread.isAlive(); + } + + public long getAcquireTime() { + return acquireTime; + } + + public Exception getAcquireException() { + return acquireException; + } +} diff --git a/client/src/test/java/org/asynchttpclient/netty/channel/SemaphoreTest.java b/client/src/test/java/org/asynchttpclient/netty/channel/SemaphoreTest.java new file mode 100644 index 0000000000..125cd9b066 --- /dev/null +++ b/client/src/test/java/org/asynchttpclient/netty/channel/SemaphoreTest.java @@ -0,0 +1,143 @@ +package org.asynchttpclient.netty.channel; + +import org.asynchttpclient.exception.TooManyConnectionsException; +import org.asynchttpclient.exception.TooManyConnectionsPerHostException; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.testng.AssertJUnit.*; + +public class SemaphoreTest { + + static final int CHECK_ACQUIRE_TIME__PERMITS = 10; + static final int CHECK_ACQUIRE_TIME__TIMEOUT = 100; + + static final int NON_DETERMINISTIC__INVOCATION_COUNT = 10; + static final int NON_DETERMINISTIC__SUCCESS_PERCENT = 70; + + private final Object PK = new Object(); + + @DataProvider(name = "permitsAndRunnersCount") + public Object[][] permitsAndRunnersCount() { + Object[][] objects = new Object[100][]; + int row = 0; + for (int i = 0; i < 10; i++) { + for (int j = 0; j < 10; j++) { + objects[row++] = new Object[]{i, j}; + } + } + return objects; + } + + @Test(timeOut = 1000, dataProvider = "permitsAndRunnersCount") + public void maxConnectionCheckPermitCount(int permitCount, int runnerCount) { + allSemaphoresCheckPermitCount(new MaxConnectionSemaphore(permitCount, 0), permitCount, runnerCount); + } + + @Test(timeOut = 1000, dataProvider = "permitsAndRunnersCount") + public void perHostCheckPermitCount(int permitCount, int runnerCount) { + allSemaphoresCheckPermitCount(new PerHostConnectionSemaphore(permitCount, 0), permitCount, runnerCount); + } + + @Test(timeOut = 3000, dataProvider = "permitsAndRunnersCount") + public void combinedCheckPermitCount(int permitCount, int runnerCount) { + allSemaphoresCheckPermitCount(new CombinedConnectionSemaphore(permitCount, permitCount, 0), permitCount, runnerCount); + allSemaphoresCheckPermitCount(new CombinedConnectionSemaphore(0, permitCount, 0), permitCount, runnerCount); + allSemaphoresCheckPermitCount(new CombinedConnectionSemaphore(permitCount, 0, 0), permitCount, runnerCount); + } + + private void allSemaphoresCheckPermitCount(ConnectionSemaphore semaphore, int permitCount, int runnerCount) { + List runners = IntStream.range(0, runnerCount) + .mapToObj(i -> new SemaphoreRunner(semaphore, PK)) + .collect(Collectors.toList()); + runners.forEach(SemaphoreRunner::acquire); + runners.forEach(SemaphoreRunner::await); + + long tooManyConnectionsCount = runners.stream().map(SemaphoreRunner::getAcquireException) + .filter(Objects::nonNull) + .filter(e -> e instanceof IOException) + .count(); + + long acquired = runners.stream().map(SemaphoreRunner::getAcquireException) + .filter(Objects::isNull) + .count(); + + int expectedAcquired = permitCount > 0 ? Math.min(permitCount, runnerCount) : runnerCount; + + assertEquals(expectedAcquired, acquired); + assertEquals(runnerCount - acquired, tooManyConnectionsCount); + } + + @Test(timeOut = 1000, invocationCount = NON_DETERMINISTIC__INVOCATION_COUNT, successPercentage = NON_DETERMINISTIC__SUCCESS_PERCENT) + public void maxConnectionCheckAcquireTime() { + checkAcquireTime(new MaxConnectionSemaphore(CHECK_ACQUIRE_TIME__PERMITS, CHECK_ACQUIRE_TIME__TIMEOUT)); + } + + @Test(timeOut = 1000, invocationCount = NON_DETERMINISTIC__INVOCATION_COUNT, successPercentage = NON_DETERMINISTIC__SUCCESS_PERCENT) + public void perHostCheckAcquireTime() { + checkAcquireTime(new PerHostConnectionSemaphore(CHECK_ACQUIRE_TIME__PERMITS, CHECK_ACQUIRE_TIME__TIMEOUT)); + } + + @Test(timeOut = 1000, invocationCount = NON_DETERMINISTIC__INVOCATION_COUNT, successPercentage = NON_DETERMINISTIC__SUCCESS_PERCENT) + public void combinedCheckAcquireTime() { + checkAcquireTime(new CombinedConnectionSemaphore(CHECK_ACQUIRE_TIME__PERMITS, + CHECK_ACQUIRE_TIME__PERMITS, + CHECK_ACQUIRE_TIME__TIMEOUT)); + } + + private void checkAcquireTime(ConnectionSemaphore semaphore) { + List runners = IntStream.range(0, CHECK_ACQUIRE_TIME__PERMITS * 2) + .mapToObj(i -> new SemaphoreRunner(semaphore, PK)) + .collect(Collectors.toList()); + long acquireStartTime = System.currentTimeMillis(); + runners.forEach(SemaphoreRunner::acquire); + runners.forEach(SemaphoreRunner::await); + long timeToAcquire = System.currentTimeMillis() - acquireStartTime; + + assertTrue("Semaphore acquired too soon: " + timeToAcquire+" ms",timeToAcquire >= (CHECK_ACQUIRE_TIME__TIMEOUT - 50)); //Lower Bound + assertTrue("Semaphore acquired too late: " + timeToAcquire+" ms",timeToAcquire <= (CHECK_ACQUIRE_TIME__TIMEOUT + 300)); //Upper Bound + } + + @Test(timeOut = 1000) + public void maxConnectionCheckRelease() throws IOException { + checkRelease(new MaxConnectionSemaphore(1, 0)); + } + + @Test(timeOut = 1000) + public void perHostCheckRelease() throws IOException { + checkRelease(new PerHostConnectionSemaphore(1, 0)); + } + + @Test(timeOut = 1000) + public void combinedCheckRelease() throws IOException { + checkRelease(new CombinedConnectionSemaphore(1, 1, 0)); + } + + private void checkRelease(ConnectionSemaphore semaphore) throws IOException { + semaphore.acquireChannelLock(PK); + boolean tooManyCaught = false; + try { + semaphore.acquireChannelLock(PK); + } catch (TooManyConnectionsException | TooManyConnectionsPerHostException e) { + tooManyCaught = true; + } + assertTrue(tooManyCaught); + tooManyCaught = false; + semaphore.releaseChannelLock(PK); + try { + semaphore.acquireChannelLock(PK); + } catch (TooManyConnectionsException | TooManyConnectionsPerHostException e) { + tooManyCaught = true; + } + assertFalse(tooManyCaught); + } + + +} + diff --git a/extras/typesafeconfig/src/main/java/org/asynchttpclient/extras/typesafeconfig/AsyncHttpClientTypesafeConfig.java b/extras/typesafeconfig/src/main/java/org/asynchttpclient/extras/typesafeconfig/AsyncHttpClientTypesafeConfig.java index 8917052611..55c88ab251 100644 --- a/extras/typesafeconfig/src/main/java/org/asynchttpclient/extras/typesafeconfig/AsyncHttpClientTypesafeConfig.java +++ b/extras/typesafeconfig/src/main/java/org/asynchttpclient/extras/typesafeconfig/AsyncHttpClientTypesafeConfig.java @@ -69,6 +69,11 @@ public int getMaxConnectionsPerHost() { return getIntegerOpt(MAX_CONNECTIONS_PER_HOST_CONFIG).orElse(defaultMaxConnectionsPerHost()); } + @Override + public int getAcquireFreeChannelTimeout() { + return getIntegerOpt(ACQUIRE_FREE_CHANNEL_TIMEOUT).orElse(defaultAcquireFreeChannelTimeout()); + } + @Override public int getConnectTimeout() { return getIntegerOpt(CONNECTION_TIMEOUT_CONFIG).orElse(defaultConnectTimeout()); @@ -407,7 +412,7 @@ private Optional> getListOpt(String key) { private Optional getOpt(Function func, String key) { return config.hasPath(key) - ? Optional.ofNullable(func.apply(key)) - : Optional.empty(); + ? Optional.ofNullable(func.apply(key)) + : Optional.empty(); } }