diff --git a/core/src/main/java/org/elasticsearch/action/ActionListener.java b/core/src/main/java/org/elasticsearch/action/ActionListener.java index f9fafa9f95a2e..fa32ab417737c 100644 --- a/core/src/main/java/org/elasticsearch/action/ActionListener.java +++ b/core/src/main/java/org/elasticsearch/action/ActionListener.java @@ -45,7 +45,7 @@ public interface ActionListener { * Creates a listener that listens for a response (or failure) and executes the * corresponding consumer when the response (or failure) is received. * - * @param onResponse the consumer of the response, when the listener receives one + * @param onResponse the checked consumer of the response, when the listener receives one * @param onFailure the consumer of the failure, when the listener receives one * @param the type of the response * @return a listener that listens for responses and invokes the consumer when received diff --git a/core/src/main/java/org/elasticsearch/action/support/PlainListenableActionFuture.java b/core/src/main/java/org/elasticsearch/action/support/PlainListenableActionFuture.java index 749bf1fea019d..943c36797096c 100644 --- a/core/src/main/java/org/elasticsearch/action/support/PlainListenableActionFuture.java +++ b/core/src/main/java/org/elasticsearch/action/support/PlainListenableActionFuture.java @@ -33,7 +33,7 @@ public class PlainListenableActionFuture extends AdapterActionFuture im volatile Object listeners; boolean executedListeners = false; - private PlainListenableActionFuture() {} + protected PlainListenableActionFuture() {} /** * This method returns a listenable future. The listeners will be called on completion of the future. diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioClient.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioClient.java index 27ddca978786f..f846c53abdec6 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioClient.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioClient.java @@ -136,10 +136,6 @@ private void closeChannels(ArrayList connections, Exception e) for (final NioSocketChannel socketChannel : connections) { try { socketChannel.closeAsync().awaitClose(); - } catch (InterruptedException inner) { - logger.trace("exception while closing channel", e); - e.addSuppressed(inner); - Thread.currentThread().interrupt(); } catch (Exception inner) { logger.trace("exception while closing channel", e); e.addSuppressed(inner); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java index a621925140090..5a9dba29f5e57 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java @@ -119,11 +119,7 @@ protected void closeChannels(List channels, boolean blocking) throws for (CloseFuture future : futures) { try { future.awaitClose(); - IOException closeException = future.getCloseException(); - if (closeException != null) { - closingExceptions = addClosingException(closingExceptions, closeException); - } - } catch (InterruptedException e) { + } catch (Exception e) { closingExceptions = addClosingException(closingExceptions, e); } } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java index c02312aab51d6..36f776a625aea 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java @@ -113,7 +113,7 @@ public void closeFromSelector() { closeRawChannel(); closedOnThisCall = closeFuture.channelClosed(this); } catch (IOException e) { - closedOnThisCall = closeFuture.channelCloseThrewException(this, e); + closedOnThisCall = closeFuture.channelCloseThrewException(e); } finally { if (closedOnThisCall) { selector.removeRegisteredChannel(this); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelFactory.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelFactory.java index c25936ce7fc01..f2f92e94e509d 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelFactory.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelFactory.java @@ -21,6 +21,7 @@ import org.apache.lucene.util.IOUtils; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.mocksocket.PrivilegedSocketAccess; import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.nio.AcceptingSelector; @@ -55,7 +56,7 @@ public NioSocketChannel openNioChannel(InetSocketAddress remoteAddress, SocketSe SocketChannel rawChannel = rawChannelFactory.openNioChannel(remoteAddress); NioSocketChannel channel = new NioSocketChannel(NioChannel.CLIENT, rawChannel, selector); channel.setContexts(new TcpReadContext(channel, handler), new TcpWriteContext(channel)); - channel.getCloseFuture().setListener(closeListener); + channel.getCloseFuture().addListener(ActionListener.wrap(closeListener::accept, (e) -> closeListener.accept(channel))); scheduleChannel(channel, selector); return channel; } @@ -65,7 +66,7 @@ public NioSocketChannel acceptNioChannel(NioServerSocketChannel serverChannel, S SocketChannel rawChannel = rawChannelFactory.acceptNioChannel(serverChannel); NioSocketChannel channel = new NioSocketChannel(serverChannel.getProfile(), rawChannel, selector); channel.setContexts(new TcpReadContext(channel, handler), new TcpWriteContext(channel)); - channel.getCloseFuture().setListener(closeListener); + channel.getCloseFuture().addListener(ActionListener.wrap(closeListener::accept, (e) -> closeListener.accept(channel))); scheduleChannel(channel, selector); return channel; } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/CloseFuture.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/CloseFuture.java index c27ba306e0e60..5932de8fef708 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/CloseFuture.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/CloseFuture.java @@ -19,35 +19,37 @@ package org.elasticsearch.transport.nio.channel; -import org.apache.lucene.util.SetOnce; -import org.elasticsearch.common.util.concurrent.BaseFuture; +import org.elasticsearch.action.support.PlainListenableActionFuture; import java.io.IOException; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.function.Consumer; -public class CloseFuture extends BaseFuture { - - private final SetOnce> listener = new SetOnce<>(); +public class CloseFuture extends PlainListenableActionFuture { @Override public boolean cancel(boolean mayInterruptIfRunning) { throw new UnsupportedOperationException("Cannot cancel close future"); } - public void awaitClose() throws InterruptedException, IOException { + public void awaitClose() throws IOException { try { super.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException("Future got interrupted", e); } catch (ExecutionException e) { throw (IOException) e.getCause(); } } - public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, IOException { + public void awaitClose(long timeout, TimeUnit unit) throws TimeoutException, IOException { try { super.get(timeout, unit); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException("Future got interrupted", e); } catch (ExecutionException e) { throw (IOException) e.getCause(); } @@ -76,31 +78,13 @@ public boolean isClosed() { return super.isDone(); } - public void setListener(Consumer listener) { - this.listener.set(listener); - } - boolean channelClosed(NioChannel channel) { - boolean set = set(channel); - if (set) { - Consumer listener = this.listener.get(); - if (listener != null) { - listener.accept(channel); - } - } - return set; + return set(channel); } - boolean channelCloseThrewException(NioChannel channel, IOException ex) { - boolean set = setException(ex); - if (set) { - Consumer listener = this.listener.get(); - if (listener != null) { - listener.accept(channel); - } - } - return set; + boolean channelCloseThrewException(IOException ex) { + return setException(ex); } } diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioServerSocketChannelTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioServerSocketChannelTests.java index 6f05d3c1f34c6..367df0c78f4c8 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioServerSocketChannelTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioServerSocketChannelTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.transport.nio.channel; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.transport.nio.AcceptingSelector; import org.elasticsearch.transport.nio.AcceptorEventHandler; @@ -33,6 +34,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.function.Supplier; import static org.mockito.Mockito.mock; @@ -64,10 +66,11 @@ public void testClose() throws IOException, TimeoutException, InterruptedExcepti CountDownLatch latch = new CountDownLatch(1); NioChannel channel = new DoNotCloseServerChannel("nio", mock(ServerSocketChannel.class), mock(ChannelFactory.class), selector); - channel.getCloseFuture().setListener((c) -> { + Consumer listener = (c) -> { ref.set(c); latch.countDown(); - }); + }; + channel.getCloseFuture().addListener(ActionListener.wrap(listener::accept, (e) -> listener.accept(channel))); CloseFuture closeFuture = channel.getCloseFuture(); diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioSocketChannelTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioSocketChannelTests.java index 3d039b41a8a68..75ec57b2603db 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioSocketChannelTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioSocketChannelTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.transport.nio.channel; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.transport.nio.SocketEventHandler; import org.elasticsearch.transport.nio.SocketSelector; @@ -34,6 +35,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; +import java.util.function.Consumer; import static org.hamcrest.Matchers.instanceOf; import static org.mockito.Mockito.mock; @@ -67,10 +69,11 @@ public void testClose() throws IOException, TimeoutException, InterruptedExcepti NioSocketChannel socketChannel = new DoNotCloseChannel(NioChannel.CLIENT, mock(SocketChannel.class), selector); socketChannel.setContexts(mock(ReadContext.class), mock(WriteContext.class)); - socketChannel.getCloseFuture().setListener((c) -> { + Consumer listener = (c) -> { ref.set(c); latch.countDown(); - }); + }; + socketChannel.getCloseFuture().addListener(ActionListener.wrap(listener::accept, (e) -> listener.accept(socketChannel))); CloseFuture closeFuture = socketChannel.getCloseFuture(); assertFalse(closeFuture.isClosed());