Commit f52be54

HDFS-17801. EC: Reading support retryCurrentNode to avoid transient errors cause application level failures.
1 parent 6eae158 commit f52be54

File tree: 3 files changed, +156 −14 lines

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java

Lines changed: 22 additions & 4 deletions

@@ -93,6 +93,13 @@ public class DFSStripedInputStream extends DFSInputStream {
    */
   private final Set<String> warnedNodes =
       Collections.newSetFromMap(new ConcurrentHashMap<>());
+  /**
+   * We use this field to indicate whether we should retry the corresponding reader before
+   * we mark it skipped. possibly retry the same node so that transient errors don't
+   * result in application level failures (e.g. Datanode could have closed the connection
+   * because the client is idle for too long).
+   */
+  private boolean[] retryCurrentReaderFlags;
 
   DFSStripedInputStream(DFSClient dfsClient, String src,
       boolean verifyChecksum, ErasureCodingPolicy ecPolicy,
@@ -112,6 +119,8 @@ public class DFSStripedInputStream extends DFSInputStream {
         dataBlkNum, parityBlkNum);
     decoder = CodecUtil.createRawDecoder(dfsClient.getConfiguration(),
         ecPolicy.getCodecName(), coderOptions);
+    retryCurrentReaderFlags = new boolean[groupSize];
+    Arrays.fill(retryCurrentReaderFlags, true);
     DFSClient.LOG.debug("Creating an striped input stream for file {}", src);
   }
 
@@ -206,21 +215,24 @@ protected void closeCurrentBlockReaders() {
       return;
     }
     for (int i = 0; i < groupSize; i++) {
-      closeReader(blockReaders[i]);
+      retryCurrentReaderFlags[i] = false;
+      closeReader(blockReaders[i], i);
       blockReaders[i] = null;
     }
     blockEnd = -1;
   }
 
-  protected void closeReader(BlockReaderInfo readerInfo) {
+  protected void closeReader(BlockReaderInfo readerInfo, int readerIndex) {
     if (readerInfo != null) {
       if (readerInfo.reader != null) {
         try {
           readerInfo.reader.close();
         } catch (Throwable ignored) {
         }
       }
-      readerInfo.skip();
+      if (!retryCurrentReaderFlags[readerIndex]) {
+        readerInfo.skip();
+      }
     }
   }
 
@@ -516,8 +528,11 @@ protected void fetchBlockByteRange(LocatedBlock block, long start,
       }
       buf.position(buf.position() + (int)(end - start + 1));
     } finally {
+      int index = 0;
       for (BlockReaderInfo preaderInfo : preaderInfos) {
-        closeReader(preaderInfo);
+        retryCurrentReaderFlags[index] = false;
+        closeReader(preaderInfo, index);
+        index++;
       }
     }
   }
@@ -573,4 +588,7 @@ public synchronized void unbuffer() {
     }
   }
 
+  public boolean[] getRetryCurrentReaderFlags() {
+    return retryCurrentReaderFlags;
+  }
 }
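
For orientation, the sketch below models the retry-before-skip behavior that retryCurrentReaderFlags introduces. It is plain standalone Java, not code from the patch, and the class and method names are invented: a reader whose flag is still set gets one more attempt against the same datanode after a failure, and only a second consecutive failure marks that node as skipped.

import java.util.Arrays;

/** Standalone sketch (hypothetical names): retry a reader once before excluding its node. */
public class RetryBeforeSkipSketch {
  private final boolean[] retryFlags;
  private final boolean[] skipped;

  RetryBeforeSkipSketch(int groupSize) {
    retryFlags = new boolean[groupSize];
    skipped = new boolean[groupSize];
    Arrays.fill(retryFlags, true);   // every reader slot starts with one retry available
  }

  /** A read from reader slot i failed (e.g. the idle connection was closed). */
  void onReadFailure(int i) {
    if (retryFlags[i]) {
      retryFlags[i] = false;   // first failure: close the reader but do NOT skip the node
    } else {
      skipped[i] = true;       // failed again after the retry: exclude the node
    }
  }

  /** A read from reader slot i succeeded: restore its retry budget. */
  void onReadSuccess(int i) {
    retryFlags[i] = true;
  }

  boolean shouldSkip(int i) {
    return skipped[i];
  }

  public static void main(String[] args) {
    RetryBeforeSkipSketch s = new RetryBeforeSkipSketch(9);   // RS-6-3: 9 reader slots
    s.onReadFailure(2);
    System.out.println(s.shouldSkip(2));   // false -- node 2 gets another attempt
    s.onReadFailure(2);
    System.out.println(s.shouldSkip(2));   // true  -- repeated failure, node excluded
  }
}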

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java

Lines changed: 50 additions & 10 deletions

@@ -38,7 +38,9 @@
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutorCompletionService;
@@ -174,11 +176,26 @@ void updateState4SuccessRead(StripingChunkReadResult result) {
 
   private void checkMissingBlocks() throws IOException {
     if (alignedStripe.missingChunksNum > parityBlkNum) {
-      clearFutures();
-      throw new IOException(alignedStripe.missingChunksNum
-          + " missing blocks, the stripe is: " + alignedStripe
-          + "; locatedBlocks is: " + dfsStripedInputStream.getLocatedBlocks());
+      if (countOfNullReaderInfos(readerInfos) < parityBlkNum) {
+        clearFutures();
+        throw new IOException(alignedStripe.missingChunksNum
+            + " missing blocks, the stripe is: " + alignedStripe
+            + "; locatedBlocks is: " + dfsStripedInputStream.getLocatedBlocks());
+      }
+    }
+  }
+
+  private int countOfNullReaderInfos(BlockReaderInfo[] readerInfos) {
+    if (readerInfos == null) {
+      return 0;
+    }
+    int count = 0;
+    for (int i = 0; i < readerInfos.length; i++) {
+      if (readerInfos[i] == null) {
+        count++;
+      }
     }
+    return count;
   }
 
   /**
@@ -187,6 +204,23 @@ private void checkMissingBlocks() throws IOException {
    */
   private void readDataForDecoding() throws IOException {
     prepareDecodeInputs();
+
+    if (alignedStripe.missingChunksNum > parityBlkNum) {
+      Set<Integer> recoveredIndexes = new HashSet<>();
+      if (countOfNullReaderInfos(readerInfos) >= parityBlkNum) {
+        for (int index = 0; index < dataBlkNum + parityBlkNum; index++) {
+          if (readerInfos[index] == null) {
+            alignedStripe.chunks[index].state = StripingChunk.REQUESTED;
+            recoveredIndexes.add(index);
+          }
+        }
+      }
+
+      for (int recoveredIndex : recoveredIndexes) {
+        alignedStripe.missingChunksNum--;
+      }
+    }
+
     for (int i = 0; i < dataBlkNum; i++) {
       Preconditions.checkNotNull(alignedStripe.chunks[i]);
       if (alignedStripe.chunks[i].state == StripingChunk.REQUESTED) {
@@ -332,7 +366,7 @@ boolean readChunk(final LocatedBlock block, int chunkIndex)
   }
 
   /**
-   * read the whole stripe. do decoding if necessary
+   * Read the whole stripe. do decoding if necessary.
    */
   void readStripe() throws IOException {
     try {
@@ -349,7 +383,7 @@ void readStripe() throws IOException {
       if (alignedStripe.missingChunksNum > 0) {
         checkMissingBlocks();
         readDataForDecoding();
-        // read parity chunks
+        // Read parity chunks.
         readParityChunks(alignedStripe.missingChunksNum);
       }
     } catch (IOException e) {
@@ -359,7 +393,7 @@ void readStripe() throws IOException {
     // TODO: for a full stripe we can start reading (dataBlkNum + 1) chunks
 
     // Input buffers for potential decode operation, which remains null until
-    // first read failure
+    // first read failure.
     while (!futures.isEmpty()) {
       try {
         long beginReadMS = Time.monotonicNow();
@@ -378,14 +412,20 @@ void readStripe() throws IOException {
           returnedChunk.state = StripingChunk.FETCHED;
           alignedStripe.fetchedChunksNum++;
           updateState4SuccessRead(r);
+          dfsStripedInputStream.getRetryCurrentReaderFlags()[r.index] = true;
           if (alignedStripe.fetchedChunksNum == dataBlkNum) {
            clearFutures();
            break;
          }
        } else {
          returnedChunk.state = StripingChunk.MISSING;
-          // close the corresponding reader
-          dfsStripedInputStream.closeReader(readerInfos[r.index]);
+          // Close the corresponding reader.
+          dfsStripedInputStream.closeReader(readerInfos[r.index], r.index);
+          boolean originalRetryFlag = dfsStripedInputStream.getRetryCurrentReaderFlags()[r.index];
+          if (originalRetryFlag) {
+            dfsStripedInputStream.getRetryCurrentReaderFlags()[r.index] = false;
+            readerInfos[r.index] = null;
+          }
 
           final int missing = alignedStripe.missingChunksNum;
           alignedStripe.missingChunksNum++;
@@ -399,7 +439,7 @@ void readStripe() throws IOException {
        DFSClient.LOG.error(err, ie);
        dfsStripedInputStream.close();
        clearFutures();
-        // Don't decode if read interrupted
+        // Don't decode if read interrupted.
        throw new InterruptedIOException(err);
      }
    }
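
In the diff above, a reader slot that was closed for retry shows up as a null entry in readerInfos, and readDataForDecoding re-requests those chunks instead of treating them as permanently missing. The standalone sketch below is a rough illustration of that recovery step only; the class, enum, and method names are invented and the state handling is simplified relative to the patch.

import java.util.ArrayList;
import java.util.List;

/**
 * Standalone sketch (hypothetical names): if enough reader slots are merely
 * unopened (null) rather than failed, put their chunks back into the
 * REQUESTED state instead of aborting the stripe read.
 */
public class NullReaderRecoverySketch {
  enum ChunkState { REQUESTED, FETCHED, MISSING }

  static int countNullSlots(Object[] readers) {
    int count = 0;
    for (Object r : readers) {
      if (r == null) {
        count++;
      }
    }
    return count;
  }

  /** Re-requests chunks whose reader slot is null; returns how many chunks were recovered. */
  static int recoverUnopenedSlots(Object[] readers, ChunkState[] chunks, int parityBlkNum) {
    List<Integer> recovered = new ArrayList<>();
    if (countNullSlots(readers) >= parityBlkNum) {
      for (int i = 0; i < readers.length; i++) {
        if (readers[i] == null) {
          chunks[i] = ChunkState.REQUESTED;  // give this slot another read attempt
          recovered.add(i);
        }
      }
    }
    return recovered.size();
  }

  public static void main(String[] args) {
    // RS-6-3 layout: 9 slots; slots 0-3 were closed for retry and are null.
    Object[] readers = new Object[9];
    for (int i = 4; i < 9; i++) {
      readers[i] = new Object();
    }
    ChunkState[] chunks = new ChunkState[9];
    for (int i = 0; i < 9; i++) {
      chunks[i] = (readers[i] == null) ? ChunkState.MISSING : ChunkState.FETCHED;
    }
    int missing = 4;                                   // more than parityBlkNum = 3
    missing -= recoverUnopenedSlots(readers, chunks, 3);
    System.out.println("missing after recovery: " + missing);  // prints 0
  }
}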

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java

Lines changed: 84 additions & 0 deletions

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs;
 
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -50,6 +51,7 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Random;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
@@ -735,4 +737,86 @@ public void onCreateBlockReader(LocatedBlock block, int chunkIndex,
     assertEquals(rangesExpected, ranges);
   }
 
+  @Test
+  public void testStatefulReadAfterLongTimeIdle() throws Exception {
+    HdfsConfiguration hdfsConf = new HdfsConfiguration();
+    hdfsConf.setInt("dfs.datanode.socket.write.timeout", 5000);
+    hdfsConf.setInt("dfs.client.socket-timeout", 5000);
+    String testBaseDir = "/testECRead";
+    String testfileName = "testfile";
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(hdfsConf)
+        .numDataNodes(9).build()) {
+      cluster.waitActive();
+      final DistributedFileSystem dfs = cluster.getFileSystem();
+      Path dir = new Path(testBaseDir);
+      assertTrue(dfs.mkdirs(dir));
+      dfs.enableErasureCodingPolicy("RS-6-3-1024k");
+      dfs.setErasureCodingPolicy(dir, "RS-6-3-1024k");
+      assertEquals("RS-6-3-1024k", dfs.getErasureCodingPolicy(dir).getName());
+
+      int writeBufSize = 30 * 1024 * 1024 + 1;
+      byte[] writeBuf = new byte[writeBufSize];
+      try (FSDataOutputStream fsdos = dfs.create(
+          new Path(testBaseDir + Path.SEPARATOR + testfileName))) {
+        Random random = new Random();
+        random.nextBytes(writeBuf);
+        fsdos.write(writeBuf, 0, writeBuf.length);
+        Thread.sleep(2000);
+      }
+
+      byte[] readBuf = new byte[6 * 1024 * 1024];
+      try (FSDataInputStream fsdis = dfs.open(
+          new Path(testBaseDir + Path.SEPARATOR + testfileName))) {
+        fsdis.read(readBuf);
+        Thread.sleep(6 * 1000);
+        while ((fsdis.read(readBuf)) > 0) {
+          Thread.sleep(6 * 1000);
+        }
+      }
+      assertTrue(dfs.delete(new Path(testBaseDir + Path.SEPARATOR + testfileName), true));
+    }
+  }
+
+  @Test
+  public void testPReadAfterLongTimeIdle() throws Exception {
+    HdfsConfiguration hdfsConf = new HdfsConfiguration();
+    hdfsConf.setInt("dfs.datanode.socket.write.timeout", 5000);
+    hdfsConf.setInt("dfs.client.socket-timeout", 5000);
+    String testBaseDir = "/testECRead";
+    String testfileName = "testfile";
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(hdfsConf)
+        .numDataNodes(9).build()) {
+      cluster.waitActive();
+      final DistributedFileSystem dfs = cluster.getFileSystem();
+      Path dir = new Path(testBaseDir);
+      assertTrue(dfs.mkdirs(dir));
+      dfs.enableErasureCodingPolicy("RS-6-3-1024k");
+      dfs.setErasureCodingPolicy(dir, "RS-6-3-1024k");
+      assertEquals("RS-6-3-1024k", dfs.getErasureCodingPolicy(dir).getName());
+
+      int writeBufSize = 30 * 1024 * 1024 + 1;
+      byte[] writeBuf = new byte[writeBufSize];
+      try (FSDataOutputStream fsdos = dfs.create(
+          new Path(testBaseDir + Path.SEPARATOR + testfileName))) {
+        Random random = new Random();
+        random.nextBytes(writeBuf);
+        fsdos.write(writeBuf, 0, writeBuf.length);
+        Thread.sleep(2000);
+      }
+
+      byte[] readBuf = new byte[6 * 1024 * 1024];
+      try (FSDataInputStream fsdis = dfs.open(
+          new Path(testBaseDir + Path.SEPARATOR + testfileName))) {
+        int curPos = 0;
+        int readLen = fsdis.read(curPos, readBuf, 0, readBuf.length);
+        curPos += readLen;
+        Thread.sleep(6 * 1000);
+        while ((readLen = fsdis.read(curPos, readBuf, 0, readBuf.length)) > 0) {
+          curPos += readLen;
+          Thread.sleep(6 * 1000);
+        }
+      }
+      assertTrue(dfs.delete(new Path(testBaseDir + Path.SEPARATOR + testfileName), true));
+    }
+  }
 }
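
A note on the test timing: both tests set dfs.datanode.socket.write.timeout and dfs.client.socket-timeout to 5 seconds and then pause 6 seconds between reads, so the idle block-reader connections are liable to be closed by the DataNode. The tests then simply assert that stateful and positional reads keep succeeding; before this change, presumably, each stale reader would be skipped outright, which on an EC stripe could push the unreadable-chunk count past the parity limit and surface as an application-level read failure.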
