
Commit 8c2c812

HDFS-11472. Fix inconsistent replica size after a data pipeline failure. Contributed by Erik Krogen and Wei-Chiu Chuang.
1 parent b3269f7 commit 8c2c812

File tree

3 files changed: +78 / -5 lines

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

Lines changed: 17 additions & 3 deletions

@@ -1424,13 +1424,27 @@ private ReplicaHandler recoverRbwImpl(ReplicaInPipeline rbw,
           minBytesRcvd + ", " + maxBytesRcvd + "].");
     }

+    long bytesOnDisk = rbw.getBytesOnDisk();
+    long blockDataLength = rbw.getReplicaInfo().getBlockDataLength();
+    if (bytesOnDisk != blockDataLength) {
+      LOG.info("Resetting bytesOnDisk to match blockDataLength (={}) for " +
+          "replica {}", blockDataLength, rbw);
+      bytesOnDisk = blockDataLength;
+      rbw.setLastChecksumAndDataLen(bytesOnDisk, null);
+    }
+
+    if (bytesOnDisk < bytesAcked) {
+      throw new ReplicaNotFoundException("Found fewer bytesOnDisk than " +
+          "bytesAcked for replica " + rbw);
+    }
+
     FsVolumeReference ref = rbw.getReplicaInfo()
         .getVolume().obtainReference();
     try {
       // Truncate the potentially corrupt portion.
       // If the source was client and the last node in the pipeline was lost,
       // any corrupt data written after the acked length can go unnoticed.
-      if (numBytes > bytesAcked) {
+      if (bytesOnDisk > bytesAcked) {
         rbw.getReplicaInfo().truncateBlock(bytesAcked);
         rbw.setNumBytes(bytesAcked);
         rbw.setLastChecksumAndDataLen(bytesAcked, null);
@@ -2460,8 +2474,8 @@ static ReplicaRecoveryInfo initReplicaRecoveryImpl(String bpid, ReplicaMap map,

     //check replica bytes on disk.
     if (replica.getBytesOnDisk() < replica.getVisibleLength()) {
-      throw new IOException("THIS IS NOT SUPPOSED TO HAPPEN:"
-          + " getBytesOnDisk() < getVisibleLength(), rip=" + replica);
+      throw new IOException("getBytesOnDisk() < getVisibleLength(), rip="
+          + replica);
     }

     //check the replica's files
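
Taken together, the recoverRbwImpl() changes above make the block file's actual length on disk the authority: the in-memory bytesOnDisk is first reconciled with getBlockDataLength(), and only then compared against bytesAcked to decide whether recovery truncates the replica or fails. The standalone sketch below restates that invariant outside of HDFS; it is illustrative only, and the class and method names (ReplicaLengthCheck, reconcile) are invented for the example, not part of the patch.

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

/** Illustrative sketch of the length invariant enforced during RBW recovery. */
public final class ReplicaLengthCheck {

  /**
   * Reconcile the in-memory view of a replica with the file on disk and
   * return the trusted length. Mirrors the ordering used in recoverRbwImpl():
   * trust the file length first, then require it to cover the acked bytes.
   */
  static long reconcile(File blockFile, long bytesOnDisk, long bytesAcked)
      throws IOException {
    long blockDataLength = blockFile.length();
    if (bytesOnDisk != blockDataLength) {
      // The writer stopped before updating bytesOnDisk (e.g. a pipeline
      // failure); the file itself is the source of truth.
      bytesOnDisk = blockDataLength;
    }
    if (bytesOnDisk < bytesAcked) {
      // Bytes the client saw acknowledged are missing from disk: give up,
      // as the patched code does by throwing ReplicaNotFoundException.
      throw new IOException("Found fewer bytesOnDisk than bytesAcked");
    }
    // Anything beyond the acked length is potentially corrupt; the real
    // code truncates the block file back to bytesAcked.
    return bytesAcked;
  }

  public static void main(String[] args) throws IOException {
    File f = File.createTempFile("blk_", ".data");
    try (RandomAccessFile raf = new RandomAccessFile(f, "rw")) {
      raf.setLength(2003);               // bytes actually present on disk
    }
    // Stale in-memory length (2000) and 1998 acked bytes: recovery succeeds
    // and the trusted length is the acked length.
    System.out.println(reconcile(f, 2000L, 1998L));   // prints 1998
    f.delete();
  }
}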

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java

Lines changed: 10 additions & 0 deletions

@@ -32,6 +32,7 @@
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
+import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
 import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.FsDatasetTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.Replica;
@@ -302,6 +303,15 @@ public Replica createRBW(FsVolumeSpi volume, ExtendedBlock eb)
     rbw.getBlockFile().createNewFile();
     rbw.getMetaFile().createNewFile();
     dataset.volumeMap.add(bpid, rbw);
+
+    FileIoProvider fileIoProvider = rbw.getFileIoProvider();
+
+    try (RandomAccessFile blockRAF = fileIoProvider.getRandomAccessFile(
+        volume, rbw.getBlockFile(), "rw")) {
+      //extend blockFile
+      blockRAF.setLength(eb.getNumBytes());
+    }
+
     saveMetaFileHeader(rbw.getMetaFile());
     return rbw;
   }
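
The test-utility change above matters because createRBW() previously left the block file empty even though the ExtendedBlock advertised a non-zero length, which is exactly the kind of mismatch the new recovery check rejects. Extending the file with RandomAccessFile.setLength() keeps the on-disk length in step with eb.getNumBytes(). A minimal, hypothetical illustration of that mechanism follows; the helper name setBlockLength and the temp-file usage are invented for the example.

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

public final class PreallocateBlockFile {

  /** Grow (or shrink) a block file so its on-disk length matches the expected size. */
  static void setBlockLength(File blockFile, long expectedBytes) throws IOException {
    try (RandomAccessFile raf = new RandomAccessFile(blockFile, "rw")) {
      raf.setLength(expectedBytes);   // same call the test utility makes via FileIoProvider
    }
  }

  public static void main(String[] args) throws IOException {
    File f = File.createTempFile("blk_", ".data");  // stands in for rbw.getBlockFile()
    setBlockLength(f, 2L);            // pretend the ExtendedBlock advertises 2 bytes
    System.out.println(f.length());   // 2
    f.delete();
  }
}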

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java

Lines changed: 51 additions & 2 deletions

@@ -19,8 +19,10 @@

 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;

 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -36,12 +38,14 @@
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.FsDatasetTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.junit.Assert;
@@ -154,7 +158,7 @@ private ExtendedBlock[] setup(String bpid, FsDatasetTestUtils testUtils)

     ExtendedBlock[] blocks = new ExtendedBlock[] {
         new ExtendedBlock(bpid, 1, 1, 2001), new ExtendedBlock(bpid, 2, 1, 2002),
-        new ExtendedBlock(bpid, 3, 1, 2003), new ExtendedBlock(bpid, 4, 1, 2004),
+        new ExtendedBlock(bpid, 3, 2, 2003), new ExtendedBlock(bpid, 4, 1, 2004),
         new ExtendedBlock(bpid, 5, 1, 2005), new ExtendedBlock(bpid, 6, 1, 2006)
     };

@@ -552,7 +556,52 @@ public void testReplicaMapAfterDatanodeRestart() throws Exception {
       cluster.shutdown();
     }
   }
-
+
+  /**
+   * Test that we can successfully recover a {@link ReplicaBeingWritten}
+   * which has inconsistent metadata (bytes were written to disk but bytesOnDisk
+   * was not updated) but that recovery fails when the block is actually
+   * corrupt (bytes are not present on disk).
+   */
+  @Test
+  public void testRecoverInconsistentRbw() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    cluster.waitActive();
+    DataNode dn = cluster.getDataNodes().get(0);
+    FsDatasetImpl fsDataset = (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn);
+
+    // set up replicasMap
+    String bpid = cluster.getNamesystem().getBlockPoolId();
+    ExtendedBlock[] blocks = setup(bpid, cluster.getFsDatasetTestUtils(dn));
+
+    ReplicaBeingWritten rbw = (ReplicaBeingWritten)fsDataset.
+        getReplicaInfo(bpid, blocks[RBW].getBlockId());
+    long bytesOnDisk = rbw.getBytesOnDisk();
+    // simulate an inconsistent replica length update by reducing in-memory
+    // value of on disk length
+    rbw.setLastChecksumAndDataLen(bytesOnDisk - 1, null);
+    fsDataset.recoverRbw(blocks[RBW], blocks[RBW].getGenerationStamp(), 0L,
+        rbw.getNumBytes());
+    // after the recovery, on disk length should equal acknowledged length.
+    Assert.assertTrue(rbw.getBytesOnDisk() == rbw.getBytesAcked());
+
+    // reduce on disk length again; this time actually truncate the file to
+    // simulate the data not being present
+    rbw.setLastChecksumAndDataLen(bytesOnDisk - 1, null);
+    try (RandomAccessFile blockRAF = rbw.getFileIoProvider().
+        getRandomAccessFile(rbw.getVolume(), rbw.getBlockFile(), "rw")) {
+      // truncate blockFile
+      blockRAF.setLength(bytesOnDisk - 1);
+      fsDataset.recoverRbw(blocks[RBW], blocks[RBW].getGenerationStamp(), 0L,
+          rbw.getNumBytes());
+      fail("recovery should have failed");
+    } catch (ReplicaNotFoundException rnfe) {
+      GenericTestUtils.assertExceptionContains("Found fewer bytesOnDisk than " +
+          "bytesAcked for replica", rnfe);
+    }
+  }
+
   /**
    * Compare the replica map before and after the restart
    **/
