Skip to content

Commit 15af529

Browse files
HDFS-17358. EC: infinite lease recovery caused by the length of RWR equals to zero or datanode does not have the replica. (#6509). Contributed by farmmamba.
Reviewed-by: Tao Li <tomscut@apache.org> Reviewed-by: Haiyang Hu <haiyang.hu@shopee.com> Signed-off-by: Shuyan Zhang <zhangshuyan@apache.org>
1 parent a897e74 commit 15af529

File tree

3 files changed

+66
-11
lines changed

3 files changed

+66
-11
lines changed

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,8 @@ protected void recover() throws IOException {
386386
Map<Long, BlockRecord> syncBlocks = new HashMap<>(locs.length);
387387
final int dataBlkNum = ecPolicy.getNumDataUnits();
388388
final int totalBlkNum = dataBlkNum + ecPolicy.getNumParityUnits();
389+
int zeroLenReplicaCnt = 0;
390+
int dnNotHaveReplicaCnt = 0;
389391
//check generation stamps
390392
for (int i = 0; i < locs.length; i++) {
391393
DatanodeID id = locs[i];
@@ -419,10 +421,14 @@ protected void recover() throws IOException {
419421
if (info == null) {
420422
LOG.debug("Block recovery: DataNode: {} does not have " +
421423
"replica for block: (block={}, internalBlk={})", id, block, internalBlk);
424+
dnNotHaveReplicaCnt++;
422425
} else {
423426
LOG.debug("Block recovery: Ignored replica with invalid "
424427
+ "generation stamp or length: {} from DataNode: {} by block: {}",
425428
info, id, block);
429+
if (info.getNumBytes() == 0) {
430+
zeroLenReplicaCnt++;
431+
}
426432
}
427433
}
428434
} catch (RecoveryInProgressException ripE) {
@@ -436,9 +442,18 @@ protected void recover() throws IOException {
436442
"datanode={})", block, internalBlk, id, e);
437443
}
438444
}
439-
checkLocations(syncBlocks.size());
440445

441-
final long safeLength = getSafeLength(syncBlocks);
446+
final long safeLength;
447+
if (dnNotHaveReplicaCnt + zeroLenReplicaCnt <= locs.length - ecPolicy.getNumDataUnits()) {
448+
checkLocations(syncBlocks.size());
449+
safeLength = getSafeLength(syncBlocks);
450+
} else {
451+
safeLength = 0;
452+
LOG.warn("Block recovery: {} datanodes do not have the replica of block {}." +
453+
" {} datanodes have zero-length replica. Will remove this block.",
454+
dnNotHaveReplicaCnt, block, zeroLenReplicaCnt);
455+
}
456+
442457
LOG.debug("Recovering block {}, length={}, safeLength={}, syncList={}", block,
443458
block.getNumBytes(), safeLength, syncBlocks);
444459

@@ -452,11 +467,13 @@ protected void recover() throws IOException {
452467
rurList.add(r);
453468
}
454469
}
455-
assert rurList.size() >= dataBlkNum : "incorrect safe length";
456470

457-
// Recovery the striped block by truncating internal blocks to the safe
458-
// length. Abort if there is any failure in this step.
459-
truncatePartialBlock(rurList, safeLength);
471+
if (safeLength > 0) {
472+
Preconditions.checkArgument(rurList.size() >= dataBlkNum, "incorrect safe length");
473+
// Recovery the striped block by truncating internal blocks to the safe
474+
// length. Abort if there is any failure in this step.
475+
truncatePartialBlock(rurList, safeLength);
476+
}
460477

461478
// notify Namenode the new size and locations
462479
final DatanodeID[] newLocs = new DatanodeID[totalBlkNum];
@@ -469,11 +486,20 @@ protected void recover() throws IOException {
469486
int index = (int) (r.rInfo.getBlockId() &
470487
HdfsServerConstants.BLOCK_GROUP_INDEX_MASK);
471488
newLocs[index] = r.id;
472-
newStorages[index] = r.storageID;
489+
if (r.storageID != null) {
490+
newStorages[index] = r.storageID;
491+
}
473492
}
474493
ExtendedBlock newBlock = new ExtendedBlock(bpid, block.getBlockId(),
475494
safeLength, recoveryId);
476495
DatanodeProtocolClientSideTranslatorPB nn = getActiveNamenodeForBP(bpid);
496+
if (safeLength == 0) {
497+
nn.commitBlockSynchronization(block, newBlock.getGenerationStamp(),
498+
newBlock.getNumBytes(), true, true, newLocs, newStorages);
499+
LOG.info("After block recovery, the length of new block is 0. " +
500+
"Will remove this block: {} from file.", newBlock);
501+
return;
502+
}
477503
nn.commitBlockSynchronization(block, newBlock.getGenerationStamp(),
478504
newBlock.getNumBytes(), true, false, newLocs, newStorages);
479505
}
@@ -527,8 +553,8 @@ long getSafeLength(Map<Long, BlockRecord> syncBlocks) {
527553
private void checkLocations(int locationCount)
528554
throws IOException {
529555
if (locationCount < ecPolicy.getNumDataUnits()) {
530-
throw new IOException(block + " has no enough internal blocks" +
531-
", unable to start recovery. Locations=" + Arrays.asList(locs));
556+
throw new IOException(block + " has no enough internal blocks(current: " + locationCount +
557+
"), unable to start recovery. Locations=" + Arrays.asList(locs));
532558
}
533559
}
534560
}

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,8 +133,8 @@ private BlockReader createBlockReader(long offsetInBlock) {
133133
block.getNumBytes() - offsetInBlock, true, "", peer, source,
134134
null, stripedReader.getCachingStrategy(), -1, conf);
135135
} catch (IOException e) {
136-
LOG.info("Exception while creating remote block reader, datanode {}",
137-
source, e);
136+
LOG.info("Exception while creating remote block reader for {}, datanode {}",
137+
block, source, e);
138138
IOUtils.closeStream(peer);
139139
return null;
140140
}

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecoveryStriped.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,35 @@ public void testSafeLength() {
259259
checkSafeLength(1024 * 1024 * 1024, 6442450944L); // Length of: 1 GiB
260260
}
261261

262+
/**
263+
* 1. Write 1MB data, then flush it.
264+
* 2. Mock client quiet exceptionally.
265+
* 3. Trigger lease recovery.
266+
* 4. Lease recovery successfully.
267+
*/
268+
@Test
269+
public void testLeaseRecoveryWithManyZeroLengthReplica() {
270+
int curCellSize = (int)1024 * 1024;
271+
try {
272+
final FSDataOutputStream out = dfs.create(p);
273+
final DFSStripedOutputStream stripedOut = (DFSStripedOutputStream) out
274+
.getWrappedStream();
275+
for (int pos = 0; pos < curCellSize; pos++) {
276+
out.write(StripedFileTestUtil.getByte(pos));
277+
}
278+
for (int i = 0; i < dataBlocks + parityBlocks; i++) {
279+
StripedDataStreamer s = stripedOut.getStripedDataStreamer(i);
280+
waitStreamerAllAcked(s);
281+
stopBlockStream(s);
282+
}
283+
recoverLease();
284+
LOG.info("Trigger recover lease manually successfully.");
285+
} catch (Throwable e) {
286+
String msg = "failed testCase" + StringUtils.stringifyException(e);
287+
Assert.fail(msg);
288+
}
289+
}
290+
262291
private void checkSafeLength(int blockLength, long expectedSafeLength) {
263292
int[] blockLengths = new int[]{blockLength, blockLength, blockLength, blockLength,
264293
blockLength, blockLength};

0 commit comments

Comments
 (0)