diff --git a/client/src/main/java/org/asynchttpclient/AsyncHttpClient.java b/client/src/main/java/org/asynchttpclient/AsyncHttpClient.java index 7d1c0c6506..a9411621f4 100755 --- a/client/src/main/java/org/asynchttpclient/AsyncHttpClient.java +++ b/client/src/main/java/org/asynchttpclient/AsyncHttpClient.java @@ -287,4 +287,12 @@ public interface AsyncHttpClient extends Closeable { * @return the config associated to this client. */ AsyncHttpClientConfig getConfig(); + + /** + * Similar to calling the method {@link #close()} but additionally waits for inactivity on shared resources between + * multiple instances of netty. Calling this method instead of the method {@link #close()} might be helpful + * on application shutdown to prevent errors like a {@link ClassNotFoundException} because the class loader was + * already removed but there are still some active tasks on this shared resources which want to access these classes. + */ + void closeAndAwaitInactivity(); } diff --git a/client/src/main/java/org/asynchttpclient/AsyncHttpClientState.java b/client/src/main/java/org/asynchttpclient/AsyncHttpClientState.java index b2570056f5..dd09ca386a 100644 --- a/client/src/main/java/org/asynchttpclient/AsyncHttpClientState.java +++ b/client/src/main/java/org/asynchttpclient/AsyncHttpClientState.java @@ -17,13 +17,13 @@ public class AsyncHttpClientState { - private final AtomicBoolean closed; + private final AtomicBoolean closeTriggered; - public AsyncHttpClientState(AtomicBoolean closed) { - this.closed = closed; + public AsyncHttpClientState(AtomicBoolean closeTriggered) { + this.closeTriggered = closeTriggered; } - public boolean isClosed() { - return closed.get(); + public boolean isCloseTriggered() { + return closeTriggered.get(); } } diff --git a/client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClient.java b/client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClient.java index bded469db2..fe26f4ab58 100644 --- a/client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClient.java +++ b/client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClient.java @@ -19,8 +19,14 @@ import static org.asynchttpclient.util.Assertions.assertNotNull; import io.netty.channel.EventLoopGroup; import io.netty.util.HashedWheelTimer; +import io.netty.util.ThreadDeathWatcher; import io.netty.util.Timer; +import io.netty.util.concurrent.GlobalEventExecutor; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Predicate; @@ -42,6 +48,7 @@ public class DefaultAsyncHttpClient implements AsyncHttpClient { private final static Logger LOGGER = LoggerFactory.getLogger(DefaultAsyncHttpClient.class); private final AsyncHttpClientConfig config; + private final AtomicBoolean closeTriggered = new AtomicBoolean(false); private final AtomicBoolean closed = new AtomicBoolean(false); private final ChannelManager channelManager; private final ConnectionSemaphore connectionSemaphore; @@ -87,7 +94,7 @@ public DefaultAsyncHttpClient(AsyncHttpClientConfig config) { channelManager = new ChannelManager(config, nettyTimer); connectionSemaphore = new ConnectionSemaphore(config); - requestSender = new NettyRequestSender(config, channelManager, connectionSemaphore, nettyTimer, new AsyncHttpClientState(closed)); + requestSender = new NettyRequestSender(config, channelManager, connectionSemaphore, nettyTimer, new AsyncHttpClientState(closeTriggered)); channelManager.configureBootstraps(requestSender); } @@ -99,22 +106,60 @@ private Timer newNettyTimer() { @Override public void close() { - if (closed.compareAndSet(false, true)) { - try { - channelManager.close(); - } catch (Throwable t) { - LOGGER.warn("Unexpected error on ChannelManager close", t); - } - if (allowStopNettyTimer) { - try { - nettyTimer.stop(); - } catch (Throwable t) { - LOGGER.warn("Unexpected error on HashedWheelTimer close", t); + closeInternal(false); + } + + public void closeAndAwaitInactivity() { + closeInternal(true); + } + + private void closeInternal(boolean awaitInactivity) { + if (closeTriggered.compareAndSet(false, true)) { + CompletableFuture handledCloseFuture = channelManager.close().whenComplete((v, t) -> { + if(t != null) { + LOGGER.warn("Unexpected error on ChannelManager close", t); } + if (allowStopNettyTimer) { + try { + nettyTimer.stop(); + } catch (Throwable th) { + LOGGER.warn("Unexpected error on HashedWheelTimer close", th); + } + } + }); + + if(awaitInactivity) { + handledCloseFuture = handledCloseFuture.thenCombine(awaitInactivity(), (v1,v2) -> null) ; } + + try { + handledCloseFuture.get(config.getShutdownTimeout(), TimeUnit.MILLISECONDS); + } catch (InterruptedException | TimeoutException t) { + LOGGER.warn("Unexpected error on AsyncHttpClient close", t); + } catch (ExecutionException e) { + // already handled and could be ignored + } + closed.compareAndSet(false, true); } } + private CompletableFuture awaitInactivity() { + //see https://github.com/netty/netty/issues/2084#issuecomment-44822314 + CompletableFuture wait1 = CompletableFuture.runAsync(() -> { + try { + GlobalEventExecutor.INSTANCE.awaitInactivity(config.getShutdownTimeout(), TimeUnit.MILLISECONDS); + } catch(InterruptedException t) { + // Ignore + }}); + CompletableFuture wait2 = CompletableFuture.runAsync(() -> { + try { + ThreadDeathWatcher.awaitInactivity(config.getShutdownTimeout(), TimeUnit.MILLISECONDS); + } catch(InterruptedException t) { + // Ignore + }}); + return wait1.thenCombine(wait2, (v1,v2) -> null); + } + @Override public boolean isClosed() { return closed.get(); diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java b/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java index 0a6b0caad8..5150169388 100755 --- a/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java +++ b/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java @@ -38,11 +38,13 @@ import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.util.Timer; import io.netty.util.concurrent.DefaultThreadFactory; +import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GlobalEventExecutor; import java.net.InetSocketAddress; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -299,18 +301,25 @@ public boolean removeAll(Channel connection) { return channelPool.removeAll(connection); } - private void doClose() { - openChannels.close(); - channelPool.destroy(); - } - - public void close() { + public CompletableFuture close() { + CompletableFuture closeFuture = CompletableFuture.completedFuture(null); if (allowReleaseEventLoopGroup) { - eventLoopGroup.shutdownGracefully(config.getShutdownQuietPeriod(), config.getShutdownTimeout(), TimeUnit.MILLISECONDS)// - .addListener(future -> doClose()); - } else { - doClose(); + closeFuture = toCompletableFuture( + eventLoopGroup.shutdownGracefully(config.getShutdownQuietPeriod(), config.getShutdownTimeout(), TimeUnit.MILLISECONDS)); } + return closeFuture.thenCompose(v -> toCompletableFuture(openChannels.close())).whenComplete((v,t) -> channelPool.destroy()); + } + + private static CompletableFuture toCompletableFuture(final Future future) { + final CompletableFuture completableFuture = new CompletableFuture<>(); + future.addListener(r -> { + if(r.isSuccess()) { + completableFuture.complete(null); + } else { + completableFuture.completeExceptionally(r.cause()); + } + }); + return completableFuture; } public void closeChannel(Channel channel) { diff --git a/client/src/main/java/org/asynchttpclient/netty/request/NettyChannelConnector.java b/client/src/main/java/org/asynchttpclient/netty/request/NettyChannelConnector.java index 1bccecec42..8ec3f78725 100644 --- a/client/src/main/java/org/asynchttpclient/netty/request/NettyChannelConnector.java +++ b/client/src/main/java/org/asynchttpclient/netty/request/NettyChannelConnector.java @@ -72,7 +72,7 @@ public void connect(final Bootstrap bootstrap, final NettyConnectListener con try { connect0(bootstrap, connectListener, remoteAddress); } catch (RejectedExecutionException e) { - if (clientState.isClosed()) { + if (clientState.isCloseTriggered()) { LOGGER.info("Connect crash but engine is shutting down"); } else { connectListener.onFailure(null, e); diff --git a/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java b/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java index 6e3e0e0f62..7241c3f008 100755 --- a/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java +++ b/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java @@ -597,7 +597,7 @@ public void replayRequest(final NettyResponseFuture future, FilterContext fc, } public boolean isClosed() { - return clientState.isClosed(); + return clientState.isCloseTriggered(); } public void drainChannelAndExecuteNextRequest(final Channel channel, final NettyResponseFuture future, Request nextRequest) { diff --git a/extras/registry/src/test/java/org/asynchttpclient/extras/registry/BadAsyncHttpClient.java b/extras/registry/src/test/java/org/asynchttpclient/extras/registry/BadAsyncHttpClient.java index 713887c98d..8891e7ffae 100644 --- a/extras/registry/src/test/java/org/asynchttpclient/extras/registry/BadAsyncHttpClient.java +++ b/extras/registry/src/test/java/org/asynchttpclient/extras/registry/BadAsyncHttpClient.java @@ -143,4 +143,9 @@ public void flushChannelPoolPartitions(Predicate predicate) { public AsyncHttpClientConfig getConfig() { return null; } + + @Override + public void closeAndAwaitInactivity() { + + } } diff --git a/extras/registry/src/test/java/org/asynchttpclient/extras/registry/TestAsyncHttpClient.java b/extras/registry/src/test/java/org/asynchttpclient/extras/registry/TestAsyncHttpClient.java index 0e61c109fd..275ac2f3fc 100644 --- a/extras/registry/src/test/java/org/asynchttpclient/extras/registry/TestAsyncHttpClient.java +++ b/extras/registry/src/test/java/org/asynchttpclient/extras/registry/TestAsyncHttpClient.java @@ -139,4 +139,8 @@ public void flushChannelPoolPartitions(Predicate predicate) { public AsyncHttpClientConfig getConfig() { return null; } + + @Override + public void closeAndAwaitInactivity() { + } }