Skip to content

Commit

Permalink
Use PlainListenableActionFuture for CloseFuture (#26242)
Browse files Browse the repository at this point in the history
Right now we use a custom future for the CloseFuture associated with a
channel. This is because we need special unwrapping logic to ensure that
exceptions from a future failure are a certain type (opposed to an
UncategorizedException). However, the current version is limiting
because we can only attach one listener.

This commit changes the CloseFuture to extend the
PlainListenableActionFuture. This change allows us to attach multiple
listeners.
  • Loading branch information
Tim-Brooks authored Aug 18, 2017
1 parent 6eef6c4 commit 5d7a78f
Show file tree
Hide file tree
Showing 9 changed files with 30 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public interface ActionListener<Response> {
* Creates a listener that listens for a response (or failure) and executes the
* corresponding consumer when the response (or failure) is received.
*
* @param onResponse the consumer of the response, when the listener receives one
* @param onResponse the checked consumer of the response, when the listener receives one
* @param onFailure the consumer of the failure, when the listener receives one
* @param <Response> the type of the response
* @return a listener that listens for responses and invokes the consumer when received
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class PlainListenableActionFuture<T> extends AdapterActionFuture<T, T> im
volatile Object listeners;
boolean executedListeners = false;

private PlainListenableActionFuture() {}
protected PlainListenableActionFuture() {}

/**
* This method returns a listenable future. The listeners will be called on completion of the future.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,6 @@ private void closeChannels(ArrayList<NioSocketChannel> connections, Exception e)
for (final NioSocketChannel socketChannel : connections) {
try {
socketChannel.closeAsync().awaitClose();
} catch (InterruptedException inner) {
logger.trace("exception while closing channel", e);
e.addSuppressed(inner);
Thread.currentThread().interrupt();
} catch (Exception inner) {
logger.trace("exception while closing channel", e);
e.addSuppressed(inner);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,7 @@ protected void closeChannels(List<NioChannel> channels, boolean blocking) throws
for (CloseFuture future : futures) {
try {
future.awaitClose();
IOException closeException = future.getCloseException();
if (closeException != null) {
closingExceptions = addClosingException(closingExceptions, closeException);
}
} catch (InterruptedException e) {
} catch (Exception e) {
closingExceptions = addClosingException(closingExceptions, e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public void closeFromSelector() {
closeRawChannel();
closedOnThisCall = closeFuture.channelClosed(this);
} catch (IOException e) {
closedOnThisCall = closeFuture.channelCloseThrewException(this, e);
closedOnThisCall = closeFuture.channelCloseThrewException(e);
} finally {
if (closedOnThisCall) {
selector.removeRegisteredChannel(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@


import org.apache.lucene.util.IOUtils;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.mocksocket.PrivilegedSocketAccess;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.nio.AcceptingSelector;
Expand Down Expand Up @@ -55,7 +56,7 @@ public NioSocketChannel openNioChannel(InetSocketAddress remoteAddress, SocketSe
SocketChannel rawChannel = rawChannelFactory.openNioChannel(remoteAddress);
NioSocketChannel channel = new NioSocketChannel(NioChannel.CLIENT, rawChannel, selector);
channel.setContexts(new TcpReadContext(channel, handler), new TcpWriteContext(channel));
channel.getCloseFuture().setListener(closeListener);
channel.getCloseFuture().addListener(ActionListener.wrap(closeListener::accept, (e) -> closeListener.accept(channel)));
scheduleChannel(channel, selector);
return channel;
}
Expand All @@ -65,7 +66,7 @@ public NioSocketChannel acceptNioChannel(NioServerSocketChannel serverChannel, S
SocketChannel rawChannel = rawChannelFactory.acceptNioChannel(serverChannel);
NioSocketChannel channel = new NioSocketChannel(serverChannel.getProfile(), rawChannel, selector);
channel.setContexts(new TcpReadContext(channel, handler), new TcpWriteContext(channel));
channel.getCloseFuture().setListener(closeListener);
channel.getCloseFuture().addListener(ActionListener.wrap(closeListener::accept, (e) -> closeListener.accept(channel)));
scheduleChannel(channel, selector);
return channel;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,35 +19,37 @@

package org.elasticsearch.transport.nio.channel;

import org.apache.lucene.util.SetOnce;
import org.elasticsearch.common.util.concurrent.BaseFuture;
import org.elasticsearch.action.support.PlainListenableActionFuture;

import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

public class CloseFuture extends BaseFuture<NioChannel> {

private final SetOnce<Consumer<NioChannel>> listener = new SetOnce<>();
public class CloseFuture extends PlainListenableActionFuture<NioChannel> {

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
throw new UnsupportedOperationException("Cannot cancel close future");
}

public void awaitClose() throws InterruptedException, IOException {
public void awaitClose() throws IOException {
try {
super.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Future got interrupted", e);
} catch (ExecutionException e) {
throw (IOException) e.getCause();
}
}

public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, IOException {
public void awaitClose(long timeout, TimeUnit unit) throws TimeoutException, IOException {
try {
super.get(timeout, unit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Future got interrupted", e);
} catch (ExecutionException e) {
throw (IOException) e.getCause();
}
Expand Down Expand Up @@ -76,31 +78,13 @@ public boolean isClosed() {
return super.isDone();
}

public void setListener(Consumer<NioChannel> listener) {
this.listener.set(listener);
}

boolean channelClosed(NioChannel channel) {
boolean set = set(channel);
if (set) {
Consumer<NioChannel> listener = this.listener.get();
if (listener != null) {
listener.accept(channel);
}
}
return set;
return set(channel);
}


boolean channelCloseThrewException(NioChannel channel, IOException ex) {
boolean set = setException(ex);
if (set) {
Consumer<NioChannel> listener = this.listener.get();
if (listener != null) {
listener.accept(channel);
}
}
return set;
boolean channelCloseThrewException(IOException ex) {
return setException(ex);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.transport.nio.channel;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.nio.AcceptingSelector;
import org.elasticsearch.transport.nio.AcceptorEventHandler;
Expand All @@ -33,6 +34,7 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -64,10 +66,11 @@ public void testClose() throws IOException, TimeoutException, InterruptedExcepti
CountDownLatch latch = new CountDownLatch(1);

NioChannel channel = new DoNotCloseServerChannel("nio", mock(ServerSocketChannel.class), mock(ChannelFactory.class), selector);
channel.getCloseFuture().setListener((c) -> {
Consumer<NioChannel> listener = (c) -> {
ref.set(c);
latch.countDown();
});
};
channel.getCloseFuture().addListener(ActionListener.wrap(listener::accept, (e) -> listener.accept(channel)));

CloseFuture closeFuture = channel.getCloseFuture();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.transport.nio.channel;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.nio.SocketEventHandler;
import org.elasticsearch.transport.nio.SocketSelector;
Expand All @@ -34,6 +35,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -67,10 +69,11 @@ public void testClose() throws IOException, TimeoutException, InterruptedExcepti

NioSocketChannel socketChannel = new DoNotCloseChannel(NioChannel.CLIENT, mock(SocketChannel.class), selector);
socketChannel.setContexts(mock(ReadContext.class), mock(WriteContext.class));
socketChannel.getCloseFuture().setListener((c) -> {
Consumer<NioChannel> listener = (c) -> {
ref.set(c);
latch.countDown();
});
};
socketChannel.getCloseFuture().addListener(ActionListener.wrap(listener::accept, (e) -> listener.accept(socketChannel)));
CloseFuture closeFuture = socketChannel.getCloseFuture();

assertFalse(closeFuture.isClosed());
Expand Down

0 comments on commit 5d7a78f

Please sign in to comment.