
Commit b91d1b9

HDFS-17801. EC: Reading supports retryCurrentNode to avoid transient errors causing application-level failures.

1 parent 6eae158 commit b91d1b9
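
In brief: each BlockReaderInfo gains a retryCurrentReader flag, initially true. When a chunk read fails, the reader is closed without being marked skipped; the flag is spent and the reader slot is nulled instead, and checkMissingBlocks()/readDataForDecoding() treat such nulled slots as recoverable. The net effect is that a stripe read retries the same DataNode once (e.g. after it closed an idle connection) before falling back to decoding or surfacing an error.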

File tree

3 files changed: 156 additions, 10 deletions


hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java

Lines changed: 9 additions & 1 deletion
@@ -206,6 +206,9 @@ protected void closeCurrentBlockReaders() {
       return;
     }
     for (int i = 0; i < groupSize; i++) {
+      if (blockReaders[i] != null) {
+        blockReaders[i].setRetryCurrentReader(false);
+      }
       closeReader(blockReaders[i]);
       blockReaders[i] = null;
     }
@@ -220,7 +223,9 @@ protected void closeReader(BlockReaderInfo readerInfo) {
       } catch (Throwable ignored) {
       }
     }
-    readerInfo.skip();
+    if (!readerInfo.isRetryCurrentReader()) {
+      readerInfo.skip();
+    }
   }
 }
@@ -517,6 +522,9 @@ protected void fetchBlockByteRange(LocatedBlock block, long start,
       buf.position(buf.position() + (int)(end - start + 1));
     } finally {
       for (BlockReaderInfo preaderInfo : preaderInfos) {
+        if (preaderInfo != null) {
+          preaderInfo.setRetryCurrentReader(false);
+        }
         closeReader(preaderInfo);
       }
     }
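
The behavioural change in closeReader() is easiest to see in isolation: skipping (permanently excluding a source) now only happens once the reader's single retry has been spent. A minimal self-contained sketch of that gate, with illustrative names rather than the real Hadoop types:

// Sketch only: mirrors the gate added to DFSStripedInputStream.closeReader().
class ReaderInfoSketch {
  boolean shouldSkip = false;        // once true, this source is never read again
  boolean retryCurrentReader = true; // one retry allowed before giving up on the node

  void close() {
    // With the patch, closing a reader marks it skipped only when retry is
    // already disabled, so one transient failure no longer poisons the node.
    if (!retryCurrentReader) {
      shouldSkip = true;
    }
  }
}

Callers that are genuinely done with a reader (closeCurrentBlockReaders(), and the finally block of fetchBlockByteRange()) first call setRetryCurrentReader(false), which restores the old always-skip behaviour on those paths.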

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java

Lines changed: 62 additions & 9 deletions
@@ -38,7 +38,9 @@
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutorCompletionService;
@@ -90,6 +92,13 @@ static class BlockReaderInfo {
      * using it for the next stripe.
      */
     boolean shouldSkip = false;
+    /**
+     * Whether we should retry the current reader before marking it skipped,
+     * possibly retrying the same node, so that transient errors do not turn
+     * into application-level failures (e.g. the DataNode may have closed the
+     * connection because the client was idle for too long).
+     */
+    boolean retryCurrentReader = true;
 
     BlockReaderInfo(BlockReader reader, DatanodeInfo dn, long offset) {
       this.reader = reader;
@@ -104,6 +113,14 @@ void setOffset(long offset) {
     void skip() {
       this.shouldSkip = true;
     }
+
+    public boolean isRetryCurrentReader() {
+      return retryCurrentReader;
+    }
+
+    public void setRetryCurrentReader(boolean retryCurrentReader) {
+      this.retryCurrentReader = retryCurrentReader;
+    }
   }
 
   private final Map<Future<BlockReadStats>, Integer> futures =
@@ -174,11 +191,26 @@ void updateState4SuccessRead(StripingChunkReadResult result) {
 
   private void checkMissingBlocks() throws IOException {
     if (alignedStripe.missingChunksNum > parityBlkNum) {
-      clearFutures();
-      throw new IOException(alignedStripe.missingChunksNum
-          + " missing blocks, the stripe is: " + alignedStripe
-          + "; locatedBlocks is: " + dfsStripedInputStream.getLocatedBlocks());
+      if (countOfNullReaderInfos(readerInfos) < parityBlkNum) {
+        clearFutures();
+        throw new IOException(alignedStripe.missingChunksNum
+            + " missing blocks, the stripe is: " + alignedStripe
+            + "; locatedBlocks is: " + dfsStripedInputStream.getLocatedBlocks());
+      }
+    }
+  }
+
+  private int countOfNullReaderInfos(BlockReaderInfo[] readerInfos) {
+    if (readerInfos == null) {
+      return 0;
     }
+    int count = 0;
+    for (int i = 0; i < readerInfos.length; i++) {
+      if (readerInfos[i] == null) {
+        count++;
+      }
+    }
+    return count;
   }
 
   /**
@@ -187,6 +219,23 @@ private void checkMissingBlocks() throws IOException {
    */
   private void readDataForDecoding() throws IOException {
     prepareDecodeInputs();
+
+    if (alignedStripe.missingChunksNum > parityBlkNum) {
+      Set<Integer> recoveredIndexes = new HashSet<>();
+      if (countOfNullReaderInfos(readerInfos) >= parityBlkNum) {
+        for (int index = 0; index < dataBlkNum + parityBlkNum; index++) {
+          if (readerInfos[index] == null) {
+            alignedStripe.chunks[index].state = StripingChunk.REQUESTED;
+            recoveredIndexes.add(index);
+          }
+        }
+      }
+
+      for (int recoveredIndex : recoveredIndexes) {
+        alignedStripe.missingChunksNum--;
+      }
+    }
+
     for (int i = 0; i < dataBlkNum; i++) {
       Preconditions.checkNotNull(alignedStripe.chunks[i]);
       if (alignedStripe.chunks[i].state == StripingChunk.REQUESTED) {
@@ -332,7 +381,7 @@ boolean readChunk(final LocatedBlock block, int chunkIndex)
   }
 
   /**
-   * read the whole stripe. do decoding if necessary
+   * Read the whole stripe. Do decoding if necessary.
    */
   void readStripe() throws IOException {
     try {
@@ -349,7 +398,7 @@ void readStripe() throws IOException {
       if (alignedStripe.missingChunksNum > 0) {
         checkMissingBlocks();
         readDataForDecoding();
-        // read parity chunks
+        // Read parity chunks.
         readParityChunks(alignedStripe.missingChunksNum);
       }
     } catch (IOException e) {
@@ -359,7 +408,7 @@
     // TODO: for a full stripe we can start reading (dataBlkNum + 1) chunks
 
     // Input buffers for potential decode operation, which remains null until
-    // first read failure
+    // first read failure.
     while (!futures.isEmpty()) {
       try {
         long beginReadMS = Time.monotonicNow();
@@ -384,8 +433,12 @@
           }
         } else {
           returnedChunk.state = StripingChunk.MISSING;
-          // close the corresponding reader
+          // Close the corresponding reader.
           dfsStripedInputStream.closeReader(readerInfos[r.index]);
+          if (readerInfos[r.index].isRetryCurrentReader()) {
+            readerInfos[r.index].setRetryCurrentReader(false);
+            readerInfos[r.index] = null;
+          }
 
           final int missing = alignedStripe.missingChunksNum;
           alignedStripe.missingChunksNum++;
@@ -399,7 +452,7 @@
       DFSClient.LOG.error(err, ie);
       dfsStripedInputStream.close();
       clearFutures();
-      // Don't decode if read interrupted
+      // Don't decode if read interrupted.
       throw new InterruptedIOException(err);
     }
   }
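
Putting the StripeReader pieces together: a failed chunk no longer immediately counts against the stripe's decode budget if its reader still has a retry left. A compact, self-contained sketch of that bookkeeping (illustrative names, not the real Hadoop classes):

// Sketch of the retry accounting added to StripeReader.readStripe().
class StripeRetrySketch {
  static class ReaderInfo {
    boolean shouldSkip = false;
    boolean retryCurrentReader = true;
  }

  // closeReader() marks a source dead only once its retry is spent.
  static void closeReader(ReaderInfo info) {
    if (info != null && !info.retryCurrentReader) {
      info.shouldSkip = true;
    }
  }

  // On a chunk read failure: spend the retry and null the slot, so the next
  // attempt creates a fresh reader to the same DataNode instead of treating
  // the chunk as permanently missing.
  static void onReadFailure(ReaderInfo[] readers, int index) {
    closeReader(readers[index]);
    if (readers[index].retryCurrentReader) {
      readers[index].retryCurrentReader = false;
      readers[index] = null;
    }
  }
}

countOfNullReaderInfos() then lets checkMissingBlocks() and readDataForDecoding() distinguish these retryable null slots from readers that have failed for good.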

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java

Lines changed: 85 additions & 0 deletions
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs;
 
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -50,6 +51,7 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Random;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
@@ -735,4 +737,87 @@ public void onCreateBlockReader(LocatedBlock block, int chunkIndex,
     assertEquals(rangesExpected, ranges);
   }
 
+  @Test
+  public void testStatefulReadAfterLongTimeIdle() throws Exception {
+    HdfsConfiguration hdfsConf = new HdfsConfiguration();
+    hdfsConf.setInt("dfs.datanode.socket.write.timeout", 10000);
+    hdfsConf.setInt("dfs.client.socket-timeout", 10000);
+    String testBaseDir = "/testECRead";
+    String testfileName = "testfile";
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(hdfsConf)
+        .numDataNodes(9).build()) {
+      cluster.waitActive();
+      final DistributedFileSystem dfs = cluster.getFileSystem();
+      Path dir = new Path(testBaseDir);
+      dfs.mkdirs(dir);
+      dfs.enableErasureCodingPolicy("RS-6-3-1024k");
+      dfs.setErasureCodingPolicy(dir, "RS-6-3-1024k");
+
+      // Write 30 MB + 1 byte of data.
+      int writeBufSize = 30 * 1024 * 1024 + 1;
+      byte[] writeBuf = new byte[writeBufSize];
+      try (FSDataOutputStream fsdos = dfs.create(
+          new Path(testBaseDir + Path.SEPARATOR + testfileName))) {
+        Random random = new Random();
+        random.nextBytes(writeBuf);
+        fsdos.write(writeBuf, 0, writeBuf.length);
+        Thread.sleep(2000);
+      }
+
+      byte[] readBuf = new byte[6 * 1024 * 1024];
+      try (FSDataInputStream fsdis = dfs.open(
+          new Path(testBaseDir + Path.SEPARATOR + testfileName))) {
+        fsdis.read(readBuf);
+        Thread.sleep(21 * 1000);
+        while ((fsdis.read(readBuf)) > 0) {
+          Thread.sleep(21 * 1000);
+        }
+      }
+      assertTrue(dfs.delete(new Path(testBaseDir + Path.SEPARATOR + testfileName), true));
+    }
+  }
+
+  @Test
+  public void testPReadAfterLongTimeIdle() throws Exception {
+    HdfsConfiguration hdfsConf = new HdfsConfiguration();
+    hdfsConf.setInt("dfs.datanode.socket.write.timeout", 10000);
+    hdfsConf.setInt("dfs.client.socket-timeout", 10000);
+    String testBaseDir = "/testECRead";
+    String testfileName = "testfile";
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(hdfsConf)
+        .numDataNodes(9).build()) {
+      cluster.waitActive();
+      final DistributedFileSystem dfs = cluster.getFileSystem();
+      Path dir = new Path(testBaseDir);
+      dfs.mkdirs(dir);
+      dfs.enableErasureCodingPolicy("RS-6-3-1024k");
+      dfs.setErasureCodingPolicy(dir, "RS-6-3-1024k");
+
+      // Write 30 MB + 1 byte of data.
+      int writeBufSize = 30 * 1024 * 1024 + 1;
+      byte[] writeBuf = new byte[writeBufSize];
+      try (FSDataOutputStream fsdos = dfs.create(
+          new Path(testBaseDir + Path.SEPARATOR + testfileName))) {
+        Random random = new Random();
+        random.nextBytes(writeBuf);
+        fsdos.write(writeBuf, 0, writeBuf.length);
+        Thread.sleep(2000);
+      }
+
+      byte[] readBuf = new byte[6 * 1024 * 1024];
+      try (FSDataInputStream fsdis = dfs.open(
+          new Path(testBaseDir + Path.SEPARATOR + testfileName))) {
+        int curPos = 0;
+        int readLen = fsdis.read(curPos, readBuf, 0, readBuf.length);
+        curPos += readLen;
+        Thread.sleep(21 * 1000);
+        while ((readLen = fsdis.read(curPos, readBuf, 0, readBuf.length)) > 0) {
+          curPos += readLen;
+          Thread.sleep(21 * 1000);
+        }
+      }
+      assertTrue(dfs.delete(new Path(testBaseDir + Path.SEPARATOR + testfileName), true));
+    }
+  }
 }
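
Both new tests follow the same recipe: lower dfs.datanode.socket.write.timeout and dfs.client.socket-timeout to 10 seconds, then sleep 21 seconds between reads so the idle data-transfer connections are almost certainly torn down before the next read, one test exercising the stateful read path and the other positional reads (pread). Without the retryCurrentReader flag, those reads would surface the broken connection as an application-level failure; with it, they should complete normally.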
