
Commit cb2dce3

HDFS-15240. Erasure Coding: dirty buffer causes reconstruction block error. Contributed by HuangTao.
1 parent edd9b65 commit cb2dce3

7 files changed: 316 additions and 3 deletions


hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java

Lines changed: 1 addition & 0 deletions
@@ -96,6 +96,7 @@ public synchronized ByteBuffer getBuffer(boolean direct, int length) {
                       ByteBuffer.allocate(length);
     }
     tree.remove(entry.getKey());
+    entry.getValue().clear();
     return entry.getValue();
   }

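
The one-line clear() above is the "dirty buffer" fix named in the commit title: ElasticByteBufferPool.putBuffer() stores a buffer as-is, so a buffer checked back in mid-use keeps its old position and limit. A minimal sketch of the failure mode (illustrative only, not part of the commit; the class name is made up):

// Sketch: why getBuffer() now resets a pooled buffer before handing it out.
import java.nio.ByteBuffer;
import org.apache.hadoop.io.ElasticByteBufferPool;

public class DirtyBufferSketch {
  public static void main(String[] args) {
    ElasticByteBufferPool pool = new ElasticByteBufferPool();

    ByteBuffer first = pool.getBuffer(false, 1024);
    first.put(new byte[512]);   // position is now 512
    pool.putBuffer(first);      // putBuffer() does not reset the buffer

    ByteBuffer reused = pool.getBuffer(false, 1024);
    // Before this change, reused.position() could still be 512, so a reader
    // that fills reused.remaining() bytes would read short and reconstruct
    // bad data. With the added clear(), the buffer starts fresh at position 0.
    System.out.println("position after checkout: " + reused.position());
  }
}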

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java

Lines changed: 16 additions & 0 deletions
@@ -96,6 +96,22 @@ public void throwTooManyOpenFiles() throws FileNotFoundException {
    */
   public void stripedBlockReconstruction() throws IOException {}

+  /**
+   * Used as a hook to inject latency when reading a block
+   * in the erasure coding reconstruction process.
+   */
+  public void delayBlockReader() {}
+
+  /**
+   * Used as a hook to intercept freeing the block reader buffer.
+   */
+  public void interceptFreeBlockReaderBuffer() {}
+
+  /**
+   * Used as a hook to intercept the end of reading from a block.
+   */
+  public void interceptBlockReader() {}
+
   /**
    * Used as a hook to inject intercept when BPOfferService hold lock.
    */
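
These hooks are no-ops in production; tests drive them by installing a custom injector and restoring the previous one afterwards. A minimal sketch of the pattern used by the new tests further down (the injector body here is just a placeholder):

DataNodeFaultInjector previous = DataNodeFaultInjector.get();
DataNodeFaultInjector.set(new DataNodeFaultInjector() {
  @Override
  public void delayBlockReader() {
    // e.g. park the first reader long enough to exceed the striped read timeout
  }
});
try {
  // run the reconstruction scenario under test
} finally {
  // always restore the previous injector so other tests are unaffected
  DataNodeFaultInjector.set(previous);
}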

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java

Lines changed: 5 additions & 0 deletions
@@ -31,6 +31,7 @@
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil.BlockReadStats;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
@@ -95,6 +96,7 @@ ByteBuffer getReadBuffer() {
   }

   void freeReadBuffer() {
+    DataNodeFaultInjector.get().interceptFreeBlockReaderBuffer();
     buffer = null;
   }

@@ -179,6 +181,8 @@ public BlockReadStats call() throws Exception {
         } catch (IOException e) {
           LOG.info(e.getMessage());
           throw e;
+        } finally {
+          DataNodeFaultInjector.get().interceptBlockReader();
         }
       }
     };
@@ -188,6 +192,7 @@ public BlockReadStats call() throws Exception {
    * Perform actual reading of bytes from block.
    */
   private BlockReadStats actualReadFromBlock() throws IOException {
+    DataNodeFaultInjector.get().delayBlockReader();
     int len = buffer.remaining();
     int n = 0;
     while (n < len) {

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java

Lines changed: 18 additions & 3 deletions
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.datanode.erasurecode;

 import com.google.common.base.Preconditions;
+import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -328,14 +329,14 @@ int[] doReadMinimumSources(int reconstructLength,
             // cancel remaining reads if we read successfully from minimum
             // number of source DNs required by reconstruction.
             cancelReads(futures.keySet());
-            futures.clear();
+            clearFuturesAndService();
             break;
           }
         }
       } catch (InterruptedException e) {
         LOG.info("Read data interrupted.", e);
         cancelReads(futures.keySet());
-        futures.clear();
+        clearFuturesAndService();
         break;
       }
     }
@@ -429,6 +430,20 @@ private static void cancelReads(Collection<Future<BlockReadStats>> futures) {
     }
   }

+  // remove all stale futures from readService, and clear futures.
+  private void clearFuturesAndService() {
+    while (!futures.isEmpty()) {
+      try {
+        Future<BlockReadStats> future = readService.poll(
+            stripedReadTimeoutInMills, TimeUnit.MILLISECONDS
+        );
+        futures.remove(future);
+      } catch (InterruptedException e) {
+        LOG.info("Clear stale futures from service is interrupted.", e);
+      }
+    }
+  }
+
   void close() {
     if (zeroStripeBuffers != null) {
       for (ByteBuffer zeroStripeBuffer : zeroStripeBuffers) {
@@ -438,9 +453,9 @@ void close() {
     zeroStripeBuffers = null;

     for (StripedBlockReader reader : readers) {
+      reader.closeBlockReader();
       reconstructor.freeBuffer(reader.getReadBuffer());
       reader.freeReadBuffer();
-      reader.closeBlockReader();
     }
   }
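
Two things change here. First, close() now closes each block reader before its buffer is freed, so an in-flight read can no longer write into a buffer that has already gone back to the pool. Second, clearFuturesAndService() replaces the bare futures.clear(): a timed-out read's Future still sits in the readService completion queue, and if only the map were cleared, a later poll could hand back a Future with no matching map entry and trigger the NPE described in the tests below. A standalone sketch of that drain pattern (simplified and hypothetical, not the HDFS code itself):

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;

public class DrainCompletionServiceSketch {
  public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(2);
    CompletionService<Integer> readService =
        new ExecutorCompletionService<>(executor);
    Map<Future<Integer>, Integer> futures = new HashMap<>();

    futures.put(readService.submit(() -> 1), 0);
    futures.put(readService.submit(() -> 2), 1);

    // Drain both the completion queue and the map, mirroring
    // clearFuturesAndService(): a later poll() must never return a future
    // that the map no longer tracks.
    while (!futures.isEmpty()) {
      Future<Integer> done = readService.poll(5, TimeUnit.SECONDS);
      futures.remove(done);
    }
    executor.shutdown();
  }
}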

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java

Lines changed: 6 additions & 0 deletions
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.datanode.erasurecode;

+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -279,4 +280,9 @@ DataNode getDatanode() {
   public ErasureCodingWorker getErasureCodingWorker() {
     return erasureCodingWorker;
   }
+
+  @VisibleForTesting
+  static ByteBufferPool getBufferPool() {
+    return BUFFER_POOL;
+  }
 }
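
This package-private accessor exists so tests can inspect the shared reconstruction buffer pool. The test below reaches it through ErasureCodingTestHelper, which is part of this commit but not displayed in this view; its exact contents are not shown here, but a minimal same-package bridge would look like this (a sketch, not the committed file):

package org.apache.hadoop.hdfs.server.datanode.erasurecode;

import org.apache.hadoop.io.ByteBufferPool;

// Sketch of a test helper in the same package, forwarding the package-private
// accessor so tests outside the package can reach the pool.
public final class ErasureCodingTestHelper {
  private ErasureCodingTestHelper() {}

  public static ByteBufferPool getBufferPool() {
    return StripedReconstructor.getBufferPool();
  }
}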

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java

Lines changed: 240 additions & 0 deletions
@@ -23,6 +23,7 @@

 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
@@ -34,6 +35,12 @@
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CyclicBarrier;

+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.datanode.erasurecode.ErasureCodingTestHelper;
+import org.apache.hadoop.io.ElasticByteBufferPool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -575,4 +582,237 @@ public void stripedBlockReconstruction() throws IOException {
       }
     }
   }
+
+  /**
+   * When the StripedBlockReader times out, the outdated future should be
+   * ignored. Otherwise an NPE is thrown, which stops reading the remaining
+   * data, and the reconstruction task fails.
+   */
+  @Test(timeout = 120000)
+  public void testTimeoutReadBlockInReconstruction() throws Exception {
+    assumeTrue("Ignore case where num parity units <= 1",
+        ecPolicy.getNumParityUnits() > 1);
+    int stripedBufferSize = conf.getInt(
+        DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY,
+        cellSize);
+    ErasureCodingPolicy policy = ecPolicy;
+    fs.enableErasureCodingPolicy(policy.getName());
+    fs.getClient().setErasureCodingPolicy("/", policy.getName());
+
+    // StripedBlockReconstructor#reconstruct will loop 2 times
+    final int fileLen = stripedBufferSize * 2 * ecPolicy.getNumDataUnits();
+    String fileName = "/timeout-read-block";
+    Path file = new Path(fileName);
+    writeFile(fs, fileName, fileLen);
+    fs.getFileBlockLocations(file, 0, fileLen);
+
+    LocatedBlocks locatedBlocks =
+        StripedFileTestUtil.getLocatedBlocks(file, fs);
+    Assert.assertEquals(1, locatedBlocks.getLocatedBlocks().size());
+    // The file only has one block group
+    LocatedBlock lblock = locatedBlocks.get(0);
+    DatanodeInfo[] datanodeinfos = lblock.getLocations();
+
+    // to reconstruct the first block
+    DataNode dataNode = cluster.getDataNode(datanodeinfos[0].getIpcPort());
+
+    int stripedReadTimeoutInMills = conf.getInt(
+        DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_KEY,
+        DFSConfigKeys.
+            DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_DEFAULT);
+    Assert.assertTrue(
+        DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_KEY
+            + " must be greater than 2000",
+        stripedReadTimeoutInMills > 2000);
+
+    DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
+    DataNodeFaultInjector timeoutInjector = new DataNodeFaultInjector() {
+      private AtomicInteger numDelayReader = new AtomicInteger(0);

+      @Override
+      public void delayBlockReader() {
+        int index = numDelayReader.incrementAndGet();
+        LOG.info("Delay the {}th read block", index);
+
+        // the file's first StripedBlockReconstructor#reconstruct,
+        // and the first reader will timeout
+        if (index == 1) {
+          try {
+            GenericTestUtils.waitFor(() -> numDelayReader.get() >=
+                    ecPolicy.getNumDataUnits() + 1, 50,
+                stripedReadTimeoutInMills * 3
+            );
+          } catch (TimeoutException e) {
+            Assert.fail("Can't reconstruct the file's first part.");
+          } catch (InterruptedException e) {
+          }
+        }
+        // stop all the following re-reconstruction tasks
+        if (index > 3 * ecPolicy.getNumDataUnits() + 1) {
+          while (true) {
+            try {
+              Thread.sleep(1000);
+            } catch (InterruptedException e) {
+            }
+          }
+        }
+      }
+    };
+    DataNodeFaultInjector.set(timeoutInjector);
+
+    try {
+      shutdownDataNode(dataNode);
+      // before HDFS-15240, an NPE would cause reconstruction to fail (test timeout)
+      StripedFileTestUtil
+          .waitForReconstructionFinished(file, fs, groupSize);
+    } finally {
+      DataNodeFaultInjector.set(oldInjector);
+    }
+  }
+
+  /**
+   * When the block reader times out, the outdated future should be ignored.
+   * Otherwise the ByteBuffer would be written to after being given back to
+   * the BufferPool. This UT ensures that the block reader is closed before
+   * the buffer is freed.
+   */
+  @Test(timeout = 120000)
+  public void testAbnormallyCloseDoesNotWriteBufferAgain() throws Exception {
+    assumeTrue("Ignore case where num parity units <= 1",
+        ecPolicy.getNumParityUnits() > 1);
+    int stripedBufferSize = conf.getInt(
+        DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY,
+        cellSize);
+    // StripedBlockReconstructor#reconstruct will loop 2 times
+    final int fileLen = stripedBufferSize * 2 * ecPolicy.getNumDataUnits();
+    String fileName = "/no-dirty-buffer";
+    Path file = new Path(fileName);
+    writeFile(fs, fileName, fileLen);
+    fs.getFileBlockLocations(file, 0, fileLen);
+
+    LocatedBlocks locatedBlocks =
+        StripedFileTestUtil.getLocatedBlocks(file, fs);
+    Assert.assertEquals(1, locatedBlocks.getLocatedBlocks().size());
+    // The file only has one block group
+    LocatedBlock lblock = locatedBlocks.get(0);
+    DatanodeInfo[] datanodeinfos = lblock.getLocations();
+
+    // to reconstruct the first block
+    DataNode dataNode = cluster.getDataNode(datanodeinfos[0].getIpcPort());
+
+    int stripedReadTimeoutInMills = conf.getInt(
+        DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_KEY,
+        DFSConfigKeys.
+            DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_DEFAULT);
+    Assert.assertTrue(
+        DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_KEY
+            + " must be greater than 2000",
+        stripedReadTimeoutInMills > 2000);
+
+    ElasticByteBufferPool bufferPool =
+        (ElasticByteBufferPool) ErasureCodingTestHelper.getBufferPool();
+    emptyBufferPool(bufferPool, true);
+    emptyBufferPool(bufferPool, false);
+
+    AtomicInteger finishedReadBlock = new AtomicInteger(0);
+
+    DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
+    DataNodeFaultInjector timeoutInjector = new DataNodeFaultInjector() {
+      private AtomicInteger numDelayReader = new AtomicInteger(0);
+      private AtomicBoolean continueRead = new AtomicBoolean(false);
+      private AtomicBoolean closeByNPE = new AtomicBoolean(false);
+
+      @Override
+      public void delayBlockReader() {
+        int index = numDelayReader.incrementAndGet();
+        LOG.info("Delay the {}th read block", index);
+
+        // the file's first StripedBlockReconstructor#reconstruct,
+        // and the first reader will timeout
+        if (index == 1) {
+          try {
+            GenericTestUtils.waitFor(() -> numDelayReader.get() >=
+                    ecPolicy.getNumDataUnits() + 1, 50,
+                stripedReadTimeoutInMills * 3
+            );
+          } catch (TimeoutException e) {
+            Assert.fail("Can't reconstruct the file's first part.");
+          } catch (InterruptedException e) {
+          }
+        }
+        if (index > ecPolicy.getNumDataUnits() + 1) {
+          try {
+            GenericTestUtils.waitFor(
+                () -> {
+                  LOG.info("Close by NPE: {}, continue read: {}",
+                      closeByNPE, continueRead);
+                  return closeByNPE.get() ? continueRead.get()
+                      : index == finishedReadBlock.get() + 1; }, 5,
+                stripedReadTimeoutInMills * 3
+            );
+          } catch (TimeoutException e) {
+            Assert.fail("Can't reconstruct the file's remaining part.");
+          } catch (InterruptedException e) {
+          }
+        }
+      }
+
+      @Override
+      public void interceptBlockReader() {
+        int n = finishedReadBlock.incrementAndGet();
+        LOG.info("Intercept the end of {}th read block.", n);
+      }
+
+      private AtomicInteger numFreeBuffer = new AtomicInteger(0);
+      @Override
+      public void interceptFreeBlockReaderBuffer() {
+        closeByNPE.compareAndSet(false, true);
+        int num = numFreeBuffer.incrementAndGet();
+        LOG.info("Intercept the {} free block buffer.", num);
+        if (num >= ecPolicy.getNumDataUnits() + 1) {
+          continueRead.compareAndSet(false, true);
+          try {
+            GenericTestUtils.waitFor(() -> finishedReadBlock.get() >=
+                    2 * ecPolicy.getNumDataUnits() + 1, 50,
+                stripedReadTimeoutInMills * 3
+            );
+          } catch (TimeoutException e) {
+            Assert.fail("Can't finish the file's reconstruction.");
+          } catch (InterruptedException e) {
+          }
+        }
+      }
+    };
+    DataNodeFaultInjector.set(timeoutInjector);
+    try {
+      shutdownDataNode(dataNode);
+      // at least one timeout reader
+      GenericTestUtils.waitFor(() -> finishedReadBlock.get() >=
+              2 * ecPolicy.getNumDataUnits() + 1, 50,
+          stripedReadTimeoutInMills * 3
+      );
+
+      assertBufferPoolIsEmpty(bufferPool, false);
+      assertBufferPoolIsEmpty(bufferPool, true);
+      StripedFileTestUtil.waitForReconstructionFinished(file, fs, groupSize);
+    } finally {
+      DataNodeFaultInjector.set(oldInjector);
+    }
+  }
+
+  private void assertBufferPoolIsEmpty(ElasticByteBufferPool bufferPool,
+      boolean direct) {
+    while (bufferPool.size(direct) != 0) {
+      // iterate all ByteBuffers in ElasticByteBufferPool
+      ByteBuffer byteBuffer = bufferPool.getBuffer(direct, 0);
+      Assert.assertEquals(0, byteBuffer.position());
+    }
+  }
+
+  private void emptyBufferPool(ElasticByteBufferPool bufferPool,
+      boolean direct) {
+    while (bufferPool.size(direct) != 0) {
+      bufferPool.getBuffer(direct, 0);
+    }
+  }
 }
