
Commit 23d0e6a

HDFS-17801. EC supports createBlockReader retry when IOException occurs occasionally.

1 parent 6eae158

File tree: 7 files changed, +98 −5 lines changed

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
Lines changed: 3 additions & 0 deletions

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs;

+import java.io.IOException;
 import java.util.concurrent.atomic.AtomicLong;

 import org.apache.hadoop.classification.VisibleForTesting;
@@ -71,4 +72,6 @@ public void delayWhenRenewLeaseTimeout() {}
   public void onCreateBlockReader(LocatedBlock block, int chunkIndex, long offset, long length) {}

   public void failCreateBlockReader() throws InvalidBlockTokenException {}
+
+  public void failCreateBlockReader(int curAttempt) throws IOException {}
 }
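
The new overload passes the current attempt index to the injector, so a test can fail only selected attempts. A minimal usage sketch (the same pattern appears in the new test at the end of this commit):

  DFSClientFaultInjector.set(new DFSClientFaultInjector() {
    @Override
    public void failCreateBlockReader(int curAttempt) throws IOException {
      // Fail only the first attempt; retries are allowed to succeed.
      if (curAttempt == 0) {
        throw new IOException("Injected failure on first block reader attempt");
      }
    }
  });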

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
Lines changed: 10 additions & 4 deletions

@@ -234,12 +234,12 @@ private long getOffsetInBlockGroup(long pos) {

   boolean createBlockReader(LocatedBlock block, long offsetInBlock,
       LocatedBlock[] targetBlocks, BlockReaderInfo[] readerInfos,
-      int chunkIndex, long readTo) throws IOException {
+      int chunkIndex, long readTo, int readDNMaxAttempts) throws IOException {
     BlockReader reader = null;
     final ReaderRetryPolicy retry = new ReaderRetryPolicy();
     DFSInputStream.DNAddrPair dnInfo =
         new DFSInputStream.DNAddrPair(null, null, null, null);
-
+    int curAttempts = 0;
     while (true) {
       try {
         // the cached block location might have been re-fetched, so always
@@ -255,6 +255,7 @@ boolean createBlockReader(LocatedBlock block, long offsetInBlock,
         if (readTo < 0 || readTo > block.getBlockSize()) {
           readTo = block.getBlockSize();
         }
+        DFSClientFaultInjector.get().failCreateBlockReader(curAttempts);
         reader = getBlockReader(block, offsetInBlock,
             readTo - offsetInBlock,
             dnInfo.addr, dnInfo.storageType, dnInfo.info);
@@ -273,9 +274,14 @@ boolean createBlockReader(LocatedBlock block, long offsetInBlock,
           fetchBlockAt(block.getStartOffset());
           retry.refetchToken();
         } else {
+          if (curAttempts++ < readDNMaxAttempts) {
+            DFSClient.LOG.info("Trying to reconnect to {} for block {}, retry attempt {}.",
+                dnInfo.addr, block.getBlock(), curAttempts);
+            continue;
+          }
           //TODO: handles connection issues
-          DFSClient.LOG.warn("Failed to connect to " + dnInfo.addr + " for " +
-              "block" + block.getBlock(), e);
+          DFSClient.LOG.warn("Failed to connect to {} for block {} after {} attempts.",
+              dnInfo.addr, block.getBlock(), curAttempts, e);
           // re-fetch the block in case the block has been moved
           fetchBlockAt(block.getStartOffset());
           addToLocalDeadNodes(dnInfo.info);
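
Stripped of HDFS specifics, the control flow this hunk adds is a bounded retry loop around reader creation: transient IOExceptions are retried against the same datanode up to readDNMaxAttempts times before the old give-up path runs. A self-contained, hypothetical sketch (ReaderFactory and the names below are illustrative, not Hadoop API):

  import java.io.IOException;

  public class BoundedRetrySketch {
    interface ReaderFactory {
      String create() throws IOException;
    }

    static String createWithRetry(ReaderFactory factory, int maxAttempts)
        throws IOException {
      int curAttempts = 0;
      while (true) {
        try {
          // May throw transiently, e.g. when a cached connection went stale.
          return factory.create();
        } catch (IOException e) {
          if (curAttempts++ < maxAttempts) {
            continue; // retry the same source before giving up
          }
          // Out of attempts: propagate so the caller can mark the node dead.
          throw e;
        }
      }
    }
  }

Note that maxAttempts counts retries after the first try, matching the curAttempts++ < readDNMaxAttempts check above: the default of 1 allows at most two connection attempts per datanode.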

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
Lines changed: 4 additions & 1 deletion

@@ -120,6 +120,7 @@ void skip() {
   protected final RawErasureDecoder decoder;
   protected final DFSStripedInputStream dfsStripedInputStream;
   private long readTo = -1;
+  private final int readDNMaxAttempts;

   protected ECChunk[] decodeInputs;

@@ -138,6 +139,8 @@ void skip() {
     this.corruptedBlocks = corruptedBlocks;
     this.decoder = decoder;
     this.dfsStripedInputStream = dfsStripedInputStream;
+    this.readDNMaxAttempts = dfsStripedInputStream.getDFSClient()
+        .getConf().getStripedReadDnMaxAttempts();

     service = new ExecutorCompletionService<>(
         dfsStripedInputStream.getStripedReadsThreadPool());
@@ -309,7 +312,7 @@ boolean readChunk(final LocatedBlock block, int chunkIndex)
     if (readerInfos[chunkIndex] == null) {
       if (!dfsStripedInputStream.createBlockReader(block,
           alignedStripe.getOffsetInBlock(), targetBlocks,
-          readerInfos, chunkIndex, readTo)) {
+          readerInfos, chunkIndex, readTo, readDNMaxAttempts)) {
         chunk.state = StripingChunk.MISSING;
         return false;
       }

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
Lines changed: 8 additions & 0 deletions

@@ -532,6 +532,14 @@ interface StripedRead {
      * span 6 DNs, so this default value accommodates 3 read streams
      */
     int THREADPOOL_SIZE_DEFAULT = 18;
+
+    /**
+     * The maximum number of retries when reconnecting to a DN
+     * during the striped read process. Default is 1.
+     */
+    String DATANODE_MAX_RETRY_COUNT = PREFIX +
+        "datanode.max.retry.count";
+    int DATANODE_MAX_RETRY_COUNT_DEFAULT = 1;
   }

   /** dfs.http.client configuration properties */
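
Because the key is built from the StripedRead PREFIX, it resolves to dfs.client.read.striped.datanode.max.retry.count (registered in hdfs-default.xml below). A client can also override it programmatically, as the new test does:

  import org.apache.hadoop.hdfs.HdfsConfiguration;
  import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;

  public class StripedReadRetryConfig {
    public static void main(String[] args) {
      HdfsConfiguration conf = new HdfsConfiguration();
      // Allow two reconnect attempts per DN during striped reads;
      // 0 restores the previous fail-fast behavior (the default is 1).
      conf.setInt(HdfsClientConfigKeys.StripedRead.DATANODE_MAX_RETRY_COUNT, 2);
    }
  }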

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
Lines changed: 15 additions & 0 deletions

@@ -167,6 +167,7 @@ public class DfsClientConf {
   private final boolean deadNodeDetectionEnabled;
   private final long leaseHardLimitPeriod;
   private final boolean recoverLeaseOnCloseException;
+  private final int stripedReadDnMaxAttempts;

   public DfsClientConf(Configuration conf) {
     // The hdfsTimeout is currently the same as the ipc timeout
@@ -296,6 +297,13 @@ public DfsClientConf(Configuration conf) {
     Preconditions.checkArgument(stripedReadThreadpoolSize > 0, "The value of " +
         HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_KEY +
         " must be greater than 0.");
+    stripedReadDnMaxAttempts =
+        conf.getInt(
+            HdfsClientConfigKeys.StripedRead.DATANODE_MAX_RETRY_COUNT,
+            HdfsClientConfigKeys.StripedRead.DATANODE_MAX_RETRY_COUNT_DEFAULT);
+    Preconditions.checkArgument(stripedReadDnMaxAttempts >= 0, "The value of " +
+        HdfsClientConfigKeys.StripedRead.DATANODE_MAX_RETRY_COUNT +
+        " must be greater than or equal to 0.");
     replicaAccessorBuilderClasses = loadReplicaAccessorBuilderClasses(conf);

     leaseHardLimitPeriod =
@@ -710,6 +718,13 @@ public long getleaseHardLimitPeriod() {
     return leaseHardLimitPeriod;
   }

+  /**
+   * @return the stripedReadDnMaxAttempts
+   */
+  public int getStripedReadDnMaxAttempts() {
+    return stripedReadDnMaxAttempts;
+  }
+
   /**
    * @return the readUseCachePriority
    */

hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
Lines changed: 9 additions & 0 deletions

@@ -4625,6 +4625,15 @@
   </description>
 </property>

+<property>
+  <name>dfs.client.read.striped.datanode.max.retry.count</name>
+  <value>1</value>
+  <description>
+    The maximum number of retries when reconnecting to a DN during
+    the striped read process. Default is 1.
+  </description>
+</property>
+
 <property>
   <name>dfs.client.replica.accessor.builder.classes</name>
   <value></value>
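
Operators can likewise raise the limit cluster-wide by overriding the property in hdfs-site.xml, for example to allow two reconnect attempts per DN:

  <property>
    <name>dfs.client.read.striped.datanode.max.retry.count</name>
    <value>2</value>
  </property>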

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
Lines changed: 49 additions & 0 deletions

@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs;

+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -50,6 +52,7 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Random;

 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
@@ -735,4 +738,50 @@ public void onCreateBlockReader(LocatedBlock block, int chunkIndex,
     assertEquals(rangesExpected, ranges);
   }

+  @Test
+  public void testStatefulReadAfterLongTimeIdle() throws Exception {
+    HdfsConfiguration hdfsConf = new HdfsConfiguration();
+    // Allow one retry so the read survives the single injected failure.
+    hdfsConf.setInt(HdfsClientConfigKeys.StripedRead.DATANODE_MAX_RETRY_COUNT, 1);
+    String testBaseDir = "/testECRead";
+    String testfileName = "testfile";
+    DFSClientFaultInjector old = DFSClientFaultInjector.get();
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(hdfsConf)
+        .numDataNodes(9).build()) {
+      cluster.waitActive();
+      final DistributedFileSystem dfs = cluster.getFileSystem();
+      Path dir = new Path(testBaseDir);
+      assertTrue(dfs.mkdirs(dir));
+      dfs.enableErasureCodingPolicy("RS-6-3-1024k");
+      dfs.setErasureCodingPolicy(dir, "RS-6-3-1024k");
+      assertEquals("RS-6-3-1024k", dfs.getErasureCodingPolicy(dir).getName());
+
+      int writeBufSize = 30 * 1024 * 1024 + 1;
+      byte[] writeBuf = new byte[writeBufSize];
+      try (FSDataOutputStream fsdos = dfs.create(
+          new Path(testBaseDir + Path.SEPARATOR + testfileName))) {
+        Random random = new Random();
+        random.nextBytes(writeBuf);
+        fsdos.write(writeBuf, 0, writeBuf.length);
+        Thread.sleep(2000);
+      }
+
+      DFSClientFaultInjector.set(new DFSClientFaultInjector() {
+        public void failCreateBlockReader(int curAttempt) throws IOException {
+          if (0 == curAttempt) {
+            throw new IOException("Mock IOException when first creating the block reader.");
+          }
+        }
+      });
+
+      byte[] readBuf = new byte[6 * 1024 * 1024];
+      try (FSDataInputStream fsdis = dfs.open(
+          new Path(testBaseDir + Path.SEPARATOR + testfileName))) {
+        while ((fsdis.read(readBuf)) > 0) {}
+      }
+      assertTrue(dfs.delete(new Path(testBaseDir + Path.SEPARATOR + testfileName), true));
+    } finally {
+      DFSClientFaultInjector.set(old);
+    }
+  }
 }
