Commit cdd2780

HDFS-17801. EC supports createBlockReader retry when IOException occurs occasionally.
1 parent 636d822 commit cdd2780

6 files changed, +52 -7 lines changed


hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java

Lines changed: 3 additions & 1 deletion
@@ -72,6 +72,8 @@ public void delayWhenRenewLeaseTimeout() {}
   public void onCreateBlockReader(LocatedBlock block, int chunkIndex, long offset, long length) {}
 
   public void failCreateBlockReader() throws InvalidBlockTokenException {}
+
+  public void failWhenReadWithStrategy(boolean isRetryRead) throws IOException {}
 
-  public void failWhenReadWithStrategy(boolean isRetryRead) throws IOException {};
+  public void failCreateBlockReader(int curRetryCounts) throws IOException {}
 }
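
For context, DFSClientFaultInjector hooks are no-ops in production and are overridden by tests. A minimal sketch of how a test might exercise the new hook, assuming the usual DFSClientFaultInjector.set(...) pattern; this snippet is illustrative only and not part of the commit:

    // Fail only the first createBlockReader attempt; the retry should then succeed.
    DFSClientFaultInjector.set(new DFSClientFaultInjector() {
      @Override
      public void failCreateBlockReader(int curRetryCounts) throws IOException {
        if (curRetryCounts == 0) {
          throw new IOException("Injected failure on first attempt");
        }
      }
    });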

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java

Lines changed: 13 additions & 5 deletions
@@ -234,12 +234,12 @@ private long getOffsetInBlockGroup(long pos) {
 
   boolean createBlockReader(LocatedBlock block, long offsetInBlock,
       LocatedBlock[] targetBlocks, BlockReaderInfo[] readerInfos,
-      int chunkIndex, long readTo) throws IOException {
+      int chunkIndex, long readTo, int readDNMaxRetryCounts) throws IOException {
     BlockReader reader = null;
     final ReaderRetryPolicy retry = new ReaderRetryPolicy();
     DFSInputStream.DNAddrPair dnInfo =
         new DFSInputStream.DNAddrPair(null, null, null, null);
-
+    int curRetryCounts = 0;
     while (true) {
       try {
         // the cached block location might have been re-fetched, so always
@@ -255,6 +255,7 @@ boolean createBlockReader(LocatedBlock block, long offsetInBlock,
         if (readTo < 0 || readTo > block.getBlockSize()) {
           readTo = block.getBlockSize();
         }
+        DFSClientFaultInjector.get().failCreateBlockReader(curRetryCounts);
         reader = getBlockReader(block, offsetInBlock,
             readTo - offsetInBlock,
             dnInfo.addr, dnInfo.storageType, dnInfo.info);
@@ -273,10 +274,17 @@ boolean createBlockReader(LocatedBlock block, long offsetInBlock,
           fetchBlockAt(block.getStartOffset());
           retry.refetchToken();
         } else {
+          if (curRetryCounts++ < readDNMaxRetryCounts) {
+            DFSClient.LOG.info("Try to reconnect to {} for block {} for {} time.", dnInfo.addr,
+                block.getBlock(), curRetryCounts);
+            // Re-fetch the block in case the block has been moved.
+            fetchBlockAt(block.getStartOffset());
+            continue;
+          }
           //TODO: handles connection issues
-          DFSClient.LOG.warn("Failed to connect to " + dnInfo.addr + " for " +
-              "block" + block.getBlock(), e);
-          // re-fetch the block in case the block has been moved
+          DFSClient.LOG.warn("Failed to connect to {} for block {} after retrying {} time.",
+              dnInfo.addr, block.getBlock(), curRetryCounts, e);
+          // Re-fetch the block in case the block has been moved
           fetchBlockAt(block.getStartOffset());
           addToLocalDeadNodes(dnInfo.info);
         }
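
Distilled from the change above, a simplified, standalone sketch of the retry pattern (not HDFS code; ReaderFactory and the other names are hypothetical stand-ins):

    import java.io.IOException;

    /** Keep retrying reader creation until it succeeds or maxRetries transient
     *  IOExceptions have been observed, then propagate the last failure. */
    final class CreateReaderRetrySketch {
      interface ReaderFactory {
        AutoCloseable create() throws IOException;   // stands in for getBlockReader(...)
      }

      static AutoCloseable createWithRetry(ReaderFactory factory, int maxRetries)
          throws IOException {
        int curRetryCounts = 0;
        while (true) {
          try {
            return factory.create();                 // may fail occasionally
          } catch (IOException e) {
            if (curRetryCounts++ < maxRetries) {
              continue;                              // mirrors the new retry branch in the diff
            }
            throw e;                                 // retries exhausted: fail as before
          }
        }
      }
    }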

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java

Lines changed: 4 additions & 1 deletion
@@ -120,6 +120,7 @@ void skip() {
   protected final RawErasureDecoder decoder;
   protected final DFSStripedInputStream dfsStripedInputStream;
   private long readTo = -1;
+  private final int readDNMaxRetryCounts;
 
   protected ECChunk[] decodeInputs;
 
@@ -138,6 +139,8 @@ void skip() {
     this.corruptedBlocks = corruptedBlocks;
     this.decoder = decoder;
     this.dfsStripedInputStream = dfsStripedInputStream;
+    this.readDNMaxRetryCounts = dfsStripedInputStream.getDFSClient()
+        .getConf().getStripedReadDnMaxRetryCounts();
 
     service = new ExecutorCompletionService<>(
         dfsStripedInputStream.getStripedReadsThreadPool());
@@ -309,7 +312,7 @@ boolean readChunk(final LocatedBlock block, int chunkIndex)
     if (readerInfos[chunkIndex] == null) {
       if (!dfsStripedInputStream.createBlockReader(block,
           alignedStripe.getOffsetInBlock(), targetBlocks,
-          readerInfos, chunkIndex, readTo)) {
+          readerInfos, chunkIndex, readTo, readDNMaxRetryCounts)) {
         chunk.state = StripingChunk.MISSING;
         return false;
       }

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java

Lines changed: 8 additions & 0 deletions
@@ -532,6 +532,14 @@ interface StripedRead {
      * span 6 DNs, so this default value accommodates 3 read streams
      */
     int THREADPOOL_SIZE_DEFAULT = 18;
+
+    /**
+     * The max retry counts of reconnecting to DN
+     * during the striped read process, Default is 1.
+     */
+    String DATANODE_MAX_RETRY_COUNT = PREFIX +
+        "datanode.max.retry.count";
+    int DATANODE_MAX_RETRY_COUNT_DEFAULT = 1;
   }
 
   /** dfs.http.client configuration properties */
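
Since StripedRead.PREFIX expands to "dfs.client.read.striped.", the new key resolves to dfs.client.read.striped.datanode.max.retry.count (the same name registered in hdfs-default.xml below). A minimal sketch of setting it programmatically on the client side; the NameNode URI and retry value here are illustrative, not part of the commit:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class StripedReadRetryConfigExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Allow up to 3 reconnect attempts per DataNode while reading striped (EC) blocks.
        conf.setInt("dfs.client.read.striped.datanode.max.retry.count", 3);
        try (FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:8020"), conf)) {
          // EC reads through this FileSystem now retry createBlockReader on transient IOExceptions.
        }
      }
    }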

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java

Lines changed: 15 additions & 0 deletions
@@ -167,6 +167,7 @@ public class DfsClientConf {
   private final boolean deadNodeDetectionEnabled;
   private final long leaseHardLimitPeriod;
   private final boolean recoverLeaseOnCloseException;
+  private final int stripedReadDnMaxRetryCounts;
 
   public DfsClientConf(Configuration conf) {
     // The hdfsTimeout is currently the same as the ipc timeout
@@ -296,6 +297,13 @@ public DfsClientConf(Configuration conf) {
     Preconditions.checkArgument(stripedReadThreadpoolSize > 0, "The value of " +
         HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_KEY +
         " must be greater than 0.");
+    stripedReadDnMaxRetryCounts =
+        conf.getInt(
+            HdfsClientConfigKeys.StripedRead.DATANODE_MAX_RETRY_COUNT,
+            HdfsClientConfigKeys.StripedRead.DATANODE_MAX_RETRY_COUNT_DEFAULT);
+    Preconditions.checkArgument(stripedReadDnMaxRetryCounts >= 0, "The value of " +
+        HdfsClientConfigKeys.StripedRead.DATANODE_MAX_RETRY_COUNT +
+        " must be greater than or equal to 0.");
     replicaAccessorBuilderClasses = loadReplicaAccessorBuilderClasses(conf);
 
     leaseHardLimitPeriod =
@@ -710,6 +718,13 @@ public long getleaseHardLimitPeriod() {
     return leaseHardLimitPeriod;
   }
 
+  /**
+   * @return the stripedReadDnMaxRetryCounts
+   */
+  public int getStripedReadDnMaxRetryCounts() {
+    return stripedReadDnMaxRetryCounts;
+  }
+
   /**
    * @return the readUseCachePriority
    */

hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

Lines changed: 9 additions & 0 deletions
@@ -4625,6 +4625,15 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.client.read.striped.datanode.max.retry.count</name>
+  <value>1</value>
+  <description>
+    The max retry counts of reconnecting to DN during
+    the striped read process, Default is 1.
+  </description>
+</property>
+
 <property>
   <name>dfs.client.replica.accessor.builder.classes</name>
   <value></value>
