From 26b47ccccb4329479b434490ff45f4b15393e384 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 16 Aug 2017 10:26:05 -0500 Subject: [PATCH 1/6] Use PlainListenableActionFuture for CloseFuture Right now we use a custom future for the CloseFuture associated with a channel. This is because we need special unwrapping logic to ensure that exceptions from a future failure are a certain type (opposed to an UncategorizedException). However, the current version is limiting because we can only attach one listener. This commit changes the CloseFuture to extend the PlainListenableActionFuture. This change allows us to attach multiple listeners. --- .../support/PlainListenableActionFuture.java | 2 +- .../transport/nio/NioClient.java | 4 -- .../transport/nio/NioTransport.java | 7 ++- .../nio/channel/AbstractNioChannel.java | 2 +- .../nio/channel/ChannelConsumerAdaptor.java | 49 +++++++++++++++++++ .../transport/nio/channel/ChannelFactory.java | 4 +- .../transport/nio/channel/CloseFuture.java | 42 +++++----------- .../channel/NioServerSocketChannelTests.java | 4 +- .../nio/channel/NioSocketChannelTests.java | 4 +- 9 files changed, 73 insertions(+), 45 deletions(-) create mode 100644 test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelConsumerAdaptor.java diff --git a/core/src/main/java/org/elasticsearch/action/support/PlainListenableActionFuture.java b/core/src/main/java/org/elasticsearch/action/support/PlainListenableActionFuture.java index 749bf1fea019d..943c36797096c 100644 --- a/core/src/main/java/org/elasticsearch/action/support/PlainListenableActionFuture.java +++ b/core/src/main/java/org/elasticsearch/action/support/PlainListenableActionFuture.java @@ -33,7 +33,7 @@ public class PlainListenableActionFuture extends AdapterActionFuture im volatile Object listeners; boolean executedListeners = false; - private PlainListenableActionFuture() {} + protected PlainListenableActionFuture() {} /** * This method returns a listenable future. The listeners will be called on completion of the future. diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioClient.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioClient.java index 27ddca978786f..f846c53abdec6 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioClient.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioClient.java @@ -136,10 +136,6 @@ private void closeChannels(ArrayList connections, Exception e) for (final NioSocketChannel socketChannel : connections) { try { socketChannel.closeAsync().awaitClose(); - } catch (InterruptedException inner) { - logger.trace("exception while closing channel", e); - e.addSuppressed(inner); - Thread.currentThread().interrupt(); } catch (Exception inner) { logger.trace("exception while closing channel", e); e.addSuppressed(inner); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java index a621925140090..531a4d29989e1 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java @@ -119,11 +119,10 @@ protected void closeChannels(List channels, boolean blocking) throws for (CloseFuture future : futures) { try { future.awaitClose(); - IOException closeException = future.getCloseException(); - if (closeException != null) { - closingExceptions = addClosingException(closingExceptions, closeException); + } catch (Exception e) { + if (closingExceptions != null) { + closingExceptions = addClosingException(closingExceptions, e); } - } catch (InterruptedException e) { closingExceptions = addClosingException(closingExceptions, e); } } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java index c02312aab51d6..36f776a625aea 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java @@ -113,7 +113,7 @@ public void closeFromSelector() { closeRawChannel(); closedOnThisCall = closeFuture.channelClosed(this); } catch (IOException e) { - closedOnThisCall = closeFuture.channelCloseThrewException(this, e); + closedOnThisCall = closeFuture.channelCloseThrewException(e); } finally { if (closedOnThisCall) { selector.removeRegisteredChannel(this); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelConsumerAdaptor.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelConsumerAdaptor.java new file mode 100644 index 0000000000000..df3864bf48be0 --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelConsumerAdaptor.java @@ -0,0 +1,49 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport.nio.channel; + +import org.elasticsearch.action.ActionListener; + +import java.util.function.Consumer; + +public class ChannelConsumerAdaptor implements ActionListener { + + private final NioChannel channel; + private final Consumer consumer; + + private ChannelConsumerAdaptor(NioChannel channel, Consumer consumer) { + this.channel = channel; + this.consumer = consumer; + } + + static ChannelConsumerAdaptor adapt(NioChannel channel, Consumer consumer) { + return new ChannelConsumerAdaptor(channel, consumer); + } + + @Override + public void onResponse(NioChannel channel) { + consumer.accept(channel); + } + + @Override + public void onFailure(Exception e) { + consumer.accept(channel); + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelFactory.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelFactory.java index c25936ce7fc01..4227b9a0cd549 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelFactory.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelFactory.java @@ -55,7 +55,7 @@ public NioSocketChannel openNioChannel(InetSocketAddress remoteAddress, SocketSe SocketChannel rawChannel = rawChannelFactory.openNioChannel(remoteAddress); NioSocketChannel channel = new NioSocketChannel(NioChannel.CLIENT, rawChannel, selector); channel.setContexts(new TcpReadContext(channel, handler), new TcpWriteContext(channel)); - channel.getCloseFuture().setListener(closeListener); + channel.getCloseFuture().addListener(ChannelConsumerAdaptor.adapt(channel, closeListener)); scheduleChannel(channel, selector); return channel; } @@ -65,7 +65,7 @@ public NioSocketChannel acceptNioChannel(NioServerSocketChannel serverChannel, S SocketChannel rawChannel = rawChannelFactory.acceptNioChannel(serverChannel); NioSocketChannel channel = new NioSocketChannel(serverChannel.getProfile(), rawChannel, selector); channel.setContexts(new TcpReadContext(channel, handler), new TcpWriteContext(channel)); - channel.getCloseFuture().setListener(closeListener); + channel.getCloseFuture().addListener(ChannelConsumerAdaptor.adapt(channel, closeListener)); scheduleChannel(channel, selector); return channel; } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/CloseFuture.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/CloseFuture.java index c27ba306e0e60..5932de8fef708 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/CloseFuture.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/CloseFuture.java @@ -19,35 +19,37 @@ package org.elasticsearch.transport.nio.channel; -import org.apache.lucene.util.SetOnce; -import org.elasticsearch.common.util.concurrent.BaseFuture; +import org.elasticsearch.action.support.PlainListenableActionFuture; import java.io.IOException; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.function.Consumer; -public class CloseFuture extends BaseFuture { - - private final SetOnce> listener = new SetOnce<>(); +public class CloseFuture extends PlainListenableActionFuture { @Override public boolean cancel(boolean mayInterruptIfRunning) { throw new UnsupportedOperationException("Cannot cancel close future"); } - public void awaitClose() throws InterruptedException, IOException { + public void awaitClose() throws IOException { try { super.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException("Future got interrupted", e); } catch (ExecutionException e) { throw (IOException) e.getCause(); } } - public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, IOException { + public void awaitClose(long timeout, TimeUnit unit) throws TimeoutException, IOException { try { super.get(timeout, unit); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException("Future got interrupted", e); } catch (ExecutionException e) { throw (IOException) e.getCause(); } @@ -76,31 +78,13 @@ public boolean isClosed() { return super.isDone(); } - public void setListener(Consumer listener) { - this.listener.set(listener); - } - boolean channelClosed(NioChannel channel) { - boolean set = set(channel); - if (set) { - Consumer listener = this.listener.get(); - if (listener != null) { - listener.accept(channel); - } - } - return set; + return set(channel); } - boolean channelCloseThrewException(NioChannel channel, IOException ex) { - boolean set = setException(ex); - if (set) { - Consumer listener = this.listener.get(); - if (listener != null) { - listener.accept(channel); - } - } - return set; + boolean channelCloseThrewException(IOException ex) { + return setException(ex); } } diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioServerSocketChannelTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioServerSocketChannelTests.java index 6f05d3c1f34c6..b3693bfb7866e 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioServerSocketChannelTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioServerSocketChannelTests.java @@ -64,10 +64,10 @@ public void testClose() throws IOException, TimeoutException, InterruptedExcepti CountDownLatch latch = new CountDownLatch(1); NioChannel channel = new DoNotCloseServerChannel("nio", mock(ServerSocketChannel.class), mock(ChannelFactory.class), selector); - channel.getCloseFuture().setListener((c) -> { + channel.getCloseFuture().addListener(ChannelConsumerAdaptor.adapt(channel, (c) -> { ref.set(c); latch.countDown(); - }); + })); CloseFuture closeFuture = channel.getCloseFuture(); diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioSocketChannelTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioSocketChannelTests.java index 3d039b41a8a68..90002c38af724 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioSocketChannelTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioSocketChannelTests.java @@ -67,10 +67,10 @@ public void testClose() throws IOException, TimeoutException, InterruptedExcepti NioSocketChannel socketChannel = new DoNotCloseChannel(NioChannel.CLIENT, mock(SocketChannel.class), selector); socketChannel.setContexts(mock(ReadContext.class), mock(WriteContext.class)); - socketChannel.getCloseFuture().setListener((c) -> { + socketChannel.getCloseFuture().addListener(ChannelConsumerAdaptor.adapt(socketChannel, (c) -> { ref.set(c); latch.countDown(); - }); + })); CloseFuture closeFuture = socketChannel.getCloseFuture(); assertFalse(closeFuture.isClosed()); From d49797e80e5241481796c3138d5f565657c6a2be Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 16 Aug 2017 10:34:06 -0500 Subject: [PATCH 2/6] Fix issue with closing exceptions --- .../java/org/elasticsearch/transport/nio/NioTransport.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java index 531a4d29989e1..5a9dba29f5e57 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java @@ -120,9 +120,6 @@ protected void closeChannels(List channels, boolean blocking) throws try { future.awaitClose(); } catch (Exception e) { - if (closingExceptions != null) { - closingExceptions = addClosingException(closingExceptions, e); - } closingExceptions = addClosingException(closingExceptions, e); } } From f209337814f8d9b856ebe253babea9d8a3117299 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 18 Aug 2017 10:25:51 -0500 Subject: [PATCH 3/6] Remove listener adaptor --- .../elasticsearch/action/ActionListener.java | 23 +++++++++ .../nio/channel/ChannelConsumerAdaptor.java | 49 ------------------- .../transport/nio/channel/ChannelFactory.java | 5 +- .../channel/NioServerSocketChannelTests.java | 7 ++- .../nio/channel/NioSocketChannelTests.java | 7 ++- 5 files changed, 36 insertions(+), 55 deletions(-) delete mode 100644 test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelConsumerAdaptor.java diff --git a/core/src/main/java/org/elasticsearch/action/ActionListener.java b/core/src/main/java/org/elasticsearch/action/ActionListener.java index f9fafa9f95a2e..78e6b6f8cad29 100644 --- a/core/src/main/java/org/elasticsearch/action/ActionListener.java +++ b/core/src/main/java/org/elasticsearch/action/ActionListener.java @@ -50,6 +50,29 @@ public interface ActionListener { * @param the type of the response * @return a listener that listens for responses and invokes the consumer when received */ + static ActionListener wrap(Consumer onResponse, Consumer onFailure) { + return new ActionListener() { + @Override + public void onResponse(Response response) { + onResponse.accept(response); + } + + @Override + public void onFailure(Exception e) { + onFailure.accept(e); + } + }; + } + + /** + * Creates a listener that listens for a response (or failure) and executes the + * corresponding consumer when the response (or failure) is received. + * + * @param onResponse the checked consumer of the response, when the listener receives one + * @param onFailure the consumer of the failure, when the listener receives one + * @param the type of the response + * @return a listener that listens for responses and invokes the consumer when received + */ static ActionListener wrap(CheckedConsumer onResponse, Consumer onFailure) { return new ActionListener() { diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelConsumerAdaptor.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelConsumerAdaptor.java deleted file mode 100644 index df3864bf48be0..0000000000000 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelConsumerAdaptor.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.transport.nio.channel; - -import org.elasticsearch.action.ActionListener; - -import java.util.function.Consumer; - -public class ChannelConsumerAdaptor implements ActionListener { - - private final NioChannel channel; - private final Consumer consumer; - - private ChannelConsumerAdaptor(NioChannel channel, Consumer consumer) { - this.channel = channel; - this.consumer = consumer; - } - - static ChannelConsumerAdaptor adapt(NioChannel channel, Consumer consumer) { - return new ChannelConsumerAdaptor(channel, consumer); - } - - @Override - public void onResponse(NioChannel channel) { - consumer.accept(channel); - } - - @Override - public void onFailure(Exception e) { - consumer.accept(channel); - } -} diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelFactory.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelFactory.java index 4227b9a0cd549..126fa5a36f9ec 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelFactory.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelFactory.java @@ -21,6 +21,7 @@ import org.apache.lucene.util.IOUtils; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.mocksocket.PrivilegedSocketAccess; import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.nio.AcceptingSelector; @@ -55,7 +56,7 @@ public NioSocketChannel openNioChannel(InetSocketAddress remoteAddress, SocketSe SocketChannel rawChannel = rawChannelFactory.openNioChannel(remoteAddress); NioSocketChannel channel = new NioSocketChannel(NioChannel.CLIENT, rawChannel, selector); channel.setContexts(new TcpReadContext(channel, handler), new TcpWriteContext(channel)); - channel.getCloseFuture().addListener(ChannelConsumerAdaptor.adapt(channel, closeListener)); + channel.getCloseFuture().addListener(ActionListener.wrap(closeListener, (e) -> closeListener.accept(channel))); scheduleChannel(channel, selector); return channel; } @@ -65,7 +66,7 @@ public NioSocketChannel acceptNioChannel(NioServerSocketChannel serverChannel, S SocketChannel rawChannel = rawChannelFactory.acceptNioChannel(serverChannel); NioSocketChannel channel = new NioSocketChannel(serverChannel.getProfile(), rawChannel, selector); channel.setContexts(new TcpReadContext(channel, handler), new TcpWriteContext(channel)); - channel.getCloseFuture().addListener(ChannelConsumerAdaptor.adapt(channel, closeListener)); + channel.getCloseFuture().addListener(ActionListener.wrap(closeListener, (e) -> closeListener.accept(channel))); scheduleChannel(channel, selector); return channel; } diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioServerSocketChannelTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioServerSocketChannelTests.java index b3693bfb7866e..9c54e1ca59c50 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioServerSocketChannelTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioServerSocketChannelTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.transport.nio.channel; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.transport.nio.AcceptingSelector; import org.elasticsearch.transport.nio.AcceptorEventHandler; @@ -33,6 +34,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.function.Supplier; import static org.mockito.Mockito.mock; @@ -64,10 +66,11 @@ public void testClose() throws IOException, TimeoutException, InterruptedExcepti CountDownLatch latch = new CountDownLatch(1); NioChannel channel = new DoNotCloseServerChannel("nio", mock(ServerSocketChannel.class), mock(ChannelFactory.class), selector); - channel.getCloseFuture().addListener(ChannelConsumerAdaptor.adapt(channel, (c) -> { + Consumer listener = (c) -> { ref.set(c); latch.countDown(); - })); + }; + channel.getCloseFuture().addListener(ActionListener.wrap(listener, (e) -> listener.accept(channel))); CloseFuture closeFuture = channel.getCloseFuture(); diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioSocketChannelTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioSocketChannelTests.java index 90002c38af724..0ddfbb1d7deba 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioSocketChannelTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioSocketChannelTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.transport.nio.channel; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.transport.nio.SocketEventHandler; import org.elasticsearch.transport.nio.SocketSelector; @@ -34,6 +35,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; +import java.util.function.Consumer; import static org.hamcrest.Matchers.instanceOf; import static org.mockito.Mockito.mock; @@ -67,10 +69,11 @@ public void testClose() throws IOException, TimeoutException, InterruptedExcepti NioSocketChannel socketChannel = new DoNotCloseChannel(NioChannel.CLIENT, mock(SocketChannel.class), selector); socketChannel.setContexts(mock(ReadContext.class), mock(WriteContext.class)); - socketChannel.getCloseFuture().addListener(ChannelConsumerAdaptor.adapt(socketChannel, (c) -> { + Consumer listener = (c) -> { ref.set(c); latch.countDown(); - })); + }; + socketChannel.getCloseFuture().addListener(ActionListener.wrap(listener, (e) -> listener.accept(socketChannel))); CloseFuture closeFuture = socketChannel.getCloseFuture(); assertFalse(closeFuture.isClosed()); From 3283faeedbe5d792ed832ede52a7ef2be78780c4 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 18 Aug 2017 10:35:20 -0500 Subject: [PATCH 4/6] Wrap runnable instead --- .../elasticsearch/action/ActionListener.java | 22 +++++++++---------- .../transport/nio/channel/ChannelFactory.java | 4 ++-- .../channel/NioServerSocketChannelTests.java | 2 +- .../nio/channel/NioSocketChannelTests.java | 2 +- 4 files changed, 15 insertions(+), 15 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/ActionListener.java b/core/src/main/java/org/elasticsearch/action/ActionListener.java index 78e6b6f8cad29..f692353a6e829 100644 --- a/core/src/main/java/org/elasticsearch/action/ActionListener.java +++ b/core/src/main/java/org/elasticsearch/action/ActionListener.java @@ -45,16 +45,21 @@ public interface ActionListener { * Creates a listener that listens for a response (or failure) and executes the * corresponding consumer when the response (or failure) is received. * - * @param onResponse the consumer of the response, when the listener receives one + * @param onResponse the checked consumer of the response, when the listener receives one * @param onFailure the consumer of the failure, when the listener receives one * @param the type of the response * @return a listener that listens for responses and invokes the consumer when received */ - static ActionListener wrap(Consumer onResponse, Consumer onFailure) { + static ActionListener wrap(CheckedConsumer onResponse, + Consumer onFailure) { return new ActionListener() { @Override public void onResponse(Response response) { - onResponse.accept(response); + try { + onResponse.accept(response); + } catch (Exception e) { + onFailure(e); + } } @Override @@ -73,21 +78,16 @@ public void onFailure(Exception e) { * @param the type of the response * @return a listener that listens for responses and invokes the consumer when received */ - static ActionListener wrap(CheckedConsumer onResponse, - Consumer onFailure) { + static ActionListener wrap(Runnable runnable) { return new ActionListener() { @Override public void onResponse(Response response) { - try { - onResponse.accept(response); - } catch (Exception e) { - onFailure(e); - } + runnable.run(); } @Override public void onFailure(Exception e) { - onFailure.accept(e); + runnable.run(); } }; } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelFactory.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelFactory.java index 126fa5a36f9ec..ffd751e438321 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelFactory.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelFactory.java @@ -56,7 +56,7 @@ public NioSocketChannel openNioChannel(InetSocketAddress remoteAddress, SocketSe SocketChannel rawChannel = rawChannelFactory.openNioChannel(remoteAddress); NioSocketChannel channel = new NioSocketChannel(NioChannel.CLIENT, rawChannel, selector); channel.setContexts(new TcpReadContext(channel, handler), new TcpWriteContext(channel)); - channel.getCloseFuture().addListener(ActionListener.wrap(closeListener, (e) -> closeListener.accept(channel))); + channel.getCloseFuture().addListener(ActionListener.wrap(() -> closeListener.accept(channel))); scheduleChannel(channel, selector); return channel; } @@ -66,7 +66,7 @@ public NioSocketChannel acceptNioChannel(NioServerSocketChannel serverChannel, S SocketChannel rawChannel = rawChannelFactory.acceptNioChannel(serverChannel); NioSocketChannel channel = new NioSocketChannel(serverChannel.getProfile(), rawChannel, selector); channel.setContexts(new TcpReadContext(channel, handler), new TcpWriteContext(channel)); - channel.getCloseFuture().addListener(ActionListener.wrap(closeListener, (e) -> closeListener.accept(channel))); + channel.getCloseFuture().addListener(ActionListener.wrap(() -> closeListener.accept(channel))); scheduleChannel(channel, selector); return channel; } diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioServerSocketChannelTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioServerSocketChannelTests.java index 9c54e1ca59c50..da61c1a0d4499 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioServerSocketChannelTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioServerSocketChannelTests.java @@ -70,7 +70,7 @@ public void testClose() throws IOException, TimeoutException, InterruptedExcepti ref.set(c); latch.countDown(); }; - channel.getCloseFuture().addListener(ActionListener.wrap(listener, (e) -> listener.accept(channel))); + channel.getCloseFuture().addListener(ActionListener.wrap(() -> listener.accept(channel))); CloseFuture closeFuture = channel.getCloseFuture(); diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioSocketChannelTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioSocketChannelTests.java index 0ddfbb1d7deba..bc217b55d80d4 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioSocketChannelTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioSocketChannelTests.java @@ -73,7 +73,7 @@ public void testClose() throws IOException, TimeoutException, InterruptedExcepti ref.set(c); latch.countDown(); }; - socketChannel.getCloseFuture().addListener(ActionListener.wrap(listener, (e) -> listener.accept(socketChannel))); + socketChannel.getCloseFuture().addListener(ActionListener.wrap(() -> listener.accept(socketChannel))); CloseFuture closeFuture = socketChannel.getCloseFuture(); assertFalse(closeFuture.isClosed()); From 691d7ed8a528687fb7e1afc2545c76fc4ed6e79f Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 18 Aug 2017 10:39:07 -0500 Subject: [PATCH 5/6] Fix documentation --- .../main/java/org/elasticsearch/action/ActionListener.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/ActionListener.java b/core/src/main/java/org/elasticsearch/action/ActionListener.java index f692353a6e829..f0e43b1402e0f 100644 --- a/core/src/main/java/org/elasticsearch/action/ActionListener.java +++ b/core/src/main/java/org/elasticsearch/action/ActionListener.java @@ -71,12 +71,11 @@ public void onFailure(Exception e) { /** * Creates a listener that listens for a response (or failure) and executes the - * corresponding consumer when the response (or failure) is received. + * corresponding runnable when the response (or failure) is received. * - * @param onResponse the checked consumer of the response, when the listener receives one - * @param onFailure the consumer of the failure, when the listener receives one + * @param runnable the runnable to be executed when the listener is called. * @param the type of the response - * @return a listener that listens for responses and invokes the consumer when received + * @return a listener that listens for responses and invokes the runnable when received */ static ActionListener wrap(Runnable runnable) { return new ActionListener() { From d89d8f953c7347e451be29c7c01a89642ec6a113 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 18 Aug 2017 11:06:41 -0500 Subject: [PATCH 6/6] Go back to consumer version --- .../elasticsearch/action/ActionListener.java | 22 ------------------- .../transport/nio/channel/ChannelFactory.java | 4 ++-- .../channel/NioServerSocketChannelTests.java | 2 +- .../nio/channel/NioSocketChannelTests.java | 2 +- 4 files changed, 4 insertions(+), 26 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/ActionListener.java b/core/src/main/java/org/elasticsearch/action/ActionListener.java index f0e43b1402e0f..fa32ab417737c 100644 --- a/core/src/main/java/org/elasticsearch/action/ActionListener.java +++ b/core/src/main/java/org/elasticsearch/action/ActionListener.java @@ -69,28 +69,6 @@ public void onFailure(Exception e) { }; } - /** - * Creates a listener that listens for a response (or failure) and executes the - * corresponding runnable when the response (or failure) is received. - * - * @param runnable the runnable to be executed when the listener is called. - * @param the type of the response - * @return a listener that listens for responses and invokes the runnable when received - */ - static ActionListener wrap(Runnable runnable) { - return new ActionListener() { - @Override - public void onResponse(Response response) { - runnable.run(); - } - - @Override - public void onFailure(Exception e) { - runnable.run(); - } - }; - } - /** * Notifies every given listener with the response passed to {@link #onResponse(Object)}. If a listener itself throws an exception * the exception is forwarded to {@link #onFailure(Exception)}. If in turn {@link #onFailure(Exception)} fails all remaining diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelFactory.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelFactory.java index ffd751e438321..f2f92e94e509d 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelFactory.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelFactory.java @@ -56,7 +56,7 @@ public NioSocketChannel openNioChannel(InetSocketAddress remoteAddress, SocketSe SocketChannel rawChannel = rawChannelFactory.openNioChannel(remoteAddress); NioSocketChannel channel = new NioSocketChannel(NioChannel.CLIENT, rawChannel, selector); channel.setContexts(new TcpReadContext(channel, handler), new TcpWriteContext(channel)); - channel.getCloseFuture().addListener(ActionListener.wrap(() -> closeListener.accept(channel))); + channel.getCloseFuture().addListener(ActionListener.wrap(closeListener::accept, (e) -> closeListener.accept(channel))); scheduleChannel(channel, selector); return channel; } @@ -66,7 +66,7 @@ public NioSocketChannel acceptNioChannel(NioServerSocketChannel serverChannel, S SocketChannel rawChannel = rawChannelFactory.acceptNioChannel(serverChannel); NioSocketChannel channel = new NioSocketChannel(serverChannel.getProfile(), rawChannel, selector); channel.setContexts(new TcpReadContext(channel, handler), new TcpWriteContext(channel)); - channel.getCloseFuture().addListener(ActionListener.wrap(() -> closeListener.accept(channel))); + channel.getCloseFuture().addListener(ActionListener.wrap(closeListener::accept, (e) -> closeListener.accept(channel))); scheduleChannel(channel, selector); return channel; } diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioServerSocketChannelTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioServerSocketChannelTests.java index da61c1a0d4499..367df0c78f4c8 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioServerSocketChannelTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioServerSocketChannelTests.java @@ -70,7 +70,7 @@ public void testClose() throws IOException, TimeoutException, InterruptedExcepti ref.set(c); latch.countDown(); }; - channel.getCloseFuture().addListener(ActionListener.wrap(() -> listener.accept(channel))); + channel.getCloseFuture().addListener(ActionListener.wrap(listener::accept, (e) -> listener.accept(channel))); CloseFuture closeFuture = channel.getCloseFuture(); diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioSocketChannelTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioSocketChannelTests.java index bc217b55d80d4..75ec57b2603db 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioSocketChannelTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/channel/NioSocketChannelTests.java @@ -73,7 +73,7 @@ public void testClose() throws IOException, TimeoutException, InterruptedExcepti ref.set(c); latch.countDown(); }; - socketChannel.getCloseFuture().addListener(ActionListener.wrap(() -> listener.accept(socketChannel))); + socketChannel.getCloseFuture().addListener(ActionListener.wrap(listener::accept, (e) -> listener.accept(socketChannel))); CloseFuture closeFuture = socketChannel.getCloseFuture(); assertFalse(closeFuture.isClosed());