diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index d6131f8ddeb54..192d3ace8c3a5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -93,6 +93,13 @@ public class DFSStripedInputStream extends DFSInputStream {
    */
   private final Set<String> warnedNodes =
       Collections.newSetFromMap(new ConcurrentHashMap<>());
+  /**
+   * This field indicates whether the corresponding reader should be retried once before
+   * it is marked as skipped. Retrying the same node keeps transient errors from turning
+   * into application-level failures (e.g. the DataNode could have closed the connection
+   * because the client was idle for too long).
+   */
+  private boolean[] retryCurrentReaderFlags;
 
   DFSStripedInputStream(DFSClient dfsClient, String src,
       boolean verifyChecksum, ErasureCodingPolicy ecPolicy,
@@ -112,6 +119,8 @@ public class DFSStripedInputStream extends DFSInputStream {
         dataBlkNum, parityBlkNum);
     decoder = CodecUtil.createRawDecoder(dfsClient.getConfiguration(),
         ecPolicy.getCodecName(), coderOptions);
+    retryCurrentReaderFlags = new boolean[groupSize];
+    Arrays.fill(retryCurrentReaderFlags, true);
     DFSClient.LOG.debug("Creating an striped input stream for file {}", src);
   }
 
@@ -206,13 +215,14 @@ protected void closeCurrentBlockReaders() {
       return;
     }
     for (int i = 0; i < groupSize; i++) {
-      closeReader(blockReaders[i]);
+      retryCurrentReaderFlags[i] = false;
+      closeReader(blockReaders[i], i);
       blockReaders[i] = null;
     }
     blockEnd = -1;
   }
 
-  protected void closeReader(BlockReaderInfo readerInfo) {
+  protected void closeReader(BlockReaderInfo readerInfo, int readerIndex) {
     if (readerInfo != null) {
       if (readerInfo.reader != null) {
         try {
@@ -220,7 +230,9 @@ protected void closeReader(BlockReaderInfo readerInfo) {
         } catch (Throwable ignored) {
         }
       }
-      readerInfo.skip();
+      if (!retryCurrentReaderFlags[readerIndex]) {
+        readerInfo.skip();
+      }
     }
   }
 
@@ -516,8 +528,11 @@ protected void fetchBlockByteRange(LocatedBlock block, long start,
       }
       buf.position(buf.position() + (int)(end - start + 1));
     } finally {
+      int index = 0;
       for (BlockReaderInfo preaderInfo : preaderInfos) {
-        closeReader(preaderInfo);
+        retryCurrentReaderFlags[index] = false;
+        closeReader(preaderInfo, index);
+        index++;
       }
     }
   }
@@ -573,4 +588,11 @@ public synchronized void unbuffer() {
     }
   }
 
+  public boolean getRetryCurrentReaderFlags(int index) {
+    return retryCurrentReaderFlags[index];
+  }
+
+  public void setRetryCurrentReaderFlags(int index, boolean retry) {
+    this.retryCurrentReaderFlags[index] = retry;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
index bc39bace79588..e6dfaa7faa182 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
@@ -174,11 +174,26 @@ void updateState4SuccessRead(StripingChunkReadResult result) {
 
   private void checkMissingBlocks() throws IOException {
     if (alignedStripe.missingChunksNum > parityBlkNum) {
-      clearFutures();
-      throw new IOException(alignedStripe.missingChunksNum
stripe is: " + alignedStripe - + "; locatedBlocks is: " + dfsStripedInputStream.getLocatedBlocks()); + if (countOfNullReaderInfos(readerInfos) < parityBlkNum) { + clearFutures(); + throw new IOException(alignedStripe.missingChunksNum + + " missing blocks, the stripe is: " + alignedStripe + + "; locatedBlocks is: " + dfsStripedInputStream.getLocatedBlocks()); + } + } + } + + private int countOfNullReaderInfos(BlockReaderInfo[] blockreaderInfos) { + if (blockreaderInfos == null) { + return 0; } + int count = 0; + for (int i = 0; i < blockreaderInfos.length; i++) { + if (blockreaderInfos[i] == null) { + count++; + } + } + return count; } /** @@ -187,6 +202,16 @@ private void checkMissingBlocks() throws IOException { */ private void readDataForDecoding() throws IOException { prepareDecodeInputs(); + + if (alignedStripe.missingChunksNum > parityBlkNum) { + for (int index = 0; index < dataBlkNum; index++) { + if (readerInfos[index] == null) { + alignedStripe.chunks[index].state = StripingChunk.REQUESTED; + alignedStripe.missingChunksNum--; + } + } + } + for (int i = 0; i < dataBlkNum; i++) { Preconditions.checkNotNull(alignedStripe.chunks[i]); if (alignedStripe.chunks[i].state == StripingChunk.REQUESTED) { @@ -199,6 +224,17 @@ private void readDataForDecoding() throws IOException { } void readParityChunks(int num) throws IOException { + if (alignedStripe.missingChunksNum > parityBlkNum) { + for (int index = dataBlkNum; index < dataBlkNum + parityBlkNum; index++) { + if (readerInfos[index] == null) { + if (alignedStripe.chunks[index] != null) { + alignedStripe.chunks[index].state = StripingChunk.REQUESTED; + alignedStripe.missingChunksNum--; + } + } + } + } + for (int i = dataBlkNum, j = 0; i < dataBlkNum + parityBlkNum && j < num; i++) { if (alignedStripe.chunks[i] == null) { @@ -332,7 +368,7 @@ boolean readChunk(final LocatedBlock block, int chunkIndex) } /** - * read the whole stripe. do decoding if necessary + * Read the whole stripe. do decoding if necessary. */ void readStripe() throws IOException { try { @@ -349,7 +385,7 @@ void readStripe() throws IOException { if (alignedStripe.missingChunksNum > 0) { checkMissingBlocks(); readDataForDecoding(); - // read parity chunks + // Read parity chunks. readParityChunks(alignedStripe.missingChunksNum); } } catch (IOException e) { @@ -359,7 +395,7 @@ void readStripe() throws IOException { // TODO: for a full stripe we can start reading (dataBlkNum + 1) chunks // Input buffers for potential decode operation, which remains null until - // first read failure + // first read failure. while (!futures.isEmpty()) { try { long beginReadMS = Time.monotonicNow(); @@ -378,14 +414,20 @@ void readStripe() throws IOException { returnedChunk.state = StripingChunk.FETCHED; alignedStripe.fetchedChunksNum++; updateState4SuccessRead(r); + dfsStripedInputStream.setRetryCurrentReaderFlags(r.index, true); if (alignedStripe.fetchedChunksNum == dataBlkNum) { clearFutures(); break; } } else { returnedChunk.state = StripingChunk.MISSING; - // close the corresponding reader - dfsStripedInputStream.closeReader(readerInfos[r.index]); + // Close the corresponding reader. 
+          dfsStripedInputStream.closeReader(readerInfos[r.index], r.index);
+          boolean originalRetryFlag = dfsStripedInputStream.getRetryCurrentReaderFlags(r.index);
+          if (originalRetryFlag) {
+            dfsStripedInputStream.setRetryCurrentReaderFlags(r.index, false);
+            readerInfos[r.index] = null;
+          }
 
           final int missing = alignedStripe.missingChunksNum;
           alignedStripe.missingChunksNum++;
@@ -399,7 +441,7 @@ void readStripe() throws IOException {
         DFSClient.LOG.error(err, ie);
         dfsStripedInputStream.close();
         clearFutures();
-        // Don't decode if read interrupted
+        // Don't decode if read interrupted.
         throw new InterruptedIOException(err);
       }
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
index f9646e9ee162a..90b460ff5ffee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs;
 
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -50,6 +51,7 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Random;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
@@ -735,4 +737,86 @@ public void onCreateBlockReader(LocatedBlock block, int chunkIndex,
     assertEquals(rangesExpected, ranges);
   }
 
+  @Test
+  public void testStatefulReadAfterLongTimeIdle() throws Exception {
+    HdfsConfiguration hdfsConf = new HdfsConfiguration();
+    hdfsConf.setInt("dfs.datanode.socket.write.timeout", 5000);
+    hdfsConf.setInt("dfs.client.socket-timeout", 5000);
+    String testBaseDir = "/testECRead";
+    String testfileName = "testfile";
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(hdfsConf)
+        .numDataNodes(9).build()) {
+      cluster.waitActive();
+      final DistributedFileSystem dfs = cluster.getFileSystem();
+      Path dir = new Path(testBaseDir);
+      assertTrue(dfs.mkdirs(dir));
+      dfs.enableErasureCodingPolicy("RS-6-3-1024k");
+      dfs.setErasureCodingPolicy(dir, "RS-6-3-1024k");
+      assertEquals("RS-6-3-1024k", dfs.getErasureCodingPolicy(dir).getName());
+
+      int writeBufSize = 30 * 1024 * 1024 + 1;
+      byte[] writeBuf = new byte[writeBufSize];
+      try (FSDataOutputStream fsdos = dfs.create(
+          new Path(testBaseDir + Path.SEPARATOR + testfileName))) {
+        Random random = new Random();
+        random.nextBytes(writeBuf);
+        fsdos.write(writeBuf, 0, writeBuf.length);
+        Thread.sleep(2000);
+      }
+
+      byte[] readBuf = new byte[6 * 1024 * 1024];
+      try (FSDataInputStream fsdis = dfs.open(
+          new Path(testBaseDir + Path.SEPARATOR + testfileName))) {
+        fsdis.read(readBuf);
+        Thread.sleep(6 * 1000);
+        while ((fsdis.read(readBuf)) > 0) {
+          Thread.sleep(6 * 1000);
+        }
+      }
+      assertTrue(dfs.delete(new Path(testBaseDir + Path.SEPARATOR + testfileName), true));
+    }
+  }
+
+  @Test
+  public void testPReadAfterLongTimeIdle() throws Exception {
+    HdfsConfiguration hdfsConf = new HdfsConfiguration();
+    hdfsConf.setInt("dfs.datanode.socket.write.timeout", 5000);
+    hdfsConf.setInt("dfs.client.socket-timeout", 5000);
+    String testBaseDir = "/testECRead";
+    String testfileName = "testfile";
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(hdfsConf)
+        .numDataNodes(9).build()) {
+      cluster.waitActive();
+      final DistributedFileSystem dfs = cluster.getFileSystem();
+      Path dir = new Path(testBaseDir);
+      assertTrue(dfs.mkdirs(dir));
+      dfs.enableErasureCodingPolicy("RS-6-3-1024k");
+      dfs.setErasureCodingPolicy(dir, "RS-6-3-1024k");
+      assertEquals("RS-6-3-1024k", dfs.getErasureCodingPolicy(dir).getName());
+
+      int writeBufSize = 30 * 1024 * 1024 + 1;
+      byte[] writeBuf = new byte[writeBufSize];
+      try (FSDataOutputStream fsdos = dfs.create(
+          new Path(testBaseDir + Path.SEPARATOR + testfileName))) {
+        Random random = new Random();
+        random.nextBytes(writeBuf);
+        fsdos.write(writeBuf, 0, writeBuf.length);
+        Thread.sleep(2000);
+      }
+
+      byte[] readBuf = new byte[6 * 1024 * 1024];
+      try (FSDataInputStream fsdis = dfs.open(
+          new Path(testBaseDir + Path.SEPARATOR + testfileName))) {
+        int curPos = 0;
+        int readLen = fsdis.read(curPos, readBuf, 0, readBuf.length);
+        curPos += readLen;
+        Thread.sleep(6 * 1000);
+        while ((readLen = fsdis.read(curPos, readBuf, 0, readBuf.length)) > 0) {
+          curPos += readLen;
+          Thread.sleep(6 * 1000);
+        }
+      }
+      assertTrue(dfs.delete(new Path(testBaseDir + Path.SEPARATOR + testfileName), true));
+    }
+  }
 }
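
For reviewers, a standalone sketch of the retry-once bookkeeping that retryCurrentReaderFlags introduces. The class and method names below are invented for illustration and are not part of the patch; the real logic lives in DFSStripedInputStream#closeReader, the closeCurrentBlockReaders/fetchBlockByteRange finally blocks, and StripeReader#readStripe. Each reader starts with one retry credit, a successful read restores the credit, and only a second consecutive failure lets closeReader() mark the source as skipped.

import java.util.Arrays;

/** Illustrative sketch only: models the per-reader retry-once flag added by the patch. */
public class RetryOnceFlagsSketch {
  private final boolean[] retryFlags;

  RetryOnceFlagsSketch(int groupSize) {
    retryFlags = new boolean[groupSize];
    Arrays.fill(retryFlags, true);   // every reader starts with one retry credit
  }

  /** A successful read of chunk {@code index} restores the retry credit. */
  void onReadSuccess(int index) {
    retryFlags[index] = true;
  }

  /**
   * A failed read of chunk {@code index}.
   * @return true if the caller should retry the same reader instead of skipping it.
   */
  boolean onReadFailure(int index) {
    if (retryFlags[index]) {
      retryFlags[index] = false;   // consume the credit; the next failure will skip
      return true;                 // treat the error as transient and retry this node
    }
    return false;                  // already retried once: mark the source as skipped/missing
  }

  public static void main(String[] args) {
    RetryOnceFlagsSketch flags = new RetryOnceFlagsSketch(9);
    System.out.println(flags.onReadFailure(0)); // true  -> retry reader 0
    System.out.println(flags.onReadFailure(0)); // false -> skip reader 0
    flags.onReadSuccess(0);                     // a success restores the credit
    System.out.println(flags.onReadFailure(0)); // true again
  }
}

This mirrors the intent stated in the new field's javadoc: an idle-timeout disconnect from a DataNode is treated as transient on its first occurrence rather than immediately excluding that node from the stripe.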