@@ -93,6 +93,13 @@ public class DFSStripedInputStream extends DFSInputStream {
*/
private final Set<String> warnedNodes =
Collections.newSetFromMap(new ConcurrentHashMap<>());
/**
* We use this field to indicate whether we should retry the corresponding
* reader before we mark it as skipped. Retrying the same node keeps transient
* errors from turning into application-level failures (e.g. the DataNode may
* have closed the connection because the client was idle for too long).
*/
private boolean[] retryCurrentReaderFlags;

DFSStripedInputStream(DFSClient dfsClient, String src,
boolean verifyChecksum, ErasureCodingPolicy ecPolicy,
@@ -112,6 +119,8 @@ public class DFSStripedInputStream extends DFSInputStream {
dataBlkNum, parityBlkNum);
decoder = CodecUtil.createRawDecoder(dfsClient.getConfiguration(),
ecPolicy.getCodecName(), coderOptions);
retryCurrentReaderFlags = new boolean[groupSize];
Arrays.fill(retryCurrentReaderFlags, true);
DFSClient.LOG.debug("Creating an striped input stream for file {}", src);
}
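
To make the new field's contract concrete, here is a minimal, self-contained sketch (an editor's illustration, not the PR's code) of the two-strike policy that the closeReader() and readStripe() changes below implement together: a first failure on a reader only forces a reconnect to the same node, and a second consecutive failure marks the node as skipped.

class RetryBeforeSkipSketch {
  // true = the slot still has its one retry available
  private final boolean[] retryFlags;
  // stand-ins for readerInfo.skip() and readerInfos[]
  private final boolean[] skipped;
  private final Object[] readers;

  RetryBeforeSkipSketch(int groupSize) {
    retryFlags = new boolean[groupSize];
    skipped = new boolean[groupSize];
    readers = new Object[groupSize];
    java.util.Arrays.fill(retryFlags, true);
  }

  void onReadFailure(int i) {
    if (retryFlags[i]) {
      retryFlags[i] = false;  // first strike: consume the retry...
      readers[i] = null;      // ...and force a fresh reader next round
    } else {
      skipped[i] = true;      // second strike: exclude the node
    }
  }

  void onReadSuccess(int i) {
    retryFlags[i] = true;     // a successful read re-arms the retry
  }
}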

@@ -206,21 +215,24 @@ protected void closeCurrentBlockReaders() {
return;
}
for (int i = 0; i < groupSize; i++) {
closeReader(blockReaders[i]);
// Deliberate close: clear the flag so closeReader() still marks the node skipped.
retryCurrentReaderFlags[i] = false;
closeReader(blockReaders[i], i);
blockReaders[i] = null;
}
blockEnd = -1;
}

protected void closeReader(BlockReaderInfo readerInfo) {
protected void closeReader(BlockReaderInfo readerInfo, int readerIndex) {
if (readerInfo != null) {
if (readerInfo.reader != null) {
try {
readerInfo.reader.close();
} catch (Throwable ignored) {
}
}
readerInfo.skip();
// Skip the node only once its retry has already been consumed.
if (!retryCurrentReaderFlags[readerIndex]) {
readerInfo.skip();
}
}
}

@@ -516,8 +528,11 @@ protected void fetchBlockByteRange(LocatedBlock block, long start,
}
buf.position(buf.position() + (int)(end - start + 1));
} finally {
int index = 0;
for (BlockReaderInfo preaderInfo : preaderInfos) {
closeReader(preaderInfo);
retryCurrentReaderFlags[index] = false;
closeReader(preaderInfo, index);
index++;
}
}
}
@@ -573,4 +588,11 @@ public synchronized void unbuffer() {
}
}

public boolean getRetryCurrentReaderFlags(int index) {
return retryCurrentReaderFlags[index];
}

public void setRetryCurrentReaderFlags(int index, boolean retry) {
this.retryCurrentReaderFlags[index] = retry;
}
}
@@ -174,11 +174,26 @@ void updateState4SuccessRead(StripingChunkReadResult result)

private void checkMissingBlocks() throws IOException {
if (alignedStripe.missingChunksNum > parityBlkNum) {
clearFutures();
throw new IOException(alignedStripe.missingChunksNum
+ " missing blocks, the stripe is: " + alignedStripe
+ "; locatedBlocks is: " + dfsStripedInputStream.getLocatedBlocks());
// Fail only when fewer than parityBlkNum readers are pending a retry.
if (countOfNullReaderInfos(readerInfos) < parityBlkNum) {
clearFutures();
throw new IOException(alignedStripe.missingChunksNum
+ " missing blocks, the stripe is: " + alignedStripe
+ "; locatedBlocks is: " + dfsStripedInputStream.getLocatedBlocks());
}
}
}

private int countOfNullReaderInfos(BlockReaderInfo[] blockReaderInfos) {
if (blockReaderInfos == null) {
return 0;
}
int count = 0;
for (BlockReaderInfo info : blockReaderInfos) {
if (info == null) {
count++;
}
}
return count;
}
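
A worked example of the relaxed check (editor's illustration with made-up numbers): for RS-6-3, parityBlkNum is 3, so more than three missing chunks used to be immediately fatal. A null entry in readerInfos now means the slot is merely awaiting a reconnect, so the stripe is abandoned only when the pending retries cannot close the gap.

public class MissingCheckSketch {
  public static void main(String[] args) {
    int parityBlkNum = 3;      // RS-6-3
    int missingChunksNum = 4;  // more than parity alone can repair...
    int nullReaderInfos = 4;   // ...but all four slots are pending a retry
    boolean fatal = missingChunksNum > parityBlkNum
        && nullReaderInfos < parityBlkNum;
    // Prints "fatal = false": readDataForDecoding() and readParityChunks()
    // will re-request the null slots instead of failing the read.
    System.out.println("fatal = " + fatal);
  }
}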

/**
@@ -187,6 +202,16 @@ private void checkMissingBlocks() throws IOException {
*/
private void readDataForDecoding() throws IOException {
prepareDecodeInputs();

// Re-request data chunks whose readers are only pending a retry.
if (alignedStripe.missingChunksNum > parityBlkNum) {
for (int index = 0; index < dataBlkNum; index++) {
if (readerInfos[index] == null) {
alignedStripe.chunks[index].state = StripingChunk.REQUESTED;
alignedStripe.missingChunksNum--;
}
}
}

for (int i = 0; i < dataBlkNum; i++) {
Preconditions.checkNotNull(alignedStripe.chunks[i]);
if (alignedStripe.chunks[i].state == StripingChunk.REQUESTED) {
@@ -199,6 +224,17 @@ private void readDataForDecoding() throws IOException {
}

void readParityChunks(int num) throws IOException {
// Likewise re-request parity chunks whose readers are pending a retry.
if (alignedStripe.missingChunksNum > parityBlkNum) {
for (int index = dataBlkNum; index < dataBlkNum + parityBlkNum; index++) {
if (readerInfos[index] == null) {
if (alignedStripe.chunks[index] != null) {
alignedStripe.chunks[index].state = StripingChunk.REQUESTED;
alignedStripe.missingChunksNum--;
}
}
}
}

for (int i = dataBlkNum, j = 0; i < dataBlkNum + parityBlkNum && j < num;
i++) {
if (alignedStripe.chunks[i] == null) {
@@ -332,7 +368,7 @@ boolean readChunk(final LocatedBlock block, int chunkIndex)
}

/**
* read the whole stripe. do decoding if necessary
* Read the whole stripe. Do decoding if necessary.
*/
void readStripe() throws IOException {
try {
@@ -349,7 +385,7 @@ void readStripe() throws IOException {
if (alignedStripe.missingChunksNum > 0) {
checkMissingBlocks();
readDataForDecoding();
// read parity chunks
// Read parity chunks.
readParityChunks(alignedStripe.missingChunksNum);
}
} catch (IOException e) {
Expand All @@ -359,7 +395,7 @@ void readStripe() throws IOException {
// TODO: for a full stripe we can start reading (dataBlkNum + 1) chunks

// Input buffers for potential decode operation, which remains null until
// first read failure
// first read failure.
while (!futures.isEmpty()) {
try {
long beginReadMS = Time.monotonicNow();
@@ -378,14 +414,20 @@ void readStripe() throws IOException {
returnedChunk.state = StripingChunk.FETCHED;
alignedStripe.fetchedChunksNum++;
updateState4SuccessRead(r);
dfsStripedInputStream.setRetryCurrentReaderFlags(r.index, true);
if (alignedStripe.fetchedChunksNum == dataBlkNum) {
clearFutures();
break;
}
} else {
returnedChunk.state = StripingChunk.MISSING;
// close the corresponding reader
dfsStripedInputStream.closeReader(readerInfos[r.index]);
// Close the corresponding reader.
dfsStripedInputStream.closeReader(readerInfos[r.index], r.index);
boolean originalRetryFlag = dfsStripedInputStream.getRetryCurrentReaderFlags(r.index);
if (originalRetryFlag) {
// First failure: consume the retry and force a fresh reader for this node.
dfsStripedInputStream.setRetryCurrentReaderFlags(r.index, false);
readerInfos[r.index] = null;
}

final int missing = alignedStripe.missingChunksNum;
alignedStripe.missingChunksNum++;
@@ -399,7 +441,7 @@ void readStripe() throws IOException {
DFSClient.LOG.error(err, ie);
dfsStripedInputStream.close();
clearFutures();
// Don't decode if read interrupted
// Don't decode if read interrupted.
throw new InterruptedIOException(err);
}
}
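
The failure branch above is where the two-strike policy actually plays out; a short usage example of the RetryBeforeSkipSketch class shown after the constructor hunk (again illustrative only) maps the idle-timeout scenario onto it:

public class RetrySketchDemo {
  public static void main(String[] args) {
    RetryBeforeSkipSketch s = new RetryBeforeSkipSketch(9); // RS-6-3 group size
    s.onReadFailure(2); // idle DataNode dropped the connection: reconnect only
    s.onReadFailure(2); // fails again on a fresh reader: node is now skipped
    s.onReadSuccess(3); // a good chunk read re-arms slot 3, as in the
                        // setRetryCurrentReaderFlags(r.index, true) call above
  }
}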
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -50,6 +51,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
@@ -735,4 +737,86 @@ public void onCreateBlockReader(LocatedBlock block, int chunkIndex,
assertEquals(rangesExpected, ranges);
}

@Test
public void testStatefulReadAfterLongTimeIdle() throws Exception {
HdfsConfiguration hdfsConf = new HdfsConfiguration();
hdfsConf.setInt("dfs.datanode.socket.write.timeout", 5000);
hdfsConf.setInt("dfs.client.socket-timeout", 5000);
String testBaseDir = "/testECRead";
String testfileName = "testfile";
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(hdfsConf)
.numDataNodes(9).build()) {
cluster.waitActive();
final DistributedFileSystem dfs = cluster.getFileSystem();
Path dir = new Path(testBaseDir);
assertTrue(dfs.mkdirs(dir));
dfs.enableErasureCodingPolicy("RS-6-3-1024k");
dfs.setErasureCodingPolicy(dir, "RS-6-3-1024k");
assertEquals("RS-6-3-1024k", dfs.getErasureCodingPolicy(dir).getName());

int writeBufSize = 30 * 1024 * 1024 + 1;
byte[] writeBuf = new byte[writeBufSize];
try (FSDataOutputStream fsdos = dfs.create(
new Path(testBaseDir + Path.SEPARATOR + testfileName))) {
Random random = new Random();
random.nextBytes(writeBuf);
fsdos.write(writeBuf, 0, writeBuf.length);
Thread.sleep(2000);
}

byte[] readBuf = new byte[6 * 1024 * 1024];
try (FSDataInputStream fsdis = dfs.open(
new Path(testBaseDir + Path.SEPARATOR + testfileName))) {
fsdis.read(readBuf);
// Idle past the 5s socket timeouts so the DataNodes drop the idle connections.
Thread.sleep(6 * 1000);
while ((fsdis.read(readBuf)) > 0) {
Thread.sleep(6 * 1000);
}
}
assertTrue(dfs.delete(new Path(testBaseDir + Path.SEPARATOR + testfileName), true));
}
}

@Test
public void testPReadAfterLongTimeIdle() throws Exception {
HdfsConfiguration hdfsConf = new HdfsConfiguration();
hdfsConf.setInt("dfs.datanode.socket.write.timeout", 5000);
hdfsConf.setInt("dfs.client.socket-timeout", 5000);
String testBaseDir = "/testECRead";
String testfileName = "testfile";
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(hdfsConf)
.numDataNodes(9).build()) {
cluster.waitActive();
final DistributedFileSystem dfs = cluster.getFileSystem();
Path dir = new Path(testBaseDir);
assertTrue(dfs.mkdirs(dir));
dfs.enableErasureCodingPolicy("RS-6-3-1024k");
dfs.setErasureCodingPolicy(dir, "RS-6-3-1024k");
assertEquals("RS-6-3-1024k", dfs.getErasureCodingPolicy(dir).getName());

int writeBufSize = 30 * 1024 * 1024 + 1;
byte[] writeBuf = new byte[writeBufSize];
try (FSDataOutputStream fsdos = dfs.create(
new Path(testBaseDir + Path.SEPARATOR + testfileName))) {
Random random = new Random();
random.nextBytes(writeBuf);
fsdos.write(writeBuf, 0, writeBuf.length);
Thread.sleep(2000);
}

byte[] readBuf = new byte[6 * 1024 * 1024];
try (FSDataInputStream fsdis = dfs.open(
new Path(testBaseDir + Path.SEPARATOR + testfileName))) {
int curPos = 0;
int readLen = fsdis.read(curPos, readBuf, 0, readBuf.length);
curPos += readLen;
// Idle past the 5s socket timeouts so the DataNodes drop the idle connections.
Thread.sleep(6 * 1000);
while ((readLen = fsdis.read(curPos, readBuf, 0, readBuf.length)) > 0) {
curPos += readLen;
Thread.sleep(6 * 1000);
}
}
assertTrue(dfs.delete(new Path(testBaseDir + Path.SEPARATOR + testfileName), true));
}
}
}
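
A possible follow-up for the tests: both cases repeat the same cluster, policy, and file setup. A shared helper along these lines (hypothetical, not part of the PR; ThrowingConsumer is a local functional interface, not a Hadoop type) would leave each test with only its read loop:

interface ThrowingConsumer<T> {
  void accept(T t) throws Exception;
}

private void runIdleReadTest(String fileName,
    ThrowingConsumer<FSDataInputStream> readBody) throws Exception {
  HdfsConfiguration conf = new HdfsConfiguration();
  conf.setInt("dfs.datanode.socket.write.timeout", 5000);
  conf.setInt("dfs.client.socket-timeout", 5000);
  try (MiniDFSCluster cluster =
      new MiniDFSCluster.Builder(conf).numDataNodes(9).build()) {
    cluster.waitActive();
    final DistributedFileSystem dfs = cluster.getFileSystem();
    Path file = new Path("/testECRead", fileName);
    assertTrue(dfs.mkdirs(file.getParent()));
    dfs.enableErasureCodingPolicy("RS-6-3-1024k");
    dfs.setErasureCodingPolicy(file.getParent(), "RS-6-3-1024k");
    byte[] writeBuf = new byte[30 * 1024 * 1024 + 1];
    new Random().nextBytes(writeBuf);
    try (FSDataOutputStream fsdos = dfs.create(file)) {
      fsdos.write(writeBuf, 0, writeBuf.length);
    }
    try (FSDataInputStream fsdis = dfs.open(file)) {
      readBody.accept(fsdis);
    }
    assertTrue(dfs.delete(file, true));
  }
}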