Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use PlainListenableActionFuture for CloseFuture #26242

Merged
merged 7 commits into from
Aug 18, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public interface ActionListener<Response> {
* Creates a listener that listens for a response (or failure) and executes the
* corresponding consumer when the response (or failure) is received.
*
* @param onResponse the consumer of the response, when the listener receives one
* @param onResponse the checked consumer of the response, when the listener receives one
* @param onFailure the consumer of the failure, when the listener receives one
* @param <Response> the type of the response
* @return a listener that listens for responses and invokes the consumer when received
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class PlainListenableActionFuture<T> extends AdapterActionFuture<T, T> im
volatile Object listeners;
boolean executedListeners = false;

private PlainListenableActionFuture() {}
protected PlainListenableActionFuture() {}

/**
* This method returns a listenable future. The listeners will be called on completion of the future.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,6 @@ private void closeChannels(ArrayList<NioSocketChannel> connections, Exception e)
for (final NioSocketChannel socketChannel : connections) {
try {
socketChannel.closeAsync().awaitClose();
} catch (InterruptedException inner) {
logger.trace("exception while closing channel", e);
e.addSuppressed(inner);
Thread.currentThread().interrupt();
} catch (Exception inner) {
logger.trace("exception while closing channel", e);
e.addSuppressed(inner);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,7 @@ protected void closeChannels(List<NioChannel> channels, boolean blocking) throws
for (CloseFuture future : futures) {
try {
future.awaitClose();
IOException closeException = future.getCloseException();
if (closeException != null) {
closingExceptions = addClosingException(closingExceptions, closeException);
}
} catch (InterruptedException e) {
} catch (Exception e) {
closingExceptions = addClosingException(closingExceptions, e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public void closeFromSelector() {
closeRawChannel();
closedOnThisCall = closeFuture.channelClosed(this);
} catch (IOException e) {
closedOnThisCall = closeFuture.channelCloseThrewException(this, e);
closedOnThisCall = closeFuture.channelCloseThrewException(e);
} finally {
if (closedOnThisCall) {
selector.removeRegisteredChannel(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@


import org.apache.lucene.util.IOUtils;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.mocksocket.PrivilegedSocketAccess;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.nio.AcceptingSelector;
Expand Down Expand Up @@ -55,7 +56,7 @@ public NioSocketChannel openNioChannel(InetSocketAddress remoteAddress, SocketSe
SocketChannel rawChannel = rawChannelFactory.openNioChannel(remoteAddress);
NioSocketChannel channel = new NioSocketChannel(NioChannel.CLIENT, rawChannel, selector);
channel.setContexts(new TcpReadContext(channel, handler), new TcpWriteContext(channel));
channel.getCloseFuture().setListener(closeListener);
channel.getCloseFuture().addListener(ActionListener.wrap(closeListener::accept, (e) -> closeListener.accept(channel)));
scheduleChannel(channel, selector);
return channel;
}
Expand All @@ -65,7 +66,7 @@ public NioSocketChannel acceptNioChannel(NioServerSocketChannel serverChannel, S
SocketChannel rawChannel = rawChannelFactory.acceptNioChannel(serverChannel);
NioSocketChannel channel = new NioSocketChannel(serverChannel.getProfile(), rawChannel, selector);
channel.setContexts(new TcpReadContext(channel, handler), new TcpWriteContext(channel));
channel.getCloseFuture().setListener(closeListener);
channel.getCloseFuture().addListener(ActionListener.wrap(closeListener::accept, (e) -> closeListener.accept(channel)));
scheduleChannel(channel, selector);
return channel;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,35 +19,37 @@

package org.elasticsearch.transport.nio.channel;

import org.apache.lucene.util.SetOnce;
import org.elasticsearch.common.util.concurrent.BaseFuture;
import org.elasticsearch.action.support.PlainListenableActionFuture;

import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

public class CloseFuture extends BaseFuture<NioChannel> {

private final SetOnce<Consumer<NioChannel>> listener = new SetOnce<>();
public class CloseFuture extends PlainListenableActionFuture<NioChannel> {

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
throw new UnsupportedOperationException("Cannot cancel close future");
}

public void awaitClose() throws InterruptedException, IOException {
public void awaitClose() throws IOException {
try {
super.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Future got interrupted", e);
} catch (ExecutionException e) {
throw (IOException) e.getCause();
}
}

public void awaitClose(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, IOException {
public void awaitClose(long timeout, TimeUnit unit) throws TimeoutException, IOException {
try {
super.get(timeout, unit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Future got interrupted", e);
} catch (ExecutionException e) {
throw (IOException) e.getCause();
}
Expand Down Expand Up @@ -76,31 +78,13 @@ public boolean isClosed() {
return super.isDone();
}

public void setListener(Consumer<NioChannel> listener) {
this.listener.set(listener);
}

boolean channelClosed(NioChannel channel) {
boolean set = set(channel);
if (set) {
Consumer<NioChannel> listener = this.listener.get();
if (listener != null) {
listener.accept(channel);
}
}
return set;
return set(channel);
}


boolean channelCloseThrewException(NioChannel channel, IOException ex) {
boolean set = setException(ex);
if (set) {
Consumer<NioChannel> listener = this.listener.get();
if (listener != null) {
listener.accept(channel);
}
}
return set;
boolean channelCloseThrewException(IOException ex) {
return setException(ex);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.transport.nio.channel;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.nio.AcceptingSelector;
import org.elasticsearch.transport.nio.AcceptorEventHandler;
Expand All @@ -33,6 +34,7 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -64,10 +66,11 @@ public void testClose() throws IOException, TimeoutException, InterruptedExcepti
CountDownLatch latch = new CountDownLatch(1);

NioChannel channel = new DoNotCloseServerChannel("nio", mock(ServerSocketChannel.class), mock(ChannelFactory.class), selector);
channel.getCloseFuture().setListener((c) -> {
Consumer<NioChannel> listener = (c) -> {
ref.set(c);
latch.countDown();
});
};
channel.getCloseFuture().addListener(ActionListener.wrap(listener::accept, (e) -> listener.accept(channel)));

CloseFuture closeFuture = channel.getCloseFuture();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.transport.nio.channel;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.nio.SocketEventHandler;
import org.elasticsearch.transport.nio.SocketSelector;
Expand All @@ -34,6 +35,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -67,10 +69,11 @@ public void testClose() throws IOException, TimeoutException, InterruptedExcepti

NioSocketChannel socketChannel = new DoNotCloseChannel(NioChannel.CLIENT, mock(SocketChannel.class), selector);
socketChannel.setContexts(mock(ReadContext.class), mock(WriteContext.class));
socketChannel.getCloseFuture().setListener((c) -> {
Consumer<NioChannel> listener = (c) -> {
ref.set(c);
latch.countDown();
});
};
socketChannel.getCloseFuture().addListener(ActionListener.wrap(listener::accept, (e) -> listener.accept(socketChannel)));
CloseFuture closeFuture = socketChannel.getCloseFuture();

assertFalse(closeFuture.isClosed());
Expand Down