
Commit c258171

Author: Yongjun Zhang (committed)
HDFS-10652. Add a unit test for HDFS-4660. Contributed by Vinayakumar B., Wei-Chiu Chuang, Yongjun Zhang.
1 parent 19c743c commit c258171

File tree

5 files changed: +144 −3 lines changed


hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

Lines changed: 1 addition & 0 deletions
@@ -1306,6 +1306,7 @@ public void run() {
           long ackRecvNanoTime = 0;
           try {
             if (type != PacketResponderType.LAST_IN_PIPELINE && !mirrorError) {
+              DataNodeFaultInjector.get().failPipeline(replicaInfo, mirrorAddr);
               // read an ack from downstream datanode
               ack.readFields(downstreamIn);
               ackRecvNanoTime = System.nanoTime();

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

Lines changed: 2 additions & 1 deletion
@@ -2403,7 +2403,8 @@ public void run() {
       blockSender.sendBlock(out, unbufOut, null);
 
       // no response necessary
-      LOG.info(getClass().getSimpleName() + ": Transmitted " + b
+      LOG.info(getClass().getSimpleName() + ", at "
+          + DataNode.this.getDisplayName() + ": Transmitted " + b
           + " (numBytes=" + b.getNumBytes() + ") to " + curTarget);
 
       // read ack

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java

Lines changed: 3 additions & 0 deletions
@@ -55,4 +55,7 @@ public void stopSendingPacketDownstream() throws IOException {}
   public void noRegistration() throws IOException { }
 
   public void failMirrorConnection() throws IOException { }
+
+  public void failPipeline(ReplicaInPipelineInterface replicaInfo,
+      String mirrorAddr) throws IOException { }
 }
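
The new failPipeline hook is a no-op by default; a test swaps in a subclass that throws at the injection point in BlockReceiver. A minimal, hypothetical sketch of that pattern (the real override, with the chunk-boundary check, is in TestClientProtocolForPipelineRecovery below; the variable names here are illustrative):

  // Hypothetical sketch: make the new hook throw, then restore the default
  // injector so later tests are unaffected.
  DataNodeFaultInjector previous = DataNodeFaultInjector.get();
  DataNodeFaultInjector.set(new DataNodeFaultInjector() {
    @Override
    public void failPipeline(ReplicaInPipelineInterface replicaInfo,
        String mirrorAddr) throws IOException {
      throw new IOException("Injected pipeline failure towards " + mirrorAddr);
    }
  });
  try {
    // ... write to a file on a MiniDFSCluster so the PacketResponder hits the hook ...
  } finally {
    DataNodeFaultInjector.set(previous);
  }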

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

Lines changed: 1 addition & 1 deletion
@@ -1505,7 +1505,7 @@ public ReplicaHandler recoverRbw(
           if (!rbw.attemptToSetWriter(null, Thread.currentThread())) {
             throw new MustStopExistingWriter(rbw);
           }
-          LOG.info("Recovering " + rbw);
+          LOG.info("At " + datanode.getDisplayName() + ", Recovering " + rbw);
           return recoverRbwImpl(rbw, b, newGS, minBytesRcvd, maxBytesRcvd);
         }
       } catch (MustStopExistingWriter e) {

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java

Lines changed: 137 additions & 1 deletion
@@ -17,9 +17,16 @@
  */
 package org.apache.hadoop.hdfs;
 
+import static org.junit.Assert.assertTrue;
+
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.base.Supplier;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -31,6 +38,8 @@
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
 import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
@@ -39,12 +48,15 @@
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This tests pipeline recovery related client protocol works correct or not.
  */
 public class TestClientProtocolForPipelineRecovery {
-
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestClientProtocolForPipelineRecovery.class);
   @Test public void testGetNewStamp() throws IOException {
     int numDataNodes = 1;
     Configuration conf = new HdfsConfiguration();
@@ -477,4 +489,128 @@ public void stopSendingPacketDownstream() throws IOException {
       DataNodeFaultInjector.set(oldDnInjector);
     }
   }
+
+  // Test to verify that blocks are no longer corrupted after HDFS-4660.
+  // Revert HDFS-4660 and the other related ones (HDFS-9220, HDFS-8722), this
+  // test would fail.
+  // Scenario: Prior to the fix, block get corrupted when the transferBlock
+  // happens during pipeline recovery with extra bytes to make up the end of
+  // chunk.
+  // For verification, Need to fail the pipeline for last datanode when the
+  // second datanode have more bytes on disk than already acked bytes.
+  // This will enable to transfer extra bytes to the newNode to makeup
+  // end-of-chunk during pipeline recovery. This is achieved by the customized
+  // DataNodeFaultInjector class in this test.
+  // For detailed info, please refer to HDFS-4660 and HDFS-10587. HDFS-9220
+  // fixes an issue in HDFS-4660 patch, and HDFS-8722 is an optimization.
+  @Test
+  public void testPipelineRecoveryWithTransferBlock() throws Exception {
+    final int chunkSize = 512;
+    final int oneWriteSize = 5000;
+    final int totalSize = 1024 * 1024;
+    final int errorInjectionPos = 512;
+    Configuration conf = new HdfsConfiguration();
+    // Need 4 datanodes to verify the replaceDatanode during pipeline recovery
+    final MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
+    DataNodeFaultInjector old = DataNodeFaultInjector.get();
+
+    try {
+      DistributedFileSystem fs = cluster.getFileSystem();
+      Path fileName = new Path("/f");
+      FSDataOutputStream o = fs.create(fileName);
+      int count = 0;
+      // Flush to get the pipeline created.
+      o.writeBytes("hello");
+      o.hflush();
+      DFSOutputStream dfsO = (DFSOutputStream) o.getWrappedStream();
+      final DatanodeInfo[] pipeline = dfsO.getStreamer().getNodes();
+      final String lastDn = pipeline[2].getXferAddr(false);
+      final AtomicBoolean failed = new AtomicBoolean(false);
+
+      DataNodeFaultInjector.set(new DataNodeFaultInjector() {
+        @Override
+        public void failPipeline(ReplicaInPipelineInterface replicaInfo,
+            String mirror) throws IOException {
+          if (!lastDn.equals(mirror)) {
+            // Only fail for second DN
+            return;
+          }
+          if (!failed.get() &&
+              (replicaInfo.getBytesAcked() > errorInjectionPos) &&
+              (replicaInfo.getBytesAcked() % chunkSize != 0)) {
+            int count = 0;
+            while (count < 10) {
+              // Fail the pipeline (Throw exception) when:
+              // 1. bytsAcked is not at chunk boundary (checked in the if
+              // statement above)
+              // 2. bytesOnDisk is bigger than bytesAcked and at least
+              // reaches (or go beyond) the end of the chunk that
+              // bytesAcked is in (checked in the if statement below).
+              // At this condition, transferBlock that happens during
+              // pipeline recovery would transfer extra bytes to make up to the
+              // end of the chunk. And this is when the block corruption
+              // described in HDFS-4660 would occur.
+              if ((replicaInfo.getBytesOnDisk() / chunkSize) -
+                  (replicaInfo.getBytesAcked() / chunkSize) >= 1) {
+                failed.set(true);
+                throw new IOException(
+                    "Failing Pipeline " + replicaInfo.getBytesAcked() + " : "
+                        + replicaInfo.getBytesOnDisk());
+              }
+              try {
+                Thread.sleep(200);
+              } catch (InterruptedException e) {
+              }
+              count++;
+            }
+          }
+        }
+      });
+
+      Random r = new Random();
+      byte[] b = new byte[oneWriteSize];
+      while (count < totalSize) {
+        r.nextBytes(b);
+        o.write(b);
+        count += oneWriteSize;
+        o.hflush();
+      }
+
+      assertTrue("Expected a failure in the pipeline", failed.get());
+      DatanodeInfo[] newNodes = dfsO.getStreamer().getNodes();
+      o.close();
+      // Trigger block report to NN
+      for (DataNode d: cluster.getDataNodes()) {
+        DataNodeTestUtils.triggerBlockReport(d);
+      }
+      // Read from the replaced datanode to verify the corruption. So shutdown
+      // all other nodes in the pipeline.
+      List<DatanodeInfo> pipelineList = Arrays.asList(pipeline);
+      DatanodeInfo newNode = null;
+      for (DatanodeInfo node : newNodes) {
+        if (!pipelineList.contains(node)) {
+          newNode = node;
+          break;
+        }
+      }
+      LOG.info("Number of nodes in pipeline: {} newNode {}",
+          newNodes.length, newNode.getName());
+      // shutdown old 2 nodes
+      for (int i = 0; i < newNodes.length; i++) {
+        if (newNodes[i].getName().equals(newNode.getName())) {
+          continue;
+        }
+        LOG.info("shutdown {}", newNodes[i].getName());
+        cluster.stopDataNode(newNodes[i].getName());
+      }
+
+      // Read should be successfull from only the newNode. There should not be
+      // any corruption reported.
+      DFSTestUtil.readFile(fs, fileName);
+    } finally {
+      DataNodeFaultInjector.set(old);
+      cluster.shutdown();
+    }
+  }
 }
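
As a rough illustration of the condition the injected failPipeline above waits for, here is a worked instance; the byte counts are made up, only chunkSize = 512 and errorInjectionPos = 512 come from the test:

  // Illustrative values only; the real counts depend on pipeline timing.
  final int chunkSize = 512;
  long bytesAcked  = 700;    // past errorInjectionPos (512), and 700 % 512 != 0
  long bytesOnDisk = 1100;   // un-acked data reaching past the end of the chunk
  // bytesOnDisk / chunkSize = 2, bytesAcked / chunkSize = 1, difference >= 1,
  // so the injector throws; the transferBlock performed during pipeline
  // recovery must then copy the extra bytes that pad out the chunk containing
  // bytesAcked, which is the situation HDFS-4660 fixed.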
