Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-26679 Wait on the future returned by FanOutOneBlockAsyncDFSOutp… #4039

Merged
merged 15 commits into from
Jan 28, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.WRITER_IDLE;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -131,7 +133,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {

private final ByteBufAllocator alloc;

private static final class Callback {
protected static final class Callback {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need it to be protected?

Copy link
Contributor Author

@comnetwork comnetwork Jan 27, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have fixed it.


private final CompletableFuture<Long> future;

Expand All @@ -157,6 +159,13 @@ public Callback(CompletableFuture<Long> future, long ackedLength,
replicas.stream().map(Channel::id).forEachOrdered(unfinishedReplicas::add);
}
}

@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
Set<ChannelId> getUnfinishedReplicas() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is for checking whether we have finished the ack? I think this could also be done by adding a ChannelInboundHandler, as the completed or failed method will be called inside event loop?

Copy link
Contributor Author

@comnetwork comnetwork Jan 27, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have fixed it.

return this.unfinishedReplicas;
}

}

private final ConcurrentLinkedDeque<Callback> waitingAckQueue = new ConcurrentLinkedDeque<>();
Expand Down Expand Up @@ -187,7 +196,7 @@ private enum State {
private final StreamSlowMonitor streamSlowMonitor;

// all lock-free to make it run faster
private void completed(Channel channel) {
protected void completed(Channel channel) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need to make it protected?

Copy link
Contributor Author

@comnetwork comnetwork Jan 27, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have fixed it.

for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
Callback c = iter.next();
// if the current unfinished replicas does not contain us then it means that we have already
Expand Down Expand Up @@ -231,7 +240,11 @@ private void completed(Channel channel) {
// so that the implementation will not burn up our brain as there are multiple state changes and
// checks.
private synchronized void failed(Channel channel, Supplier<Throwable> errorSupplier) {
if (state == State.BROKEN || state == State.CLOSED) {
if (state == State.CLOSED) {
return;
}
if (state == State.BROKEN) {
failWaitingAckQueue(channel, errorSupplier);
Apache9 marked this conversation as resolved.
Show resolved Hide resolved
return;
}
if (state == State.CLOSING) {
Expand All @@ -243,6 +256,11 @@ private synchronized void failed(Channel channel, Supplier<Throwable> errorSuppl
}
// disable further write, and fail all pending ack.
state = State.BROKEN;
failWaitingAckQueue(channel, errorSupplier);
datanodeInfoMap.keySet().forEach(ChannelOutboundInvoker::close);
}

private void failWaitingAckQueue(Channel channel, Supplier<Throwable> errorSupplier) {
Throwable error = errorSupplier.get();
for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
Callback c = iter.next();
Expand All @@ -259,7 +277,6 @@ private synchronized void failed(Channel channel, Supplier<Throwable> errorSuppl
}
break;
}
datanodeInfoMap.keySet().forEach(ChannelOutboundInvoker::close);
}

@Sharable
Expand Down Expand Up @@ -598,4 +615,16 @@ public boolean isBroken() {
// Reports the current value of the ackedBlockLength field.
// NOTE(review): presumably the number of bytes of the block acknowledged so far -- confirm
// against where ackedBlockLength is updated.
public long getSyncedLength() {
  return ackedBlockLength;
}

/**
 * Test-only accessor for the channel-to-datanode mapping held by this output.
 * <p>
 * The returned map is the internal {@code datanodeInfoMap} instance itself, not a copy. The
 * {@code RestrictedApi} annotation limits callers to code under {@code src/test}.
 * @return the map whose keys are the data node channels and whose values are the matching
 *         {@link DatanodeInfo}
 */
@RestrictedApi(explanation = "Should only be called in tests", link = "",
    allowedOnPath = ".*/src/test/.*")
Map<Channel, DatanodeInfo> getDatanodeInfoMap() {
  return datanodeInfoMap;
}

/**
 * Test-only accessor for the queue of {@link Callback}s held in {@code waitingAckQueue}.
 * <p>
 * The returned deque is the internal instance itself, not a snapshot. The
 * {@code RestrictedApi} annotation limits callers to code under {@code src/test}.
 * @return the internal {@code waitingAckQueue}
 */
@RestrictedApi(explanation = "Should only be called in tests", link = "",
    allowedOnPath = ".*/src/test/.*")
Deque<Callback> getWaitingAckQueue() {
  return waitingAckQueue;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -505,8 +505,8 @@ private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem d
}
Encryptor encryptor = createEncryptor(conf, stat, client);
FanOutOneBlockAsyncDFSOutput output =
new FanOutOneBlockAsyncDFSOutput(conf, dfs, client, namenode, clientName, src,
stat.getFileId(), locatedBlock, encryptor, datanodes, summer, ALLOC, monitor);
new FanOutOneBlockAsyncDFSOutput(conf, dfs, client, namenode, clientName, src,
stat.getFileId(), locatedBlock, encryptor, datanodes, summer, ALLOC, monitor);
succ = true;
return output;
} catch (RemoteException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.FileNotFoundException;
Expand All @@ -30,7 +31,10 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand All @@ -46,6 +50,7 @@
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.AfterClass;
Expand All @@ -57,12 +62,14 @@
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelInboundHandlerAdapter;

@Category({ MiscTests.class, MediumTests.class })
public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
Expand Down Expand Up @@ -272,4 +279,132 @@ public void testWriteLargeChunk() throws IOException, InterruptedException, Exec
}
assertArrayEquals(b, actual);
}

/**
* <pre>
* This test is for HBASE-26679. Consider two data nodes, dn1 and dn2, where dn2 is a slow DN.
* The threads sequence before HBASE-26679 is:
* 1.We write some data to {@link FanOutOneBlockAsyncDFSOutput} and then flush it, so there is one
* {@link FanOutOneBlockAsyncDFSOutput.Callback} in
* {@link FanOutOneBlockAsyncDFSOutput#waitingAckQueue}.
* 2.The ack from dn1 arrives firstly and triggers Netty to invoke
* {@link FanOutOneBlockAsyncDFSOutput#completed} with dn1's channel, then in
* {@link FanOutOneBlockAsyncDFSOutput#completed}, dn1's channel is removed from
* {@link FanOutOneBlockAsyncDFSOutput.Callback#unfinishedReplicas}.
* 3.But dn2 responds slowly; before dn2 sends its ack, dn1 is shut down or has an exception,
* so {@link FanOutOneBlockAsyncDFSOutput#failed} is triggered by Netty with dn1's channel,
* and because the {@link FanOutOneBlockAsyncDFSOutput.Callback#unfinishedReplicas} does not
* contain dn1's channel,the {@link FanOutOneBlockAsyncDFSOutput.Callback} is skipped in
* {@link FanOutOneBlockAsyncDFSOutput#failed} method,and
* {@link FanOutOneBlockAsyncDFSOutput#state} is set to
* {@link FanOutOneBlockAsyncDFSOutput.State#BROKEN},and dn1,dn2 are all closed at the end of
* {@link FanOutOneBlockAsyncDFSOutput#failed}.
* 4.{@link FanOutOneBlockAsyncDFSOutput#failed} is triggered again by dn2 because it is closed,
* but because {@link FanOutOneBlockAsyncDFSOutput#state} is already
* {@link FanOutOneBlockAsyncDFSOutput.State#BROKEN},the whole
* {@link FanOutOneBlockAsyncDFSOutput#failed} is skipped. So waiting on the future
* returned by {@link FanOutOneBlockAsyncDFSOutput#flush} would be stuck forever.
* After HBASE-26679, for above step 4,even if the {@link FanOutOneBlockAsyncDFSOutput#state}
* is already {@link FanOutOneBlockAsyncDFSOutput.State#BROKEN}, we would still try to trigger
* {@link FanOutOneBlockAsyncDFSOutput.Callback#future}.
* </pre>
*/
@Test
public void testFlushStuckWhenOneDataNodeFailedBeforeOtherDataNodeAck() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason for introducing a separate test class in my patch is that we need to increase the READER_TIMEOUT value. We set it to 2 seconds in this test, but then it generates unexpected Netty traffic and messes up the testing code here. The default value is 60 seconds, which is enough for this test.

So I suggest we introduce a separate test class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Apache9, why do we need to increase the READER_TIMEOUT value?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As said above, it will mess up the testing code. We send a heartbeat to the DN when there is no outgoing packet, and the timeout is controlled by READER_TIMEOUT. If we set it to 2 seconds, it will keep sending packets out, and the DN will respond immediately and then mess up the testing handler added by us.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Apache9, I mean it does not seem to affect the test? The response to the heartbeat is ignored by the ChannelInboundHandler you added, and it is also ignored in AckHandler.channelRead0.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, it seems there is no big problem if we just ignore all inbound messages.

But it needs extra consideration; it is better not to introduce more conditions into a test. When I first tried to implement the UT, I used a boolean flag to drop exactly one packet, and the heartbeat message messed up the test. In the future, if we want to change the implementation to simulate more, another developer may fall into this too and waste more time...

Path f = new Path("/" + name.getMethodName());
EventLoop eventLoop = EVENT_LOOP_GROUP.next();

DataNodeProperties firstDataNodeProperties = null;
try {
FanOutOneBlockAsyncDFSOutput out =
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
true, false, (short) 2, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
Map<Channel,DatanodeInfo> datanodeInfoMap = out.getDatanodeInfoMap();
Iterator<Map.Entry<Channel,DatanodeInfo>> iterator = datanodeInfoMap.entrySet().iterator();
assertTrue(iterator.hasNext());
Map.Entry<Channel,DatanodeInfo> dn1Entry= iterator.next();
DatanodeInfo dn1DatanodeInfo = dn1Entry.getValue();

assertTrue(iterator.hasNext());
Map.Entry<Channel,DatanodeInfo> dn2Entry= iterator.next();
Channel dn2Channel= dn2Entry.getKey();

/**
* Here we add a {@link ChannelInboundHandlerAdapter} to eat all the responses to simulate a
* slow dn2.
*/
dn2Channel.pipeline().addFirst(new ChannelInboundHandlerAdapter() {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (!(msg instanceof ByteBuf)) {
ctx.fireChannelRead(msg);
}
}
});

byte[] b = new byte[10];
ThreadLocalRandom.current().nextBytes(b);
out.write(b, 0, b.length);
CompletableFuture<Long> future = out.flush(false);
/**
* Wait for ack from dn1.
*/
Deque<FanOutOneBlockAsyncDFSOutput.Callback> ackQueue = out.getWaitingAckQueue();
assertTrue(ackQueue.size() == 1);
FanOutOneBlockAsyncDFSOutput.Callback callback = ackQueue.getFirst();
while (callback.getUnfinishedReplicas().size() != 1) {
Thread.sleep(1000);
}

/**
* First ack is received from dn1,we could stop dn1 now.
*/
firstDataNodeProperties = findAndKillFirstDataNode(dn1DatanodeInfo);
assertTrue(firstDataNodeProperties != null);
try {
/**
* Before HBASE-26679,here we should be stuck, after HBASE-26679,we would fail soon with
* {@link ExecutionException}.
*/
future.get();
fail();
} catch (ExecutionException e) {
assertTrue(e != null);
LOG.info("expected exception caught when get future", e);
}
/**
* Make sure all the data node channel are closed.
*/
datanodeInfoMap.keySet().forEach(ch -> {
try {
ch.closeFuture().get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
} finally {
if (firstDataNodeProperties != null) {
CLUSTER.restartDataNode(firstDataNodeProperties);
}
}

}

/**
 * Stops the one data node in {@code CLUSTER} whose transfer address equals that of the given
 * {@link DatanodeInfo}.
 * <p>
 * The data node is located by its position in {@code CLUSTER.getDataNodes()} because
 * {@code CLUSTER.stopDataNode} is called with that index. The method asserts that exactly one
 * data node matches, so a missing or ambiguous match fails the test instead of silently
 * stopping the wrong node.
 * @param firstDatanodeInfo descriptor of the data node to stop; must not be null
 * @return the value returned by {@code CLUSTER.stopDataNode} for the stopped node, which the
 *         caller passes to {@code CLUSTER.restartDataNode} to bring it back
 */
private static DataNodeProperties findAndKillFirstDataNode(
    DatanodeInfo firstDatanodeInfo) {
  assertTrue("firstDatanodeInfo must not be null", firstDatanodeInfo != null);
  // The target address does not change while scanning, so look it up once.
  String targetXferAddr = firstDatanodeInfo.getXferAddr();
  List<DataNode> dataNodes = CLUSTER.getDataNodes();
  List<Integer> foundIndexes = new ArrayList<>();
  for (int index = 0; index < dataNodes.size(); index++) {
    if (targetXferAddr.equals(dataNodes.get(index).getDatanodeId().getXferAddr())) {
      foundIndexes.add(index);
    }
  }
  // assertEquals with a message reports what was actually matched, which a bare
  // assertTrue(size == 1) cannot do.
  assertEquals("Expected exactly one data node with xfer address " + targetXferAddr
    + ", matched indexes: " + foundIndexes, 1, foundIndexes.size());
  return CLUSTER.stopDataNode(foundIndexes.get(0));
}

}